APACHEBEAM公式ドキュメント解説~パイプラインのテスト(JAVA)~


自動テストの概要に関するテスト方法です。CIなど入れない、一旦一通りの動作を確認したいのであれば、ここに記載の情報だけでテストしてもよいかと思います。

CIを入れたり、本番運用を見据えるとここに記載されているテストだけでは不足する部分もあるかと思います。
本番運用向けのユニットテストとして十分かどうかという点ではここに記載されている情報だけでは厳しい部分があると思います。
これらはJUnitのドキュメントなど参照ください。

前提

対象ドキュメント

本ページで解説の対象となる成果物

パイプラインのテストは重要、だそうです。(当然ですね。)デバッグでの実行はPipelineの一連の処理で行うのは至難の業とのことで、コードを利用したテストが推奨されています。

バグの発見は上述の通り作り上げられた環境などではかなり難しいのでローカルで実行テストすることも推奨されています。(DirectRunnerなど、他にも選択肢はあるようです。)

BeamSDKにはテスト用の方法が様々用意されており、パイプライン全体から各Transformまでテスト可能です。以下引用

  • パイプラインのコアトランスフォーム内で、DoFnなどの個々の関数オブジェクトをテストできます。
    (Transformの処理部分)
  • 複合変換全体を1つのユニットとしてテストできます。
  • パイプライン全体に対してエンドツーエンドのテストを実行できます。

それぞれなんたるか具体的な解説は実装パートで解説します。

より細かなMethod単位のUTはJUnitドキュメントなどを参照ください。
(個人的には処理を有効に共通化できていればユニットテストとしてはメソッドごとのUTと正常異常1ケース程度通してテストできればいいのかなと思います。)

DoFnTester

サーバでの分散処理をテストする前にDoFnを個別にテストすることでテストにかかる労力を削減できるとのことです。

DoFnテストにはJUnitを利用可能で、DoFnTesterというライブラリが提供されているようです。テストの流れは以下です。

  1. DoFnTesterを作成する。(テスト用オブジェクト)
  2. 1つ以上のメインテスト入力を作成する。
    複数の入出力のテストも可能です。
  3. DoFnTester.processBundleを呼び出します。
    (PCollectionを一つ一つ処理するための主要処理メソッド)
  4. JUnitのAssert.assertThatメソッドを使用して、返されるテスト出力が期待値と一致することを確認します。

詳細手順

DoFnTesterの作成

以下の記載で対象のDoFnをDoFnTesterとして定義できます。

static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;

  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

通常の入力

テストに利用する入力値を宣言します

static class MyDoFn extends DoFn<String, Integer> { ... }
MyDoFn myDoFn = ...;
DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

String testInput = "test1";

(このパートが何を言いたいのかイマイチわからないですね。。。ただの変数宣言に見えます。)

SideInput

DoFnTester.setSideInputsでSideInputのテストも可能です。

static class MyDoFn extends DoFn<String, Integer> { ... }
MyDoFn myDoFn = ...;
DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

PCollectionView<List<Integer>> sideInput = ...;
Iterable<Integer> value = ...;
fnTester.setSideInputInGlobalWindow(sideInput, value);

※SideInputとは通常の入力(PCollectionつなぎのインプット)とは別でTransformに入力値を渡す仕組みです。実装回で具体的に解説します。

複数出力

TupleTagと呼ばれる出力の識別に利用されるオブジェクトを作成し、複数出力されるようにプログラムされたDoFnへ渡すことで複数出力を識別することが可能です

以下のソースコードが例で、
方の違うtagを設定し、DoFnTesterへ渡しています

static class MyDoFn extends DoFn<String, Integer> { ... }
MyDoFn myDoFn = ...;
DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

TupleTag<String> tag1 = ...;
TupleTag<Integer> tag2 = ...;
TupleTagList tags = TupleTagList.of(tag1).and(tag2);

fnTester.setOutputTags(tags);

処理実行と結果の確認

作成した入力をDoFnTesterへ引き渡してprocessBundleを呼び出します。fnTester.processBundleは出力となるリストを返してくれます。

static class MyDoFn extends DoFn<String, Integer> { ... }
MyDoFn myDoFn = ...;
DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

String testInput = "test1";
List<Integer> testOutputs = fnTester.processBundle(testInput);

出力はAssertクラスで検証できます。

String testInput = "test1";
List<Integer> testOutputs = fnTester.processBundle(testInput);

Assert.assertThat(testOutputs, Matchers.hasItems(...));

// Process a larger batch in a single step.
Assert.assertThat(fnTester.processBundle("input1", "input2", "input3"), Matchers.hasItems(...));

複合変換(Transformが複数繋がる場合)のテスト

手順は以下のとおりです

  • TestPipelineを作成します。
  • テストデータを作成します。
  • Create変換を使用して、入力データとなるPCollectionを作成します。
  • ApplyでTransformを適用し、結果の出力を保存します。
  • PAssertを使用して出力に期待する要素が含まれていることを確認します。

TestPipelineの作成

TestPipelineは、Transformのテスト専用のBeam Java SDKに含まれているクラスです。

