Showing posts with label Beam. Show all posts

Tuesday, September 22, 2020

Spark Gotchas


Some miscellaneous notes on Spark I've been making the last few weeks.


Triggers and OutputModes when Aggregating

I was experimenting with aggregating a query that reads from Kafka and writes Parquet to HDFS.

The interplay between watermarking, Trigger and OutputMode looks a bit like this:
  • Queries that aggregate with OutputMode.Append yet with no watermark will always throw "Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;" irrespective of the Trigger.
  • Queries that aggregate with OutputMode.Complete fail with "Data source parquet does not support Complete output mode;". Even text files will barf with "Data source text does not support Complete output mode;".
  • Queries that aggregate with OutputMode.Append but with a watermark behave like this:

Trigger | Behaviour
Continuous | "Unknown type of trigger: ContinuousTrigger(5000)" [1] or "Continuous processing does not support EventTimeWatermark operations.;;"
ProcessingTime | No errors but last batch is not pulled from Kafka.
none ("When no trigger is specified, Structured Streaming starts the processing of a new batch as soon as the previous one is finished." - Stream Processing with Apache Spark) | No errors but last batch is not pulled from Kafka.
Once | No errors but nothing is pulled from Kafka. Running the query twice with awaitTermination pulls some elements.

The reason Spark leaves a little something on Kafka is (apparently) for efficiency. There appears to be a pull request for making Spark consume the last bit from Kafka, but in its associated Jira somebody mentions my StackOverflow question, which talks about Spark's behaviour being unchanged.

So perplexed was I that I asked the Spark mailing lists. The answer from Spark committer, Jungtaek Lim, was illuminating:

"Unfortunately there's no mechanism on SSS to move forward only watermark without actual input, so if you want to test some behavior on OutputMode.Append you would need to add a dummy record to move watermark forward."

This doesn't seem to be because we're writing Parquet. Text files will also behave this way.
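
For reference, here is a minimal sketch of the kind of query described above: a watermarked aggregation over a Kafka source, written as Parquet in Append mode. The broker address, topic name, paths, window size and watermark delay are all made up for illustration, and it assumes the spark-sql-kafka connector is on the classpath.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}

object WatermarkedAppend {
  // Count events per one-minute event-time window. Without the watermark,
  // Append mode is rejected for a streaming aggregation.
  def windowedCounts(events: DataFrame): DataFrame =
    events
      .withWatermark("timestamp", "10 seconds")
      .groupBy(window(col("timestamp"), "1 minute"))
      .count()

  // Hypothetical broker, topic and HDFS paths; substitute your own.
  def start(spark: SparkSession): StreamingQuery = {
    val events = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "my_topic")
      .load()
      .select(col("timestamp"), col("value").cast("string").as("value"))

    windowedCounts(events).writeStream
      .format("parquet")
      .option("path", "hdfs:///tmp/counts")
      .option("checkpointLocation", "hdfs:///tmp/counts_checkpoint")
      .outputMode(OutputMode.Append())
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()
  }
}
```

In line with the mailing list answer quoted above, the counts for the most recent window should only be written once a later record has moved the watermark forward.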


Defragging Question

Spark will create a new file per executor thread per ProcessingTime trigger interval (as this test code demonstrates). This can lead to a lot of files that could ultimately choke your NameNode if you're persisting to HDFS. This may necessitate a roll-your-own fix [SO].
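
A roll-your-own fix could be as naive as periodically rewriting the small files as a few larger ones. The sketch below is only an illustration of that idea and not necessarily what the linked answer does; the object and parameter names are invented. It writes to a separate directory because Spark refuses to overwrite a path it is also reading from.

```scala
import org.apache.spark.sql.SparkSession

object Compact {
  // Rewrite the Parquet files under `src` as at most `numFiles` larger
  // files under `dst`. `dst` must be a different directory from `src`.
  def compact(spark: SparkSession, src: String, dst: String, numFiles: Int): Unit =
    spark.read.parquet(src)
      .coalesce(numFiles)
      .write.mode("overwrite").parquet(dst)
}
```

Swapping the compacted directory in for the original is left out here; how to do that safely depends on what else is reading the data.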

If you're using Databricks, you can do this "bin-packing" by running the OPTIMIZE command. Unfortunately, I could not see the code to do this inside the Delta library Databricks have open sourced. (I discuss other ways of dealing with this problem without Delta in this blog post).

Since our use case at work is such that we only want the output of stream processing once a day, we opted for Trigger.Once. Since we're not aggregating, all of the data is pulled and processed. Since this is streaming in a very loose sense of the word (it's more batch processing that gets its data in one short, sharp stream), we avoided having large numbers of small files.


Exceptions in processing DataFrames

Spark can optionally have a "best efforts" approach to exceptions rather than failing the whole process.

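// ALGO_VERSION is "1" or "2". Outside spark-shell, import spark.implicits._ first.
val ALGO_VERSION = "2"
spark.range(10000).repartition(7).map { i =>
  // exactly one element blows up, after a pause; all the others succeed
  if (i == 9999) { Thread.sleep(5000); throw new RuntimeException("oops!") }
  else i
}.write.option("mapreduce.fileoutputcommitter.algorithm.version", ALGO_VERSION).mode("append").parquet(s"/tmp/test_algo_$ALGO_VERSION")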

When ALGO_VERSION=1, nothing is written to HDFS. On the other hand, when ALGO_VERSION=2, the partition in which the exception occurred is not written but all the other ones are.

See https://databricks.com/blog/2017/05/31/transactional-writes-cloud-storage.html


DataFrame types

You can see the schema as JSON with:

spark.read.parquet("THE_FILE").schema.json

The type "timestamp" would map the data to a java.sql.Timestamp (the equivalent of a SQL TIMESTAMP type). This Java type is a subclass of java.util.Date.

The type "date" would map to a java.sql.Date (the equivalent of a SQL DATE type). This is also a subclass of java.util.Date.

See this StackOverflow page for descriptions of the SQL types.

Trying to read from a Parquet file that uses "date" and coercing it to a "timestamp" will result in:

Caused by: java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary
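
One way around this, sketched here on the assumption that the file really does store the column as "date", is to read the file with its own schema and cast the column afterwards rather than imposing a "timestamp" schema at read time. The object and method names are invented for the example.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object DateToTimestamp {
  // Read using the schema stored in the Parquet file, then cast the named
  // column from "date" to "timestamp" once the data is in a DataFrame.
  def readWithTimestamp(spark: SparkSession, path: String, column: String): DataFrame =
    spark.read.parquet(path)
      .withColumn(column, col(column).cast("timestamp"))
}
```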


Idempotency

DataStreamWriter.foreachBatch is experimental and "is supported only the in the micro-batch execution modes (that is, when the trigger is not continuous)". It provides a batchId that "can be used deduplicate and transactionally write the output (that is, the provided Dataset) to external systems. The output Dataset is guaranteed to exactly same for the same batchId (assuming all operations are deterministic in the query)" [Spark docs].
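
As a rough illustration of using the batchId for idempotent writes, the sketch below writes each micro-batch to a directory derived from its batchId, so a replayed batch overwrites its earlier output instead of duplicating it. The directory layout and names are assumptions made for the example, not anything prescribed by Spark. The function is given an explicit Scala function type so that the call picks the Scala overload of foreachBatch rather than the Java one.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

object IdempotentSink {
  // Returns a function that writes a micro-batch under basePath/batch_id=<id>.
  // Writing the same batchId again replaces that directory's contents.
  def writeBatch(basePath: String): (DataFrame, Long) => Unit =
    (batch: DataFrame, batchId: Long) =>
      batch.write.mode("overwrite").parquet(s"$basePath/batch_id=$batchId")

  // `stream` is any streaming DataFrame; `checkpoint` is where Spark tracks progress.
  def start(stream: DataFrame, basePath: String, checkpoint: String): StreamingQuery =
    stream.writeStream
      .option("checkpointLocation", checkpoint)
      .foreachBatch(writeBatch(basePath))
      .start()
}
```

This only holds up if, as the quoted docs say, the operations in the query are deterministic.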


[1] "You can set the optional Continuous Trigger in queries that satisfy the following conditions: Read from supported sources like Kafka and write to supported sinks like Kafka, memory, console." SO quoting Databricks.

Friday, October 18, 2019

More fun and games with Beam


We're using Beam 2.13 and want it to be called by Google Composer/Airflow. Unfortunately, the route we have been recommended (here) is awkward to say the least.

Extracting the CLI values is cumbersome. They’re wrapped in ValueProvider objects and a reference to them must be created with something like:

  def createViewFor(name: String, pipeline: Pipeline, value: ValueProvider[String]): PCollectionView[String] =
    pipeline.apply(s"${name}Config:Start",  Create.of("Start"))
      .apply(s"${name}Config:Parse",        ParDo.of(new ApplicationConfigurationParser(value)))
      .apply(s"${name}Config:AsView",       View.asSingleton())

Then, you pass this reference to your DoFn with something like:

    val myConfig = createViewFor(…)
    ParDo.of(new MyDoFn(myConfig))
      .withSideInputs(myConfig)

And then finally in your DoFn, you access it with:

class MyDoFn(
                      myConfigView:          PCollectionView[String],
                     ) extends DoFn[FileIO.ReadableFile, String] {
  @ProcessElement
  def processElement(@Element input:  FileIO.ReadableFile,
                     out:             OutputReceiver[String],
                     context:         ProcessContext): Unit = {
    val myConfig = context.sideInput(myConfigView)
.
.

Just painful.

Friday, August 16, 2019

First steps in Apache Beam


Introduction

Caveat: I've just started learning Apache Beam specifically on GCP and these are some notes I've made. They may at this stage be unreliable. I will update this entry as I learn more.

Superficially like Spark

"Side inputs are useful if your ParDo needs to inject additional data when processing each element in the input PCollection, but the additional data needs to be determined at runtime (and not hard-coded). Such values might be determined by the input data, or depend on a different branch of your pipeline." (the Beam docs).

Side inputs are a bit like Spark’s broadcast variables, I'm told.

The Pipeline.apply(“name”, ParDo.of(new XXXDoFn)) is the Beam equivalent of Spark sending a function to its executors. The XXXDoFn must be serializable.

A DoFn's processing method is annotated with @ProcessElement, and Beam passes in whichever objects the method's parameters ask for. Note that this method does not need to conform to any particular signature.

Partitioning

“The elements in a PCollection are processed in bundles. The division of the collection into bundles is arbitrary and selected by the runner. This allows the runner to choose an appropriate middle-ground between persisting results after every element, and having to retry everything if there is a failure.

“If processing of an element within a bundle fails, the entire bundle fails. The elements in the bundle must be retried (otherwise the entire pipeline fails), although they do not need to be retried with the same bundling.”

(from the Beam Documentation.)

The Runner

Beam is designed to run on many different platforms and therefore has many different Runners, with differing degrees of capability. These Runners allow your Beam code to run on Spark, Flink etc. Of note is the Direct Runner.

“The Direct Runner executes pipelines on your machine and is designed to validate that pipelines adhere to the Apache Beam model as closely as possible…  the Direct Runner performs additional checks” (the Beam Documentation).

This Runner should be fine for running tests. You can also use TestPipeline "inside of tests that can be configured to run locally or against a remote pipeline runner."
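
A small sketch of what such a local check might look like. The DoFn and object names are invented for the example, and it assumes beam-runners-direct-java is on the classpath so that a pipeline created without an explicit runner falls back to the Direct Runner. It uses a plain Pipeline with PAssert rather than TestPipeline to avoid needing a JUnit rule.

```scala
import java.util.Arrays
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.testing.PAssert
import org.apache.beam.sdk.transforms.{Create, DoFn, ParDo}
import org.apache.beam.sdk.transforms.DoFn.{Element, OutputReceiver, ProcessElement}

// Hypothetical DoFn used only for this illustration.
class UpperCaseFn extends DoFn[String, String] {
  @ProcessElement
  def processElement(@Element input: String, out: OutputReceiver[String]): Unit =
    out.output(input.toUpperCase)
}

object DirectRunnerCheck {
  def run(): Unit = {
    // No runner is specified, so the default (Direct Runner) is used.
    val pipeline = Pipeline.create()
    val output = pipeline
      .apply("Input", Create.of("a", "b"))
      .apply("UpperCase", ParDo.of(new UpperCaseFn))
    // PAssert adds a check to the pipeline; a mismatch makes the run fail.
    PAssert.that(output).containsInAnyOrder(Arrays.asList("A", "B"))
    pipeline.run().waitUntilFinish()
  }
}
```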

Dataflow

Dataflow is Google's implementation of a Beam runner.

“After you write your pipeline, you must create and stage your template file… After you create and stage your template, your next step is to execute the template.” (from the Dataflow documentation).

What sorcery is this?

"Developer typically develop the pipeline in the development environment and execute the pipeline and create a template. Once the template is created, non-developer users can easily execute the jobs by using one of these interfaces including GCP console, gcloud cli, or REST API... We simply run this command to compile and stage the pipeline so that it becomes accessible from several execution interfaces:

 mvn compile exec:java \
     -Dexec.mainClass=com.example.MyClass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=[YOUR_PROJECT_ID] \
                  --stagingLocation=gs://[YOUR_BUCKET_NAME]/staging \
                  --templateLocation=gs://[YOUR_BUCKET_NAME]/templates/"

according to Imre Nagi, who has mixed views, particularly on using Composer, Google's managed version of Airflow ("especially about it’s versioning and debugging technique"), for managing Dataflow jobs.

He says that one issue is "you must modify your pipeline code to support runtime parameters". This seems to run contrary to Beam's attempt to be an abstraction across all Big Data processing applications.

For instance, you need to use ValueProviders to pass config around. But, if your "driver" code accesses them (say in com.example.MyClass's main method) they won't be available in the template creation stage.
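
To make that concrete, here is a minimal sketch of the pattern, with invented names (MyOptions, prefix, PrefixFn). The options interface exposes the parameter as a ValueProvider, and the DoFn only calls get() while processing elements, by which time the runtime value should be available.

```scala
import org.apache.beam.sdk.options.{PipelineOptions, ValueProvider}
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.{Element, OutputReceiver, ProcessElement}

// Hypothetical options interface; "prefix" is a made-up parameter.
trait MyOptions extends PipelineOptions {
  def getPrefix: ValueProvider[String]
  def setPrefix(value: ValueProvider[String]): Unit
}

// Holds on to the ValueProvider and defers get() until an element is processed.
class PrefixFn(prefix: ValueProvider[String]) extends DoFn[String, String] {
  @ProcessElement
  def processElement(@Element input: String, out: OutputReceiver[String]): Unit =
    out.output(prefix.get() + input)
}
```

In the main method you would pass options.getPrefix itself to new PrefixFn(...) rather than options.getPrefix.get(). As far as I can tell, calling get() in the driver while the template is being created fails because no runtime value has been supplied yet.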