Sunday, October 27, 2019

More on applicatives


My post on applicatives barely scratched the surface.

The problem

I wanted to have my cake and eat it. I wanted:
  • to use my type in a for comprehension (i.e., to flatMap over it)
  • to accumulate errors
In short, I wanted my type to act like both a Monad and an Applicative. The trouble is that monads are fail-fast. That is, the first value that represents a 'failure' short-circuits the for comprehension.

I asked about the best way to proceed and got some great responses from people much cleverer than me. I've included some of them in this post, side by side with annotations.
Fabio Labella @SystemFw "flatMap loses errors," violating @PhillHenry's actual use case.
No, what I was saying is that you cannot have a use case where you have F[A] and A => F[B], and you have errors in F[A] and F[B] at the same time
It's impossible.
Julien Truffaut @julien-truffaut  Once you point it out, it is obvious you cannot accumulate errors in a for comprehension, or execute IO concurrently. 
This took me a long time to grok, so this post is my attempt to understand it.
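Fabio's point is easy to see in a small stdlib-only sketch (no Cats; `parseAge` and `checkAdult` are hypothetical helpers, not from my code). The second step needs the `Int` produced by the first, so once the first returns a `Left` there is nothing to call the second with, and its error can never be computed, let alone accumulated.

```scala
import scala.util.Try

object FailFast {
  // Hypothetical first step: fails on non-numeric input.
  def parseAge(s: String): Either[String, Int] =
    Try(s.toInt).toOption.toRight(s"'$s' is not a number")

  // Hypothetical second step: needs the Int produced by parseAge.
  def checkAdult(age: Int): Either[String, Int] =
    if (age >= 18) Right(age) else Left(s"$age is under 18")

  // The for comprehension desugars into flatMap and map calls:
  // once parseAge returns a Left there is no Int to pass on.
  def validate(s: String): Either[String, Int] =
    for {
      age   <- parseAge(s)
      adult <- checkAdult(age)
    } yield adult
}
```

So validate("abc") stops at the parse error; the age check never runs.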

First, let's clear up some terminology.

Effectful types
Option models the effect of optionality
Future models latency as an effect
Try abstracts the effect of failures (manages exceptions as effects)

“Option is a monad that models the effect of optionality,” and it’s also what Mr. Norris means when he says that an effectful function returns F[A] rather than [A].

If you want to think about it philosophically, when a function returns an A, that A has already been fully evaluated; but if that function returns F[A] instead, that result has not already been fully evaluated, the A is still inside F[A] waiting to be evaluated.
[Alvin Alexander's blog]
When working with abstractions that are related to composing computations, such as applicative functors and monads, it's convenient to somewhat distinguish between the actual value and the "rest", which we often call an "effect". In particular, if we have a type f of kind * -> *, then in f a the a part is "the value" and whatever "remains" is "the effect".

The distinction between "effects" and "values" doesn't really depend on the abstraction. Functor, Applicative and Monad just give us tools what we can do with them (Functors allow to modify values inside, Applicatives allow to combine effects and Monads allow effects to depend on the previous values).
[StackOverflow]
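A stdlib-only illustration of that division of labour, using Option as the effect. `map2`, `safeDivide` and `halveThenDivide` are hand-rolled for this sketch; `map2` plays the role of what Cats exposes as `mapN`.

```scala
object EffectsVsValues {
  // Functor: change the value, leave the effect (Some/None) alone.
  def double(o: Option[Int]): Option[Int] = o.map(_ * 2)

  // Applicative: combine two independent effects; neither needs the other's value.
  def map2[A, B, C](fa: Option[A], fb: Option[B])(f: (A, B) => C): Option[C] =
    (fa, fb) match {
      case (Some(a), Some(b)) => Some(f(a, b))
      case _                  => None
    }

  // Monad: which effect happens next depends on the previous value.
  def safeDivide(n: Int, d: Int): Option[Int] =
    if (d == 0) None else Some(n / d)

  def halveThenDivide(o: Option[Int], d: Int): Option[Int] =
    o.flatMap(n => safeDivide(n / 2, d))
}
```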

Parallel has nothing to do with threads

...at least not in this context.
A slightly simplified signature of andThen is: 
def andThen[B](f: A => Validated[E, B]): Validated[E, B] 
and it looks exactly as flatMap would look like. Unfortunately, andThen cannot be just renamed to flatMap. The latter is an operation from the Monad type class, and since cats.Monad extends cats.Applicative there is a law that defines how their operations must relate to each other. One of them says that Applicative operations must be equivalent to the same operations implemented using Monad operations. In the case of Validated this means that if there was an instance of the Monad type class, the Applicative operations wouldn't be able to accumulate errors
[Roman Timushev's blog]

For this reason, you can't use a Validated in a for comprehension.
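A stdlib-only sketch of why that law bites. Here `Either[List[String], A]` stands in for `Validated[NonEmptyList[String], A]` (an assumption, to avoid pulling in Cats). `apViaFlatMap` is the `ap` you are forced into if you derive it from `flatMap`; `apAccumulating` treats both sides independently. Given the same two failures they disagree, so no single type can lawfully offer the accumulating version as its Applicative while also being a Monad.

```scala
object ApLaw {
  type V[A] = Either[List[String], A]

  // ap derived from flatMap: if the function side is a Left,
  // fa is never inspected, so its errors are lost.
  def apViaFlatMap[A, B](ff: V[A => B])(fa: V[A]): V[B] =
    ff.flatMap(f => fa.map(f))

  // ap that looks at both sides independently and concatenates their errors.
  def apAccumulating[A, B](ff: V[A => B])(fa: V[A]): V[B] =
    (ff, fa) match {
      case (Right(f), Right(a)) => Right(f(a))
      case (Left(e1), Left(e2)) => Left(e1 ++ e2)
      case (Left(e1), Right(_)) => Left(e1)
      case (Right(_), Left(e2)) => Left(e2)
    }

  val badFunction: V[Int => Int] = Left(List("bad function"))
  val badValue: V[Int]           = Left(List("bad value"))
}
```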

You can, of course, convert between, say, Either and Validated when you want monadic and applicative behaviour respectively.
When browsing the various Monads included in Cats, you may have noticed that some of them have data types that are actually of the same structure, but instead have instances of Applicative. E.g. Either and Validated. This is because defining a Monad instance for data types like Validated would be inconsistent with its error-accumulating behaviour. In short, Monads describe dependent computations and Applicatives describe independent computations.
[The Cats documentation on Parallel].

And this is where Parallel or rather its super trait NonEmptyParallel comes in. From the docs: "Some types that form a FlatMap, are also capable of forming an Apply that supports parallel composition. The NonEmptyParallel type class allows us to represent this relationship."

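Now, if we have values of type

EitherNel[String, A]

where EitherNel[E, A] is just a type alias for Either[NonEmptyList[E], A], then calling parMapN on a tuple of them, such as

(x, y).parMapN(f)

(where x and y are EitherNel[String, A] and EitherNel[String, B], and f is of type (A, B) => C), will magically accumulate our Lefts.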

[Note that if you look at the Parallel code, you'll see a few ~>, which Rob Norris describes thus: "~> is like => but it operates on type constructors"]
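A hand-rolled sketch of such an arrow, defined from scratch here rather than imported (in Cats the real thing is `cats.arrow.FunctionK`, which `~>` aliases). Where `A => B` maps one value to another, `F ~> G` maps an `F[A]` to a `G[A]` for every `A`, without looking inside `A`:

```scala
object NaturalTransformations {
  // A function between type constructors, polymorphic in the element type.
  trait ~>[F[_], G[_]] {
    def apply[A](fa: F[A]): G[A]
  }

  // Option ~> List: None becomes Nil, Some(a) becomes List(a).
  val optionToList: Option ~> List = new (Option ~> List) {
    def apply[A](fa: Option[A]): List[A] = fa.toList
  }
}
```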

Rob Norris @tpolecat Jul 18 03:58
Parallel is more general than it may appear. It describes a relationship between a monad and an associated applicative functor. For IO the applicative really does run things in parallel. For Either the applicative accumulates values on the left. For List the applicative zips instead of Cartesian product.
Only IO requires the context shift.
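Rob's List example can be checked without Cats. In this stdlib-only sketch (the function names are mine), `cartesian` combines two lists via `flatMap`, as the List monad does, while `zipped` pairs them positionally, which is the behaviour he describes for the associated applicative (Cats wraps it in a `ZipList` type).

```scala
object ListApplicatives {
  // Monadic combination: every x paired with every y (Cartesian product).
  def cartesian[A, B](xs: List[A], ys: List[B]): List[(A, B)] =
    xs.flatMap(x => ys.map(y => (x, y)))

  // Zip-based combination: elements paired position by position,
  // truncated to the shorter list.
  def zipped[A, B](xs: List[A], ys: List[B]): List[(A, B)] =
    xs.zip(ys)
}
```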


Summary
Fabio Labella @SystemFw I think "use par* to accumulate errors" is easier to explain than "well, the reason why you cannot flatMap Validated is because of the laws of consistency between Applicative and Monad"
Oleg Pyzhcov @oleg-py "use par* to accumulate errors, but make sure your error type has a semigroup instance" ... The thing about Parallel, I think, is that many people learn about it in the context of IO where the wording makes intuitive sense which doesn't translate at all to what we call Parallel in cats
Since we're talking about semigroups, we can also parallelize the computation in the concurrency sense of the word: a semigroup's operation is associative, so the errors can be combined in any grouping, which is exactly what splitting work across threads requires.
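The property doing the work there is associativity: because combining (a, b) and then c gives the same result as combining a with (b, c), a list of errors can be split into chunks, each chunk reduced independently, and the partial results combined afterwards in order. A stdlib-only sketch with a hand-rolled `Semigroup` trait (Cats has its own `cats.Semigroup`; this one is only for illustration):

```scala
object SemigroupChunks {
  // Hand-rolled for illustration; not the Cats type class.
  trait Semigroup[A] {
    def combine(x: A, y: A): A
  }

  object Semigroup {
    // List concatenation is associative, so lists of errors form a semigroup.
    implicit def listSemigroup[E]: Semigroup[List[E]] = new Semigroup[List[E]] {
      def combine(x: List[E], y: List[E]): List[E] = x ++ y
    }
  }

  // One left-to-right pass over all the elements.
  def sequential[A](xs: List[A])(implicit S: Semigroup[A]): A =
    xs.reduceLeft((x, y) => S.combine(x, y))

  // Reduce each chunk on its own (these reductions are independent and could
  // run on separate threads), then combine the partial results in order.
  def chunked[A](xs: List[A], chunkSize: Int)(implicit S: Semigroup[A]): A =
    xs.grouped(chunkSize)
      .map(chunk => chunk.reduceLeft((x, y) => S.combine(x, y)))
      .reduceLeft((x, y) => S.combine(x, y))
}
```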


Thursday, October 24, 2019

Python for Java Programmers


Dependency management


Python doesn't seem to have build tools like Maven, Gradle or SBT. Instead, you use virtual environments tailored to your project.

To set it up, you run something like:

python3 -m venv ~/env/ARBITRARY_NAME_FOR_ENV
source ~/env/ARBITRARY_NAME_FOR_ENV/bin/activate
pip install -r requirements.txt

where requirements.txt looks something like:

apache-airflow[gcp_api]==1.10.1
google-api-python-client==1.7.8
pytest

Otherwise, dependencies are installed system-wide, and that way madness lies.


IDEs

PyCharm seems to be the IDE of choice in my office. The community edition can be downloaded and used for free.

When setting up a project, you must add a new Python interpreter, which lets you choose the virtual environment you want for that project.


Logging

For logging, just use the standard library's logging module:

import logging

See the Python Docs for examples.


Mocking

You can use mocks in at least two different ways:

from unittest.mock import patch

@patch('x.y.method_to_mock', return_value=VALUE_MOCK_RETURNS)

or

def test_trigger_dataflow_success(mocked_service, config, task_instance, monkeypatch, data_access_layer_stub):
    ...
    def mock_my_method(an_argument):
        return my_stub
    ...
    monkeypatch.setattr('x.y.method_to_mock', mock_my_method)


Tests

Tests are run with the command:

python -m pytest tests/

What is included appears to be based on convention.

"py.test will import conftest.py and all Python files that match the python_files pattern, by default test_*.py. If you have a test fixture, you need to include or import it from conftest.py or from the test files that depend on it" [from SO].

It appears that test files nested deeper in the directory tree also pick up the conftest.py files of their parent directories.


Notable differences with Java

Unlike Java, Python classes tend not to live in files that bear their names [SO]. So, if you have a class called X in file y.py in package z, you'd import it with:

from z import y

x = y.X(...)

Note the absence of the new operator.

Friday, October 18, 2019

Cats and Applicatives


This is part of a 'brown-bag' for my colleagues to extol the joys of functional programming. I want to emphasize practicality over theory.

One use case is to do some IO then close all streams. For this, Applicatives and Cats made the code incredibly easy to work with.

Without further ado, here's the code:

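import cats.ApplicativeError
import cats.data.NonEmptyList
import cats.implicits._
import scala.util.Try

// ApplicativeError already extends Applicative, so no separate
// `F[_]: Applicative` bound is needed (Scala 2 rejects a context bound
// combined with an explicit implicit parameter list anyway).
class MyIO[F[_]](implicit E: ApplicativeError[F, Throwable]) {

  // Runs f immediately and lifts the success or the exception into F.
  def doIO[U](f: => U): F[U] = E.fromTry(Try(f))

  // Chains the results with *> (for Either: the last Right if all
  // succeed, otherwise the first Left).
  def allOrNothing[A](xs: NonEmptyList[F[A]]): F[A] =
    xs.tail.foldLeft(xs.head)((acc, x) => acc *> x)

  def doIO(...): F[Unit] = {
    ...
    val fo1 = doIO(firstOutputStream.close())
    val fo2 = doIO(secondOutputStream.close())
    val fi  = doIO(inputStream.close())

    allOrNothing(NonEmptyList(fo1, List(fo2, fi)))
  }
}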

A few points to note here.

  1. This works for any F that has an ApplicativeError[F, Throwable] instance. That is to say, with the relevant imports, doIO will return us an Either, an Option, or any other suitable data structure with no changes to the code in MyIO. (Note: Option may not be an appropriate choice, since the ApplicativeError instance Cats provides for Option has Unit, not Throwable, as its error type.)
  2. Because allOrNothing is dealing with Applicatives, it will return the relevant type for all passing (Right, Some, etc.) or for at least one failing (Left, None, etc.), again with no change to the code.
Note that there must be a suitable ApplicativeError[F, Throwable] in implicit scope. Some you can get for free with Cats and a suitable import; some you must roll by hand.

So, what is actually returned by allOrNothing? Given a list of, say, Rights, it returns the last Right. But given a list of Eithers with at least one Left, the first Left will be returned. An analogous situation exists for any type with the appropriate ApplicativeError.
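A stdlib-only sketch of that behaviour, specialised to `Either[Throwable, A]` so it runs without Cats. `productR` is hand-rolled to mimic what `*>` does for Either, and the values passed to `doIO` in the usage below are made-up stand-ins for the `close()` calls:

```scala
import scala.util.Try

object AllOrNothingEither {
  type Result[A] = Either[Throwable, A]

  // Mirrors doIO: run the side effect now and capture any exception.
  def doIO[U](f: => U): Result[U] = Try(f).toEither

  // Mirrors *> for Either: if fa is a Left keep it, otherwise return fb.
  def productR[A, B](fa: Result[A], fb: Result[B]): Result[B] =
    fa.flatMap(_ => fb)

  // Mirrors allOrNothing over a non-empty list given as head + tail.
  def allOrNothing[A](head: Result[A], tail: List[Result[A]]): Result[A] =
    tail.foldLeft(head)((acc, x) => productR(acc, x))
}
```

Because each doIO call runs its side effect as soon as it is made (as in MyIO, where Try(f) runs f immediately), every close is attempted even when an earlier one throws; the fold only decides which result gets reported.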



More fun and games with Beam


We're using Beam 2.13 and want it to be called by Google Composer/Airflow. Unfortunately, the route we have been recommended (here) is awkward, to say the least.

Extracting the CLI values is cumbersome. They’re wrapped in ValueProvider objects and a reference to them must be created with something like:

  def createViewFor(name: String, pipeline: Pipeline, value: ValueProvider[String]): PCollectionView[String] =
    pipeline.apply(s"${name}Config:Start",  Create.of("Start"))
      .apply(s"${name}Config:Parse",        ParDo.of(new ApplicationConfigurationParser(value)))
      .apply(s"${name}Config:AsView",       View.asSingleton())

Then, you pass this reference to your DoFn with something like:

    val myConfig = createViewFor(…)
    ParDo.of(new MyDoFn(myConfig))
      .withSideInputs(myConfig)

And then finally in your DoFn, you access it with:

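import org.apache.beam.sdk.io.FileIO
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.{Element, OutputReceiver, ProcessElement}
import org.apache.beam.sdk.values.PCollectionView

class MyDoFn(myConfigView: PCollectionView[String])
    extends DoFn[FileIO.ReadableFile, String] {

  @ProcessElement
  def processElement(@Element input: FileIO.ReadableFile,
                     out:            OutputReceiver[String],
                     context:        ProcessContext): Unit = {
    val myConfig = context.sideInput(myConfigView)
    // ...
  }
}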

Just painful.

Saturday, October 5, 2019

Miscellaneous FS2 notes


These are my first steps into the FS2 [underscore.io] world ("when you start working with fs2 is the fact everything is a Stream. By everything I mean everything, even your Queue").

Some terminology

"Pure means no effect on the world. It just makes your CPU work and consumes some power, but besides that it does not affect the world around you.
Use evalMap instead of map when you want to apply a function that has an effect like loadUserIdByName to a Stream." [FreeCodeCamp]

"Pipe[F, A, B] is just Stream[F, A] => Stream[F, B]
I believe it's just a type alias" [Chris Davenport, Gitter].

Note the difference between concurrency and synchronization in this exchange from FS2's Gitter channel:

Oleg Pyzhcov @oleg-py
If you have two threads doing update at the same time, one will always see results of another (not the case with var, for instance, where both can see old state) - that's safe concurrent modification. However, you can't enforce ordering of those operations with Ref alone - that's lack of synchronization.

Fabio Labella @SystemFw
FS2 concurrency builds up off a few simple primitives the most basic one being a primitive to start an F asyncly
Not needing Effect is better because Effect is very very (too) powerful, being basically equivalent to the ability to unsafeRunSync it also means that we don't need to thread ExecutionContexts around (you will only find ECs in APIs that block).


Minimal implementation

You'll notice that your main class extends cats.effect.IOApp and your 'main' method now becomes this:

  override def run(args: List[String]): IO[ExitCode] =

IOApp pulls in a lot of implicits and an execution context, making things much easier and explaining why isolated snippets might not compile.

captainmannering @captainmannering Apr 07 13:59
I would like to use Cats IO and fs2 to write a program with the following structure: one task that receives events from one data source (say RabbitMQ), another task that receives events from another data source (say Kafka) and both of these are then sent into another task that processes them. These tasks should run until a special message is received on either of the sources that causes everything to gracefully stop and the program to exit. Is there any similar sort of code that anyone has a link for that does this sort of thing?

Fabio Labella @SystemFw Apr 07 14:18
is the special message the same on both sources or different?
but basically use fs2-rabbit and fs2-kafka to get stream out of your sources (there are ways to write your own connectors ofc if you need them), then you can do:

rabbitStream
  .map(toCommonType)
  .merge(kafkaStream map toCommonType)
  .takeWhile(_ != specialMessage)
  .evalMap(yourProcessingTaskReturningIO)
  .compile
  .drain
  .as(ExitCode.Success)
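`takeWhile` keeps elements only while its predicate holds, so to run until the special message arrives the predicate needs to be `_ != specialMessage`; with `_ == specialMessage` the stream would end at the very first ordinary message. A stdlib-only analogue (Scala 2.13+, with a `LazyList` standing in for the merged fs2 stream and made-up message strings):

```scala
object StopOnSpecial {
  // Made-up sentinel standing in for the special shutdown message.
  val specialMessage = "STOP"

  // Stand-in for the merged RabbitMQ/Kafka stream: a few messages, the
  // sentinel, then an unbounded tail that must never be consumed.
  def source: LazyList[String] =
    LazyList("a", "b", specialMessage, "c") ++ LazyList.continually("never reached")

  // Keep messages while they are NOT the sentinel, then stop.
  def processed(xs: LazyList[String]): List[String] =
    xs.takeWhile(_ != specialMessage).toList
}
```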


Wrapping old Java code with FS2

One suggestion from Gitter was to use a tagless solution.

Oleg Pyzhcov @oleg-py Jun 27
The common pattern is to do something like this:

trait YoloContext[F[_]] {
  def execute(): F[Unit]
  def insert[T](o: T): F[Handle[T]]
}

object YoloContext {
  private class Implementation[F[_]: Sync](javaCtx: YoloCtx) extends YoloContext[F] { 
    /* implement methods. You have javaCtx here */ 
  }
  def create[F[_]: Sync](name: String): Resource[F, YoloContext[F]] = { 
    /* return a resource with `Implementation`. Hide the java stuff completely*/ 
  }
}
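The same shape can be sketched without Cats Effect. In this version (all names hypothetical) a `java.util.ArrayList` stands in for the wrapped Java object and the `Attempt` alias, `Either[Throwable, A]`, stands in for an `F[_]: Sync`. There is no `Resource`, so acquiring and releasing the Java object is not modelled:

```scala
import scala.util.Try

object TaglessSketch {
  type Attempt[A] = Either[Throwable, A]

  // The algebra: callers only ever see this trait.
  trait Store[F[_]] {
    def insert(value: String): F[Unit]
    def size: F[Int]
  }

  object Store {
    // The Java object is private to the implementation and never leaks out.
    private final class Implementation(javaList: java.util.ArrayList[String])
        extends Store[Attempt] {
      def insert(value: String): Attempt[Unit] =
        Try { javaList.add(value); () }.toEither
      def size: Attempt[Int] = Try(javaList.size()).toEither
    }

    // Smart constructor: the only way to obtain a Store.
    def create(): Store[Attempt] =
      new Implementation(new java.util.ArrayList[String]())
  }
}
```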


The algebra of streaming

This isn't FS2 specific but this salient comment from the omniscient Rob Norris is worth repeating:

Rob Norris @tpolecat Sep 23 22:58
Yeah in general it [fold] has to be lazy because you might try to fold something infinite.
And the thing that is not immediately intuitive is that foldLeft cannot possibly terminate for an infinite structure, but foldRight might.

Think about where the parens stack up. With foldLeft it looks like (((((((((( forever.

With foldRight it looks like a + (b + (c + ... forever.

So if + is lazy you might be able to stop after a while.
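Rob's point can be checked with a small stdlib-only sketch (Scala 2.13+ for `LazyList`). `foldRightLazy` and `existsLazy` are hand-rolled here, not library methods. The important detail is that the combining function receives the rest of the fold by name, which is the 'lazy +' Rob mentions:

```scala
object LazyFold {
  // foldRight whose combining function receives the rest of the fold
  // by name, so it may choose never to evaluate it.
  def foldRightLazy[A, B](xs: LazyList[A])(z: => B)(f: (A, => B) => B): B =
    if (xs.isEmpty) z
    else f(xs.head, foldRightLazy(xs.tail)(z)(f))

  // Terminates on infinite input as soon as p matches, because ||
  // does not evaluate its right operand when the left one is true.
  def existsLazy[A](xs: LazyList[A])(p: A => Boolean): Boolean =
    foldRightLazy(xs)(false)((a, rest) => p(a) || rest)
}
```

existsLazy(LazyList.from(1))(_ > 10) returns true after looking at eleven elements; a foldLeft over the same infinite list would never return.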


Saturday, September 28, 2019

Applicative Functors and Validation


Applicatives

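Laws Applicative instances must follow (here apply lifts a plain value into the applicative, which Cats calls pure, and join pairs up two applicative values, which Cats calls product):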

map(apply(x))(f)         == apply(f(x))
join(apply(x), apply(y)) == apply((x, y))

Recall that "functions are applicative functors" (LYAHFGG) and applicatives have a function of type:

f (a -> b) -> f a -> f b.

So, where is this function on scala.Function1 etc?

Basically, the authors of Scala didn't put that functional programming goodness into the standard library, so you have to roll your own or use a library like Cats (see below).
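Rolling your own is short. Reading `R => A` as an effectful `F[A]` whose effect is 'needs an R' (often called the reader applicative), `pure` and `ap` can be written as below; both are hand-written for this sketch, not standard-library methods:

```scala
object FunctionApplicative {
  // pure: ignore the input and always return a.
  def pure[R, A](a: A): R => A = _ => a

  // ap: feed the same input to both the wrapped function and the wrapped
  // value. This is f (a -> b) -> f a -> f b with f = (R => _).
  def ap[R, A, B](ff: R => A => B)(fa: R => A): R => B =
    r => ff(r)(fa(r))
}
```

With addInput = r => a => r + a and triple = _ * 3, ap(addInput)(triple) is the function x => x + 3 * x, so applying it to 5 gives 20.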

Applicative Functor

I liked the second definition here at StackOverflow which basically says the defining Functor function is:

  def map[A, B](f : A => B): C[A] => C[B]

"And it works perfectly for a function of one variable. But for a function of 2 and more, after lifting to a category, we have the following signature:"

val g = (x: Int) => (y: Int) => x + y

for example:

Option(5) map g // Option[Int => Int]

The defining Applicative function is:

  def apply[A, B](f: F[A => B]): F[A] => F[B]
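Continuing with `g` from above: mapping it over `Option(5)` leaves a function stuck inside the Option, and `ap` is what applies that to a second wrapped argument. This `ap` is hand-written for Option (Cats provides a general one on `Apply`):

```scala
object OptionAp {
  val g = (x: Int) => (y: Int) => x + y

  // ap specialised to Option: both the function and the value must be present.
  def ap[A, B](ff: Option[A => B])(fa: Option[A]): Option[B] =
    (ff, fa) match {
      case (Some(f), Some(a)) => Some(f(a))
      case _                  => None
    }

  // Some(y => 5 + y): the partially applied g, still wrapped.
  val partial: Option[Int => Int] = Option(5) map g
}
```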

"So why bother with applicative functors at all, when we've got monads? First of all, it's simply not possible to provide monad instances for some of the abstractions we want to work with—Validation is the perfect example. Second (and relatedly), it's just a solid development practice to use the least powerful abstraction that will get the job done. In principle this may allow optimizations that wouldn't otherwise be possible, but more importantly it makes the code we write more reusable"
[ibid]

Validation

The author of this answer raises a good example of where monads are not appropriate, something that struck me this week when I was writing validation code.

Monads are great. What makes them great is that I don't need to change my code when I mess up. For instance, I had some parsing code that looked like this:

  def parse: T[String] = for {
    a <- parent
    b <- parseElementA
    c <- parseElementB
  } yield {
    ...
  }

Naively, my parseXXX functions returned Options. The problem here is that if we fail to parse an element, None doesn't tell us why. No worries, let's use Either, which (since Scala 2.12) is right-biased and so a monad too! Now my parseXXX methods tell me why they fail, and I never had to change the above block of code!

The next problem occurred when our QA told me that he had to run the whole application again to find each subsequent error in the data he was feeding into it. In the cloud (GCP), this is a royal pain. So, wouldn't it be great to aggregate all the errors?

As mentioned in the SO answer above, it's simply not possible to do this with monads. Fortunately, "there's a simpler abstraction—called an applicative functor—that's in-between a functor and a monad and that provides all the machinery we need. Note that it's in-between in a formal sense." [SO]

Cats and Validation

Cats has a type to help here called Validated. But note that "what’s different about Validation is that it does not form a monad, but forms an applicative functor." [eed3si9n]

So, with a few imports from Cats, I can write the even more succinct:

  def doApplicatives: T[String] = (parseParent, parseElementA, parseElementB).mapN { case (x, y, z) =>
    ...
  }

What's more, since Options and Eithers are also applicatives (as are all monads) this code still works just as well with them because "every monad is an applicative functor, every applicative functor is a functor, but not every applicative functor is a monad, etc." [SO]
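A stdlib-only sketch of what that buys us, with `Either[List[String], A]` standing in for `Validated`. `map3` is hand-written to play the role of Cats' `mapN` (it collects every Left rather than stopping at the first), and the three parsers are hypothetical:

```scala
import scala.util.Try

object ValidationSketch {
  type V[A] = Either[List[String], A]

  // Hypothetical element parsers.
  def parseName(s: String): V[String] =
    if (s.nonEmpty) Right(s) else Left(List("name is empty"))
  def parseAge(s: String): V[Int] =
    Try(s.toInt).toOption.toRight(List(s"age '$s' is not a number"))
  def parseEmail(s: String): V[String] =
    if (s.contains("@")) Right(s) else Left(List(s"email '$s' has no @"))

  // Combine the values only if all three succeeded; otherwise gather
  // the errors from every Left, in order.
  def map3[A, B, C, D](va: V[A], vb: V[B], vc: V[C])(f: (A, B, C) => D): V[D] =
    (va, vb, vc) match {
      case (Right(a), Right(b), Right(c)) => Right(f(a, b, c))
      case _ =>
        Left(List(va, vb, vc).flatMap(_.left.toOption.getOrElse(Nil)))
    }
}
```

A for comprehension over the same three failing results would report only "name is empty".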

Note that Scalaz also gives us an applicative functor instance for Option, so we can write the following:

import scalaz._, std.option._, syntax.apply._

def add(i: Int, j: Int): Int = i + j

val x: Option[Int] = ...
val y: Option[Int] = ...

val xy = (x |@| y)(add)

So, for validation, don't use monads; use applicatives and some library to add syntactic sugar.

Sunday, September 22, 2019

Git strategies


I've normally worked in a team where each developer has his own feature branch. My current team instead has each developer working on his own fork. The reason for this is that the main repository is not "polluted" with long-dead development branches.

In addition, squashing the commits (see below) means each feature only adds one commit to the main codebase.

Forking code

To copy a codebase into a fork, run:

git clone FORK_TO
git remote add ARBITRARY_NAME  FORK_FROM
git fetch ARBITRARY_NAME
git checkout -b ARBITRARY_BRANCH_NAME
git merge  --allow-unrelated-histories ARBITRARY_NAME/master
git push --set-upstream origin ARBITRARY_BRANCH_NAME

Origins and aliases

Git's remote labels are just that - aliases for URLs of repositories. To see them, run:

git remote -v

"Remotes are like nicknames for the URLs of repositories - origin is one, for example." [SO]

By convention, "upstream generally refers to the original repo that you have forked ... origin is your fork: your own repo on GitHub, clone of the original repo of GitHub" [SO]

You can delete remote labels with:

git remote rm upstream

Note that this cannot in itself damage your codebase.

Anyway, having forked from the main repository, you may want to add an alias like this:

git remote add upstream URL_OF_CODE_WE_FORKED_FROM

Briefly, "you have access to pull and push from origin, which will be your fork of the main ... repo. To pull in changes from this main repo, you add a remote, upstream in your local repo, pointing to this original and pull from it.

"So origin is a clone of your fork repo, from which you push and pull. Upstream is a name for the main repo, from where you pull and keep a clone of your fork updated, but you don't have push access to it.” [SO]

Merging from the main branch

You might first want to see what change you're pulling [SO], so run:

git diff COMMIT_ID~ COMMIT_ID

This assumes the author of the previous commit "squashed" their commits (see below), which keeps the main fork cleaner.

You can checkout your personal main branch and run:

git pull upstream MAIN_BRANCH

or:

git fetch upstream
git reset --hard upstream/MAIN_BRANCH

rather than a rebase.

You may sometimes have Git tell you that you are working on a detached head. What does this mean? “You are not on a branch (the commit is likely to be on multiple branches). You are checked out to a specific instance in the history… You are checked out to a specific commit.” [SO]

After doing this, your personal main branch has the team's changes and you can merge like usual.

If you really foul things up after pushing to your personal codebase, fear not. You can roll back a commit with:

git revert THE_HASH_FOR_COMMIT_YOU_WANT_TO_FORGET

This will leave the original commit in the log, though, since revert adds a new commit that undoes it. See the line “less dangerous approach” here [SO].

Merging to the main branch

When pushing your code to the main branch, you can "squash" all the commits. This means that in the future, developers will just see the salient changes without all the commits that went into making that feature.

You can squash commits with:

git rebase -i MAIN_BRANCH

You'll be asked to edit a file. Keep the first line and replace 'pick' in subsequent lines with 's'. Then:

git push -f origin YOUR_BRANCH

Now you're good to merge with the team's codebase, although you may not have privileges to do that...

Note: you might want to squash your code before merging changes from another branch as this makes any conflicts much easier to resolve.

Miscellaneous

Some people use fetch and some use pull. What's the difference? “In the simplest terms, git pull does a git fetch followed by a git merge.

"You can do a git fetch at any time to update your remote-tracking branches under refs/remotes/<remote>/.

"This operation never changes any of your own local branches under refs/heads, and is safe to do without changing your working copy… A git pull is what you would do to bring a local branch up-to-date with its remote version, while also updating your other remote-tracking branches.” [SO]

Rolling back

How to roll back depends on what exactly you mean by rolling back (do you want a record of the abandoned commits or not? etc.). The simplest way is to just check out the old commit's files [SO] like:

git checkout HASH_OF_DESIRED_COMMIT .

and note the '.' at the end of the line. You can then commit the result and git push origin HEAD:YOUR_BRANCH

There are variants on this, but this way will retain your history, which may or may not be what you desire. For instance, the line above will literally give you the code as it was at that point in time. If you want to re-apply any changes from other branches that you had previously pulled, Git will think they have already been applied and do nothing. Similarly, if you pull this branch into another that already has those changes, they will not be deleted just because the branch you're pulling from no longer has them. You need to revert.

Merges also cause a problem for revert. This is because a merge commit has more than one parent, and by default Git does not know which parent to treat as the mainline when undoing it. You have to run something like git revert -m 1 YOUR_HASH..HEAD to revert. The -m 1 tells Git to follow the first parent.