今回はパイプラインの設計についてです。
設計時には、ApacheBeamではどのような想定が必要かについて確認します。
目次
前提
対象ドキュメント
パイプラインの設計時に考慮する事項
- 入力データの場所
- 入力データ数によって、パイプラインの開始時のTransformの種類が決まります。
- 入力データのキー
- 一部のビーム変換PCollectionは、キー/値ペアでのみ機能します。データにキーが設定されているかどうか、どのようにキー設定されているかなど、パイプライン中でデータがどのように保持されるのか検討する必要があります。
- データの変換仕様
- Beam SDKのコアトランスフォームは汎用的なので、データの変更または操作仕様を検討しておくことにより、ParDoなどのコアトランスフォームの構築方法、またはBeam SDKに含まれる事前作成済みトランスフォームの使用方法が決まります。
- 出力データの形式と保存先
- パイプラインの最後に適用する必要がある変換を検討します。
公式ドキュメントをほぼそのまま訳してみました。
(あまり何言ってるかわからないですね。)
独自で解説を加えると、
入出力・変換を検討するというのはプログラムの設計として言うまでもなく大事なことだと思いますが、その上で特にApacheBeam独自のStream的な一連の処理を意識することとその中で汎用的に用意されている処理や独自変換処理の設計をすることが重要となります。
キーの話などは上述のStream的な一連の処理の意識というなかに含まれていて、キーでの集約処理などを設けなければ処理が集約せず、複雑な処理では予期せず処理の前後関係が崩れてしまうということもありえます。
基本的なパイプライン
公式ドキュメントは大変わかりにくくかかれていますが、、
上図はInputにTransformを加え、TransformからPCollectionが出力されということを表しています。
これは極めてシンプルな例ですが、Transformから出力するPCollectionは複数とすることもできますし、入力のPCollectionを複数にすることもできます。
(ちなみにこれをすると実装は上述キーなどとの関係で極めて複雑になります。)
これは別の回でより詳しく解説しますが、ApacheBeamプログラム上では、以下の通り、文言として覚えておいてください。
- Pipeline
- パイプラインとして定義された処理全体のこと
- PTransform
- データに対する変換処理のこと
- PCollection
- Transformの入出力のこと
(前のTransformにとっては出力、次のTransformにとっては入力) - 内部は配列のようになっているが、PTransformに渡す以外では分解して値を取り出せない。
- Transformの入出力のこと
PCollectionの分岐
ここで、PCollectionの重要な特徴について解説します。
PCollectionは何度処理が呼び出されても値が変わったり変更されるということはありません。
同じPCollectionを複数Transformが処理するパターン
Aという名前を抽出するTransformAとBという名前を抽出するTransformBに同一の入力PCollectionを与えることが可能です。
↓はサンプルですが、dbRowCollectionという入力を2つのTransformに別で渡しているということが味噌です。やや実装よりな印象ですがこのようなロジックを組むことも可能です。
PCollection<String> dbRowCollection = ...;
PCollection<String> aCollection = dbRowCollection.apply("aTrans", ParDo.of(new DoFn<String, String>(){
@ProcessElement
public void processElement(ProcessContext c) {
if(c.element().startsWith("A")){
c.output(c.element());
}
}
}));
PCollection<String> bCollection = dbRowCollection.apply("bTrans", ParDo.of(new DoFn<String, String>(){
@ProcessElement
public void processElement(ProcessContext c) {
if(c.element().startsWith("B")){
c.output(c.element());
}
}
}));
複数のPCollectionを一つのTransformが出力する
別の実装ドキュメントの話ですが、PTransform内で別のタグを付けて出力する(https://beam.apache.org/documentation/programming-guide/#additional-outputs)ことで複数のPCollectionを出力することができます。
以下例で、Aで始まるものはoutputToPCollectionAへ、Bで始まるものはoutputToPCollectionBへ出力しています。
// Define two TupleTags, one for each output.
final TupleTag<String> startsWithATag = new TupleTag<String>(){};
final TupleTag<String> startsWithBTag = new TupleTag<String>(){};
PCollectionTuple mixedCollection =
dbRowCollection.apply(ParDo
.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().startsWith("A")) {
// Emit to main output, which is the output with tag startsWithATag.
c.output(c.element());
} else if(c.element().startsWith("B")) {
// Emit to output with tag startsWithBTag.
c.output(startsWithBTag, c.element());
}
}
})
// Specify main output. In this example, it is the output
// with tag startsWithATag.
.withOutputTags(startsWithATag,
// Specify the output with tag startsWithBTag, as a TupleTagList.
TupleTagList.of(startsWithBTag)));
// Get subset of the output with tag startsWithATag.
mixedCollection.get(startsWithATag).apply(...);
// Get subset of the output with tag startsWithBTag.
mixedCollection.get(startsWithBTag).apply(...);
複数PCollectionをまとめる
上記のような処理で分散したPCollectionはPTransformに渡さなければ値展開ができないため、PCollectionをまとめる方法も用意されています。
結合する方法は(おそらく事前に用意された関数としては)2つあります
- Flatten
- 同じタイプの複数のPCollectionをひとつにまとめることができます
- 配列を足し算するイメージ(どちらの要素も持った配列へマージするイメージです)
- Join
- CoGroupByというTransformで同一のキーをもったPCollectionを関連付けて処理できます
- そもそもキーを持っているPCollectionを事前に用意する必要があるのと、2つのPCollectionのキーのタイプは同じである必要があります
↓はFlattenのサンプル。もう少し下にCoGroupByもあります。
//merge the two PCollections with Flatten
PCollectionList<String> collectionList = PCollectionList.of(aCollection).and(bCollection);
PCollection<String> mergedCollectionWithFlatten = collectionList
.apply(Flatten.<String>pCollections());
// continue with the new merged PCollection
mergedCollectionWithFlatten.apply(...);
複数ソースから入力を受け付ける
パイプラインは複数のソース(DB&ファイルなど)から入力をうけつけ、処理へ渡すことが可能です。
サンプルの説明
以下の図5に示す例では、パイプラインはデータベーステーブルから名前と住所を読み取り、Kafkaトピックから名前と注文番号を読み取ります。
次に、パイプラインはCoGroupByKey
この情報を結合するために
使用
します。キーは名前です。
結果にPCollection
は、名前、住所、注文のすべての組み合わせが含まれます。
事前にキーを用意すればSQLのJOIN的なことができるということですね。
PCollection<KV<String, String>> userAddress = pipeline.apply(JdbcIO.<KV<String, String>>read()...);
PCollection<KV<String, String>> userOrder = pipeline.apply(KafkaIO.<String, String>read()...);
final TupleTag<String> addressTag = new TupleTag<String>();
final TupleTag<String> orderTag = new TupleTag<String>();
// Merge collection values into a CoGbkResult collection.
PCollection<KV<String, CoGbkResult>> joinedCollection =
KeyedPCollectionTuple.of(addressTag, userAddress)
.and(orderTag, userOrder)
.apply(CoGroupByKey.<String>create());
joinedCollection.apply(...);