Skip to content

Latest commit

 

History

History
280 lines (186 loc) · 25.8 KB

2017-10-23.md

File metadata and controls

280 lines (186 loc) · 25.8 KB

Differential Dataflow! But at what COST?

Way back when, in the course of thinking out loud about the performance improvements differential dataflow could use, I proposed that I would write up a single-threaded implementation of "differential" breadth-first search, to try and see how much overhead the differential dataflow framework introduces.

That attempt failed, because I couldn't figure out how to do it correctly without re-using lots of the internals of differential dataflow.

Recently, meaning yesterday (or maybe two days ago, now), I took a stab at this again and simplified the problem enough that I could write a relatively straightforward single-threaded implementation. The "simplified" form of the problem hides several of the tricky corner cases, and isn't exactly a fair comparison for differential dataflow, but that's ok and we are going to check it out nonetheless. The goal is to understand where differential dataflow incurs overhead, as part of understanding whether we should eat it or optimize it.

Differential BFS

Let's start with a statement of the problem.


We are going to compute and maintain a breadth-first distance labeling of the nodes of a directed graph, from node zero, as the graph experiences various additions and removals of edges.


A breadth-first distance labeling is an assignment of distances to each node, counting the fewest directed edges that must be traversed in order to reach the node when starting from node zero. Clearly, this depends on the edges in the graph, and as those edges change (new edges added, old edges removed) we expect the output might also change.

The input to our problem is a list of edge changes, of the form

edge_changes: Vec<((Node, Node), Time, isize)>

The (Node, Node) type describes a directed edge by a pair of node identifiers (source and destination). The Time type is just going to be usize indicating the round in which the change occurs. The isize field indicates whether we should be adding something to the occurrence count of this edge (introducing an edge) or subtracting from it (removing an edge).

The output of our problem is a corresponding list of node distance changes, of the form

dist_changes: Vec<((Node, usize), Time, isize)>

where the (Node, usize) pairs indicate a distance labeling for a node, and the Time types correspond to the times in the input. As above, the isize field indicates whether this label is being introduced or removed.

The intent is that the output changes should exactly correspond to the input changes, in that we want the dist_changes to accumulate to the correct answer for BFS computation applied to the corresponding accumulation of edge_changes, roughly:

sum_{s <= t} dist_changes[s] = BFS(sum_{s <= t} edge_changes[s])

Let's look at a small example, which may or may not be instructive. Let's imagine that I have the following edge changes as input, where the structure is ((src, dst), time, diff):

((1, 1), 0, 1)
((2, 1), 0, 1)
((0, 1), 0, 1)
((0, 2), 0, 1)
((1, 0), 0, 1)
((2, 0), 1, 1)
((1, 1), 1, -1)
((1, 2), 2, 1)
((2, 1), 2, -1)
((1, 2), 3, 1)
((0, 1), 3, -1)
((2, 1), 4, 1)
((0, 2), 4, -1)
((0, 2), 5, 1)
((1, 0), 5, -1)

There are five edges introduced at time 0, followed by paired additions and subtractions at each subsequent time. There is some initial distance labeling output at time 0 corresponding to the initial graph, and probably then a few changes. This is what gets computed:

((0, 0), 0, 1)
((1, 1), 0, 1)
((2, 1), 0, 1)
((1, 1), 3, -1)
((2, 1), 4, -1)
((1, 2), 5, 1)
((2, 1), 5, 1)

Remember that these pairs are "(node, distance)". We have our initial report that 0 is at distance zero, which makes sense because it is taken to be the root of the breadth-first search, and the two other nodes each have distance one, which also makes sense because there are edges from 0 to each of them.

Nothing happens at the first time step, because the removal of the self-loop (1,1) doesn't change any labels (understandably). Nothing changes at the second time step because the edge addition is between the two non-0 nodes, which also does not change any distances. Finally, we see a change in the third time step when we remove the (0,1) edge, which disconnects node 1 and removes its distance report without replacing it. A similar change happens in the next round when (0,2) is removed, and then in the last round (0,2) is re-introduced and connects node 1 at distance two because a (2,1) edge was added in the fourth round.

