Friday, October 18, 2019

More fun and games with Beam


We're using Beam 2.13 and want our pipeline to be launched from Google Composer/Airflow. Unfortunately, the route we were recommended (here) is awkward, to say the least.

Extracting the CLI values is cumbersome. They’re wrapped in ValueProvider objects and a reference to them must be created with something like:

  def createViewFor(name: String, pipeline: Pipeline, value: ValueProvider[String]): PCollectionView[String] =
    pipeline.apply(s"${name}Config:Start",  Create.of("Start"))
      .apply(s"${name}Config:Parse",        ParDo.of(new ApplicationConfigurationParser(value)))
      .apply(s"${name}Config:AsView",       View.asSingleton())
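The ApplicationConfigurationParser itself isn't shown here. A minimal sketch of what it could look like, assuming it only needs to pass the raw string through (the body below is an assumption for illustration; a real one would presumably parse or validate the value):

```scala
import org.apache.beam.sdk.options.ValueProvider
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.ProcessElement

// Hypothetical body: the class name comes from the snippet above,
// everything inside it is assumed for illustration.
class ApplicationConfigurationParser(value: ValueProvider[String])
    extends DoFn[String, String] {

  @ProcessElement
  def processElement(context: ProcessContext): Unit = {
    // The incoming element is just the dummy "Start" string and is ignored.
    // get() is called here, inside the DoFn, so it runs at pipeline
    // execution time, when a runtime-supplied value is actually available.
    context.output(value.get())
  }
}
```

The point of the dummy "Start" element is only to make the DoFn fire once, so that the resulting PCollection holds exactly one value and can be turned into a singleton view.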

Then, you pass this reference to your DoFn with something like:

    val myConfig = createViewFor(…)
    ParDo.of(new MyDoFn(myConfig))
      .withSideInputs(myConfig)

And then finally in your DoFn, you access it with:

class MyDoFn(myConfigView: PCollectionView[String]) extends DoFn[FileIO.ReadableFile, String] {
  @ProcessElement
  def processElement(@Element input:  FileIO.ReadableFile,
                     out:             OutputReceiver[String],
                     context:         ProcessContext): Unit = {
    val myConfig = context.sideInput(myConfigView)
.
.

Just painful.
