Sunday, July 19, 2020

Cancellation idioms

Java IO interrupt refresher

"The InterruptibleChannel interface is a marker that, when implemented by a channel, indicates that the channel is interruptible... Most, but not all, channels are interruptible."

"Channels introduce some new behaviors related to closing and interrupts. If a channel implements the InterruptibleChannel interface, then it's subject to the following semantics. If a thread is blocked on a channel, and that thread is interrupted (by another thread calling the blocked thread's interrupt() method), the channel will be closed, and the blocked thread will be sent a ClosedByInterruptException.  Additionally, if a thread's interrupt status is set, and that thread attempts to access a channel, the channel will immediately be closed, and the same exception will be thrown." [Java NIO, Hitchens]

Summarising, if a thread is interrupted before or during a blocking call on a channel, the channel is closed and an exception is thrown.
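To see the "interrupt status already set" case for yourself, here is a minimal sketch in Scala using only the JDK. It assumes FileChannel, which extends AbstractInterruptibleChannel, follows the semantics quoted above. The object and method names are made up for illustration. The thread sets its own interrupt flag and only then reads from the channel:

```scala
import java.nio.ByteBuffer
import java.nio.channels.{ClosedByInterruptException, FileChannel}
import java.nio.file.{Files, StandardOpenOption}

object InterruptedChannelDemo {
  /** Reads from a FileChannel with the interrupt flag already set.
    * Returns (ClosedByInterruptException was thrown, channel is still open). */
  def readWhileInterrupted(): (Boolean, Boolean) = {
    val file = Files.createTempFile("interrupt-demo", ".txt")
    Files.write(file, "hello".getBytes("UTF-8"))
    val channel = FileChannel.open(file, StandardOpenOption.READ)
    try {
      Thread.currentThread().interrupt()  // flag set *before* we touch the channel
      val thrown =
        try { channel.read(ByteBuffer.allocate(16)); false }
        catch { case _: ClosedByInterruptException => true }
      (thrown, channel.isOpen)
    } finally {
      Thread.interrupted()                // clear the flag, which NIO leaves set
      channel.close()                     // harmless if NIO already closed it
      Files.deleteIfExists(file)
    }
  }

  def main(args: Array[String]): Unit =
    println(readWhileInterrupted())       // expected: (true,false)
}
```

Note the finally block: NIO leaves the interrupt flag set after throwing ClosedByInterruptException, so the sketch clears it with Thread.interrupted() to stop it leaking into later code.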

"It may seem rather draconian to shut down a channel just because a thread sleeping on that channel was interrupted. But this is an explicit design decision made by the NIO architects.  Experience has shown that it's impossible to reliably handle interrupted I/O operations consistently across all operating systems."

"Interruptible channels are also asynchronously closable. A channel that implements InterruptibleChannel can be closed at any time, even if another thread is blocked waiting for an I/O to complete on that channel. When a channel is closed, any threads sleeping on that channel will be awakened and receive an AsynchronousCloseException. The channel will then be closed and will be no longer usable." [ibid]

The problems with Java

Daniel Spiewak @djspiewak Apr 24 19:47, 2020
There are a couple things with thread interruption that are horrible:

There's no way to build "uninterruptible" code. Meaning that you cannot have a critical section which acquires a resource atomically in several steps. Or in other words, there is no analogue to the acquire action in bracket.

The only way to detect self-cancelation for valid purposes (e.g. resource cleanup) is catching the InterruptedException, but doing this immediately flips the interrupted bit on the Thread back to false! 

The only solution to this is to do Thread.currentThread().interrupt() at the end of your exception handler, which almost no one knows to do. To make matters more annoying, even if you do this correctly, you mess up the stack trace on the interruption, because it's technically a new interrupt.
Oh, and exception handlers are not critical regions either, so if someone is just hammering the interrupt() button over and over externally, you could catch the exception, try to clean things up, and then get immediately interrupted again. This actually happens a lot because of the next point.

Catching Exception or Error will silently catch InterruptedException, even when that's almost guaranteed to not be what you want to do. This leads to silently ignoring interruption in most code paths, which is why people repeatedly hammer interrupt() in the first place.
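The re-interrupt idiom from the second point can be sketched in plain Scala (JDK only; RestoreInterrupt and demo are illustrative names). A thread is interrupted while sleeping. Inside the handler its flag already reads false, and calling interrupt() on itself puts the flag back for any code further up the stack:

```scala
object RestoreInterrupt {
  /** Returns the interrupt flag (a) on entering the handler and (b) after restoring it. */
  def demo(): (Boolean, Boolean) = {
    var flags = (true, false)            // overwritten by the sleeper thread
    val sleeper = new Thread(() => {
      try Thread.sleep(60000)            // interrupted long before this elapses
      catch {
        case _: InterruptedException =>
          val inHandler = Thread.currentThread().isInterrupted // false: throwing cleared it
          // ... resource clean-up would go here ...
          Thread.currentThread().interrupt()                    // re-assert for callers
          flags = (inHandler, Thread.currentThread().isInterrupted)
      }
    })
    sleeper.start()
    sleeper.interrupt()
    sleeper.join()                       // join makes the write to flags visible here
    flags
  }
}
```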

The problems also go deeper than Java, right down to the OS level. "The underlying stream may not know it's closed until you attempt to write to it (e.g. if the other end of a socket closes it)" [SO]. "There's no API for determining whether a stream has been closed." [SO]

The semantics of cancelling

This is a complicated area (see the interruption model proposed for Cats Effect 3 at https://github.com/typelevel/cats-effect/issues/681).
Basically, interruptible/uninterruptible are not composable. The best way to think about it is that "interruptible means always accept the interrupt, no matter what", while "uninterruptible means always suppress the interrupt, no matter what". But uninterruptible(fa >> interruptible(fb) >> fc) necessarily breaks one of those two guarantees.
So you have to choose: do you want resource leaks (by biasing in favor of the innermost in that context), or do you want possible deadlocks (by biasing in favor of the outermost)?
And you can't even phrase it as inner/outer, because you can do the same thing in reverse: uninterruptible(interruptible(fa >> uninterruptible(fb) >> fc))
..
as prior art here, Haskell tried all of these and ultimately decided that mask/poll was the sanest solution [Daniel Spiewak, Gitter]
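For comparison, ZIO offers the same mask/poll shape through ZIO.uninterruptibleMask, whose restore argument plays the part of poll by making a chosen sub-region interruptible again. The sketch below assumes ZIO 1.0.x, and the "acquired"/"released" log is a stand-in for a real resource. Acquisition runs masked, only the use is polled, and the finalizer still runs when the fiber is interrupted:

```scala
import zio._

object MaskPollSketch {
  // Returns the log of what happened to a hypothetical resource whose user is interrupted.
  val run: UIO[List[String]] = for {
    log      <- Ref.make(List.empty[String])
    acquired <- Promise.make[Nothing, Unit]
    fiber    <- ZIO.uninterruptibleMask { restore =>
                  // Masked: an interrupt cannot land between these steps.
                  log.update("acquired" :: _) *> acquired.succeed(()) *>
                    // Polled: only the "use" part is made interruptible again.
                    restore(ZIO.never).ensuring(log.update("released" :: _))
                }.fork
    _        <- acquired.await    // don't interrupt before acquisition has happened
    _        <- fiber.interrupt   // waits for the fiber, including its finalizer
    events   <- log.get
  } yield events.reverse
}
```

Waiting on the Promise before interrupting matters: interrupting straight after fork would reintroduce the scheduling non-determinism discussed further down.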

An alternative architecture

"it's a mistake to think of interruptibility as being an attribute of threads or fibers. Instead it should be an attribute of the activities which run on the threads/fibers, and of necessity, that means that any interruptible activity must have its own first-class interrupt channel. If we go down that route then an activity is interruptible if it 1) has an interrupt channel and 2) its interrupt channel is accessible. If it doesn't have an interrupt channel, or the channel is hidden somehow, then it's uninterruptible.

"Exposing an explicit interrupt channel on every blocking operation that we want to be interruptible is obviously a lot more laborious than just firing random interrupts at globally visible threads/fibers and hoping for the best, but I think it's the only way to go."
[Miles Sabin]
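Here is a rough sketch of what such a first-class interrupt channel could look like in plain Scala. InterruptChannel and ExplicitInterrupt are hypothetical names, and polling an AtomicBoolean between steps is just one simple way to model the idea. An activity is interruptible only if it is handed a channel. One without a channel is uninterruptible by construction:

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.annotation.tailrec

/** Hypothetical first-class interrupt channel, owned by an activity rather than a thread. */
final class InterruptChannel {
  private val requested = new AtomicBoolean(false)
  def interrupt(): Unit = requested.set(true)
  def isInterrupted: Boolean = requested.get()
}

object ExplicitInterrupt {
  /** Interruptible: it is handed a channel and polls it between steps. */
  def sumTo(n: Int, channel: InterruptChannel): Option[Long] = {
    @tailrec def loop(i: Int, acc: Long): Option[Long] =
      if (channel.isInterrupted) None   // someone holding the channel asked us to stop
      else if (i > n) Some(acc)
      else loop(i + 1, acc + i)
    loop(1, 0L)
  }

  /** Uninterruptible: there is no channel, so nobody can ask it to stop. */
  def sumToUninterruptibly(n: Int): Long = (1L to n.toLong).sum
}
```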


How does this affect Effectful Systems?

Integrating Scala code that uses effectful libraries with Java IO code can cause problems.

Gavin Bisesi @Daenyth Feb 05 21:16, 2020
InputStream is always a blocking api
(note you don't need much to make a blocker; Blocker.apply gives a Resource of one)

Daniel Spiewak @djspiewak Feb 05 21:46, 2020
FYI, all things involving files are blocking except on Windows, and even then they're blocking most of the time.
So the "NIO stuff" that is inside of getResourceAsStream is actually not NIO but rather regular IO wrapped up with a thread pool :-(
I generally use Blocker just to be safe on resource access. It doesn't really cost that much in terms of syntax
... non-blocking things must have a callback-driven API
either directly (via callbacks passed to functions) or indirectly (via Future or CompletableFuture)
If something doesn't have a callback API, then you know it's blocking
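Here is a sketch of the Blocker approach, assuming Cats Effect 2.x, where Blocker lives in cats.effect and IO needs a ContextShift. BlockerSketch and readFile are illustrative names:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import cats.effect.{Blocker, ContextShift, IO}
import scala.concurrent.ExecutionContext

object BlockerSketch {
  // Cats Effect 2.x: shifting between pools needs a ContextShift.
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Blocker.apply gives a Resource; its thread pool is shut down when `use` completes.
  def readFile(path: Path): IO[String] =
    Blocker[IO].use { blocker =>
      // The synchronous read runs on the blocking pool, then execution shifts back.
      blocker.delay[IO, String](new String(Files.readAllBytes(path), StandardCharsets.UTF_8))
    }
}
```

Creating the Blocker per call keeps the example self-contained. In a real application you would more likely acquire one Blocker at the edge of the program and pass it down.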


When can you cancel?

The effect of a cancellation is felt at every async boundary or every N flatMaps (where N=1 for ZIO, it seems).

Note that I could find no code in either ZIO or Cats that calls Thread.interrupt(). ZIO does, however, still let you wrap your code in a Future, and cancelling that will lead to an InterruptedException (see this gist).

Note that there are still undefined areas in Cats regarding cancellation:

Raas Ahsan @RaasAhsan Jul 07 20:26
calling cancel right after start results in non-deterministic behavior

Fabio Labella @SystemFw Jul 08 19:38
yeah I wanted to say
fa.guarantee(foo) doesn't guarantee that foo will always happen
it guarantees that if fa happens, then foo always happens
In particular, if you have fa.guarantee(foo).start.flatMap(_.cancel), foo might happen or not, because the program can be cancelled before fa gets scheduled to run

See tip #2 in this Cats video (An Introduction to Interruption by Jakub Kozlowski at 11'03"), where starting and joining fibers in a for-comprehension is shown to be an anti-pattern (what if one fails to complete?). Instead, one should use (ioa, iob).parTupled.
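Here is a sketch of both shapes, assuming Cats Effect 2.x with cats.implicits providing the parTupled syntax. The two effects are placeholders:

```scala
import cats.effect.{ContextShift, IO}
import cats.implicits._
import scala.concurrent.ExecutionContext

object ParTupledSketch {
  // Cats Effect 2.x: both start and Parallel[IO] need a ContextShift.
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  val ioa: IO[Int]    = IO(21 * 2)
  val iob: IO[String] = IO("answer")

  // The anti-pattern: if joining fiberA fails (or we are cancelled while waiting),
  // fiberB is neither joined nor cancelled and keeps running on its own.
  val manual: IO[(Int, String)] = for {
    fiberA <- ioa.start
    fiberB <- iob.start
    a      <- fiberA.join
    b      <- fiberB.join
  } yield (a, b)

  // Runs both concurrently; if one fails, the other is cancelled.
  val both: IO[(Int, String)] = (ioa, iob).parTupled
}
```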


Tuesday, July 14, 2020

Does this still happen in 2020?


Lazy data modellers kick the can down the road when choosing an inappropriate type. For instance, we had a field that could only ever be a calendar date but was modelled as a java.sql.Timestamp, and we were told to ignore the time element. 

The trouble is, Timestamp contains no timezone information. To illustrate the problem, a colleague in Romania created a Parquet file with Spark that contained today's date at 0 hours, 0 minutes and 0 seconds - midnight, right? He then sent it to me, only for me to see:

scala> val df = spark.read.parquet("/home/henryp/Downloads/part-00000-d41a228d-65fe-47ff-a70f-825a3cc61846-c000.snappy.parquet")
df: org.apache.spark.sql.DataFrame = [Timestamp: timestamp]

scala> df.show()
+-------------------+
|          Timestamp|
+-------------------+
|2020-07-12 22:00:00|
+-------------------+

Horrors - that's yesterday! (Romania is currently 2 hours ahead of the UK).

Of course, it should be 2020-07-13 00:00:00, right? Well, no. I'm currently in BST, so that would still be 1 hour off UTC, and only the godless would use anything but UTC.

The problem is compounded by the string representation of the timestamps depending on the timezone of whoever displays them. My Romanian colleague might have double-checked his data before sending it and been lulled into a false sense of security that the timestamps indeed had a zeroed time component. 
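The arithmetic can be reproduced with java.time in plain Scala (the names are illustrative). One instant, midnight in Bucharest, renders as a different wall-clock time in London and in UTC. A LocalDate, the natural JVM type for a pure calendar date, has no time or zone to drift:

```scala
import java.time.{LocalDate, LocalDateTime, ZoneId, ZoneOffset, ZonedDateTime}

object MidnightSomewhere {
  val bucharest: ZoneId = ZoneId.of("Europe/Bucharest") // EEST, UTC+3 in July
  val london:    ZoneId = ZoneId.of("Europe/London")    // BST, UTC+1 in July

  // "Today at midnight" as the colleague in Romania meant it.
  val writtenInRomania: ZonedDateTime = LocalDate.of(2020, 7, 13).atStartOfDay(bucharest)

  // A zone-less timestamp only pins down the instant; each reader renders it in their own zone.
  val seenInLondon: LocalDateTime =
    writtenInRomania.withZoneSameInstant(london).toLocalDateTime
  val seenInUtc: LocalDateTime =
    writtenInRomania.withZoneSameInstant(ZoneOffset.UTC).toLocalDateTime

  // A LocalDate has no time of day and no zone, so there is nothing to shift.
  val calendarDate: LocalDate = writtenInRomania.toLocalDate

  def main(args: Array[String]): Unit = {
    println(seenInLondon) // 2020-07-12T22:00
    println(seenInUtc)    // 2020-07-12T21:00
    println(calendarDate) // 2020-07-13
  }
}
```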

Wednesday, July 8, 2020

ZIO cheats


Here are some brief notes I made to help myself become more familiar with ZIO.

The ZIO Monad

The ZIO monad is defined as ZIO[-R, +E, +A], where R is the requirement (the environment), E is the error type and A is the success value.

To feed the requirements into ZIO, use the compose function, its >>> synonym or a provideXXX function.


Useful type aliases

A U in ZIO type names indicates that the effect cannot fail. For instance:

UIO[+A]      = ZIO[Any, Nothing, A]
URIO[-R, +A] = ZIO[R,   Nothing, A]

Mnemonic: think of that U as an indication the monad is Unexceptional.

Note that you can't have a value of type Nothing. For example:

Welcome to the Ammonite Repl 2.0.4 (Scala 2.13.1 Java 1.8.0_241)
@ val x: Nothing = "Hello" 
cmd0.sc:1: type mismatch;
 found   : String("Hello")
 required: Nothing
val x: Nothing = "Hello"

Since no value can ever be of type Nothing, an error type of Nothing tells us that the failure branch of the code can never be executed. 


Error handling

... is perhaps simpler in ZIO when compared to Cats. You simply call catchAll on your ZIO monad (see this article for fallback, folding and retries).

If you want to ignore an error, you can do something like:

    val result:     ZIO[Any, DBError, Unit] = makeUser.provideLayer(fullLayer)
    val exit:       UIO[Int]                = UIO(1)   
    val resultExit: ZIO[Any, DBError, Int]  = result *> exit
    val afterCatch: ZIO[Any, Nothing, Int]  = resultExit.catchAll(_ => exit)

Obviously, this won't compile:

  override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, Int] = {
    val exitResult: ZIO[Any, DBError, Unit] = exit *> result
    exitResult.catchAll(_ => exit)
  }

as catchAll only changes the error channel type: the success value is still Unit rather than the Int that run must return.

If you want to only take the successful results, you need something like:

  val xs:        List[IO[Throwable, String]] = ???
  val successes: UIO[List[String]]           = ZIO.collectAllSuccesses(xs)

One way to get your hands on an error is the flip method on ZIO, which swaps the error and success channels. This can be useful in testing [SO].
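Here is a small runnable sketch of collectAllSuccesses and flip, assuming ZIO 1.0.x. ErrorChannelSketch and the sample effects are illustrative:

```scala
import zio._

object ErrorChannelSketch {
  val xs: List[IO[Throwable, String]] = List(
    ZIO.succeed("a"),
    ZIO.fail(new Exception("boom")),
    ZIO.succeed("c")
  )

  // Failures are dropped; only the successful values survive, in order.
  val successes: UIO[List[String]] = ZIO.collectAllSuccesses(xs)

  // flip swaps the channels: the error becomes the success value.
  val message: UIO[String] = ZIO.fail(new Exception("boom")).flip.map(_.getMessage)

  def main(args: Array[String]): Unit = {
    println(Runtime.default.unsafeRun(successes)) // List(a, c)
    println(Runtime.default.unsafeRun(message))   // boom
  }
}
```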


Combinators

The joke in the Cats world is that the answer is always Traverse. In ZIO, this operation is collectAll (or collectAllPar if you want the work done in parallel). It's a pity that ZIO didn't call it Sequence (a close relative of Traverse), as this has precedent in plain old Scala in Future.sequence.

jdegoes 12 April 2020 at 2:10 PM (Discord)
x1.ignore &> x2.ignore &> x3. This will execute x1, x2, and x3 in parallel, using zipRightPar (that's the &> operator, which zips two effects together in parallel, returning whatever is produced on the right), and ignore the results of x1 and x2 so their failures don't influence the result of the computation.
 
In ZIO, even parallel zip or collect or foreach operations will "kill" the other running effects if one of them fails. Because that's often what you want. To get the other behavior, just use .ignore in the right places to ignore the failures you don't care about.
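Here is a sketch of the difference, assuming ZIO 1.0.x, with three placeholder effects of which x1 fails:

```scala
import zio._

object ParallelIgnoreSketch {
  val x1: Task[Int] = ZIO.fail(new Exception("x1 failed"))
  val x2: Task[Int] = ZIO.succeed(2)
  val x3: Task[Int] = ZIO.succeed(3)

  // All three run in parallel; x1's and x2's outcomes are ignored, so only x3 matters.
  val rightmost: Task[Int] = x1.ignore &> x2.ignore &> x3

  // collectAllPar is all-or-nothing: one failure interrupts the others and fails the whole.
  val all: Task[List[Int]] = ZIO.collectAllPar(List(x1, x2, x3))
}
```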


Threads

ZIO effects can be executed on other Scala ExecutionContexts with the use of ZIO.on.

Alternatively, you might like to use zio.blocking.XXX which "provides access to a thread pool that can be used for performing blocking operations, such as thread sleeps, synchronous socket/file reads, and so forth. The contract is that the thread pool will accept unlimited tasks (up to the available memory) and continuously create new threads as necessary." [JavaDoc]
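Here is a minimal sketch of the blocking module, assuming ZIO 1.0.x, where effectBlocking lives in zio.blocking and requires the Blocking service (which Runtime.default provides). BlockingSketch and readFile are illustrative names:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import zio._
import zio.blocking._

object BlockingSketch {
  // effectBlocking runs this synchronous read on the Blocking service's pool,
  // so it doesn't tie up a thread in ZIO's main pool.
  def readFile(path: Path): RIO[Blocking, String] =
    effectBlocking(new String(Files.readAllBytes(path), StandardCharsets.UTF_8))
}
```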

For the purposes of cancellation: 

adamfraser 25/6/2020 at 11:13 PM (Discord)
Interruption is checked between each flatMap.

If for whatever reason you can't run your code in a zio.App, you can call zio.Runtime.default.unsafeRun on your effect.


Interop with plain Scala

ZIO maps nicely to plain Scala classes. For instance, ZIO[?, E, A].either yields a ZIO that wraps a plain old Scala Either[E, A]. Going the other way is simply a ZIO.fromEither call.

Similarly, .option will yield a ZIO that wraps a None or Some[A].
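Here is a sketch of these round trips, assuming ZIO 1.0.x. ZIO 1.0 is designed to reject either and option on effects that cannot fail, which is why found is typed IO[String, Int] rather than UIO[Int]. The names are illustrative:

```scala
import zio._

object InteropSketch {
  val failing: IO[String, Int] = ZIO.fail("no such user")
  val found:   IO[String, Int] = ZIO.succeed(42)

  // Move the error into the value: the result can no longer fail.
  val asEither: UIO[Either[String, Int]] = failing.either

  // ...and back again from a plain Either.
  val plain: Either[String, Int] = Left("no such user")
  val fromPlain: IO[String, Int] = ZIO.fromEither(plain)

  // option keeps only whether a value was produced, discarding the error itself.
  val someValue: UIO[Option[Int]] = found.option
  val noValue:   UIO[Option[Int]] = failing.option
}
```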


Resource management

ZIO resource management is syntactically similar to Cats: bracket takes the code that closes the resource and the code that uses the resource:

  def toInputStream(resource: String): Task[InputStream] = ...

  toInputStream(resource).bracket(close(_)) { input =>
    ZIO {
      new FileOutputStream(output)
    }.bracket(close(_)) { out =>
      ZIO { IOUtils.pipe(input, out) }
    }
  }
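For a self-contained version that compiles, here is a sketch assuming ZIO 1.0.x that copies one file to another. The close helper and the pump loop are hypothetical stand-ins for the close and IOUtils.pipe calls above:

```scala
import java.io.{FileInputStream, FileOutputStream, InputStream, OutputStream}
import java.nio.file.Path
import scala.annotation.tailrec
import zio._

object BracketSketch {
  // Hypothetical helper: a failure while closing is ignored rather than propagated.
  def close(c: AutoCloseable): UIO[Unit] = ZIO.effect(c.close()).ignore

  // Plain blocking copy loop (Java 8 has no InputStream.transferTo).
  private def pump(in: InputStream, out: OutputStream): Long = {
    val buffer = new Array[Byte](8192)
    @tailrec def loop(total: Long): Long = {
      val n = in.read(buffer)
      if (n == -1) total
      else { out.write(buffer, 0, n); loop(total + n) }
    }
    loop(0L)
  }

  // Each stream is closed by its own bracket, inner first, even if the copy fails or is interrupted.
  def copy(from: Path, to: Path): Task[Long] =
    ZIO.effect(new FileInputStream(from.toFile)).bracket(close(_)) { in =>
      ZIO.effect(new FileOutputStream(to.toFile)).bracket(close(_)) { out =>
        ZIO.effect(pump(in, out))
      }
    }
}
```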


Testing

If you want ZIO tests to be picked up as part of a build that uses JUnit then your class should extend zio.test.junit.JUnitRunnableSpec.

You can fail tests on timeout with:

import zio.test.TestAspect._
import zio.duration._

  override def spec: ZSpec[TestEnvironment, Any] = suite("...")(testM("..."){
...
  } @@ timeout(10.seconds))



Monday, July 6, 2020

Unit testing with ZIO


ZIO has the concept of Layers. These allow you to inject requirements to your ZIO monad (the R in ZIO[R, E, A]). The nice thing about them is that they make testing easier.

Say I have an application with an initialisation piece and a data flow piece. My code would look like this:

import zio.{Has, Task, ZIO}

  type Init = Has[Init.Service]

  object Init {
    trait Service {
      def initializeWith(configFile:  String,
                         session:     SparkSession,
                         fs:          FileSystem): ZIO[Any, Throwable, Settings]
    }
...
  type Flow = Has[Flow.Service]

  object Flow {
    trait Service {
      def flow(s:       Settings,
               paths:   Filenames,
               session: SparkSession,
               fs:      FileSystem): Task[Results[String]]
    }

Now, the data structure I want to test is something like a ZIO[Init with Flow, Throwable, A] and the code that creates it is probably some large and complicated for-comprehension that's begging for a test. 

In production, the code to "run" it would look like:


    val prodInitialization: ULayer[Init]    = ZLayer.succeed(InitProd)
    val prodDataFlow:       ULayer[Flow]    = ZLayer.succeed(FlowProd)
...
    zio.provideLayer(prodInitialization ++ prodDataFlow)

where InitProd and FlowProd are my production objects that implement Init.Service and Flow.Service.

My test code, however, looks like this:

import zio.test.Assertion._
import zio.test.environment.TestEnvironment
import zio.test.junit.JUnitRunnableSpec
import zio.test.{ZSpec, assertM, suite, testM}
import zio.{Layer, Task, ZIO, ZLayer}

class InitFlowSpec extends JUnitRunnableSpec {

  def unhappyPathInitLayer(e: Exception): Layer[Nothing, Init] = ZLayer.succeed(
    new Init.Service {
      override def initializeWith(configFile: String, session: SparkSession, fs: FileSystem) = ZIO.fail(e)
    }
  )
...
  val error                         = new Exception("Boo")
  val withNoErrors: Results[String] = Map.empty

  override def spec: ZSpec[TestEnvironment, Any] = suite("Orchestration")(
    testM("returns error if initialisation fails"){
      val layers    = unhappyPathInitLayer(error) ++ flowLayer(withNoErrors)
      val result    = zio.provideLayer(layers)
      assertM(result.run)(fails(equalTo(error)))
    }
  )

Et voila, we have a mocking framework for the requirement channel of a ZIO monad.
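Stripped of Spark, the same pattern fits in a few lines. This is a sketch assuming ZIO 1.0.x. The Settings service and its two layers are hypothetical stand-ins for Init and Flow:

```scala
import zio._

object LayerSketch {
  // Hypothetical service standing in for Init/Flow, without Spark.
  type Settings = Has[Settings.Service]

  object Settings {
    trait Service {
      def load(name: String): Task[Int]
    }

    // "Production" implementation.
    val live: ULayer[Settings] = ZLayer.succeed(new Service {
      def load(name: String): Task[Int] = Task(name.length)
    })

    // Test double that always fails with the given error.
    def failing(e: Throwable): ULayer[Settings] = ZLayer.succeed(new Service {
      def load(name: String): Task[Int] = ZIO.fail(e)
    })
  }

  // The logic under test only declares that it needs Settings in its environment.
  val program: RIO[Settings, Int] =
    ZIO.accessM[Settings](_.get.load("settings.conf")).map(_ * 2)
}
```

Swapping Settings.live for Settings.failing(error) in provideLayer is all it takes to exercise the unhappy path.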