Cloud FunctionsでDataflowを呼び出す

2022/04/08に公開されました。
2022/04/08に更新されました。

Dataflowテンプレート別に呼び出す方法をまとめる


author: Chiakoba

はじめに

DXチームのChiaKobaです。

華麗にデータ加工していますか?
今回は、データ加工のためのDataflowのテンプレートをCloud Functionsで呼び出していきます。

Dataflowの概要

Dataflowは、サーバーレスかつフルマネージなバッチ処理およびストリーム処理を行うサービスです。
大規模なデータ処理を単純化するオープンソースのApache Beam モデルを、Dataflowが並列分散してくれるので、データ処理の論理的な構成に集中できるサービスとなります。

そんなDataflowですが、テンプレートを利用することで、Google Cloud Console、Google Cloud CLI、またはREST APIを使用できます。

Dataflowテンプレート

今回はこのテンプレートをCloudFunction(Python2.7)でそれぞれ呼んで違いを噛み締めていきます。 自作テンプレートのデプロイについては割愛します。

現時点では、以下の3種類用意されています。
クラシックテンプレートとFlexテンプレートは自作する場合に使用します。


Google 提供のテンプレートをCloudFunctionで呼び出す

まずは、Google提供のテンプレートを呼んでいきましょう。
API詳細はprojects.templates.launchに記載されています。

以下のソースは、公開されているテンプレートのWordCountを使い、GCSのファイルを解析し、結果をGCSへファイルを出力する想定です。

from googleapiclient.discovery import build
import os

def _dataflow_job_start_function(request):
    # 引数dataから必要な情報を取得
    # 渡されるJSONの例))
    # {"data":{"bucket":"your-bucket-20220404","name":"test.csv"}}
    request_json = request.get_json()
    if request_json and 'data' in request_json:
        data = request_json['data']
        bucket_name = data['bucket']
        file_name = data['name']
    else:
        return f'None data!'

    job = 'word-count-call-job1'
    # templateは公開されてるgs://dataflow-templates/配下のテンプレートを記載する
    template = "gs://dataflow-templates/latest/Word_Count"
    parameters = {
         'inputFile': "gs://{}/{}".format(bucket_name,file_name),
         'output': "gs://{}/output/my_output1".format(bucket_name),
     }

    service  = build("dataflow","v1b3",cache_discovery=False)

    request = service.projects().templates().launch(
        projectId=os.environ.get('GCP_PROJECT'),  # pythonバージョンにより変更が必要
      	location=os.environ.get('FUNCTION_REGION'),  # pythonバージョンにより変更が必要
        gcsPath=template,
        body={
            'jobName': job,
            'parameters': parameters,
        }
    )
    return request.execute()

クラシックテンプレートをCloudFunctionで呼び出す

クラシックテンプレートをGCSに配置して、CloudFunctionで呼び出します。
API詳細はprojects.templates.launchに記載されています。

以下のソースは、GCSのファイルを自作のクラシックテンプレートで解析し、結果をGCSへファイルを出力する想定です。

from googleapiclient.discovery import build
import os

def _dataflow_job_start_function(request):
    # 引数dataから必要な情報を取得
    # 渡されるJSONの例))
    # {"data":{"bucket":"your-bucket-20220404","name":"test.csv","template":"classic-wordcount"}}
    request_json = request.get_json()
    if request_json and 'data' in request_json:
        data = request_json['data']
        bucket_name = data['bucket']
        file_name = data['name']
        template_name = data['template']
    else:
        return f'None data!'

    job = 'word-count-call-job2'
    # templateは自身が作成したテンプレートのGCSの場所を記載
    template = "gs://{}/templates/{}".format(bucket_name,template_name)
    parameters = {
         'inputFile': "gs://{}/{}".format(bucket_name,file_name),
         'output': "gs://{}/output/my_output2".format(bucket_name),
     }

    service  = build("dataflow","v1b3",cache_discovery=False)

    request = service.projects().templates().launch(
        projectId=os.environ.get('GCP_PROJECT'),  # pythonバージョンにより変更が必要
      	location=os.environ.get('FUNCTION_REGION'),  # pythonバージョンにより変更が必要
        gcsPath=template,
        body={
            'jobName': job,
            'parameters': parameters,
        }
    )
    return request.execute()

FlexテンプレートをCloudFunctionで呼び出す

FlexテンプレートをGCSに配置して、CloudFunctionで呼び出します。
API詳細はprojects.locations.flexTemplates.launchに記載されています。

以下のソースは、GCSのファイルを自作のFlexテンプレートで解析し、結果をGCSへファイルを出力する想定です。

from googleapiclient.discovery import build
import os

def _dataflow_job_start_function(request):
    # 引数dataから必要な情報を取得
    # 渡されるJSONの例))
    # {"data":{"bucket":"your-bucket-20220404","name":"test.csv","template":"flex-wordcount"}}
    request_json = request.get_json()
    if request_json and 'data' in request_json:
        data = request_json['data']
        bucket_name = data['bucket']
        file_name = data['name']
        template_name = data['template']
    else:
        return f'None data!'

    job = 'word-count-call-job3'
    # templateは自身が作成したテンプレート仕様ファイルの場所を記載
    template = "gs://{}/templates/{}.json".format(bucket_name,template_name)
    parameters = {
         'inputFile': "gs://{}/{}".format(bucket_name,file_name),
         'output': "gs://{}/output/my_output3".format(bucket_name),
     }

    service  = build("dataflow","v1b3",cache_discovery=False)

    request = service.projects().locations().flexTemplates().launch(
        projectId=os.environ.get('GCP_PROJECT'), # pythonバージョンにより変更が必要
      	location=os.environ.get('FUNCTION_REGION'),  # pythonバージョンにより変更が必要
        body={
            "launch_parameter":{
              'jobName': job,
              'parameters': parameters,
      	      'containerSpecGcsPath': template
            }
        }
    )
    return request.execute()

まとめ

違いを味わえましたでしょうか。
Google提供のテンプレートがない場合は、状況に応じて使い分けていきましょう。

最後に

データ加工のためのサービスはどんどん出てきますが、まだまだDataflowの使い所は多いと推測しています。
何かの参考になれば嬉しいです。

参考

https://github.com/GoogleCloudPlatform/java-docs-samples/tree/main/dataflow/templates



GI Cloudは事業の拡大に向けて一緒に夢を追う仲間を募集しています

当社は「クラウドで日本のIT業界を変革し、世の中をもっとハッピーに」をミッションに掲げ、Google Cloudに特化した技術者集団として、お客様にコンサルティングからシステム開発、運用・保守まで一気通貫でサービスを提供しています。

まだ小規模な事業体ですが、スタートアップならではの活気と成長性に加えて、大手総合商社である伊藤忠グループの一員としてやりがいのある案件にもどんどんチャレンジできる環境が整っています。成長意欲の高い仲間と共にスキルを磨きながら、クラウドの力で世の中をもっとハッピーにしたい。そんな我々の想いに共感できる方のエントリーをお待ちしています。

採用ページ

※本記事は、ジーアイクラウド株式会社の見解を述べたものであり、必要な調査・検討は行っているものの必ずしもその正確性や真実性を保証するものではありません。

※リンクを利用する際には、必ず出典がGIC dryaki-blogであることを明記してください。
リンクの利用によりトラブルが発生した場合、リンクを設置した方ご自身の責任で対応してください。
ジーアイクラウド株式会社はユーザーによるリンクの利用につき、如何なる責任を負うものではありません。