Sunday, June 28, 2020

Error handling - the Cats way


Exceptions in the Java world suck because you don't necessarily know what a method will "return". Even if the exception is declared (and often it is not), every method can "return" either your expected value or an Exception.

So, how are they handled in the FP library, Cats?

Why is MonadError useful?

As ever, Rob Norris gives an immensely insightful answer on Gitter:

laiboonh @laiboonh Jun 27 14:44
Why can't I simply represent error with Either for example

Rob Norris @tpolecat Jun 27 14:45
It lets you abstract over things like Either.
 
Which is kind of a bad answer.
Some effects have an error channel. IO for instance. When you're talking to a database or something there are a billion things that can go wrong and you'll spend your whole life passing Eithers around, containing possible errors that you'll never be able to handle. So it's nicer to bury them in the effect itself and only summon them when you're in a position to do something about them.
 
Either can work this way. Your effect can be Either[MyError, ?]
But you can also have something like IO[Either[MyError, A]] which has an error channel for "bad" errors and an explicit Either for errors that you want the user to confront immediately. Kind of like runtime vs. checked exceptions in Java.
 
In any case MonadError lets you abstract over effects that have an error channel.
So you can write code that works with IO or Future or Try or Either or ...

@ def tryDiv[F[_]](a: Int, b: Int)(implicit ev: ApplicativeError[F, Throwable]): F[Int] =
    if (b != 0) ev.pure(a / b) else ev.raiseError(new ArithmeticException)
defined function tryDiv

@ tryDiv[IO](3, 1).unsafeRunSync
res4: Int = 3

@ tryDiv[IO](3, 0).unsafeRunSync
java.lang.ArithmeticException
  ammonite.$sess.cmd3$.tryDiv(cmd3.sc:2)
  ammonite.$sess.cmd5$.<clinit>(cmd5.sc:1)

@ tryDiv[Either[Throwable, ?]](3, 1)
res6: Either[Throwable, Int] = Right(3)

@ tryDiv[Either[Throwable, ?]](3, 0)
res7: Either[Throwable, Int] = Left(java.lang.ArithmeticException)

@ tryDiv[Try](3, 1)
res9: Try[Int] = Success(3)

@ tryDiv[Try](3, 0)
res10: Try[Int] = Failure(java.lang.ArithmeticException)


ApplicativeError and Option

You'll notice that Rob doesn't mention Option and there's a good reason for that. "This will not work for Option though : instances of these constructs do not exist for Options (the reason becomes obvious if you think 5 minutes about it)." [Reddit]

The answer for the impatient is that None carries no further information, and so "you will be throwing away some information and behaviour." [Ruben Pieters blog]

"I've played around with this a bit and it definitely breaks the existing laws for MonadError (and ApplicativeError) since the error value is always thrown away." [Cats Github issue]


Handling Errors

We can handle errors monadically, avoiding the short-circuiting mechanics that you'd normally expect from a monad:

  val o1:     IO[Int] = IO(1)
  val o2:     IO[Int] = IO(2)
  val badBoy: IO[Int] = IO(1/0)

  val handledErrors: IO[Int] = for {
    a <- badBoy.handleErrorWith(_ => o1)
    b <- badBoy.orElse(o2)
  } yield a + b

Note that orElse is syntactic sugar and just defers to handleErrorWith.
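Running handledErrors shows both recoveries kicking in (assuming cats.effect.IO and the ApplicativeError syntax from cats.implicits._ are in scope for orElse):

handledErrors.unsafeRunSync  // 3: badBoy fails both times, so we get o1's 1 plus o2's 2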

There's some nice error handling code in fs2-kafka:

(
        implicit F:      MonadError[F, Throwable],
                 jitter: Jitter[F],
                 timer:  Timer[F]
      )
...
        def retry(attempt: Int): Throwable => F[Unit] = {
          case retriable: RetriableCommitFailedException =>
            val commitWithRecovery = commit.handleErrorWith(retry(attempt + 1))
            if (attempt <= 10) backoff(attempt).flatMap(timer.sleep) >> commitWithRecovery
            else if (attempt <= 15) timer.sleep(10.seconds) >> commitWithRecovery
            else F.raiseError(CommitRecoveryException(attempt - 1, retriable, offsets))

          case nonRetriable: Throwable =>
            F.raiseError(nonRetriable)
        }

Nicely, this is stacking Fs like beads on a thread. Some Fs pause (calls to backoff return F[_]s) and some Fs do the actual work of trying to commit (commit is an F[Unit]). But whatever problems we face, we can call F.raiseError and, no matter what F actually is, the appropriate semantics of its type will be used to return a pass or a fail.
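The same shape can be sketched generically for any F with a MonadError instance. This is my own simplified version, not the fs2-kafka code - the jittered backoff is replaced with a plain linear sleep:

import scala.concurrent.duration._
import cats.MonadError
import cats.effect.Timer
import cats.implicits._

// Retry `action` up to `maxAttempts` times, sleeping a little longer each time,
// then re-raise the last error through F's error channel.
def retrying[F[_], A](action: F[A], maxAttempts: Int)(implicit F: MonadError[F, Throwable], timer: Timer[F]): F[A] = {
  def go(attempt: Int): F[A] =
    action.handleErrorWith { error =>
      if (attempt < maxAttempts) timer.sleep(attempt.seconds) >> go(attempt + 1)
      else F.raiseError(error)
    }
  go(1)
}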

Conclusion

As long as your effect has a MonadError or ApplicativeError instance, you can raise an error. What the nature of this error is depends entirely on the effect.


Sunday, June 21, 2020

Why flatMap is effectful magic


Yet again I was wondering why some Cats Effect code was not running my effects, and yet again I discovered I'd forgotten to flatMap. Silly, but it made me think about why we need to flatMap in the first place - besides being told to.

I was thinking: if map is defined as:

map[A, B]:     IO[A] => (A => B)     => IO[B]

and flatMap as

flatMap[A, B]: IO[A] => (A => IO[B]) => IO[B]

then, given that they both result in an IO[B], why does one necessarily run a program while the other does not?

The omniscient Fabio Labella pointed me in the right direction on Gitter. He did this with a toy implementation of an IO:

class IO[A](val unsafeRun: () => A) {
  def map[B](f: A => B): IO[B] = new IO[B](() => {
      val myResult = unsafeRun()
      val result   = f(myResult)
      result
  })
  def flatMap[B](f: A => IO[B]): IO[B] = new IO[B](() => {
      val myResult = unsafeRun()
      val nextIO   = f(myResult)
      // running the outer IO also runs the IO produced by f
      nextIO.unsafeRun()
  })
}
object IO {
   def apply[A](a: => A): IO[A] = new IO(() => a)
}

Here, we're simply suspending some functionality, a, into an IO with a very basic implementation of map and flatMap which were just ineluctably derived from the signatures I defined at the top of this post.

Fabio Labella @SystemFw 11:41
[Regarding map], say we do IO(readLine).map(_.toUpperCase)
and unsafeRun that, let's do a substitution to see what [it] translates to

IO {
  val myResult = readLine
  myResult.toUpperCase
}
 
and when you unsafeRun that you get

   val myResult = readLine
   myResult.toUpperCase
 
which is the expected behaviour

So, map's type is fine for this use case. 

Fabio Labella @SystemFw 11:47
ok, so now let's take this other example

val read: IO[String]           = IO(readLine)
def print(s: String): IO[Unit] = IO(println(s))
val nope: IO[IO[Unit]]         = read.map(print)
nope.unsafeRun()

which has not printed anything! (the function is still suspended)

Here, we want an unsafeRun that references a program, not a container of a program (where "program" == "things of shape F[A]").

So, we have no choice but to use flatMap. It's the only signature that fits.

Fabio Labella @SystemFw 11:50    
For "map to be able to run effects", it would have to know that when B happens to be IO[C], it also needs to call unsafeRun on it, which breaks parametricity [ie, it does not conform to the signatures].
Not only that, it actually negates a useful pattern, sometimes you want an IO that produces another IO without running it.
Because you want to treat the inner one as a value (for example you want to send it to an in memory queue).
So, it's not just a theoretical problem, it has a practical impact.
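For completeness, the fix for the nope example is simply to flatMap (using the toy IO above):

// flatMap runs the inner IO as part of running the outer one,
// so this reads a line and actually prints it.
val yep: IO[Unit] = read.flatMap(print)
yep.unsafeRun()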


Conclusion

1. Running effects via flatMap is not a convention. It's fundamental.
2. Turning on the compiler warnings (eg, -Ywarn-value-discard together with -Xfatal-warnings) would have stopped the faulty code compiling in the first place. Given:

    val printEffect:  Int => IO[Unit]     = x => IO { println(x) }
    val printEach:    Pipe[IO, Int, Unit] = { _.map(x => printEffect(x)) }
    printEach(Stream(1, 2, 3, 4, 5)).compile.toList

then the compiler will fail on printEffect(x): although it returns an IO[Unit] rather than Unit, that value is silently discarded to satisfy the Unit in printEach's type, and so the effect inside it is never run.
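For reference, the fs2 combinator that actually runs an effect for each element is evalMap. A sketch of the corrected Pipe:

import cats.effect.IO
import fs2.Pipe

// evalMap sequences each IO[Unit] into the stream instead of discarding it.
val printEachFixed: Pipe[IO, Int, Unit] = _.evalMap(printEffect)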


Friday, June 12, 2020

Spark, Parquet and Schemas


Recently, I've been playing with Spark and Parquet. Here are a few tidbits I thought were interesting.

Nulls

I was surprised to see that attaching a schema to a DataFrame did not enforce its constraints ("Schema object passed to createDataFrame has to match the data, not the other way around" - SO). Instead, Spark makes a "best effort" attempt to have the data read from HDFS comply with the schema, but it does not throw exceptions if it can't. For instance, columns in the schema that don't appear in the file are filled with nulls even if the schema says they cannot be null.
"When you define a schema where all columns are declared to not have null values , Spark will not enforce that and will happily let null values into that column. The nullable signal is simply to help Spark SQL optimize for handling that column. If you have null values in columns that should not have null values, you can get an incorrect result or see strange exceptions that can be hard to debug."
Spark: The Definitive Guide: Big Data Processing Made Simple (p102)
The solution was to deserialize everything into the appropriate case class for the Dataset. Since I had to do bespoke validations on the objects, this wasn't too great an issue.
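To illustrate the point (a sketch - the column name, path and spark session are my own):

import org.apache.spark.sql.types._

// The schema claims "id" can never be null...
val schema = StructType(Seq(StructField("id", LongType, nullable = false)))

// ...but Spark will not enforce it: rows with a missing "id" come back as null
// rather than causing the read to fail.
val df = spark.read.schema(schema).parquet("hdfs:///some/path")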

Note that "when writing Parquet files, all columns are automatically converted to be nullable for compatibility reasons" (from the Spark docs). The compatibility being the Parquet spec (see below) which we have to map to when writing.


Floating points

Reading a Parquet file that had decimal(38, 10) for a floating-point type, taking its schema with DataFrame.schema and then using that same schema to load the Parquet again with:

spark.read.schema(theSchema).parquet("...").as[MyDomainObject].show()

unfortunately gives me:

org.apache.spark.sql.AnalysisException: Cannot up cast `REDACTED` from decimal(38,10) to decimal(38,18) as it may truncate
The type path of the target object is:
- field (class: "scala.math.BigDecimal", name: "REDACTED")
- root class: "MyDomainObject"
You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object;

where MyDomainObject has a field that is simply a scala.math.BigDecimal.

The problem seems to be due to the underlying store from whence the data was plucked (as this SO question shows a similar problem with MySQL). 

Modifying the schema by hand to use only decimal(38, 18) solved the problem. This inconvenience has its own JIRA that's unfortunately marked as "Won't Fix".

Since "the types supported by the file format are intended to be as minimal as possible" (from the Parquet docs), the floating point types are your standard IEEE formats. It appears that if you want BigDecimals, the data will be saved in a Java representation by using Parquet's native BYTE_ARRAY types since Parquet is language agnostic. Spark assumes as it's defined as a decimal(38, 18) in SYSTEM_DEFAULT here.

Since this is the default, if you save your data as anything less, you must read the file with a schema if you're hoping to have a Dataset.as[MyDomainObject]. You might need to cast the original values as this StackOverflow answer suggests, or else the decimal point might not be where you were expecting it to be.
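In practice that cast looks something like this (a sketch - the column name is made up and spark.implicits._ provides the Encoder for the case class):

import org.apache.spark.sql.functions.col
import spark.implicits._

// Cast up to Spark's default decimal(38, 18) so the column lines up with
// the scala.math.BigDecimal field in MyDomainObject.
val ds = spark.read.parquet("...")
  .withColumn("amount", col("amount").cast("decimal(38,18)"))
  .as[MyDomainObject]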


Timestamps

As already mentioned, the Parquet types are restricted to IEEE-compliant types and a few other primitives. Therefore, all the richness of the Java ecosystem is lost and there needs to be a mapping between these primitives and Java classes. It appears that Dates and Timestamps are converted from the INT64 type, as you'll see casting errors if your types misalign.


Sharing schemas

You can load and save the schema as JSON if you like (as this SO answer shows). You can also add metadata to the schema with a simple:

import org.apache.spark.sql.types._
val metaBuilder = new MetadataBuilder()
val sillyFields = schemaFromJsonFile.map(_.copy(metadata = metaBuilder.putString("You", "suck!").build))
val sillySchema = StructType(sillyFields.toArray)
spark.read.schema(sillySchema).parquet("...").show()

and all is good with the World.

The advantage of this is that you can pass annotated schemas between teams in a (fairly) human-readable format like JSON.
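Serialising the schema itself is straightforward (a sketch, assuming a DataFrame called df whose schema you want to share):

import org.apache.spark.sql.types.{DataType, StructType}

// Turn the schema into a JSON string that can be checked into version control...
val schemaAsJson: String = df.schema.json

// ...and rebuild it on the other side.
val schemaFromJsonFile: StructType = DataType.fromJson(schemaAsJson).asInstanceOf[StructType]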


Conclusion

Spark schemas and Parquet do not align one-to-one since Spark schemas could apply to any file format (or even just data in memory). Some elements of the schema are lost when written to Parquet (eg, everything in Parquet is nullable irrespective of the Spark schema) and some types in Scala/Java are not represented in Parquet.

Saturday, June 6, 2020

Cracked Pipe


Pipes can get blocked as an old post of mine illustrates. So, how do we ensure that we can read and write to and from pipes in the effectful world?

Well, my first (rather poor) attempt was to use Java's PipedInputStream and PipedOutputStream and have ZIO handle the multithreading aspects of it. Although the test passes, it is far from exhaustive.

One of the problems is that, unbeknown to me, Java's piped IO classes are very thread-sensitive. Consequently, my ZIO test was failing with:

    Fiber failed.
    A checked error was not handled.
    java.io.IOException: Read end dead
        at java.io.PipedInputStream.checkStateForReceive(PipedInputStream.java:262)
        at java.io.PipedInputStream.awaitSpace(PipedInputStream.java:268)
        at java.io.PipedInputStream.receive(PipedInputStream.java:231)
        at java.io.PipedOutputStream.write(PipedOutputStream.java:149)
        at java.io.OutputStream.write(OutputStream.java:75)
        at uk.co.odinconsultants.fp.zio.streams.LargePipeMain$.writing$1(LargePipeMain.scala:42)
        at uk.co.odinconsultants.fp.zio.streams.LargePipeMain$.$anonfun$piping$3(LargePipeMain.scala:58)
        at zio.internal.FiberContext.evaluateNow(FiberContext.scala:458)
        at zio.internal.FiberContext.$anonfun$evaluateLater$1(FiberContext.scala:687)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

This happens because the JDK code has something in it that looks like this:

...
        } else if (readSide != null && !readSide.isAlive()) {
            throw new IOException("Read end dead");
        }
...

and to be fair, the Java API documentation says:

"A pipe is said to be broken if a thread that was providing data bytes to the connected piped output stream is no longer alive."

But if your effect system uses fibres instead of threads then it is free to stop and start threads as it pleases. The bottom line is that wrapping side effecting code in a ZIO or a Cats IO might not be the first step in making your code more FP.