DataflowカスタムコンテナでDBSCANクラスタリングを実行してみた
DBSCANとDataflowのサンプルソースで試した
Table of contents
author: kyou
背景
Dataflow now supports custom containers in GA.
※ このカスタム コンテナ機能は、Python で一般提供が開始されました。Java ではプレビューで利用できます。
になってから、もうすぐ半年が経ちます。一方、初心者にやさしいBigQuery MLのクラスタリングはK平均法(k-means)のみサポートしています。DBSCANはk-meansより優れるように見えますので、Dataflowカスタムコンテナで回してみることにしました。
準備
Dataflow でのカスタム コンテナの使用には詳しい内容がありますから、Apache Beam SDK (>2.30.0)とDockerインストールの説明は割愛させていただきます。
データの準備
DBSCAN クラスタリングの解説と実験のソースを参照し、Dataset 1(またはDataset 2)をCSVファイルにエクスポートします。
import pandas as pd
df = pd.DataFrame(dataset1)
df.to_csv("sample.csv", header=False, index=False)
DBSCANのインプットとして、Google Cloud Storageのバケットに格納します。
$ gsutil cp sample.csv gs://$MY_BUCKET
Copying file://...
実装
Docker イメージ
Apache Beam SDKのベースイメージを指定し、DBSCAN用のscikit-learnとインプット用のCloud Storageライブラリをインストールします。
FROM apache/beam_python3.8_sdk:2.35.0
RUN pip install --no-cache-dir scikit-learn google-cloud-storage
COPY dbscan.py /workspace/
WORKDIR /workspace
Dataflow 実装
サンプルソースのwordcount.pyをベースに実装します。まず、ライブラリを追加します。
from google.cloud import storage
from sklearn import cluster
import numpy as np
wordcount.pyのWordExtractingDoFn関数では入力データを行ごとに単語に分解していますが、DBSCAN の処理単位は一つのデータセットになっているため、DBSCANの処理は以下のように実装します。
class DbscanDoFn(beam.DoFn):
def __init__(
self,
project=None,
bucket_name=None,
destination_file_name=None,
):
self._project = project
self._bucket_name = bucket_name
self._destination_file_name = destination_file_name
def process(self, element):
#element(input data): csv file name
storage_client = storage.Client(self._project)
bucket = storage_client.get_bucket(self._bucket_name)
blob = bucket.blob(element)
blob.download_to_filename(self._destination_file_name)
with open(self._destination_file_name) as file_name:
array = np.loadtxt(file_name, delimiter=",")
return cluster.DBSCAN(eps=1, min_samples=5, metric="euclidean").fit_predict(
array
)
DbscanDoFnを使うように、pipelineのソースを以下のように変更します。
実行時にknown_args.input
へ渡すlist.txtにはsample.csvのファイル名のみが入っています。
with beam.Pipeline(options=pipeline_options) as p:
lines = p | "Read" >> ReadFromText(known_args.input)
output = lines | "DBSCAN" >> (
beam.ParDo(
DbscanDoFn(
project=known_args.bucket_project_id,
bucket_name=known_args.bucket_name,
destination_file_name=known_args.destination_name,
)
)
)
output | "Write" >> WriteToText(known_args.output)
know_argsにパラメータが増えましたから、パラメータ処理にも以下の内容を追加します。
parser.add_argument("--bucket_project_id", required=True, help="The project id")
parser.add_argument("--bucket_name", required=True, help="The name of the bucket")
parser.add_argument(
"--destination_name", required=True, help="The destination name of the files"
)
実行してみる
$ export PROJECT=
$ export REPO=dbscan
$ export TAG=latest
$ export IMAGE_URI=gcr.io/$PROJECT/$REPO:$TAG
$ gcloud builds submit . --tag $IMAGE_URI
$ python dbscan.py \
--runner "DataflowRunner" \
--project "$PROJECT" \
--temp_location "gs://$MY_BUCKET/tmp" \
--sdk_container_image "$IMAGE_URI" \
--experiments "use_runner_v2" \
--job_name "dbscan" \
--input "gs://$MY_BUCKET/list.txt" \
--output "gs://$MY_BUCKET/output" \
--bucket_project_id "$PROJECT" \
--bucket_name "$MY_BUCKET" \
--destination_name "sample.csv" \
--region "us-central1"
gs://$MY_BUCKET/output
ではクラスタリングの結果を確認できたら、終わりです。
参考資料
※本記事は、ジーアイクラウド株式会社の見解を述べたものであり、必要な調査・検討は行っているものの必ずしもその正確性や真実性を保証するものではありません。
※リンクを利用する際には、必ず出典がGIC dryaki-blogであることを明記してください。
リンクの利用によりトラブルが発生した場合、リンクを設置した方ご自身の責任で対応してください。
ジーアイクラウド株式会社はユーザーによるリンクの利用につき、如何なる責任を負うものではありません。