It might seem a bit complicated, but there is a specific correct answer that we do want, and so it will be handy to have a computer to do this for us.

Benchmarks

Our plan is to check out two classes of changing graphs, chosen to try and tease out different sorts of behaviors.

Small graph

Our first benchmark is a random graph with 1,000 nodes and 2,000 edges. We then randomly perform a sequence of one million changes to the edges, where each change is the removal of an existing edge and the introduction of a new random edge. More specifically, we will use a random number generator to produce a sequence of 1,002,000 edges, and our graph will be defined by the sliding window of size 2,000. We could have done things differently (e.g. pick a random edge to remove) so don't fixate on the sliding window aspect.

You might say "1K nodes and 2k edges is small data, loser" which reflects a keen intuition. I would invite you to fire up your favorite big data platform and see what you need to do to get the reports of distance changes at all 1,000,000 output times. I'd be pleased to hear how this works out for you (seriously).

Large graph

Our second benchmark is a random graph with 1,000,000 nodes and 10,000,000 edges, with one million random edge changes. This graph is larger than the small graph, which perhaps counter-intuitively can make things easier. The one million random changes are scattered among a larger set of nodes, meaning that each node experiences just a few changes, rather than a few thousand changes as in the small graph. The graph also has higher average degree, which means that despite being larger we should expect smaller distances (approximating the number of folks at distance d by the average degree raised to the d-th power).

Differential Dataflow

Let's start with the differential dataflow computation. Because differential dataflow is clean and elegant and a joy to behold, we can express the BFS computation relatively crisply using its primitives:

fn bfs<G: Scope>(
    edges: &Collection<G, Edge>,
    roots: &Collection<G, Node>) -> Collection<G, (Node, u32)>
where G::Timestamp: Lattice+Ord
{
    roots.map(|x| (x, 0))
         .iterate(|inner|
             edges.enter(&inner.scope())
                  .join_map(&inner, |_k, dst, lbl| (*dst, lbl+1))
                  .concat(&inner)
                  .group(|_, s, t| t.push((*s[0].0, 1)))
         )
}

The idea is that each element in roots, which in our case will be just the single element 0, is first extended with a distance of zero, because they are roots. We then iteratively update the collection of (node, dist) pairs by joining distances with the set of edges to create new distance proposals, keeping our previous distances too, and then extracting the minimum distance for each node.

Behind the scenes, differential dataflow implements these operators and allows you to arbitrarily change their inputs. We are going to do that, pushing the timestamped changes through the computation and recording the outputs.

The implementation of bfs in the differential dataflow repository, examples/bfs.rs, allows you to specify the number of nodes and edges, as well as the number of changes by specifying a number of rounds of a number of changes. We will want the product of rounds and changes to be 1,000,000, and the flexibility here is just in determining how many changes are revealed to the system at once. In principle, giving the system more data at once gives it more opportunities, and it should go faster. That doesn't exactly happen, though, for reasons that make some sense.

Let's look at the elapsed times for several pairs of rounds x changes, for both the small graph and the large graph. These are single-worker numbers, and don't have anything to do with scaling just yet.

differential small large
10 x 100000 123.83s 108.91s
100 x 10000 59.32s 78.01s
1000 x 1000 31.81s 58.47s
10000 x 100 35.91s 46.16s
100000 x 10 43.45s 31.14s
1000000 x 1 55.85s 20.26s

These times vary a bit. They start large and get smaller because differential dataflow is designed to increase the throughput for larger batches of data. You may notice that for the smaller graph the numbers actually get worse as we go toward the largest amounts of offered data, for a good reason that we will return to (our bfs computation is more general than it "needs to be").

For the moment, let's take the smallest number for each configuration as the representative, even though you might be concerned that there isn't a simple rule to pick the configuration (nor is it simply "the largest batch size").

Single-threaded implementation

