PCollectionについて解説します。
既に何度もでてきてその都度説明してきましたが、今回はより具体的な作成方法やPCollectionの特徴が主な内容となります。
目次
前提
対象ドキュメント
PCollectionの概要
PCollectionはデータセットの入ったCollectionで、常に分散して保持されています。
PCollectionはPTransformの入出力となるため、Pipelineでデータを引き継ぎつつ変換を行うにはPCollectionに値をoutputする必要があります。
Pipelineの開発時はまず何らかのデータセットからPCollectionを作成する必要があります。
(作成しなければPTransformを呼び出せないので)
tip:
JavaDocなどには記載されていますが、入力PCollectionを必要としない特別なPBeginという入力を持ったPTransformもあります[Create.ofなど]。逆に出力を必要としないPDoneもあります。
現時点で筆者は2つとも動作させたことがないです。
外部ソースからPCollectionを作成する
外部ソースからデータを読み取り、PCollectionを作成するにはSource API(開発ガイド5章)を利用します。
各アダプタはReadメソッドを持っており、これをPipelineに適用することでデータを読み出し、PCollection化&出力されます。
以下のソースが例です。
TextIOというSourceAPIを利用し、GCSからファイルを読み込みます。
public static void main(String[] args) {
// Create the pipeline.
PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).create();
Pipeline p = Pipeline.create(options);
// Create the PCollection 'lines' by applying a 'Read' transform.
PCollection<String> lines = p.apply(
"ReadMyFile", TextIO.read().from("gs://some/inputData.txt"));
}
tip:
アダプタの利用方法は様々で、Javadocレベルで確認をしてゆく必要があるかなと思います。例えば、接続情報の設定方法など、、このあたり全然書かれてなくて結構プログラミングするには厳しいですね。
インメモリからPCollectionを作成する
お馴染み、Create.of()でインメモリ領域からPCollectionを作成可能です。
入力値としては、CollectionとCoderを設定可能です。
Collectionには以下のサンプルのようにCoderを設定して、型を指定する必要があります。
public static void main(String[] args) {
// Create a Java Collection, in this case a List of Strings.
final List<String> LINES = Arrays.asList(
"To be, or not to be: that is the question: ",
"Whether 'tis nobler in the mind to suffer ",
"The slings and arrows of outrageous fortune, ",
"Or to take arms against a sea of troubles, ");
// Create the pipeline.
PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).create();
Pipeline p = Pipeline.create(options);
// Apply Create, passing the list and the coder, to create the PCollection.
p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of());
}
tip:
Create.ofProviderでパイプラインのパラメータとなるValueProviderも設定可能です。
PCollectionの特徴
PCollectionはPipelineに属するので、複数のPipelineでPCollectionを共有して処理をすることはできません。
PCollectionはjava.util.Collectionsと同じような特徴を持っていますが、全く異なる点もあります。
要素の型
PCollectionは要素の型を制限しません。ただし、要素は全て同じ型である必要があります。
また、PCollectionとしては型の制限はありませんが、分散処理をする場合は型がSerializable(直列化可能・バイトとしてエンコード可能)である必要があります。
tip:
Javaでいうと、Serializableを既にimplementsしているクラス、もしくはプリミティブ型(intなど)のみをフィールドに持ち、かつSerializableをimplementsするクラスのみがSerializableとなります。
エンコーディングは、基本的にはBeamSDKで提供されますが、カスタムで指定することも可能です。
不変性
PCollectionは不変で、一度作成すると、要素の数と内容を変更できません。
PTransformは常に入力PCollectionから新たなPCollectionを作成します。上述したとおり、PCollectionは変更できないし、されることもありません。
ランダムアクセス
PCollectionはランダムアクセスを許容しません。
PCollectionにアクセスするにはすべての要素に対して変換を実施する必要があります。
サイズ
PCollectionの要素数に上限はありません。例えば、Kafkaなどストリーミングデータサービスと組み合わせた場合、無制限に要素が増えるということになります。
要素数が無制限か制限があるかは、Beamのデータ処理の方法に影響を与えます。
制限がある場合はバッチとしてデータセット全体を一度読み込み、時間やメモリに制限をかけて実行することができます。
制限がない場合はストリームとして、連続的に実行されることを考慮してジョブを設計する必要があります。
tip:
batchモード/Streamモードの説明がこれまでなかったですね、、(TODO)
BeamはBatch/Streamでの実行がサポートされています。
関連する実装として、ウィンドウ処理があり、ストリーム実行の場合PCollectionを特定のサイズに分割できます。
これらはtimestampなど
要素のtimestamp
PCollectionの要素には固有のtimestampを持ちます。
入力処理(ファイルIOなど)によって最初にtimestampが割り当てられるようになっており、ストリームの場合は要素の追加/読み取りに応じてtimestampが設定されます。
※データセットの境界が設定される処理でもtimestampが設定されますが、入力データセット読み込み時に設定するtimestampのほうがよく使われます。
timestampは要素が時間と関係する際役に立ちます。例えば、ツイート情報を解析する場合、各要素は投稿時刻をもつことになります。
手動でtimestampを設定することも可能です。
詳細はタイムスタンプの追加で確認できます。