Saturday, November 5, 2022

The semantics of Streaming

A clever but non-technical colleague asked why a batch system could not simply stream its data. This is a big ask because the semantics of batch and streaming are different, no matter how much we pretend they are not.

A file has a notion of completeness: it has an end. A stream does not necessarily have one. You could send a message that indicates the stream has finished, but then you impose an ordering constraint that the file did not necessarily have: the end marker must arrive after every other message.

And if you impose a constraint on order, you can no longer parallelize reading the stream. Again, no such constraint exists with a file. 
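
A small sketch of the contrast, using hypothetical names (FileVsStream, splitRanges, readUntilEnd, EndOfStream are illustrations, not any library's API). Because a file's length is known up front, it can be cut into independent byte ranges that parallel readers may handle in any order. A stream reader, by contrast, has to consume messages in order until it sees the end marker, and anything that arrives after the marker is never seen.

```scala
object FileVsStream {
  sealed trait Msg
  final case class Data(value: Int) extends Msg
  case object EndOfStream extends Msg

  // A file's length is known before reading, so it can be cut into independent
  // byte ranges [start, end) that parallel readers may process in any order.
  def splitRanges(length: Long, parts: Int): List[(Long, Long)] =
    if (length <= 0 || parts <= 0) Nil
    else {
      val step = (length + parts - 1) / parts // ceiling division
      (0L until length by step).map(start => (start, math.min(start + step, length))).toList
    }

  // A stream is only known to be complete once the end marker arrives, so the reader
  // must consume messages in order; anything sent after the marker is never seen.
  def readUntilEnd(messages: Iterator[Msg]): List[Int] =
    messages.takeWhile(_ != EndOfStream).collect { case Data(v) => v }.toList

  def main(args: Array[String]): Unit = {
    println(splitRanges(10, 3)) // List((0,4), (4,8), (8,10))
    println(readUntilEnd(Iterator(Data(1), Data(2), EndOfStream, Data(99)))) // List(1, 2)
  }
}
```

The Data(99) in the usage line stands for a message from a parallel producer that lost the race with the end marker: the file version has no equivalent failure mode.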

Note that these semantic objections are orthogonal to the argument that streams can be viewed as tables [Confluent]. That argument is merely an abstraction, whereas the rest of this post focuses on the real differences between streams and batches.

Size

Using Scala 2.13's built-in Stream, we can create a stream of Fibonacci numbers with:

val fibs: Stream[Int] = 0 #:: fibs.scanLeft(1)(_ + _) // from the docs: `scanLeft` is analogous to `foldLeft`

We can then pretend that this stream is a Seq just like any other.

val seq: Seq[Int] = fibs
println(seq.take(5).mkString(", ")) // 0, 1, 1, 2, 3

But what kind of Seq never terminates when you call a simple .size on it?

Aside from the fact that Seq is generally frowned upon (unlike Vector and List, it makes no performance guarantees; Cats, incidentally, eschews it, so you can't do things like call sequence on it), we can't pretend that potentially infinite streams are the same as strictly finite sequences.
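
One way to guard against an unbounded .size is sketched below, assuming Scala 2.13 or later. Stream is deprecated from 2.13 in favour of LazyList, so the sketch uses LazyList; sizeAtMost is a hypothetical helper, not a library method. It relies on the standard lengthCompare, which only walks as many elements as it needs to answer, so it terminates on an infinite sequence where .size would not.

```scala
object SafeSize {
  // Same definition as above, but with LazyList, which replaces the deprecated Stream.
  lazy val fibs: LazyList[Int] = 0 #:: fibs.scanLeft(1)(_ + _)

  // Some(size) when `xs` has at most `limit` elements, otherwise None.
  // lengthCompare walks at most limit + 1 elements, so this terminates even on an
  // infinite LazyList, where calling .size directly would never return.
  def sizeAtMost[A](xs: Seq[A], limit: Int): Option[Int] =
    if (xs.lengthCompare(limit) <= 0) Some(xs.size) else None

  def main(args: Array[String]): Unit = {
    println(fibs.take(5).mkString(", "))   // 0, 1, 1, 2, 3
    println(sizeAtMost(fibs, 1000))        // None
    println(sizeAtMost(List(1, 2, 3), 10)) // Some(3)
  }
}
```

The guard still treats the sequence as a Seq, which is the point: the type alone does not tell you whether .size is safe, so the caller has to choose a bound.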

Empty Streams

... present problems. Paul Snively on the FS2 chat said:
I don't know if it matters, but keep in mind that the types of Stream.empty and Stream.emits(List.empty[A]) are not the same.
You can see in the REPL that this is true:

scala> Stream.emits(List.empty[String])
val res0: fs2.Stream[[x] =>> fs2.Pure[x], String] = Stream(..)
scala> Stream.empty
val res1: fs2.Stream[fs2.Pure, fs2.INothing] = Stream(..)

Things are even worse if you try to "run" the stream:

scala> Stream.emits(List.empty[String]).repeat(10)

This just hangs while also using an entire core. So does this:

scala> Stream.empty.repeat(10)
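
The busy loop is easier to see with a plain-Scala analogy. The sketch below uses LazyList as a stand-in and is not fs2's implementation; repeatForever and firstWithin are hypothetical helpers. Repeating an empty batch forever yields a stream with no elements and no end, so anything waiting for the first element keeps expanding empty batches indefinitely. Bounding the number of repetitions is one way to make that search terminate.

```scala
object EmptyRepeat {
  // Repeat a (possibly empty) batch of elements forever, loosely like a stream `repeat`.
  def repeatForever[A](batch: List[A]): LazyList[A] =
    LazyList.continually(batch).flatten

  // repeatForever(Nil).headOption would never return: evaluation keeps pulling
  // empty batches, looking for an element that never comes.
  // Capping the number of repetitions makes the search finite.
  def firstWithin[A](batch: List[A], maxRepeats: Int): Option[A] =
    LazyList.continually(batch).take(maxRepeats).flatten.headOption

  def main(args: Array[String]): Unit = {
    println(repeatForever(List(1, 2)).take(5).toList) // List(1, 2, 1, 2, 1)
    println(firstWithin(List.empty[Int], 1000))       // None
  }
}
```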

Effectful streams

Lucas Kasser (@lkasser1), Jul 03 06:22:
If I have a Stream[IO, A], is there a way to access the individual IOs? I'd like to be able to get a Stream[IO, IO[A]] so that I can retry individual elements in the stream.
I've looked through the docs, but I didn't see any function like uneval

Fabio Labella (@SystemFw), Jul 03 09:58:
No, it's not possible because a Stream is not just a List of IOs
it's monadic, so it's more like a tree (some of the IOs depends on the result of previous ones)
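
To make Fabio's point concrete, here is a toy model in plain Scala, not fs2's actual representation; Prog, Done, Bind, countdown and program are hypothetical names. In a flat list every effect is visible up front. In a monadic program the next effect is produced by a function of the previous result, so there is no list of IOs to pull out and retry without running the program.

```scala
object TreeNotList {
  // A flat list of effects: every thunk can be counted, inspected or retried on its own.
  val flat: List[() => Int] = List(() => 1, () => 2, () => 3)

  // A monadic program: which effect runs next is decided by the result of the previous one.
  sealed trait Prog
  final case class Done(value: Int) extends Prog
  final case class Bind(effect: () => Int, next: Int => Prog) extends Prog

  // Interpreter: run an effect, pass its result to `next`, repeat.
  // Returns the final value and the number of effects that were run.
  @annotation.tailrec
  def run(p: Prog, effectsRun: Int = 0): (Int, Int) = p match {
    case Done(v)         => (v, effectsRun)
    case Bind(eff, next) => run(next(eff()), effectsRun + 1)
  }

  // Without running anything, only the first effect is visible; the rest are hidden
  // inside `next`, which cannot be called until that effect has produced a value.
  def visibleEffects(p: Prog): Int = p match {
    case Done(_)    => 0
    case Bind(_, _) => 1
  }

  // Hypothetical program: read a number n, then perform n further dependent steps.
  def countdown(n: Int): Prog =
    if (n <= 0) Done(0) else Bind(() => n - 1, countdown)

  def program(readN: () => Int): Prog = Bind(readN, countdown)

  def main(args: Array[String]): Unit = {
    println(flat.length)                      // 3
    println(visibleEffects(program(() => 3))) // 1
    println(run(program(() => 3)))            // (0,4)
  }
}
```

How many effects the program contains depends on what the first one returns, which is why the structure behaves like a tree that is only discovered by running it.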

Complete vs Incomplete Data

Some ciphers (RSA, for instance) need all of the data before they can encrypt or decrypt it. "Some modes of operation can make block ciphers [like AES] act as stream ciphers." [SO] This differs from a true stream cipher (such as ChaCha20), but by processing the data in Chunks we can simulate one.
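
A sketch of the chunked approach, using the JDK's javax.crypto with AES in CTR mode (one of the modes that makes AES behave as a stream cipher) rather than ChaCha20 or fs2 Chunks. Feeding the data through Cipher.update one chunk at a time should produce the same ciphertext as encrypting the whole input at once. ChunkedCtr and its helpers are hypothetical names, and the fixed key and IV are for illustration only.

```scala
import java.io.ByteArrayOutputStream
import javax.crypto.Cipher
import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}

object ChunkedCtr {
  // Demo-only 128-bit key and IV. Never hard-code a key or reuse an IV with CTR in real code.
  private val key: Array[Byte] = Array.tabulate[Byte](16)(_.toByte)
  private val iv: Array[Byte]  = Array.fill[Byte](16)(7.toByte)

  private def newCipher(mode: Int): Cipher = {
    val c = Cipher.getInstance("AES/CTR/NoPadding")
    c.init(mode, new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
    c
  }

  // Stream-like: feed the cipher one chunk at a time, as the chunks arrive.
  def processChunks(mode: Int, chunks: Iterator[Array[Byte]]): Array[Byte] = {
    val c   = newCipher(mode)
    val out = new ByteArrayOutputStream()
    chunks.foreach { chunk =>
      val piece = c.update(chunk) // may be null if the provider buffers the input
      if (piece != null) out.write(piece)
    }
    out.write(c.doFinal()) // flush anything still buffered
    out.toByteArray
  }

  // Batch-like: process the complete input in a single call.
  def processAll(mode: Int, data: Array[Byte]): Array[Byte] =
    newCipher(mode).doFinal(data)

  def main(args: Array[String]): Unit = {
    val data     = "a file has an end; a stream may not".getBytes("UTF-8")
    val streamed = processChunks(Cipher.ENCRYPT_MODE, data.grouped(5))
    println(streamed.sameElements(processAll(Cipher.ENCRYPT_MODE, data))) // true
  }
}
```

A padded block mode such as CBC would still hold back the final block until doFinal, which is one concrete way a cipher can need "the end" of the data.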

Grouping & Streaming in Spark

"Developing the translation layer (called runner) from Apache Beam to Apache Spark we faced an issue with the Spark Structured Streaming framework: the problem is that this framework does not support more than one aggregation in a streaming pipeline. For example, you cannot do a group by then a reduce by in a streaming pipeline. There is an open ticket in the Spark project, an ongoing design and an ongoing PR, but, as for now, they received no update since the summer 2019. As a consequence, the Beam runner based on this framework is on hold waiting for this feature from the Spark project." [Etienne Chauchot's blog]

Basically, if there are two grouping operations, op1 and op2, the grouping in op1 might make the data fed into op2 out of date: it may have gone stale while it was sitting in op1's buffer.

"[S]treaming systems define the notion of watermark. It is what gives the system the notion of completeness of data in a constant flow of streaming data. It is the point in time when the system should not receive older elements. As streaming systems rely on windowing to divide this stream of data, the watermark can also be defined as the system notion of when all the data in a certain window can be expected to have arrived in the streaming pipeline. When the watermark passes the end of the window, the system outputs data." [ibid]

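A toy, single-threaded model of the watermark idea, assuming tumbling event-time windows. It is not how Spark or Beam implement it, and WindowedCounter, windowSize and allowedLateness are hypothetical names. Counts sit in a buffer until the watermark (the largest event time seen, minus the allowed lateness) passes the end of their window; events for windows the watermark has already passed are dropped.

```scala
object WatermarkSketch {
  // Counts events per tumbling window of `windowSize` time units.
  final class WindowedCounter(windowSize: Long, allowedLateness: Long) {
    private var maxSeen: Long = Long.MinValue
    private var buffer = Map.empty[Long, Int] // window start -> count, held until the window closes
    var dropped: Int = 0

    def watermark: Long =
      if (maxSeen == Long.MinValue) Long.MinValue else maxSeen - allowedLateness

    // Returns the (windowStart, count) pairs closed by this event, oldest first.
    def onEvent(time: Long): List[(Long, Int)] = {
      val start = time - Math.floorMod(time, windowSize)
      if (start + windowSize <= watermark) { dropped += 1; Nil } // too late: window already closed
      else {
        buffer = buffer.updated(start, buffer.getOrElse(start, 0) + 1)
        maxSeen = math.max(maxSeen, time)
        val wm = watermark
        val (ready, pending) = buffer.partition { case (s, _) => s + windowSize <= wm }
        buffer = pending
        ready.toList.sortBy(_._1)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val counter = new WindowedCounter(windowSize = 10, allowedLateness = 5)
    List(1L, 7L, 12L, 3L, 16L, 4L, 26L).foreach { t =>
      println(s"event $t -> emitted ${counter.onEvent(t)}, watermark ${counter.watermark}")
    }
    println(s"dropped: ${counter.dropped}") // dropped: 1
  }
}
```

With a window size of 10 and a lateness of 5, the window starting at 0 is only emitted when the event at time 16 pushes the watermark to 11, and the later event at time 4 is dropped. A second grouping fed by this one would see nothing for window 0 until then, which is the staleness problem described above.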