
Tracking progress in timely dataflow

Today we are going to look at how timely dataflow keeps track of progress.

By that I mean, how does the timely dataflow runtime keep track of which operators might receive inputs at which logical times. That might sound like word salad, so let's recap a bit and try and get on the same page. If you haven't, you might want to start with an introduction to timely dataflow. We are going to be especially interested in the unary_notify style of operator here, so if that is weird and unknown to you, best get clicking.

Progress tracking is exciting (to me) because it turns out that if you do it well, you can write some pretty cool generalized dataflow frameworks. For example, by being very precise about progress (in a way that Naiad before it was not), we can introduce some abstraction boundaries without giving up accuracy (and potential parallelism). These abstraction boundaries will make our lives easier, and let us write much more interesting (I think) dataflow computations.

Background

Timely dataflow (in Rust) is a distributed dataflow system. A timely dataflow computation is described by a directed graph (with some restrictions), where the graph nodes are operators (where computation happens) and the edges are channels (along which data moves).

One of the new and exciting features of timely dataflow is its support for "notifications", which are essentially an indication of progress made in the computation, across the entire system. So, an operator might be able to learn that it has received all of the input it will receive for some round of input data, even though the worker hosting the operator doesn't have total information about the state of all the other workers. Of course, before we can indicate progress, we have to be able to measure it, which leads us to timely dataflow's "logical timestamps".

Logical timestamps are bits of metadata associated with actual data that get sent around, and they indicate things like "which round of input data", or "which iteration of a loop". One way to think of them: if you were executing the computation imperative-style, with a single thread, the timestamp captures what would be on your stack or in your temporaries when you see this data. Let's look at an example.

Here is a hypothetical dataflow graph, with some special timely operators in blue. I've organized the operators into "scopes", which are actually a timely dataflow concept, but for the moment may just look like nice boxes with dashed lines.

[figure: hypothetical dataflow]

Rather than think too hard about what the operators do, or even what data they produce, let's instead discuss what sorts of timestamps they might attach to the data they do send around.

The first rule about timestamps is that within a scope (a dashed region), all operators must send and receive data with the same type of timestamp. So the input operator will produce data with some sort of timestamp (often an increasing integer), and Op1 and Op4 need to use this timestamp when they send and receive data. The Op2 and Op3 operators live in a nested scope, and so their timestamps can be different; there are some rules about this, and it is probably best to think of them as the timestamp of the outer scope, with some additional information (often another increasing integer, for counting loop iterations). As you can see, there is a transition with the enter and leave operators, which we will talk about.
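To make "the timestamp of the outer scope, with some additional information" concrete, here is a minimal sketch of a nested timestamp as a pair of coordinates, with the product partial order. The names are illustrative stand-ins, not timely's actual types.

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct Product<TOuter, TInner> {
    outer: TOuter, // e.g. which round of input data
    inner: TInner, // e.g. which loop iteration
}

impl<TOuter: PartialOrd, TInner: PartialOrd> Product<TOuter, TInner> {
    /// The product partial order: both coordinates must be <=.
    fn less_equal(&self, other: &Self) -> bool {
        self.outer <= other.outer && self.inner <= other.inner
    }
}

fn main() {
    let t1 = Product { outer: 0u64, inner: 2u64 };
    let t2 = Product { outer: 1u64, inner: 1u64 };
    // neither timestamp precedes the other: they are incomparable
    assert!(!t1.less_equal(&t2) && !t2.less_equal(&t1));
}

The partial order matters below: timestamps can be incomparable, and the progress machinery has to cope with exactly that.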

Imagine that the input operator produces some data with timestamp 0, and then some data with timestamp 1, and then some data with timestamp 2. Perhaps at some point the input operator promises not to produce any more data with timestamp 0. At this point, we might like to let Op1 know that, once it has received all of the data that has been sent so far, there won't be any more messages with a timestamp 0. This could be great news for Op1, who might have been waiting to accumulate all of its input data before acting. When we communicate this information to Op1 we call it a "notification".

Notifications are an important part of timely dataflow, because they provide information about the global state of the computation to otherwise myopic operators who just pick up bits of data they receive and act on them. At the same time, notifications can be hard to do well: we want them to be correct (you shouldn't receive a notification for a timestamp if more data might arrive), but we don't want to force the operators to repeatedly coordinate in any painful fashion. To make matters worse, things like cycles may make it hard to tell exactly when we can notify Op3 or Op4 if there are still messages circulating.

Our approach in this post will be to (a) describe how we will reason about progress, ignoring the issue of distribution, and then (b) describe a distributed protocol that provides a conservative approximation to the progress of the distributed computation.

Reasoning about progress

Let's start by being clear about what properties we aim to provide, and then we'll build up a framework for reasoning about them.

Each operator has some number of input ports on which it receives messages. In the example graph above, Op1 has one input port, and Op2 has two input ports. Each input port is different, and we will want to provide independent guarantees for each (Naiad didn't do this). What guarantee would we like to provide? Ideally, for each input port, we want to let the operator know what timestamps it might possibly receive on that port, by telling it which timestamps it will certainly never receive again.

In the Naiad paper we described these using "frontiers", which were a set of timestamps where if some other timestamp was strictly less than any element of the frontier, you wouldn't see it ever again. This is a bit of an approximation, in that it requires timestamps to be ordered (partially, not totally), and you aren't allowed to advance the frontier until all prior timestamps have vanished. This means that a timestamp 0 message could hold up the notification for timestamp 1, which we didn't find to be all that horrible. If you want 0 and 1 to be independent, don't use a partial order with 0 < 1.
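As a concrete sketch of the frontier idea (stand-in types, not timely's actual API): a frontier is an antichain of mutually incomparable timestamps, and a time may still show up only if it is greater than or equal to some element of the frontier.

struct Frontier<T: PartialOrd> {
    elements: Vec<T>, // mutually incomparable minimal times
}

impl<T: PartialOrd> Frontier<T> {
    /// Could a message with time `t` still be delivered?
    fn may_still_see(&self, t: &T) -> bool {
        self.elements.iter().any(|f| f <= t)
    }
}

fn main() {
    // once timestamp 0 closes out, the frontier advances to { 1 }
    let frontier = Frontier { elements: vec![1u64] };
    assert!(!frontier.may_still_see(&0)); // time 0 will never be seen again
    assert!(frontier.may_still_see(&2));  // time 2 may still arrive
}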

We are basically going to lift this idea from Naiad, but be careful to do frontiers on a port-by-port basis. Naiad over-approximated and combined all of the frontiers for the input ports into one, which it presented to the operator. That isn't horrible, but it makes some optimizations hard (e.g. in differential dataflow's join operator), and it would totally prevent us from presenting nested scopes upwards as operators. Mostly it loses accuracy, which is something we are going to try hard to maintain, so that we can introduce lossless abstractions.

Where do messages come from?

To guarantee that some input port will never again receive a message with some timestamp, we need to understand where timestamped messages come from. If we can put some restrictions into place, we can start to reason about whether one might or might not see a given timestamp, given the current state of the world.

Timely dataflow does impose restrictions on what timestamp an operator can use when it sends a message. When an operator receives an input with timestamp t it also receives a capability to send messages with that timestamp. The operator can send messages with timestamp t only if it holds a capability for a timestamp t' <= t.
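Here is a minimal sketch of this rule, with hypothetical names rather than timely's actual capability API: a capability wraps a timestamp, gates which timestamps an operator may use when sending, and can only ever be "delayed" to greater-or-equal times, never rewound.

#[derive(Clone, Debug)]
struct Capability<T: PartialOrd + Clone> {
    time: T,
}

impl<T: PartialOrd + Clone> Capability<T> {
    /// Check whether this capability permits sending at time `t`.
    fn permits(&self, t: &T) -> bool {
        self.time <= *t
    }

    /// Mint a capability for a later (or equal) time; refuse to go backwards.
    fn delayed(&self, time: &T) -> Option<Capability<T>> {
        if *time >= self.time {
            Some(Capability { time: time.clone() })
        } else {
            None
        }
    }
}

fn main() {
    let cap = Capability { time: 3u64 };
    assert!(cap.permits(&5));           // may send at time 5
    assert!(cap.delayed(&2).is_none()); // cannot rewind to time 2
}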

This means that if we know about all of the outstanding messages (still in system queues) and all of the outstanding capabilities, and we look really really hard at the dataflow graph, we should be able to figure out if it is possible that a message with a certain timestamp might reach a specific input port.

Let's look at another hypothetical dataflow graph, sort of like the one up above but simpler.

[figure: hypothetical dataflow]

Imagine that at the moment input only holds a capability to send messages with timestamp 3 and there are some messages going from Op2 to Op3 with timestamp 2. By looking really hard and thinking, we can tell that

  1. Op1 and Op2 won't receive any messages with timestamp 2, and
  2. Op3 and Op4 won't receive any messages with timestamp 1.

If this was all we needed to do, which is roughly the case in a directed acyclic graph, we could just replace our "hard looking and thinking" with reachability information from each output port to each input port. As messages and capabilities are retired, we tweak the open timestamps at each input port they can reach, and do something exciting if the frontier advances.
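In that acyclic world, the sketch below (hypothetical names, totally ordered times) is roughly all the machinery we would need: record outstanding messages and capabilities by location, record which input ports each location can reach, and conclude that an input port might still see time t only if some outstanding timestamp at most t can reach it.

use std::collections::HashMap;

type Port = usize; // ports identified by number
type Time = u64;

struct DagTracker {
    /// For each output port, the input ports it can reach in the graph.
    reachable: HashMap<Port, Vec<Port>>,
    /// Outstanding messages and capabilities, by the output port they sit at.
    outstanding: HashMap<Port, Vec<Time>>,
}

impl DagTracker {
    /// Might `input` still receive a message with timestamp `t`?
    fn may_still_see(&self, input: Port, t: Time) -> bool {
        self.outstanding.iter().any(|(src, times)| {
            self.reachable.get(src).map_or(false, |dsts| dsts.contains(&input))
                && times.iter().any(|&s| s <= t)
        })
    }
}

fn main() {
    let mut reachable = HashMap::new();
    reachable.insert(0, vec![1, 2]); // output port 0 reaches input ports 1 and 2
    let mut outstanding = HashMap::new();
    outstanding.insert(0, vec![3]); // a capability for time 3 at output port 0
    let tracker = DagTracker { reachable, outstanding };
    assert!(!tracker.may_still_see(1, 2)); // input 1 is closed for time 2
    assert!(tracker.may_still_see(2, 3));  // input 2 may still see time 3
}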

But, this train ain't stopping at DAG-town.

TOOOOOOT!!!

Yeah, our dataflow graphs actually kinda look more like this (an actual SCC computation):

[figure: hypothetical dataflow]

And this is weird and complicated, because we have cycles and so output ports can reach back to input ports on the same operators, and if we just think about connectivity between ports it is all a terrible mess. We also have operators that speak in terms of different types of timestamps: operators in each of the connected components have different timestamp types from the containing scope, and (logically, at least) from each other.

Naiad got around this by putting the whole structure of the dataflow graph in one place, and it was sort of a mess. We had a bunch of baked assumptions about all timestamps being UInt64[]s, and the only thing each could mean was "loop counter" and ... eesh. Ok, we aren't going to do that this time.

Let's look at our initial simplified dataflow graph again.

[figure: hypothetical dataflow]

Let's break the troublesome bits into two more manageable parts, (a) graphs with cycles, and (b) graphs with nested scopes, which we'll tackle individually.

Graphs with cycles

Let's just focus in on the part of our dataflow graph with the cycle in it:

[figure: hypothetical dataflow]

If we do what we said up above about determining which output ports can reach which input ports, we pretty quickly conclude that all the output ports can reach all the input ports. If Op2 holds a capability for timestamp t, it could make its way back around to the input to Op2. This makes it hard for Op2 to hold on to a capability until it is sure it won't receive any more input, because the capability itself prevents the notification.

But actually, there is that handy feedback operator sitting there. We know, in our heart-of-hearts, that this operator will advance the timestamp of each message that moves through it. We know that any message that Op2 sends won't return to the input of Op2 with the same timestamp; it has to be incremented by at least one.

This leads us to a refinement of the "outputs connected to inputs" approach. Rather than just track connectivity, we'll track connectivity and the least action that must happen to a timestamp as it goes from the output to the input. In the example above, Op2's output is connected to its input but requires the iteration coordinate of its timestamp to be incremented by one. As long as we note this down, and make sure to invoke this when we do our hard look and think, we will be able to conclude that it is safe to notify Op2's input even though it holds a capability for the same time.

A quick point about generality: any output may connect to any input, but it may do so through many paths. There may be a few different things that could happen to the timestamp as it goes, and we need to make sure to track the least that happens to the timestamp. This isn't as simple as taking a min, because the actions that happen to the timestamp don't have to be totally ordered (Naiad assumed they were, preventing some types of notification and restricting concurrency). What we track for each (output, input) pair is a list of actions-on-timestamps, none of which is strictly worse than any other. This list often has size one (or zero), but it could have multiple entries: for example, there might be one path that bumps the iteration counter, and one path that bumps the round-of-input counter.
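Here is a sketch of path summaries for a hypothetical (round, iteration) timestamp, showing the two operations we keep needing: applying a summary to a timestamp, and composing two summaries along a path. Timely's actual summary machinery is richer; the names below are stand-ins.

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct Summary {
    rounds: u64,     // how much the round-of-input coordinate must advance
    iterations: u64, // how much the loop coordinate must advance
}

impl Summary {
    /// The least timestamp a message could have after traversing this path.
    fn results_in(&self, (round, iter): (u64, u64)) -> (u64, u64) {
        (round + self.rounds, iter + self.iterations)
    }
    /// Compose two path summaries: first `self`, then `other`.
    fn followed_by(&self, other: &Summary) -> Summary {
        Summary {
            rounds: self.rounds + other.rounds,
            iterations: self.iterations + other.iterations,
        }
    }
}

fn main() {
    let bump_iter = Summary { rounds: 0, iterations: 1 };
    let bump_round = Summary { rounds: 1, iterations: 0 };
    assert_eq!(bump_iter.results_in((3, 0)), (3, 1));
    assert_eq!(bump_iter.followed_by(&bump_round).results_in((3, 0)), (4, 1));
}

Note that bump_iter and bump_round are exactly the situation described above: neither dominates the other, which is why we keep an antichain of summaries rather than a single least one.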

Great, we have a good idea about how we think about timestamps going from an output port to an input port, and it even works in cycles. How do we get this information? Do we just look at feedback operators and trace paths through them? It turns out it can be more complicated (and fun!) than this. Instead, we will have operators explain what sorts of horrible things they might do to messages that pass through them, but with a twist!

Let's go through the next subsection to motivate why it gets more complicated, and then we'll put together actual rules for determining these path summaries.

Graphs with subgraphs

The other problem we have is that when there are nested subgraphs, information about one type of timestamp needs to get turned into another type of timestamp. This can be a bit of a pain, unless you make a bunch of simplifying assumptions (which Naiad did: all timestamps are UInt64[]s and you add/remove coordinates as you enter/leave subgraphs).

We are going to do something different, which has its own sort of simplicity. Rather than think about a nested subgraph as a complicated tangle of additional operators with different timestamps and ... we are just going to pretend it is an operator.

[figure: hypothetical dataflow]

From the point of view of the outer scope, that loop thing is just an opaque operator with an input port and an output port. It might do lots of weird and crazy things inside, but all that Op1 and Op4 care about is whether it is planning on sending any messages on that output port. Also we have to care enough to provide it sufficient information to service its own notifications, so we'll have to tell it about messages it might receive on its input.

Other than that, no one doing progress reasoning in the outer scope needs to care or even know what is going on inside the loop.

Ok, that isn't really true. We need to know some amount of information about what is going on in the loop. What we need to know is: which inputs are connected to which outputs. In this case there is only one input and output, so, duh. But there is an important difference between two nested subgraphs whose internal structures are:

[figure: hypothetical dataflow]

If we just pretend it is a big all-to-all blob, we lose precision. We want the same accuracy about "who might get what messages" with this abstraction in place as we would have gotten with the contained subgraph opened up for inspection.

So we need the connectivity between inputs and outputs? Yeah, basically. Let's go just a bit further and say that we want the path summaries from each input to each output: what has to happen to timestamps arriving at each input before they reach each of the outputs. Perhaps some of the operators tweak the timestamp as they go, and messages on the first input port can reach the first output with unchanged timestamp, but can only reach the third output with timestamp plus one. As far as I understand, if the subgraph reports this information we are giving up very little (or nothing) by otherwise concealing its details.
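A hedged illustration of the shape of such a summary, using "how much must the loop coordinate advance" as the summary type: one row per input, one antichain per output, with an empty antichain meaning that output is unreachable (the middle output here is unreachable purely to show that case).

type LoopIncrement = u64;
type Antichain = Vec<LoopIncrement>; // minimal elements only

/// One input, three outputs, as in the example above.
fn subgraph_summary() -> Vec<Vec<Antichain>> {
    vec![vec![
        vec![0], // output 0: reachable with the timestamp unchanged
        vec![],  // output 1: not reachable from this input
        vec![1], // output 2: reachable, loop coordinate advanced by at least one
    ]]
}

fn main() {
    let summary = subgraph_summary();
    assert!(summary[0][1].is_empty());  // input 0 never reaches output 1
    assert_eq!(summary[0][2], vec![1]); // and reaches output 2 only at +1
}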

You might be thinking "whoa, this is all too complicated" and I sort of agree. Mostly I agree that we should do the math right and let the computer figure out the consequences, because I've already spent a few years thinking I was clever enough to keep all the assumptions in my head. As it turns out, I have other assumptions I want to keep in my head, so we'll just try and do this part correctly instead.

The cost of getting it wrong is missed opportunities for notification, or in the worst case deadlock. So, let's try and stick to the zero-cost abstraction motto, and summarize operators with as much precision as possible.

Each operator, nested or not, needs to summarize the minimal actions on timestamps from each input port to each output port. Many operators will just have full connectivity, but operators like feedback and nested subgraphs will thank us for this.

Subgraphs within graphs

The other side of the nesting story, where we just wallpapered over those subgraphs and pretended they didn't exist, is that the subgraphs themselves need to know how to process information about the outside world. It turns out that there is a fairly simple transformation we can do that makes this reasoning much simpler. If we take the scope wrapped around the nested scope, and pretend that the whole outside world is just an operator, with its enter and leave operator as output ports and input ports, respectively, we get a possibly insane take on the nested subgraph:

[figure: hypothetical dataflow]

All of the math works out just fine, as long as outside world summarizes itself as an operator to the nested subgraph. The summary explains what happens to timestamps on messages that are handed to a leave operator (an input to outside world) when they return through enter operators (an output of outside world). Messages floating around outside the subgraph are just capabilities of outside world; the subgraph doesn't need to know where they are, just that they exist as capabilities for the enter ports.

It is a lot like the outside world is actually a nested scope within the subgraph.

I'm already in conversations with Christopher Nolan for the film adaptation.

Actual rules

Ok, enough funny talk. Let's get to business about what we really need to do to make this all work out. I'm going to go through the actual trait timely uses to represent operators, from a progress tracking point of view. I'm going to cut out all the documentation, because it takes up a bunch of space here. I've also snipped out a few implementation details that we may return to.

/// Methods for describing an operator's topology, and the progress it makes.
pub trait Operate<T: Timestamp> {

    fn inputs(&self) -> usize;
    fn outputs(&self) -> usize;

    /// Fetches summary information about internal structure of the operator.
    fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<T::Summary>>>, Vec<CountMap<T>>) {
        (vec![vec![Antichain::from_elem(Default::default()); self.outputs()]; self.inputs()],
         vec![CountMap::new(); self.outputs()])
    }

    /// Presents summary information about the external structure around the operator.
    fn set_external_summary(&mut self, _summaries: Vec<Vec<Antichain<T::Summary>>>, _frontier: &mut [CountMap<T>]) { }

    /// Reports a summary of progress statements external to the operator and its peer group.
    fn push_external_progress(&mut self, external: &mut [CountMap<T>]) {
        // default implementation just drains the external updates
        for updates in external.iter_mut() {
            updates.clear();
        }
    }

    /// Retrieves a summary of progress statements internal to the operator.
    fn pull_internal_progress(&mut self, consumed: &mut [CountMap<T>],          // to populate
                                         internal: &mut [CountMap<T>],          // to populate
                                         produced: &mut [CountMap<T>]) -> bool; // to populate
}

Warming up

The first thing to notice is that an operator is parameterized by a timestamp. This is the only type that the operator knows about, and all of its methods will speak only about this timestamp.

The first two methods in the trait speak to this:

    fn inputs(&self) -> usize;
    fn outputs(&self) -> usize;

Once an operator describes its "shape", its number of input and output ports, we can talk about them by number. This avoids anything more complicated where we might need to know something about the implementor to get at its structure. Naiad, for example, had specializations for unary and binary topologies, and you had to write more code the more shapes you wanted to put in.

Summarizing structure

The next two methods summarize the structure of operators, and are part of graph construction.

The first method, get_internal_summary, summarizes the operator upwards to its parent. The method's signature indicates that it returns two things. The first is a

	Vec<Vec<Antichain<T::Summary>>>

which is what we talked about up above, a map from pairs (input, output) to summaries of what must happen to timestamps as they move along paths internal to the operator. An Antichain is a fancy way of saying "a bunch of minimal things in a partial order", and T::Summary is the associated "summarization type" for the timestamp (think: something that can say "+1").

The second is a

	Vec<CountMap<T>>

and it is just here to help us initialize the computation. It indicates, for each output port of the operator, the capabilities the operator initially holds. That is, before any messages start going around, does this operator want to reserve the right to send something? The input operator, for example, wants to set this so that folks are warned that data might come out of it.

By default, the implementation just says "each input could reach each output, without adjusting the timestamp" and "no I have no capabilities". This seemed pretty common, but you can override this as you see fit.
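For instance, a feedback-style operator might override the default with something like the following sketch, which uses stand-in types in place of timely's Antichain and CountMap: its one input reaches its one output only after the loop counter advances, and it starts with no capabilities.

#[derive(Clone, Debug)]
struct Antichain<S>(Vec<S>);
impl<S> Antichain<S> {
    fn from_elem(element: S) -> Self { Antichain(vec![element]) }
}

#[derive(Clone, Copy, Debug)]
struct IncrementLoop(u64); // advance the iteration coordinate by this much

type CountMap = Vec<(u64, i64)>; // stand-in: (timestamp, change in count)

fn feedback_internal_summary() -> (Vec<Vec<Antichain<IncrementLoop>>>, Vec<CountMap>) {
    (vec![vec![Antichain::from_elem(IncrementLoop(1))]], // input 0 -> output 0: +1 iteration
     vec![CountMap::new()])                              // output 0: no initial capabilities
}

fn main() {
    let (summaries, capabilities) = feedback_internal_summary();
    println!("summaries: {:?}", summaries);
    println!("capabilities: {:?}", capabilities);
}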

The second method, set_external_summary, is roughly the dual of get_internal_summary and summarizes the outside world. It takes as arguments (again, called by the containing scope) a Vec<Vec<Antichain<T::Summary>>> and a Vec<CountMap<T>>, which respectively summarize the actions on timestamps as they move from the operator's outputs back to its inputs and indicate initial capabilities held by the outside world.

These two methods are enough for any subgraph to get up and running: each subgraph understands what each of its operators might do to timestamps, it knows what the outside world might do to timestamps, and it knows how they are all connected. Knowing the initial capabilities of each of the operators, it is now in a position to start making progress statements.

Summarizing progress

The next two methods summarize progress within and without operators, and are used at runtime.

The first method, push_external_progress, is used to indicate that the outside world has downgraded some of its capabilities. This may be because some messages outside the operator finally got dealt with and are no longer a threat, or it may be that some other operator released some of its capabilities. In any case, this method is called with changes to the counts of capabilities for timestamps at each of the operator's inputs. By default, the operator just ignores them, but this is clearly an important part of a subgraph running correctly.
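As a sketch of what an operator might do with this information (stand-in types, totally ordered times, and nothing like timely's actual notification machinery): fold the pushed changes into a view of the outside world, and deliver any requested notification that is no longer at risk.

use std::collections::HashMap;

type CountMap = Vec<(u64, i64)>; // (timestamp, change in count)

struct MyOperator {
    /// Net counts of external messages and capabilities, by timestamp.
    external: HashMap<u64, i64>,
    /// Times at which we would like to be notified.
    pending: Vec<u64>,
}

impl MyOperator {
    fn push_external_progress(&mut self, external: &mut [CountMap]) {
        // fold the pushed changes into our view of the outside world
        for updates in external.iter_mut() {
            for (time, delta) in updates.drain(..) {
                *self.external.entry(time).or_insert(0) += delta;
            }
        }
        // a pending time is deliverable once nothing external at a
        // less-or-equal time could still send us a message
        let ext = &self.external;
        self.pending.retain(|&t| {
            let blocked = ext.iter().any(|(&s, &c)| c > 0 && s <= t);
            if !blocked {
                println!("notify at time {}", t);
            }
            blocked
        });
    }
}

fn main() {
    let mut op = MyOperator {
        external: vec![(0u64, 1i64)].into_iter().collect(),
        pending: vec![0],
    };
    // the outside world reports that its capability for time 0 is gone
    op.push_external_progress(&mut [vec![(0, -1)]]);
    assert!(op.pending.is_empty()); // the notification for time 0 fired
}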

The second method, pull_internal_progress, is probably the most complicated one. Because reasons, this method isn't exactly dual to the previous method. The method is called with three buffers that the operator should populate. This is basically the same as having the operator "return" them, but we don't have to keep re-allocating things. So think of the three buffers as return values. There is actually a bool return value, anyhow.

The three arguments are

consumed: &mut [CountMap<T>],
internal: &mut [CountMap<T>],
produced: &mut [CountMap<T>]

What this crazy Rust syntax says is that each of these arguments is a mutable reference to a slice of CountMap<T>s. It's Rust's way of saying that the parent owns three Vec<CountMap<T>>s and is willing to let this operator play with their contents. What this operator is supposed to do is indicate

  1. how many messages they consumed on each input port,
  2. what internal capabilities they want to change on each output port, and
  3. how many messages they produced on each output port.

Not so complicated, right?
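Here is a sketch of one round of reporting, with a stand-in CountMap of (timestamp, delta) pairs: a unary operator that consumed three messages at time 5, released the capability it had been holding for time 5, and sent three messages at time 5.

type CountMap = Vec<(u64, i64)>; // (timestamp, change in count)

fn pull_internal_progress(consumed: &mut [CountMap],
                          internal: &mut [CountMap],
                          produced: &mut [CountMap]) -> bool {
    consumed[0].push((5, 3));  // input 0: consumed three messages at time 5
    internal[0].push((5, -1)); // output 0: released our capability for time 5
    produced[0].push((5, 3));  // output 0: produced three messages at time 5
    false // (hypothetically) nothing left unreported
}

fn main() {
    let (mut c, mut i, mut p) = (vec![vec![]], vec![vec![]], vec![vec![]]);
    pull_internal_progress(&mut c, &mut i, &mut p);
    println!("consumed: {:?}, internal: {:?}, produced: {:?}", c, i, p);
}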

The number of messages consumed and produced allows an operator to track the messages flowing within it, and report up to its parent the number of messages it has consumed and produced. There are some peculiarities here, and they relate a bit to the distributed nature of the computation. If there were only one worker involved, this would totally be enough information, though.

The reason for the asymmetry has to do with the distributed nature of the computation, and that not all of the messages "produced" by the parent will be consumed by this operator; some might end up at different instances of this operator. Consequently, it makes sense to let the operator report the work that it did do (messages consumed and produced) rather than have the parent speculate on what it thinks the operator will have to do.

Reasoning about progress tracking

The framework we've described keeps track of where messages are outstanding, and which operators hold capabilities for timestamps. Each scope also has a summary of the behavior of the operators, and its parent scope. These two together provide enough information (proof pending) for each scope to reason about which timestamps might be seen again on which inputs of which operators.

A distributed protocol for tracking progress

Naiad had a distributed protocol for tracking progress, and we are going to use something quite similar here. There are some important differences, and it has only recently changed a bit to fix some bugs. So, this part may be a bit of a work in progress, but it does make some sense.

Naiad's progress tracking was based on the idea that as each worker does work, it can record changes to the counts of messages and capabilities of timestamps at various locations in the graph. If each worker produces a stream of these updates, and broadcasts it to other workers, each worker eventually sees all of the updates that happen across the computation.

However, this broadcasting happens asynchronously, and it is possible that some progress updates overtake others. A worker might hear about a message received at one worker before it even hears from the worker that produced it. With a few careful restrictions, no problemo. As long as workers send updates about new work before they send updates about retired work, no worker will ever incorrectly determine that a time has closed out before it actually does. There is a fairly long formal proof about why this works.

We are going to do something similar here, with a few minor tweaks related to our abstraction of subgraphs as operators. One can tweak the argument up above to require that workers send batches of updates atomically, so that all corresponding increments and decrements are received together at each remote worker. In fact, Tom did just this, as part of a liveness argument for the distributed protocol.

With the above modification from streams of updates to streams of batches of updates, you can remove any ordering requirements on the batches of updates. As long as each batch arrives at remote workers intact, with both its increments and decrements, it doesn't matter what order they are received in. This might sound a little crazy, but there are still enough causal dependencies to keep the system in order:

  1. Someone must send a message before you can receive it.
  2. You must receive a message before you can announce receiving it.
  3. You must learn about message receipts before notifying an operator.

If an operator only sends messages in response to notifications, we get a chain of causal dependencies that ensure that one notification must happen before another, and some message receives must happen before other message sends. The main relaxation is that we don't require the order of execution of the updates to be reflected in the update stream.

This re-ordering property roughly says that it is safe for any worker to see any subset of batches of updates other workers produce. This is especially helpful for us, as nested subgraphs perform progress tracking reasoning independently, and do not (at least not without a good deal more work) coordinate their broadcasts with other scopes in the same worker.

Attention to detail

The protocol above works great in the abstract, but it can be very tricky to make sure that batches of updates move around atomically.

One race condition went unobserved until October, despite tests on 100s of workers. Until recently, each scope exchanged information about its individual actions and reported them upwards, then awaited information from the parent scope about whether messages still exist. This meant that a scope could report consuming a message to its parent, which could make its way to the parent's peer on another worker, who would notify its corresponding child scope that no more messages were coming, all before the first scope could notify its own peer that it had held on to a capability, or whatever.

This required a bit of a rethink, which actually led to the view above of subsets of batches of updates. At least, that got invented so that there would be a clear property to uphold and invariants to test. The problem here is that the batch of updates corresponding to "received some messages, held a capability" got split up. The first half went by way of the peer parent scopes, the second half went directly (if slowly) to the peer scope. That doesn't happen any more. In fact, in fixing the bug the protocol got a bit more efficient, because we make sure that each update gets moved exactly once (which wasn't true before, but not in an obviously unsafe way).

It is likely that the progress tracking code will evolve, but it has a narrow interface with relatively clear guarantees, and a distributed protocol with fairly simple invariants to maintain. It will be good to develop this further, and shake out as much parallelism as we can.

Asynchronous performance

One of the most appealing characteristics of this protocol is that it is totally asynchronous. Workers produce "exhaust" from their computations, and this information suffices to let other workers know how much work is done. If a worker A sends a message to B, who receives it and sends a message to C, who receives it and does nothing, the updates broadcast might look like

A : (T, +1)
B : (T, -1), (T', +1)
C : (T', -1)

A slightly lazy worker D who doesn't receive any messages can just receive the three progress updates and confirm that they cancel out and that it shouldn't expect anything more; it doesn't need to actively participate unless it does some work. Workers A, B, and C don't have to wait on D, who can be asleep if it really wants (or maybe working on something else). Nor do the workers explicitly synchronize; they can't fire notifications until they learn that work is done, but they can work on whatever else they like in the meantime.
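A sketch of this accounting, with hypothetical names standing in for the locations: each worker's batch keeps its increments and decrements together, and worker D can apply the batches in any order and watch everything cancel.

use std::collections::HashMap;

type Pointstamp = (&'static str, u64); // (location in the graph, logical time)

fn apply(counts: &mut HashMap<Pointstamp, i64>, batch: &[(Pointstamp, i64)]) {
    for &(ps, delta) in batch {
        *counts.entry(ps).or_insert(0) += delta;
    }
}

fn main() {
    let batch_a = vec![(("a_to_b", 0), 1)];                      // A: (T, +1)
    let batch_b = vec![(("a_to_b", 0), -1), (("b_to_c", 0), 1)]; // B: (T, -1), (T', +1)
    let batch_c = vec![(("b_to_c", 0), -1)];                     // C: (T', -1)

    // D receives the batches in some arbitrary order; counts may dip
    // negative along the way, but only the final tally matters.
    let mut counts = HashMap::new();
    for batch in vec![batch_c, batch_a, batch_b] {
        apply(&mut counts, &batch);
    }
    assert!(counts.values().all(|&v| v == 0)); // everything cancels out
}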

The downside of the protocol is the broadcasting. In Naiad we started with a very simple version of the protocol, and found that there were lots of progress updates being sent around. We introduced several optimizations in the paper, including hierarchical aggregation and some clever buffering. The Rust codebase doesn't seem to suffer from this as much, possibly due to the abstraction boundaries suppressing spurious changes, though it hasn't been exercised at the same scale, either.

Conclusions

Progress tracking in timely dataflow is not as simple as it is in traditional directed acyclic dataflow computations, or in batch scheduled computations. On the other hand, we get access to a low-overhead coordination mechanism that allows us to run distributed iterative computations with latencies in the microsecond range.

The abstraction boundary introduced for progress tracking is also useful for much more than simplifying programming: giving operators the capability to do their own progress tracking allows them to use different logic, which can be valuable when you want to slow down progress to track safely persisted data, or speed it up when you get the first results back from a replicated computation. Given the time, I'll be happy to talk more about these in future posts.