Wednesday, September 30, 2015

Future notes (part II)


There are some really nice aspects to Scala's Futures that make them very different to Java's. Because they're monadic, you can do all sorts of interesting things.

Differences between Java and Scala Futures

In Java, you merely block while waiting for the result by calling get(). That's it.

However, you can cancel Java futures. Scala "Futures come without a built-in support for cancellation" [1] although there are ways to roll-your-own.
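To make the Java side concrete, here is a minimal sketch (in Scala, calling the java.util.concurrent API directly) of the two things a plain Java Future offers: a blocking get and cancel. The object name JavaFutureDemo, the pool size and the sleep length are my own choices for illustration.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

object JavaFutureDemo {
  // Returns (result of a blocking get, whether the second future reports itself cancelled).
  def run(): (String, Boolean) = {
    val pool = Executors.newFixedThreadPool(2)
    try {
      val quick = pool.submit(new Callable[String] {
        def call(): String = "done"
      })
      val slow = pool.submit(new Callable[String] {
        def call(): String = { Thread.sleep(60000); "never seen" }
      })

      val result = quick.get(5, TimeUnit.SECONDS) // blocks the calling thread
      slow.cancel(true)                           // true = interrupt the worker if it is running
      (result, slow.isCancelled)
    } finally {
      pool.shutdownNow()
    }
  }
}
```

Calling JavaFutureDemo.run() should give ("done", true).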

So what can you do that's so special?

Because both Lists and Futures are monadic, you can "blend" them. For instance, if you have a List of Futures, you can convert it into a Future of a List. This lets you react to a single Future holding all the data rather than worrying about many Futures each holding some of it.

This is built in to the Scala API via Future.sequence. It basically folds over the monadic structure (eg a List) that's passed to it.

A somewhat simplified version of this code (I use the more specific List rather than sequence's more general TraversableOnce, and avoid the magic of CanBuildFrom) looks like this:

  def mySequence[T](futures: List[Future[T]])(implicit ec: ExecutionContext): Future[List[T]] = {
    def foldFn(agg: Future[List[T]], ft: Future[T]): Future[List[T]] = {
      agg flatMap { list =>
        ft map {
          element => list :+ element
        }
      }
    }

    // The seed is already known, so there is no need to schedule a task for it.
    val seed: Future[List[T]] = Future.successful(List.empty[T])

    futures.foldLeft (seed) (foldFn)
  }

Or, we could use a for-comprehension to give it some syntactic sugar:

    def foldFn(agg: Future[List[T]], ft: Future[T]): Future[List[T]] = for {
      list <- agg
      t <- ft
    } yield (list :+ t)

They're equivalent. Whether it's this simplified method or the more complicated one in the Scala API, you'd call it with something like:

    val listOfFutures = List(createSimpleFuture("this"), createSimpleFuture("is"), createSimpleFuture("a"), createSimpleFuture("test"))
    val futureOfLists = mySequence(listOfFutures)
    val lists         = Await.result(futureOfLists, Duration(5, TimeUnit.SECONDS))
    println("lists from future of lists from list of futures: " + lists) 


  def createSimpleFuture(eventualResult: String)(implicit ec: ExecutionContext): Future[String] = {
    Future {
      Thread.sleep(100)
      eventualResult
    }
  }

which prints out:

lists from future of lists from list of futures: List(this, is, a, test)
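For comparison, here is a minimal sketch of the same round trip using the built-in Future.sequence. The object name SequenceDemo and the use of the global ExecutionContext are assumptions for the sake of a self-contained example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequenceDemo {
  def run(): List[String] = {
    val listOfFutures: List[Future[String]] =
      List("this", "is", "a", "test").map(word => Future(word))

    // List[Future[String]] => Future[List[String]], preserving the order of the list
    val futureOfList: Future[List[String]] = Future.sequence(listOfFutures)

    Await.result(futureOfList, 5.seconds) // blocking only so the sketch can return a value
  }
}
```

The elements come back in the order of the original list, not in the order the Futures happened to complete.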

More funky stuff in part III.

[1] Learning Concurrent Programming in Scala, Aleksandar Prokopec.

Friday, September 25, 2015

Scala's Return


I was reading this StackOverflow answer about the distinction between methods and functions. It had the usual explanations, for example a method "can be easily converted into [a function]" but "one can't convert the other way around"; methods "can receive type parameters"; "a function is an object" etc.

But as pointed out here, return behaves differently in a function compared to a method and can be used to short-circuit a fold etc.

For example:

  def caller(): Any = {
    println("caller started")
    val g = () => { return true } // note: return type must be compatible with this method
    called(g)
    println("caller finished")    // this is never executed!
  }

  def called(innocuousFn: () => Boolean): Unit = {
    println("called: about to call innocuousFn...")
    innocuousFn()
    println("called: finished")   // this is never executed!
  }

prints out:

caller started
called: about to call innocuousFn...

That is, when the flow of control hits the function's return statement, the stack is unwound all the way back to the enclosing method (caller, in this case), which then returns. It's illegal for a function outside a method to have a return statement.
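The short-circuiting mentioned earlier can be sketched like this. It assumes Scala 2 semantics for return inside a function literal (newer Scala 3 compilers warn about it), and firstNegative is a made-up helper that also reports how many elements it looked at.

```scala
object ShortCircuitDemo {
  // Returns (the first negative element if any, how many elements were inspected).
  def firstNegative(xs: List[Int]): (Option[Int], Int) = {
    var inspected = 0
    xs.foreach { x =>
      inspected += 1
      // This returns from firstNegative itself, not merely from the function literal,
      // so the remaining elements are never visited.
      if (x < 0) return (Some(x), inspected)
    }
    (None, inspected)
  }
}
```

For List(1, -2, 3, -4) this gives (Some(-2), 2): the traversal stops at the second element.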

Worse, if one method passes a returning function back to the method that calls it, you get a Throwable. After all, where does that function return from?

  def executeReturningFnFromOtherMethod(): Unit = {
    println("about to call calledReturnsFnThatReturns")
    val fn: () => Boolean = returnsFnThatReturns()
    println("about to call fn")
    fn()                // Exception in thread "main" scala.runtime.NonLocalReturnControl
    println("finished") // never executes
  }

  def returnsFnThatReturns(): () => Boolean = {
    val fn: () => Boolean = () => {
      return makeTrue
    }
    fn
  }

  def makeTrue(): Boolean = {
    return true
  }

Prints out:

about to call calledReturnsFnThatReturns
about to call fn
Exception in thread "main" scala.runtime.NonLocalReturnControl


Methods that may or may not compile in Scala


If you want to sum some integers in a List, you just call the sum method. So far, so obvious. But if the list does not hold elements that are summable, how does Scala give you a method that you may or may not call?

The sum method lives in TraversableOnce and the signature looks like this:

  def sum[B >: A](implicit num: Numeric[B]): B = ...

Here [B >: A] says that B must be a supertype of A, the type of the elements in the list. The implicit parameter says there must exist in the ether something that provides the functionality in trait Numeric for type B (the functionality for plus, minus etc).
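The same trick works for your own methods. Below is a minimal sketch of a method that can only be called when a Numeric exists for the element type; the name total is made up for illustration and is not part of the Scala library.

```scala
object NumericDemo {
  // Compiles only at call sites where a Numeric[A] can be found implicitly.
  def total[A](xs: List[A])(implicit num: Numeric[A]): A =
    xs.foldLeft(num.zero)(num.plus)

  // NumericDemo.total(List("a", "b")) would be rejected by the compiler,
  // because there is no implicit Numeric[String].
}
```

So total(List(1, 2, 3)) and total(List(1.5, 2.5)) compile, while the String version never gets as far as running.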

Now, for Int, Double etc you get these for free in the Numeric object that's standard to Scala where you see, for example:

  trait IntIsIntegral extends Integral[Int] { ...

which is pulled into the implicit ether with:

  implicit object IntIsIntegral extends IntIsIntegral with Ordering.IntOrdering

in the same file. Note that, unlike Java's, Scala's numerics do not extend a common superclass like java.lang.Number.

So, we could happily create our own arithmetic with something like:

  class MyInteger

  trait MyIntegerOrdering extends Ordering[MyInteger] {
    def compare(x: MyInteger, y: MyInteger) = ???
  }
  trait MyIntegral extends Numeric[MyInteger] { // or Integral that extends Numeric[T]
    def plus(x: MyInteger, y: MyInteger): MyInteger = new MyInteger
.
.
.
  }
  implicit object MyIntegerIsIntegral extends MyIntegral with MyIntegerOrdering

It's not a terribly useful arithmetic but it would allow us to have a list of MyIntegers and call sum on it.
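For completeness, here is a sketch of a fully filled-in instance for a different, made-up type (Pence is hypothetical and not the MyInteger above). parseString is only abstract from Scala 2.13 onwards, so it is declared without override to keep older versions happy.

```scala
object MoneyDemo {
  final case class Pence(value: Int)

  implicit object PenceIsNumeric extends Numeric[Pence] {
    def plus(x: Pence, y: Pence): Pence  = Pence(x.value + y.value)
    def minus(x: Pence, y: Pence): Pence = Pence(x.value - y.value)
    def times(x: Pence, y: Pence): Pence = Pence(x.value * y.value)
    def negate(x: Pence): Pence          = Pence(-x.value)
    def fromInt(x: Int): Pence           = Pence(x)
    def toInt(x: Pence): Int             = x.value
    def toLong(x: Pence): Long           = x.value.toLong
    def toFloat(x: Pence): Float         = x.value.toFloat
    def toDouble(x: Pence): Double       = x.value.toDouble
    def compare(x: Pence, y: Pence): Int = Integer.compare(x.value, y.value)
    // Required by Numeric from Scala 2.13; simply an extra method before that.
    def parseString(str: String): Option[Pence] =
      scala.util.Try(Pence(str.toInt)).toOption
  }

  // sum finds PenceIsNumeric implicitly and starts from fromInt(0).
  def run(): Pence = List(Pence(1), Pence(2), Pence(3)).sum
}
```

MoneyDemo.run() should come back as Pence(6).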

Thursday, September 10, 2015

Notes on the Future (part 1)


Scala's Futures are very different to Java's. Where Java's Futures encourage blocking by their API, Scala's encourage asynchronous coding... apart from one gotcha that follows shortly.

Futures are Monads so they have all that Monad goodness and we can use them in for comprehensions. Unfortunately, this very idiom can introduce a bug. Take this code:

for {
  i <- Future(5)
  j <- Future(11)
} yield j

It creates two futures that return the values 5 and 11, possibly running on different threads, and yields a Future that contains 11, just like a good Monad should.
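A for comprehension is sugar for nested flatMap and map calls. Here is a minimal sketch of roughly what the compiler makes of the code above, rigged so we can observe when the second Future's body starts. The Promise standing in for Future(5) and the AtomicBoolean flag are my additions for the purposes of the demonstration.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object NestingDemo {
  // Returns (had the second body started before the first future completed?, final result).
  def run(): (Boolean, Int) = {
    val secondStarted = new AtomicBoolean(false)
    val first         = Promise[Int]() // lets us decide when the first future completes

    // Roughly what `for { i <- first.future; j <- Future { ... } } yield j` expands to.
    val nested: Future[Int] =
      first.future.flatMap { i =>
        Future { secondStarted.set(true); 11 }.map(j => j)
      }

    val startedEarly = secondStarted.get() // the flatMap callback cannot have fired yet
    first.success(5)
    (startedEarly, Await.result(nested, 5.seconds))
  }
}
```

The second Future lives inside the function passed to flatMap, so it cannot even be created until the first one has a value.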

"Here, the beauty of the for comprehension sugar can work against us. The for comprehension hides all of that ugly nesting from us, making it look like these are on the same level, but nesting is indeed happening here. What might look concurrent isn't." [1]

Gotcha

What happens here is that the second Future is not even created until the first has completed, so the two run one after the other. The solution is simple. The code should look like this:

val iFuture = Future(5)
val jFuture = Future(11)
for {
  i <- iFuture
  j <- jFuture
} yield j

For just returning a number, you probably won't notice anything amiss. But for something more ambitious, you may very well notice that the first cut of this code is sub-optimal.

"Only men's minds could have unmapped into abstraction..."

In Scala, we can get our dirty little hands on the contents of a Future by mapping like:

    import scala.concurrent.ExecutionContext

    implicit val xc = ExecutionContext.global // needed for the map below

    jFuture map { value =>
      println("j = " + value)
    }

but there are other ways, too. Being monads, we can flatMap and so if we slightly modify the code in [1]:

  implicit val xc = ExecutionContext.global

  val timeToWait = Duration(5, TimeUnit.SECONDS)

  val future1 = Future[Int] {
    (1 to 3).foldLeft(0) { (a, i) =>
      log("future1 "  + i)
      pause()
      a + i
    }
  }

  val future2 = Future[String] {
    ('A' to 'C').foldLeft("") { (a, c) =>
      log("future2 " + c)
      pause()
      a + c
    }
  }

  def spliceFutures(): Unit = {
    val awaitable = future1 flatMap { numsum: Int =>
      log("in future1.flatMap") // ForkJoinPool-1-worker-1: in future1.flatMap

      future2 map { str: String =>
        log("in future2.map") // ForkJoinPool-1-worker-5: in future2.map
        (numsum, str)
      }
    }

    val awaited = Await.result(awaitable, timeToWait)
    println(awaited) // (6,ABC)
  }

  def log(msg: String): Unit = {
    println(Thread.currentThread().getName() + ": " + msg)
  }
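If all you want is the pair of results, the same splice can be written with zip. This is a sketch with simplified Future bodies of my own (no logging or pausing), not the code from [1].

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ZipDemo {
  def run(): (Int, String) = {
    // Both futures are created (and so start running) before they are combined.
    val numbers = Future { (1 to 3).sum }
    val letters = Future { ('A' to 'C').mkString }

    // zip yields a Future of the tuple once both sides have completed successfully.
    Await.result(numbers zip letters, 5.seconds)
  }
}
```

ZipDemo.run() should give (6,ABC), the same as spliceFutures prints.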

"The most powerful way to use these combinators is in combining the results from futures that are already executing. This means that we can still have a lot of independent futures running, and have another piece of code combine the eventual results from those futures into another "final" future that operates on those results."[1]

You'll notice from the log statements that the code in the map and the code in the flatMap can run in different threads.

But wait, there's more. You can add callbacks with onSuccess, onFailure and onComplete, all of which have a return type of Unit (so you know that the functions passed to them must have side-effects), and there is no guarantee that they'll run on the same thread that ran the body of the Future.

  val onCompleteFn: Try[Boolean] => Unit = {
    case Success(x)   => println(" success: " + x)
    case Failure(err) => println(" failure: " + err + ", " + err.getCause)
  }
.
.
    aFuture onComplete onCompleteFn 

In fact, Future.onComplete is the daddy. The other combinator methods (map, flatMap, recover, transform etc) all call it. We'll come back to these other combinators in another post.
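To get a feel for how a combinator can sit on top of onComplete, here is a hand-rolled map built from onComplete and a Promise. It is a sketch of the idea only, not the library's actual implementation, and myMap is a made-up name.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object OnCompleteDemo {
  // Hypothetical helper: `map` expressed with nothing but onComplete and a Promise.
  def myMap[A, B](future: Future[A])(fn: A => B)(implicit ec: ExecutionContext): Future[B] = {
    val promise = Promise[B]()
    future onComplete {
      case Success(a)   => promise.complete(Try(fn(a))) // capture exceptions thrown by fn
      case Failure(err) => promise.failure(err)         // pass the original failure along
    }
    promise.future
  }

  def run(): Int = {
    import ExecutionContext.Implicits.global
    Await.result(myMap(Future(20))(_ + 1), 5.seconds)
  }
}
```

OnCompleteDemo.run() should give 21.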

Problems in your Future

If an Exception is thrown in your Future, then it will be captured in the failure case in onCompleteFn above. But if an Error is thrown, the Failure holds not the Error itself but a java.util.concurrent.ExecutionException (a "Boxed Error") that wraps it.
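A small sketch of the boxing, assuming the global ExecutionContext. It inspects the completed Future's value rather than the exception's message, since I would not rely on the exact message text being the same across Scala versions.

```scala
import java.util.concurrent.ExecutionException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Failure

object BoxedErrorDemo {
  // Returns (was the failure an ExecutionException?, was its cause our Error?).
  def run(): (Boolean, Boolean) = {
    val boom   = new Error("boom")
    val future = Future[Int] { throw boom }

    // Await.ready waits for completion without rethrowing the failure.
    Await.ready(future, 5.seconds).value match {
      case Some(Failure(e: ExecutionException)) => (true, e.getCause eq boom)
      case Some(Failure(other))                 => (false, other eq boom)
      case _                                    => (false, false)
    }
  }
}
```

BoxedErrorDemo.run() should give (true, true): the Error is there, but one level down in getCause.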

Another surprise comes from filter. For Lists, Trys and Options, things are quite straightforward:

    val myList = List(1,2,3)
    val filteredList = myList.filter(_ > 10)
    println(filteredList)   // List()

    val myOption = Option(1)
    val filteredOption = myOption.filter(_ > 10)
    println(filteredOption) // None

    val myTry = Try(1)
    val filteredTry = myTry.filter(_ > 10)
    println(filteredTry)    // Failure(java.util.NoSuchElementException: Predicate does not hold for 1)

That is, a filter whose predicate leads to nothing is represented by the "negative" subclass of the monads. However, there is no such class for an "empty" Future.

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits._
    import scala.concurrent.duration._
    import scala.util.{Failure, Success}

    val myFuture       = Future { 1 }
    val filteredFuture = myFuture filter {_ > 10}
    filteredFuture onComplete {
      case Success(x) => println(s"success: $x")
      case Failure(x) => println(s"failure: $x")
    } // failure: java.util.NoSuchElementException: Future.filter predicate is not satisfied

    Await.result(filteredFuture, 1.second) // Exception in thread "main" java.util.NoSuchElementException: Future.filter predicate is not satisfied

That is, the Failure case is the path taken by onComplete. If we were so foolish as to block using Await, then an Exception is thrown.
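One way to cope with the "empty" case without letting the exception escape is recover. A minimal sketch follows; the fallback of -1 is an arbitrary stand-in, and the Await at the end is there only so the example can hand back a value.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FilterRecoverDemo {
  def run(): Int = {
    val filtered: Future[Int] = Future { 1 } filter { _ > 10 }

    // Turn the failed filter back into a successful Future with a default value.
    val recovered: Future[Int] = filtered recover {
      case _: NoSuchElementException => -1
    }

    Await.result(recovered, 5.seconds)
  }
}
```

FilterRecoverDemo.run() should give -1 rather than throwing.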

[1] Akka Concurrency, Derek Wyatt.