継続は力なり

タイトル通り定期的な更新を心掛けるブログです。

EventBridge Rules で指定した ECS タスクのイベントをトリガーに Step Functions から Athena にクエリを実行する

タダです.

前回 Athena で Aurora 監査ログをクエリする記事を書きました.,毎回 Athena のクエリをアドホックに実行するのでもいいですが,特定のイベントを契機に定期的に Athena を実行するワークフローを構築して定期チェックできるようにしました.この記事では概観をまとめていきます.

sadayoshi-tada.hatenablog.com

ワークフロー概要

ワークフローとしては,EventBridge Rule で特定のイベントでのみ Step Fucntions をキックするようにします.Step Functions は Athena を実行して結果を得ることをやりました.特定イベントは以前の記事で ECS でログを S3 にアップロードすることをやりましたが,この ECS タスクが停止して command パラメーターを見て制御します.

関連記事

sadayoshi-tada.hatenablog.com

docs.aws.amazon.com

ワークフローの構築

Step Functions のASL と IAM ポリシー

Step Functions から Athena を実行する ASL を定義します.クエリを実行し,そのクエリの結果を取得するワークフローになっています.

{
  "Comment": "A description of my state machine",
  "StartAt": "AthenaQueryExecution",
  "States": {
    "AthenaQueryExecution": {
      "Next": "wait",
      "Type": "Task",
      "ResultPath": "$.AthenaQueryExecution",
      "Resource": "arn:aws:states:::athena:startQueryExecution",
      "Parameters": {
        "QueryString.$": "実行するクエリ",
        "ResultConfiguration": {
          "OutputLocation": "Athena の結果出力場所"
        },
        "WorkGroup": "ワークグループ"
      }
    },
    "Wait": {
      "Type": "Wait",
      "Seconds": 5,
      "Next": "GetAthenaQueryExecution"
    },
    "Choice": {
      "Type": "Choice",
      "Choices": [
        {
          "Or": [
            {
              "Variable": "$.GetAthenaQueryExecutionOutPut.queryState",
              "StringEquals": "QUEUED"
            },
            {
              "Variable": "$.GetAthenaQueryExecutionOutPut.queryState",
              "StringEquals": "RUNNING"
            }
          ],
          "Next": "Wait"
        }
      ],
      "Default": "GetAthenaQueryResults"
    },
    "GetAthenaQueryExecution": {
      "Next": "Choice",
      "Type": "Task",
      "ResultPath": "$.GetAthenaQueryExecutionOutPut",
      "ResultSelector": {
        "queryState.$": "$.QueryExecution.Status.State"
      },
      "Resource": "arn:aws:states:::athena:getQueryExecution",
      "Parameters": {
        "QueryExecutionId.$": "$.AthenaQueryExecutionOutPut.QueryExecutionId"
      }
    },
    "GetAthenaQueryResults": {
      "Type": "Task",
      "Resource": "arn:aws:states:::athena:getQueryResults",
      "Parameters": {
        "QueryExecutionId.$": "$.AthenaQueryExecutionOutPut.QueryExecutionId"
      },
      "End": "True"
    }
  }
}

StepFucntions から Athena を実行するための IAM としては次のポリシーを定義しました.ステートマシンを作ったら自動生成されるものに多少追記しています.

{
  "Version": "2012-10-17",
  "Statement": [
      {
          "Effect": "Allow",
          "Action": [
              "athena:startQueryExecution",
              "athena:getDataCatalog"
          ],
          "Resource": [
              "arn:aws:athena:ap-northeast-1:1234567891911:workgroup/ワークグループ",
              "arn:aws:athena:ap-northeast-1:1234567891911:datacatalog/*"
          ]
      },
      {
          "Effect": "Allow",
          "Action": [
              "s3:GetBucketLocation",
              "s3:GetObject",
              "s3:ListBucket",
              "s3:ListBucketMultipartUploads",
              "s3:ListMultipartUploadParts",
              "s3:AbortMultipartUpload",
              "s3:CreateBucket",
              "s3:PutObject"
          ],
          "Resource": [
              "arn:aws:s3:::*"
          ]
      },
      {
          "Effect": "Allow",
          "Action": [
              "glue:CreateDatabase",
              "glue:GetDatabase",
              "glue:GetDatabases",
              "glue:UpdateDatabase",
              "glue:DeleteDatabase",
              "glue:CreateTable",
              "glue:UpdateTable",
              "glue:GetTable",
              "glue:GetTables",
              "glue:DeleteTable",
              "glue:BatchDeleteTable",
              "glue:BatchCreatePartition",
              "glue:CreatePartition",
              "glue:UpdatePartition",
              "glue:GetPartition",
              "glue:GetPartitions",
              "glue:BatchGetPartition",
              "glue:DeletePartition",
              "glue:BatchDeletePartition"
          ],
          "Resource": [
              "arn:aws:glue:ap-northeast-1:1234567891911:catalog",
              "arn:aws:glue:ap-northeast-1:1234567891911:database/*",
              "arn:aws:glue:ap-northeast-1:1234567891911:table/*",
              "arn:aws:glue:ap-northeast-1:1234567891911:userDefinedFunction/*"
          ]
      },
      {
          "Effect": "Allow",
          "Action": [
              "lakeformation:GetDataAccess",
              "athena:GetQueryExecution",
              "athena:GetQueryResults"
          ],
          "Resource": [
              "*"
          ]
      }
  ]
}

ECS タスクを絞り込む EventBridge Rule

