継続は力なり

タイトル通り定期的な更新を心掛けるブログです。

CloudWatch Events の定数を使って Lambda の処理を分岐させる

タダです.

定期イベントを CloudWatch Events で設定し,定型処理を実行する Lambda をキックすることはよくあることだと思います.今回 CloudWatch Events の定数を使って Lambda の処理を分岐させるのをやってみたのでこの記事にまとめていきます.今回は ECS タスクの数を調整する処理を作ってみました.

ECS のタスク数を調整する Lambda

Lambda のコードは以下のようなコードを書いてます.CloudWatch Events から desired_count をパラメーターとして渡し,desired_countが0ならコンテナをなくして,desired_countを1ならコンテナを1台起動させます.

import json
import boto3
import logging
from botocore.exceptions import ClientError

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    logger.info("Update ECS Task Count")
    CLUSTER_NAME = 'cluster name'
    SERVICE_NAME =  'service name'
    desired_count = event['desired_count'] 
    try:
        client = boto3.client('ecs')
        
        if desired_count == 0:
            result = client.update_service(
                    cluster = 'cluster name',
                    service = 'service name',
                    desiredCount = desired_count
                )
            return "Stop Complete."
            
        elif desired_count == 1:
            result = client.update_service(
                    cluster = 'cluster name',
                    service = 'service name',
                    desiredCount = desired_count
                )
            return "Start Complete."
            
            
    except ClientError as e:
        logger.exception("Exception.")

CloudWatch Events の定数

Lambda のコードで設定している desired_count を定数で定義していきます.イベントのターゲットを指定する箇所で定数を {"desired_count": 0(1)}と設定します.

コンテナを0台にするイベント f:id:sadayoshi_tada:20220130180916p:plain

コンテナを1台にするイベント f:id:sadayoshi_tada:20220130180902p:plain

関連情報

docs.aws.amazon.com

動作テスト

Lambda のテストイベントで次のような設定をして実行してみます.下記のテストでコンテナを起動できるか確認します.1台のコンテナ起動できました.

{
  "desired_count": 1
}

f:id:sadayoshi_tada:20220130181755p:plain

f:id:sadayoshi_tada:20220130181902p:plain

次にコンテナをなくしてみます.意図通りコンテナを消せました.

{
  "desired_count": 0
}

f:id:sadayoshi_tada:20220130182019p:plain

f:id:sadayoshi_tada:20220130182035p:plain

まとめ

CloudWatch Events の定数を使って ECS タスク数を Lambda 側で処理を分岐させました.この機能を知るまでは Lambda を処理ごとに作ってしまっていたので,管理する関数の数が少なくなるので良かったです.

redash のデータソースで Python を使ってみる

タダです.

redash のデータソースとして Python を使ってみる機会があったので,この記事にまとめていきます.なお,redash のバージョンは 8.0.0+b32245 で確認しています.

Python をデータソースに追加

デフォルトでは,データソースに Python がないため追加の設定が必要です.自分の確認環境は EC2 の AMI を使っているため /opt/redash/env に下記の記述を追記後,docker-compse up -d で起動し直せばデータソースに Python が追加されます.

REDASH_ADDITIONAL_QUERY_RUNNERS=redash.query_runner.python

追加されたデータソース画面

f:id:sadayoshi_tada:20211211141627p:plain

データソース追加後のコードを書いてみる

別々のクエリの結果をテーブル表示したいみたいな場合は下記のようなコードで表示できました.この場合は ALB のログの中から0.5秒以上のログと全件のログを出して表示させるみたいなことができます.

queryResults1 = execute_query('データソース名',"SELECT COUNT(*) FROM alb_logs WHERE url LIKE '%/hoge/hoge?%' target_processing_time >= 0.5 AND target_status_code != '-'")
queryResults2 = execute_query('データソース名',"SELECT COUNT(*) FROM alb_logs WHERE url LIKE '%/hoge/hoge?%' AND target_status_code != '-'")

