Wednesday, January 30, 2019

Setting up a Hadoop/Spark/Hive/Kafka cluster


We wanted full control of what was running in our cluster so we installed our own rather than use Cloudera or HortonWorks. These are some things I learned along the way.

HDFS

Getting a simple distributed filesystem up and running was relatively straightforward. This StackOverflow answer and this one give the minimum work required.

Spark

Spark doesn't need Hadoop to run, so, as the simplest thing that could work, I started a Spark cluster by following the instructions at DataWookie. This made running a Spark shell against the cluster as simple as adding the switch --master spark://SPARK_MASTER_HOST:7077.

Although simple, this configuration brought down my cluster when I tried to do something computationally expensive (my Linux boxes died with oom_reaper messages in /var/log/messages).

So, I did the sensible thing and ran it under YARN with the --master yarn switch.
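
As a sanity check, here is a minimal Scala sketch of the same switch expressed through SparkSession rather than on the spark-shell command line (the host name is a placeholder and the trivial count job is my own invention):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("cluster-sanity-check")
  // standalone mode would be: .master("spark://SPARK_MASTER_HOST:7077")
  .master("yarn")  // requires HADOOP_CONF_DIR (or YARN_CONF_DIR) to point at the cluster's config
  .getOrCreate()

spark.range(1000000).count()  // a trivial job to prove the executors are reachable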

YARN

We didn't find YARN needed much additional configuration. The defaults were sufficient with the exception of:
  1. yarn.nodemanager.vmem-check-enabled should be set to false as it kept saying my Spark jobs did not have enough virtual memory (see StackOverflow).
  2. yarn.resourcemanager.hostname should be set to the machine where YARN's ResourceManager runs. Failing to do this will likely lead to HDFS running on the other nodes in the cluster but not YARN jobs (see StackOverflow).
  3. yarn.nodemanager.resource.memory-mb should be set to a high percentage of the total memory of each node (StackOverflow). Note that the default is a measly 8GB, so your Spark jobs will silently use few resources if you don't change this.
These are to be set in yarn-site.xml. The only other file to configure was capacity-scheduler.xml as I was the only person using the cluster and wanted to hog the resources (in particular, yarn.scheduler.capacity.maximum-am-resource-percent).
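
Since it is easy to edit yarn-site.xml on one node and forget the others, a quick sanity check is to read back the values actually in force via Hadoop's Configuration API. This is just a sketch; YarnConfiguration loads yarn-default.xml and yarn-site.xml from the classpath:

import org.apache.hadoop.yarn.conf.YarnConfiguration

val conf = new YarnConfiguration()
Seq("yarn.nodemanager.vmem-check-enabled",
    "yarn.resourcemanager.hostname",
    "yarn.nodemanager.resource.memory-mb").foreach { key =>
  println(s"$key = ${conf.get(key)}")
}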

Please do remember to copy the $HADOOP_HOME/etc directory onto all nodes in the cluster when you make changes, and restart the cluster for good measure.

Check that all your nodes are running by executing:

yarn node -list

Hive

Hive was the most annoying to get running, so beware. It appears that, in addition to copying the JARs mentioned in this Apache article on integrating Hive with Spark, you must also copy the spark-yarn and scala-reflect JARs into Hive's lib directory and remove the Hive 1.2.1 JARs from the HDFS directory.

You'll need to configure Hive to talk to the database of your choice so it can store its metadata. This StackOverflow answer is for MySQL, but the principle remained the same when I was using Derby.

Derby

Start Derby with something like $DERBY_HOME/bin/startNetworkServer -h HOST where HOST is the IP address you've told Hive to bind to.

This Apache resource is good for checking your Derby server is up and running.

Kafka 

Kafka and ZooKeeper were surprisingly easy to set up as the Apache Quickstart page was very well written. It just remains to say that Kafka "keys are used to determine the partition within a log" (StackOverflow) and that, if you want to clean the logs, this StackOverflow answer may be useful.
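
To illustrate the point about keys, here is a minimal producer sketch in Scala (the broker address and topic name are placeholders). Records with the same key always land in the same partition because the default partitioner hashes the key:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "KAFKA_HOST:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)

// Both records share the key "user42", so they go to the same partition and keep their order.
producer.send(new ProducerRecord("myTopic", "user42", "first event"))
producer.send(new ProducerRecord("myTopic", "user42", "second event"))
producer.close()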

Check your disk space

Note that the data for old jobs hangs around in $SPARK_HOME/work, so you need to clean it up regularly.

If you don't, jobs just won't start and YARN will complain that there are not enough resources without telling you which resources are deficient. It is not at all obvious that the real problem is low disk space.

YARN barfs because yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage defaults to 90%, so it is not sufficient to have "enough" disk space in absolute terms. See Cloudera for more information.

In addition, you may want to set hadoop.tmp.dir in core-site.xml to a partition with lots of free space or you might see Spark jobs failing with "Check the YARN application logs for more details".

If in doubt...

... check the logs.

Keep checking $HADOOP_HOME/logs, particularly the yarn-*.log files. They'll give an indication of why things are not working although the messages can sometimes be misleading.

The disk space issue above manifested itself when Hive was running even though it was ultimately due to YARN. Hive would throw timeout exceptions in its log. YARN would say there were not enough resources even though nothing else was running (yarn application -list). Looking at YARN's pending applications, I saw just my job sitting there, not running. The hint was that the equivalent job would run fine when submitted directly to the Spark master, outside of YARN.

Hive's logs default to /tmp/HIVE_USER/hive.log and this is the first place to turn when things go wrong.



Tuesday, January 22, 2019

Chaining Monads


What exactly is going on when we chain monads? Here is some Scalaz code to demonstrate.

First, we create the monad:

import scalaz.Monad

sealed trait MonadX[+A] {
  def run(ctx: Context): A
}

object MonadX {

  def apply[A](f: Context => A): MonadX[A] = new MonadX[A] {
    override def run(ctx: Context): A = f(ctx)
  }

  implicit val monad: Monad[MonadX] = new Monad[MonadX] {
    override def bind[A, B](fa: MonadX[A])(f: A => MonadX[B]): MonadX[B] =
      MonadX(ctx => f(fa.run(ctx)).run(ctx))

    override def point[A](a: => A): MonadX[A] = MonadX(_ => a)
  }

}

Next, we create two such monads that we will chain:

    case class Context(aString: String, aLong: Long)

    val hello: MonadX[String] = MonadX { ctx: Context =>
      ctx.aString
    }
    val meaningOfLife: MonadX[Long] = MonadX { ctx: Context =>
      ctx.aLong
    }

Not very useful, are they? But you get the idea. Now, all we want is a for-comprehension, so:

  val concatLength: MonadX[Int] = for {
    x <- hello
    y <- meaningOfLife
  } yield (x + y).length
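
For reference, the compiler de-sugars the for-comprehension above into (roughly) the following; the map and flatMap methods come from the Scalaz syntax classes discussed below:

import scalaz.syntax.monad._

val concatLengthDesugared: MonadX[Int] =
  hello.flatMap { x =>
    meaningOfLife.map { y =>
      (x + y).length
    }
  }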

You can think of monads as programs, so let's run it:

  val ctx         = Context("hello", 42)

  val length: Int = concatLength.run(ctx)

Using a highly de-sugared and non-FP version of this code to demonstrate, the flow of control looks like this:

About to run for-comprehension
==============================
bind: Creating boundHello with fa=Hello, f=<function1>

This is just the first part of our for-comprehension (x <- hello). Note that nothing further is executed as monads are lazy. All the bind operation did was create a new MonadX containing a function. We never applied that function.

Only when we run the outer monad (concatLength.run(ctx)) does the 'program' execute:

About to run boundHello
=======================
boundHello.run
    boundHello.f(ctx) = 
        Hello.run
            helloFn(ctx) = 
                'hello'
        Hello.run Finished
        <function1>(hello) = 
            bind: Creating boundMeaningOfLife with fa=MeaningOfLife, f=<function1>
            'boundMeaningOfLife'
        boundMeaningOfLife.run
            boundMeaningOfLife.f(ctx) = 
                MeaningOfLife.run
                    meaningOfLifeFn(ctx) = 
                        '42'
                MeaningOfLife.run Finished
                <function1>(42) = 
                    Creating point (7) [Integer]
                    'point'
                point.run
                    point.f(ctx) = 
                        '7'
                point.run Finished
                '7'
        boundMeaningOfLife.run Finished
        '7'
boundHello.run Finished

So, what's happened? Well, first our monads hello and meaningOfLife have had bind called on them (bind is another word for flatMap in some languages). The reason is that anything in a for-comprehension has to be flatMapped, as that is what it de-sugars to under the covers. Yes, the final clause of a de-sugared for-comprehension invokes map, but a map can be rewritten as a flatMap followed by a point (sometimes called unit; see the monad laws here). And this is where the point comes from in the above flow.

Leveraging this substitution, Scalaz defines Monad.map as:

map[A,B](fa: F[A])(f: A => B): F[B] = bind(fa)(a => point(f(a)))

Since MonadX doesn't itself define the map and flatMap methods the Scala compiler needs for the for-comprehension, where do they come from? They're provided by Scalaz in scalaz.syntax.FunctorOps.map and scalaz.syntax.BindOps.flatMap.

In my heavily de-sugared version of this code, I have given my functions names, but the Scala compiler is also giving me anonymous functions (<function1>). These appear to be y <- meaningOfLife for the first one and the yield function, (x + y).length, for the second.

So, in a brief, hand-wavey summary: 
  1. the outermost monad is not hello but boundHello which wraps it. 
  2. boundHello calls run on its hello.
  3. It feeds the results from this into its function, f. This happens to be the block of code that is a result of y <- meaningOfLife. Since we're now using the bind/point substitution while mapping, we're given a boundMeaningOfLife.
  4. boundHello runs this boundMeaningOfLife which, being a recursive structure, runs the same steps as #2 and #3 but on its wrapped MeaningOfLife monad.
  5. Again, like boundHello in step #3, boundMeaningOfLife calls its f function but this time the result is a point.
  6. Again, since it's a recursive structure, run is called upon point which returns the result of the yield function.
  7. Then this program's "stack" is popped all the way to the top with our result.


Tuesday, January 15, 2019

FP and Big Data


Distributed computing relies surprisingly heavily on mathematics.


Associativity

This just says that

A . (B . C) = (A . B) . C

for all A, B and C, where '.' is the operator in question.

Note that addition is obviously associative whereas subtraction isn't. However, there is a clever hack around this.

"Subtraction is not an associative operation. But it is associative under addition, where subtraction is defined to be the inverse operation, −1​ :N→N:n↦−n together with addition. This way, I can do subtraction, via addition of inverses, where 3 - 5 is desugared to 3 + 5−1.

"Subtraction as a standalone function is not associative, because 3−(−5−2)≠(3−(−5))−2. But addition is, in fact, principled, associative, commutative, preserves identity, respects inverses, and is semantically what you want.

"We care about associativity ... in the context of distributed programming ... because it means we can arbitrarily partition data. We care about identity because it means we know that recursion will terminate when we reach the identity element (it is a fixed point modulo our operation), and we know that if a function is also commutative, then we can arbitrarily combine our results.

"This is the model that Spark uses underneath the hood - you'll notice it only takes commutative monoidal functions for its jobs (i.e. sets of data with an operation that has an identity within the dataset, and is associative + commutative). The algebra of distributed programming is quite simple."

[Emily Pillmore, Gitter]

"In this context this means that if we have pipelines A, B and C, and we wish to join them to form a single pipeline ABC, we can either join A and B and then join the result AB and C, or we could join A to pipeline BC. " [Stephen Zoio's blog]


Commutativity

"Suppose you wrote this:

someNumbers.reduce(_ - _)

What would you expect the result to be? The short answer is: we don’t know. Since the operation is not associative and is not commutative, we have broken both constraints we need to have this operation work well." [Nicholas Tietz-Sokolsky's blog]
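
To make that concrete with Spark (this sketch assumes a live SparkContext called sc):

val someNumbers = sc.parallelize(1 to 100)

// Subtraction is neither associative nor commutative, so the answer depends on how
// the data happens to be partitioned and in what order the partial results are combined.
someNumbers.reduce(_ - _)

// Addition is both, so this is deterministic however Spark partitions the data.
someNumbers.reduce(_ + _)  // always 5050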

Domain

"Unless the set A [to which binary operations are applied] is chosen carefully, they [binary operations] may not always be defined. For example, if one restricts one's attention to the positive integers, then the expression 3-5 has no meaning. There are two conventions one could imagine adopting to this. One might decide not to insist that a binary operations should be defined for every pair of elements of A, and to regard it as a desirable extra property of an operation if it is defined everywhere. But the convention actually in force is that binary operations do have to be defined everywhere, so that "minus," though a perfectly good binary operation on the set of all integers, is not a binary operation on the set of all positive integers." [Princeton Companion to Mathematics]

Similarly, a bad choice of domain can blow up in something like Spark. If you're adding Ints, are you sure their total will never exceed Int.MaxValue?
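
For example (plain Scala, but the same arithmetic bites inside a Spark job):

Seq(Int.MaxValue, 1).sum                 // -2147483648: the total silently wrapped around
Seq(Int.MaxValue, 1).map(_.toLong).sum   // 2147483648: widening the domain to Long avoids the overflow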


Composability

"Whenever one is looking for a general solution to composability, monads are normally not too far away. By composability, we mean the output of one process is the input into another. And that is precisely what we are trying to do. A data processing pipeline consists of several operations, each joined together to form the pipeline. And we can join two or more pipelines to form larger pipelines."
[Stephen Zoio]

Zoio gives an example where wrapping calls to Spark in monads makes things more re-usable. Although the article is otherwise excellent, I found this example unconvincing as each Spark call was tied to another. Instead, I refactored his Scalaz/monad code to make it a little more divisible here.


Equality

Rob Norris in his "Gazing at the Stars" lecture makes some interesting points. He describes how a simple type alias of type Angle = Double caused thousands of dollars of damage as it was not sufficient for all comparisons. (The lecture is largely about mapping between different units).

At 11'34" into the video, Norris tells us Scala 2 lets you compare anything for equality, for example a URL and a Classloader. That's a bug that should be fixed in Scala 3. FP programmers prefer type classes as a solution for example Eq (pronounced "eek").

Note that Norris uses CatsSuite for testing. His code for the lecture can be found here, here and here. Norris' use of CatsSuite throws a lot of random numbers at the unit tests to try and flush out errors.


Monoids

The mathematical definition of Monoids and some Scalaz examples can be found in a previous post of mine here.

Note that even Haskell doesn't (can't?) enforce the Monoid rules. "There must be a value that acts as the identity with respect to the binary function and that the binary function must be associative. It's possible to make instances of Monoid that don't follow these rules... Haskell doesn't enforce these laws so we need to be careful that our instances do indeed obey them." [LYAHFGG]

The best way I've seen so far to address this issue is to let the types give a hint. The function "reduceLeft is not parallelizable, as it explicitly assumes a form that is not associative: (B,A) => B. As long as you use an associative operator, reduce is parallelizable." [SO] A signature of (B,B) => B, on the other hand, at least admits an associative operator.

You need a monoid for fold operations to work properly and a commutative semigroup for reduce. "Commutative semigroups consists of all those semigroups in which the binary operation satisfies the commutativity property that ab = ba for all elements a and b in the semigroup." [Wikipedia]. Monoids are a specialization of semigroups that have an identity element.
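
Since neither compiler will check the laws for us, a crude way to gain some confidence is to test them over a handful of sample values. The sketch below is hand-rolled and library-free; a property-based testing library would do this far more thoroughly:

def obeysMonoidLaws[A](combine: (A, A) => A, identity: A, samples: Seq[A]): Boolean = {
  val associative = samples.forall(a => samples.forall(b => samples.forall(c =>
    combine(a, combine(b, c)) == combine(combine(a, b), c))))
  val hasIdentity = samples.forall(a => combine(a, identity) == a && combine(identity, a) == a)
  associative && hasIdentity
}

obeysMonoidLaws[Int](_ + _, 0, 1 to 10)  // true: addition with 0 forms a monoid
obeysMonoidLaws[Int](_ - _, 0, 1 to 10)  // false: subtraction is not even associative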


Optimization

Obeying the FP laws allows Spark etc to optimize the code on our behalf.

"We can do this because we know our programs are invariant under certain substitutions. Referential transparency gives us the most basic rule: we can inline anything, and factor anything out. Lawful abstractions give us more substitution rules for our toolbox. Functor tells us that fa.map(f).map(g) = fa.map(f andThen g) for instance, so we get an optimization opportunity that we can apply freely in our programs." [tpolecat Jan 09 17:33, Gitter]

However, as with all performance testing, it's an empirical science. Using the almost canonical example of an AST for basic arithmetic, Noel Welsh ("Unite Church and State") says: "each constructor in the FP implementation becomes a method in the OO implementation. This removes the need for the pattern matching. The transformation is known as Church encoding.

"We can go the opposite way as well: convert every method in the OO representation into a case class in a sealed trait (an algebraic data type), and then use a pattern match to chose the action to take (a structural recursion). This transformation is known as reification.

"We can think of OO and FP not as entirely different programming paradigms but choices we make to encode a solution to the problem according to the tradeoffs we want to make.  Let’s see an example where where this transformation is useful. One aspect ... is performance. In the FP representation we must allocate memory to hold the data structure that represents the operations we want to perform."


Frameless

The Frameless project brings a more FP approach to using DataFrames. Now, one can already use the more strongly-typed Datasets to get compile-time error checking, but "unfortunately, this syntax does not allow Spark to optimize the code". Frameless apparently gets around that problem while still giving type safety.

However, I changed a field name but foolishly did not change it elsewhere in the codebase where I called partitionBy (a sketch of the problem appears at the end of this post). I asked on their Gitter channel how Frameless could save me from this but was told:
Cody Allen @ceedubs Jan 12 19:19
@PhillHenry as far as I can tell, Frameless doesn't specifically have support for that partitionBy method. But in general that is the sort of problem that frameless solves.
So, maybe a future pull request...?
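
Here, for what it's worth, is a sketch of the failure mode in plain, untyped Spark (the field and path names are made up). The column name passed to partitionBy is just a string, so renaming the case class field leaves this compiling happily and it only blows up at runtime:

import org.apache.spark.sql.SparkSession

case class Event(eventDate: String, payload: String)  // the field used to be called "date"

val spark = SparkSession.builder().appName("partitionBy-example").getOrCreate()
import spark.implicits._

val events = Seq(Event("2019-01-15", "hello")).toDS()

events.write
  .partitionBy("date")     // compiles, but fails at runtime: there is no column called "date"
  .parquet("/tmp/events")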