Monday, January 20, 2020

FS2 Idioms (part 1)


The semantics of terminating

How do you know when a Stream has finished? Stream.noneTerminate yields a Stream in which each element is wrapped in a Some and a final None signals termination. Stream.unNoneTerminate does the opposite: it unwraps the elements and terminates the stream at the first None.
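As a quick sketch (assuming fs2 2.x), the two combinators round-trip a finite stream:

```scala
import fs2.Stream

// noneTerminate wraps each element in Some and appends a final None
val wrapped = Stream(1, 2, 3).noneTerminate
// wrapped.toList == List(Some(1), Some(2), Some(3), None)

// unNoneTerminate unwraps the elements and stops at the first None,
// even if more elements were to follow it
val roundTripped = wrapped.unNoneTerminate
// roundTripped.toList == List(1, 2, 3)
```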

The semantics of pulling

Pulling from a Stream can behave in several different ways.

With Signal.continuous, you are guaranteed a value every time you pull; if there is nothing new, you get the old value again. In each case, the element arrives wrapped in a new Stream.

Alternatively, every time you pull you could get a new Stream representing the latest update; if there is nothing new, the Stream is empty. This is the behaviour of Signal.discrete.

Finally, you might want all the updates since you last pulled. In that case, use a Queue instead of a Signal, because a Signal is a "pure holder of a single value".
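A sketch illustrating the continuous/discrete difference (assuming fs2 2.x with cats-effect 2, where SignallingRef provides a writable Signal):

```scala
import scala.concurrent.ExecutionContext
import cats.effect.{ContextShift, IO}
import fs2.concurrent.SignallingRef

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

val program: IO[(List[Int], List[Int])] = for {
  sig  <- SignallingRef[IO, Int](0)
  // continuous always has something to give: with no updates,
  // it simply repeats the current value
  cont <- sig.continuous.take(3).compile.toList
  _    <- sig.set(1)
  // discrete emits the current value and then only changes
  disc <- sig.discrete.take(1).compile.toList
} yield (cont, disc)

// program.unsafeRunSync() == (List(0, 0, 0), List(1))
```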

We can convert a Stream to a Signal by calling hold with a default value, or holdOption, which needs no default because the Signal starts at None.
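For example (again assuming fs2 2.x signatures):

```scala
import scala.concurrent.ExecutionContext
import cats.effect.{ContextShift, IO}
import fs2.Stream
import fs2.concurrent.Signal

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

val updates: Stream[IO, Int] = Stream(1, 2, 3).covary[IO]

// hold needs a default for the period before the first element arrives
val held: Stream[IO, Signal[IO, Int]] = updates.hold(0)

// holdOption needs no default: the Signal starts at None
val heldOpt: Stream[IO, Signal[IO, Option[Int]]] = updates.holdOption
```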

Reading the signs

 
I had a bug in my code when I called this method on fs2.concurrent.Topic:

  /**
    * Signal of current active subscribers.
    */
  def subscribers: Stream[F, Int]

My code seemed to hang. In desperation, I tried >> (which is essentially flatMap { _ => ... }) and although my Stream now seemed to start processing, the evaluated effects were not what I was expecting.

The ever helpful FS2 maintainer, Fabio Labella, helped me:
@PhillHenry the issue is that subscribers is an infinite stream
or rather, as long as the lifetime of the topic.
Each new element of the stream is a change in the number of subscribers
so subscribers ++ anything will never execute anything [but]
subscribers >> s2 will execute s2 every time there is a change in the number of subscribers.
If you look at the description of subscribers it says it returns the current number of subscribers. If it were an F[Int], then it would be something you could poll: every time you call it, it gives you the current number and returns. But because the return type is a Stream, it will emit every time there is a new subscriber, and that necessarily means it has to wait when there are no changes to the subscriber number. That is to say, it doesn't terminate.

It is not possible to know from the type whether a stream is infinite or not. In my case, if I wanted the current number of subscribers to the topic then I needed to call take(1). This way, I need not worry whether the stream is finite or not.
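A minimal sketch of the fix (fs2 2.x, where Topic requires an initial element): take(1) turns the never-ending stream of subscriber-count changes into a single, current reading.

```scala
import scala.concurrent.ExecutionContext
import cats.effect.{ContextShift, IO}
import fs2.concurrent.Topic

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

// subscribers never terminates on its own; take(1) lets the stream
// finish after the first (current) count is emitted
val currentSubscribers: IO[Option[Int]] = for {
  topic <- Topic[IO, Int](0)
  n     <- topic.subscribers.take(1).compile.last
} yield n
```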

The take-away point is that in FP, the signatures are trying to help you to understand the semantics.

On the Pull with FS2


The learning curve for FS2 is steep and comparing it to Scala's built-in streams is not helpful. Given this Scala stream to generate Fibonacci numbers:

  lazy val f: Stream[Int] = 0 #:: 1 #:: f.zip(f.tail).map { case (x, y) => x + y }

you might think you can do this:

    val spliced = f.take(5) ++ f.drop(5)

and treat spliced as normal:

    println(spliced.take(10).mkString(", ")) // 0, 1, 1, 2, 3, 5, 8, 13, 21, 34

And you'd be correct when working with Scala streams but not with FS2 streams; that is, s != s.take(n) ++ s.drop(n). As Fabio Labella (@SystemFw) put it:

"I mean, this is certainly not the case in general, since the RHS reevaluates s twice"

That is, s.take(n) ++ s.drop(n) will return the same values as Scala's streams but evaluate different effects from what you might expect.

For example, if we had a stream, s:

    val s = rangeStream(6).evalMap { x =>
      IO {
        println(s"x = $x")
        x
      }
    }

then s.take(3) ++ s.drop(3) would indeed return a Stream containing 1, 2, 3, 4, 5, 6 but you'd see this printed out:

x = 1
x = 2
x = 3
x = 1
x = 2
x = 3
x = 4
x = 5
x = 6

As Fabio says "you can use Pull if you want 'take some, then give me the rest' semantics". So, this is what I did here on GitHub.
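The idea can be sketched like this (assuming the fs2 2.x Pull API, where pull.take returns the leftover stream and echo re-emits it):

```scala
import fs2.{Pull, Stream}

// Unlike s.take(n) ++ s.drop(n), this pulls through s only once,
// so each effect is evaluated exactly one time
def spliceAt[F[_], O](s: Stream[F, O], n: Long): Stream[F, O] =
  s.pull.take(n).flatMap {
    case Some(rest) => rest.pull.echo // emit the remainder untouched
    case None       => Pull.done      // s had fewer than n elements
  }.stream
```

On a pure stream, spliceAt(Stream(1, 2, 3, 4, 5), 2) yields the same elements as the original stream, but on an effectful stream the effects run only once.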

Aside: the reason I want to splice an FS2 Stream is to introduce some test assertions. Here, I want the opposite of the above, namely to evaluate the effect (my assertion in this case) but to ignore the value. For this, one can use Stream.eval_ (note the underscore). However, be a bit careful: since no value is emitted, calls like myStream.take may semantically block.
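For example (fs2 2.x; later releases rename Stream.eval_ to Stream.exec):

```scala
import cats.effect.IO
import fs2.Stream

// Run a check as an effect in the middle of a stream without
// contributing a value to the stream's output
def withCheck[A](before: Stream[IO, A], check: IO[Unit], after: Stream[IO, A]): Stream[IO, A] =
  before ++ Stream.eval_(check) ++ after
```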

Sunday, January 5, 2020

Cloud Devops (part 1)


The "Infrastructure as Code" movement is all very well, but a just criticism is that for most people it means piles of XML and YAML, neither of which is code: they are mark-up languages. Fear not, there are decent alternatives.

Java programmers are spoiled by having at least two client libraries for Kubernetes. The official one is here. But there is also...


Fabric8

"Fabric8 is an integrated open source DevOps and Integration Platform which works out of the box on any Kubernetes or OpenShift environment and provides Continuous Delivery, Management, ChatOps and a Chaos Monkey." (Fabric8 FAQ)
Spark also uses the Fabric8 API in its integration tests. Fabric8's client module "provides access to the full Kubernetes & OpenShift REST APIs via a fluent [Java] DSL."

A notable feature is its mock Kubernetes server, which allows you to write tests like this one.

Fabric8 also provides a Maven plugin that
"is a one-stop-shop for building and deploying Java applications for Docker, Kubernetes and OpenShift. It brings your Java applications on to Kubernetes and OpenShift. It provides a tight integration into maven and benefits from the build configuration already provided. It focuses on three tasks:
  • Building Docker images
  • Creating OpenShift and Kubernetes resources
  • Deploy application on Kubernetes and OpenShift" (Fabric8 FAQ)

Minikube

Although you can install and run Kubernetes on your bare metal, it may be easier to run a virtual cluster on your laptop with Minikube.
"Minikube is a tool that makes it easy to run Kubernetes locally. Minikube runs a single-node Kubernetes cluster inside a Virtual Machine (VM) on your laptop for users looking to try out Kubernetes or develop with it day-to-day." (Minikube docs)
In Spark, "the simplest way to run the integration tests is to install and run Minikube".

Upon starting Minikube with something like this:

$ minikube start --vm-driver=virtualbox

Minikube will create a VirtualBox VM and install Kubernetes on it; once done, kubectl is configured to use the "minikube" context.
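To sanity-check the setup (assuming kubectl is on your path and the cluster is up), you can confirm the context and node state:

```shell
# kubectl should now point at the minikube cluster
$ kubectl config current-context

# the single-node cluster should report one node in the Ready state
$ kubectl get nodes
```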

You can also run

$ minikube dashboard

which opens the Kubernetes dashboard in your browser.

Note: the Minikube cluster can get confused if you change networks (say, you take your laptop to a coffee shop); you may need to run minikube delete and start it again.