results = {}
for rows1 in queryResults1['rows']:
    for rows2 in queryResults2['rows']:
        add_result_row(result, {
            'over': rows1['_col0'],
            'all': rows2['_col0']
        })
    break

add_result_column(result, 'all', '', 'string')
add_result_column(result, 'over', '', 'string')

クエリ結果

f:id:sadayoshi_tada:20211211150752p:plain

また, 上記のクエリを複数保存していたのを合体して表にさせることも下記のようなクエリでできます.

queryResults1 = get_query_result(redash のクエリ ID)
queryResults2 = get_query_result(redash のクエリ ID)

results = {}
for rows1 in queryResults['rows']:
    add_result_row(result, {
        'over': rows1['over'],
        'all': rows1['all'] 
    })

for rows2 in queryResults2['rows']:
    add_result_row(result, {
        'over': rows2['over'],
        'all': rows2['all'] 
    })

add_result_column(result, 'all', '', 'string')
add_result_column(result, 'over', '', 'string')

クエリ結果

f:id:sadayoshi_tada:20211211151020p:plain

まとめ

redash のデータソースとして Python を使うことがあったのでまとめました.SQL だけでなく Python のコードを書いてデータを集計したりできるのは redash の特徴だと思いますので,今後も適宜使っていきます!

Lambda を使って Elasticsearch のエイリアスとインデックスを更新する

タダです.

Amazon OpenSearch Service のインデックス再設計に関わった際に,次の運用上の懸念がありました.

  • インデックスは月ごとに生成し,生成したインデックスに対してアプリケーションから見た時のエイリアスとして参照用と書き込み用の2つを貼りたい
  • 前月以前のインデックスにも参照用エイリアスを残す形で運用したい

上記の運用を実現するために Lambda で上記の処理を行えないか検証したためその内容をこの記事にまとめます.

Lambda の権限と実行コード

Lambda の権限

Lambda の権限は検証として下記の IAM を付与しています.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": "arn:aws:logs:ap-northeast-1:xxx:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:ap-northeast-1:xxx:log-group:/aws/lambda/xxx:*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "es:ESHttpPost",
                "es:ESHttpGet",
                "es:ESHttpPut",
                "es:ESHttpDelete"
            ],
            "Resource": "*"
        }
    ]
}

実行コード

コードは Python 3.9 で確認しています.

import boto3
import os
from requests_aws4auth import AWS4Auth
from elasticsearch import Elasticsearch, RequestsHttpConnection
from urllib.request import Request, urlopen
from datetime import datetime, timedelta, timezone
from dateutil.relativedelta import relativedelta

ENDPOINT = os.environ['ES_ENDPOINT']
REGION = os.environ['ES_REGION']
SERVICE = 'es'
CREDINTIAL = boto3.Session().get_credentials()
AWSAUTH = AWS4Auth(CREDINTIAL.access_key, CREDINTIAL.secret_key, REGION, SERVICE, session_token=CREDINTIAL.token)
JST = timezone(timedelta(hours=+9), 'JST')

def lambda_handler(event, context):
    client = Elasticsearch(
        hosts = [{'host': ENDPOINT, 'port': 443}],
        http_auth = AWSAUTH,
        use_ssl = True,
        verify_certs = True,
        connection_class = RequestsHttpConnection
    )

    write_alias_name = "test-writer-alias"
    read_alias_name = "test-reader-alias"
    index_prefix = "test-index-"
    new_index = index_prefix + datetime.now(JST).strftime('%Y%m')
    old_index = index_prefix + datetime.strftime(datetime.today() - relativedelta( months = 1 ), '%Y%m')
   
    client.indices.update_aliases(
            body={
                "actions": [
                    {"remove": {"alias": write_alias_name, "index": old_index}},
                    {"add": {"alias": write_alias_name, "index": new_index}},
                    {"add": {"alias": read_alias_name, "index": new_index}},
                ]
            }
        )

