Monday, November 25, 2019

Shared State in FP


If everything is immutable in FP, how do you share state that is constantly changing?


Shared State in Pure FP

Here are some notes from Fabio Labella's (aka SystemFW) talk on the matter:

IO[A]

  •  produces one value, fails or never terminates
  •  can suspend any side-effect (i.e., it is not evaluated immediately)
  •  many algebras (Monad, Sync, Concurrent ...). Note Timer is non-blocking.

If you want to do concurrent shared state, you do it in IO or any F[_].
Cats-effect gives primitives for use with immutable data. This strategy is not suitable for mutable data like a JDBC connection.

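Access, modification and creation of mutable state all need suspension in IO. Access and modification are obviously side-effects. Creation needs suspending too, to preserve referential transparency: otherwise, replacing a reference with the expression that created it would silently produce a second, independent piece of state.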
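
To see why creation matters, here is a minimal sketch that uses only the standard library. Thunk is a hypothetical stand-in for IO (it is not a cats-effect type) and the other names are invented for illustration. It shows that a suspended creation is just a recipe, so each run yields a fresh cell, and sharing has to be made explicit with flatMap:

```scala
import java.util.concurrent.atomic.AtomicReference

object CreationSketch {
  // Hypothetical stand-in for IO: a description that only runs on demand.
  final case class Thunk[A](run: () => A) {
    def flatMap[B](f: A => Thunk[B]): Thunk[B] = Thunk(() => f(run()).run())
  }

  // Suspended creation: this value is a recipe for a cell, not a cell.
  val newCell: Thunk[AtomicReference[Int]] =
    Thunk(() => new AtomicReference[Int](0))

  // Running the recipe twice gives two independent cells.
  def incrementTwoCells: (Int, Int) = {
    val a = newCell.run()
    val b = newCell.run()
    a.set(a.get + 1)
    (a.get, b.get)
  }

  // Sharing is explicit: create once inside flatMap, then pass the cell along.
  val incrementSharedCell: Thunk[(Int, Int)] =
    newCell.flatMap { cell =>
      Thunk { () =>
        cell.set(cell.get + 1)
        (cell.get, cell.get)
      }
    }
}
```

Because newCell never allocates anything until it is run, it can be substituted freely wherever it is used without changing what the program means.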

Ref[A]

  • is a thin wrapper of AtomicReference
  • its API is similar to the State monad
  • uses the Tagless Final pattern


The State Monad

Recall that the State Monad looks a little like:

case class State[S, A](modify: S => (A, S)) {

  def flatMap[B](f: A => State[S, B]): State[S, B] =
    State { s =>
      val (result, nextState) = modify(s)
      f(result).modify(nextState)
    }
...

So note that we create a new State that will run its modify on a state that it has yet to be given.
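
As a rough illustration of that point, the sketch below completes the class with a map (added here only so that for-comprehensions work) and a hypothetical append helper that is not from the talk. The program is built first and only does anything once an initial state is supplied:

```scala
object StateSketch {
  // Same shape as the State above; `map` is an addition for this sketch.
  final case class State[S, A](modify: S => (A, S)) {
    def flatMap[B](f: A => State[S, B]): State[S, B] =
      State { s =>
        val (result, nextState) = modify(s)
        f(result).modify(nextState)
      }

    def map[B](f: A => B): State[S, B] =
      State { s =>
        val (result, nextState) = modify(s)
        (f(result), nextState)
      }
  }

  // Hypothetical helper: append a label and return the new size of the log.
  def append(label: String): State[List[String], Int] =
    State[List[String], Int] { log =>
      val next = log :+ label
      (next.size, next)
    }

  // Only a description so far: no list has been supplied yet.
  val program: State[List[String], Int] =
    for {
      _ <- append("#1")
      _ <- append("#2")
      n <- append("#3")
    } yield n

  // The description runs only when it is given an initial state.
  val (count, finalLog) = program.modify(List.empty[String])
}
```

Here count is 3 and finalLog is List("#1", "#2", "#3"). Each flatMap threads the state produced by one step into the next.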

Bearing this in mind, let's look at some multi-threaded code from the Typelevel site that shares 'mutable' state. I've slightly changed the nomenclature for clarity:

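  // Imports assumed for cats-effect 2.x. IO.sleep and parSequence also need an
  // implicit Timer[IO] and ContextShift[IO] in scope (e.g. from IOApp).
  import cats.effect.IO
  import cats.effect.concurrent.Ref
  import cats.implicits._
  import scala.concurrent.duration._

  type MyState          = List[String]
  type MyStateContainer = Ref[IO, MyState]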

  def putStrLn(str: String): IO[Unit] = IO(println(str))

  def process1(ref: MyStateContainer): IO[Unit] =
    putStrLn("Starting process #1") *>
      IO.sleep(3.seconds) *>
      ref.update(_ ++ List("#1")) *>
      putStrLn("Done #1")
...

and we start it all off with:

  def masterProcess: IO[Unit] =
    Ref.of[IO, MyState](List.empty[String]).flatMap { ref =>
      val ioa = List(process1(ref), process2(ref), process3(ref)).parSequence.void
      ioa *> ref.get.flatMap(rs => putStrLn(rs.toString))
    }

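Actually, that's a lie. We don't start it off here. We merely define the pipeline, and nothing executes until the resulting IO is itself run (for example by an IOApp or a call to unsafeRunSync). That's also why ioa appears in ioa *> ref.get.flatMap. Without it, ioa would not be part of the pipeline, so it would never run and our Ref would never be mutated.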

Note that Ref.of[IO, MyState] returns an IO wrapping a Ref, not the other way around, and it's this IO that we flatMap over.

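Also note that process1 etc. return IO[Unit], not IO[List[String]] as you might expect. This is because the IO represents the act of changing the state rather than the new state itself. In the Tagless Final interface of Ref, the corresponding methods are def set(a: A): F[Unit] and def update(f: A => A): F[Unit]. To see the new value you have to ask for it with ref.get.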

So, where does the update happen? We can't wholly avoid mutating memory with FP; it's just hidden from us. What happens under the covers is that Ref spins in its modify function performing a compare-and-swap. If it can't swap the contents of its AtomicReference because another thread got there first, it re-reads the current list and tries the update again. This is all done on a different thread to main.
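
The spin described above can be sketched with nothing but the standard library. This is an illustration of the compare-and-swap retry loop, not the cats-effect source; update and demo are hypothetical names:

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

object CasSketch {
  // Hypothetical helper: apply f atomically, retrying if another thread wins.
  @tailrec
  def update[A](cell: AtomicReference[A])(f: A => A): Unit = {
    val current = cell.get
    val updated = f(current)
    // compareAndSet succeeds only if the cell still holds `current`.
    // On failure we loop, re-reading the cell and re-applying f.
    if (!cell.compareAndSet(current, updated)) update(cell)(f)
  }

  // Three threads append to the same immutable list through the cell.
  def demo(): List[String] = {
    val cell = new AtomicReference[List[String]](List.empty[String])
    val threads = List("#1", "#2", "#3").map { label =>
      new Thread(() => update(cell)(_ ++ List(label)))
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    cell.get
  }
}
```

The list itself is never mutated. Each attempt builds a new list and the only mutable thing is the pointer inside the AtomicReference, which is why this approach suits immutable data. The order of the labels in the result depends on thread scheduling.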


Conclusion

It's all about the flatMap. This is essential for mutating the state. No flatMap, no change.
