Friday, August 16, 2019

First steps in Apache Beam


Introduction

Caveat: I've just started learning Apache Beam specifically on GCP and these are some notes I've made. They may at this stage be unreliable. I will update this entry as I learn more.

Superficially like Spark

"Side inputs are useful if your ParDo needs to inject additional data when processing each element in the input PCollection, but the additional data needs to be determined at runtime (and not hard-coded). Such values might be determined by the input data, or depend on a different branch of your pipeline." (the Beam docs).

Side inputs are a bit like Spark’s broadcast variables, I'm told.

A Pipeline.apply("name", ParDo.of(new XXXDoFn())) call is the Beam equivalent of Spark sending a function to its executors. The XXXDoFn must be serializable.

A DoFn's processing method is annotated with the @ProcessElement tag, which tells Beam to pass in the required objects. Note that the DoFn does not need to conform to any particular interface.
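
As a concrete sketch (hypothetical names, written in Scala against Beam's Java SDK), a DoFn that reads a side input might look roughly like this:

    import org.apache.beam.sdk.Pipeline
    import org.apache.beam.sdk.transforms.{Create, DoFn, ParDo, View}
    import org.apache.beam.sdk.transforms.DoFn.ProcessElement
    import org.apache.beam.sdk.values.PCollectionView

    // Hypothetical DoFn that prefixes each element with a value supplied as a side input.
    // The annotated method is discovered reflectively; it overrides nothing.
    class PrefixFn(prefixView: PCollectionView[String]) extends DoFn[String, String] {
      @ProcessElement
      def processElement(c: DoFn[String, String]#ProcessContext): Unit =
        c.output(c.sideInput(prefixView) + c.element())
    }

    val pipeline = Pipeline.create()

    // The side input: a single value materialised as a PCollectionView
    val prefixView: PCollectionView[String] =
      pipeline.apply("Prefix", Create.of("env=prod/")).apply(View.asSingleton[String]())

    pipeline
      .apply("Names", Create.of("alice", "bob"))
      .apply("AddPrefix", ParDo.of(new PrefixFn(prefixView)).withSideInputs(prefixView))

    pipeline.run().waitUntilFinish()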

Partitioning

“The elements in a PCollection are processed in bundles. The division of the collection into bundles is arbitrary and selected by the runner. This allows the runner to choose an appropriate middle-ground between persisting results after every element, and having to retry everything if there is a failure.

“If processing of an element within a bundle fails, the entire bundle fails. The elements in the bundle must be retried (otherwise the entire pipeline fails), although they do not need to be retried with the same bundling.”

(from the Beam Documentation.)

The Runner

Beam is designed to run on many different platforms and therefore has many different Runners, with differing degrees of capability. These Runners allow your Beam code to run on Spark, Flink etc. Of note is the Direct Runner.

“The Direct Runner executes pipelines on your machine and is designed to validate that pipelines adhere to the Apache Beam model as closely as possible…  the Direct Runner performs additional checks” (the Beam Documentation).

This Runner should be fine for running tests. You can also use TestPipeline "inside of tests that can be configured to run locally or against a remote pipeline runner."
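
For example, a minimal sketch of such a test (assuming the Direct Runner is on the classpath; the names and data are made up):

    import org.apache.beam.sdk.testing.{PAssert, TestPipeline}
    import org.apache.beam.sdk.transforms.{Create, MapElements, SimpleFunction}

    // enableAbandonedNodeEnforcement(false) lets us use TestPipeline outside a JUnit @Rule
    val p = TestPipeline.create().enableAbandonedNodeEnforcement(false)

    val output = p
      .apply(Create.of("a", "b"))
      .apply(MapElements.via(new SimpleFunction[String, String]() {
        override def apply(s: String): String = s.toUpperCase
      }))

    // PAssert checks the contents when the pipeline runs (here, on the Direct Runner)
    PAssert.that(output).containsInAnyOrder("A", "B")
    p.run().waitUntilFinish()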

DataFlow

DataFlow is Google's implementation of a Beam runner.

“After you write your pipeline, you must create and stage your template file… After you create and stage your template, your next step is to execute the template.” (from the DataFlow documentation).

What sorcery is this?

"Developer typically develop the pipeline in the development environment and execute the pipeline and create a template. Once the template is created, non-developer users can easily execute the jobs by using one of these interfaces including GCP console, gcloud cli, or REST API... We simply run this command to compile and stage the pipeline so that it becomes accessible from several execution interfaces:

 mvn compile exec:java \
     -Dexec.mainClass=com.example.MyClass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=[YOUR_PROJECT_ID] \
                  --stagingLocation=gs://[YOUR_BUCKET_NAME]/staging \
                  --templateLocation=gs://[YOUR_BUCKET_NAME]/templates/"

according to Imre Nagi, who has mixed views, particularly on using Composer, Google's managed version of Airflow ("especially about it's versioning and debugging technique"), for managing DataFlow jobs.

He says that one issue is "you must modify your pipeline code to support runtime parameters". This seems to run contrary to Beam's attempt to be an abstraction across all Big Data processing applications.

For instance, you need to use ValueProviders to pass configuration around. But if your "driver" code accesses their values (say, in com.example.MyClass's main method), they won't be available at the template-creation stage.
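
A rough sketch of the shape this takes (hypothetical names throughout; a Scala trait standing in for the usual Java options interface):

    import org.apache.beam.sdk.options.{Description, PipelineOptions, ValueProvider}
    import org.apache.beam.sdk.transforms.DoFn
    import org.apache.beam.sdk.transforms.DoFn.ProcessElement

    // Hypothetical options: the input path is only known when the template is executed
    trait MyTemplateOptions extends PipelineOptions {
      @Description("Path of the file to read from")
      def getInputPath: ValueProvider[String]
      def setInputPath(value: ValueProvider[String]): Unit
    }

    // Fine: the ValueProvider is only .get()'d inside the DoFn, i.e. at execution time
    class TagWithPathFn(path: ValueProvider[String]) extends DoFn[String, String] {
      @ProcessElement
      def processElement(c: DoFn[String, String]#ProcessContext): Unit =
        c.output(path.get() + ": " + c.element())
    }

    // Not fine: calling options.getInputPath.get() in main() while the template is being
    // created will fail, because the value simply isn't there yet.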

Saturday, August 10, 2019

Spark in Anger


Why doesn't Dataset have reduceByKey?

Well, it does have a reduce these days but it's marked as 'experimental'. StackOverflow says to use a groupByKey and then call reduceGroups on the resulting KeyValueGroupedDataset. Alternatively, this SO post says to use a groupBy followed by an agg.
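
For example (a small sketch with made-up data and column names):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.sum

    val spark = SparkSession.builder.master("local[*]").getOrCreate()
    import spark.implicits._

    val ds = Seq(("a", 1), ("a", 2), ("b", 3)).toDS()

    // groupByKey then reduceGroups: the Dataset stand-in for RDD-style reduceByKey
    val reduced = ds.groupByKey(_._1)
                    .reduceGroups((x, y) => (x._1, x._2 + y._2))

    // or the untyped route: groupBy followed by agg
    val aggregated = ds.toDF("key", "value").groupBy("key").agg(sum("value"))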


Too many files/buckets

If you try to write a Dataset using bucketBy, you might get too many files, as seen on the Spark mailing list here. The solution is that "you need to first repartition ... since each task will write out the buckets/files. If the bucket keys are distributed randomly across the RDD partitions, then you will get multiple files per bucket."
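
Roughly (the DataFrame, column and table names are hypothetical):

    import org.apache.spark.sql.functions.col

    // Repartition on the bucketing column first so that each task holds whole buckets
    // and doesn't write one file per (task, bucket) pair. `df` is a hypothetical DataFrame.
    df.repartition(col("userId"))
      .write
      .bucketBy(16, "userId")
      .sortBy("userId")
      .saveAsTable("bucketed_users")   // bucketBy currently requires saveAsTable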


Making the stats of a Parquet file visible

No need for mr-parquet-tools (which didn't seem to want to show stats no matter how much coaxing, even when setting the parquet.strings.signed-min-max.enabled Hadoop configuration). Using the DataFrame.describe method, you can see the minimum and maximum values. The trick is to load each .parquet file as a DataFrame even if it is actually part of a greater whole, as we do here.
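
Something like this (the path and column name are made up; spark is the usual SparkSession):

    // Read one part-file in isolation, not the whole partitioned dataset;
    // describe() then shows count/mean/stddev/min/max for that file alone.
    val onePart = spark.read.parquet("/data/mytable/part-00000.snappy.parquet")
    onePart.describe("myColumn").show()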

It seemed that there needed to be 400 files created before a file would be shared by different values of the sortBy column. I don't know at the moment which configuration key controls this behaviour.


Deterministic Order?

The method zipWithIndex is odd. It is contiguous but not necessarily deterministic. This latter link shows that the index may change on re-evaluation [briefly: if you zipWithIndex an RDD and then join it to itself, the resulting tuples do not necessarily have both elements equal to each other. This is surprising but not a bug, since the RDD was never ordered before the call to zipWithIndex and therefore there are no guarantees].
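
A sketch of the surprise (assuming a SparkSession called spark):

    val rdd     = spark.sparkContext.parallelize(1 to 1000000)
    val indexed = rdd.zipWithIndex().map(_.swap)   // (index, value); indices are contiguous

    // Joining the indexed RDD with itself: because the source RDD was never ordered,
    // re-evaluation may assign indices differently, so the two values seen for a given
    // index are not guaranteed to be equal.
    val joined     = indexed.join(indexed)
    val mismatches = joined.filter { case (_, (a, b)) => a != b }.count()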

None other than Sean Owen comments on StackOverflow:

"An RDD has a guaranteed order if it is the result of a sortBy operation, for example. An RDD is not a set; it can contain duplicates. Partitioning is not opaque to the caller, and can be controlled and queried. Many operations do preserve both partitioning and order, like map. That said I find it a little easy to accidentally violate the assumptions that zip depends on, since they're a little subtle."

This is just like SQL. Unless you ORDER BY, you can make no guarantees about the order, even if it often presents results in order.


Co-location vs. co-partitioned

"The lack of a shuffle does not mean that no data will have to be moved between nodes. It's possible for two RDDs to have the same partitioner (be co-partitioned) yet have the corresponding partitions located on different nodes (not be co-located).  This situation is still better than doing a shuffle, but it's something to keep in mind. Co-location can improve performance, but is hard to guarantee." [StaclOverflow]


Badly written queries

I come across a lot of "why does Spark run out of memory when it's supposed to be scalable?" questions. This is often because the chosen algorithm simply does not scale. Here are some silly mistakes I've seen:
  • Doing a word count by concatenating the records and only counting the words in the accumulated corpus at the end.
  • Counting the number of subsets of a collection (i.e., the size of its power set) by generating them all. Note that this can work OK when the number of elements is small, and so can pass testing, only to blow up when faced with real, production data. (The solution is simply to calculate 2^x, where x is the size of the original collection; see the sketch below.)
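
A sketch of the second mistake and its fix:

    // Blows up for large collections: materialises every subset just to count them
    def subsetCountNaive[A](xs: Set[A]): Int = xs.subsets().size

    // Scales: the size of the power set is simply 2^n
    def subsetCount[A](xs: Set[A]): BigInt = BigInt(2).pow(xs.size)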

OutOfMemoryErrors in machine learning

...is often because of how the ML libraries have been written - badly in some cases. A lot of the work is done in the driver (SO). The standard, first approach (as mentioned in the official docs here) follows:

"Sometimes, you will get an OutOfMemoryError not because your RDDs don’t fit in memory, but because the working set of one of your tasks, such as one of the reduce tasks in groupByKey, was too large. Spark’s shuffle operations (sortByKey, groupByKey, reduceByKey, join, etc) build a hash table within each task to perform the grouping, which can often be large. The simplest fix here is to increase the level of parallelism, so that each task’s input set is smaller. Spark can efficiently support tasks as short as 200 ms, because it reuses one executor JVM across many tasks and it has a low task launching cost, so you can safely increase the level of parallelism to more than the number of cores in your clusters."


Big Data Patterns


Reduce- vs Map-side joins

When actual join logic is done in the reducers, we call this a reduce-side join. Here, "the mappers take the role of preparing the input data: extracting the key and value from each input record, assigning the key-value pairs to reducer partitions and sorting by key.

"The reduce-side approach has the advantage that you do not need to make any assumptions about the input data." However, it can be quite expensive.

"On the other hand, if you can make certain assumptions about your input data, it is possible to make joins faster by using a so-called map-side join [where] there are no reducers and no sorting. Instead, each mapper simply reads one input file log from the distributed filesystem and writes one output file to the filesystem - that is all." Prior map-reduce actions in the pipeline make this assumption reasonable since "the output from the mapper is always sorted before it is given to the reducer." [1]

Note that "the output of a reduce-side join is partitioned and sorted by the join key, whereas the output of a map-side join is partitioned and sorted in the same way as the large input" regardless of whether a partitioned or broadcast (see below) join is used.


Map-side joins

A broadcast hash join is where the data in one table is sufficiently small to fit in an in-memory hash table and be distributed to each mapper, where the join can then take place.
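
Spark's analogue of this pattern is the broadcast join; for example, with hypothetical clicks and users DataFrames:

    import org.apache.spark.sql.functions.broadcast

    // `users` is small enough to ship to every executor; the large `clicks` table stays put
    val joined = clicks.join(broadcast(users), Seq("userId"))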

Partitioned hash joins build on this idea. Here, the data of the larger table is partitioned on the key. "This has the advantage that each mapper can load a smaller amount of data into its hash table... Partitioned hash joins are known as bucketed map joins in Hive" [1]

A map-side merge join is where "a mapper can perform the same merging operation that would normally be done by a reducer: reading both input files incrementally, in order of ascending key and matching records with the same key."


Reduce-side joins

In this example taken from [1], we have an activity log (a "click stream") and a table of users.

A sort-merge join is where "the mapper output is sorted by key and the reducers then merge together the sorted lists of records from both sides of the join". [1] The reducer processes all of the records for a particular user ID in a single sweep, keeping only one user record in memory at any one time.

"The MapReduce job can even arrange the records to be sorted such that the reducer always sees the record from the user database first, followed by the activity events in timestamp order - this technique is known as a secondary sort". [1]

Note that with the sort-merge join pattern, all tuples with the same key are sent to the same reducer.

If there are skewed 'hot keys' in Pig, it "first runs a sampling job to see which keys are hot [then] the mappers send any records relating to a hot key to one of several reducers chosen at random." [1] This is a skewed join. "Hive's skewed join optimization takes an alternative approach. It requires hot keys to be specified explicitly in the table metadata."


Streaming

From Understanding Kafka Consumer Groups at DZone:

"What Is a Consumer Group? ... it turns out that there is a common architecture pattern:  a group of application nodes collaborates to consume messages, often scaling out as message volume goes up, and handling the scenario where nodes crash or drop out.

"A Consumer Group based application may run on several nodes, and when they start up they coordinate with each other in order to split up the work.  This is slightly imperfect because the work, in this case, is a set of partitions defined by the Producer.  Each Consumer node can read a partition and one can split up the partitions to match the number of consumer nodes as needed.  If the number of Consumer Group nodes is more than the number of partitions, the excess nodes remain idle. This might be desirable to handle failover.  If there are more partitions than Consumer Group nodes, then some nodes will be reading more than one partition."

[1] Designing Data-Intensive Applications, Martin Kleppmann

Friday, August 9, 2019

FP Terminology


Some difficult FP terms that I've been trying to learn from my time on Gitter.

Miscellaneous Terminology

Terminology | Code
Getter | A => B
Isomorphism | A => B and B => A
Prism for A | A => Option[B] and B => A
Monads (roughly) | A => F[A] and F[F[A]] => F[A]
Comonads (roughly) | F[A] => A and F[A] => F[F[A]]
Endofunction | A => A
Distributive, a * (b + c) = a * b + a * c | (A, M[B, C]) => M[(A, B), (A, C)]
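
A rough Scala rendering of the first few rows (simplified, hypothetical encodings; a real optics library such as Monocle adds laws and composition):

    trait Getter[A, B] { def get(a: A): B }

    trait Iso[A, B] {
      def to(a: A): B
      def from(b: B): A
    }

    trait Prism[A, B] {
      def getOption(a: A): Option[B]
      def reverseGet(b: B): A
    }

    // Endofunction: a function from a type to itself
    val increment: Int => Int = _ + 1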


Tagless Final
"So, final tagless is a broader term, which is now getting used in a more specific meaning increasingly often. 
Generally speaking final tagless is a technique for encoding DSLs. 
These days, often it's used to mean "a final tagless encoding of an effectful algebra", like a DB or something. 
Anyway the general idea is simple: 
trait KVS[F[_]] { 
   def put(k: String, v: String): F[Unit] 
   def get(k: String): F[Option[String]] 
} 
F will be replaced by a concrete type like cats.effect.IO, or Kleisli[IO, DbConnection, ?] 
That's the gist of it really" - Fabio Labella @SystemFw Mar 01 2018 12:38 (Gitter)

"What mtl is now is a final tagless encoding of common effects : like state, reading from an environment, and so on." (Reddit)

It superficially looks like an old-fashioned, Java-like interface, the only new and interesting thing being that the methods return F[_]. However, this is a big deal. That F[_] could be a plain old type (using type Id[T] = T) or it could be something asynchronous like a Future. Therein lies the magic: the same interface serves both synchronous and asynchronous calls, for example.
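
As a sketch, here is the KVS algebra quoted above with two hypothetical interpreters, one synchronous (Id) and one asynchronous (Future):

    import scala.collection.mutable
    import scala.concurrent.{ExecutionContext, Future}

    trait KVS[F[_]] {
      def put(k: String, v: String): F[Unit]
      def get(k: String): F[Option[String]]
    }

    // A synchronous, in-memory interpreter using the identity type
    type Id[T] = T
    object InMemoryKVS extends KVS[Id] {
      private val store = mutable.Map.empty[String, String]
      def put(k: String, v: String): Unit = store.update(k, v)
      def get(k: String): Option[String] = store.get(k)
    }

    // An asynchronous interpreter; the remote calls are left as stubs
    class RemoteKVS(implicit ec: ExecutionContext) extends KVS[Future] {
      def put(k: String, v: String): Future[Unit] = Future { /* call the real store */ }
      def get(k: String): Future[Option[String]] = Future { None /* call the real store */ }
    }

    // Program code stays abstract in F and works with either interpreter
    def fetch[F[_]](kvs: KVS[F], key: String): F[Option[String]] = kvs.get(key)
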
zygfryd @zygfryd
I don't get it, I thought tagless was a pattern for working with lifted side effects, while code generation doesn't need side effects, and the target language has no facilities for lifting anything
Anthony Cerruti @srnb_gitlab 
Tagless was for DSLs I thought
Rob Norris @tpolecat 
It’s a programming style for abstracting over effects.
It has nothing directly to do with side-effects.
(It’s more general than abstracting over effects but that’s the common case.)
It's much simpler than it at first appears.
Rob Norris @tpolecat
You can think of Jokes[F[_]] as a service interface [in Http4s]. It’s an interface for something that knows how to tell jokes. The interesting bit is that joke-telling happens “in” F which is your effect type. If F is Option then joke-telling might fail. If it’s Either[String, ?] then joke-telling may fail with an explanation. If it’s IO then joke-telling might involve reading from disk or talking to the network. When you construct an instance you know what F is. But when you use it you typically leave F abstract so your code may be able to work for many choices of F. Constraints in user code like F[_]: Monad allow you to restrict the abstract F to only those effects that provide operations you need.
PhillHenry @PhillHenry
@tpolecat Is that tagless final? Or is that something completely different?
Rob Norris @tpolecat
Yes, that’s tagless style. Also called MTL style.
John de Goes has an excellent article on the advantages of tagless-final here including a particularly useful section on its advantages in testing (and some caveats).


What is an FP algebra?
"An algebra is a set of constructors for some data type plus rules for composing them. In tagless style (typical usage, it's more general than this) we have a trait like 
Foo[F[_]] { def blah(...): F[Bar] ; ... } 
that is parameterized on some effect F and provides methods for constructing values in F. The rules of composition are orthogonal and are determined by constraints you place on F later; i.e., 
def doSomething[F[_]: Monad](..., foo: Foo[F]): F[Something]."
Rob Norris @tpolecat Mar 24 18:48 

What are exceptions in an FP algebra?
"I would define it this way:
an error is something that makes a flatMap/mapN short-circuit
or more precisely, an e such that raiseError(e).flatMap(f) == raiseError(e)" - Fabio Labella @SystemFw Feb 26 22:21 
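
Concretely, with F[A] = Either[String, A], where raiseError is just Left:

    type F[A] = Either[String, A]

    val boom: F[Int] = Left("boom")            // raiseError("boom")
    val f: Int => F[Int] = i => Right(i + 1)

    // The short-circuiting law: raiseError(e).flatMap(f) == raiseError(e)
    assert(boom.flatMap(f) == boom)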

Referential Transparency
"Referential transparency talks about the equivalence of two expressions under a syntactic transformation types aren't mentioned anywhere in fact, you don't need types for it (although typically pure languages are also typed)
...
In general, it's a bit more gradual than this: there is a spectrum of information you can have expressed in a type but it's not always "more is better", there is a curve
...
A type system is a tractable syntactic method for statically proving the absence of certain program behaviour by classifying sentences according to the kind of values they compute that's a formal definition from Types and Programming Languages (off the top of my head so it might be a bit off here and there)
...
It's fairly easy to end up in a situation where too much info in types gives you a fragile mess Finite State Machines with IndexedStateT vs Ref + an ADT being a prime example" - Fabio Labella @SystemFw Feb 26 22:47

Effects vs Side effects

"An effect is a computational context like partiality or nondeterminism or exception handling or dependency injection.
A side-effect is a syntactic property, namely the absence of referential transparency, which means the inability to inline or factor out an expression while leaving your program’s behavior unchanged." - Rob Norris @tpolecat 02:42
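
A small, plain-Scala illustration of that syntactic property:

    // Referentially transparent: factoring out the expression changes nothing
    val a  = 2 + 3
    val p1 = a + a               // 10
    val p2 = (2 + 3) + (2 + 3)   // 10, same program

    // Side-effecting: inlining r changes behaviour, so it is not referentially transparent
    val r  = { println("hi"); 5 }                         // "hi" printed once, here
    val q1 = r + r                                        // no further printing
    val q2 = { println("hi"); 5 } + { println("hi"); 5 }  // the inlined version prints "hi" twice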

Concurrency and Synchronization
Oleg Pyzhcov @oleg-py 06:41
@blitzkr1eg if you have two threads doing update at the same time, one will always see results of another (not the case with var, for instance, where both can see old state) - that's safe concurrent modification. However, you can't enforce ordering of those operations with Ref alone - that's lack of synchronization.
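
A sketch with cats-effect 2.x (Ref lives in cats.effect.concurrent there; the numbers are arbitrary):

    import cats.effect.{ContextShift, IO}
    import cats.effect.concurrent.Ref
    import scala.concurrent.ExecutionContext

    implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

    val program: IO[Int] =
      for {
        ref <- Ref.of[IO, Int](0)
        fa  <- ref.update(_ + 1).start    // concurrent updates: neither is lost...
        fb  <- ref.update(_ + 10).start
        _   <- fa.join
        _   <- fb.join
        n   <- ref.get                    // ...but their relative order is not enforced
      } yield n                           // always 11; with a plain var one write could be lost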

Equality

Rob Norris @tpolecat Jun 08 15:17
oh in that case we want a == b to not compile for mutable things
so that's what cats.Eq gives you. a === b won't compile for mutable things
the deal is that if a === b you should be able to freely substitute a with b and vice-versa. Which you can't do with mutable things
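
For example, assuming cats.implicits is in scope:

    import cats.implicits._

    1 === 1                                                // true: an Eq[Int] instance exists
    // 1 === "one"                                         // does not compile: different types
    // new StringBuilder("a") === new StringBuilder("a")   // does not compile: no Eq for a mutable type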