Scheduler underestimates data transfer cost for small transfers #5324

Open · gjoseph92 opened this issue Sep 16, 2021 · 7 comments · May be fixed by #5326
Labels: discussion, networking, performance, scheduling, stealing

Comments

gjoseph92 (Collaborator) commented Sep 16, 2021

I feel like this has been discussed tangentially in other places, but I couldn't find an issue just for this. The scheduler generally seems a little too eager to schedule tasks in a way that requires data transfer, when transfer could be avoided (#5253 aims to address another aspect of this).

In our worker_objective function, used to estimate how long it will take before a given task can start running on a given worker, we basically just consider n_bytes_to_transfer / self.bandwidth. But in reality, a lot more than that has to happen.
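
For reference, the transfer-related part of that estimate is roughly the following (a simplified paraphrase of worker_objective, not the exact scheduler code):

# Simplified paraphrase of the transfer-cost portion of Scheduler.worker_objective;
# `ts` is the task being scheduled, `ws` the candidate worker.
def estimated_start_time(self, ts, ws):
    comm_bytes = sum(
        dts.get_nbytes() for dts in ts.dependencies if ws not in dts.who_has
    )
    stack_time = ws.occupancy / ws.nthreads
    return stack_time + comm_bytes / self.bandwidth  # no latency or other fixed cost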

Here are some things we're not accounting for:

  • Fixed-cost network latency between workers, both receiver->sender to request the data, and sender->receiver to send it
  • Serialization time on the sending worker
  • Deserialization time on the receiving worker
  • Possible disk-write (plus serialization) time on the receiving worker, if it has to spill some other key to disk to make room
  • Possible disk-read (plus deserialization) time on the sending worker, if the requested value is spilled
  • The worker has a queue of keys to fetch; there may already be keys ahead of this one waiting to be fetched (xref Should Worker.data_needed be priority-ordered? #5323)
  • The fetch itself is enqueued onto the worker's event loop, and is async. We've recently learned that the event loop can spend a surprising amount of time waiting for the GIL if user tasks also hold the GIL (Testing network performance #5258 (comment)). So worst-case there could be multiple (O(~10?)) 5ms pauses between the worker intending to fetch and a message even getting sent (let alone received) over the network
  • Transferring might slow down other tasks currently running on the sender/receiver (GIL, CPU to (de)serialize), plus may require nontrivial memory, potentially pausing or even killing one or both workers (not time-estimate related, but worth considering someday)

Now I'm not at all saying we should try to actually measure these things. And when the data to transfer is big, what we have is pretty accurate.

But we should probably add some fixed-cost penalty for any transfer (covering network latency, GIL-pausing, etc.). Ideally at least latency can be measured (is it already on the scheduler?).

We should also just make decide_worker less inclined to cause data transfer. Maybe, if one worker already holds all the dependencies for a task, only pick a different worker if (see the sketch after this list):

  • we estimate a different worker could start the task some factor sooner (1.2x? 2x?)
  • the estimated delay on the no-transfer worker is non-trivial (> 100ms?)
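
Here's a rough, purely hypothetical sketch combining both ideas (a fixed per-transfer penalty in the time estimate, plus a bias toward a worker that already holds all the dependencies); the constants and function names are made up for illustration:

# Hypothetical sketch, not the current implementation. LATENCY_PENALTY,
# SPEEDUP_FACTOR, and TRIVIAL_DELAY are made-up tuning knobs.
LATENCY_PENALTY = 0.010  # ~10 ms fixed cost for any transfer (latency, GIL pauses, ...)
SPEEDUP_FACTOR = 1.5     # another worker must look this much faster to win
TRIVIAL_DELAY = 0.100    # ...and the no-transfer worker must be >100 ms away

def estimated_start_time(ts, ws, bandwidth):
    # Occupancy-based queueing time plus transfer time, plus a fixed penalty
    # if any dependency has to be fetched at all.
    deps_to_fetch = [dts for dts in ts.dependencies if ws not in dts.who_has]
    comm_bytes = sum(dts.get_nbytes() for dts in deps_to_fetch)
    fixed = LATENCY_PENALTY if deps_to_fetch else 0.0
    return ws.occupancy / ws.nthreads + comm_bytes / bandwidth + fixed

def pick_worker(ts, candidates, bandwidth):
    # Prefer a worker that already holds all dependencies unless another
    # candidate is clearly, non-trivially better.
    times = {ws: estimated_start_time(ts, ws, bandwidth) for ws in candidates}
    best = min(candidates, key=times.get)
    no_transfer = [
        ws for ws in candidates
        if all(ws in dts.who_has for dts in ts.dependencies)
    ]
    if no_transfer:
        local = min(no_transfer, key=times.get)
        # Stay on the no-transfer worker if its delay is trivial, or if the
        # alternative isn't at least SPEEDUP_FACTOR times faster.
        if times[local] < TRIVIAL_DELAY or times[local] < SPEEDUP_FACTOR * times[best]:
            return local
    return best
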
Test to look at estimated vs actual transfer times
from distributed.utils_test import gen_cluster


# Compare the scheduler's estimated transfer time (nbytes / bandwidth) with the
# transfer time actually recorded by the receiving worker.
@gen_cluster(client=True)
async def test_transfer_cost(client, s, a, b):
    from dask.utils import format_time, format_bytes

    nbytes = 1024
    print(f"Transferring {format_bytes(nbytes)}")
    x = client.submit(lambda: "x" * nbytes, workers=a.address, key="x")
    await x
    bandwidth = s.bandwidth
    y = client.submit(lambda x: None, x, workers=b.address, key="y")
    await y

    data_creation_duration = s.get_task_duration(s.tasks["x"])
    print(f"Data creation: {format_time(data_creation_duration)}")

    actual_nbytes = s.tasks["x"].nbytes
    estimated_xfer_duration = actual_nbytes / bandwidth
    print(f"Estimated transfer time: {format_time(estimated_xfer_duration)}")

    xfer_duration = b.incoming_transfer_log[0]["duration"]
    print(f"Actual transfer time: {format_time(xfer_duration)}")

    avg_latency = (a.latency + b.latency) / 2
    print(f"Worker latency: {format_time(avg_latency)}")

    print(
        f"Transfer estimate is {data_creation_duration/estimated_xfer_duration:.1f}x faster than data creation, "
        f"{avg_latency/estimated_xfer_duration:.1f}x faster than worker latency, "
        f"{xfer_duration/estimated_xfer_duration:.1f}x faster than reality "
    )

    assert 0.1 < xfer_duration / estimated_xfer_duration < 10
Transferring 1.00 kiB
Data creation: 44.11 us
Estimated transfer time: 10.73 us
Actual transfer time: 9.37 ms
Worker latency: 2.07 ms
Transfer estimate is 4.1x faster than data creation, 193.2x faster than worker latency, 873.0x faster than reality

Transferring 1.00 MiB
Data creation: 387.91 us
Estimated transfer time: 10.49 ms
Actual transfer time: 10.75 ms
Worker latency: 2.13 ms
Transfer estimate is 0.0x faster than data creation, 0.2x faster than worker latency, 1.0x faster than reality 

cc @fjetter @mrocklin

gjoseph92 added a commit to gjoseph92/distributed that referenced this issue Sep 16, 2021
If a dependency is already on every worker—or will end up on every worker regardless, because many things depend on it—we should ignore it when selecting our candidate workers. Otherwise, we'll end up considering every worker as a candidate, which is 1) slow and 2) often leads to poor choices (xref dask#5253, dask#5324).

Just like with root-ish tasks, this is particularly important at the beginning. Say we have a bunch of tasks `x, 0`..`x, 10` that each depend on `root, 0`..`root, 10` respectively, but every `x` also depends on one task called `everywhere`. If `x, 0` is ready first, but `root, 0` and `everywhere` live on different workers, it appears as though we have a legitimate choice to make: do we schedule near `root, 0`, or near `everywhere`? But if we choose to go closer to `everywhere`, we might have a short-term gain, but we've taken a spot that could have gone to better use in the near future. Say that `everywhere` worker is about to complete `root, 6`. Now `x, 6` may run on yet another worker (because `x, 0` is already running where it should have gone). This can cascade through all the `x`s, until we've transferred most `root` tasks to different workers (on top of `everywhere`, which we have to transfer everywhere no matter what).

The principle of this is the same as dask#4967: be more forward-looking in worker assignment and accept a little short-term slowness to ensure that downstream tasks have to transfer less data.

This PR is a binary choice, but I think we could actually generalize to some weight in `worker_objective` like: the more dependents or replicas a task has, the less weight we should give to the workers that hold it. I wonder if, barring significant data transfer imbalance, having a stronger affinity for the more "rare" keys will tend to lead to better placement.
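
A rough illustration of that generalization (purely hypothetical, not part of the linked PR): scale each dependency's byte count by how "rare" it is, so widely replicated or widely depended-upon keys pull less on the decision.

# Hypothetical rarity weighting for the communication term of the objective;
# `total_workers` and the `spread` heuristic are made up for illustration.
def weighted_comm_bytes(ts, ws, total_workers):
    cost = 0.0
    for dts in ts.dependencies:
        if ws in dts.who_has:
            continue
        # The more replicas a key has (or will likely have, judging by its
        # dependents), the less its absence on `ws` should count against `ws`.
        spread = len(dts.who_has) + len(dts.dependents)
        rarity = max(0.0, 1.0 - spread / total_workers)
        cost += dts.get_nbytes() * rarity
    return cost
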
fjetter (Member) commented Sep 16, 2021

This might also be connected to some bandwidth bias I discovered in #4962

We only take into account bandwidth measurements if the byte size is beyond a threshold:

if total_bytes > 1000000:
    self.bandwidth = self.bandwidth * 0.95 + bandwidth * 0.05
    bw, cnt = self.bandwidth_workers[worker]
    self.bandwidth_workers[worker] = (bw + bandwidth, cnt + 1)

    types = set(map(type, response["data"].values()))
    if len(types) == 1:
        [typ] = types
        bw, cnt = self.bandwidth_types[typ]
        self.bandwidth_types[typ] = (bw + bandwidth, cnt + 1)

This biases our measurements: if the bandwidth is perceived to be too large, the cost is underestimated.
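
A back-of-the-envelope illustration of why this hurts small transfers (numbers are only illustrative): since bandwidth is learned exclusively from large transfers, the fixed per-message latency that dominates small ones never feeds back into the estimate.

# Illustrative numbers only, roughly matching the magnitudes in the test output above.
bandwidth = 100e6  # ~100 MB/s, learned only from >1 MB transfers
latency = 2e-3     # ~2 ms round trip, never reflected in the estimate

nbytes = 1024
estimated = nbytes / bandwidth            # ~10 microseconds
realistic = latency + nbytes / bandwidth  # ~2 ms, dominated by latency
print(f"off by roughly {realistic / estimated:.0f}x for a 1 kiB transfer")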

mrocklin (Member) commented Sep 16, 2021 via email

gjoseph92 (Collaborator, Author) commented

Yeah I experimented a little with turning off that threshold. What was the motivation for it?

Do we have a measure of latency on the scheduler side? Workers have self.latency, but I don't see an equivalent on the scheduler. Maybe worker's latency should be included in heartbeat metrics?

Maybe we want to add 10ms?

That sounds generally reasonable. Could be nice to have something latency-based too though.

gjoseph92 linked a pull request Sep 17, 2021 that will close this issue
fjetter (Member) commented Sep 17, 2021

That sounds generally reasonable. Could be nice to have something latency-based too though.

The worker measures the latency upon every heartbeat, but it isn't used anywhere. It should be easy enough to sync this to the scheduler.

gjoseph92 (Collaborator, Author) commented

Sure. One small thing I like about the latency measure on the worker side is that it's updated as soon as the worker connects to the scheduler (doesn't have to wait for a heartbeat cycle). Ideally it would be nice to have the same on the scheduler side (get it on first connection without waiting for the first heartbeat), mostly just for tests.

But for now I'll send worker latency in metrics. Do we want to track a single average latency on the scheduler? Or per-WorkerState?

fjetter (Member) commented Sep 20, 2021

Do we want to track a single average latency on the scheduler? Or per-WorkerState?

I would suggest doing this per worker, since some workers may be colocated while others may be on a very busy host, a slow network, etc. I'm not sure this makes a huge difference, but it's very easy to measure, after all.
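
A minimal sketch of what that per-worker plumbing might look like (the "latency" metrics key and the WorkerState.latency attribute are assumptions here, not existing API); the smoothed per-worker value could then feed a fixed-cost term in worker_objective:

# Worker side: include the latest latency measurement in the heartbeat payload.
def heartbeat_metrics(worker):
    return {"latency": worker.latency}  # worker.latency is its measured round trip

# Scheduler side: record and smooth it per WorkerState when a heartbeat arrives.
def record_latency(ws, metrics):
    measured = metrics.get("latency")
    if measured is not None:
        # Exponential smoothing so a single slow heartbeat doesn't dominate.
        ws.latency = 0.9 * getattr(ws, "latency", measured) + 0.1 * measured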

gjoseph92 (Collaborator, Author) commented

It definitely is easier. Though the worker's latency measurement is of it talking to the scheduler, whereas the latency we'll actually incur is between the worker and one of its peers. So if network topology is inconsistent, this estimate could be inaccurate. But the whole thing is pretty inaccurate anyway, so I think it's fine.
