Saturday, October 5, 2019

Miscellaneous FS2 notes


These are my first steps into the FS2 world. The first thing to absorb "when you start working with fs2 is the fact everything is a Stream. By everything I mean everything, even your Queue" [underscore.io].

Some terminology

"Pure means no effect on the world. It just makes your CPU work and consumes some power, but besides that it does not affect the world around you.
Use evalMap instead of map when you want to apply a function that has an effect like loadUserIdByName to a Stream." [FreeCodeCamp]
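
To make the map/evalMap distinction concrete, here is a minimal sketch. It assumes fs2 2.x on cats-effect 2.x (current when these notes were written); loadUserIdByName is a hypothetical stand-in that just returns the length of the name rather than hitting a real user store.

```scala
import cats.effect.{ExitCode, IO, IOApp}
import fs2.Stream

object EvalMapExample extends IOApp {
  // Hypothetical effectful lookup; a real one would call a database or service.
  def loadUserIdByName(name: String): IO[Int] =
    IO(name.length)

  val names: Stream[IO, String] = Stream("alice", "bob").covary[IO]

  // map: a pure function applied to each element, no effect involved.
  val shouted: Stream[IO, String] = names.map(_.toUpperCase)

  // evalMap: the IO returned for each element is run and its result is emitted.
  val ids: Stream[IO, Int] = names.evalMap(loadUserIdByName)

  override def run(args: List[String]): IO[ExitCode] =
    ids.evalMap(id => IO(println(id))).compile.drain.as(ExitCode.Success)
}
```

Had I written names.map(loadUserIdByName) instead, the result would be a Stream[IO, IO[Int]]: a stream of descriptions of lookups, none of which has been run.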

"Pipe[F, A, B] is just Stream[F, A] => Stream[F, B]
I believe it's just a type alias" [Chris Davenport, Gitter].
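
A small sketch of what that means in practice: because a Pipe is just a function, you can either hand it to through or apply it directly. The name double is my own, purely for illustration.

```scala
import fs2.{Pipe, Pure, Stream}

object PipeExample {
  // A Pipe[F, Int, Int] is a function Stream[F, Int] => Stream[F, Int],
  // so a plain lambda is enough to define one.
  def double[F[_]]: Pipe[F, Int, Int] = _.map(_ * 2)

  // Idiomatic use: pass the pipe to `through`.
  val viaThrough: List[Int] = Stream(1, 2, 3).through(double[Pure]).toList

  // Equivalent: apply the pipe as the ordinary function it is.
  val viaApply: List[Int] = double[Pure](Stream(1, 2, 3)).toList
}
```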

Note the difference between safe concurrent modification and synchronization in this exchange from FS2's Gitter channel:

Oleg Pyzhcov @oleg-py
If you have two threads doing update at the same time, one will always see results of another (not the case with var, for instance, where both can see old state) - that's safe concurrent modification. However, you can't enforce ordering of those operations with Ref alone - that's lack of synchronization.
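
Here is how I read the "safe concurrent modification" half of that, as a sketch under assumptions: cats-effect 2.x (where Ref lives in cats.effect.concurrent; in 3.x it moved to cats.effect.Ref) and a counter of my own invention. The updates run in parallel and none is lost, but nothing here says in which order they happen.

```scala
import cats.effect.{ExitCode, IO, IOApp}
import cats.effect.concurrent.Ref // cats-effect 2.x location (assumption)
import cats.implicits._

object RefExample extends IOApp {
  // 100 increments run in parallel against one Ref.
  // `update` is atomic, so every increment is applied exactly once,
  // but the order in which the fibers get there is not controlled.
  def countTo100: IO[Int] =
    for {
      counter <- Ref.of[IO, Int](0)
      _       <- List.fill(100)(counter.update(_ + 1)).parSequence_
      total   <- counter.get
    } yield total

  override def run(args: List[String]): IO[ExitCode] =
    countTo100.flatMap(n => IO(println(n))).as(ExitCode.Success)
}
```

With a plain var in place of the Ref, two increments could both read the same old value and one would be lost.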

Fabio Labella @SystemFw
FS2 concurrency builds up off a few simple primitives the most basic one being a primitive to start an F asyncly
Not needing Effect is better because Effect is very very (too) powerful, being basically equivalent to the ability to unsafeRunSync it also means that we don't need to thread ExecutionContexts around (you will only find ECs in APIs that block).


Minimal implementation

You'll notice that your main class extends cats.effect.IOApp and your 'main' method now becomes this:

  override def run(args: List[String]): IO[ExitCode] =

IOApp brings a number of implicits into scope (in cats-effect 2, a ContextShift[IO] and a Timer[IO]) backed by an execution context. This makes things much easier, and it explains why isolated snippets might not compile.
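
For completeness, a minimal sketch of a whole program, again assuming fs2 2.x and cats-effect 2.x. The object name and the ticking stream are mine; I chose awakeEvery because it needs one of those implicits (a Timer[IO] in cats-effect 2), so the same lines pasted outside an IOApp would not compile without extra setup.

```scala
import cats.effect.{ExitCode, IO, IOApp}
import fs2.Stream
import scala.concurrent.duration._

object Main extends IOApp {
  // awakeEvery needs an implicit from IOApp (Timer[IO] in cats-effect 2.x).
  def ticks: Stream[IO, FiniteDuration] =
    Stream.awakeEvery[IO](100.millis).take(3)

  override def run(args: List[String]): IO[ExitCode] =
    ticks
      .evalMap(elapsed => IO(println(s"tick after $elapsed")))
      .compile
      .drain
      .as(ExitCode.Success)
}
```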

captainmannering @captainmannering Apr 07 13:59
I would like to use Cats IO and fs2 to write a program with the following structure: one task that receives events from one data source (say RabbitMQ), another task that receives events from another data source (say Kafka) and both of these are then sent into another task that processes them. These tasks should run until a special message is received on either of the sources that causes everything to gracefully stop and the program to exit. Is there any similar sort of code that anyone has a link for that does this sort of thing?

Fabio Labella @SystemFw Apr 07 14:18
is the special message the same on both sources or different?
but basically use fs2-rabbit and fs2-kafka to get stream out of your sources (there are ways to write your own connectors ofc if you need them), then you can do:

rabbitStream
  .map(toCommonType)
  .merge(kafkaStream map toCommonType)
  .takeWhile(_ != specialMessage)
  .evalMap(yourProcessingTaskReturningIO)
  .compile
  .drain
  .as(ExitCode.Success)
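
To see that shape run end to end, here is a sketch with the real connectors replaced by in-memory streams. Everything except the combinators is an assumption of mine: the Event type, the "stop" sentinel, and the two stub sources standing in for fs2-rabbit and fs2-kafka. It assumes fs2 2.x on cats-effect 2.x.

```scala
import cats.effect.{ExitCode, IO, IOApp}
import fs2.Stream

object MergeExample extends IOApp {
  sealed trait Event
  final case class Data(payload: String) extends Event
  case object Stop extends Event // the "special message"

  // Stub sources; real code would obtain these from fs2-rabbit / fs2-kafka.
  val rabbitStream: Stream[IO, String] = Stream("r1", "r2").covary[IO]
  val kafkaStream: Stream[IO, String]  = Stream("k1", "stop", "k2").covary[IO]

  def toCommonType(raw: String): Event =
    if (raw == "stop") Stop else Data(raw)

  def process(e: Event): IO[Unit] = IO(println(e))

  // merge interleaves both sources nondeterministically;
  // takeWhile ends the merged stream at the first Stop.
  def events: Stream[IO, Event] =
    rabbitStream
      .map(toCommonType)
      .merge(kafkaStream.map(toCommonType))
      .takeWhile(_ != Stop)

  override def run(args: List[String]): IO[ExitCode] =
    events.evalMap(process).compile.drain.as(ExitCode.Success)
}
```

Because merge does not fix the interleaving, which rabbit events get through before the stop message can differ from run to run; "k2" is never processed, since it comes after the stop on the same source.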


Wrapping old Java code with FS2

One suggestion from Gitter was to use a tagless solution.

Oleg Pyzhcov @oleg-py Jun 27
The common pattern is to do something like this:

trait YoloContext[F[_]] {
  def execute(): F[Unit]
  def insert[T](o: T): F[Handle[T]]
}

object YoloContext {
  private class Implementation[F[_]: Sync](javaCtx: YoloCtx) extends YoloContext[F] { 
    /* implement methods. You have javaCtx here */ 
  }
  def create[F[_]: Sync](name: String): Resource[F, YoloContext[F]] = { 
    /* return a resource with `Implementation`. Hide the java stuff completely*/ 
  }
}
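
Filling in the blanks, the pattern might look like the sketch below. YoloCtx and Handle are hypothetical stand-ins that I wrote in Scala so the example is self-contained; in real life YoloCtx would be the legacy Java class, and its methods (execute, insert, close) are my guesses at what such a class might expose.

```scala
import cats.effect.{Resource, Sync}

// Hypothetical stand-in for the legacy Java class.
final class YoloCtx(name: String) {
  def execute(): Unit      = println(s"[$name] executing")
  def insert(o: Any): Long = o.hashCode.toLong
  def close(): Unit        = println(s"[$name] closed")
}

// Hypothetical handle type returned by insert.
final case class Handle[T](id: Long)

trait YoloContext[F[_]] {
  def execute(): F[Unit]
  def insert[T](o: T): F[Handle[T]]
}

object YoloContext {
  private class Implementation[F[_]](javaCtx: YoloCtx)(implicit F: Sync[F])
      extends YoloContext[F] {
    // Each side-effecting Java call is suspended with delay.
    def execute(): F[Unit]            = F.delay(javaCtx.execute())
    def insert[T](o: T): F[Handle[T]] = F.delay(Handle[T](javaCtx.insert(o)))
  }

  // Acquire the Java object, guarantee close() runs, expose only the algebra.
  def create[F[_]](name: String)(implicit F: Sync[F]): Resource[F, YoloContext[F]] =
    Resource
      .make(F.delay(new YoloCtx(name)))(ctx => F.delay(ctx.close()))
      .map(ctx => new Implementation[F](ctx): YoloContext[F])
}
```

Callers then write something like YoloContext.create[IO]("demo").use(ctx => ctx.execute()) and never see the Java type.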


The algebra of streaming

This isn't FS2-specific, but this salient comment from the omniscient Rob Norris is worth repeating:

Rob Norris @tpolecat Sep 23 22:58
Yeah in general it [fold] has to be lazy because you might try to fold something infinite.
And the thing that is not immediately intuitive is that foldLeft cannot possibly terminate for an infinite structure, but foldRight might.

Think about where the parens stack up. With foldLeft it looks like (((((((((( forever.

With foldRight it looks like a + (b + (c + ... forever.

So if + is lazy you might be able to stop after a while.
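
The point can be sketched with nothing but the standard library. This assumes Scala 2.13 (for LazyList); the foldRight here is hand-rolled with a by-name second argument, not the one from Cats or the collections library.

```scala
object LazyFold {
  // A right fold whose combining function takes "the rest" by name,
  // so it is only evaluated if the function actually asks for it.
  def foldRight[A, B](as: LazyList[A], z: => B)(f: (A, => B) => B): B =
    as match {
      case head #:: tail => f(head, foldRight(tail, z)(f))
      case _             => z
    }

  // An infinite structure: 1, 2, 3, ...
  val naturals: LazyList[Int] = LazyList.from(1)

  // || only looks at its right-hand side when the left is false,
  // so the recursion stops at the first element greater than 10.
  val found: Boolean =
    foldRight(naturals, false)((a, rest) => a > 10 || rest)
}
```

Calling naturals.foldLeft(false)(_ || _ > 10) would never return: foldLeft has to reach the end of the structure before it can produce anything, and there is no end.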