Lambda の実行結果確認

Lambda 実行後の挙動を確認していきます.実行前に予め前月のインデックス test-index-202110エイリアスとして test-reader-ailias(参照用エイリアス)と test-writer-alias(書き込み用エイリアス)を作った状態で確認しています.本記事は2021年の11月に書いているので,インデックスとして test-index-202111エイリアスが書き込み,参照が貼られ,前月にも参照用エイリアスが残った状態にできていたら期待通りです.

Lambda実行前

% curl -XGET 'https://xxx.ap-northeast-1.es.amazonaws.com/_alias?pretty=true'
{
  ".kibana_1" : {
    "aliases" : {
      ".kibana" : { }
    }
  },
  "test-index-202110" : {
    "aliases" : {
      "test-reader-alias" : { }, 
      "test-writer-alias" : { }  
    }
  }
}

Lambda を実行してみたところ期待通りの結果が得られました.

Lambda実行後

% curl -XGET 'https://xxx.ap-northeast-1.es.amazonaws.com/_alias?pretty=true'
{
  ".kibana_1" : {
    "aliases" : {
      ".kibana" : { }
    }
  },
  "test-index-202110" : {
    "aliases" : {
      "test-reader-alias" : { }
    }
  },
  "test-index-202111" : {
    "aliases" : {
      "test-reader-alias" : { },
      "test-writer-alias" : { }
    }
  }

まとめ

Elasticsearch のインデックスとエイリアスの運用上の懸念に対して Lambda を使ったプローチで検証したのをまとめました.次は,インデックス再設計するためのインデックス移行について書いていければと思います.

関連記事

sadayoshi-tada.hatenablog.com

pytest と moto でモックを作ったテストを行う

タダです.

以前から気になっていた moto というツールを試す機会があったので,この記事で導入と pytest と組み合わせたテストをチュートリアルの内容を通して見ていきたいと思います.

moto って?

moto とはテストで AWS サービスをモックで作っていくための Python ライブラリです.

github.com

導入はpipでさくっとできます.なお,motoは全ての AWS サービスを呼び出す関数を網羅しているわけではなさそうなので,その点は注意です.

$ pip install moto

moto が対応しているサービス一覧 github.com

moto でモックを作り pytest でテストする

それでは, moto を使ってみようと思います.

$ pipenv install boto3
$ pipenv install --dev pytest moto

以下のようなディレクトリ切って,app.pyとそのテストをするためのtest.pyを作りました.

.
├── app
│   ├── app.py
│   └── __init__.py
└── tests
    ├── __init__.py
    └── test.py

チュートリアルとして S3 に put_object を実行する save 関数があるコードを用意しました.

import boto3

class MyModel(object):
    def __init__(self, name, value):
        self.name = name
        self.value = value

    def save(self):
        s3 = boto3.client('s3', region_name='ap-northeast-1')
        s3.put_object(Bucket='mybucket', Key=self.name, Body=self.value)

テスト用コードとしては次のものを用意しました.@mock_s3を記述することで S3 へのアクセスをモックしてくれてます.save関数を呼んで,オブジェクトを読みファイルの中身がis awesomeかを確認するようになっています.

import boto3
from moto import mock_s3
from app.app import MyModel

@mock_s3
def test_my_model_save():
    conn = boto3.resource('s3', region_name='ap-northeast-1')
    conn.create_bucket(Bucket='mybucket',CreateBucketConfiguration={
        'LocationConstraint':'ap-northeast-1'
    })

    model_instance = MyModel('steve', 'is awesome')
    model_instance.save()

    body = conn.Object('mybucket', 'steve').get()[
        'Body'].read().decode("utf-8")

    assert body == 'is awesome'

テストコードを走らせてみたところテストをパスしたのを確認できました.

$ pipenv run pytest tests/test.py 
=============================================================================== test session starts ===============================================================================
platform linux -- Python 3.7.9, pytest-6.2.3, py-1.10.0, pluggy-0.13.1
rootdir: /xxxx/xxxx/environment
collected 1 item                                                                                                                                                                  

tests/test.py .                                                                                                                                                             [100%]

================================================================================ warnings summary =================================================================================
../.local/share/virtualenvs/environment-QZ1wNgYc/lib64/python3.7/distutils/__init__.py:1
  /xxxx/xxxx/.local/share/virtualenvs/environment-QZ1wNgYc/lib64/python3.7/distutils/__init__.py:1: DeprecationWarning: the imp module is deprecated in favour of importlib; see the module's documentation for alternative uses
    import imp

-- Docs: https://docs.pytest.org/en/stable/warnings.html
========================================================================== 1 passed, 1 warning in 0.69s ===========================================================================

まとめ

簡単になりますが,motoを使ったテストコードを書くことをやってみました.AWS リソースを作ってまで確認するとコストがかかって確認が遅くなってしまうような状況ならこう言ったツールを作って素早く確認して行けると良さそうです.

FastAPI と Falcon を Docker イメージにした時のサイズとレスポンスのスピードを計測した

タダです.

業務で API で使用する言語を置き換える検討をしていたりするんですが, FastAPI と Falcon を比較した際Docker イメージにした時の軽量さとベンチマークしてみてどれくらいレスポンスが速いかを計測した機会があったので,この記事に検証した内容をまとめておきます.

FastAPI と Falcon の特徴

2つの Web フレームワークがどのような特徴を持っているかをさらっておきます.FastAPI は Python 3.6 以上の速い Web フレームワークであると謳ってます.また,ASGI を使います.

FastAPI fastapi.tiangolo.com

特徴としては次のようなものがあります.

・ 速い。非常に高性能で、NodeJSやGoと同等のパフォーマンスを発揮します(StarletteとPydanticのおかげです)。利用可能なPythonフレームワークの中で最も高速なものの一つです。

・コード化が速い。機能開発のスピードを約200%~300%向上させます。

・バグが少ない。人間(開発者)によるエラーを約40%削減。

・ 直感的。素晴らしいエディタサポート。デバッグ時間を短縮。

・ 簡単。使いやすく、学習しやすいように設計されています。ドキュメントを読む時間を短縮。

・ 短い。コードの重複を最小限に抑えます。各パラメータ宣言から複数の機能を利用できるようになっています。

・ 堅牢。すぐに使えるコードを取得できます。自動でインタラクティブなドキュメントを提供します。

・ 標準ベース。APIのオープンスタンダードに基づいています(完全に互換性があります)。OpenAPI(以前はSwaggerとして知られていた)とJSON Schema。

Falcon も速く,軽量な Web フレームワークです. WSGI を使います.

Falcon falcon.readthedocs.io

特徴としては次のようなものがあります.

URIテンプレートRFCに基づくルート

・リソースへのURIのRESTに触発されたマッピング

・グローバル、リソース、およびメソッドのフック

・慣用的なHTTPエラー応答

Unicodeの完全サポート

・直感的なリクエストオブジェクトとレスポンスオブジェクト

・geventのような非同期ライブラリでうまく機能します

・安全なAPIを作成するための最小限の攻撃対象領域

・包括的なテストスイートで100%のコードカバレッジ

・他のPythonパッケージへの依存関係はありません

Python 2.7、3.5以降をサポート

・PyPyと互換性があります

Docker イメージ化した時のサイズ

Alphine Linux ベースの Docker イメージでビルドした場合のサイズですが,2つとも同じくらいのサイズ感でした.

# Falcon
$ docker images | grep falcon
falcon                                     latest              c74c32855823        About a minute ago   49.2MB
# FastAPI
$ docker images | grep fastapi
fastapi                      latest              88fd7ac8fa13        3 hours ago          50.1MB

どれくらい早くレスポンスがされるかのベンチマーク

wrk2 を使って以下の条件でベンチマークしてみました.

  • プログラムは Hello World を返すもの
  • 30 秒間で固定してベンチマークをする
  • 秒間100,500,1000,5000,10000 リクエストを投げてその結果を確認する

10のコネクションを維持して、秒間 100 リクエスト投げた時の計測

FastAPI,Falcon ともに大きな差異はない.

# FastAPI
wrk2 -t2 -c10 -d30s -R100 http://localhost:8000/
Running 30s test @ http://localhost:8000/
  2 threads and 10 connections
  Thread calibration: mean lat.: 7.524ms, rate sampling interval: 25ms
  Thread calibration: mean lat.: 7.752ms, rate sampling interval: 26ms
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     6.06ms    3.00ms  22.78ms   79.35%
    Req/Sec    53.01     71.93   208.00     80.30%
  3002 requests in 30.01s, 416.29KB read
Requests/sec:    100.02
Transfer/sec:     13.87KB

# Falcon
wrk2 -t2 -c10 -d30s -R100 http://localhost:8080/
Running 30s test @ http://localhost:8080/
  2 threads and 10 connections
  Thread calibration: mean lat.: 32.647ms, rate sampling interval: 209ms
  Thread calibration: mean lat.: 20.032ms, rate sampling interval: 185ms
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     6.24ms    4.06ms  29.98ms   82.81%
    Req/Sec    49.44      5.69    62.00     84.00%
  3002 requests in 30.01s, 483.72KB read
Requests/sec:    100.03
Transfer/sec:     16.12KB

50のコネクションを維持して、秒間 500 リクエスト投げた時の計測

Latency で FastAPI と Falcon の間に差がでてきた.

# FastAPI
wrk2 -t2 -c50 -d30s -R500 http://localhost:8000/
Running 30s test @ http://localhost:8000/
  2 threads and 50 connections
  Thread calibration: mean lat.: 7.982ms, rate sampling interval: 28ms
  Thread calibration: mean lat.: 7.915ms, rate sampling interval: 27ms
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     5.11ms    2.25ms  19.73ms   81.05%
    Req/Sec   254.29     63.28   423.00     61.43%
  14991 requests in 30.00s, 2.03MB read
Requests/sec:    499.66
Transfer/sec:     69.29KB

# Falcon
wrk2 -t2 -c50 -d30s -R500 http://localhost:8080/
Running 30s test @ http://localhost:8080/
  2 threads and 50 connections
  Thread calibration: mean lat.: 46.716ms, rate sampling interval: 204ms
  Thread calibration: mean lat.: 45.854ms, rate sampling interval: 200ms
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    14.29ms   20.81ms 135.04ms   88.48%
    Req/Sec   250.44      6.95   280.00     80.10%
  14991 requests in 30.00s, 2.36MB read
Requests/sec:    499.63
Transfer/sec:     80.51KB

100のコネクションを維持して、秒間 1000 リクエスト投げた時の計測

1000リクエストになったら Requests を捌く量も Latency と同じくひらいてきた.

# FastAPI
 wrk2 -t2 -c100 -d30s -R1000 http://localhost:8000/
Running 30s test @ http://localhost:8000/
  2 threads and 100 connections
  Thread calibration: mean lat.: 241.727ms, rate sampling interval: 819ms
  Thread calibration: mean lat.: 240.797ms, rate sampling interval: 815ms
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   241.60ms  260.77ms 777.22ms   75.42%
    Req/Sec   481.31     47.68   583.00     79.17%
  29201 requests in 30.01s, 3.96MB read
Requests/sec:    973.16
Transfer/sec:    135.12KB

# Falcon
wrk2 -t2 -c100 -d30s -R1000 http://localhost:8080/
Running 30s test @ http://localhost:8080/
  2 threads and 100 connections
  Thread calibration: mean lat.: 124.110ms, rate sampling interval: 383ms
  Thread calibration: mean lat.: 124.684ms, rate sampling interval: 393ms
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   527.57ms  528.20ms   1.86s    73.48%
    Req/Sec   161.29    255.12     0.99k    71.57%
  16458 requests in 30.27s, 2.59MB read
  Socket errors: connect 0, read 0, write 0, timeout 600
Requests/sec:    543.71
Transfer/sec:     87.61KB

500のコネクションを維持して、秒間 5000 リクエスト投げた時の計測

# FastAPI
wrk2 -t2 -c500 -d30s -R5000 http://localhost:8000/
Running 30s test @ http://localhost:8000/
  2 threads and 500 connections
  Thread calibration: mean lat.: 3006.903ms, rate sampling interval: 11796ms
  Thread calibration: mean lat.: 3028.214ms, rate sampling interval: 11796ms
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    12.31s     3.42s   18.87s    58.52%
    Req/Sec   456.00     68.00   524.00    100.00%
  26269 requests in 30.01s, 3.56MB read
  Socket errors: connect 251, read 64, write 0, timeout 3514
Requests/sec:    875.44
Transfer/sec:    121.59KB

# Falcon
wrk2 -t2 -c500 -d30s -R5000 http://localhost:8080/
Running 30s test @ http://localhost:8080/
  2 threads and 500 connections
  Thread calibration: mean lat.: 2613.849ms, rate sampling interval: 10739ms
  Thread calibration: mean lat.: 2724.868ms, rate sampling interval: 10936ms
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     8.13s     2.36s   15.61s    65.29%
    Req/Sec   275.50     29.50   305.00    100.00%
  16381 requests in 31.28s, 2.58MB read
  Socket errors: connect 251, read 153, write 0, timeout 5439
Requests/sec:    523.76
Transfer/sec:     84.39KB

1000のコネクションを維持して、秒間 10000 リクエスト投げた時の計測

# FastAPI
wrk2 -t2 -c1000 -d30s -R10000 http://localhost:8000/
Running 30s test @ http://localhost:8000/
  2 threads and 1000 connections
  Thread calibration: mean lat.: 3875.593ms, rate sampling interval: 13352ms
  Thread calibration: mean lat.: 3795.580ms, rate sampling interval: 13303ms
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    12.20s     3.03s   18.14s    57.44%
    Req/Sec   514.50     39.50   554.00    100.00%
  27956 requests in 30.00s, 3.79MB read
  Socket errors: connect 751, read 0, write 0, timeout 9763
Requests/sec:    931.79
Transfer/sec:    129.26KB

# Falcon
wrk2 -t2 -c1000 -d30s -R10000 http://localhost:8080/
Running 30s test @ http://localhost:8080/
  2 threads and 1000 connections
  Thread calibration: mean lat.: 3676.434ms, rate sampling interval: 14540ms
  Thread calibration: mean lat.: 3547.812ms, rate sampling interval: 14319ms
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     7.69s     2.42s   17.81s    67.25%
    Req/Sec   155.50      5.50   161.00    100.00%
  16445 requests in 30.53s, 2.59MB read
  Socket errors: connect 751, read 45, write 0, timeout 12373
Requests/sec:    538.65

Docker イメージにした時のサイズとベンチマークを踏まえての所感

検証の結果を踏まえて次の感想を持ちました.

  • Docker イメージにした時の軽量さは2つに大差はない
  • リクエスト数が増えていった時も FastAPI のほうが処理スピードが速くリクエストを捌く量も多い

また, FastAPI のドキュメントをみていると,FastAPI を作る時に既存のフレームワークを参考に開発したと解説されており, Falcon も参考にして開発されているようなので改良されたのが FastAPI って位置付けなのだと感じました.

fastapi.tiangolo.com

まとめ

FastAPI と Falcon の比較をするための検証で行ったことを整理してみました.ベンチマークしてみて FastAPI が速くてパフォーマンスも優れているという所以を感じることができました.FastAPI を業務で使う機会が出てきたら習熟していけるようになっていきたいです.