Pipelineの作成について解説します。
パラメータの設定や、オプションの設定についても記載があります。
目次
前提
対象ドキュメント
パイプラインの作成について
Pipelineはデータと変換処理、入出力がカプセル化されたものです。JavaではPipelineオブジェクトを構築し、その中でデータセットPCollectionを生成し、Transformを適用することがはじめの一歩です。
Pipelineオブジェクトの作成時は実行情報を定義したオプションを設定します。
プログラムでの設定も可能ですが、コマンドラインから渡すことも可能です。
// Start by defining the options for the pipeline.
PipelineOptions options = PipelineOptionsFactory.create();
// Then create the pipeline.
Pipeline p = Pipeline.create(options);
パイプラインオプションの構成
パイプラインランナーやそのランナーの設定、またパイプライン自体の設定が可能です。
後ほど解説されますが、ここで設定したパイプラインオプションはDoFn内のProcessElementメソッド(PCollectionの要素を一つ一つ実行するメソッド)に引き渡すことも可能です。
コマンドラインからパイプラインオプションを渡す
PipelineOptionsでは以下のサンプルのようにコマンドラインで値を受け付けることが可能です。
PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().create();
※.withValidationは引数のバリデーション用に用意されたメソッドです。
以下の形式でコマンドライン引数を渡すことが可能です。
--<option>=<value>
具体的な例は、WordCountのサンプルにも組み込まれています
(https://beam.apache.org/get-started/wordcount-example)
カスタムオプションを作成する
標準のオプション以外にもカスタムオプションを定義することが可能です。
例えば以下のサンプルのように、input値とoutput値を設定することも可能です。
定義はinterfaceの定義のみで大丈夫です。
public interface MyOptions extends PipelineOptions {
@Description("Input for the pipeline")
@Default.String("gs://my-bucket/input")
String getInput();
void setInput(String input);
@Description("Output for the pipeline")
@Default.String("gs://my-bucket/input")
String getOutput();
void setOutput(String output);
}
また、アノテーションに関して、デフォルト値の他にDescriptionがありますが、これは–helpオプションを利用してコマンド実行した際に表示されます。
以下のコードはインターフェースを実際にパイプラインでオプションとして利用するサンプルです
PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(MyOptions.class);
パイプラインに値を渡すには、以下のようにコマンドラインにてオプションを指定します。
--input=value1 --output=value2