
Sunday, November 27, 2016

GraphX


Spark tries to abstract you from the data so you can write beautiful, functional code. The truth is that you have to get your hands dirty if you don't want it to run like a dog.

I'm processing a graph of over 100 million edges and about 20 million vertices. It's been a learning experience to make it perform. It first took over 24 hours to execute the Connected Components algorithm. I finally managed to run it in less than an hour. But it wasn't easy.

Cluster configuration

I didn't find GraphX very memory hungry, at least not for the executors (the entire graph was only some 4 GB on HDFS). After 24 hours on my first attempt, the job looked far from over. Using jstat, I noticed that the driver was GCing a lot. It had 8 GB but when I increased that to 20 GB it was much happier.

Although the usual advice is to run a larger number of executors with fewer cores each, for GraphX this might not be the best configuration. Facebook found:

"Even though both systems can run multiple workers on a single physical machine, we found that it is more efficient when we start only one worker process per machine and allocate all the available memory to it. In the following experiments, the number of workers is also the number of physical machines used for each job. We allocated 80 GB of RAM to each worker."

The shape of the graph

What your graph looks like makes a difference to the performance of GraphX. Looking at the maximum number of "Shuffle write Size / Records" in each super step (mapPartitions at GraphImpl.scala:207 in Spark 2.0.2) in the Spark web GUI, we see the numbers steadily decreasing.

How quickly they decrease depends on the shape of the graph.

"GraphFrames and GraphX both use an algorithm which runs in d iterations, where d is the largest diameter of any connected component (i.e., the max number of hops between any 2 nodes in a component). So the running time will vary significantly depending on the your graph's structure. Tuning the vertices and edges DataFrames by making sure to cache them and possibly adjust the partitioning beforehand can help." (from DataBricks forum)

As somebody pointed out on StackOverflow:

"If you have a cluster where each of the vertexes is connected to every other vertex, then after one round of messages each one knows who the lowest VertexID is, and they all go silent the next round. One sequential step, the entire cluster.

"If, on the other hand, each vertex in the cluster is only connected to at most 2 other vertices, then it could take N sequential steps before all the vertices know who what the minimum VertexID is."

I tested this by making a graph that was just a chain of numbers 1, 2, 3, 4 ... 20. I found that after 10 super-steps the algorithm had still not converged: the labels described 10 groups, {1 to 11}, {12}, {13}, {14} ... {20}, for what is really a single connected component.
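
You can see the same effect with a few lines of plain Python; this is a toy simulation of the min-label propagation, not GraphX code:

N = 20
edges = [(i, i + 1) for i in range(1, N)]       # the chain 1-2-3-...-20
label = dict((v, v) for v in range(1, N + 1))   # every vertex starts as its own id

superstep = 0
changed = True
while changed:
    changed = False
    new_label = dict(label)
    for a, b in edges:                          # each edge shares its minimum label
        m = min(label[a], label[b])
        if m < new_label[a]: new_label[a], changed = m, True
        if m < new_label[b]: new_label[b], changed = m, True
    label = new_label
    superstep += 1
    if superstep == 10:
        print "distinct labels after 10 supersteps:", len(set(label.values()))  # 10

print "converged after", superstep, "supersteps"  # ~N: the diameter dominates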

Partitioning

How do you partition a graph? Facebook said that they got good results using EdgePartition2D. The trick here is to imagine the graph as a matrix where, for a given vertex X, all non-zero elements in row X indicate an outgoing edge and all non-zero elements in column X indicate an incoming edge.

Therefore, all the details for a given vertex sit in a single row plus a single column. So, for an N x N matrix, all the data for a vertex lies in at most 2N of the N² cells (one row plus one column). Carve the matrix into a √P x √P grid of blocks, one block per partition, and the same argument says the data for X touches at most one block-row plus one block-column, that is, roughly 2√P of the P partitions (assuming a maximum of one edge per pair of vertices, which is not a problem for the connected components algorithm).
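
To make that concrete, here is a stripped-down Python sketch of the 2D scheme. The real EdgePartition2D also mixes the vertex ids with a large prime to counter skewed ids; the function below keeps only the row/column idea and its name is mine:

from math import ceil, sqrt

def edge_partition_2d(src, dst, num_parts):
    side = int(ceil(sqrt(num_parts)))  # the matrix is cut into side x side blocks
    row = src % side                   # block-row comes from the source vertex
    col = dst % side                   # block-column from the destination vertex
    return (row * side + col) % num_parts

# Every edge leaving vertex 42 lands in one block-row, so its data
# touches at most 2 * side of the 36 partitions.
print sorted(set(edge_partition_2d(42, d, 36) for d in range(1000)))  # [0, 1, 2, 3, 4, 5]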

But the biggest saving came from reducing the number of partitions. The previous stage of the pipeline had deliberately and explicitly increased the number of partitions to avoid running out of memory when performing matrix multiplication. This can be a disaster for GraphX.

The previous job had increased the number of partitions to some 18k. What's going on is described here:
As the graph algorithm progresses, it is common for less and less [sic] of the vertices to remain active. Therefore a full scan of all triplets during a map-reduce becomes less and less effective. “For example, in the last iteration of connected components on the Twitter graph, only a few of the vertices are still active. However, to execute mrTriplets we still must sequentially scan 1.5 billion edges and check whether their vertices are in the active set.”
Although GraphX has clever optimizations here, there was still a huge amount of overhead in constantly mapping and reducing.

Coalescing the 18,000 partitions to a mere 360 on a cluster with 8 boxes and 24 cores per box reduced the run time to a healthy 10-40 minutes, depending on sub-graph structures.
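
The actual job was Scala/GraphX, but the idea is the same in any Spark API. A minimal PySpark sketch, where the RDD stands in for the over-partitioned output of the previous stage:

from pyspark import SparkContext

sc = SparkContext(appName="coalesce-sketch")

# A stand-in for the previous stage's output: a small edge RDD spread,
# absurdly, over 18,000 partitions.
edges = sc.parallelize([(i, i + 1) for i in range(1, 100)], 18000)
print edges.getNumPartitions()   # 18000

# coalesce() merges existing partitions without the full shuffle that
# repartition() would trigger, so it is cheap to do before building the graph.
edges = edges.coalesce(360)
print edges.getNumPartitions()   # 360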


Saturday, August 20, 2016

Louvain Modularity and Graph Modularity


Looking at ways to solve my problem of agglomerative clusters when trying to de-dupe data, I turned to community detection algorithms.

Louvain Modularity is a heuristic that tells you about communities in a network (implementations can be found here). The original paper ("Fast unfolding of communities in large networks") can be found here. The first equation comes without much of an introduction, so I scuttled off to find where it came from. The natural choice was a brilliant book by the equation's creator: Networks: An Introduction by Mark Newman*.

Newman points out the difficulty in coming up with an algorithm that finds communities. If you want to minimise the number of edges crossing a boundary between two groups, then a naive algorithm will cheerfully put no nodes in one group and all N nodes in the other. So, you might want to tell the algorithm to weight the results by multiplying them by the product of the number of nodes in each group. This is heading in the right direction, as a 0-N split now scores worst. In fact, with a bit of simple calculus, you can find that the product is maximised when the groups are split evenly, N/2 apiece. This is nice but arbitrary.

So, one metric we could use is the difference between the actual number of edges connecting nodes of the same class and the expected number of such edges.

The actual number of edges between vertices in the same class is given by:

$$\sum_{\mathrm{edges}\ (i,j)} \delta(c_i, c_j) = \frac{1}{2}\sum_{i,j} A_{ij}\, \delta(c_i, c_j)$$

where δ(ci,cj) is the Kronecker delta (simply 1 if ci = cj, else 0), A is our adjacency matrix, and the ½ is there because summing over the whole adjacency matrix counts every edge twice.

For the expected number, imagine node i, which has degree ki. If the graph has m edges, there are obviously 2m edge-ends. So, for any one edge leaving i, the chance that its other end attaches to node j is kj/2m; over all ki edges leaving i, the expected number of edges between i and j is kikj/2m.

Add them all up, and the expected number is:

$$\frac{1}{2}\sum_{i,j} \frac{k_i k_j}{2m}\, \delta(c_i, c_j)$$

where, again, the ½ prevents us from double-counting.

Now, we take the difference and divide by m (since we are interested in fractions, not absolute numbers) and we get:

$$Q = \frac{1}{2m}\sum_{i,j}\left(A_{ij} - \frac{k_i k_j}{2m}\right)\delta(c_i, c_j)$$

which is the equation in the paper.
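
To make the formula concrete, here is a quick numeric check in Python. The graph (two triangles joined by a single edge) and the community labels are a toy example of mine, not something from the paper:

import numpy as np

# Two triangles (vertices 0-2 and 3-5) joined by one bridge edge (2-3).
A = np.array([[0, 1, 1, 0, 0, 0],
              [1, 0, 1, 0, 0, 0],
              [1, 1, 0, 1, 0, 0],
              [0, 0, 1, 0, 1, 1],
              [0, 0, 0, 1, 0, 1],
              [0, 0, 0, 1, 1, 0]])
c = np.array([0, 0, 0, 1, 1, 1])      # community labels: one per triangle
k = A.sum(axis=1)                     # degrees, ki
m = A.sum() / 2.0                     # number of edges (7 here)

delta = (c[:, None] == c[None, :])    # the Kronecker delta over classes
Q = ((A - np.outer(k, k) / (2 * m)) * delta).sum() / (2 * m)
print Q                               # 5/14 ~ 0.357: the triangles are real communities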

* I really can't recommend this book enough. If you've got an undergraduate degree in something mathematically related, you'll find the equations no problem and they are presented in an easy-going manner.

Friday, April 22, 2016

Graphs as Matrices - part 1


Graphs are typically represented as adjacency lists. But if they're represented as matrices, you get all the juicy goodness of linear algebra.

What does this mean? Well, for example, let's represent this directed graph as a matrix.

Digraph from Sedgewick's "Algorithms".
Note that it has no cycles, so we can represent it like this:

The same digraph.
We can represent this as an n x n matrix, A, where Ai,j is 1 when i is linked to j and 0 otherwise. Since there are no self-loops, Ai,i = 0.

In Python, it would look like this:

import numpy as np
from numpy.linalg import eig 
from scipy.linalg import lu  
import networkx as nx  
import sympy

A = np.matrix(  # 0  1  2  3  4  5  6  7  8  9 10 11 12
              '0  1  0  0  0  1  1  0  0  0  0  0  0;'  # 0
              '0  0  0  0  0  0  0  0  0  0  0  0  0;'  # 1
              '1  0  0  1  0  0  0  0  0  0  0  0  0;'  # 2
              '0  0  0  0  0  1  0  0  0  0  0  0  0;'  # 3
              '0  0  0  0  0  0  0  0  0  0  0  0  0;'  # 4
              '0  0  0  0  1  0  0  0  0  0  0  0  0;'  # 5
              '0  0  0  0  1  0  0  0  0  1  0  0  0;'  # 6
              '0  0  0  0  0  0  1  0  0  0  0  0  0;'  # 7
              '0  0  0  0  0  0  0  1  0  0  0  0  0;'  # 8
              '0  0  0  0  0  0  0  0  0  0  1  1  1;'  # 9
              '0  0  0  0  0  0  0  0  0  0  0  0  0;'  # 10
              '0  0  0  0  0  0  0  0  0  0  0  0  1;'  # 11
              '0  0  0  0  0  0  0  0  0  0  0  0  0'   # 12
)

It's worth noting that the eigenvalues of this matrix are all 0:

print "eigenvalues = ", eig(A)[0] #  [ 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.]

This is always true of the adjacency matrix of an acyclic graph. I'll explain why.

Firstly, if we re-label the nodes, we can change the order of the rows and columns such that it becomes an upper triangular matrix. Given an order that I calculated elsewhere:

newOrder = [2, 0, 1, 3, 5, 8, 7, 6, 4, 9, 10, 11, 12] 
re_arranged_cols = A[newOrder, :]
re_arranged_rows_and_cols = re_arranged_cols[:, newOrder]

we get a matrix that looks like this:

[[0 1 0 1 0 0 0 0 0 0 0 0 0]
 [0 0 1 0 1 0 0 1 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 1 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 1 0 0 0 0]
 [0 0 0 0 0 0 1 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 1 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 1 1 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 1 1 1]
 [0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 1]
 [0 0 0 0 0 0 0 0 0 0 0 0 0]]

which also has all zero eigenvalues:

print "eigenvalues for this new matrix: ", eig(re_arranged_rows_and_cols)[0]<-- all="" as="" expected="" font="" zeroes="">

Notice that with this new choice of basis, the matrix has all the connections (that is, the 1 values) above the diagonal and the diagonal values themselves are all zeros (since even with this re-ordering there are still no self-loops).
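
Incidentally, there is nothing magical about newOrder: any topological sort of the DAG pushes all the 1s above the diagonal. Here is a sketch using the networkx import from the code above (the order returned may differ from newOrder, but any valid one works):

G = nx.DiGraph(np.asarray(A))        # build the digraph from the matrix A
topo = list(nx.topological_sort(G))
print topo                           # one valid ordering of the nodes 0..12
print A[topo, :][:, topo]            # upper triangular again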

Triangular Matrices

Now, an interesting thing about upper triangular matrices is that their diagonal elements are their eigenvalues. The reasoning goes like this:

1. The elements of an upper triangular matrix, aij, are necessarily 0 if i > j (where i and j are the row and column indexes), since everything below the diagonal is zero. That is:

$$a_{ij} = 0 \quad \forall \; i > j$$

2. A formula for calculating the determinant is this one from Leibniz:

$$\det(A) = \sum_{\sigma \in S_n} \operatorname{sgn}(\sigma) \prod_{i=1}^{n} a_{\sigma(i),i}$$


where


  • n is the order of the square matrix (2 if it's 2x2; 3 if it's 3x3 ...)
  • σ is a permutation of the integers 1 to n in the permutation group Sn.
  • sgn(σ) is 1 or -1 depending on the parity of σ. Ignore this. It will disappear.
  • and σ(i) is the i-th integer in σ

Take a look at the rightmost part of (2). It multiplies n elements of the matrix given by aσ(i),i. But remember from (1) that the whole product is 0 if any σ(i) is greater than i. If you think about it, there is only one permutation for which this never happens, namely the identity [1, 2, 3, ... n] (whose sgn is 1, which is why sgn disappears). So, for a triangular matrix,
$$\det(A) = \prod_{i=1}^{n} a_{i,i}$$

that is, the determinant is the product of the diagonals.

Given that you calculate the eigenvalues from the characteristic equation:

$$\det(A - \lambda I) = 0$$

(where I is the identity matrix) then the eigenvalues are given by:

$$(\lambda - a_{1,1})(\lambda - a_{2,2})(\lambda - a_{3,3})\cdots(\lambda - a_{n,n}) = 0$$

The solutions of this equation, the eigenvalues, are therefore exactly the diagonal elements. Since the diagonal of a triangular matrix representing an acyclic graph is all zeros, all its eigenvalues are 0.
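
A quick numeric check with the imports from earlier (the matrix is a toy example of mine):

T = np.array([[7,  4, 1],
              [0, -2, 5],
              [0,  0, 3]])
print np.sort(eig(T)[0])   # [-2.  3.  7.]: exactly the diagonal elements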

Conclusion

We've shown that just by looking at the adjacency matrix (and not by tediously exploring the graph) we can tell whether the graph is cyclic or acyclic. This is done by calculating the eigenvalues, which is simple given any decent maths library.
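
As a final sketch, here is a hypothetical helper along those lines, reusing the numpy imports from above; the function name and the use of a numerical tolerance (via np.allclose) are my own choices, not a library API:

def is_acyclic(adjacency):
    # All-zero eigenvalues (to numerical tolerance) means no cycles.
    return np.allclose(eig(adjacency)[0], 0)

acyclic = np.array([[0, 1],    # 0 -> 1, no cycle
                    [0, 0]])
cyclic = np.array([[0, 1],     # 0 <-> 1, a 2-cycle
                   [1, 0]])

print is_acyclic(acyclic)      # True:  eigenvalues are [0, 0]
print is_acyclic(cyclic)       # False: eigenvalues are [1, -1]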