TestPipelineはPipelineオブジェクトの代わりに利用します。
詳細はhttps://beam.apache.org/blog/2016/10/20/test-stream.htmlにサンプルとともに書かれているそうです。(英語ですが…TODO:そのうち解説)

Pipeline p = TestPipeline.create();

入力値を作成

以下で入力値を設定できます。
(他にもPCollectionを作る方法は複数あるようです。下記参考(https://beam.apache.org/documentation/programming-guide/#creating-a-pcollection))

String value = "test";
p.apply(Create.of(value));

PAssertで結果の検証

PAssertはBeamJavaSDKに含まれるクラスで、PCollectionの中身を検証できます(イメージ的には上述リストをAssertで検証した感じと同じ)

利用方法は以下の通りで、elem1,2,3が期待値で順番関係なく含まれていることという条件になっています(containsInAnyOrder)。

PCollection<String> output = ...;

// Check whether a PCollection contains some elements in any order.
PAssert.that(output)
.containsInAnyOrder(
  "elem1",
  "elem3",
  "elem2");

PAssertを利用するにはJUnitと関連付ける必要があり、Mavenの場合(他の場合にはここに書いてないです)pom.xmlへ以下の通り記載しましょう。

<dependency>
    <groupId>org.hamcrest</groupId>
    <artifactId>hamcrest-all</artifactId>
    <version>1.3</version>
    <scope>test</scope>
</dependency>

複合変換のテスト例

以下のコードでは複合変換のテスト例が見れます。
ソースを見たほうが早いと思いますが、処理自体はリストに含まれている単語を数えてキーを単語、バリューを出現数としてKVと呼ばれるApacheBeamのキーバリューマップへ変換します。

テストの特徴としては、containsInAnyOrderはKVをそのまま検証できるということ、テスト実行の際はp.run()を実行する&検証はその前に置くということが味噌かなと思います。

containsInAnyOrderで結果が見つからない場合は例外が発生します。

public class CountTest {

// Our static input data, which will make up the initial PCollection.
static final String[] WORDS_ARRAY = new String[] {
"hi", "there", "hi", "hi", "sue", "bob",
"hi", "sue", "", "", "ZOW", "bob", ""};

static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

public void testCount() {
  // Create a test pipeline.
  Pipeline p = TestPipeline.create();

  // Create an input PCollection.
  PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());

  // Apply the Count transform under test.
  PCollection<KV<String, Long>> output =
    input.apply(Count.<String>perElement());

  // Assert on the results.
  PAssert.that(output)
    .containsInAnyOrder(
        KV.of("hi", 4L),
        KV.of("there", 1L),
        KV.of("sue", 2L),
        KV.of("bob", 2L),
        KV.of("", 3L),
        KV.of("ZOW", 1L));

  // Run the pipeline.
  p.run();
}

パイプライン全体のテスト

パイプライン全体を呼び出し・テストすることも可能です。
テスト方法は以下の通りです(引用)

  • パイプラインへ渡すテスト入力データを作成します。
  • パイプラインのテスト出力データを作成します
    (型はPCollection)
  • TestPipelineを作成します。
  • パイプラインのRead変換の代わりに、Create変換を使用して入力データから1つ以上のPCollectionを作成します。
  • パイプラインを実行します。
  • パイプラインのWrite変換の代わりにPAssertを利用し、パイプラインが生成する出力がテスト出力データと一致することを確認します。

WordCountパイプラインのテスト

公式のWordCountSampleをテストする例です。
https://beam.apache.org/get-started/wordcount-example/

public class WordCountTest {

    // Our static input data, which will comprise the initial PCollection.
    static final String[] WORDS_ARRAY = new String[] {
      "hi there", "hi", "hi sue bob",
      "hi sue", "", "bob hi"};

    static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

    // Our static output data, which is the expected data that the final PCollection must match.
    static final String[] COUNTS_ARRAY = new String[] {
        "hi: 5", "there: 1", "sue: 2", "bob: 2"};

    // Example test that tests the pipeline's transforms.

    public void testCountWords() throws Exception {
      Pipeline p = TestPipeline.create();

      // Create a PCollection from the WORDS static input data.
      PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());

      // Run ALL the pipeline's transforms (in this case, the CountWords composite transform).
      PCollection<String> output = input.apply(new CountWords());

      // Assert that the output PCollection matches the COUNTS_ARRAY known static output data.
      PAssert.that(output).containsInAnyOrder(COUNTS_ARRAY);

      // Run the pipeline.
      p.run();
    }
}

基本的には複合変換のテストと同じです。

ここでポイントは(反例的に)作り上げたPipelineクラスをそのままテストする方法は示されていないということです。
(見つけれてないだけかもですが)

つまりPipelineとして全体の出力を検証するためには、全く同じTestPipelineを作成するか実行で確認する必要があるということです。

(個人的には全く同じTestPipelineを作成するのは微妙だなと思うので、実行でテストしちゃう派です。。あくまで個人的に)

Please follow and like us:

コメントを残す

メールアドレスが公開されることはありません。