Setting up Spark is not too hard, but there are a few environment issues you might need to know about. For this, I'm assuming you're using some Linux distro.
The environment
First, set up SSH keys on your boxes so you don't have to type your password all the time. This is a good read. I also use rdist to keep everything in sync. Just be careful about copying the data directory over...
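In practice that boils down to something like this (a sketch; the user name and paths are placeholders for your own):

ssh-keygen -t rsa              # generate a key pair on the machine you drive things from
ssh-copy-id henryp@nuc         # repeat for every box in the cluster
ssh-copy-id henryp@phillsdell

and a minimal rdist Distfile along these lines, run with rdist -f Distfile:

HOSTS = ( nuc )
FILES = ( /home/henryp/Tools/Cluster )
${FILES} -> ${HOSTS}
        install ;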
A simple Spark cluster
Once you've installed the various binaries, the documentation covers most of what you need to know. I have two computers, nuc and phillsdell. Somewhat arbitrarily, on nuc, I run the master with:
export SPARK_MASTER_IP=[nuc's IP address]
export SPARK_LOCAL_IP=[nuc's IP address]
$SPARK_INSTALLATION/sbin/start-master.sh
and a slave with:
$SPARK_INSTALLATION/sbin/start-slave.sh spark://[nuc's IP address]:7077
On phillsdell, I run:
$SPARK_INSTALLATION/sbin/start-slave.sh spark://[nuc's IP address]:7077
for a slave, and the shell with:
$SPARK_INSTALLATION/bin/spark-shell --master spark://[nuc's IP address]:7077
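As an aside, rather than starting each slave by hand you can list the worker hostnames in conf/slaves on the master and use the bundled scripts (this leans on the passwordless SSH set up earlier; the hostnames are of course mine):

$SPARK_INSTALLATION/conf/slaves should contain:
nuc
phillsdell

and then, on nuc:
$SPARK_INSTALLATION/sbin/start-all.sh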
If you see lots of errors like this:
15/07/06 08:40:42 INFO Worker: Connecting to master akka.tcp://sparkMaster@nuc:7077/user/Master...
15/07/06 08:40:53 INFO Worker: Retrying connection to master (attempt # 1)
then the worker can't reach the master. Check that the master is actually running and that both boxes agree on its address; setting SPARK_MASTER_IP and SPARK_LOCAL_IP explicitly, as above, usually sorts this out. Anyway, to check everything is OK, let's run:
scala> val myRdd = sc.parallelize(1 to 1000000)
myRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at
scala> val myRddWKeys = myRdd.map(x => (x % 1000, x))
myRddWKeys: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[5] at map at
scala> val myRddReduced = myRddWKeys reduceByKey (_ + _)
myRddReduced: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[6] at reduceByKey at
scala> myRddReduced foreach (key => println(s"key = ${key._1} value = ${key._2}") )
15/07/06 10:19:43 INFO SparkContext: Starting job: foreach at
.
.
This creates an RDD with a million numbers, divides them into one thousand groups and reduces the numbers in each group by adding them all up. Finally, for each group, we print out the reduced number. The results can be seen at http://phillsdell:8081 and http://nuc:8081 where there are 500 rows printed to stdout on each slave.
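As a quick sanity check on the arithmetic: key 0 collects 1000, 2000, ..., 1000000, which sum to 1000 × 500500 = 500,500,000, and you can pull that single value back in the shell (the res number in the REPL output will vary):

scala> myRddReduced.lookup(0)   // expect Seq(500500000)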
HDFS
Now, let's add Hadoop to the mix by starting it on our boxes.
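For a by-hand start in the same spirit as the Spark commands above, that's roughly (a sketch, assuming the 2.7.0 tarball layout with the namenode living on phillsdell; start-dfs.sh is the all-in-one alternative):

hadoop-2.7.0/sbin$ ./hadoop-daemon.sh start namenode    # on phillsdell only
hadoop-2.7.0/sbin$ ./hadoop-daemon.sh start datanode    # on every box that stores blocks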
If you see something in the namenode log like:
2015-07-03 12:17:31,300 ERROR org.apache.hadoop.hdfs.server.namenode.NameNode: Failed to start namenode.
org.apache.hadoop.hdfs.server.common.InconsistentFSStateException: Directory /tmp/hadoop-henryp/dfs/name is in an inconsistent state: storage directory does not exist or is not accessible.
then don't forget to run:
hadoop namenode -format
I also added:
<property>
  <name>fs.default.name</name>
  <value>hdfs://[phillsdell's IP address]</value>
</property>
<property>
  <name>dfs.data.dir</name>
  <value>file:///home/henryp/Tools/Cluster/Hadoop/tmp/dfs/name/data</value>
  <final>true</final>
</property>
<property>
  <name>dfs.name.dir</name>
  <value>file:///home/henryp/Tools/Cluster/Hadoop/tmp/dfs/name</value>
  <final>true</final>
</property>
to my ./etc/hadoop/core-site.xml file.
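A quick, optional way to check the settings are actually being picked up:

hadoop-2.7.0/sbin$ ../bin/hdfs getconf -confKey fs.default.name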
Now, I can go to:
http://phillsdell:50070/dfshealth.html#tab-datanode
and see both data nodes in my cluster :)
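The same information is available from the command line, if you prefer:

hadoop-2.7.0/sbin$ ../bin/hdfs dfsadmin -report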
Pulling it all together
I downloaded historical data of the FTSE 100 from here. Using this helpful list of commands, I executed:
hadoop-2.7.0/sbin$ ../bin/hadoop fs -mkdir /ftse100
hadoop-2.7.0/sbin$ ../bin/hadoop fs -put ~/Downloads/table.csv /ftse100
and noticed in the web admin page that the file had been copied onto HDFS. Now, in the Spark shell:
scala> val csvRDD = sc.textFile("hdfs://192.168.1.15:8020/ftse100/table.csv")
.
.
scala> csvRDD.count
.
.
res3: Long = 8211
This is, not coincidentally, the line count of the original file:
$ wc -l ~/Downloads/table.csv
8211 /home/henryp/Downloads/table.csv
So, it appears that my standalone Spark cluster is quite happily talking to my Hadoop HDFS.
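Just to show the data is usable and not merely countable, here is a quick sketch that pulls out the closing prices (this assumes Yahoo's usual Date,Open,High,Low,Close,Volume,Adj Close column layout, so treat the column index as an assumption):

scala> val rows = csvRDD.filter(line => !line.startsWith("Date")).map(_.split(","))
scala> val closes = rows.map(cols => cols(4).toDouble)   // column 4 = Close in the assumed layout
scala> closes.max()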