Thursday, February 26, 2015

Cassandra for the Busy - Part 2

I hit an interesting problem this week, one that's obvious once you think about it.

The Config

I have a cluster of 5 Cassandra nodes. I execute:

CREATE KEYSPACE my_keyspace WITH REPLICATION = {
    'class' :              'SimpleStrategy',
    'replication_factor' : 2
};

CREATE TABLE my_table (
    my_int_id    varint,
    my_string_id varchar,
    my_value     decimal,
    PRIMARY KEY (my_int_id, my_string_id)
);

I then start hammering my cluster with some CQL like:

INSERT INTO my_table 
    (my_int_id, my_string_id, my_value)
    VALUES (1, '2', 42.0);

or such like. The code I use to write to the DB looks something like:

RegularStatement  toPrepare         = (RegularStatement) (new SimpleStatement(cql).setConsistencyLevel(ConsistencyLevel.TWO));
PreparedStatement preparedStatement = cassandraSession.prepare(toPrepare);
BoundStatement    bounded           = preparedStatement.bind(new BigInteger(myIntId), myStringId, new BigDecimal("42.0"));

The Error

I then kill a single node and my client starts barfing with:

Not enough replica available for query at consistency TWO (2 required but only 1 alive)

which baffled me initially because there were still 4 nodes in the cluster:

dse$ bin/nodetool -h status my_keyspace
Datacenter: Analytics
|/ State=Normal/Leaving/Joining/Moving
--  Address       Load       Tokens  Owns   Host ID                               Rack
UN  83.98 KB   256     40.0%  86df3d65-0e19-49b2-a49e-bc89d6347fa4  rack1
UN  82.78 KB   256     42.0%  8fghe485-0e19-49b2-a49e-bc89d6347fa5  rack1
UN  84.91 KB   256     38.0%  9834edf1-0e19-49b2-a49e-bc89d6347fa6  rack1
UN  88.22 KB   256     41.0%  873fdeab-0e19-49b2-a49e-bc89d6347fa7  rack1
DN  86.36 KB   256     39.0%  2bdf9211-0e19-49b2-a49e-bc89d6347fa8  rack1

Of course, the problem is that your tolerance to a node fault is:

    Tolerance = Replication Factor - Consistency Level

For me, that was Tolerance = 2 - 2 = 0. Oops.
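To make the arithmetic concrete, here is a minimal Java sketch of the formula above. The class and method names are my own invention for illustration and are not part of any Cassandra API. The quorum helper is an addition that assumes the usual majority rule of floor(RF / 2) + 1.

```java
/** Sketch: how many replicas of a row can be down before a write is rejected. */
final class FaultTolerance {

    /** Tolerance = Replication Factor - Consistency Level, for numeric levels such as ONE and TWO. */
    static int tolerance(int replicationFactor, int requiredAcks) {
        return replicationFactor - requiredAcks;
    }

    /** Assumption: QUORUM needs a majority of the replicas, i.e. floor(RF / 2) + 1. */
    static int quorum(int replicationFactor) {
        return replicationFactor / 2 + 1;
    }

    public static void main(String[] args) {
        System.out.println(tolerance(2, 2));         // the setup in this post: prints 0
        System.out.println(tolerance(3, 2));         // prints 1
        System.out.println(tolerance(3, quorum(3))); // prints 1
    }
}
```

Note that the number of nodes in the cluster does not appear anywhere: only the replicas of the row being written matter.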

This is not a problem on all inserts. Running

dse$ bin/cqlsh -uUSER -pPASSWORD
Connected to PhillsCassandraCluster at
[cqlsh 4.1.1 | Cassandra | DSE 4.6.0 | CQL spec 3.1.1 | Thrift protocol 19.39.0]
Use HELP for help.

and inserting worked fine. But that is only because cqlsh defaults to a different consistency level.

cqlsh> consistency
Current consistency level is ONE.

so, let's change that:

cqlsh> consistency TWO

Consistency level set to TWO.

Now do the inserts fail? Well, sometimes. It depends on which nodes hold the replicas for the row we attempt to write, something that is hidden from the user.

The Problem

Cassandra will try to put a piece of data on REPLICATION_FACTOR number of nodes. These nodes are determined by hashing the row's partition key (the first component of the primary key, which is my_int_id in the table above). If one of those nodes is down, Cassandra will not choose another in its place. That replica simply cannot be written to at this time (of course, it might get the data if and when the node rejoins the cluster).

Upon inserting, the client decides how many nodes must be successfully written to before the INSERT is declared a success. This number is given by your CONSISTENCY_LEVEL. If the contract cannot be fulfilled then Cassandra will barf.
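Here is a toy Java model of why only some inserts fail. It is a sketch under loud assumptions, not how Cassandra is implemented: each node owns a single slot on the ring (the real cluster above has 256 tokens per node), String.hashCode stands in for the Murmur3 partitioner, and ToyRing, replicasFor and canWrite are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy ring: the replicas of a key are the node it hashes to plus the next RF - 1 nodes. */
final class ToyRing {
    private final int nodeCount;
    private final int replicationFactor;

    ToyRing(int nodeCount, int replicationFactor) {
        this.nodeCount = nodeCount;
        this.replicationFactor = replicationFactor;
    }

    /** Assumption: String.hashCode replaces the real partitioner's hash. */
    List<Integer> replicasFor(String partitionKey) {
        int first = Math.floorMod(partitionKey.hashCode(), nodeCount);
        List<Integer> replicas = new ArrayList<>();
        for (int i = 0; i < replicationFactor; i++) {
            replicas.add((first + i) % nodeCount);
        }
        return replicas;
    }

    /** A write succeeds only if enough of this key's own replicas are alive. */
    boolean canWrite(String partitionKey, Set<Integer> downNodes, int requiredAcks) {
        long alive = replicasFor(partitionKey).stream()
                .filter(node -> !downNodes.contains(node))
                .count();
        return alive >= requiredAcks;
    }

    public static void main(String[] args) {
        ToyRing ring = new ToyRing(5, 2);                    // 5 nodes, replication factor 2
        Set<Integer> down = new HashSet<>(Arrays.asList(4)); // one node killed
        for (String key : Arrays.asList("1", "2", "3", "4", "5")) {
            System.out.println("key " + key
                    + " replicas " + ring.replicasFor(key)
                    + " writable at TWO: " + ring.canWrite(key, down, 2)
                    + ", at ONE: " + ring.canWrite(key, down, 1));
        }
    }
}
```

In this model, keys whose replica set includes the dead node are rejected at TWO (2 required but only 1 alive) yet still succeed at ONE, while keys that live entirely on healthy nodes are unaffected.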

Further Reading

Application Failure Scenarios with Cassandra

Lightweight Transactions in Cassandra 2.0

Monday, February 23, 2015

Cassandra For the Busy - Part 1

A quick guide to Cassandra admin

From the Cassandra installation directory, running:

bin/nodetool -uUSER -pPASSWORD -hNODE ring KEYSPACE

(where the username, password, node and keyspace correspond to whatever you set up) produces data that reads like this:

Token is a number between Long.MIN_VALUE and Long.MAX_VALUE that is used in hashing the key. (See org.apache.cassandra.dht.Murmur3Partitioner.describeOwnership for how ownership is calculated over a cluster using something like a MurmurHash algorithm. This partitioner is the default for Cassandra.)

Load refers to how much has been flushed to disk. Therefore, this may take some time to register a change after you have truncated a table containing 10 million rows. This appears to correspond to org.apache.cassandra.service.StorageService.getLoad().

Owns represents how much of the ring range a token covers. Note: it might not add up to 100%. "For DC [data centre] unaware replication strategies, ownership without replication will be 100%" (see the code comments for org.apache.cassandra.service.StorageService.effectiveOwnership). But with replication, it appears to be 100% * the replication factor.

Saturday, February 21, 2015

Type Constructors

Type Constructors in Java and Scala

In "Functional Programming in Scala", I came across this code:

trait Foldable[F[_]] { ...

This is an example of something common in Java and Scala: type constructors.

"We write it as F[_], where the underscore indicates that F is not a type but a type constructor that takes one type argument. Just like functions that take other functions as arguments are called higher-order functions, something like Foldable is a higher-order type constructor or a higher-kinded type.

"Just like values and functions have types, types and type constructors have kinds. Scala uses kinds to track how many type arguments a type constructor takes, whether it’s co- or contravariant in those arguments, and what the kinds of those arguments are." [1]

However, the two languages differ in how they use type constructors.

"[I]n Scala it is not valid to say something is of type List, unlike in Java where javac would put the List<Object> for you. Scala is more strict here, and won’t allow us to use just a List in the place of a type, as it’s expecting a real type - not a type constructor." [2]

Java doesn't allow you to write something so abstract as the Foldable trait above. For instance, this won't compile:

public class Foldable<U<T>> { ...
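As a rough sketch of what Java can express instead, the fold below has to be written once per container, because there is no way to declare a type parameter that stands for List or Optional themselves. FoldSketch and its method names are hypothetical and only there for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;

final class FoldSketch {

    // List on its own is a type constructor; List<Integer> is a type.
    // Java can abstract over the element type A, but not over the container.

    /** Fold for the List type constructor. */
    static <A, B> B foldList(List<A> as, B zero, BiFunction<B, A, B> f) {
        B acc = zero;
        for (A a : as) {
            acc = f.apply(acc, a);
        }
        return acc;
    }

    /** The same idea again for Optional, duplicated because the container cannot be a parameter. */
    static <A, B> B foldOptional(Optional<A> oa, B zero, BiFunction<B, A, B> f) {
        return oa.map(a -> f.apply(zero, a)).orElse(zero);
    }

    public static void main(String[] args) {
        Integer sum = foldList(Arrays.asList(1, 2, 3), 0, (acc, x) -> acc + x);
        Integer bumped = foldOptional(Optional.of(5), 10, (acc, x) -> acc + x);
        System.out.println(sum);    // prints 6
        System.out.println(bumped); // prints 15
    }
}
```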

Which brings us on to Types of a Higher Kind...

[1] Functional Programming in Scala

Sunday, February 15, 2015

More on those Monoids

I mentioned Scalaz's implementation of Monoids in a previous post. Having run through the code with somebody better at Scala than I am, here are my notes.

Our code imports the implicit means of creating a monoid. The signature of the method is this:

  implicit def optionMonoid[A: Semigroup]: Monoid[Option[A]] = new Monoid[Option[A]] {

The [A: Semigroup] looks a little like a type parameter but it is totally different. It's a Context Bound. These "are typically used with the so-called type class pattern, a pattern of code that emulates the functionality provided by Haskell type classes, though in a more verbose manner" (from this excellent explanation).

Context Bounds of the form [X: Y] say: I am expecting an implicit value of type Y[X]. In fact, this line is equivalent to Scalaz' implementation:

    implicit def optionMonoid[A](implicit x: Semigroup[A]): Monoid[Option[A]] = new Monoid[Option[A]] {

Now, recall that my code using the Scalaz library looked like this:

    implicit val stringSemigroup: Semigroup[String] = Semigroup.instance[String]((x, y) => x + y)

    val mySemigroup: Semigroup[Option[String]] = Semigroup[Option[String]]

The first line is me just defining my Semigroup - nothing interesting there.

The second is much more magic. It invokes this Scalaz code:

object Semigroup {
  @inline def apply[F](implicit F: Semigroup[F]): Semigroup[F] = F

which is somewhat confusing as the symbol F is used both for the type parameter and for the implicit value that is returned. Basically, this line says: there should implicitly be a value of type Semigroup[F], where F is defined at the call site, and I will return that value.

Well, the only implicit value that we defined is our own stringSemigroup and that's of type Semigroup[String], not Semigroup[Option[String]]. But then we also imported Scalaz' optionMonoid that says: if there is a Semigroup[String] hanging around in the implicit ether, I can deliver a Monoid[Option[String]]. Since Monoid is a subtype of Semigroup, mySemigroup is legitimately given this Monoid[Option[String]].

Et voila. Not immediately obvious to a Java programmer but that's how Scala does it.
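For the Java programmer, here is a sketch of the same wiring with the implicit plumbing written out by hand: the context bound becomes an ordinary parameter that the caller must pass. This is my own analogue, not Scalaz code. The nested Semigroup and Monoid interfaces and the ExplicitEvidence class are hypothetical stand-ins.

```java
import java.util.Optional;

final class ExplicitEvidence {

    interface Semigroup<A> {
        A append(A x, A y);
    }

    /** A Monoid is a Semigroup with an identity element. */
    interface Monoid<A> extends Semigroup<A> {
        A zero();
    }

    /** Analogue of optionMonoid[A: Semigroup]: the Semigroup[A] evidence is an explicit argument. */
    static <A> Monoid<Optional<A>> optionMonoid(Semigroup<A> evidence) {
        return new Monoid<Optional<A>>() {
            @Override
            public Optional<A> append(Optional<A> x, Optional<A> y) {
                if (x.isPresent() && y.isPresent()) {
                    return Optional.of(evidence.append(x.get(), y.get()));
                }
                return x.isPresent() ? x : y;
            }

            @Override
            public Optional<A> zero() {
                return Optional.empty();
            }
        };
    }

    public static void main(String[] args) {
        Semigroup<String> stringSemigroup = (x, y) -> x + y;
        // Scala finds stringSemigroup implicitly; in Java we hand it over ourselves.
        // The assignment type-checks because Monoid extends Semigroup.
        Semigroup<Optional<String>> mySemigroup = optionMonoid(stringSemigroup);
        System.out.println(mySemigroup.append(Optional.of("a string "), Optional.of("another string")));
        // prints Optional[a string another string]
    }
}
```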

Further reading

1. Stackoverflow: Scalaz learning resources.

Sunday, February 8, 2015

Things they don't tell you about Akka clustering

... or rather they do but it's hard to find. Akka promises to make multi-threaded, multi-JVM code easy. It's very nice but it's not necessarily easy.

Getting your actor system up and running is not too hard and there are good books on it. Derek Wyatt's book just missed out on covering clustering, which "would probably add a couple of hundred pages" anyway. Here I've tried to give the minimum needed to get a cluster up and running.

The thing to note is that your Akka actors don't need to change at all. It's all down to the configuration.

Ok, so let's start with a simple Actor that prints out everything he hears:

import akka.actor.Actor
import java.util.concurrent.atomic.AtomicInteger

class PhillTheActor extends Actor {
  val myIndex = PhillTheActor.counter.incrementAndGet()

  override def receive: Receive = {
    case _ => {
      println("message received by actor #" + myIndex)
      Thread.sleep(1000) // very naughty
    }
  }
}

object PhillTheActor {
  final val counter = new AtomicInteger(0)
}

Very simple.

First, let's take the config. We'll create a class we can run:

object ClusterApp extends App {

  val systemName = "testSystem"
  val myPort     = args(0)
  val seedPort   = "9119"
  val actorSystem = ActorSystem(systemName, ConfigFactory.parseString(
    s"""
        akka {
          actor.provider = "akka.cluster.ClusterActorRefProvider"
          remote.log-remote-lifecycle-events = on
          remote.netty.tcp.port = $myPort
          remote.netty.tcp.hostname =
          cluster {
              seed-nodes = [

We've (arbitrarily) chosen port 9119 to be the port of the seed node. We could have chosen any reasonable port number but the node that has the same value for myPort will be the one calling the shots as we'll see later.

"The seed nodes are configured contact points for initial, automatic, join of the cluster... [Y]ou can only join to an existing cluster member, which means that for bootstrapping some node must join itself." [1]

Now, we need a local router:

  val localSamplerActorName = "localSamplerActorName"
  val localRouter =
    actorSystem.actorOf(Props(classOf[PhillTheActor]).withRouter(RoundRobinPool(nrOfInstances = 2)), localSamplerActorName)

We'll be using localSamplerActorName later so keep an eye on that.

Then we need the cluster router. From the scala docs:

"akka.routing.RouterConfig implementation for deployment on cluster nodes. Delegates other duties to the local akka.routing.RouterConfig, which makes it possible to mix this with the built-in routers such as akka.routing.RoundRobinRouter or custom routers."

We only need this on one node so:

  if (myPort == seedPort) {
      val loadBalancingGroup = AdaptiveLoadBalancingGroup(HeapMetricsSelector)
      val clusterRouterGroupSettings = ClusterRouterGroupSettings(
          totalInstances = 3,
          routeesPaths = List(s"/user/$localSamplerActorName"),
          allowLocalRoutees = true,
          useRole = None
      )
      val clusterRouterGroup = ClusterRouterGroup(loadBalancingGroup, clusterRouterGroupSettings)
      val clusterActor       = actorSystem.actorOf(clusterRouterGroup.props, "clusterWideRouter")

There's a lot going on here. The argument totalInstances refers to how many instances in the cluster will be part of this load balancing.

routeesPaths holds the path to the local router defined above, whose name is stored in the variable localSamplerActorName.

allowLocalRoutees is straightforward: does this node also process work, or is it only a coordinator?

As for roles: "Not all nodes of a cluster need to perform the same function: there might be one sub-set which runs the web front-end, one which runs the data access layer and one for the number-crunching. Deployment of actors—for example by cluster-aware routers—can take node roles into account to achieve this distribution of responsibilities." [1]

Notice that the cluster wide balancer uses a Group and the local balancer uses a Pool. "The Akka team has introduced two different types of routers in Akka 2.3: Pool and Group. Pools are routers that manage their routees by themselves (creation and termination as child actors) whereas Groups are routers that get pre-configured routees from outside." (from this blog). So, the local router using a round-robin pool will spin up actors for us while the cluster-wide router is using this pre-configured routing actor.
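To illustrate the Pool versus Group distinction without any Akka dependency, here is a toy model in plain Java. It is emphatically not Akka's API: ToyRouter, pool, group and route are hypothetical names, and round-robin selection is used for both flavours purely to keep the sketch small (the cluster-wide router above actually balances on heap metrics).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/** Toy router that hands out routees in round-robin order. */
final class ToyRouter<R> {
    private final List<R> routees;
    private final AtomicLong next = new AtomicLong();

    private ToyRouter(List<R> routees) {
        this.routees = routees;
    }

    /** Pool style: the router creates (and so owns) its routees. */
    static <R> ToyRouter<R> pool(int nrOfInstances, Supplier<R> factory) {
        List<R> created = new ArrayList<>();
        for (int i = 0; i < nrOfInstances; i++) {
            created.add(factory.get());
        }
        return new ToyRouter<>(created);
    }

    /** Group style: the routees already exist and are handed to the router. */
    static <R> ToyRouter<R> group(List<R> existing) {
        return new ToyRouter<>(new ArrayList<>(existing));
    }

    /** Picks the next routee, wrapping round at the end of the list. */
    R route() {
        int index = (int) (next.getAndIncrement() % routees.size());
        return routees.get(index);
    }

    public static void main(String[] args) {
        ToyRouter<String> clusterWide = group(Arrays.asList("node-a", "node-b", "node-c"));
        for (int i = 0; i < 4; i++) {
            System.out.println(clusterWide.route()); // node-a, node-b, node-c, node-a
        }
    }
}
```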

Finally, we send messages to this cluster just as we would any actor:

      while (true) {
        clusterActor ! "hello!"
      }
  }

whereupon PhillTheActor starts printing out messages. If I start this app again with other ports as arguments, then PhillTheActor on them also prints out messages.

Obviously, there is much more to Akka clustering but this got my simple app up and running.

[1]  Akka Cluster Usage docs

Further Reading

Adam Warski's blog.

Thursday, February 5, 2015

Monoids are easy

I've been avoiding the depths of functional programming for years. Now it seems I can't avoid it. So, without further ado...


Don't be put off by the esoterica you might expect with monoids. They're easy to understand.

From Functional Programming in Scala:

"A monoid consists of the following:
  • Some type A
  • An associative binary operation, op, that takes two values of type A and combines them into one: op(op(x,y), z) == op(x, op(y,z)) for any choice of x:A, y:A, z:A
  • A value, zero:A, that is an identity for that operation: op(x, zero) == x"

The first two properties describe a Semigroup.

Examples of monoids include string concatenation, integer addition, float multiplication etc., where the identities are the empty string, 0 and 1 respectively.
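For anybody coming from Java, the definition can be sketched as below, with a small spot-check of the laws for string concatenation and integer addition. The MonoidLaws class and its members are hypothetical names of mine, and checking three sample values demonstrates the laws rather than proving them.

```java
import java.util.function.BinaryOperator;

final class MonoidLaws {

    /** Some type A, an associative binary operation and an identity value. */
    interface Monoid<A> {
        A op(A x, A y);
        A zero();
    }

    /** Builds a Monoid from an identity value and a combining function. */
    static <A> Monoid<A> monoid(A identity, BinaryOperator<A> combine) {
        return new Monoid<A>() {
            @Override
            public A op(A x, A y) {
                return combine.apply(x, y);
            }

            @Override
            public A zero() {
                return identity;
            }
        };
    }

    static final Monoid<String> STRING_CONCAT = monoid("", (x, y) -> x + y);
    static final Monoid<Integer> INT_ADDITION = monoid(0, (x, y) -> x + y);

    /** Spot-checks associativity and identity for one choice of x, y and z. */
    static <A> boolean obeysLaws(Monoid<A> m, A x, A y, A z) {
        boolean associative = m.op(m.op(x, y), z).equals(m.op(x, m.op(y, z)));
        boolean identity = m.op(x, m.zero()).equals(x) && m.op(m.zero(), x).equals(x);
        return associative && identity;
    }

    public static void main(String[] args) {
        System.out.println(obeysLaws(STRING_CONCAT, "a", "b", "c")); // prints true
        System.out.println(obeysLaws(INT_ADDITION, 1, 2, 3));        // prints true
    }
}
```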

That's pretty easy. What about Options? Well, I've found two different ways of doing this. They give different answers but obey the same Monoid laws.

The first is in the FPS book. Chiusano and Bjarnason define their Monoid for Options as:

  def optionMonoid[A]: Monoid[Option[A]] = new Monoid[Option[A]] {
    def op(x: Option[A], y: Option[A]) = x orElse y
    val zero = None
  }

This basically gives you the first Option that's a Some or, failing that, a None.

The Scalaz guys, by contrast, combine the contents of the two Options, provided there is a Semigroup for the type they contain. Their implementation looks like this:

  implicit def optionMonoid[A: Semigroup]: Monoid[Option[A]] = new Monoid[Option[A]] {
    def append(f1: Option[A], f2: => Option[A]) = (f1, f2) match {
      case (Some(a1), Some(a2)) => Some(Semigroup[A].append(a1, a2))
      case (Some(a1), None)     => f1
      case (None, sa2 @ Some(a2)) => sa2
      case (None, None)         => None
    }

    def zero: Option[A] = None
  }

Where we invoke it with:

    val stringOption  = Option("a string ")
    val stringOption2 = Option("another string")

    implicit val stringSemigroup: Semigroup[String] 
      = Semigroup.instance[String]((x, y) => x + y)

    import scalaz.std.option.optionMonoid 

    val mySemigroup: Semigroup[Option[String]] = Semigroup[Option[String]]
    println(mySemigroup.append(stringOption, stringOption2)) // Some(a string another string)

It gives a different answer but in both cases the actions are associative and once again, Nones are ignored.
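To see the two behaviours side by side, here is a rough Java rendering using java.util.Optional in place of Scala's Option. It is a sketch in the spirit of the two implementations above rather than a port of either. OptionMonoids, firstSome and combine are hypothetical names.

```java
import java.util.Optional;
import java.util.function.BinaryOperator;

final class OptionMonoids {

    /** FPS book style (x orElse y): the first present value wins. */
    static <A> Optional<A> firstSome(Optional<A> x, Optional<A> y) {
        return x.isPresent() ? x : y;
    }

    /** Scalaz style: when both values are present, combine their contents. */
    static <A> Optional<A> combine(Optional<A> x, Optional<A> y, BinaryOperator<A> semigroup) {
        if (x.isPresent() && y.isPresent()) {
            return Optional.of(semigroup.apply(x.get(), y.get()));
        }
        return x.isPresent() ? x : y;
    }

    public static void main(String[] args) {
        Optional<String> a = Optional.of("a string ");
        Optional<String> b = Optional.of("another string");
        BinaryOperator<String> concat = (x, y) -> x + y;

        System.out.println(firstSome(a, b));                                // prints Optional[a string ]
        System.out.println(combine(a, b, concat));                          // prints Optional[a string another string]
        System.out.println(combine(Optional.<String>empty(), b, concat));   // prints Optional[another string]
    }
}
```

In both versions the empty Optional plays the role of zero: combining it with anything on either side gives that thing back unchanged.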

There's a lot of Scala (black?) magic going on here with implicits but that's for another post.