Lambda Architecture has become very fashionable, but how do you best test it in a single JVM?
Basically, Lambda Architecture is composed of three layers:
- a large store of data (e.g. HDFS) against which you can run batch jobs
- real-time data processing (e.g. Spark Streaming)
- a fast access layer (e.g. Cassandra)
Ideally, you want to write your code so that your integration tests can be run in your IDE with no additional systems needed. This can be done as follows.
HDFS
Hadoop has a MiniCluster that facilitates testing. Pulling it into your project is simply a matter of adding the hadoop-minicluster dependency to your pom.xml (the full list of dependencies I used is at the end of this post). Note: I had to do some fiddly hacks to get it working on my Windows machines without installing binaries.
Once you have changed your POM, you can use it with:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hdfs.{DistributedFileSystem, MiniDFSCluster}
import java.nio.file.Files
.
.
val baseDir = Files.createTempDirectory("tests").toFile.getAbsoluteFile
val conf = new Configuration()
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath)
val builder = new MiniDFSCluster.Builder(conf)
val hdfsCluster = builder.build()
val distributedFS = hdfsCluster.getFileSystem
val hdfsUri = "hdfs://127.0.0.1/" + hdfsCluster.getNameNodePort + "/"
Spark Streaming
There is an excellent article on how to use Spark Streaming in your integration tests here. Given a SparkContext, you can parallelize any data you have in memory and test against that, as in the link.
But if you want to run against a real Kafka instance, you'll want code that looks something like the following. Let's say we're dealing just with Strings; you can add your own compression etc. later. First, let's define the following:
type KeyType = String
type ValueType = String
type RddKeyValue = (KeyType, ValueType)
Now assume we have Kafka and ZooKeeper up and running. We'll need a function that handles the Kafka stream:
import org.apache.spark.streaming.dstream.DStream
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.mapred.JobConf
.
.
def handler(path: String): (DStream[(KeyType, ValueType)]) => Unit = {
  stream =>
    stream.foreachRDD { rdd =>
      if (rdd.count() > 0) {
        // do what you need to do here.
        // After that, we save the data to HDFS like so:
        rdd.saveAsNewAPIHadoopFile(
          path,
          classOf[KeyType],
          classOf[ValueType],
          classOf[TextOutputFormat[KeyType, ValueType]],
          createHdfsPersistenceJobConf)
      }
    }
}

def createHdfsPersistenceJobConf: JobConf = {
  val jobConf = new JobConf
  jobConf
}
Note: this createHdfsPersistenceJobConf method must be a separate method in an object, otherwise you'll see runtime messages such as:
NotSerializableException: org.apache.hadoop.mapred.JobConf
because the closure would otherwise hold a reference to its enclosing function.
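In other words, the shape you want is something like this (a sketch; the object name is my own):

object HdfsPersistence {
  // a standalone object, so the closure Spark ships to the executors doesn't
  // drag a non-serializable enclosing instance along with it
  def createHdfsPersistenceJobConf: JobConf = new JobConf
}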
Now, we need a function that kicks the whole streaming process off:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
.
.
def creatingFunc(checkpointFolder: String): () => StreamingContext = () => {
  val conf = new SparkConf().setMaster("local[*]").setAppName("myAppName")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  val streamingContext = new StreamingContext(conf, Duration(3000))
  streamingContext.checkpoint(checkpointFolder)
  val dStream: InputDStream[(String, String)] =
    KafkaUtils.createDirectStream[KeyType, ValueType, StringDecoder, StringDecoder](
      streamingContext,
      kafkaConf,
      Set("my_topic_name"))
  handler(checkpointFolder)(dStream)
  streamingContext
}
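The kafkaConf above is the usual Kafka parameter map passed to the direct stream. For a test, a minimal version might look like the following (the broker address is an assumption; use whatever your test broker actually listens on):

val kafkaConf: Map[String, String] = Map(
  "metadata.broker.list" -> "127.0.0.1:9092" // assumption: the address of your test Kafka broker
)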
Then tell Spark Streaming to start using this function:
val streamingContext = StreamingContext.getOrCreate(checkpointFolder, creatingFunc(checkpointFolder))
streamingContext.start()
The handler is invoked at StreamingContext.getOrCreate (where it wires up the stream) and its foreachRDD body then runs on each batch as Spark Streaming polls the stream.
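To drive all of this from a test, you can publish a few messages with an ordinary Kafka producer and wait a couple of batch intervals before asserting on what was written to HDFS. A rough sketch, assuming the Kafka 0.8.2 Scala producer API and a test broker on 127.0.0.1:9092 (both assumptions on my part):

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

val props = new Properties()
props.put("metadata.broker.list", "127.0.0.1:9092")   // assumption: wherever your test broker listens
props.put("serializer.class", "kafka.serializer.StringEncoder")

val producer = new Producer[KeyType, ValueType](new ProducerConfig(props))
producer.send(new KeyedMessage("my_topic_name", "someKey", "someValue"))
producer.close()

// give Spark a couple of 3-second batches to pull the message and write it out,
// then assert on the files the handler saved under the path you gave it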
One final note: Spark fires up a Jetty instance as an admin tool. With the dependencies I was using, this initially gave me:
class "javax.servlet.FilterRegistration"'s signer information does not match signer information of other classes in the same package
For reference, these are the dependencies in my pom.xml:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.3.1</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.8.2.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>1.3.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-minicluster</artifactId>
  <version>2.6.0</version>
</dependency>
So you need to make sure your dependency on Jetty's servlet API (org.eclipse.jetty.orbit:javax.servlet:jar:3.0.0.v201112011016 for me) comes first on your classpath.