Saturday, December 8, 2018

JSON Streaming in Spark


We have a fairly standard architecture where we want to stream JSON logs into HDFS, preferably as Parquet. Kafka and Spark Structured Streaming are the obvious choices, and here is a test that runs in a single JVM to demonstrate the flow.

Streaming or Structured Streaming


Note Spark has two kinds of streaming:

1. Spark Streaming - the old way of doing it, where a "Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous sequence of RDDs (of the same type) representing a continuous stream of data" (from the ScalaDocs). There is a short sketch of this API just after the list.

2. Spark Structured Streaming - the new way.
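For contrast, this is roughly what the old DStream style looks like - a minimal sketch assuming a socket source on port 9999, nothing to do with the Kafka flow below:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// The old API: a StreamingContext with a fixed micro-batch interval, and
// transformations expressed over DStreams (sequences of RDDs).
val conf = new SparkConf().setAppName("DStreamSketch").setMaster("local[2]")
val ssc  = new StreamingContext(conf, Seconds(10))

val lines  = ssc.socketTextStream("localhost", 9999)              // DStream[String]
val counts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts.print()

ssc.start()
ssc.awaitTermination()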

What's new?

The new way is more elegant as we are just dealing with a function over a Dataset rather than a collection of RDDs. As a friend told me: "build your app as a streaming application because it can be extended to a batch easily (a batch is a stream with one big item in it). The opposite is not true, however." Batch is just a special case of streaming.
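To make that concrete, here is a small sketch (the paths and the toy schema are mine, not from a real system): the same function over a DataFrame serves both cases, the only difference being read versus readStream.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{StringType, StructType}

val spark = SparkSession.builder.appName("BatchOrStream").master("local[*]").getOrCreate()

// A toy schema and transformation; the point is that the logic is written once.
val logSchema = new StructType().add("level", StringType).add("message", StringType)

def flagErrors(df: DataFrame): DataFrame =
  df.withColumn("is_error", df("level") === "ERROR")

// Streaming: an unbounded DataFrame tailing a directory of JSON logs.
val streamed = flagErrors(spark.readStream.schema(logSchema).json("/logs/incoming"))

// Batch: exactly the same logic over a bounded DataFrame.
val batched  = flagErrors(spark.read.schema(logSchema).json("/logs/archive"))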

The new structured streaming has been optimized for low latency. "Since Spark 2.3, we have introduced a new low-latency processing mode called Continuous Processing, which can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees. Without changing the Dataset/DataFrame operations in your queries, you will be able to choose the mode based on your application requirements" (from the Spark docs).
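Opting in is, as far as I can tell, just a matter of setting a trigger. Note that in Spark 2.3 continuous mode only supports a limited set of sources and sinks (Kafka among them) and only map-like operations, no aggregations. A sketch with placeholder broker and topic names:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder.appName("ContinuousSketch").getOrCreate()

// Kafka-to-Kafka pass-through running in continuous mode. The argument to
// Trigger.Continuous is the checkpoint interval, not the latency itself.
val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "json-logs")
  .load()
  .selectExpr("key", "value")                 // pass the records through unchanged
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "json-logs-mirror")
  .option("checkpointLocation", "/tmp/continuous-checkpoint")
  .trigger(Trigger.Continuous("1 second"))
  .start()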

There have been machine learning frameworks built on top of it, such as this one here, where Hortonworks recommends loading the prediction model from storage and calling predict() on all the elements in a partition.
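I have not reproduced their code, but the shape of the idea is a mapPartitions call in which the model is loaded once per partition rather than once per record. Something like this, where the Model trait and loadModel are hypothetical stand-ins for whatever framework is actually used:

import org.apache.spark.sql.{Dataset, SparkSession}

case class LogEvent(message: String)

// Hypothetical model abstraction - not Hortonworks' actual code.
trait Model extends Serializable { def predict(e: LogEvent): Double }
def loadModel(path: String): Model = ???   // e.g. deserialise from HDFS

def score(spark: SparkSession, events: Dataset[LogEvent]): Dataset[(String, Double)] = {
  import spark.implicits._
  events.mapPartitions { iter =>
    val model = loadModel("/models/latest")  // loaded once per partition, not per record
    iter.map(e => (e.message, model.predict(e)))
  }
}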

Kafka for the uninitiated

"Data in Kafka is organized into topics that are split into partitions for parallelism. Each partition is an ordered, immutable sequence of records, and can be thought of as a structured commit log. Producers append records to the tail of these logs and consumers read the logs at their own pace. Multiple consumers can subscribe to a topic and receive incoming records as they arrive. As new records arrive to a partition in a Kafka topic, they are assigned a sequential id number called the offset. A Kafka cluster retains all published records—whether or not they have been consumed—for a configurable retention period, after which they are marked for deletion" (from the DataBricks blog).

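On the Spark side those concepts surface as options on the Kafka source: which topic(s) to subscribe to and which offsets to start from. A sketch, with the broker address and topic name as placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("KafkaSource").getOrCreate()

// Each row of this unbounded DataFrame carries the record's key, value, topic,
// partition, offset and timestamp.
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "json-logs")
  .option("startingOffsets", "earliest")   // or "latest", or explicit per-partition offsets
  .load()
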
Kafka to Spark, Spark to Kafka

Note, data can be written by Spark back to Kafka: "Writing data from any Spark supported data source into Kafka is as simple as calling writeStream on any DataFrame that contains a column named “value”, and optionally a column named “key”. If a key column is not specified, then a null valued key column will be automatically added. A null valued key column may, in some cases, lead to uneven data partitioning in Kafka, and should be used with care" (ibid).
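A hedged sketch of that reverse direction - serialising every column of a streaming DataFrame into the mandatory "value" column and sending it to a placeholder topic:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

// Pack the whole row into a JSON string in the "value" column, as the Kafka sink expects.
def writeToKafka(df: DataFrame): StreamingQuery =
  df.selectExpr("to_json(struct(*)) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")        // placeholder broker
    .option("topic", "enriched-logs")                           // placeholder topic
    .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")
    .start()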

Flume?

Kafka uses a pull paradigm. "On the contrary, Flume sink supports push model.  When event producers suddenly generate a flood of messages, even though flume channel somewhat acts as a buffer between source and sink, the sink endpoints might still be overwhelmed by the write operations.

"Moreoever, the file-based channel does not replicate event data to a different node. It totally depends on the durability of the storage it writes upon. If message durability is crucial, it is recommended to use SAN or RAID. Kafka supports both synchronous and asynchronous replication based on your durability requirement and it uses commodity hard drive" (LinkedIn).

Parsing JSON without JSON parsers

There are (too?) many JSON parsers for Scala (there is an excellent comparison made here). But we don't need them if all we want to do is dump the data into Parquet with the same schema as the JSON.

Since hand-coding schemas is tedious, this StackOverflow page suggests using a sample piece of JSON, reading it as a DataFrame and borrowing that schema.
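In other words (the sample file path here is a placeholder):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("SchemaFromSample").master("local[*]").getOrCreate()

// Let Spark infer the schema from a small, representative sample of the JSON,
// then reuse it for the stream (streaming sources need the schema up front).
val sampleDF = spark.read.json("/samples/log-sample.json")
val struct   = sampleDF.schema                              // a StructType for from_json below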

The next part of this cunning trick is to cast Kafka's binary value column to a string and run Spark's from_json function over it with that borrowed schema:

val parsed   = jsonDF.select(from_json($"value".cast("string"), struct).alias("parsed"))
val outgoing = parsed.select("parsed.*")   // flatten so the Parquet schema matches the JSON

Finally, we can call:

outgoing.writeStream.format("parquet")....start()

(having set the appropriate options) et voila, our Spark code is pulling JSON from Kafka and writing Parquet to HDFS in just a few lines.
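For completeness, here is the sort of thing the whole flow boils down to - a sketch with placeholder broker, topic and paths, not the actual test mentioned at the top:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json

val spark = SparkSession.builder.appName("JsonToParquet").getOrCreate()
import spark.implicits._

// Schema borrowed from a sample file, as above.
val struct = spark.read.json("/samples/log-sample.json").schema

// Kafka source: the payload arrives in a binary "value" column.
val jsonDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "json-logs")
  .load()

// Cast, parse and flatten so the Parquet files carry the JSON's own schema.
val outgoing = jsonDF
  .select(from_json($"value".cast("string"), struct).alias("parsed"))
  .select("parsed.*")

val query = outgoing.writeStream
  .format("parquet")
  .option("path", "/data/logs-parquet")                   // placeholder HDFS path
  .option("checkpointLocation", "/data/logs-checkpoint")  // required by the file sink
  .start()

query.awaitTermination()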

