We're using Beam 2.13 and want to launch our pipeline from Google Composer/Airflow. Unfortunately, the route that was recommended to us (here) is awkward, to say the least.
Extracting the CLI values is cumbersome. They're wrapped in ValueProvider objects, and to use one inside a transform you have to turn it into a PCollectionView with something like:
def createViewFor(name: String, pipeline: Pipeline, value: ValueProvider[String]): PCollectionView[String] =
  pipeline.apply(s"${name}Config:Start", Create.of("Start"))
    .apply(s"${name}Config:Parse", ParDo.of(new ApplicationConfigurationParser(value)))
    .apply(s"${name}Config:AsView", View.asSingleton())
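For context, here is a minimal sketch of the two pieces the snippet above takes for granted, assuming a hypothetical option named `myConfig`. The options trait is how Beam exposes a `--myConfig=...` command-line flag as a `ValueProvider[String]`. The `ApplicationConfigurationParser` shown here is a hypothetical stand-in (the real one presumably does some parsing) that simply emits the raw string. Calling `value.get()` inside `processElement`, rather than while the pipeline is being built, is what allows the value to be supplied at run time.

```scala
import org.apache.beam.sdk.options.{Description, PipelineOptions, PipelineOptionsFactory, ValueProvider}
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.{OutputReceiver, ProcessElement}

// Hypothetical options interface: Beam maps the getter/setter pair
// to a `--myConfig=...` command-line argument.
trait MyOptions extends PipelineOptions {
  @Description("Hypothetical configuration value passed on the command line")
  def getMyConfig: ValueProvider[String]
  def setMyConfig(value: ValueProvider[String]): Unit
}

// Hypothetical stand-in for ApplicationConfigurationParser: it ignores the
// "Start" seed element and emits the option value, read at execution time.
class ApplicationConfigurationParser(value: ValueProvider[String])
    extends DoFn[String, String] {

  @ProcessElement
  def processElement(out: OutputReceiver[String]): Unit =
    out.output(value.get())
}
```

With these in place, `options` would come from `PipelineOptionsFactory.fromArgs(args: _*).withValidation().as(classOf[MyOptions])`, the pipeline from `Pipeline.create(options)`, and the view from `createViewFor("My", pipeline, options.getMyConfig)`.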
Then, you pass this view to your DoFn and register it as a side input with something like:
val myConfig = createViewFor(…)
ParDo.of(new MyDoFn(myConfig))
  .withSideInputs(myConfig)
Finally, inside your DoFn, you read it with:
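import org.apache.beam.sdk.io.FileIO
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.{Element, OutputReceiver, ProcessElement}
import org.apache.beam.sdk.values.PCollectionView

class MyDoFn(myConfigView: PCollectionView[String])
    extends DoFn[FileIO.ReadableFile, String] {

  @ProcessElement
  def processElement(@Element input: FileIO.ReadableFile,
                     out: OutputReceiver[String],
                     context: ProcessContext): Unit = {
    // Side inputs are read through the ProcessContext
    val myConfig = context.sideInput(myConfigView)
    // ...
  }
}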
Just painful.