Thursday, July 9, 2015

Setting up a little Spark/HDFS cluster

Setting up Spark is not too hard, but there are a few environment issues you might need to know about. For this, I'm assuming you're using some Linux distro.

The environment

First, set up SSH keys on your boxes so you don't have to type your password all the time. This is a good read. I also use rdist to keep everything in sync. Just be careful about copying the data directory over...

A simple Spark cluster

Once the various binaries are installed, the documentation covers most of what you need to know. I have two computers, nuc and phillsdell. Somewhat arbitrarily, on nuc, I run the master with:

export SPARK_MASTER_IP=[nuc's IP address]
export SPARK_LOCAL_IP=[nuc's IP address]

and a slave with

$SPARK_INSTALLATION/sbin/ spark://[nuc's IP address]:7077

On phillsdell, I run:

$SPARK_INSTALLATION/sbin/ spark://[nuc's IP address]:7077

for a slave and:

$SPARK_INSTALLATION/bin/spark-shell --master spark://[nuc's IP address]:7077

for the Spark shell.

If you see lots of errors like this:

15/07/06 08:40:42 INFO Worker: Connecting to master akka.tcp://sparkMaster@nuc:7077/user/Master...
15/07/06 08:40:53 INFO Worker: Retrying connection to master (attempt # 1)

you'll need to configure your firewall to accept all the node chatter. If you're using Ubuntu, this is a good resource.

Anyway, to check everything is OK, let's run:

scala> val myRdd = sc.parallelize(1 to 1000000)
myRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at :21

scala> val myRddWKeys = myRdd.map(x => (x % 1000, x))
myRddWKeys: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[5] at map at :23

scala> val myRddReduced = myRddWKeys reduceByKey (_ + _)
myRddReduced: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[6] at reduceByKey at :25

scala> myRddReduced foreach (key => println(s"key = ${key._1} value = ${key._2}") )

15/07/06 10:19:43 INFO SparkContext: Starting job: foreach at :28

This creates an RDD with a million numbers, divides them into one thousand groups and reduces the numbers in each group by adding them all up. Finally, for each group, we print out the reduced number. The results can be seen at http://phillsdell:8081 and http://nuc:8081 where there are 500 rows printed to stdout on each slave.
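As a sanity check on those numbers, the same grouping can be reproduced on a single machine with plain Scala collections, no cluster needed. This is only a minimal sketch and was not part of the session above; the names LocalCheck and expectedSum are made up for the illustration, and the closed-form sums come from simple arithmetic rather than from Spark.

```scala
// Local sanity check for the reduce-by-key job, using plain Scala collections.
// LocalCheck and expectedSum are hypothetical names invented for this sketch.
object LocalCheck {
  // Closed form for the sum of all x in 1..1000000 with x % 1000 == key.
  def expectedSum(key: Int): Int =
    if (key == 0) 500500000        // 1000 + 2000 + ... + 1000000
    else 1000 * key + 499500000    // key + (key + 1000) + ... + (key + 999000)

  def main(args: Array[String]): Unit = {
    val reduced: Map[Int, Int] =
      (1 to 1000000)
        .groupBy(_ % 1000)                       // one thousand groups, keyed by x % 1000
        .map { case (key, xs) => key -> xs.sum } // add up the numbers in each group

    assert(reduced.size == 1000)
    assert(reduced.forall { case (key, sum) => sum == expectedSum(key) })
    println(s"key = 1 value = ${reduced(1)}")    // key = 1 value = 499501000
  }
}
```

Every per-key sum stays below Int.MaxValue, which is why summing Ints is safe here; the total over all one million numbers would not fit in an Int.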


Now, let's add Hadoop to the mix by starting it on our boxes.

If you see something in the namenode log like:

2015-07-03 12:17:31,300 ERROR org.apache.hadoop.hdfs.server.namenode.NameNode: Failed to start namenode.
org.apache.hadoop.hdfs.server.common.InconsistentFSStateException: Directory /tmp/hadoop-henryp/dfs/name is in an inconsistent state: storage directory does not exist or is not accessible.

then don't forget to run:

hadoop namenode -format

(more information here). I also needed to add this:

    <value>hdfs://[phillsdell's IP address]</value>

to my ./etc/hadoop/core-site.xml file.

Now, I can go to:


and see both data nodes in my cluster :)

Pulling it all together

I downloaded historical data of the FTSE 100 from here. Using this helpful list of commands, I executed:

hadoop-2.7.0/sbin$ ../bin/hadoop fs -mkdir /ftse100
hadoop-2.7.0/sbin$ ../bin/hadoop fs -put ~/Downloads/table.csv /ftse100

and noticed in the web admin page that the file had been copied onto HDFS. Now, in the Spark shell:

scala> val csvRDD = sc.textFile("hdfs://")

scala> csvRDD.count
res3: Long = 8211

Not coincidentally, this is the line count of the original file:

$ wc -l ~/Downloads/table.csv 
8211 /home/henryp/Downloads/table.csv

So, it appears that my standalone Spark cluster is quite happily talking to my Hadoop HDFS.
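For reference, an HDFS location passed to sc.textFile generally has the shape scheme, namenode host, then the path inside HDFS. The sketch below only illustrates that shape. The host name namenode.example is a placeholder and not a setting from my cluster, the object and method names are hypothetical, and the path assumes the file kept its name table.csv under the /ftse100 directory created earlier.

```scala
import java.net.URI

// Illustrative only: builds an hdfs:// URI from a placeholder host and a path.
// HdfsUriSketch and hdfsUri are hypothetical names; namenode.example is not a real host.
object HdfsUriSketch {
  // Uses the java.net.URI constructor (scheme, host, path, fragment).
  def hdfsUri(nameNodeHost: String, path: String): URI =
    new URI("hdfs", nameNodeHost, path, null)

  def main(args: Array[String]): Unit = {
    val uri = hdfsUri("namenode.example", "/ftse100/table.csv")
    println(uri) // hdfs://namenode.example/ftse100/table.csv
  }
}
```

In the Spark shell, a string of that form (with your own namenode's address) is what would go into sc.textFile.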