Let's now take a moment to talk through how we might write a bespoke single-threaded implementation. I bet there are lots of clever ways to do this, as well as several not-clever ways. For example, one not-clever way is "literally doing 1,000,000 bfs computations from scratch"; we aren't going to do that.

We are going to do what I think is a general approach for time-series computations, that isn't especially well-appreciated: we are going to re-order the loops of our computation so that instead of iterating first over times, we iterate last over times. That may not make much sense, so let's walk through an algorithm sketch.

One naive way to try and implement breadth-first search over data that change at various times is

for time in times:
    for depth in depths:
        for node in nodes:
            label unreached neighbors of reached node at time with depth

This is a pretty standard style of computation, where for each of the one million distinct graph inputs (loop over times) we explore the graph in breadth-first rounds (loop over depths) labeling all nodes that are newly reached with that depth (loop over nodes).

However, this isn't the only way to write the loop, and in my opinion it is also not the best when we have many times. Here is a different way:

for depth in depths:
    for node in nodes:
        for time in times:
            determine if reachability of node changes at time

This is a bit harder to explain, but roughly we explore the "graph" in breadth-first rounds (loop over depths), for each node (loop over nodes) replaying the history of changes it experiences (loop over times) at this depth. These changes include whether the node is reachable within depth steps and which edges originate at the node, and by replaying the history we get a full history of its "offerings" for various neighbors at the depth+1, which we can use in the next round.

I have the code scribbled down, the inner "determine if .." hunk is about 50 lines or so. It isn't especially intuitive, but it does the right things each time the node experience each of the three flavors of changes (edge change, existing reachability change, new reachability change). Importantly, it seems to produce the same output as the differential dataflow computation, which wasn't true of its first few incarnations.

single-threaded small large
1 x 1000000 5.61s 7.34s

First let's notice that there is only one row; the program as implemented must be provided with all changes, and provides no partial information (although you could imagine exposing progress through the depths loop as partial). This contrasts with the differential dataflow 1,000 x 1,000 configuration, which produces 1,000 reports as the computation runs (fairly steadily). Which is fine, that is how we defined the problem.

