DataflowカスタムコンテナでDBSCANクラスタリングを実行してみた

2022/01/24に公開されました。
2022/01/24に更新されました。

DBSCANとDataflowのサンプルソースで試した


author: kyou

背景

Dataflow now supports custom containers in GA.

※ このカスタム コンテナ機能は、Python で一般提供が開始されました。Java ではプレビューで利用できます。

になってから、もうすぐ半年が経ちます。一方、初心者にやさしいBigQuery MLのクラスタリングはK平均法(k-means)のみサポートしています。DBSCANはk-meansより優れるように見えますので、Dataflowカスタムコンテナで回してみることにしました。

準備

Dataflow でのカスタム コンテナの使用には詳しい内容がありますから、Apache Beam SDK (>2.30.0)とDockerインストールの説明は割愛させていただきます。

データの準備

DBSCAN クラスタリングの解説と実験のソースを参照し、Dataset 1(またはDataset 2)をCSVファイルにエクスポートします。

import pandas as pd

df = pd.DataFrame(dataset1)

df.to_csv("sample.csv", header=False, index=False)

DBSCANのインプットとして、Google Cloud Storageのバケットに格納します。

$ gsutil cp sample.csv gs://$MY_BUCKET
Copying file://...

実装

Docker イメージ

Apache Beam SDKのベースイメージを指定し、DBSCAN用のscikit-learnとインプット用のCloud Storageライブラリをインストールします。

FROM apache/beam_python3.8_sdk:2.35.0

RUN pip install --no-cache-dir scikit-learn google-cloud-storage

COPY dbscan.py /workspace/

WORKDIR /workspace

Dataflow 実装

サンプルソースのwordcount.pyをベースに実装します。まず、ライブラリを追加します。

from google.cloud import storage
from sklearn import cluster
import numpy as np

wordcount.pyのWordExtractingDoFn関数では入力データを行ごとに単語に分解していますが、DBSCAN の処理単位は一つのデータセットになっているため、DBSCANの処理は以下のように実装します。

class DbscanDoFn(beam.DoFn):
    def __init__(
        self,
        project=None,
        bucket_name=None,
        destination_file_name=None,
    ):
        self._project = project
        self._bucket_name = bucket_name
        self._destination_file_name = destination_file_name

    def process(self, element):
        #element(input data): csv file name
        storage_client = storage.Client(self._project)
        bucket = storage_client.get_bucket(self._bucket_name)
        blob = bucket.blob(element)
        blob.download_to_filename(self._destination_file_name)
        with open(self._destination_file_name) as file_name:
            array = np.loadtxt(file_name, delimiter=",")
        return cluster.DBSCAN(eps=1, min_samples=5, metric="euclidean").fit_predict(
            array
        )

DbscanDoFnを使うように、pipelineのソースを以下のように変更します。 実行時にknown_args.inputへ渡すlist.txtにはsample.csvのファイル名のみが入っています。

    with beam.Pipeline(options=pipeline_options) as p:
        lines = p | "Read" >> ReadFromText(known_args.input)
        output = lines | "DBSCAN" >> (
            beam.ParDo(
                DbscanDoFn(
                    project=known_args.bucket_project_id,
                    bucket_name=known_args.bucket_name,
                    destination_file_name=known_args.destination_name,
                )
            )
        )
        output | "Write" >> WriteToText(known_args.output)

know_argsにパラメータが増えましたから、パラメータ処理にも以下の内容を追加します。

    parser.add_argument("--bucket_project_id", required=True, help="The project id")
    parser.add_argument("--bucket_name", required=True, help="The name of the bucket")
    parser.add_argument(
        "--destination_name", required=True, help="The destination name of the files"
    )

実行してみる

$ export PROJECT=
$ export REPO=dbscan
$ export TAG=latest
$ export IMAGE_URI=gcr.io/$PROJECT/$REPO:$TAG

$ gcloud builds submit . --tag $IMAGE_URI

$ python dbscan.py \
  --runner "DataflowRunner" \
  --project "$PROJECT" \
  --temp_location "gs://$MY_BUCKET/tmp" \
  --sdk_container_image "$IMAGE_URI" \
  --experiments "use_runner_v2" \
  --job_name "dbscan" \
  --input "gs://$MY_BUCKET/list.txt" \
  --output "gs://$MY_BUCKET/output" \
  --bucket_project_id "$PROJECT" \
  --bucket_name "$MY_BUCKET" \
  --destination_name "sample.csv" \
  --region "us-central1"

gs://$MY_BUCKET/outputではクラスタリングの結果を確認できたら、終わりです。

参考資料

※本記事は、ジーアイクラウド株式会社の見解を述べたものであり、必要な調査・検討は行っているものの必ずしもその正確性や真実性を保証するものではありません。

※リンクを利用する際には、必ず出典がGIC dryaki-blogであることを明記してください。
リンクの利用によりトラブルが発生した場合、リンクを設置した方ご自身の責任で対応してください。
ジーアイクラウド株式会社はユーザーによるリンクの利用につき、如何なる責任を負うものではありません。