Monday, November 25, 2019

Shared State in FP


If everything is immutable in FP, how do you share state that is constantly changing?


Shared State in Pure FP

Here are some notes from Fabio Labella's (aka SystemFW) talk on the matter:

IO[A]

  •  produces one value, fails or never terminates
  •  can suspend any side-effect (i.e., it is not evaluated immediately)
  •  many algebras (Monad, Sync, Concurrent ...). Note Timer is non-blocking.

If you want to do concurrent shared state, you do it in IO or any F[_].
Cats-effect gives primitives for use with immutable data. This strategy is not suitable for mutable data like a JDBC connection.

Access to, modification of and creation of mutable state all need suspension in IO. Creation has to be suspended too, in order to preserve referential transparency.

Ref[A]

  • is a thin wrapper of AtomicReference
  • its API is similar to that of the State monad.
  • uses the Tagless Final pattern


The State Monad

Recall that the State Monad looks a little like:

case class State[S, A](modify: S => (A, S)) {

  def flatMap[B](f: A => State[S, B]): State[S, B] =
    State { s =>
      val (result, nextState) = modify(s)
      f(result).modify(nextState)
    }
...

So note that we create a new State that will run its modify on another state that it is yet to be given.
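The same shape can be sketched in Python, used here purely as neutral notation. The State class, flat_map and the append helper are illustrative names, not part of any library. The point it tries to show is that building the chain runs nothing; the initial state is only supplied at the very end:

```python
from dataclasses import dataclass
from typing import Any, Callable, Tuple


@dataclass(frozen=True)
class State:
    # modify takes a state and returns (result, next_state)
    modify: Callable[[Any], Tuple[Any, Any]]

    def flat_map(self, f: Callable[[Any], "State"]) -> "State":
        def run(s):
            result, next_state = self.modify(s)
            # f builds the next State, which is then run on next_state
            return f(result).modify(next_state)

        return State(run)


def append(item: str) -> State:
    # Hypothetical helper: appends item, returns the new length as the result
    return State(lambda s: (len(s) + 1, s + [item]))


# Nothing runs here; this only describes the computation
program = append("#1").flat_map(lambda _: append("#2"))

# Only now is an initial state supplied
result, final_state = program.modify([])
```

Because program is just a description, it can be run again against a different starting state and will give an independent answer.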

Bearing this in mind, let's see some multi-threaded code sharing 'mutable' state from the TypeLevel site, here. I've slightly changed the nomenclature for clarity:

  type MyState          = List[String]
  type MyStateContainer = Ref[IO, MyState]

  def putStrLn(str: String): IO[Unit] = IO(println(str))

  def process1(ref: MyStateContainer): IO[Unit] =
    putStrLn("Starting process #1") *>
      IO.sleep(3.seconds) *>
      ref.update(_ ++ List("#1")) *>
      putStrLn("Done #1")
...

and we start it all off with:

  def masterProcess: IO[Unit] =
    Ref.of[IO, MyState](List.empty[String]).flatMap { ref =>
      val ioa = List(process1(ref), process2(ref), process3(ref)).parSequence.void
      ioa *> ref.get.flatMap(rs => putStrLn(rs.toString))
    }

Actually, that's a lie. We don't start it off here. We merely define the pipeline. That's why we add ioa in ioa *> ref.get.flatMap. Without it, ioa is not part of the pipeline and will not be run and our Ref will not be mutated.

Note that Ref.of[IO, MyState] returns an IO wrapping a Ref, not the other way around, and it's this IO that we flatMap over.

Also note that process1 etc. return IO[Unit], not IO[List[String]] as you might expect. This is because the IO represents the act of changing the state rather than the new state itself; in our Tagless Final interface, the equivalent would be def set(s: S): F[Unit].

So, where does the update happen? We can't wholly avoid mutating memory with FP, it's just hidden from us. What happens under the covers is that Ref spins in its modify function performing a compare-and-swap. If it can't swap its AtomicReference, then it tries to update its list again. This is all done on a different thread to main.
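The spin-and-retry idea can be sketched in Python. This is a toy, not how cats-effect is implemented: the real Ref delegates to the JVM's AtomicReference, whereas the AtomicReference class below is a hypothetical stand-in that uses a lock to emulate an atomic compare-and-swap. The update function is the part that mirrors the loop described above:

```python
import threading


class AtomicReference:
    """Toy stand-in for java.util.concurrent.atomic.AtomicReference."""

    def __init__(self, value):
        self._value = value
        self._lock = threading.Lock()

    def get(self):
        return self._value

    def compare_and_set(self, expected, new):
        # Swap only if nobody has replaced the value since we read it
        with self._lock:
            if self._value is expected:
                self._value = new
                return True
            return False


def update(ref, f):
    # Spin: read, compute a new immutable value, retry if the swap loses a race
    while True:
        current = ref.get()
        if ref.compare_and_set(current, f(current)):
            return


ref = AtomicReference(())  # an immutable tuple is the shared state
threads = [
    threading.Thread(target=update, args=(ref, lambda xs, n=n: xs + (f"#{n}",)))
    for n in (1, 2, 3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

final = ref.get()  # holds all three entries, in whatever order the threads won
```

The state itself is never mutated in place. Each attempt builds a fresh tuple and only the reference is swapped, which is why the approach suits immutable data.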


Conclusion

It's all about the flatMap. This is essential for mutating the state. No flatMap, no change.

Saturday, November 23, 2019

Unit testing in Functional Programming (part 1)


Mocking in FP is considered a code smell, if not a downright antipattern, but you can't avoid it with clever testing techniques. You must refactor the code.

This is what I had to do with some FS2 code from here. Now, this code demonstrates FS2's abilities wonderfully but it's not great to use in production as it's hard to test. Take this code in the link:

    val stream =
      consumerStream(consumerSettings)
        .evalTap(_.subscribeTo("topic"))
        .flatMap(_.stream)
        .mapAsync(25) { committable =>
          processRecord(committable.record)
            .as(committable.offset)
        }
        .through(commitBatchWithin(500, 15.seconds))

Not only is this hard to test, it's very dependent on Kafka-specific code.

We can abstract it so it looks like this:

  def pipeline[K, C, P, R, O, T](s:              Stream[IO, K],
                                 subscribe:      K => IO[Unit],
                                 toRecords:      K => Stream[IO, C],
                                 commitRead:     C => IO[P],
                                 producerPipe:   Pipe[IO, P, R],
                                 toWriteRecords: R => O,
                                 commitWrite:    Pipe[IO, O, T]): Stream[IO, T] = 
    s.evalTap(subscribe)
      .flatMap(toRecords)
      .mapAsync(25)(commitRead)
      .through(producerPipe)
      .map(toWriteRecords)
      .through(commitWrite)

This makes testing easier for two reasons:

  1. The code has no dependency on Kafka.
  2. I can submit my own functions that use FS2's in-memory streams.

Now for the test. The domain objects look like this:


  case class Kafka()
  case class Record(id: Int)
  case class ProducerRecords(id: Int)
  case class CommittableOffset(id: Int)



and although their names have a superficial resemblance to Kafka classes, this is just to make things conceptually simpler. We're dealing with a much higher layer of abstraction than that.

For instance, instead of having to connect to an embedded Kafka instance, I can have:

      val records = (1 to nToRead).map(x => Record(x))
      
      val toRecords: Kafka => Stream[IO, Record] =
        _ => Stream.emits(records).covary[IO]

and pass this as the toRecords argument to the function we want to test, pipeline. It's all in memory, no ports and sockets to worry about.
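The same idea, stripped of FS2 altogether, can be sketched in Python. This is only a loose, synchronous analogue: the pipeline function and every argument name below are hypothetical, and there is no concurrency or batching. What it tries to show is that once every dependency is a parameter, a test can hand in plain in-memory functions:

```python
def pipeline(source, subscribe, to_records, process, commit):
    # Every dependency is passed in, so nothing here knows about Kafka
    for consumer in source:
        subscribe(consumer)
        for record in to_records(consumer):
            yield commit(process(record))


# In-memory stand-ins: no broker, no ports, no mocking library
subscribed = []
results = list(
    pipeline(
        source=["fake-consumer"],
        subscribe=subscribed.append,
        to_records=lambda _consumer: range(1, 4),
        process=lambda record: record * 10,
        commit=lambda value: ("committed", value),
    )
)
```

The test then only has to assert on results and subscribed.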

[Aside: from the docs: given a pure stream, "to convert to an effectful stream, use covary"]

The takeaway point is this: if you want to use FP to test your OO code and avoid mocks, it isn't going to work. You need to do a root and branch refactoring to get the benefits of avoiding Mockito. But your code will also be more abstract and therefore more re-usable.

Monday, November 18, 2019

Mutation in Spark


... is dangerous. This came up on the Spark Gitter channel a few days ago (11/11/19). Somebody's Spark job was throwing OptionalDataException. This exception indicates "the failure of an object read operation due to unread primitive data, or the end of data belonging to a serialized object in the stream".

Now, looking at the JavaDocs for this Exception, there is no good reason why Spark would occasionally throw this. The person (alina_alemzeb_twitter) reported:
"I don't understand what else could be the culprit. Our Spark job is pretty basic, we're just doing transformations and filtering. The job ran fine for 10 hrs and then crashed with this error and this is all the code there is."
The fact it ran without problems for ten hours suggests that it wasn't a library incompatibility problem, which was my first thought (maybe an object to be serialized had missing fields between different versions of a library?). It was only upon holistic code inspection that clues to its real nature became clear.

Basically, a HashMap that can be accessed from a closure is sent from the driver to the executors. This HashMap can momentarily be mutated on the driver. If one were unlucky, the HashMap could be mutated at just the moment the closure was being serialized. And it's this that led to the non-deterministic nature of the bug.
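A single-threaded Python analogue makes the hazard deterministic. This is not the Spark or JVM mechanism itself: serialize, mutate and lookup are hypothetical names, and the between_entries callback merely simulates another thread getting a turn while the serializer is walking the map:

```python
def serialize(mapping, between_entries=None):
    # Stand-in for a serializer that walks the map entry by entry
    parts = []
    for key, value in mapping.items():
        parts.append(f"{key}={value}")
        if between_entries is not None:
            between_entries()  # simulates another thread running mid-walk
    return ";".join(parts)


lookup = {"a": 1, "b": 2}


def mutate():
    # The "driver" adds a fresh key while serialization is in flight
    lookup[f"k{len(lookup)}"] = 0


try:
    serialize(lookup, between_entries=mutate)
    outcome = "ok"
except RuntimeError as error:
    outcome = str(error)  # Python notices the map changed under the iterator

# Safer: serialize a snapshot that nobody else holds a reference to
safe = serialize(dict(lookup), between_entries=mutate)
```

Python fails loudly here. A serializer that does not check for concurrent modification may instead write an inconsistent stream, which only shows up later when it is read back.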

So, there was no Spark bug but a bug in the code that calls Spark. The take-away point is that mutation is an evil but mutation in a distributed environment is sheer folly.

Saturday, November 2, 2019

Python Crib Sheet #3


Modulo

Modulo in Python and Scala differ for negative numbers. In Python, you see something like:

>>> print(-1 % 10)
9
>>> print(-11 % 10)
9

In Scala:

scala> println(-1 % 10)
-1

scala> println(-11 % 10)
-1
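If you need the Scala/Java behaviour from Python, a minimal sketch might look like the following. The function names are my own; the only assumption is that both arguments are integers and the divisor is non-zero:

```python
def floor_mod(a: int, b: int) -> int:
    # Python's own %: the result takes the sign of the divisor
    return a % b


def truncated_mod(a: int, b: int) -> int:
    # Scala/Java-style %: the result takes the sign of the dividend
    quotient = abs(a) // abs(b)
    if (a < 0) != (b < 0):
        quotient = -quotient  # round the quotient towards zero
    return a - b * quotient


python_style = [floor_mod(-1, 10), floor_mod(-11, 10)]          # [9, 9]
scala_style = [truncated_mod(-1, 10), truncated_mod(-11, 10)]   # [-1, -1]
```

Going the other way, Math.floorMod on the JVM gives the Python-style result.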

Arrays

Negative indices can be used to count from the end, for example:

>>> xs = range(0, 10)
>>> xs
range(0, 10)
>>> list(xs)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> xs[-1]
9
>>> xs[-2]
8

etc
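A short sketch of the rule behind this: for a sequence of length n, index -k refers to the same element as index n - k. The same convention carries over to slices:

```python
xs = list(range(10))
n = len(xs)

# xs[-k] is the same element as xs[n - k] for every valid k
same = all(xs[-k] == xs[n - k] for k in range(1, n + 1))

tail = xs[-3:]        # the last three elements
but_last = xs[:-1]    # everything except the last element
backwards = xs[::-1]  # a negative step walks the list in reverse

try:
    xs[-(n + 1)]      # one step too far from the end
    out_of_range = False
except IndexError:
    out_of_range = True
```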

Numpy

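Similarly, a negative number has a special meaning when reshaping Numpy arrays: -1 tells Numpy to infer the size of that dimension from the length of the array: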

>>> xs = [0, 1, 2]
>>> np.array(xs).shape
(3,)
>>> np.array(xs).reshape(-1, 1).shape
(3, 1)

If you multiply a reshaped array by one that has not been reshaped, broadcasting can produce a result whose shape differs from both operands:

>>> xs = [0, 1, 2]
>>> np.array(xs).reshape(-1, 1)
array([[0],
       [1],
       [2]])
>>> np.array(xs).reshape(-1, 1) * np.array(xs)
array([[0, 0, 0],
       [0, 1, 2],
       [0, 2, 4]])

That is, a Cartesian (outer) product operation.

But note that an array that is not reshaped implicitly acts like a row vector:

>>> np.array(xs).reshape(1, -1) * np.array(xs)
array([[0, 1, 4]])

That is, an element-wise product.
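The two cases can be put side by side by looking only at shapes. This sketch assumes Numpy is installed and imported as np; the variable names are mine:

```python
import numpy as np

xs = np.array([0, 1, 2])   # shape (3,)
col = xs.reshape(-1, 1)    # shape (3, 1), a column vector
row = xs.reshape(1, -1)    # shape (1, 3), a row vector

# (3, 1) * (3,): xs is treated as (1, 3), then both sides stretch to (3, 3)
outer = col * xs

# (1, 3) * (3,): the shapes already line up, so nothing is stretched
elementwise = row * xs

# The first case is the same thing np.outer computes
matches_outer = np.array_equal(outer, np.outer(xs, xs))
```

When in doubt, checking .shape on both operands before multiplying is a cheap way to catch an accidental outer product.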

Also note that some actions on the reshaped array can reverse the reshaping. For instance, summing over rows:

>>> a = np.array(xs).reshape(-1, 1)
>>> a.shape
(3, 1)
>>> np.sum(a, 1).shape  # 0 would sum columns
(3,)
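If you would rather keep the column shape after a reduction, np.sum accepts a keepdims argument. A small sketch, again assuming Numpy is imported as np:

```python
import numpy as np

a = np.array([0, 1, 2]).reshape(-1, 1)           # shape (3, 1)

collapsed = np.sum(a, axis=1)                    # the summed axis is dropped
preserved = np.sum(a, axis=1, keepdims=True)     # the summed axis is kept with length 1
down_columns = np.sum(a, axis=0)                 # one total per column
```

With keepdims=True the result still broadcasts against the original array in the way a column vector would.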


Generators

Generators are (to my understanding at least) like a Java/Scala stream.

Iterables, generators and yield are related and this excellent SO answer weaves them together.

"When you use a list comprehension, you create a list, and so an iterable:

Generators are iterators, a kind of iterable you can only iterate over once. Generators do not store all the values in memory, they generate the values on the fly. [The syntax for creating generators is] just the same except you used () instead of []

yield is a keyword that is used like return, except the function will return a generator… To master yield, you must understand that when you call the function, the code you have written in the function body does not run. The first time the for calls the generator object created from your function, it will run the code in your function from the beginning until it hits yield, then it'll return the first value of the loop. Then, each other call will run the loop you have written in the function one more time, and return the next value until there is no value to return. The generator is considered empty once the function runs, but does not hit yield anymore.

>>> def createGenerator():
...    mylist = range(3)
...    for i in mylist:
...        yield i*i
...
>>> mygenerator = createGenerator() # create a generator
>>> print(mygenerator) # mygenerator is an object!
<generator object createGenerator at 0x...>
>>> for i in mygenerator:
...     print(i)
0
1
4

Here it's a useless example, but it's handy when you know your function will return a huge set of values that you will only need to read once."
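Two of the points in that answer are easy to check for yourself. The sketch below (names are mine) shows that a generator can only be consumed once, and that calling a generator function does not run its body until the first value is requested:

```python
squares_list = [i * i for i in range(3)]   # [] builds a list up front
squares_gen = (i * i for i in range(3))    # () builds a lazy generator

first_pass = list(squares_gen)    # consumes the generator
second_pass = list(squares_gen)   # nothing left the second time
list_again = list(squares_list)   # a list can be iterated repeatedly

calls = []


def create_generator():
    calls.append("body started")  # records when the body actually runs
    for i in range(3):
        yield i * i


gen = create_generator()
before_next = list(calls)   # still empty: the body has not run yet
first_value = next(gen)     # runs the body up to the first yield
after_next = list(calls)
```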

Note that "Local variables in a generator function are saved from one call to the next, unlike in normal functions." [Python Wiki]
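A minimal sketch of what that means in practice: the local variable total below survives between calls to next, so the generator can carry a running sum without any external state. The function name is hypothetical:

```python
def running_total(values):
    total = 0              # local state, kept alive between yields
    for value in values:
        total += value
        yield total        # pause here; total is remembered for the next call


gen = running_total([1, 2, 3])
first = next(gen)
second = next(gen)
third = next(gen)

try:
    next(gen)              # the function body has finished
    exhausted = False
except StopIteration:
    exhausted = True
```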