Monday, July 20, 2015

Shapeless


There's a very interesting Scala library called Shapeless that lets you do some weird things with types. One such "what magic is this?!" moment is asserting, at compile time rather than run time, that a checksum derived from a list of numbers has a certain value.

This impressed me no end, even if I can't think of a practical use for it. I'm still trying to get my head around Travis Brown's excellent tutorial since he is so much more clever than me. But here are some notes I made along the way.

First, we borrow a bit from the Peano axioms, where every natural number is the successor of the previous one except 0, which is not the successor of anything. We model this much as Shapeless does, with something like:

  trait NaturalNumber

  trait NaturalNumber0 extends NaturalNumber

  trait NaturalNumberSucceeds[T <: NaturalNumber] extends NaturalNumber

Next we'll have a type representing each and every number. I'll only bother with the natural numbers 0 to 4 to make things simple:

    type Type0 = NaturalNumber0
    type Type1 = NaturalNumberSucceeds[Type0]
    type Type2 = NaturalNumberSucceeds[Type1]
    type Type3 = NaturalNumberSucceeds[Type2]
    type Type4 = NaturalNumberSucceeds[Type3]

This is just shorthand. Written out in full, Type4 is the type:

NaturalNumberSucceeds[NaturalNumberSucceeds[NaturalNumberSucceeds[NaturalNumberSucceeds[NaturalNumber0]]]]

But that's far too verbose.
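If you want to convince yourself that the alias really does expand to that, you can ask the compiler for evidence. This is just a quick sketch using Predef's =:= and isn't needed for anything that follows:

    implicitly[Type4 =:= NaturalNumberSucceeds[NaturalNumberSucceeds[NaturalNumberSucceeds[NaturalNumberSucceeds[NaturalNumber0]]]]] // compiles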

So much for the types; let's have some instances:

    implicit val _0 = new Type0 {}
    implicit val _1 = new Type1 {}
    implicit val _2 = new Type2 {}
    implicit val _3 = new Type3 {}
    implicit val _4 = new Type4 {}

Now comes our first function that makes assertions at compile time. It asserts that T is the immediate successor to U and looks like this:

  def immediateSuccessor[T <: NaturalNumberSucceeds[U], U <: NaturalNumber]: Unit = {}

And we can see it working immediately:

    immediateSuccessor[Type3, Type2] // compiles
//  immediateSuccessor[Type3, Type1] // doesn't compile because Type3 is not the immediate successor to Type1

A more useful example tells us what the successor is without us having to assert it:

    implicit def add1[A <: NaturalNumberSucceeds[_]](implicit b: NaturalNumberSucceeds[A]): NaturalNumberSucceeds[A] = b
    val result3add1: Type4 = add1[Type3]

Well, we assert insofar as we declare the type of result3add1, but we didn't have to.
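In fact, we can leave the type annotation off entirely and the compiler still infers the right thing. A quick sketch (the val names here are just for illustration):

    val inferred = add1[Type3]         // inferred as NaturalNumberSucceeds[Type3], that is, Type4
//  val wrong: Type2 = add1[Type3]     // doesn't compile because a Type4 is not a Type2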

Subtraction proved a little more difficult for me to understand. Playing around with some Shapeless code, I got this:

    trait NaturalPair[A <: NaturalNumber, B <: NaturalNumber]

    implicit def sub[A <: NaturalNumber, B <: NaturalNumber](implicit e: NaturalPair[NaturalNumberSucceeds[A], NaturalNumberSucceeds[B]])
      = new NaturalPair[A, B] {}

So, A is the number we want to start with, and B is what we reduce it by. This implicit function will recurse until one or the other reaches 0.

    implicit val analyseThis = new NaturalPair[Type3, Type1] {}
    val result3sub1          = implicitly[NaturalPair[Type2, Type0]]
//  val result3sub1Wrong     = implicitly[NaturalPair[Type1, Type0]] // doesn't compile because 3 - 1 != 1

Here we send into the implicit ether a pair saying "take a 3 and subtract a 1". This is picked up by our implicit sub function, which decrements both the 3 and the 1 until one or the other reaches 0. We then assert that a (2, 0) pair is out there in the ether, and indeed it is: our code compiles.
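To see what the compiler is actually doing, we can apply sub by hand and pass analyseThis in explicitly. This sketch uses only the definitions above; the implicit search for NaturalPair[Type2, Type0] performs exactly this application for us:

    val byHand: NaturalPair[Type2, Type0] = sub[Type2, Type0](analyseThis)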

The example Brown uses is much more complicated than this, but I thought this would be a more accessible introduction.

Thursday, July 9, 2015

Setting up a little Spark/HDFS cluster


Setting up Spark is not too hard, but there are a few environment issues you might need to know about. For this, I'm assuming you're using some Linux distro.

The environment

First, set up SSH keys on your boxes so you don't have to type your password all the time. This is a good read. I also use rdist to keep everything in synch. Just be careful about copying the data directory over...

A simple Spark cluster

Once you've installed the various binaries, the documentation covers most of what you need to know. I have two computers, nuc and phillsdell. Somewhat arbitrarily, on nuc, I run the master with:

export SPARK_MASTER_IP=[nuc's IP address]
export SPARK_LOCAL_IP=[nuc's IP address]
$SPARK_INSTALLATION/sbin/start-master.sh

and a slave with

$SPARK_INSTALLATION/sbin/start-slave.sh spark://[nuc's IP address]:7077

On phillsdell, I run:

$SPARK_INSTALLATION/sbin/start-slave.sh spark://[nuc's IP address]:7077

for a slave and:

$SPARK_INSTALLATION/bin/spark-shell --master spark://[nuc's IP address]:7077

for the Spark shell.

If you see lots of errors like this:

15/07/06 08:40:42 INFO Worker: Connecting to master akka.tcp://sparkMaster@nuc:7077/user/Master...
15/07/06 08:40:53 INFO Worker: Retrying connection to master (attempt # 1)

you'll need to configure your firewall to accept all the node chatter. If you're using Ubuntu, this is a good resource.

Anyway, to check everything is OK, let's run:

scala> val myRdd = sc.parallelize(1 to 1000000)
myRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:21

scala> val myRddWKeys = myRdd.map(x => (x % 1000, x))
myRddWKeys: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[5] at map at <console>:23

scala> val myRddReduced = myRddWKeys reduceByKey (_ + _)
myRddReduced: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[6] at reduceByKey at <console>:25

scala> myRddReduced foreach (key => println(s"key = ${key._1} value = ${key._2}") )

15/07/06 10:19:43 INFO SparkContext: Starting job: foreach at <console>:28
.
.

This creates an RDD with a million numbers, divides them into one thousand groups and reduces the numbers in each group by adding them all up. Finally, for each group, we print out the reduced number. The results can be seen at http://phillsdell:8081 and http://nuc:8081 where there are 500 rows printed to stdout on each slave.
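If reduceByKey is new to you, here is roughly the same computation on an ordinary Scala collection. It's only a sketch to show the shape of the result; no cluster is involved:

val grouped = (1 to 1000000).groupBy(_ % 1000).mapValues(_.sum)
grouped foreach { case (key, value) => println(s"key = $key value = $value") }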

HDFS

Now, let's add Hadoop to the mix by starting it on our boxes.

If you see something in the namenode log like:

2015-07-03 12:17:31,300 ERROR org.apache.hadoop.hdfs.server.namenode.NameNode: Failed to start namenode.
org.apache.hadoop.hdfs.server.common.InconsistentFSStateException: Directory /tmp/hadoop-henryp/dfs/name is in an inconsistent state: storage directory does not exist or is not accessible.

then don't forget to run:

hadoop namenode -format

(more information here). I also needed to add this:

  <property>
    <name>fs.default.name</name>
    <value>hdfs://[phillsdell's IP address]</value>
  </property>
  <property>
    <name>dfs.data.dir</name>
    <value>file:///home/henryp/Tools/Cluster/Hadoop/tmp/dfs/name/data</value>
    <final>true</final>
  </property>
  <property>
    <name>dfs.name.dir</name>
    <value>file:///home/henryp/Tools/Cluster/Hadoop/tmp/dfs/name</value>
    <final>true</final>
  </property>

to my ./etc/hadoop/core-site.xml file.

Now, I can go to:

http://phillsdell:50070/dfshealth.html#tab-datanode

and see both data nodes in my cluster :)

Pulling it all together

I downloaded historical data for the FTSE 100 from here. Using this helpful list of commands, I executed:

hadoop-2.7.0/sbin$ ../bin/hadoop fs -mkdir /ftse100
hadoop-2.7.0/sbin$ ../bin/hadoop fs -put ~/Downloads/table.csv /ftse100

and noticed on the web admin page that the file had been copied onto HDFS. Now, in the Spark shell:

scala> val csvRDD = sc.textFile("hdfs://192.168.1.15:8020/ftse100/table.csv")
.
.

scala> csvRDD.count
.
.
res3: Long = 8211

This is, not coincidentally, the line count of the original file:

$ wc -l ~/Downloads/table.csv 
8211 /home/henryp/Downloads/table.csv

So, it appears that my standalone Spark cluster is quite happily talking to my Hadoop HDFS.
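As a final sanity check on the data itself, we could go a little further in the Spark shell. Assuming the file has the usual Yahoo Finance layout (a header row, then Date,Open,High,Low,Close,Volume,Adj Close), a sketch like this would find the highest closing price:

val rows         = csvRDD.filter(line => !line.startsWith("Date")) // drop the header row
val closes       = rows.map(_.split(",")(4).toDouble)              // Close is the fifth column
val highestClose = closes.reduce(math.max)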