As a sanity check, the numbers of changes propagated is 16,789,289 for the small graph and 13,303,160 for the large graph. This means something like 334ns and 550ns for each change propagated, respectively. Much of this time goes in to sorting the new changes in each round, essentially routing the information about changes to the affected node (the time for the small graph increases by an alarming 2s if we switch in Rust's new sort_unstable_by quicksorting routine; the large graph time is unaffected in either direction). It isn't clear that our times are the best one could achieve, and I have to admit to trying less hard than I have for other COST evaluations; in part, this is a weirder problem and we are already in the hundreds of lines of code.

If we take the best measurements for each graph from up above, differential dataflow with one worker is slower than the bespoke implementation by a factor of 5.67x for the small graph and a factor of 2.76x for the large graph. This isn't scalability information just yet, so we can't make any clever comments about the COST, which asks for the configuration that outperforms the single threaded implementation. We probably are going to need at least 5.67 and 2.76 workers (rounded up) to compete, but it could actually be lots worse than this.

Before going too far into scalability land, let's talk about what we are actually computing, how the differential dataflow bfs computation actually does more than it needs to do to solve this specific problem. This will reduce the factors to about 4.00x and 1.83x for small and large respectively, at the cost of reducing the generality of the computation.

Differential variations

There are at least two things that our differential dataflow implementation of breadth-first search does that are not strictly necessary, at least for the problem we have defined. These aren't the only sources of overhead, but they are a good place to start if we want to compare apples to apples.

Counting rounds vs distances

Our differential dataflow implementation is general enough to be correct for any (non-negative) distances on the edges. Although we've hard-wired in a value of 1, you could have each edge with its own distance and the computation would still be correct. Our single-threaded COST implementation just tracks the rounds in which nodes become reachable, which is a substantial simplification.

We can do this trick with differential dataflow as well, though I am going to cheat just a bit. We will write a reachable method that just determines which nodes are reachable, rather than their distances.

fn reachable<G: Scope>(
    edges: &Collection<G, Edge>,
    roots: &Collection<G, Node>) -> Collection<G, Node>
where G::Timestamp: Lattice+Ord
{
    roots.iterate(|inner|
        edges.enter(&inner.scope())
             .semijoin(&inner)
             .map(|(_,y)| y)
             .concat(&inner)
             .distinct()
    )
}

While this computation only reports the set of nodes reachable from an element of roots, the values that are actually circulated internally have the form (Node, Time, Diff), and tell us in which round a node becomes reachable. Unfortunately, that information is stripped off as the collection exits the iterate context. We could hack something in that reveals the data to us, but the hack doesn't currently exist. But we could just put an inspect and println! there if we wanted to see the answers stream past.

Both the semijoin and distinct operators communicate to differential dataflow that the type of data is not (key, val) pairs, but rather just key records. The representation of the data simplify substantially, but a more important thing happen too, which starts to show up in the improved numbers:

differential small large
10 x 100000 114.84s 89.20s
100 x 10000 50.88s 61.64s
1000 x 1000 26.85s 44.50s
10000 x 100 29.22s 32.69s
100000 x 10 32.29s 26.52s
1000000 x 1 31.42s 18.02s

The important thing that is happening is that the collections change less. When we are pondering the 7th iteration, in which many nodes have already been reached, it is not wildly helpful to get a summary of existing changes as "was at distance 5, then later at distance 4, now at distance 6". All of those changes contain much more detail than we need to answer "in this, the 7th iteration, are there changes to reachability?". By reducing down to just a boolean "already reachable" the input history has fewer changes, and we perform less work to see how that changes in each subsequent iteration. In this example, we would be able to ignore rounds where the only change to a node's distance was from one value less than seven to another.

Importantly, we still do have all of this data, because the histories do need to record in which round the change happened, which distinguishes distance 5 from 4 from 6. However, that data lives in the timestamp rather than the data, and gets integrated out when we accumulate differences. More on this at the end of the next variance.

Ignoring prior and subsequent computation

Our differential dataflow implementation starts by loading the initial graph edges (either 2k or 10m edges), then doing various rounds of computation, and finally calling it quits and terminating. This means that at each of these points, just after loading, between each round, and just before terminating, the system needs to have a representation of its history that supports arbitrary future changes.

The COST implementation requires none of this, as it does a single batch of work, and so can perform all of its updates in-place.

We can do the same thing in differential dataflow, though effecting it is a bit indirect and a bit subtle.

First, we can remove the distinctions between each of the batches by using the 1000000 x 1 configuration, which hasn't always been the best performer but that is about to change. This removes any dependence on batching structure, so we will only have one measurement rather than a spread.

differential small large
1 x 1000000 31.42s 18.02s

Second, we can load the initial graph edges as part of the graph changes. They are logically of the same type: changes from an initially empty collection. This has the advantage that when we start working with our batch of changes, there are no pre-existing changes, which simplifies the logic a fair bit (the group logic for times you may need to consider is more complex with pre-existing updates; it's complicated, and I apologize).

differential small large
1 x 1000000 27.71s 15.08s

Third, we can indicate that we don't need to maintain the multiversioned collection by ... closing the graph input into which we supply graph changes. This is pretty subtle, in that this only communicates to the operators that they should not expect more changes to that collection. The operators are designed to pass this information along to the shared backing storage, by explaining the future requests they might make of the storage. Because math, the result is that updates at different rounds can be collapsed, essentially updating in-place.

differential small large
1 x 1000000 22.47s 13.74s

The ability to update in place is not just about conserving memory. Recall when we switched from group to distinct, we reduced the amount of variation from changing distances to only those moments when reachability changes. Our ability to merge these changes together means we can actually collapse down all this variation in our storage as well; we no longer need to record all the rounds at which the reachability of a node changed, because we know that we will never need to distinguish those rounds again. We end up storing only the reachability changes for the most recent rounds (the number depending on how often we merge LSM layers).

This third change is the one that fixes the pain experienced by large batching. The reason large batches were painful was that we may have had 1,000,000 steps of changes, all with largely irrelevant variation, and as we went from one depth to the next we needed to re-integrate all of that information. We no longer need to do this. This was less painful with smaller batches because the same compaction mechanism integrates up the historical batches, and so we only needed to re-integrate updates for 1,000 times in each batch, rather than the updates across all 1,000,000 times. Again, it's a bit complicated and I can try and explain in more detail another time.

Other variances

There are several other ways in which the differential dataflow implementation is more general than the COST implementation. I thought I would point them out, but it is non-trivial to do the work to recover the performance.

Differential's distinct operator keeps an indexed form of its output, so that it can efficiently check whether changes to its input actually change the output (otherwise it would need to recompute the output more than we would like to do). It's output is wired to the second input of the semijoin operator, and the first thing the semijoin operator will do is index its input in exactly the same way. Differential dataflow does allow you to share these indexes between the operators, but you need to restructure your computation to do so: our loop would need to start with distinct leading in to semijoin, plus we would need to use a few more raw interfaces. As it turns out, the volume of data moving from distinct to semijoin is relatively small compared to the data moving from semijoin back to distinct, so this is not a substantial win.

Differential dataflow's index structures are generic with respect to arbitrary types implementing the Ord+Hash trait, those things that can be sorted and hashed. Our COST implementation uses arrays and strongly assumes that all identifiers are dense integers. If we had to swap in HashMap indices there would be a fair bit of overhead. The indices also use log-structured merging to support sharing and amortize costs over arbitrarily structured edits; if the COST implementation had to run on graphs with large degree (random graphs are not) we would also need something similar to maintain the edge set (it just calls sort_by for each update).

Of course, another important difference is that the dataflow formulation allows us to easily parallelize the computation. Moving data around is harder than just computing it; our COST implementation doesn't even materialize its output, much less shuffle it around between workers. This leads us to the main event: how many workers do we need to recover the overhead introduced by differential dataflow?

Counting the COST

Let's start with the measurements we can do on a laptop: two workers!

I'm going to run the same optimized implementation where we use distinct and do everything in one big batch.

laptop cores small large
single-thread 1 5.61s 7.34s
differential 1 22.47s 13.74s
differential 2 12.27s 7.82s

At 7.82 seconds, we are just over the 7.34s time for the large graph, and we are still just over 2x larger than the 5.61s time for the small graph, and within 2x of the time if we had used sort_unstable in our single-threaded implementation.

The next step is to take this to a machine with more cores. I made one tweak, which is to start the differential dataflow timer after the random graph data were generated (which has been the case for the bespoke measurements, but which hadn't been the case in the differential implementations until just now, because it was written to generate the changes on the fly).

workstation cores small large
single-thread 1 6.64s 6.93s
differential 1 25.15s 14.16s
differential 2 13.84s 7.96s
differential 4 6.89s 4.58s
differential 8 3.74s 2.88s
differential 16 2.37s 2.21s

We now get a chance to see the COST of differential dataflow on this particular problem. Depending on the problem, the COST is somewhere between 4 and 8 cores, with 4 cores almost working out for both graphs.

Now, before getting too excited, I'm not certain that COST measurements can be taken as positive results. They only show that for a certain problem there is only so much of a gap between the scalable system and a hand-rolled implementation; they don't show a small gap for many problems, on many datasets. Also, I stopped working on the single-threaded implementation once I got it producing the correct answers, and there may be smarter things you could do (but given that I got it wrong a few times, I'm inclined to recommend against hand-rolling your incremental compute solutions).

I do think it is pretty neat that the COST is as small as it is, despite differential not exactly being optimized for this scenario (one monolithic batch of changes). I sincerely expected to see a number more like 32 cores, or even something unbounded. Perhaps that is closer to the right answer, with smarter single-threaded implementations, but if so the competition should only help to make differential faster. At least, that is the theory behind the COST work!