Monday, January 20, 2020

FS2 Idioms (part 1)


The semantics of terminating

How do you know when a Stream has finished? Well, Stream.noneTerminate results in a Stream where items are wrapped in a Some and a None infers termination. Stream.unNoneTerminate does the opposite in that these wrapped items are unwrapped but terminates on the first None.

The semantics of pulling

There can be alternative behaviour when pulling from a Stream.

You could be guaranteed to get a value every time and if there is nothing new, you get the old value (Signal.continuous). In each case, the element comes in a new Stream.

Alternatively, every time you pull, you could get a new Stream that represents the latest update and if there is nothing new, the Stream is empty. This behaviour can be found in Signal.discrete.

Finally, you might want to get all the updates since you last pulled but in this case, use a `Queue` instead of Signal. This is because Signal is a "pure holder of a single value"

We can convert a Stream to a Signal by calling hold with a default value or holdOption to handle the None event.

Reading the signs

 
I had a bug in my code when I called this method on fs.concurrent.Topic:

  /**
    * Signal of current active subscribers.
    */
  def subscribers: Stream[F, Int]

My code seemed to hang. In desperation, I tried >> (which is basically flatMap { _ => ... } ) and although my Stream now seemed to start processing, the evaluated effects were not what I was expecting.

The ever helpful creator of FS2, Fabio Labella, helped me:
@PhillHenry the issue is that subscribers is an infinite stream
or rather, as long as the lifetime of the topic.
Each new element of the stream is a change in the number of subscribers
so subscribers ++ anything will never execute anything [but]
subscribers >> s2 will execute s2 every time there is a change in the number of subscribers.
If you look at the description of subscribers it says to return the current number of subscribers. If it were an F[Int], that it would be something you can poll: every time you call it, it gives you the current number and returns. But because the return type is a Stream, it means that it will emit every time there is a new subscriber, and that necessarily means it has to wait when there are no changes to the subscriber number. That is to say, it doesn't terminate.

It is not possible to know from the type whether a stream is infinite or not. In my case, if I wanted the current number of subscribers to the topic then I needed to call take(1). This way, I need not worry whether the stream is finite or not.

The take-away point is that in FP, the signatures are trying to help you to understand the semantics.

No comments:

Post a Comment