This is Tada.
AWS has released AWS Data Wrangler, a Python data analysis module built by AWS. Since I normally study data analysis with Python, I tried AWS Data Wrangler by following the official blog tutorial, and this post covers my impressions.
Let's look at how to use it in the documentation:
aws-data-wrangler.readthedocs.io
Benefits of AWS Data Wrangler
The benefits of AWS Data Wrangler are as follows:
- With AWS Data Wrangler, you can load CSV data in Athena or S3 into Pandas with just a few lines of code
- It connects PySpark to Redshift, so users can concentrate on the ETL (Extract/Transform/Load) itself
The biggest benefit I take from this is that users can focus their coding on the ETL processing itself. Let's actually build the environment and write some code to see just how easy it is.
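As a quick preview of the "few lines of code" claim, here is a minimal sketch of the read path using the Session API that appears later in this post (the table and database names below are placeholders, not part of the tutorial):

import awswrangler

# Create a session and pull an Athena query result straight into a Pandas DataFrame
session = awswrangler.Session()
df = session.pandas.read_sql_athena(
    sql="select * from some_table",  # placeholder table
    database="some_database"         # placeholder database
)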
Performing ETL with AWS Data Wrangler
The environment for this walkthrough is shown in the image below; it is the configuration introduced in the official blog. The tutorial places a CSV in S3, accesses the CSV data in S3 via Athena from SageMaker through AWS Data Wrangler, and then writes the post-ETL CSV data back out to S3.
Source: the scenario architecture diagram in https://aws.amazon.com/jp/blogs/news/how-to-use-aws-data-wrangler/
1. Upload the CSV data to S3
First, upload the CSV data to S3. I used the January data from the Green Taxi Trip Records (CSV).
Upload the locally downloaded data to S3.
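If you prefer to script this step rather than use the S3 console, a boto3 sketch would look like the following (the bucket name, key, and local file name are placeholders you would replace with your own values):

import boto3

# Upload the locally downloaded CSV to the bucket/prefix that the Athena table will point at
s3 = boto3.client("s3")
s3.upload_file(
    Filename="green_tripdata_2019-01.csv",           # local file name (placeholder)
    Bucket="your-bucket-name",                       # placeholder bucket name
    Key="green_tripdata/green_tripdata_2019-01.csv"  # placeholder key under the CSV directory
)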
2. Create a database and table in Athena
Create a database and a table in Athena.
-- Create the database
CREATE DATABASE greentripdata;

-- Create the table
CREATE EXTERNAL TABLE green_tripdata(
  VendorID string,
  lpep_pickup_datetime string,
  lpep_dropoff_datetime string,
  store_and_fwd_flag string,
  RatecodeID string,
  PULocationID string,
  DOLocationID string,
  passenger_count int,
  trip_distance double,
  fare_amount double,
  extra double,
  mta_max double,
  tip_amount double,
  tolls_amount double,
  ehail_fee string,
  improvement_surcharge double,
  total_amount double,
  payment_type string,
  trip_type string,
  congestion_surcharge double
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 's3://<S3 bucket name>/<CSV data directory>/';
Next, check the number of rows in the table, since we will refer to the count later.

select count(*) from green_tripdata

We confirmed that the table contains 630,919 rows.
3. Access the CSV data in Athena and S3 from SageMaker via AWS Data Wrangler
Attach the AmazonS3FullAccess and AmazonAthenaFullAccess policies to the IAM role that you configure when launching the SageMaker notebook instance. Once the instance is running, install the AWS Data Wrangler module.
!pip install awswrangler

Collecting awswrangler
Collecting pyarrow>=0.14.0 (from awswrangler)
Collecting pandas>=0.25.1 (from awswrangler)
Requirement already satisfied: botocore>=1.12.239 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from awswrangler) (1.12.239)
Requirement already satisfied: boto3>=1.9.239 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from awswrangler) (1.9.239)
Collecting s3fs>=0.3.4 (from awswrangler)
Collecting tenacity>=5.1.1 (from awswrangler)
Collecting pg8000>=1.13.2 (from awswrangler)
Requirement already satisfied: numpy>=1.14 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from pyarrow>=0.14.0->awswrangler) (1.14.3)
Requirement already satisfied: six>=1.0.0 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from pyarrow>=0.14.0->awswrangler) (1.11.0)
Requirement already satisfied: pytz>=2017.2 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from pandas>=0.25.1->awswrangler) (2018.4)
Requirement already satisfied: python-dateutil>=2.6.1 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from pandas>=0.25.1->awswrangler) (2.7.3)
Requirement already satisfied: urllib3<1.26,>=1.20; python_version >= "3.4" in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from botocore>=1.12.239->awswrangler) (1.23)
Requirement already satisfied: docutils<0.16,>=0.10 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from botocore>=1.12.239->awswrangler) (0.14)
Requirement already satisfied: jmespath<1.0.0,>=0.7.1 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from botocore>=1.12.239->awswrangler) (0.9.4)
Requirement already satisfied: s3transfer<0.3.0,>=0.2.0 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from boto3>=1.9.239->awswrangler) (0.2.1)
Collecting fsspec>=0.2.2 (from s3fs>=0.3.4->awswrangler)
Collecting scramp==1.1.0 (from pg8000>=1.13.2->awswrangler)
Building wheels for collected packages: awswrangler
  Running setup.py bdist_wheel for awswrangler ... done
  Stored in directory: /home/ec2-user/.cache/pip/wheels/d9/81/7d/f4e8f56f0d44f17a571fcbe5b90a4ceb6001d6debdf8951be9
Successfully built awswrangler
Installing collected packages: pyarrow, pandas, fsspec, s3fs, tenacity, scramp, pg8000, awswrangler
  Found existing installation: pandas 0.24.2
    Uninstalling pandas-0.24.2:
      Successfully uninstalled pandas-0.24.2
  Found existing installation: s3fs 0.1.5
    Uninstalling s3fs-0.1.5:
      Successfully uninstalled s3fs-0.1.5
Successfully installed awswrangler-0.0.8 fsspec-0.5.1 pandas-0.25.1 pg8000-1.13.2 pyarrow-0.15.0 s3fs-0.3.4 scramp-1.1.0 tenacity-5.1.1
You are using pip version 10.0.1, however version 19.2.3 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
Now access the CSV data in S3 via Athena through AWS Data Wrangler and check the number of rows. We confirmed the same 630,919 rows we saw in step 2 (creating the database and table in Athena).
import pandas as pd
import awswrangler

session = awswrangler.Session()
df = session.pandas.read_sql_athena(
    sql="select * from green_tripdata",
    database="greentripdata"
)

print(df)

【output】
        vendorid lpep_pickup_datetime lpep_dropoff_datetime  \
0       VendorID lpep_pickup_datetime lpep_dropoff_datetime
1              2  2018-12-21 15:17:29   2018-12-21 15:18:57
2              2  2019-01-01 00:10:16   2019-01-01 00:16:32
3              2  2019-01-01 00:27:11   2019-01-01 00:31:38
4              2  2019-01-01 00:46:20   2019-01-01 01:04:54
...          ...                  ...                   ...
630914         2  2019-01-31 23:08:27   2019-01-31 23:22:59
630915         2  2019-01-31 23:21:26   2019-01-31 23:23:05
630916         2  2019-01-31 23:30:05   2019-01-31 23:36:14
630917         2  2019-01-31 23:59:58   2019-02-01 00:04:18
630918         2  2019-01-31 23:18:22   2019-01-31 23:26:06

        store_and_fwd_flag  ratecodeid  pulocationid  dolocationid  \
0       store_and_fwd_flag  RatecodeID  PULocationID  DOLocationID
1                        N           1           264           264
2                        N           1            97            49
3                        N           1            49           189
4                        N           1           189            17
...                    ...         ...           ...           ...
630914                   N           1           255           226
630915                   N           1            75           151
630916                   N           1            75           238
630917                   N           1            74            74
630918                   N           1            75           262

        passenger_count  trip_distance  fare_amount  extra  mta_max  \
0                   NaN            NaN          NaN    NaN      NaN
1                     5           0.00          3.0    0.5      0.5
2                     2           0.86          6.0    0.5      0.5
3                     2           0.66          4.5    0.5      0.5
4                     2           2.68         13.5    0.5      0.5
...                 ...            ...          ...    ...      ...
630914                1           3.33         13.0    0.5      0.5
630915                1           0.72          4.0    0.5      0.5
630916                1           1.75          7.0    0.5      0.5
630917                1           0.57          5.0    0.5      0.5
630918                1           2.11          8.5    0.5      0.5

        tip_amount  tolls_amount  ehail_fee  improvement_surcharge  \
0              NaN           NaN  ehail_fee                    NaN
1             0.00           0.0        NaN                    0.3
2             0.00           0.0        NaN                    0.3
3             0.00           0.0        NaN                    0.3
4             2.96           0.0        NaN                    0.3
...            ...           ...        ...                    ...
630914        2.14           0.0        NaN                    0.3
630915        1.06           0.0        NaN                    0.3
630916        0.00           0.0        NaN                    0.3
630917        1.00           0.0        NaN                    0.3
630918        1.96           0.0        NaN                    0.3

        total_amount  payment_type  trip_type  congestion_surcharge
0                NaN  payment_type  trip_type                   NaN
1               4.30             2          1                   NaN
2               7.30             2          1                   NaN
3               5.80             1          1                   NaN
4              19.71             1          1                   NaN
...              ...           ...        ...                   ...
630914         18.39             1          1                   0.0
630915          6.36             1          1                   0.0
630916          8.30             1          1                   0.0
630917          7.30             1          1                   0.0
630918         11.76             1          1                   0.0

[630919 rows x 20 columns]
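As a quick sanity check that the DataFrame really holds everything the count(*) query reported, you can compare the sizes directly (a trivial sketch):

# The row count should match the Athena count(*) result from step 2
print(len(df))   # expected: 630919
print(df.shape)  # expected: (630919, 20)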
4. Run the ETL processing
Now let's run the ETL processing. First, treat the rows whose trip_distance column is 0 as out of scope for the analysis and delete them. We can confirm that 10,721 rows are to be deleted.
# Extract the rows where trip_distance is 0
rows_drop = df.index[df["trip_distance"] == 0.00]

# Check how many rows have a trip_distance of 0
print(df.loc[rows_drop].count())

【output】
vendorid                 10721
lpep_pickup_datetime     10721
lpep_dropoff_datetime    10721
store_and_fwd_flag       10721
ratecodeid               10721
pulocationid             10721
dolocationid             10721
passenger_count          10721
trip_distance            10721
fare_amount              10721
extra                    10721
mta_max                  10721
tip_amount               10721
tolls_amount             10721
ehail_fee                    0
improvement_surcharge    10721
total_amount             10721
payment_type             10721
trip_type                10721
congestion_surcharge      1228
dtype: int64
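The same number can also be obtained with a boolean mask, without materializing the intermediate selection (an equivalent sketch):

# Count the rows whose trip_distance is exactly 0
print((df["trip_distance"] == 0.00).sum())  # expected: 10721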
Now delete the rows whose trip_distance is 0. Removing those 10,721 rows from the 630,919 total leaves 620,198 rows.
# Delete the rows where trip_distance is 0
df_drop = df.drop(rows_drop)
print(df_drop)

【output】
        vendorid lpep_pickup_datetime lpep_dropoff_datetime  \
0       VendorID lpep_pickup_datetime lpep_dropoff_datetime
2              2  2019-01-01 00:10:16   2019-01-01 00:16:32
3              2  2019-01-01 00:27:11   2019-01-01 00:31:38
4              2  2019-01-01 00:46:20   2019-01-01 01:04:54
5              2  2019-01-01 00:19:06   2019-01-01 00:39:43
...          ...                  ...                   ...
630914         2  2019-01-31 23:08:27   2019-01-31 23:22:59
630915         2  2019-01-31 23:21:26   2019-01-31 23:23:05
630916         2  2019-01-31 23:30:05   2019-01-31 23:36:14
630917         2  2019-01-31 23:59:58   2019-02-01 00:04:18
630918         2  2019-01-31 23:18:22   2019-01-31 23:26:06

        store_and_fwd_flag  ratecodeid  pulocationid  dolocationid  \
0       store_and_fwd_flag  RatecodeID  PULocationID  DOLocationID
2                        N           1            97            49
3                        N           1            49           189
4                        N           1           189            17
5                        N           1            82           258
...                    ...         ...           ...           ...
630914                   N           1           255           226
630915                   N           1            75           151
630916                   N           1            75           238
630917                   N           1            74            74
630918                   N           1            75           262

        passenger_count  trip_distance  fare_amount  extra  mta_max  \
0                   NaN            NaN          NaN    NaN      NaN
2                     2           0.86          6.0    0.5      0.5
3                     2           0.66          4.5    0.5      0.5
4                     2           2.68         13.5    0.5      0.5
5                     1           4.53         18.0    0.5      0.5
...                 ...            ...          ...    ...      ...
630914                1           3.33         13.0    0.5      0.5
630915                1           0.72          4.0    0.5      0.5
630916                1           1.75          7.0    0.5      0.5
630917                1           0.57          5.0    0.5      0.5
630918                1           2.11          8.5    0.5      0.5

        tip_amount  tolls_amount  ehail_fee  improvement_surcharge  \
0              NaN           NaN  ehail_fee                    NaN
2             0.00           0.0        NaN                    0.3
3             0.00           0.0        NaN                    0.3
4             2.96           0.0        NaN                    0.3
5             0.00           0.0        NaN                    0.3
...            ...           ...        ...                    ...
630914        2.14           0.0        NaN                    0.3
630915        1.06           0.0        NaN                    0.3
630916        0.00           0.0        NaN                    0.3
630917        1.00           0.0        NaN                    0.3
630918        1.96           0.0        NaN                    0.3

        total_amount  payment_type  trip_type  congestion_surcharge
0                NaN  payment_type  trip_type                   NaN
2               7.30             2          1                   NaN
3               5.80             1          1                   NaN
4              19.71             1          1                   NaN
5              19.30             2          1                   NaN
...              ...           ...        ...                   ...
630914         18.39             1          1                   0.0
630915          6.36             1          1                   0.0
630916          8.30             1          1                   0.0
630917          7.30             1          1                   0.0
630918         11.76             1          1                   0.0

[620198 rows x 20 columns]

# Check the number of rows after the deletion
df_lens = df_drop.count()
print(df_lens)

【output】
vendorid                 620198
lpep_pickup_datetime     620198
lpep_dropoff_datetime    620198
store_and_fwd_flag       620198
ratecodeid               620198
pulocationid             620198
dolocationid             620198
passenger_count          620197
trip_distance            620197
fare_amount              620197
extra                    620197
mta_max                  620197
tip_amount               620197
tolls_amount             620197
ehail_fee                     1
improvement_surcharge    620197
total_amount             620197
payment_type             620198
trip_type                620198
congestion_surcharge      83310
dtype: int64
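As a side note, the same filtering can be written with boolean indexing instead of drop(); the result should be identical (a sketch):

# Keep only the rows whose trip_distance is not 0
df_drop_alt = df[df["trip_distance"] != 0.00].copy()
print(len(df_drop_alt))  # expected: 620198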
Next, replace values in the cleaned data, targeting the payment_type column. After the replacement, the output (only part of which is shown) confirms that the numeric codes have been replaced with labels such as Credit card.
df_replace = df_drop.replace(
    {'payment_type': {
        '1': 'Credit card',
        '2': 'Cash',
        '3': 'No charge',
        '4': 'Dispute',
        '5': 'Unknown',
        '6': 'Voided trip'
        }
    }
)

print(df_replace['payment_type'])

【output】
0         payment_type
2                 Cash
3          Credit card
4          Credit card
5                 Cash
              ...
630914     Credit card
630915     Credit card
630916     Credit card
630917     Credit card
630918     Credit card
Name: payment_type, Length: 620198, dtype: object
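To check the replacement across the whole column rather than just the printed head and tail, value_counts() gives the distribution of the mapped labels (a sketch):

# Distribution of payment types after the replacement
print(df_replace["payment_type"].value_counts())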
5. Output the post-ETL data to S3 as a new CSV file
Output the post-ETL data to S3 as a new CSV file. The CSV data is written to the replace_csv folder. We confirmed that two CSV objects were written to S3.
session.pandas.to_csv(
    dataframe=df_replace,
    path="s3://xxxx/replace_csv/",
    sep=",",
    database=None,
    table=None,
    partition_cols=None,
    preserve_index=True,
    mode='append',
    procs_cpu_bound=None,
    procs_io_bound=None
)

【output】
['s3://xxxx/replace_csv/c379726f1d6d4b1b939fd64c730f059d.csv',
 's3://xxxxreplace_csv/febc156980ec4a0ea23a640558a3a596.csv']
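If you want to confirm from code rather than the S3 console that the two CSV objects were written, a boto3 listing sketch would look like this (the bucket name is masked as xxxx in this post, so substitute your own):

import boto3

# List the objects written under the replace_csv/ prefix
s3 = boto3.client("s3")
resp = s3.list_objects_v2(Bucket="xxxx", Prefix="replace_csv/")
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])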
Also check that the row count of the output data matches the count after the row deletion. Querying the green_tripdata_replace table created over the output CSV files returns 620,198 rows, the same count as before.
df2 = session.pandas.read_sql_athena(
    sql="select * from green_tripdata_replace",
    database="greentripdata"
)

print(df2)

【output】
        vendorid lpep_pickup_datetime lpep_dropoff_datetime  \
0       "315602"                  "2" "2019-01-16 17:12:12"
1       "315603"                  "2" "2019-01-16 17:05:29"
2       "315604"                  "2" "2019-01-16 17:30:44"
3       "315605"                  "2" "2019-01-16 17:09:35"
4       "315606"                  "2" "2019-01-16 17:37:14"
...          ...                  ...                   ...
620193  "315597"                  "2" "2019-01-16 18:00:02"
620194  "315598"                  "2" "2019-01-16 17:08:57"
620195  "315599"                  "2" "2019-01-16 17:29:20"
620196  "315600"                  "2" "2019-01-16 17:24:21"
620197  "315601"                  "2" "2019-01-16 18:01:00"

           store_and_fwd_flag ratecodeid pulocationid dolocationid  \
0       "2019-01-16 17:28:05"        "N"          "1"         "74"
1       "2019-01-16 17:13:48"        "N"          "1"         "95"
2       "2019-01-16 17:44:44"        "N"          "5"        "134"
3       "2019-01-16 17:16:01"        "N"          "1"        "130"
4       "2019-01-16 17:46:56"        "N"          "1"        "130"
...                       ...        ...          ...          ...
620193  "2019-01-16 18:15:39"        "N"          "1"        "182"
620194  "2019-01-16 17:17:41"        "N"          "1"         "75"
620195  "2019-01-16 17:33:48"        "N"          "1"         "75"
620196  "2019-01-16 17:56:35"        "N"          "1"         "97"
620197  "2019-01-16 18:43:47"        "N"          "1"         "97"

        passenger_count  trip_distance  fare_amount  extra  mta_max  \
0                   NaN            NaN          NaN    NaN      NaN
1                   NaN            NaN          NaN    NaN      NaN
2                   NaN            NaN          NaN    NaN      NaN
3                   NaN            NaN          NaN    NaN      NaN
4                   NaN            NaN          NaN    NaN      NaN
...                 ...            ...          ...    ...      ...
620193              NaN            NaN          NaN    NaN      NaN
620194              NaN            NaN          NaN    NaN      NaN
620195              NaN            NaN          NaN    NaN      NaN
620196              NaN            NaN          NaN    NaN      NaN
620197              NaN            NaN          NaN    NaN      NaN

        tip_amount  tolls_amount ehail_fee  improvement_surcharge  \
0              NaN           NaN     "0.0"                    NaN
1              NaN           NaN     "0.0"                    NaN
2              NaN           NaN     "0.0"                    NaN
3              NaN           NaN     "0.0"                    NaN
4              NaN           NaN     "0.0"                    NaN
...            ...           ...       ...                    ...
620193         NaN           NaN     "0.0"                    NaN
620194         NaN           NaN     "0.0"                    NaN
620195         NaN           NaN     "0.0"                    NaN
620196         NaN           NaN     "0.0"                    NaN
620197         NaN           NaN     "0.0"                    NaN

        total_amount payment_type      trip_type  congestion_surcharge
0                NaN      "16.62"  "Credit card"                   NaN
1                NaN        "9.8"         "Cash"                   NaN
2                NaN      "18.02"  "Credit card"                   NaN
3                NaN       "9.36"  "Credit card"                   NaN
4                NaN      "11.16"  "Credit card"                   NaN
...              ...          ...            ...                   ...
620193           NaN       "15.3"  "Credit card"                   NaN
620194           NaN        "9.8"         "Cash"                   NaN
620195           NaN       "8.76"  "Credit card"                   NaN
620196           NaN       "23.3"  "Credit card"                   NaN
620197           NaN       "34.8"         "Cash"                   NaN

[620198 rows x 20 columns]
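The round-trip check can also be expressed as a simple assertion against the DataFrame we wrote out (a sketch):

# The re-read data should have exactly the same number of rows as what we wrote
assert len(df2) == len(df_replace)
print(len(df2))  # expected: 620198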
Summary
I tried out the tutorial for the newly released AWS Data Wrangler Python data analysis module. Until now, when reading and writing CSVs with Pandas I would place the files locally in the Jupyter Notebook execution environment and process them there; with AWS Data Wrangler I could work with data in S3 and Athena as if the ETL targets were local, without writing any connection setup code. As the module's stated benefits suggest, it really does seem to let you concentrate on the ETL processing itself. I'll be watching future updates with interest, as it looks like a natural component when building a data analysis ecosystem on AWS, and I'd like to consider adopting it!