次に,ECS タスクが停止した時に特定の command が指定されていたらイベント発火する EventBridge Rule を設定します.以下のような Rule を定義の上に Step Functions をターゲットに指定します.

{
  "source": ["aws.ecs"],
  "detail-type": ["ECS Task State Change"],
  "detail": {
    "clusterArn": ["ECSクラスター ARN"],
    "containers": {
      "exitCode": [0]
    },
    "lastStatus": ["STOPPED"],
    "overrides": {
      "containerOverrides": {
        "command": ["Aurora クラスター名"]
      }
    }
  }
}

ワークフローの実行結果概観

指定 command が実行された ECS タスクが停止後,EventBridge Rule が発火し Step Functions に以下のようなインプットデータが入ってきます.

{
  "version": "0",
  "id": "xxx",
  "detail-type": "ECS Task State Change",
  "source": "aws.ecs",
  "account": "1234567891911",
  "time": "2023-04-29T06:05:09Z",
  "region": "ap-northeast-1",
  "resources": [
    "arn:aws:ecs:ap-northeast-1:1234567891911:task/クラスター名/タスクID"
  ],
  "detail": {
    "attachments": [
      {
        "id": "xxx",
        "type": "eni",
        "status": "DELETED",
        "details": [
          {
            "name": "subnetId",
            "value": "subnet-xxx"
          },
          {
            "name": "networkInterfaceId",
            "value": "eni-xxx"
          },
          {
            "name": "macAddress",
            "value": "xxx"
          },
          {
            "name": "privateDnsName",
            "value": "ip-12.34.56.789.ap-northeast-1.compute.internal"
          },
          {
            "name": "privateIPv4Address",
            "value": "12.34.56.789"
          }
        ]
      }
    ],
    "attributes": [
      {
        "name": "ecs.cpu-architecture",
        "value": "x86_64"
      }
    ],
    "availabilityZone": "ap-northeast-1a",
    "clusterArn": "arn:aws:ecs:ap-northeast-1:1234567891911:cluster/クラスター名",
    "connectivity": "CONNECTED",
    "connectivityAt": "2023-04-29T06:00:15.714Z",
    "containers": [
      {
        "containerArn": "arn:aws:ecs:ap-northeast-1:1234567891911:container/クラスター名/xxx/xxx",
        "exitCode": 0,
        "lastStatus": "STOPPED",
        "name": "コンテナ名",
        "image": "1234567891911.dkr.ecr.ap-northeast-1.amazonaws.com/コンテナ名:xxx",
        "imageDigest": "sha256:xxx",
        "runtimeId": "xxx-xxx",
        "taskArn": "arn:aws:ecs:ap-northeast-1:1234567891911:task/クラスター名/xxx",
        "networkInterfaces": [
          {
            "attachmentId": "xxx",
            "privateIpv4Address": "12.34.56.789"
          }
        ],
        "cpu": "1024",
        "memory": "2048"
      }
    ],
    "cpu": "1024",
    "createdAt": "2023-04-29T06:00:12.512Z",
    "desiredStatus": "STOPPED",
    "enableExecuteCommand": false,
    "ephemeralStorage": {
      "sizeInGiB": 20
    },
    "executionStoppedAt": "2023-04-29T06:04:46.52Z",
    "group": "family:タスク定義名",
    "launchType": "FARGATE",
    "lastStatus": "STOPPED",
    "memory": "2048",
    "overrides": {
      "containerOverrides": [
        {
          "command": [
           "指定したコマンドデータ"
          ],
          "name": "コンテナ名"
        }
      ]
    },
    "platformVersion": "1.4.0",
    "pullStartedAt": "2023-04-29T06:00:22.932Z",
    "pullStoppedAt": "2023-04-29T06:00:26.962Z",
    "startedAt": "2023-04-29T06:00:27.588Z",
    "startedBy": "chronos-schedule/EventBridge Scheduler 名",
    "stoppingAt": "2023-04-29T06:04:56.548Z",
    "stoppedAt": "2023-04-29T06:05:09.471Z",
    "stoppedReason": "Essential container in task exited",
    "stopCode": "EssentialContainerExited",
    "taskArn": "arn:aws:ecs:ap-northeast-1:1234567891911:task/クラスター名/xxx",
    "taskDefinitionArn": "arn:aws:ecs:ap-northeast-1:1234567891911:task-definition/タスク定義名:xx",
    "updatedAt": "2023-04-29T06:05:09.471Z",
    "version": 5
  }
}

前回記事で実行した管理者ユーザーのログインカウントを取った場合ワークフローが正常に動作すると,次の結果が出力されました.この時は管理者ユーザーのログインがなかった時間帯だったので VarCharValue が0となっています.

{
  "ResultSet": {
    "ResultSetMetadata": {
      "ColumnInfo": [
        {
          "CaseSensitive": false,
          "CatalogName": "hive",
          "Label": "_col0",
          "Name": "_col0",
          "Nullable": "UNKNOWN",
          "Precision": 19,
          "Scale": 0,
          "SchemaName": "",
          "TableName": "",
          "Type": "bigint"
        }
      ]
    },
    "Rows": [
      {
        "Data": [
          {
            "VarCharValue": "_col0"
          }
        ]
      },
      {
        "Data": [
          {
            "VarCharValue": "0"
          }
        ]
      }
    ]
  },
  "UpdateCount": 0
}

まとめ

ECS タスクで特定のイベントを絞った形で Step Functions から Athena を実行してみました.これで ECS から Aurora ログを収集したタイミングで都度 Athena からクエリを走行することができました.クエリの結果を Slack に通知して検知の仕組みを次に作っていきます.