Sunday, October 27, 2019

More on applicatives


My post on applicatives barely broke the surface.

The problem

I wanted to have my cake and eat it. I wanted:
  • to use my type in a for comprehension (i.e., to flatMap over it)
  • to accumulate errors
In short, I wanted my type to act like a Monad and an Applicative. The trouble is that a monad is fail fast. That is, the first monad to represent a 'failure' short circuits the for comprehension.

I asked about the best way to proceed and got some great responses from people much more clever than me. I've included some in this post side-by-side with annotations.
Fabio Labella @SystemFw "flatMap loses errors," violating @PhillHenry's actual use case.
No, what I was saying is that you cannot have a use case where you have F[A] and A => F[B], and you have errors in F[A] and F[B] at the same time
It's impossible.
Julien Truffaut @julien-truffaut  Once you point it out, it is obvious you cannot accumulate errors in a for comprehension, or execute IO concurrently. 
This took me a long time to grok so this post is my attempt to understand.
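
To convince myself of Fabio's point, here is a minimal sketch in plain Python rather than Scala, purely so it runs without Cats on the classpath. Ok, Err, flat_map and map2 are hypothetical names of my own, not part of any library. flat_map has nothing to feed to f when its input is already an Err, so whatever errors f would have reported can never be seen; map2 is handed both results up front and so can concatenate their errors.

```python
from dataclasses import dataclass
from typing import Callable, List, Union


@dataclass(frozen=True)
class Ok:
    value: object


@dataclass(frozen=True)
class Err:
    errors: List[str]


Result = Union[Ok, Err]


def flat_map(fa: Result, f: Callable[[object], Result]) -> Result:
    """Monadic bind: f needs the value inside fa, so an Err short circuits."""
    if isinstance(fa, Err):
        return fa  # f is never called, so any errors it would report are lost
    return f(fa.value)


def map2(fa: Result, fb: Result, f: Callable[[object, object], object]) -> Result:
    """Applicative combine: both results already exist, so errors can accumulate."""
    if isinstance(fa, Err) and isinstance(fb, Err):
        return Err(fa.errors + fb.errors)
    if isinstance(fa, Err):
        return fa
    if isinstance(fb, Err):
        return fb
    return Ok(f(fa.value, fb.value))


def parse_name(raw: str) -> Result:
    return Ok(raw) if raw else Err(["empty name"])


def parse_age(raw: str) -> Result:
    return Ok(int(raw)) if raw.isdigit() else Err([f"bad age: {raw}"])


# Fail fast: the lambda is never invoked, so only the first error survives
fail_fast = flat_map(parse_name(""), lambda name: parse_age("x"))

# Accumulating: both inputs are inspected, so both errors are reported
accumulated = map2(parse_name(""), parse_age("x"), lambda name, age: (name, age))
```

Here fail_fast carries only the "empty name" error while accumulated carries both messages, which is the cake-and-eat-it problem in miniature.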

First, let's clear up some terminology.

Effectful types
Option models the effect of optionality
Future models latency as an effect
Try abstracts the effect of failures (manages exceptions as effects)

“Option is a monad that models the effect of optionality,” and it’s also what Mr. Norris means when he says that an effectful function returns F[A] rather than [A].

If you want to think about it philosophically, when a function returns an A, that A has already been fully evaluated; but if that function returns F[A] instead, that result has not already been fully evaluated, the A is still inside F[A] waiting to be evaluated.
[Alvin Alexander's blog]
When working with abstractions that are related to composing computations, such as applicative functors and monads, it's convenient to somewhat distinguish between the actual value and the "rest", which we often call an "effect". In particular, if we have a type f of kind * -> *, then in f a the a part is "the value" and whatever "remains" is "the effect".

The distinction between "effects" and "values" doesn't really depend on the abstraction. Functor, Applicative and Monad just give us tools what we can do with them (Functors allow to modify values inside, Applicatives allow to combine effects and Monads allow effects to depend on the previous values).
[StackOverflow]

Parallel has nothing to do with threads

...at least not in this context.
A slightly simplified signature of andThen is: 
def andThen[B](f: A => Validated[E, B]): Validated[E, B] 
and it looks exactly as flatMap would look like. Unfortunately, andThen cannot be just renamed to flatMap. The latter is an operation from the Monad type class, and since cats.Monad extends cats.Applicative there is a law that defines how their operations must relate to each other. One of them says that Applicative operations must be equivalent to the same operations implemented using Monad operations. In the case of Validated this means that if there was an instance of the Monad type class, the Applicative operations wouldn't be able to accumulate errors
[Roman Timushev's blog]

For this reason, you can't use a Validated in a for comprehension.

You can, of course, convert between, say, Either and Validated when you want monadic and applicative behaviour respectively.
When browsing the various Monads included in Cats, you may have noticed that some of them have data types that are actually of the same structure, but instead have instances of Applicative. E.g. Either and Validated. This is because defining a Monad instance for data types like Validated would be inconsistent with its error-accumulating behaviour. In short, Monads describe dependent computations and Applicatives describe independent computations.
[The Cats documentation on Parallel].

And this is where Parallel or rather its super trait NonEmptyParallel comes in. From the docs: "Some types that form a FlatMap, are also capable of forming an Apply that supports parallel composition. The NonEmptyParallel type class allows us to represent this relationship."

Now, if we have a

EitherNel[String, A]

where EitherNel[E, A] is just a type alias for Either[NonEmptyList[E], A] then a call to

tupled.parMapN(f)

(where tupled is a tuple of such values and f is a function from the tuple's Right values to some B) will magically accumulate our Lefts.

[Note that if you look at the Parallel code, you'll see a few ~>, which Rob Norris describes thus: "~> is like => but it operates on type constructors"]

Rob Norris @tpolecat Jul 18 03:58
Parallel is more general than it may appear. It describes a relationship between a monad and an associated applicative functor. For IO the applicative really does run things in parallel. For Either the applicative accumulates values on the left. For List the applicative zips instead of Cartesian product.
Only IO requires the context shift.
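
Rob's List example is the easiest one to picture. As a rough illustration only (plain Python lists, not Cats, and the variable names are mine), the two ways of combining a pair of lists look like this:

```python
from itertools import product

xs = [1, 2]
ys = [10, 20]

# Monad-derived combination (what nested flatMaps give you): every pairing
cartesian = [x + y for x, y in product(xs, ys)]

# The associated "parallel" applicative for lists: position-wise pairing
zipped = [x + y for x, y in zip(xs, ys)]
```

The first yields four results and the second only two, yet nothing here touches a thread, which is the sense in which Parallel has nothing to do with threads.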


Summary
Fabio Labella @SystemFw I think "use par* to accumulate errors" is easier to explain than "well, the reason why you cannot flatMap Validated is because of the laws of consistency between Applicative and Monad"
Oleg Pyzhcov @oleg-py "use par* to accumulate errors, but make sure your error type has a semigroup instance" ... The thing about Parallel, I think, is that many people learn about it in the context of IO where the wording makes intuitive sense which doesn't translate at all to what we call Parallel in cats
Since we're talking about semigroups: a semigroup's combine operation is associative, so partial results can be combined in any grouping, and that is what lets the computation also be parallelized in the concurrency sense of the word.


Thursday, October 24, 2019

Python for Java Programmers


Dependency management


Python doesn't seem to have build tools like Maven, Gradle, SBT etc. Instead, you use virtual environments that are tailored to your project.

To set it up, you run something like:

python3 -m venv ~/env/ARBITRARY_NAME_FOR_ENV
source ~/env/ARBITRARY_NAME_FOR_ENV/bin/activate
pip install -r requirements.txt

where requirements.txt looks something like:

apache-airflow[gcp_api]==1.10.1
google-api-python-client==1.7.8
pytest

Otherwise, dependencies are scoped to the OS and that way madness lies.


IDEs

PyCharm seems the IDE of choice in my office. The community edition can be downloaded and used for free.

When setting up a project, one must Add a new Python interpreter and this allows you to choose the virtual environment you want for this project.


Logging

For logging, just use:

import logging

See the Python Docs for examples.
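
As a starting point, a minimal sketch using only the standard library might look like the following. The divide function and the format string are just illustrative choices of mine; the per-module logger plays roughly the role of the per-class logger a Java programmer would be used to.

```python
import logging

# Configure the root logger once, typically at program start-up
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s - %(message)s",
)

# One logger per module, named after the module
logger = logging.getLogger(__name__)


def divide(numerator: float, denominator: float) -> float:
    # Arguments are passed separately so formatting only happens if the level is enabled
    logger.info("dividing %s by %s", numerator, denominator)
    try:
        return numerator / denominator
    except ZeroDivisionError:
        # logger.exception logs at ERROR level and appends the stack trace
        logger.exception("division failed")
        raise
```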


Mocking

You can use mocks in at least 2 different ways:

@patch('x.y.method_to_mock', return_value=VALUE_MOCK_RETURNS)

or

def test_trigger_dataflow_success(mocked_service, config, task_instance, monkeypatch, data_access_layer_stub):
.
.
    def mock_my_method(an_argument):
        return my_stub
.
.
    monkeypatch.setattr('x.y.method_to_mock', mock_my_method)
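
For something that can be run as-is, here is a small sketch of the first style using unittest.mock from the standard library. The function names are hypothetical and os.getcwd is simply a convenient thing to patch. The monkeypatch style above needs pytest's fixture, so it is not repeated here.

```python
import os
from unittest.mock import patch


def describe_working_dir() -> str:
    # Looks up os.getcwd at call time, so a patched version is picked up
    return f"running in {os.getcwd()}"


# Decorator form: the patch is active only while the decorated function runs,
# and the mock object is passed in as an extra argument
@patch("os.getcwd", return_value="/fake/dir")
def check_with_decorator(mocked_getcwd):
    result = describe_working_dir()
    mocked_getcwd.assert_called_once_with()
    return result


# Context-manager form: the patch is undone when the with block exits
def check_with_context_manager() -> str:
    with patch("os.getcwd", return_value="/other/dir"):
        return describe_working_dir()
```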


Tests

Tests are run with the command:

python -m pytest tests/

What is included appears to be based on convention.

"py.test will import conftest.py and all Python files that match the python_files pattern, by default test_*.py. If you have a test fixture, you need to include or import it from conftest.py or from the test files that depend on it" [from SO].

It appears (?) that test files that are nested deeper will use the conftest.py of higher packages in the same branch.


Notable differences with Java

Unlike Java, Python classes tend not to live in files that bear their names [SO]. So, if you have a class called X in file y.py in package z, you'd import it with:

from z import y

x = y.X(...)

Note the absence of the new operator.
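
The standard library shows the same convention. In this small sketch, one module (collections) holds several unrelated classes, and each is instantiated by calling it like a function:

```python
import collections  # one module, many classes: Counter, OrderedDict, deque, ...

# Instantiate by calling the class; there is no `new` keyword
letter_counts = collections.Counter("banana")

# A bounded deque keeps only the most recent maxlen items
recent = collections.deque([1, 2, 3], maxlen=2)
```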

Friday, October 18, 2019

Cats and Applicatives


This is part of a 'brown-bag' for my colleagues to extol the joys of functional programming. I want to emphasize practicality over theory.

One use case is to do some IO then close all streams. For this, Applicatives and Cats made the code incredibly easy to work with.

Without further ado, here's the code:

import cats.data.NonEmptyList
import cats.{Applicative, ApplicativeError}
import scala.util.Try

class MyIO[F[_]: Applicative](implicit E: ApplicativeError[F, Throwable]) {

  // Run a side-effecting block, capturing any thrown exception in F
  def doIO[U](f: => U): F[U] = E.fromTry(Try(f))

  // Sequence the effects, keeping the last value. Fold over the tail only,
  // so that the head is not combined with itself.
  def allOrNothing[A](xs: NonEmptyList[F[A]]): F[A] = {
    import cats.implicits._
    xs.tail.foldLeft(xs.head) { case (a, x) =>
      a *> x
    }
  }

  def doIO(...): F[Unit] = {
    ...
    val fo1 = doIO(firstOutputStream.close())
    val fo2 = doIO(secondOutputStream.close())
    val fi  = doIO(inputStream.close())

    allOrNothing(NonEmptyList(fo1, List(fo2, fi)))
  }
}

A few points to note here.

  1. This works for all Applicative types. That is to say, with the relevant imports, doIO will return us an Either, an Option, or any other suitable data structure with no changes to the code in MyIO. (Note: Option may not be an appropriate choice...)
  2. Because allOrNothing is dealing with Applicatives, it will return the relevant type for all passing (Right, Some, etc) or for at least one failing (Left, None, etc) again with no change to the code.
Note that there must be a suitable ApplicativeError[F, Throwable] in the ether. Some you can get for free with Cats and a suitable import, some you must roll by hand.

So, what is actually returned in allOrNothing? Given a list of, say, Rights, it returns the last Right. But given a List of Eithers with at least one Left, then the first Left will be returned. An analogous situation exists for any type with the appropriate ApplicativeError.



More fun and games with Beam


We're using Beam 2.13 and want it to be called by Google Composer/Airflow. Unfortunately, the route we have been recommended (here) is awkward to say the least.

Extracting the CLI values is cumbersome. They’re wrapped in ValueProvider objects and a reference to them must be created with something like:

  def createViewFor(name: String, pipeline: Pipeline, value: ValueProvider[String]): PCollectionView[String] =
    pipeline.apply(s"${name}Config:Start",  Create.of("Start"))
      .apply(s"${name}Config:Parse",        ParDo.of(new ApplicationConfigurationParser(value)))
      .apply(s"${name}Config:AsView",       View.asSingleton())

Then, you pass this reference to your DoFn with something like:

    val myConfig = createViewFor(…)
    ParDo.of(new MyDoFn(myConfig))
      .withSideInputs(myConfig)

And then finally in your DoFn, you access it with:

class MyDoFn(
                      myConfigView:          PCollectionView[String],
                     ) extends DoFn[FileIO.ReadableFile, String] {
  @ProcessElement
  def processElement(@Element input:  FileIO.ReadableFile,
                     out:             OutputReceiver[String],
                     context:         ProcessContext): Unit = {
    val myConfig = context.sideInput(myConfigView)
.
.

Just painful.

Saturday, October 5, 2019

Miscellaneous FS2 notes


These are my first steps into the FS2 [underscore.io] world ("when you start working with fs2 is the fact everything is a Stream. By everything I mean everything, even your Queue").

Some terminology

"Pure means no effect on the world. It just makes your CPU work and consumes some power, but besides that it does not affect the world around you.
Use evalMap instead of map when you want to apply a function that has an effect like loadUserIdByName to a Stream." [FreeCodeCamp]

"Pipe[F, A, B] is just Stream[F, A] => Stream[F, B]
I believe it's just a type alias" [Chris Davenport, Gitter].

Note the difference between concurrency and synchronization in this from FS2's Gitter channel:

Oleg Pyzhcov @oleg-py
If you have two threads doing update at the same time, one will always see results of another (not the case with var, for instance, where both can see old state) - that's safe concurrent modification. However, you can't enforce ordering of those operations with Ref alone - that's lack of synchronization.

Fabio Labella @SystemFw
FS2 concurrency builds up off a few simple primitives the most basic one being a primitive to start an F asyncly
Not needing Effect is better because Effect is very very (too) powerful, being basically equivalent to the ability to unsafeRunSync it also means that we don't need to thread ExecutionContexts around (you will only find ECs in APIs that block).


Minimal implementation

You'll notice that your main class extends cats.effect.IOApp and your 'main' method now becomes this:

  override def run(args: List[String]): IO[ExitCode] =

IOApp pulls in a lot of implicits and an execution context, making things much easier and explaining why isolated snippets might not compile.

captainmannering @captainmannering Apr 07 13:59
I would like to use Cats IO and fs2 to write a program with the following structure: one task that receives events from one data source (say RabbitMQ), another task that receives events from another data source (say Kafka) and both of these are then sent into another task that processes them. These tasks should run until a special message is received on either of the sources that causes everything to gracefully stop and the program to exit. Is there any similar sort of code that anyone has a link for that does this sort of thing?

Fabio Labella @SystemFw Apr 07 14:18
is the special message the same on both sources or different?
but basically use fs2-rabbit and fs2-kafka to get stream out of your sources (there are ways to write your own connectors ofc if you need them), then you can do:

rabbitStream
  .map(toCommonType)
  .merge(kafkaStream map toCommonType)
  .takeWhile(_ != specialMessage)
  .evalMap(yourProcessingTaskReturningIO)
  .compile
  .drain
  .as(ExitCode.Success)


Wrapping old Java code with FS2

One suggestion from Gitter was to use a tagless solution.

Oleg Pyzhcov @oleg-py Jun 27
The common pattern is to do something like this:

trait YoloContext[F[_]] {
  def execute(): F[Unit]
  def insert[T](o: T): F[Handle[T]]
}

object YoloContext {
  private class Implementation[F[_]: Sync](javaCtx: YoloCtx) extends YoloContext[F] { 
    /* implement methods. You have javaCtx here */ 
  }
  def create[F[_]: Sync](name: String): Resource[F, YoloContext[F]] = { 
    /* return a resource with `Implementation`. Hide the java stuff completely*/ 
  }
}


The algebra of streaming

This isn't FS2 specific but this salient comment from the omniscient Rob Norris is worth repeating:

Rob Norris @tpolecat Sep 23 22:58
Yeah in general it [fold] has to be lazy because you might try to fold something infinite.
And the thing that is not immediately intuitive is that foldLeft cannot possibly terminate for an infinite structure, but foldRight might.

Think about where the parens stack up. With foldLeft it looks like (((((((((( forever.

With foldRight it looks like a + (b + (c + ... forever.

So if + is lazy you might be able to stop after a while.
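
To see the "might be able to stop" part in action, here is a minimal sketch in Python, where laziness has to be spelled out with thunks. fold_right and exists are hypothetical helpers of mine, not library functions. The combining function receives the rest of the fold as an uncalled function, so it can choose never to evaluate it.

```python
from itertools import count
from typing import Callable, Iterable, Iterator, TypeVar

A = TypeVar("A")
B = TypeVar("B")


def fold_right(
    xs: Iterable[A],
    z: Callable[[], B],
    f: Callable[[A, Callable[[], B]], B],
) -> B:
    """Right fold whose combining function receives the rest of the fold as a thunk."""
    it: Iterator[A] = iter(xs)

    def go() -> B:
        try:
            head = next(it)
        except StopIteration:
            return z()
        # `go` is passed uncalled: f decides whether the rest is ever evaluated
        return f(head, go)

    return go()


def exists(xs: Iterable[A], predicate: Callable[[A], bool]) -> bool:
    # `or` short circuits, so rest() is not called once the predicate holds
    return fold_right(xs, lambda: False, lambda a, rest: predicate(a) or rest())


# count(1) is the infinite sequence 1, 2, 3, ... yet the fold still terminates
found = exists(count(1), lambda n: n > 10)
```

An eager left fold over count(1), for example with functools.reduce, would never return, because it has to consume the whole input before it can produce anything.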