Friday, December 27, 2019

CNNs and network scanning with TensorFlow


Introduction

Port scanning leaves tell-tale signs in flow logs that are easy to spot when we visualize them (here is an example). Wouldn't it be good if we could teach a neural net to check our logs for such activity?

To this end, I've written some Python code using TensorFlow (here) that fakes some port data. Some of the fakes are clearly the result of a port scan, others are just random connections. Can a convolutional neural network (CNN) spot the difference?

Fake data look like this:

Graphical representation of network connection activity
Notice that port scanning shows up as a straight line: vertical lines might represent scanning all the ports on a single box, while horizontal lines might represent scanning a lot of boxes on just one port.

Realistically, nefarious actors only scan a subset of ports (typically those below 1024). I'll make the fake data more sophisticated later.

A brief introduction to CNNs

"The big idea behind convolutional neural networks is that a local understanding of an image is good enough. As a consequence, the practical benefit is that fewer parameters greatly improve the time it takes to learn as well as lessens the amount of data required to train the model." [2]

"Pooling layers are commonly inserted between successive convolutional layers. We want to follow convolutional layers with pooling layers to progressively reduce the spatial size (width and height) of the data representation. Pooling layers reduce the data representation progressively over the network and help control overfitting. The pooling layer operates independently on every depth slice of the output." [1]

"The pooling layer uses the max() operation to resize the input data spatially (width, height). This operation is referred to as max pooling. With a 2x2 filter size, the max() operation is taking the largest of four numbers in the filter area." [1]

Code

I stole the code from Machine Learning with TensorFlow (MLwTF). However, the book reshapes its data to 24x24 images, while my images have different dimensions, and when I ran the book's code on my data I got errors. Apparently, this line (p184) was causing me problems:

Where was it getting 6*6*64 from? The 64 is easy to explain: it's the number of filters (an arbitrary choice) in the previous convolutional layer. But the 6x6...?

When I used MLwTF's code with my data, TensorFlow complained that logits_size and labels_size were not the same. What does this mean?
Logits is an overloaded term which can mean many different things. In Math, Logit is a function that maps probabilities ([0, 1]) to R ((-inf, inf)) ... Probability of 0.5 corresponds to a logit of 0. Negative logits correspond to probabilities less than 0.5, positive to > 0.5. 
In ML, it can be the vector of raw (non-normalized) predictions that a classification model generates, which is ordinarily then passed to a normalization function. If the model is solving a multi-class classification problem, logits typically become an input to the softmax function. The softmax function then generates a vector of (normalized) probabilities with one value for each possible class.
(from StackOverflow).
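Both meanings from the quote can be shown in a few lines of plain Python. `logit` maps a probability to the real line, and `softmax` turns a vector of raw scores (logits) into probabilities. These function names are mine, not TensorFlow's.

As for the error message: TensorFlow's softmax cross-entropy expects the logits and the labels to have the same shape, one score per class per example. My guess is that a wrongly sized fully connected layer changes that shape.

```python
import math

def logit(p):
    """Map a probability in (0, 1) to the real line: log(p / (1 - p))."""
    return math.log(p / (1 - p))

def softmax(logits):
    """Turn raw, unnormalised scores (one per class) into probabilities."""
    m = max(logits)                               # subtract the max for numerical stability
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]

print(logit(0.5))                  # 0.0: a probability of 0.5 has a logit of 0
print(logit(0.2) < 0)              # True: probabilities below 0.5 give negative logits
probs = softmax([2.0, 1.0, 0.1])   # one raw score per class
print(round(sum(probs), 6))        # 1.0
```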

After a lot of trial and error, I figured the 6*6 was coming from max pooling. This is where the algorithm "sweeps a window across an image and picks the pixel with the maximum value" [MLwTF]. The ksize in the code is 2, and this is the third layer, so we have max-pooled the original matrix twice already. So, the 6 comes from our original size (24 pixels) twice max-pooled by a size of 2, giving 24/(2*2) = 6.
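The following NumPy sketch of 2x2 max pooling shows the 24 to 12 to 6 shrinkage. It is my own code, not the book's. It assumes the pooling stride equals the window size (2) and that the dimensions are even. TensorFlow can also handle odd sizes by rounding up, which this sketch does not.

```python
import numpy as np

def max_pool_2x2(image):
    """2x2 max pooling with stride 2: keep the largest of each 2x2 block."""
    h, w = image.shape
    # Pixel (r, c) lands in block (r // 2, c // 2); take the max over each block
    return image.reshape(h // 2, 2, w // 2, 2).max(axis=(1, 3))

image = np.arange(24 * 24).reshape(24, 24)
once = max_pool_2x2(image)
twice = max_pool_2x2(once)
print(once.shape, twice.shape)   # (12, 12) (6, 6)
```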

I noticed that the stride also plays a part in calculating the size of the fully connected layer ("for example, a stride length of 2 means the 5 × 5 sliding window moves by 2 pixels at a time until it spans the entire image" MLwTF). In MLwTF's example the stride is 1, so it makes no difference in this particular case. In general, though, we also need to divide our image size on each layer by the stride.
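The "divide by the stride" rule matches TensorFlow's documented output size for 'SAME' padding. With 'VALID' padding, the window must also fit inside the input. The sketch below assumes the book's layers use 'SAME'. `conv_output_size` is a hypothetical helper of mine, not a TensorFlow function.

```python
import math

def conv_output_size(n, filter_size, stride, padding):
    """Spatial output size along one axis of a conv or pooling layer."""
    if padding == "SAME":
        # Input is zero-padded, so only the stride shrinks the output
        return math.ceil(n / stride)
    if padding == "VALID":
        # No padding: the window must lie entirely inside the input
        return math.ceil((n - filter_size + 1) / stride)
    raise ValueError(f"unknown padding {padding!r}")

print(conv_output_size(24, 5, 1, "SAME"))    # 24: a stride of 1 changes nothing
print(conv_output_size(24, 5, 2, "SAME"))    # 12
print(conv_output_size(24, 5, 1, "VALID"))   # 20
```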

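So, the calculation of the size of the fully connected layer looks like this (in Python):

        import math

        def fully_connected_size(i, j, depth, stride_size, ksize):
            # Per layer: the convolution divides by its stride, then pooling by ksize
            for _ in range(depth):
                i = math.ceil(i / stride_size)
                i = math.ceil(i / ksize)
                j = math.ceil(j / stride_size)
                j = math.ceil(j / ksize)
            return i * j

Here, i and j are the dimensions of the input data, and depth is the number of convolution-plus-pooling layers before the layer whose size we want to calculate. For MLwTF's 24x24 images, with a stride of 1 and a ksize of 2, a depth of 2 gives 6 * 6. That is then multiplied by the number of filters (64).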

[1] Deep Learning - A Practitioner's Guide.
[2] Machine Learning with TensorFlow.

Sunday, December 1, 2019

Monad Transformers: code smell in disguise?


What are they?

Luka Jacobowitz (Gitter @LukaJCB 01/04/20 01:48) says "the definition of monad transformer is that for any MT[F, A] if F is a monad then MT[F, *] is also a monad."

Noel Welsh gives us an example of a 3-way monad: None, Some(x) or a (String) error:
Monad transformers allow us to stack monads. E.g., stacking an Option with \/ (that is, Scalaz's version of Either) into one handy monad. The type OptionT[M[_], A] is a monad transformer that constructs an Option[A] inside the monad M. So the first important point is that monad transformers are built from the inside out.
Here's an example:

  import cats.Monad
  import cats.data.OptionT

  def transformAndAdd[F[_]: Monad](fa: OptionT[F, Int], fb: OptionT[F, Int]): OptionT[F, Int] = for {
    a <- fa
    b <- fb
  } yield a + b


Here, F can be any monad. For instance:

    val faIO = OptionT[IO, Int](IO(Some(1)))
    val fbIO = OptionT[IO, Int](IO(Some(2)))

    transformAndAdd(faIO, fbIO)

will wrap the result of the addition in an IO. You can do something similar for Future:

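    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global // Future needs an ExecutionContext
    import cats.implicits._                                     // Monad[Future] instance
    val fa = OptionT[Future, Int](Future(Some(1)))
    val fb = OptionT[Future, Int](Future(Some(2)))

    println(Await.result(transformAndAdd(fa, fb).value, 1.seconds))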

We care not what the outer monad is as long as the inner is an Option in this case.
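Python has no OptionT, but a toy analogue shows what the transformer does. This is my own sketch, and the names are hypothetical:

- `IO` wraps a zero-argument callable and is not a real library.
- `None` plays Scala's `None`, so the wrapped values themselves can't be `None`.

The only things `OptionT` uses from the outer type are `flat_map` and `pure`. That is why, in Cats, any `Monad` F will do.

```python
from dataclasses import dataclass
from typing import Callable

@dataclass
class IO:
    """Toy IO: a suspended computation that only runs when run() is called."""
    thunk: Callable[[], object]

    def run(self):
        return self.thunk()

    def flat_map(self, f):
        # Run this IO, pass its result to f, then run the IO that f returns
        return IO(lambda: f(self.run()).run())

    @staticmethod
    def pure(value):
        return IO(lambda: value)

@dataclass
class OptionT:
    """An optional value inside an outer IO: IO[Optional[A]]."""
    value: IO

    def flat_map(self, f):
        def step(opt):
            if opt is None:
                return IO.pure(None)   # short-circuit: None stays None, still inside IO
            return f(opt).value        # unwrap the next OptionT to its outer IO
        return OptionT(self.value.flat_map(step))

    def map(self, f):
        return self.flat_map(lambda a: OptionT(IO.pure(f(a))))

def transform_and_add(fa, fb):
    # Python spelling of: for { a <- fa; b <- fb } yield a + b
    return fa.flat_map(lambda a: fb.map(lambda b: a + b))

fa = OptionT(IO(lambda: 1))
fb = OptionT(IO(lambda: 2))
print(transform_and_add(fa, fb).value.run())                      # 3
print(transform_and_add(fa, OptionT(IO.pure(None))).value.run())  # None
```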

Drawbacks

Not all combinations of monads can be used with transformers. This SO answer explains why:
Important thing here is that there is no FutureT defined in cats, so you can compose Future[Option[T]], but can't do that with Option[Future[T]] ... 
That means you can compose Future x Option (aka Option[Future[T]]) to one single monad (coz OptionT exists), but you can't compose Option x Future (aka Future[Option[T]]) without knowing that Future is something else besides being a monad (even though they’re inherently applicative functors - applicative is not enough to neither build a monad nor monad transformer on it)
The omniscient Rob Norris explains why:
Rob Norris @tpolecat
Like Option[IO. There's no IOT transformer. 
PhillHenry @PhillHenry
Because nobody has written one? Or it can't be done?
(Sorry for such a silly question. Friday night here.) 
Rob Norris @tpolecat
It can't be done. You would have to define Option[IO[A]] => (A => Option[IO[B]]) => Option[IO[B]] but there's no way to do that because you can't get the A out.

Should we even use them?

There appear to be two schools of thought on Monad Transformers. People aligned to Scalaz appear to hate them in Scala.
jrciii @jrciii
Are monad transformer stacks idiomatic scalaz? If not is there a better way to get either an error or a value with logging along the way? 
beezee @beezee
eh idiomatic i have to defer to others. i feel like the idioms change every time there's a conference
i used them a lot and now consider that code legacy
i dream of the day it's all gone
it seems very in fashion to mimic mtl
scalaz has MonadError and MonadWriter
the nice thing about it is you don't worry about plumbing in your business code and you can be explicit about what capabilities are required for every function
i'm sure others who have used it more than me could describe pitfalls there too 
John A. De Goes @jdegoes
Monad transformers are not idiomatic in Scala. Avoid them at all costs. [See John's article here]

Emily Pillmore
The real question you need to ask after deciding Monad Transformers are not good in Scala is why that is true. The answer is not that MT are bad, per se, but that Scala has very poor facilities for dealing with nested boxed and unboxed values alike. It simply is not a language that has reasonable data formats 
John A. De Goes @jdegoes
@PhillHenry Yeah, what Emily says. Monad transformers work well in Haskell (actually way more performant than, e.g. free monads). But Scala is not Haskell. In Scala, monad transformers destroy performance and type inference, and add tedious boilerplate unless used in combination with type classes (in which case you just have to worry about type inference and performance).

[Gitter chat, 20 & 26 Feb 2019]

But the Cats crowd don't seem to mind:
Rob Norris @tpolecat
The correct take on monad transformers is that they're fine. Write your program in terms of abstract effect F and then at the end of the world if you have to instantiate it into a transformer stack, who cares? It's awkward but just for a line or two. 
Gavin Bisesi @Daenyth
And unless your app's runtime does no database or network IO, any performance overhead from using transformers is just not worth caring about. IO drowns it out by orders of magnitude

[Gitter chat, 1 Nov 2019]

But even the Cats people say: "Strong claim: Thou shalt not use Monad Transformers in your interface" (Practical FP in Scala by Gabriel Volpe)


Monday, November 25, 2019

Shared State in FP


If everything is immutable in FP, how do you share state that is constantly changing?


Shared State in Pure FP

Here are some notes from Fabio Labella's (aka SystemFW) talk on the matter:

IO[A]

  •  produces one value, fails or never terminates
  •  can suspend any side-effect (i.e., it is not evaluated immediately)
  •  has many algebras (Monad, Sync, Concurrent ...). Note Timer is non-blocking.

If you want to do concurrent shared state, you do it in IO or any F[_]
Cats-effect gives primitives for use with immutable data. This strategy is not suitable for mutable data like a JDBC connection.

Access, modification and creation of mutable state all need to be suspended in IO. Even creation must be suspended, for referential transparency.

Ref[A]

  • is a thin wrapper around AtomicReference
  • has an API similar to the State monad's
  • uses the Tagless Final pattern


The State Monad

Recall that the State Monad looks a little like:

case class State[S, A](modify: S => (A, S)) {

  def flatMap[B](f: A => State[S, B]): State[S, B] =
    State { s =>
      val (result, nextState) = modify(s)
      f(result).modify(nextState)
    }
...

So note that we create a new State that will run its modify on a state that it has yet to be given.
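Here is a Python transliteration of that State, as my own sketch. `push` and `pop` are hypothetical example steps. Building `program` runs nothing. Only when `modify` is finally handed an initial state does each step run in turn.

```python
from dataclasses import dataclass
from typing import Callable

@dataclass
class State:
    """Wraps a function S -> (A, S)."""
    modify: Callable

    def flat_map(self, f):
        def run(s):
            result, next_state = self.modify(s)   # run this step on the incoming state
            return f(result).modify(next_state)   # feed result and new state to the next step
        return State(run)

def push(x):
    # Hypothetical step: append x to a list-valued state, returning no result
    return State(lambda s: (None, s + [x]))

def pop():
    # Hypothetical step: return the last element and the shortened list
    return State(lambda s: (s[-1], s[:-1]))

program = push(1).flat_map(lambda _: push(2)).flat_map(lambda _: pop())
print(program.modify([]))   # (2, [1])
```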

Bearing this in mind, let's see some multi-threaded code sharing 'mutable' state from the TypeLevel site, here. I've slightly changed the nomenclature for clarity:

  type MyState          = List[String]
  type MyStateContainer = Ref[IO, MyState]

  def putStrLn(str: String): IO[Unit] = IO(println(str))

  def process1(ref: MyStateContainer): IO[Unit] =
    putStrLn("Starting process #1") *>
      IO.sleep(3.seconds) *>
      ref.update(_ ++ List("#1")) *>
      putStrLn("Done #1")
...

and we start it all off with:

  def masterProcess: IO[Unit] =
    Ref.of[IO, MyState](List.empty[String]).flatMap { ref =>
      val ioa = List(process1(ref), process2(ref), process3(ref)).parSequence.void
      ioa *> ref.get.flatMap(rs => putStrLn(rs.toString))
    }

Actually, that's a lie. We don't start it off here. We merely define the pipeline. That's why we add ioa in ioa *> ref.get.flatMap. Without it, ioa is not part of the pipeline and will not be run and our Ref will not be mutated.

Note that Ref.of[IO, MyState] returns an IO wrapping a Ref, not the other way around, and it's this IO that we flatMap over.

Also note that process1 etc. return IO[Unit], not IO[List[String]] as you might expect. This is because they represent the change itself. In Ref's Tagless Final interface, the methods that change state return F[Unit]: def update(f: A => A): F[Unit] (which process1 calls) and def set(a: A): F[Unit].

So, where does the update happen? We can't wholly avoid mutating memory with FP, it's just hidden from us. What happens under the covers is that Ref spins in its modify function performing a compare-and-swap. If it can't swap its AtomicReference, then it tries to update its list again. This is all done on a different thread to main.
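A rough Python sketch of that retry loop follows. Python has no hardware compare-and-swap, so the `AtomicReference` class here is a hypothetical stand-in whose `compare_and_set` is made atomic with a lock. The `update` function is the part that mirrors Ref:

1. Read the current value.
2. Compute a new immutable value from it.
3. Try to swap it in.
4. If the swap fails, retry.

```python
import threading

class AtomicReference:
    """Stand-in for Java's AtomicReference; a lock makes compare_and_set atomic."""
    def __init__(self, value):
        self._value = value
        self._lock = threading.Lock()

    def get(self):
        return self._value

    def compare_and_set(self, expected, new):
        with self._lock:
            if self._value is expected:   # reference equality, as in Java
                self._value = new
                return True
            return False

def update(ref, f):
    """Spin until our change wins the compare-and-swap."""
    while True:
        current = ref.get()
        new = f(current)                       # compute from an immutable snapshot
        if ref.compare_and_set(current, new):
            return                             # nobody changed it in between: done
        # another thread swapped first: loop and recompute from its value

ref = AtomicReference([])
threads = [threading.Thread(target=update, args=(ref, lambda xs, n=n: xs + [n]))
           for n in ("#1", "#2", "#3")]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(ref.get()))   # ['#1', '#2', '#3']
```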


Conclusion

It's all about the flatMap. This is essential for mutating the state. No flatMap, no change.

Saturday, November 23, 2019

Unit testing in Functional Programming (part 1)


Mocking in FP is considered a code smell, if not a downright antipattern. But you can't get rid of mocks with clever testing techniques alone: you must refactor the code.

This is what I had to do with some FS2 code from here. Now, this code demonstrates FS2's abilities wonderfully but it's not great to use in production as it's hard to test. Take this code in the link:

    val stream =
      consumerStream(consumerSettings)
        .evalTap(_.subscribeTo("topic"))
        .flatMap(_.stream)
        .mapAsync(25) { committable =>
          processRecord(committable.record)
            .as(committable.offset)
        }
        .through(commitBatchWithin(500, 15.seconds))

Not only is this hard to test, it's very dependent on Kafka specific code.

We can abstract it so it looks like this:

  def pipeline[K, C, P, R, O, T](s:              Stream[IO, K],
                                 subscribe:      K => IO[Unit],
                                 toRecords:      K => Stream[IO, C],
                                 commitRead:     C => IO[P],
                                 producerPipe:   Pipe[IO, P, R],
                                 toWriteRecords: R => O,
                                 commitWrite:    Pipe[IO, O, T]): Stream[IO, T] = 
    s.evalTap(subscribe).flatMap(toRecords).mapAsync(25)(commitRead).through(producerPipe).map(toWriteRecords).through(commitWrite)

This makes testing easier for two reasons:

  1. The code has no dependency on Kafka.
  2. I can submit my own functions that use FS2's in-memory streams.
Now for the test. The domain objects look like this:


  case class Kafka()
  case class Record(id: Int)
  case class ProducerRecords(id: Int)
  case class CommittableOffset(id: Int)



and although their names have a superficial resemblance to Kafka classes, this is just to make things conceptually simpler. We're dealing with a much higher layer of abstraction than that.

For instance, instead of having to connect to an embedded Kafka instance, I can have:

      val records = (1 to nToRead).map(x => Record(x))
      
      val toRecords: Kafka => Stream[IO, Record] =
        _ => Stream.emits(records).covary[IO]

and pass this as the toRecords argument to the function we want to test, pipeline. It's all in memory, no ports and sockets to worry about.

[Aside: from the docs: given a pure stream, "to convert to an effectful stream, use covary"]
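The same idea carries over to Python. This is a simplified sketch of my own, not a port of the FS2 code:

- It runs sequentially, with no mapAsync parallelism.
- Each step is a per-element function rather than an FS2 Pipe.
- The parameter names mirror `pipeline` above, but the in-memory test doubles are hypothetical.

```python
def pipeline(source, subscribe, to_records, commit_read, producer,
             to_write_record, commit_write):
    """Kafka-free pipeline: every effectful step is passed in as a function."""
    for consumer in source:
        subscribe(consumer)                             # like evalTap(subscribe)
        for record in to_records(consumer):             # like flatMap(toRecords)
            offset = commit_read(record)                # like mapAsync(25)(commitRead), sequentially
            produced = producer(offset)                 # like through(producerPipe)
            yield commit_write(to_write_record(produced))

# In-memory test doubles: no broker, ports or sockets
subscribed = []
records = [1, 2, 3]
out = list(pipeline(source=["kafka"],
                    subscribe=subscribed.append,
                    to_records=lambda consumer: iter(records),
                    commit_read=lambda record: record,
                    producer=lambda offset: offset * 10,
                    to_write_record=lambda produced: ("written", produced),
                    commit_write=lambda write: write))
print(subscribed, out)   # ['kafka'] [('written', 10), ('written', 20), ('written', 30)]
```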

The takeaway point is this: if you want to use FP to test your OO code and avoid mocks, it isn't going to work. You need to do a root and branch refactoring to get the benefits of avoiding Mockito. But your code will also be more abstract and therefore more re-usable.

Monday, November 18, 2019

Mutation in Spark


... is dangerous. This came up on the Spark Gitter channel a few days ago (11/11/19). Somebody's Spark job was throwing OptionalDataException. This exception indicates "the failure of an object read operation due to unread primitive data, or the end of data belonging to a serialized object in the stream".

Now, looking at the JavaDocs for this Exception, there is no good reason why Spark would occasionally throw this. The person (alina_alemzeb_twitter) reported:
"I don't understand what else could be the culprit. Our Spark job is pretty basic, we're just doing transformations and filtering. The job ran fine for 10 hrs and then crashed with this error and this is all the code there is."
The fact that it ran without problems for ten hours suggests that it wasn't a library incompatibility problem, which was my first thought (maybe an object to be serialized had missing fields between different versions of a library?). It was only upon holistic code inspection that clues to its real nature became clear.

Basically, a HashMap that can be accessed from a closure is sent from the driver to the executors. This HashMap can momentarily be mutated on the driver. If one were unlucky, the HashMap could be mutated at just the moment the closure was being serialized. And it's this that led to the non-deterministic nature of the bug.

So, there was no Spark bug but a bug in the code that calls Spark. The take-away point is that mutation is an evil but mutation in a distributed environment is sheer folly.

Saturday, November 2, 2019

Python Crib Sheet #3


Modulo

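The modulo operator behaves differently in Python and Scala for negative numbers. Python's result takes the sign of the divisor, while Scala's (like Java's) takes the sign of the dividend. In Python, you see something like: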

>>> print(-1 % 10)
9
>>> print(-11 % 10)
9

In Scala:

scala> println(-1 % 10)
-1

scala> println(-11 % 10)
-1
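Both behaviours can be reproduced side by side in Python. Python's `%` pairs with floor division, whereas `math.fmod` truncates towards zero like Scala's `%`. `trunc_mod` is a hypothetical helper of my own that gives an integer-valued equivalent.

```python
import math

# Python's % pairs with floor division (//), so the result takes the divisor's sign
print(-11 % 10, -11 // 10)                   # 9 -2
assert -11 == (-11 // 10) * 10 + (-11 % 10)  # a == (a // b) * b + a % b always holds

# math.fmod truncates towards zero, like Scala's %, so the sign follows the dividend
print(math.fmod(-11, 10))                    # -1.0

def trunc_mod(a, b):
    """Integer remainder with Scala/Java semantics (sign of the dividend)."""
    r = abs(a) % abs(b)
    return r if a >= 0 else -r

print(trunc_mod(-11, 10), trunc_mod(11, -10))   # -1 1
```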

Arrays

Negative indices can be used to count from the end, for example:

>>> xs = range(0, 10)
>>> xs
range(0, 10)
>>> list(xs)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> xs[-1]
9
>>> xs[-2]
8

etc
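A negative index i is shorthand for len(xs) + i, and the same convention works in slices:

```python
xs = range(0, 10)

# xs[-2] is the same element as xs[len(xs) - 2]
assert xs[-2] == xs[len(xs) - 2] == 8

print(list(xs[-3:]))   # [7, 8, 9]: the last three elements
print(list(xs[:-3]))   # [0, 1, 2, 3, 4, 5, 6]: everything except the last three
```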

Numpy

Similarly, -1 can be used when reshaping NumPy arrays, where it means "infer this dimension from the others":

>>> xs = [0, 1, 2]
>>> np.array(xs).shape
(3,)
>>> np.array(xs).reshape(-1, 1).shape
(3, 1)

If you combine an array that hasn't been reshaped with one that has, the operator can change the shape of the result:

>>> xs = [0, 1, 2]
>>> np.array(xs).reshape(-1, 1)
array([[0],
       [1],
       [2]])
>>> np.array(xs).reshape(-1, 1) * np.array(xs)
array([[0, 0, 0],
       [0, 1, 2],
       [0, 2, 4]])

That is, every element is multiplied by every other element (an outer product), much like a Cartesian product.

But note that an array that is not reshaped implicitly acts like a row vector:

>>> np.array(xs).reshape(1, -1) * np.array(xs)
array([[0, 1, 4]])

That is, an element-wise product.

Also note that some actions on the reshaped array can reverse the reshaping. For instance, summing over rows:

>>> a = np.array(xs).reshape(-1, 1)
>>> a.shape
(3, 1)
>>> np.sum(a, 1).shape  # 0 would sum columns
(3,)
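These shapes follow NumPy's broadcasting rules. Shapes are compared from the trailing dimension, and a dimension of length 1 is stretched to match. So (3, 1) * (3,) becomes (3, 3), the same table `np.outer` builds. If you want a sum to keep the column shape, `keepdims=True` preserves the summed axis with length 1:

```python
import numpy as np

xs = np.array([0, 1, 2])
col = xs.reshape(-1, 1)            # shape (3, 1)

# (3, 1) * (3,) broadcasts to (3, 3): the same result as np.outer
assert np.array_equal(col * xs, np.outer(xs, xs))

print(np.sum(col, axis=1).shape)                  # (3,): the summed axis disappears
print(np.sum(col, axis=1, keepdims=True).shape)   # (3, 1): the column shape survives
```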


Generators

Generators are (to my understanding at least) like a Java/Scala stream.

Iterables, generators and yield are related and this excellent SO answer weaves them together.

“When you use a list comprehension, you create a list, and so an iterable:

Generators are iterators, a kind of iterable you can only iterate over once. Generators do not store all the values in memory, they generate the values on the fly. [The syntax for creating generators is] just the same except you used () instead of []

yield is a keyword that is used like return, except the function will return a generator… To master yield, you must understand that when you call the function, the code you have written in the function body does not run. The first time the for calls the generator object created from your function, it will run the code in your function from the beginning until it hits yield, then it'll return the first value of the loop. Then, each other call will run the loop you have written in the function one more time, and return the next value until there is no value to return. The generator is considered empty once the function runs, but does not hit yield anymore.

>>> def createGenerator():
...    mylist = range(3)
...    for i in mylist:
...        yield i*i
...
>>> mygenerator = createGenerator() # create a generator
>>> print(mygenerator) # mygenerator is an object!

>>> for i in mygenerator:
...     print(i)
0
1
4

Here it's a useless example, but it's handy when you know your function will return a huge set of values that you will only need to read once."

Note that "Local variables in a generator function are saved from one call to the next, unlike in normal functions." [Python Wiki]
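This short sketch illustrates the points above. `counter` is a hypothetical example.

- A generator can be consumed only once.
- A generator expression is a list comprehension with () instead of [].
- Local variables keep their values between calls to `next`.

```python
def create_generator():
    for i in range(3):
        yield i * i

gen = create_generator()
print(list(gen))   # [0, 1, 4]
print(list(gen))   # []: the generator is exhausted after one pass

# Generator expression: a list comprehension with () instead of []
squares = (x * x for x in range(3))
print(next(squares), next(squares))   # 0 1

def counter():
    count = 0          # this local survives between calls to next()
    while True:
        count += 1
        yield count

c = counter()
print(next(c), next(c), next(c))      # 1 2 3
```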


Sunday, October 27, 2019

More on applicatives


My post on applicatives barely scratched the surface.

The problem

I wanted to have my cake and eat it. I wanted:
  • to use my type in a for comprehension (i.e., to flatMap over it)
  • to accumulate errors
In short, I wanted my type to act like both a Monad and an Applicative. The trouble is that monadic composition is fail-fast. That is, the first value to represent a 'failure' short-circuits the for comprehension.

I asked about the best way to proceed and got some great responses from people much more clever than me. I've included some in this post side-by-side with annotations.
Fabio Labella @SystemFw "flatMap loses errors," violating @PhillHenry's actual use case.
No, what I was saying is that you cannot have a use case where you have F[A] and A => F[B], and you have errors in F[A] and F[B] at the same time
It's impossible.
Julien Truffaut @julien-truffaut  Once you point it out, it is obvious you cannot accumulate errors in a for comprehension, or execute IO concurrently. 
This took me a long time to grok so this post is my attempt to understand.

First, let's clear up some terminology.

Effectful types
Option models the effect of optionality
Future models latency as an effect
Try abstracts the effect of failures (manages exceptions as effects)

“Option is a monad that models the effect of optionality,” and it’s also what Mr. Norris means when he says that an effectful function returns F[A] rather than A.

If you want to think about it philosophically, when a function returns an A, that A has already been fully evaluated; but if that function returns F[A] instead, that result has not already been fully evaluated, the A is still inside F[A] waiting to be evaluated.
[Alvin Alexander's blog]
When working with abstractions that are related to composing computations, such as applicative functors and monads, it's convenient to somewhat distinguish between the actual value and the "rest", which we often call an "effect". In particular, if we have a type f of kind * -> *, then in f a the a part is "the value" and whatever "remains" is "the effect".

The distinction between "effects" and "values" doesn't really depend on the abstraction. Functor, Applicative and Monad just give us tools for what we can do with them (Functors allow to modify values inside, Applicatives allow to combine effects and Monads allow effects to depend on the previous values).
[StackOverflow]

Parallel has nothing to do with threads

...at least not in this context.
A slightly simplified signature of andThen is: 
def andThen[B](f: A => Validated[E, B]): Validated[E, B] 
and it looks exactly as flatMap would look like. Unfortunately, andThen cannot be just renamed to flatMap. The latter is an operation from the Monad type class, and since cats.Monad extends cats.Applicative there is a law that defines how their operations must relate to each other. One of them says that Applicative operations must be equivalent to the same operations implemented using Monad operations. In the case of Validated this means that if there was an instance of the Monad type class, the Applicative operations wouldn't be able to accumulate errors
[Roman Timushev's blog]

For this reason, you can't use a Validated in a for comprehension.

You can, of course, convert between, say, Either and Validated when you want monadic and applicative behaviour respectively.
When browsing the various Monads included in Cats, you may have noticed that some of them have data types that are actually of the same structure, but instead have instances of Applicative. E.g. Either and Validated. This is because defining a Monad instance for data types like Validated would be inconsistent with its error-accumulating behaviour. In short, Monads describe dependent computations and Applicatives describe independent computations.
[The Cats documentation on Parallel].

And this is where Parallel or rather its super trait NonEmptyParallel comes in. From the docs: "Some types that form a FlatMap, are also capable of forming an Apply that supports parallel composition. The NonEmptyParallel type class allows us to represent this relationship."

Now, suppose we have some values of type

EitherNel[String, A]

where EitherNel[E, A] is just a type alias for Either[NonEmptyList[E], A]. Calling parMapN on a tuple of them,

(ea, eb).parMapN(f)

(where ea and eb are such values and f is of type (A, A) => B), will magically accumulate our Lefts.
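A toy Python model can show why flatMap can't accumulate errors while a parMapN-style combination can. `Left`, `Right`, `flat_map` and `par_map2` are my own stand-ins, not Cats APIs. A list plays the role of the semigroup (NonEmptyList in Cats) that lets errors be combined.

```python
from dataclasses import dataclass

@dataclass
class Left:
    errors: list     # lists combine by concatenation: our semigroup

@dataclass
class Right:
    value: object

def flat_map(e, f):
    # Monadic and fail-fast: f needs the value, and a Left has none to give it
    return e if isinstance(e, Left) else f(e.value)

def par_map2(ea, eb, f):
    # Applicative and accumulating: ea and eb are independent, so both are inspected
    if isinstance(ea, Left) and isinstance(eb, Left):
        return Left(ea.errors + eb.errors)
    if isinstance(ea, Left):
        return ea
    if isinstance(eb, Left):
        return eb
    return Right(f(ea.value, eb.value))

bad_name = Left(["name is empty"])
bad_age = Left(["age is negative"])

print(flat_map(bad_name, lambda n: flat_map(bad_age, lambda a: Right((n, a)))))
# Left(errors=['name is empty'])
print(par_map2(bad_name, bad_age, lambda n, a: (n, a)))
# Left(errors=['name is empty', 'age is negative'])
```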

[Note that if you look at the Parallel code, you'll see a few ~>, which Rob Norris describes thus: "~> is like => but it operates on type constructors"]

Rob Norris @tpolecat Jul 18 03:58
Parallel is more general than it may appear. It describes a relationship between a monad and an associated applicative functor. For IO the applicative really does run things in parallel. For Either the applicative accumulates values on the left. For List the applicative zips instead of Cartesian product.
Only IO requires the context shift.


Summary
Fabio Labella @SystemFw I think "use par* to accumulate errors" is easier to explain than "well, the reason why you cannot flatMap Validated is because of the laws of consistency between Applicative and Monad"
Oleg Pyzhcov @oleg-py "use par* to accumulate errors, but make sure your error type has a semigroup instance" ... The thing about Parallel, I think, is that many people learn about it in the context of IO where the wording makes intuitive sense which doesn't translate at all to what we call Parallel in cats
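Since we're talking about semigroups, the computation could also be parallelized in the concurrency sense of the word. A semigroup's combine operation is associative, so partial results can be grouped and combined in any order. That is why semigroups go hand-in-hand with parallel computation.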


Thursday, October 24, 2019

Python for Java Programmers


Dependency management


Python doesn't seem to have build tools like Maven, Gradle, SBT etc. Instead, you use virtual environments that are tailored to your project.

To set it up, you run something like:

python3 -m venv ~/env/ARBITRARY_NAME_FOR_ENV
source ~/env/ARBITRARY_NAME_FOR_ENV/bin/activate
pip install -r requirements.txt

where requirements.txt looks something like:

apache-airflow[gcp_api]==1.10.1
google-api-python-client==1.7.8
pytest

Otherwise, dependencies are scoped to the OS and that way madness lies.


IDEs

PyCharm seems the IDE of choice in my office. The community edition can be downloaded and used for free.

When setting up a project, one must Add a new Python interpreter and this allows you to choose the virtual environment you want for this project.


Logging

For logging, just use a:

import logging

See the Python Docs for examples.


Mocking

You can use mocks in at least 2 different ways:

@patch('x.y.method_to_mock', return_value=VALUE_MOCK_RETURNS)

or

def test_trigger_dataflow_success(mocked_service, config, task_instance, monkeypatch, data_access_layer_stub):
.
.
    def mock_my_method(an_argument):
        return my_stub
.
.
    monkeypatch.setattr('x.y.method_to_mock', mock_my_method)
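Below is a self-contained variant of the decorator style. It uses `unittest.mock.patch.object` so that it runs without a real `x.y` module. The `y` namespace and `code_under_test` are hypothetical. With a real module, you would pass the dotted path to `patch` instead. The `monkeypatch` style needs pytest's fixture, so it is not reproduced here.

```python
from types import SimpleNamespace
from unittest.mock import patch

# Hypothetical stand-in for a module x.y that contains method_to_mock
y = SimpleNamespace(method_to_mock=lambda: "real value")

def code_under_test():
    return y.method_to_mock().upper()

@patch.object(y, "method_to_mock", return_value="mocked")
def test_with_patch(mock_method):
    # Inside the test, the attribute is a MagicMock returning "mocked"
    assert code_under_test() == "MOCKED"
    mock_method.assert_called_once_with()

test_with_patch()          # pytest would normally discover and call this
print(code_under_test())   # REAL VALUE: the original is restored afterwards
```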


Tests

Tests are run with the command:

python -m pytest tests/

What is included appears to be based on convention.

"py.test will import conftest.py and all Python files that match the python_files pattern, by default test_*.py. If you have a test fixture, you need to include or import it from conftest.py or from the test files that depend on it" [from SO]. 

Test files that are nested deeper do use the conftest.py of higher packages in the same branch: pytest makes the fixtures in a conftest.py available to every test in that directory and below.


Notable differences with Java

Unlike Java, Python classes tend not to live in files that bear their names [SO]. So, if you have a class called X in file y.py in package z, you'd import it with:

from z import y

x = y.X(...)

Note the absence of the new operator.

Friday, October 18, 2019

Cats and Applicatives


This is part of a 'brown-bag' for my colleagues to extol the joys of functional programming. I want to emphasize practicality over theory.

One use case is doing some IO and then closing all the streams. For this, Applicatives and Cats made the code incredibly easy to work with.

Without further ado, here's the code:

import scala.util.Try
import cats.ApplicativeError
import cats.data.NonEmptyList

// ApplicativeError is itself an Applicative, so no separate context bound is needed
class MyIO[F[_]](implicit E: ApplicativeError[F, Throwable]) {

  // Run a side effect, capturing any exception as an error in F
  def doIO[U](f: => U): F[U] = E.fromTry(Try(f))

  // Combine with *>: the last success if all pass, otherwise the first failure
  def allOrNothing[A](xs: NonEmptyList[F[A]]): F[A] = {
    import cats.implicits._
    xs.tail.foldLeft(xs.head) { case (a, x) =>
      a *> x
    }
  }

  def doIO(...): F[Unit] = {
    ...
    val fo1 = doIO(firstOutputStream.close())
    val fo2 = doIO(secondOutputStream.close())
    val fi  = doIO(inputStream.close())

    allOrNothing(NonEmptyList(fo1, List(fo2, fi)))
  }
}

A few points to note here.

  1. This works for all Applicative types. That is to say, with the relevant imports, doIO will return us an Either, an Option, or any other suitable data structure with no changes to the code in MyIO. (Note: Option may not be an appropriate choice...)
  2. Because allOrNothing is dealing with Applicatives, it will return the relevant type for all passing (Right, Some, etc) or for at least one failing (Left, None, etc) again with no change to the code.
Note that there must be a suitable ApplicativeError[F, Throwable] in the ether. Some you get for free with Cats and a suitable import; others you must roll by hand.

So, what is actually returned by allOrNothing? Given a list of, say, Rights, it returns the last Right. But given a list of Eithers with at least one Left, the first Left is returned. An analogous situation exists for any type with the appropriate ApplicativeError.
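To make that behaviour concrete outside Scala, here is a Python model. It is my own sketch, and `Left`, `Right` and `product_right` are hypothetical stand-ins for Either and `*>`. The fold keeps the right-hand value unless the left-hand one has already failed. As a result, it returns the last success or the first failure.

```python
from dataclasses import dataclass
from functools import reduce

@dataclass
class Left:
    error: object

@dataclass
class Right:
    value: object

def product_right(fa, fb):
    # Like *> on Either: keep fb's result, unless fa has already failed
    return fa if isinstance(fa, Left) else fb

def all_or_nothing(xs):
    # Fold the tail into the head, as allOrNothing does with a NonEmptyList
    return reduce(product_right, xs[1:], xs[0])

print(all_or_nothing([Right(1), Right(2), Right(3)]))      # Right(value=3)
print(all_or_nothing([Right(1), Left("a"), Left("b")]))    # Left(error='a')
```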



More fun and games with Beam


We're using Beam 2.13 and want it to be called by Google Composer/Airflow. Unfortunately, the route we have been recommended (here) is awkward to say the least.

Extracting the CLI values is cumbersome. They’re wrapped in ValueProvider objects and a reference to them must be created with something like:

  def createViewFor(name: String, pipeline: Pipeline, value: ValueProvider[String]): PCollectionView[String] =
    pipeline.apply(s"${name}Config:Start",  Create.of("Start"))
      .apply(s"${name}Config:Parse",        ParDo.of(new ApplicationConfigurationParser(value)))
      .apply(s"${name}Config:AsView",       View.asSingleton())

Then, you pass this reference to your DoFn with something like:

    val myConfig = createViewFor(…)
    ParDo.of(new MyDoFn(myConfig))
      .withSideInputs(myConfig)

And then finally in your DoFn, you access it with:

class MyDoFn(
                      myConfigView:          PCollectionView[String],
                     ) extends DoFn[FileIO.ReadableFile, String] {
  @ProcessElement
  def processElement(@Element input:  FileIO.ReadableFile,
                     out:             OutputReceiver[String],
                     context:         ProcessContext): Unit = {
    val myConfig = context.sideInput(myConfigView)
.
.

Just painful.

Saturday, October 5, 2019

Miscellaneous FS2 notes


These are my first steps into the FS2 [underscore.io] world ("when you start working with fs2 is the fact everything is a Stream. By everything I mean everything, even your Queue").

Some terminology

"Pure means no effect on the world. It just makes your CPU work and consumes some power, but besides that it does not affect the world around you.
Use evalMap instead of map when you want to apply a function that has an effect like loadUserIdByName to a Stream." [FreeCodeCamp]

"Pipe[F, A, B] is just Stream[F, A] => Stream[F, B]
I believe it's just a type alias" [Chris Davenport, Gitter].

Note the difference between concurrency and synchronization in this exchange from FS2's Gitter channel:

Oleg Pyzhcov @oleg-py
If you have two threads doing update at the same time, one will always see results of another (not the case with var, for instance, where both can see old state) - that's safe concurrent modification. However, you can't enforce ordering of those operations with Ref alone - that's lack of synchronization.

Fabio Labella @SystemFw
FS2 concurrency builds up off a few simple primitives the most basic one being a primitive to start an F asyncly
Not needing Effect is better because Effect is very very (too) powerful, being basically equivalent to the ability to unsafeRunSync it also means that we don't need to thread ExecutionContexts around (you will only find ECs in APIs that block).


Minimal implementation

You'll notice that your main class extends cats.effect.IOApp and your 'main' method becomes this:

  override def run(args: List[String]): IO[ExitCode] =

IOApp pulls in a lot of implicits and an execution context, making things much easier and explaining why isolated snippets might not compile.

captainmannering @captainmannering Apr 07 13:59
I would like to use Cats IO and fs2 to write a program with the following structure: one task that receives events from one data source (say RabbitMQ), another task that receives events from another data source (say Kafka) and both of these are then sent into another task that processes them. These tasks should run until a special message is received on either of the sources that causes everything to gracefully stop and the program to exit. Is there any similar sort of code that anyone has a link for that does this sort of thing?

Fabio Labella @SystemFw Apr 07 14:18
is the special message the same on both sources or different?
but basically use fs2-rabbit and fs2-kafka to get stream out of your sources (there are ways to write your own connectors ofc if you need them), then you can do:

rabbitStream
  .map(toCommonType)
  .merge(kafkaStream map toCommonType)
  .takeWhile(_ != specialMessage)
  .evalMap(yourProcessingTaskReturningIO)
  .compile
  .drain
  .as(ExitCode.Success)
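takeWhile emits elements only while its predicate holds and stops at the first element that fails it; Scala's standard collections behave the same way. Here is a stdlib analogue in which a LazyList stands in for the merged RabbitMQ/Kafka stream and "STOP" is a hypothetical special message. It shows that, to run until the special message arrives, the predicate has to match anything *other than* that message:

```scala
object TakeWhileDemo {
  val specialMessage = "STOP"

  // Stand-in for the merged stream: an infinite source whose fourth
  // element is the shutdown message (requires Scala 2.13+ for LazyList).
  val events: LazyList[String] =
    LazyList("a", "b", "c", specialMessage) ++ LazyList.continually("more")

  // Keeps consuming until the special message arrives, then stops.
  val processed: List[String] = events.takeWhile(_ != specialMessage).toList

  // With == the stream stops at the very first ordinary message.
  val stopsImmediately: List[String] = events.takeWhile(_ == specialMessage).toList
}
```

Because LazyList is lazy, the infinite tail is never forced beyond the element that ends the takeWhile.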


Wrapping old Java code with FS2

One suggestion from Gitter was to use a tagless solution.

Oleg Pyzhcov @oleg-py Jun 27
The common pattern is to do something like this:

trait YoloContext[F[_]] {
  def execute(): F[Unit]
  def insert[T](o: T): F[Handle[T]]
}

object YoloContext {
  private class Implementation[F[_]: Sync](javaCtx: YoloCtx) extends YoloContext[F] { 
    /* implement methods. You have javaCtx here */ 
  }
  def create[F[_]: Sync](name: String): Resource[F, YoloContext[F]] = { 
    /* return a resource with `Implementation`. Hide the java stuff completely*/ 
  }
}
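The same shape can be sketched without cats to show what each piece does. Everything here is a hypothetical stand-in: LegacyJavaCtx plays the Java object, a hand-rolled Suspend type class plays the role of cats-effect's Sync, an Int plays Handle[T], and scala.util.Try is the effect type. Resource is left out because the standard library has no equivalent; with cats-effect, create would return a Resource so the Java object gets closed.

```scala
import scala.util.Try

// Hypothetical stand-in for the legacy Java class being wrapped.
final class LegacyJavaCtx(val name: String) {
  private val items = scala.collection.mutable.ArrayBuffer.empty[Any]
  def insert(o: Any): Int = { items += o; items.size - 1 } // returns a handle
  def execute(): Unit = items.clear()
}

// Minimal "suspend a side effect" capability (cats-effect's Sync does this job).
trait Suspend[F[_]] { def delay[A](a: => A): F[A] }
object Suspend {
  implicit val trySuspend: Suspend[Try] = new Suspend[Try] {
    def delay[A](a: => A): Try[A] = Try(a)
  }
}

// The algebra: callers only ever see F, never the Java object.
trait YoloContext[F[_]] {
  def execute(): F[Unit]
  def insert[T](o: T): F[Int]
}

object YoloContext {
  // The interpreter is private, so the Java type never leaks out.
  private class Implementation[F[_]](javaCtx: LegacyJavaCtx)(implicit S: Suspend[F])
      extends YoloContext[F] {
    def execute(): F[Unit]      = S.delay(javaCtx.execute())
    def insert[T](o: T): F[Int] = S.delay(javaCtx.insert(o))
  }

  // Smart constructor: the only way to obtain a YoloContext.
  def create[F[_]](name: String)(implicit S: Suspend[F]): F[YoloContext[F]] =
    S.delay[YoloContext[F]](new Implementation[F](new LegacyJavaCtx(name)))
}
```

Swapping Try for IO (and Suspend for Sync) is what turns this into the pattern Oleg describes.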


The algebra of streaming

This isn't FS2 specific but this salient comment from the omniscient Rob Norris is worth repeating:

Rob Norris @tpolecat Sep 23 22:58
Yeah in general it [fold] has to be lazy because you might try to fold something infinite.
And the thing that is not immediately intuitive is that foldLeft cannot possibly terminate for an infinite structure, but foldRight might.

Think about where the parens stack up. With foldLeft it looks like (((((((((( forever.

With foldRight it looks like a + (b + (c + ... forever.

So if + is lazy you might be able to stop after a while.
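A minimal sketch of Rob's point, using a hand-written foldRight whose accumulator is by-name. The standard library's foldRight takes the accumulator strictly, so it cannot stop early on an infinite LazyList. The names below are illustrative:

```scala
object LazyFold {
  // foldRight with a by-name accumulator: `rest` is only evaluated if `f` asks for it.
  def foldRight[A, B](xs: LazyList[A], z: => B)(f: (A, => B) => B): B =
    if (xs.isEmpty) z
    else f(xs.head, foldRight(xs.tail, z)(f))

  // An infinite structure: 1, 2, 3, ...
  val naturals: LazyList[Int] = LazyList.from(1)

  // a + (b + (c + ...)) where "+" is the short-circuiting ||:
  // evaluation stops as soon as an element greater than 10 turns up.
  val anyAboveTen: Boolean = foldRight(naturals, false)((a, rest) => a > 10 || rest)

  // Collects elements until the first one that is >= n, then stops.
  def takeBelow(n: Int): List[Int] =
    foldRight(naturals, List.empty[Int])((a, rest) => if (a < n) a :: rest else Nil)

  // A foldLeft over `naturals` has no such escape hatch: it must reach the end of
  // the structure before returning, so it never terminates.
}
```

Both anyAboveTen and takeBelow(5) terminate because the combining function simply never forces `rest` once it has its answer.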


Saturday, September 28, 2019

Applicative Functors and Validation


Applicatives

Laws that Applicative instances must follow:

map(apply(x))(f)         == apply(f(x))
join(apply(x), apply(y)) == apply((x, y))
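Reading `apply` as the operation Cats calls `pure` (wrap a plain value) and `join` as pairing two wrapped values (Cats calls it `product`), the two laws can be checked for Option with a small stdlib sketch. The object and method names are illustrative:

```scala
object OptionLaws {
  // `apply` in the laws: wrap a plain value.
  def pure[A](a: A): Option[A] = Some(a)

  // `join` in the laws: combine two wrapped values into a wrapped pair.
  def join[A, B](fa: Option[A], fb: Option[B]): Option[(A, B)] =
    (fa, fb) match {
      case (Some(a), Some(b)) => Some((a, b))
      case _                  => None
    }

  // The two laws, stated as checks for specific values.
  def mapLawHolds[A, B](x: A, f: A => B): Boolean =
    pure(x).map(f) == pure(f(x))

  def joinLawHolds[A, B](x: A, y: B): Boolean =
    join(pure(x), pure(y)) == pure((x, y))
}
```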

Recall that "functions are applicative functors" (LYAHFGG) and applicatives have a function of type:

f (a -> b) -> f a -> f b.

So, where is this function on scala.Function1 etc?

Basically, the authors of Scala didn't put that functional programming goodness into the standard library, so you have to roll your own or use a library like Cats (see below).

Applicative Functor

I liked the second definition here at StackOverflow which basically says the defining Functor function is:

  def map[A, B](f : A => B): C[A] => C[B]

"And it works perfectly for a function of one variable. But for a function of 2 and more, after lifting to a category, we have the following signature:"

val g = (x: Int) => (y: Int) => x + y

for example:

Option(5) map g // Option[Int => Int]

The defining Applicative function is:

  def apply[A, B](f: F[A => B]): F[A] => F[B]

"So why bother with applicative functors at all, when we've got monads? First of all, it's simply not possible to provide monad instances for some of the abstractions we want to work with—Validation is the perfect example. Second (and relatedly), it's just a solid development practice to use the least powerful abstraction that will get the job done. In principle this may allow optimizations that wouldn't otherwise be possible, but more importantly it makes the code we write more reusable"
[ibid]
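The Functor and Applicative signatures above can be rolled by hand for Option. This is a sketch, not Cats' API: the applicative method is named `ap` (as Cats names it) rather than `apply`, and map is derived from `pure` plus `ap` to show that every applicative is also a functor.

```scala
object ApplicativeSketch {
  // Functor: lift A => B to F[A] => F[B].
  trait Functor[F[_]] {
    def map[A, B](f: A => B): F[A] => F[B]
  }

  // Applicative: lift a *wrapped* function F[A => B] to F[A] => F[B].
  trait Applicative[F[_]] extends Functor[F] {
    def pure[A](a: A): F[A]
    def ap[A, B](ff: F[A => B]): F[A] => F[B]
    // map comes for free: wrap the function, then apply it.
    def map[A, B](f: A => B): F[A] => F[B] = ap(pure(f))
  }

  val optionApplicative: Applicative[Option] = new Applicative[Option] {
    def pure[A](a: A): Option[A] = Some(a)
    def ap[A, B](ff: Option[A => B]): Option[A] => Option[B] = fa =>
      (ff, fa) match {
        case (Some(f), Some(a)) => Some(f(a))
        case _                  => None
      }
  }

  val g = (x: Int) => (y: Int) => x + y

  // map alone leaves us with a function stuck inside the Option...
  val partial: Option[Int => Int] = optionApplicative.map(g)(Option(5))
  // ...and ap is what lets us feed it the second wrapped argument.
  val sum: Option[Int] = optionApplicative.ap(partial)(Option(3))
}
```

Here `sum` is `Some(8)`, and feeding `None` to either argument yields `None`.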

Validation

The author of this answer gives a good example of where monads are not appropriate, something that struck me this week when I was writing validation code.

Monads are great because I don't need to change my code when I mess up. For instance, I had some parsing code that looked like this:

  def parse: T[String] = for {
    a <- parent
    b <- parseElementA
    c <- parseElementB
  } yield {
    ...
  }

Naively, my parseXXX functions returned Options. The problem here is that if we fail to parse an element, None doesn't tell us why. No worries, let's use Either, which (since Scala 2.12) is a monad too! Now my parseXXX methods tell me why they fail, and I never had to change the above block of code!
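A stdlib-only sketch of that switch. The parsers, their inputs and their error messages are hypothetical. The for-comprehension keeps the same shape, and a failing parse now explains itself, but flatMap still stops at the first Left:

```scala
import scala.util.Try

object FailFast {
  // Hypothetical parsers: each returns a value or a reason for failing.
  def parseParent(s: String): Either[String, String] =
    if (s.nonEmpty) Right(s) else Left("missing parent")

  def parseElementA(s: String): Either[String, Int] =
    Try(s.toInt).toOption.toRight(s"element A is not a number: '$s'")

  def parseElementB(s: String): Either[String, Boolean] = s match {
    case "true"  => Right(true)
    case "false" => Right(false)
    case other   => Left(s"element B is not a boolean: '$other'")
  }

  // Same for-comprehension shape as the Option version; only the types changed.
  // flatMap stops at the first Left, so later parsers never run.
  def parse(p: String, a: String, b: String): Either[String, (String, Int, Boolean)] =
    for {
      x <- parseParent(p)
      y <- parseElementA(a)
      z <- parseElementB(b)
    } yield (x, y, z)
}
```

Calling `parse("", "x", "maybe")` has three bad inputs but reports only "missing parent", which is exactly the behaviour that forces a rerun to find the next error.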

The next problem occurred when the QA told me that he must run the whole application again to find the next error in the data he is feeding into it. In the cloud (GCP), this is a royal pain. So, wouldn't it be great to aggregate all the errors?

As mentioned in the SO answer above, it's simply not possible to do this with monads. Fortunately, "there's a simpler abstraction—called an applicative functor—that's in-between a functor and a monad and that provides all the machinery we need. Note that it's in-between in a formal sense." [SO]

Cats and Validation

Cats has a type to help here called Validated. But note that "what’s different about Validation is that it is does not form a monad, but forms an applicative functor." [eed3si9n]

So, with a few imports from Cats, I can write the even more succinct:

  def doApplicatives: T[String] = (parseParent, parseElementA, parseElementB).mapN { case (x, y, z) =>
    ...
  }

What's more, since Options and Eithers are also applicatives (as are all monads) this code still works just as well with them because "every monad is an applicative functor, every applicative functor is a functor, but not every applicative functor is a monad, etc." [SO]
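To see why this accumulates errors where Either cannot, here is a stdlib-only sketch. Validated, Valid, Invalid, map2 and map3 are a simplified hand-rolled stand-in, not Cats' API (Cats' Validated, for instance, is generic in the error type), and the parsers are hypothetical. The key point is that map2 looks at both sides before deciding, instead of stopping at the first failure the way flatMap does:

```scala
import scala.util.{Success, Try}

object Accumulate {
  // Minimal stand-in for Cats' Validated, with errors fixed to List[String].
  sealed trait Validated[+A]
  final case class Valid[A](a: A) extends Validated[A]
  final case class Invalid(errors: List[String]) extends Validated[Nothing]

  // Applicative combination: both sides are always inspected,
  // so errors from both can be collected.
  def map2[A, B, C](va: Validated[A], vb: Validated[B])(f: (A, B) => C): Validated[C] =
    (va, vb) match {
      case (Valid(a), Valid(b))       => Valid(f(a, b))
      case (Invalid(e1), Invalid(e2)) => Invalid(e1 ++ e2)
      case (Invalid(e), _)            => Invalid(e)
      case (_, Invalid(e))            => Invalid(e)
    }

  // Three-argument version built from map2 (roughly what mapN provides).
  def map3[A, B, C, D](va: Validated[A], vb: Validated[B], vc: Validated[C])(
      f: (A, B, C) => D): Validated[D] =
    map2(map2(va, vb)((a, b) => (a, b)), vc)((ab, c) => f(ab._1, ab._2, c))

  // Hypothetical parsers returning Validated.
  def parseParent(s: String): Validated[String] =
    if (s.nonEmpty) Valid(s) else Invalid(List("missing parent"))

  def parseElementA(s: String): Validated[Int] = Try(s.toInt) match {
    case Success(n) => Valid(n)
    case _          => Invalid(List(s"element A is not a number: '$s'"))
  }

  def parseElementB(s: String): Validated[Boolean] = s match {
    case "true"  => Valid(true)
    case "false" => Valid(false)
    case other   => Invalid(List(s"element B is not a boolean: '$other'"))
  }

  def parse(p: String, a: String, b: String): Validated[(String, Int, Boolean)] =
    map3(parseParent(p), parseElementA(a), parseElementB(b))((x, y, z) => (x, y, z))
}
```

With three bad inputs, `parse("", "x", "maybe")` returns all three messages in one Invalid, so QA sees every problem from a single run.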

Note that Scalaz also gives us an applicative functor instance for Option, so we can write the following:

import scalaz._, std.option._, syntax.apply._

def add(i: Int, j: Int): Int = i + j

val x: Option[Int] = ...
val y: Option[Int] = ...

val xy = (x |@| y)(add)

So, for validation, don't use monads, use applicatives and some library to add syntactic sugar.

Sunday, September 22, 2019

Git strategies


I've normally worked in a team where each developer has his own feature branch. My current team instead has each developer working on his own fork, so that the main branch is not "polluted" with long-dead development.

In addition, squashing the commits (see below) means each feature only adds one commit to the main codebase.

Forking code

To bring the code from one repository (FORK_FROM) into a new branch of another (FORK_TO), run:

git clone FORK_TO
git remote add ARBITRARY_NAME  FORK_FROM
git fetch ARBITRARY_NAME
git checkout -b ARBITRARY_BRANCH_NAME
git merge  --allow-unrelated-histories ARBITRARY_NAME/master
git push --set-upstream origin ARBITRARY_BRANCH_NAME

Origins and aliases

Git's remote labels are just that - aliases for URLs of repositories. To see them, run:

git remote -v

"Remotes are like nicknames for the URLs of repositories - origin is one, for example." [SO]

By convention, "upstream generally refers to the original repo that you have forked ... origin is your fork: your own repo on GitHub, clone of the original repo of GitHub" [SO]

You can delete remote labels with:

git remote rm upstream

Note that this cannot in itself damage your codebase.

Anyway, having forked from the main repository, you may want to add an alias like this:

git remote add upstream URL_OF_CODE_WE_FORKED_FROM

Briefly, "you have access to pull and push from origin, which will be your fork of the main ... repo. To pull in changes from this main repo, you add a remote, upstream in your local repo, pointing to this original and pull from it.

"So origin is a clone of your fork repo, from which you push and pull. Upstream is a name for the main repo, from where you pull and keep a clone of your fork updated, but you don't have push access to it.” [SO]

Merging from the main branch

You might first want to see what change you're pulling [SO], so run:

git diff COMMIT_ID~ COMMIT_ID

This assumes whoever made that commit squashed their commits (see below), so the whole change is in a single commit. Squashing keeps the main fork cleaner.

You can check out your personal main branch and run:

git pull upstream MAIN_BRANCH

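or, rather than rebasing, make your branch match the team's exactly (this discards any local commits on it):

git fetch upstream
git reset --hard upstream/MAIN_BRANCH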

You may sometimes have Git tell you that you are working on a detached head. What does this mean? “You are not on a branch (the commit is likely to be on multiple branches). You are checked out to a specific instance in the history… You are checked out to a specific commit.” [SO]

After doing this, your personal main branch has the team's changes and you can merge like usual.

If you really foul things up after pushing to your personal code base, fear not. You can roll back a commit with:

git revert THE_HASH_FOR_COMMIT_YOU_WANT_TO_FORGET

This leaves the original commit in the log, though. See the line "less dangerous approach" here [SO].

Merging to the main branch

When pushing your code to the main branch, you can "squash" all the commits. This means that in the future, developers will just see the salient changes without all the commits that went into making that feature.

You can squash commits with:

git rebase -i MAIN_BRANCH

You'll be asked to edit a file. Keep the first line and replace 'pick' in subsequent lines with 's'. Then:

git push -f origin YOUR_BRANCH

Now you're good to merge with the team's codebase, although you may not have privileges to do that...

Note: you might want to squash your code before merging changes from another branch as this makes any conflicts much easier to resolve.

Miscellaneous

Some people use fetch and some use pull. What's the difference? “In the simplest terms, git pull does a git fetch followed by a git merge.

"You can do a git fetch at any time to update your remote-tracking branches under refs/remotes/<remote>/.

"This operation never changes any of your own local branches under refs/heads, and is safe to do without changing your working copy… A git pull is what you would do to bring a local branch up-to-date with its remote version, while also updating your other remote-tracking branches.” [SO]

Rolling back

How to roll back depends on what exactly you mean by rolling back (do you want a record of the abandoned commits or not? etc). The simplest way is to just check out the files as they were at the desired commit [SO], like:

git checkout HASH_OF_DESIRED_COMMIT .

and note the '.' at the end of the line. Commit the result, then push it with git push origin HEAD:YOUR_BRANCH

There are variants on this, but this way retains your history, which may or may not be what you want. For instance, the line above gives you the code as it was at that point in time (although files added since then are not deleted). If you later want to re-apply changes from other branches that you had previously pulled, Git thinks they have already been applied and does nothing. Similarly, if you pull this branch into another that already has those changes, they will not be deleted just because the branch you're pulling from no longer has them. You need to revert.

Merges also cause a problem for revert. When Git reverts a merge commit, it does not know by default which parent (that is, which branch) to treat as the mainline, so you have to run something like git revert -m 1 YOUR_HASH..HEAD. The -m 1 tells Git to follow the first parent.