ユーザー単位で分析するカスタマーアナリティクスの場合、Google AnalyticsやAdobe Analyticsの画面で表示できるレポート機能が物足りないので、Data Warehouseなどで生データ(に近い集計データ)を抽出してTableauで集計することが多いですが、データが巨大だとBIツールで読み込めません。そんな時にPythonで巨大ファイルを並列処理し、不要なカラムやレコードを削除してからBIで読み込む方法についてです。

以下のように、Adobe AnalyticsのデータをData WarehouseでFTP配信したCSVファイルをPythonで前処理してみます。

元ファイルは18.5GBもあり、ExcelでもテキストエディタでもTableauでも開けません。少しでもデータ量を減らすためにセグメントを適用してありますが、Data Warehouseのセグメント機能には制約があり、どうしても不要なデータが混ざってしまいます。

データベースやGCPで処理するのが確実ですが、手元のパソコンでサクっと分析したいので、Pythonの並列・分散処理ライブラリ「Dask」を使いました。

使ったPythonコードはこちら。
pipでdaskをインストールしておく必要があります(Anacondaには標準で含まれているようです)。

import dask.dataframe as dd
# 分割されたCSVでも一つの巨大CSVでも同じように読み込める
file1 = 'DW-exported-*.csv'
df = dd.read_csv(file1, header=0, names=('Pages','VisitNum','HitDepth','Date','CV','Customer ID','Referrer','PV'), dtype={'Pages':'object','VisitNum':'int16','HitDepth':'int16','Date':'object','CV':'object','eVar13':'object','Referrer':'object','PV':'int16'})
# カラム名や型を指定しない場合
#df = dd.read_csv(file1, header=0)
# この結果、複数のパーティションに分割されたpandas dataframeが生成される
print('生成されたパーティションの数:' + str(df.npartitions))
# フィルタや変換などの前処理はパーティション単位で並列処理される
# まずフィルタでレコード数を減らす例
df = df[df['Customer ID'].str.contains('^\d+', na=False)]
# 値を変換する例
df['Customer ID'] = df['Customer ID'].str.replace('\/.*$', '')
# NULLを含むレコードを除外する例
df = df.dropna(subset=['Pages', 'Date', 'Customer ID', 'PV'])
# カラム削除の例
df = df.drop('CV', axis=1)
# 他のCSVとマージ(JOIN)する例
file2 = 'ID一覧.csv'
loyal = dd.read_csv(file2, header=0)
merge = dd.merge(df, loyal, left_on='Customer ID', right_on='AAID')
merge = merge.drop('AAID', axis=1)
# headやtailなら全データを読み込まないので速い
merge.head()
# 処理が終わったらパーティション単位でファイル保存
merge.to_csv("DW-processed-*.csv")
# メモリに収まるほど小さくなった場合は、一つのファイルにまとめて保存できる
merge.compute().to_csv("DW-processed.csv")

to_csvで普通にCSV保存すると、読み込みの時に分割されたパーティションの単位で分割された複数のCSVファイルが生成されます。時間はかかりますが、pandasで普通に処理した時のようにフリーズしたり待った挙句にエラーになったりしません。

メモリに読み込める程度までデータ量が減った場合は、一つにまとめて単一のCSVファイルとして保存することもできます。

上のコードを実行して生成されたのは3.6GBのCSV。Macbook上のTableauでも読み込めるようになりました。

注意点など

  • CSV読み込み時に最小限のデータ型を指定するとメモリを節約できる
  • 並び替えなど、全データをメモリに読み込むような処理は避ける必要がある
  • NumPyとpandasのAPIを完全サポートしているわけではない

pandasだけでもchunksizeを指定してループ処理できますが、DASKだと小さな単一ファイルと同じように処理しても自動で分割や並列処理してくれるのが便利!