実装方法です。ただしとても概要なので、実際にはここに書いている内容よりはゴリゴリ開発ドキュメントを読み込むことが必要になると思います。
目次
前提
対象ドキュメント
Pipeline作成&実行の流れ
- Pipelineオブジェクトの作成
- Pipelineのデータ入力部をReadもしくはCreateTransformを利用して実装
- 変換処理を実装
(色々Transformがある/実装方法があるのでここは開発ドキュメントの方で詳しく解説しますね。) - データを出力
- Pipelineを実行する
Pipelineオブジェクトの作成
Beamでは実装の最初にPipelineオブジェクトを作成する。
↓わからず・・・(想像ですがパイプラインは独立して定義されているが、Transformやデータは独立して定義される必要はない(横断アクセス可能)みたいなことを言っているのでしょうか・・・すいませんわかるかたお助け願います。)
In the Beam SDKs, each pipeline is represented by an explicit object of type
パイプラインのカプセル性Pipeline
. EachPipeline
object is an independent entity that encapsulates both the data the pipeline operates over and the transforms that get applied to that data.
以下サンプルです。optionsはParameterとか持たせたり、設定値をもたせるやつです。
// Start by defining the options for the pipeline.
PipelineOptions options = PipelineOptionsFactory.create();
// Then create the pipeline.
Pipeline p = Pipeline.create(options);
データ読み込み
パイプラインの最初で初期データを読み込む必要があります。(データから入力PCollectionにするため)
読み込みのためにTranformが用意されており、ここではファイルデータ(GCS)を読み込む方法をサンプル記載します。
実装方法はPipeline.apply(PTransform)です。
PCollection<String> lines = p.apply(
"ReadLines", TextIO.read().from("gs://some/inputData.txt"));
他にもCreateというTransformでインメモリから入力PCollectionを作成することもできます。
変換処理
変換を実施する場合はPCollection.apply(PTransform)と記載します。
以下は単語を反転させるサンプルです。
PCollection<String> words = ...;
PCollection<String> reversedWords = words.apply(new ReverseWords());
これも具体的な実装の話なので後で説明しますが、PTransformには入力型が決まっているので、入力となるPCollectionの型を合わせる必要があります。
出力
変換が完了したら結果を出力します。
入力同様に出力のためにTranformが用意されており、ここではファイル(GCS)にデータを書き込む方法をサンプル記載します。
PCollection<String> filteredWords = ...;
filteredWords.apply("WriteMyFile", TextIO.write().to("gs://some/outputData.txt"));
Pipelineの実行
Pipelineを組み上げたら、Pipeline.run()を実行します。これを実行するまで処理は走りません。
p.run();
run
メソッドは非同期です。代わりにブロッキング実行が必要な場合は、waitUntilFinish
メソッドを追加してパイプラインを実行します。
とのことなので、複数Pipelineがある場合などはここで待ってほしいとかあるかもですね。
p.run().waitUntilFinish();