Testing network performance #5258

Open
gjoseph92 opened this issue Aug 24, 2021 · 27 comments
Labels: discussion (Discussing a topic with no specific actions yet), networking

Comments

@gjoseph92
Collaborator

I wrote a quick script to test network performance between workers.

We try 3 approaches:

  • Make the scheduler transfer a DataFrame through task dependencies.

    This includes client <-> scheduler <-> workers communication in transfer time, so it isn't actually a good measure of bandwidth, but it's maybe still an interesting number, since it's closer to (though still an over-estimate of) the real-life bandwidth a data transfer would experience.

  • Add simple comms handlers and send the DataFrame between workers directly.

  • Use iperf3 to test raw TCP network (and disk) performance, to get an upper bound on what we should expect from the network.

This script was a quick hack (DataFrame size doesn't match iperf3 transfer size, for instance) but others might still find it interesting.

The script
import time

from dask.utils import format_bytes
import distributed
from distributed.comm.addressing import parse_address, parse_host_port
from distributed.protocol import to_serialize
import numpy as np
import pandas as pd
# import coiled  # only needed for the commented-out Coiled cluster below


def test_tasks(client: distributed.Client):
    "Test network performance using tasks (scheduler forces a transfer)"
    client.wait_for_workers(2)
    client.restart()
    a, b, *workers = client.scheduler_info()["workers"]

    print(f"send: {a} recv: {b} - performance over task `get_data`")

    # Store data on a global variable so we don't have to recompute
    distributed.wait(
        client.submit(
            lambda: setattr(
                distributed, "GLOBAL_DF", pd.DataFrame(np.random.random((30_000, 1000)))
            ),
            workers=[a],
            pure=False,
        )
    )

    size = client.submit(
        lambda: distributed.GLOBAL_DF.memory_usage().sum(), workers=[a]
    ).result()

    for i in range(15):
        dff = client.submit(
            lambda: distributed.GLOBAL_DF,
            workers=[a],
            pure=False,
        )

        start = time.perf_counter()
        distributed.wait(client.submit(lambda df: None, dff, workers=[b]))
        elapsed = time.perf_counter() - start

        print(
            f"{format_bytes(size)}: {elapsed:.2f}sec, {format_bytes(size / elapsed)}/sec"
        )

    # Clean up the global variable
    distributed.wait(
        client.submit(
            lambda: delattr(distributed, "GLOBAL_DF"), workers=[a], pure=False
        )
    )


def test_handlers(client: distributed.Client):
    "Test network performance using pure comms handlers"
    client.wait_for_workers(2)
    client.restart()
    a, b = client.scheduler_info()["workers"]

    print(f"send: {a} recv: {b} - performance over comms handler")

    async def send(dask_worker: distributed.Worker):
        df = pd.DataFrame(np.random.random((30_000, 1000)))
        dask_worker._send_size = df.memory_usage().sum()
        s = to_serialize(df)
        dask_worker._send_times = []
        while True:
            start = time.perf_counter()
            await dask_worker.rpc(b).stuff_receive(data=s)
            elapsed = time.perf_counter() - start
            dask_worker._send_times.append(elapsed)

    def add_receiver(dask_worker: distributed.Worker):
        def receive(comm, data=None):
            pass

        dask_worker.handlers["stuff_receive"] = receive

    client.run(add_receiver, workers=[b])
    client.run(send, workers=[a], wait=False)

    def get_times(dask_worker: distributed.Worker):
        times = dask_worker._send_times
        dask_worker._send_times = []

        return dask_worker._send_size, times

    for i in range(8):
        time.sleep(2)
        size, times = client.run(get_times, workers=[a])[a]
        for t in times:
            print(f"{format_bytes(size)}: {t:.2f}sec, {format_bytes(size / t)}/sec")

    # TODO stop send coroutine and clean up handlers


def test_iperf(client: distributed.Client):
    "Install iperf on workers and test network and disk performance with it"
    import subprocess

    client.wait_for_workers(2)
    client.restart()

    try:
        client.run(
            subprocess.run,
            "iperf3 -v",
            shell=True,
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except subprocess.CalledProcessError:
        print("Installing iperf3 on workers from conda...")
        client.run(
            subprocess.run, "conda install -c conda-forge iperf", shell=True, check=True
        )

    a, b = client.scheduler_info()["workers"]
    a_ip = parse_host_port(parse_address(a)[1])[0]
    b_ip = parse_host_port(parse_address(b)[1])[0]

    print(f"A: {a} B: {b} - performance from iperf3")

    print("B memory -> A memory")
    # start iperf server (daemon & oneshot mode so `client.run` doesn't block)
    client.run(
        subprocess.run, "iperf3 -s -D -1 -p 5001", shell=True, check=True, workers=[a]
    )
    result = client.run(
        subprocess.run,
        f"iperf3 -c {a_ip} -p 5001 -f M -t 30",
        shell=True,
        capture_output=True,
        text=True,
        workers=[b],
    )
    proc = result[b]
    print(proc.stdout)
    if proc.returncode != 0:
        print(proc.stderr)
        return

    print("B memory -> A disk")
    # reference: https://fasterdata.es.net/performance-testing/network-troubleshooting-tools/iperf/disk-testing-using-iperf/
    client.run(
        subprocess.run,
        "iperf3 -s -D -1 -p 5001 -F iperf_out",
        shell=True,
        workers=[a],
    )
    time.sleep(0.5)
    result = client.run(
        subprocess.run,
        f"iperf3 -c {a_ip} -p 5001 -f M -t 15",
        shell=True,
        capture_output=True,
        text=True,
        workers=[b],
    )
    proc = result[b]
    print(proc.stdout)
    if proc.returncode != 0:
        print(proc.stderr)
        return

    print("A disk -> B disk")
    client.run(
        subprocess.run,
        "iperf3 -s -D -1 -p 5001 -F iperf_out",
        shell=True,
        workers=[b],
    )
    result = client.run(
        subprocess.run,
        f"iperf3 -c {b_ip} -p 5001 -f M -F iperf_out -t 30",
        shell=True,
        capture_output=True,
        text=True,
        workers=[a],
    )
    proc = result[a]
    print(proc.stdout)
    if proc.returncode != 0:
        print(proc.stderr)
        return

    client.run(
        subprocess.run,
        "rm iperf_out",
        shell=True,
        check=True,
    )


if __name__ == "__main__":
    with distributed.Client(
        memory_limit=None,
        n_workers=2,
        processes=True,
        worker_class=distributed.Worker,
        threads_per_worker=1,
        scheduler_port=8786,
    ) as client:

        # cluster = coiled.Cluster(
        #     name="perf",
        #     software="gjoseph92/shuffleservice",
        #     n_workers=2,
        #     worker_cpu=2,
        #     worker_memory="2GiB",
        #     scheduler_cpu=1,
        #     scheduler_memory="2GiB",
        # )
        # with distributed.Client(cluster) as client:
        test_tasks(client)
        test_handlers(client)
        test_iperf(client)

Initial results:

On a Coiled cluster (Docker on AWS EC2 VMs; I don't know the exact instance type, but I requested 2 CPUs and 2 GiB memory, so something low-end):

  • task dependencies: 180-280MiB/sec
  • comms handler: 290-330MiB/sec
  • iperf3 raw TCP: 590 MBytes/sec
  • iperf3 raw TCP -> disk: 44.2 MBytes/sec
  • iperf3 disk -> TCP -> disk: 5.46 MBytes/sec

So Dask's networking is only half as fast as raw TCP here. That's actually better than I expected.
Using comms handlers directly is faster, though not hugely. Also not surprising.

On these low-end EC2 nodes, networking is slow. And disk is very slow.

Full results
(env) gabe dask-playground/shuffle-service » python network.py
Using existing cluster: 'perf'
send: tls://10.6.20.175:33093 recv: tls://10.6.31.53:42065 - performance over task `get_data`
228.88 MiB: 1.12sec, 204.94 MiB/sec
228.88 MiB: 1.06sec, 216.87 MiB/sec
228.88 MiB: 1.14sec, 200.35 MiB/sec
228.88 MiB: 1.20sec, 190.22 MiB/sec
228.88 MiB: 1.23sec, 186.78 MiB/sec
228.88 MiB: 0.84sec, 272.11 MiB/sec
228.88 MiB: 0.87sec, 261.72 MiB/sec
228.88 MiB: 0.87sec, 264.40 MiB/sec
228.88 MiB: 0.82sec, 278.06 MiB/sec
228.88 MiB: 0.89sec, 256.02 MiB/sec
send: tls://10.6.20.175:43659 recv: tls://10.6.31.53:46709 - performance over comms handler
228.88 MiB: 0.79sec, 290.32 MiB/sec
228.88 MiB: 0.68sec, 336.06 MiB/sec
228.88 MiB: 0.85sec, 268.17 MiB/sec
228.88 MiB: 0.93sec, 245.97 MiB/sec
228.88 MiB: 0.68sec, 334.61 MiB/sec
228.88 MiB: 0.74sec, 308.71 MiB/sec
228.88 MiB: 0.70sec, 328.58 MiB/sec
228.88 MiB: 0.75sec, 303.90 MiB/sec
228.88 MiB: 0.79sec, 288.88 MiB/sec
228.88 MiB: 0.75sec, 304.38 MiB/sec
228.88 MiB: 0.73sec, 315.54 MiB/sec
228.88 MiB: 0.75sec, 303.72 MiB/sec
228.88 MiB: 0.72sec, 319.23 MiB/sec
228.88 MiB: 1.12sec, 204.12 MiB/sec
228.88 MiB: 0.77sec, 298.89 MiB/sec
228.88 MiB: 0.74sec, 307.82 MiB/sec
228.88 MiB: 0.78sec, 292.28 MiB/sec
228.88 MiB: 0.72sec, 318.10 MiB/sec
228.88 MiB: 0.85sec, 268.82 MiB/sec
228.88 MiB: 0.82sec, 279.50 MiB/sec
228.88 MiB: 0.74sec, 310.35 MiB/sec
228.88 MiB: 0.78sec, 294.62 MiB/sec
228.88 MiB: 0.77sec, 295.47 MiB/sec
228.88 MiB: 0.70sec, 327.29 MiB/sec
228.88 MiB: 0.78sec, 294.05 MiB/sec
228.88 MiB: 0.68sec, 335.36 MiB/sec
A: tls://10.6.20.175:45103 B: tls://10.6.31.53:39869 - performance from iperf3
B memory -> A memory
Connecting to host 10.6.20.175, port 5001
[  5] local 10.6.31.53 port 58252 connected to 10.6.20.175 port 5001
[ ID] Interval           Transfer     Bitrate         Retr  Cwnd
[  5]   0.00-1.00   sec   597 MBytes   597 MBytes/sec    0   1.89 MBytes       
[  5]   1.00-2.00   sec   592 MBytes   592 MBytes/sec    0   2.44 MBytes       
[  5]   2.00-3.00   sec   589 MBytes   589 MBytes/sec    0   2.85 MBytes       
[  5]   3.00-4.00   sec   588 MBytes   587 MBytes/sec    0   3.00 MBytes       
[  5]   4.00-5.00   sec   590 MBytes   590 MBytes/sec    0   3.00 MBytes       
[  5]   5.00-6.00   sec   580 MBytes   580 MBytes/sec   46   2.26 MBytes       
[  5]   6.00-7.00   sec   594 MBytes   594 MBytes/sec    0   2.45 MBytes       
[  5]   7.00-8.00   sec   590 MBytes   590 MBytes/sec    2   1.97 MBytes       
[  5]   8.00-9.00   sec   591 MBytes   591 MBytes/sec    0   2.13 MBytes       
[  5]   9.00-10.00  sec   591 MBytes   591 MBytes/sec    0   2.24 MBytes       
[  5]  10.00-11.00  sec   588 MBytes   588 MBytes/sec    0   2.29 MBytes       
[  5]  11.00-12.00  sec   585 MBytes   585 MBytes/sec   15   1.66 MBytes       
[  5]  12.00-13.00  sec   588 MBytes   587 MBytes/sec    0   1.94 MBytes       
[  5]  13.00-14.00  sec   590 MBytes   590 MBytes/sec    0   2.13 MBytes       
[  5]  14.00-15.00  sec   592 MBytes   592 MBytes/sec    0   2.23 MBytes       
[  5]  15.00-16.00  sec   592 MBytes   593 MBytes/sec    0   2.27 MBytes       
[  5]  16.00-17.00  sec   592 MBytes   593 MBytes/sec    0   2.30 MBytes       
[  5]  17.00-18.00  sec   592 MBytes   592 MBytes/sec    8   1.78 MBytes       
[  5]  18.00-19.00  sec   592 MBytes   592 MBytes/sec    0   2.07 MBytes       
[  5]  19.00-20.00  sec   592 MBytes   593 MBytes/sec    0   2.17 MBytes       
[  5]  20.00-21.00  sec   590 MBytes   590 MBytes/sec    0   2.24 MBytes       
[  5]  21.00-22.00  sec   594 MBytes   594 MBytes/sec    0   2.30 MBytes       
[  5]  22.00-23.00  sec   586 MBytes   586 MBytes/sec    0   2.30 MBytes       
[  5]  23.00-24.00  sec   592 MBytes   593 MBytes/sec    0   2.30 MBytes       
[  5]  24.00-25.00  sec   594 MBytes   594 MBytes/sec    0   2.31 MBytes       
[  5]  25.00-26.00  sec   581 MBytes   581 MBytes/sec    0   2.36 MBytes       
[  5]  26.00-27.00  sec   592 MBytes   592 MBytes/sec    0   2.39 MBytes       
[  5]  27.00-28.00  sec   592 MBytes   593 MBytes/sec    0   2.62 MBytes       
[  5]  28.00-29.00  sec   592 MBytes   592 MBytes/sec    0   2.73 MBytes       
[  5]  29.00-30.00  sec   594 MBytes   594 MBytes/sec    0   2.73 MBytes       
- - - - - - - - - - - - - - - - - - - - - - - - -
[ ID] Interval           Transfer     Bitrate         Retr
[  5]   0.00-30.00  sec  17.3 GBytes   591 MBytes/sec   71             sender
[  5]   0.00-30.00  sec  17.3 GBytes   590 MBytes/sec                  receiver

iperf Done.

B memory -> A disk
Connecting to host 10.6.20.175, port 5001
[  5] local 10.6.31.53 port 58258 connected to 10.6.20.175 port 5001
[ ID] Interval           Transfer     Bitrate         Retr  Cwnd
[  5]   0.00-1.00   sec  48.8 MBytes  48.8 MBytes/sec    0    288 KBytes       
[  5]   1.00-2.00   sec  45.3 MBytes  45.3 MBytes/sec    0    297 KBytes       
[  5]   2.00-3.00   sec  46.0 MBytes  46.0 MBytes/sec    0    297 KBytes       
[  5]   3.00-4.00   sec  46.7 MBytes  46.7 MBytes/sec    0    315 KBytes       
[  5]   4.00-5.00   sec  46.6 MBytes  46.6 MBytes/sec    0    350 KBytes       
[  5]   5.00-6.00   sec  45.0 MBytes  45.0 MBytes/sec    0    350 KBytes       
[  5]   6.00-7.00   sec  44.5 MBytes  44.5 MBytes/sec    0    350 KBytes       
[  5]   7.00-8.00   sec  45.5 MBytes  45.5 MBytes/sec    0    350 KBytes       
[  5]   8.00-9.00   sec  45.2 MBytes  45.2 MBytes/sec    0    350 KBytes       
[  5]   9.00-10.00  sec  45.5 MBytes  45.5 MBytes/sec    0    350 KBytes       
[  5]  10.00-11.00  sec  45.5 MBytes  45.5 MBytes/sec    0    350 KBytes       
[  5]  11.00-12.00  sec  42.7 MBytes  42.7 MBytes/sec    0    350 KBytes       
[  5]  12.00-13.00  sec  45.0 MBytes  45.0 MBytes/sec    0    350 KBytes       
[  5]  13.00-14.00  sec  43.0 MBytes  43.0 MBytes/sec    0    350 KBytes       
[  5]  14.00-15.00  sec  41.5 MBytes  41.5 MBytes/sec    0    350 KBytes       
[  5]  15.00-16.00  sec  41.0 MBytes  41.0 MBytes/sec    0    350 KBytes       
[  5]  16.00-17.00  sec  43.6 MBytes  43.5 MBytes/sec    0    350 KBytes       
[  5]  17.00-18.00  sec  43.6 MBytes  43.6 MBytes/sec    0    350 KBytes       
[  5]  18.00-19.00  sec  45.6 MBytes  45.6 MBytes/sec    0    350 KBytes       
[  5]  19.00-20.00  sec  46.1 MBytes  46.1 MBytes/sec    0    350 KBytes       
[  5]  20.00-21.00  sec  45.8 MBytes  45.8 MBytes/sec    0    350 KBytes       
[  5]  21.00-22.00  sec  42.5 MBytes  42.5 MBytes/sec    0    350 KBytes       
[  5]  22.00-23.00  sec  43.3 MBytes  43.3 MBytes/sec    0    367 KBytes       
[  5]  23.00-24.00  sec  42.1 MBytes  42.1 MBytes/sec    0    367 KBytes       
[  5]  24.00-25.00  sec  43.1 MBytes  43.1 MBytes/sec    0    367 KBytes       
[  5]  25.00-26.00  sec  43.9 MBytes  43.9 MBytes/sec    0    385 KBytes       
[  5]  26.00-27.00  sec  43.1 MBytes  43.1 MBytes/sec    0    385 KBytes       
[  5]  27.00-28.00  sec  42.1 MBytes  42.1 MBytes/sec    0    385 KBytes       
[  5]  28.00-29.00  sec  42.1 MBytes  42.1 MBytes/sec    0    385 KBytes       
[  5]  29.00-30.00  sec  43.6 MBytes  43.6 MBytes/sec    0    385 KBytes       
- - - - - - - - - - - - - - - - - - - - - - - - -
[ ID] Interval           Transfer     Bitrate         Retr
[  5]   0.00-30.00  sec  1.30 GBytes  44.3 MBytes/sec    0             sender
[  5]   0.00-30.00  sec  1.29 GBytes  44.2 MBytes/sec                  receiver

iperf Done.

A disk -> B disk
Connecting to host 10.6.31.53, port 5001
[  5] local 10.6.20.175 port 59674 connected to 10.6.31.53 port 5001
[ ID] Interval           Transfer     Bitrate         Retr  Cwnd
[  5]   0.00-1.00   sec  9.75 MBytes  9.75 MBytes/sec   11    271 KBytes       
[  5]   1.00-2.00   sec  4.84 MBytes  4.84 MBytes/sec   11    271 KBytes       
[  5]   2.00-3.00   sec  5.79 MBytes  5.79 MBytes/sec   12    271 KBytes       
[  5]   3.00-4.00   sec  5.20 MBytes  5.20 MBytes/sec   11    271 KBytes       
[  5]   4.00-5.00   sec  5.02 MBytes  5.02 MBytes/sec    9    271 KBytes       
[  5]   5.00-6.00   sec  5.14 MBytes  5.14 MBytes/sec   11    271 KBytes       
[  5]   6.00-7.00   sec  4.48 MBytes  4.48 MBytes/sec    9    271 KBytes       
[  5]   7.00-8.00   sec  6.27 MBytes  6.27 MBytes/sec   13    271 KBytes       
[  5]   8.00-9.00   sec  7.29 MBytes  7.29 MBytes/sec   12    271 KBytes       
[  5]   9.00-10.00  sec  5.44 MBytes  5.44 MBytes/sec   12    271 KBytes       
[  5]  10.00-11.00  sec  5.91 MBytes  5.91 MBytes/sec   11    271 KBytes       
[  5]  11.00-12.00  sec  5.32 MBytes  5.32 MBytes/sec   11    271 KBytes       
[  5]  12.00-13.00  sec  5.14 MBytes  5.14 MBytes/sec   10    271 KBytes       
[  5]  13.00-14.00  sec  5.79 MBytes  5.79 MBytes/sec   12    271 KBytes       
[  5]  14.00-15.00  sec  5.26 MBytes  5.25 MBytes/sec   10    271 KBytes       
[  5]  15.00-16.00  sec  5.62 MBytes  5.62 MBytes/sec   13    271 KBytes       
[  5]  16.00-17.00  sec  5.50 MBytes  5.50 MBytes/sec   11    271 KBytes       
[  5]  17.00-18.00  sec  4.84 MBytes  4.84 MBytes/sec   10    271 KBytes       
[  5]  18.00-19.00  sec  5.14 MBytes  5.14 MBytes/sec   10    271 KBytes       
[  5]  19.00-20.00  sec  5.50 MBytes  5.50 MBytes/sec   10    271 KBytes       
[  5]  20.00-21.00  sec  4.66 MBytes  4.66 MBytes/sec   10    271 KBytes       
[  5]  21.00-22.00  sec  5.38 MBytes  5.37 MBytes/sec   10    271 KBytes       
[  5]  22.00-23.00  sec  5.38 MBytes  5.38 MBytes/sec   12    271 KBytes       
[  5]  23.00-24.00  sec  5.74 MBytes  5.74 MBytes/sec   11    271 KBytes       
[  5]  24.00-25.00  sec  4.00 MBytes  4.00 MBytes/sec    8    271 KBytes       
[  5]  25.00-26.00  sec  4.84 MBytes  4.84 MBytes/sec   10    271 KBytes       
[  5]  26.00-27.00  sec  5.02 MBytes  5.02 MBytes/sec   10    271 KBytes       
[  5]  27.00-28.00  sec  4.90 MBytes  4.90 MBytes/sec    9    271 KBytes       
[  5]  28.00-29.00  sec  5.74 MBytes  5.73 MBytes/sec   11    271 KBytes       
[  5]  29.00-30.00  sec  4.96 MBytes  4.96 MBytes/sec   11    271 KBytes       
- - - - - - - - - - - - - - - - - - - - - - - - -
[ ID] Interval           Transfer     Bitrate         Retr
[  5]   0.00-30.00  sec   164 MBytes  5.46 MBytes/sec  321             sender
        Sent  164 MByte / 1.29 GByte (12%) of iperf_out
[  5]   0.00-30.03  sec   162 MBytes  5.38 MBytes/sec                  receiver

iperf Done.

Locally on my mac:

  • task dependencies: ~1.7 GiB/sec
  • comms handler: ~2.4 GiB/sec
  • raw TCP: 7.5 GiB/sec

Full results
(env) gabe dask-playground/shuffle-service » python network.py
/Users/gabe/dev/dask-playground/env/lib/python3.9/site-packages/pandas/compat/__init__.py:124: UserWarning: Could not import the lzma module. Your installed Python is incomplete. Attempting to use lzma compression will result in a RuntimeError.
  warnings.warn(msg)
/Users/gabe/dev/dask-playground/env/lib/python3.9/site-packages/setuptools/distutils_patch.py:25: UserWarning: Distutils was imported before Setuptools. This usage is discouraged and may exhibit undesirable behaviors or errors. Please use Setuptools' objects directly or at least import Setuptools first.
  warnings.warn(
send: tcp://127.0.0.1:57650 recv: tcp://127.0.0.1:57652 - performance over task `get_data`
228.88 MiB: 0.22sec, 1.02 GiB/sec
228.88 MiB: 0.13sec, 1.74 GiB/sec
228.88 MiB: 0.15sec, 1.53 GiB/sec
228.88 MiB: 0.12sec, 1.89 GiB/sec
228.88 MiB: 0.12sec, 1.86 GiB/sec
228.88 MiB: 0.12sec, 1.81 GiB/sec
228.88 MiB: 0.12sec, 1.82 GiB/sec
228.88 MiB: 0.12sec, 1.83 GiB/sec
228.88 MiB: 0.15sec, 1.51 GiB/sec
228.88 MiB: 0.13sec, 1.76 GiB/sec
228.88 MiB: 0.12sec, 1.91 GiB/sec
228.88 MiB: 0.11sec, 1.98 GiB/sec
228.88 MiB: 0.12sec, 1.83 GiB/sec
228.88 MiB: 0.12sec, 1.87 GiB/sec
228.88 MiB: 0.14sec, 1.62 GiB/sec
send: tcp://127.0.0.1:57650 recv: tcp://127.0.0.1:57652 - performance over comms handler
228.88 MiB: 0.17sec, 1.32 GiB/sec
228.88 MiB: 0.10sec, 2.33 GiB/sec
228.88 MiB: 0.10sec, 2.35 GiB/sec
228.88 MiB: 0.10sec, 2.35 GiB/sec
228.88 MiB: 0.09sec, 2.36 GiB/sec
228.88 MiB: 0.09sec, 2.39 GiB/sec
228.88 MiB: 0.10sec, 2.30 GiB/sec
228.88 MiB: 0.10sec, 2.28 GiB/sec
228.88 MiB: 0.09sec, 2.37 GiB/sec
228.88 MiB: 0.09sec, 2.37 GiB/sec
228.88 MiB: 0.09sec, 2.35 GiB/sec
228.88 MiB: 0.10sec, 2.35 GiB/sec
228.88 MiB: 0.10sec, 2.31 GiB/sec
228.88 MiB: 0.09sec, 2.37 GiB/sec
228.88 MiB: 0.10sec, 2.28 GiB/sec
228.88 MiB: 0.09sec, 2.38 GiB/sec
228.88 MiB: 0.10sec, 2.35 GiB/sec
228.88 MiB: 0.10sec, 2.29 GiB/sec
228.88 MiB: 0.09sec, 2.37 GiB/sec
228.88 MiB: 0.10sec, 2.32 GiB/sec
228.88 MiB: 0.10sec, 2.13 GiB/sec
228.88 MiB: 0.10sec, 2.18 GiB/sec
228.88 MiB: 0.10sec, 2.32 GiB/sec
228.88 MiB: 0.09sec, 2.38 GiB/sec
228.88 MiB: 0.10sec, 2.25 GiB/sec
228.88 MiB: 0.09sec, 2.38 GiB/sec
228.88 MiB: 0.09sec, 2.37 GiB/sec
228.88 MiB: 0.10sec, 2.28 GiB/sec
228.88 MiB: 0.10sec, 2.34 GiB/sec
228.88 MiB: 0.09sec, 2.41 GiB/sec
228.88 MiB: 0.09sec, 2.36 GiB/sec
228.88 MiB: 0.09sec, 2.36 GiB/sec
228.88 MiB: 0.10sec, 2.31 GiB/sec
228.88 MiB: 0.10sec, 2.33 GiB/sec
228.88 MiB: 0.09sec, 2.39 GiB/sec
228.88 MiB: 0.09sec, 2.40 GiB/sec
228.88 MiB: 0.10sec, 2.35 GiB/sec
228.88 MiB: 0.10sec, 2.27 GiB/sec
228.88 MiB: 0.11sec, 2.11 GiB/sec
228.88 MiB: 0.11sec, 2.01 GiB/sec
228.88 MiB: 0.11sec, 2.09 GiB/sec
228.88 MiB: 0.11sec, 2.04 GiB/sec
228.88 MiB: 0.10sec, 2.24 GiB/sec
228.88 MiB: 0.10sec, 2.28 GiB/sec
228.88 MiB: 0.10sec, 2.34 GiB/sec
228.88 MiB: 0.10sec, 2.34 GiB/sec
228.88 MiB: 0.10sec, 2.27 GiB/sec
228.88 MiB: 0.10sec, 2.16 GiB/sec
228.88 MiB: 0.10sec, 2.19 GiB/sec
228.88 MiB: 0.10sec, 2.24 GiB/sec
228.88 MiB: 0.10sec, 2.22 GiB/sec
228.88 MiB: 0.10sec, 2.22 GiB/sec
228.88 MiB: 0.10sec, 2.19 GiB/sec
228.88 MiB: 0.10sec, 2.24 GiB/sec
228.88 MiB: 0.09sec, 2.36 GiB/sec
228.88 MiB: 0.09sec, 2.35 GiB/sec
228.88 MiB: 0.10sec, 2.25 GiB/sec
228.88 MiB: 0.10sec, 2.16 GiB/sec
228.88 MiB: 0.12sec, 1.91 GiB/sec
228.88 MiB: 0.10sec, 2.26 GiB/sec
228.88 MiB: 0.10sec, 2.27 GiB/sec
228.88 MiB: 0.11sec, 2.02 GiB/sec
228.88 MiB: 0.15sec, 1.48 GiB/sec
228.88 MiB: 0.10sec, 2.25 GiB/sec
228.88 MiB: 0.10sec, 2.16 GiB/sec
228.88 MiB: 0.10sec, 2.23 GiB/sec
228.88 MiB: 0.10sec, 2.15 GiB/sec
228.88 MiB: 0.11sec, 1.98 GiB/sec
228.88 MiB: 0.10sec, 2.20 GiB/sec
228.88 MiB: 0.10sec, 2.15 GiB/sec
228.88 MiB: 0.10sec, 2.25 GiB/sec
228.88 MiB: 0.10sec, 2.26 GiB/sec
228.88 MiB: 0.10sec, 2.28 GiB/sec
228.88 MiB: 0.09sec, 2.37 GiB/sec
228.88 MiB: 0.09sec, 2.37 GiB/sec
228.88 MiB: 0.10sec, 2.29 GiB/sec
228.88 MiB: 0.11sec, 2.00 GiB/sec
228.88 MiB: 0.10sec, 2.29 GiB/sec
228.88 MiB: 0.10sec, 2.19 GiB/sec
228.88 MiB: 0.11sec, 2.09 GiB/sec
228.88 MiB: 0.10sec, 2.23 GiB/sec
228.88 MiB: 0.13sec, 1.72 GiB/sec
228.88 MiB: 0.11sec, 2.10 GiB/sec
228.88 MiB: 0.09sec, 2.37 GiB/sec
228.88 MiB: 0.09sec, 2.36 GiB/sec
228.88 MiB: 0.09sec, 2.36 GiB/sec
228.88 MiB: 0.10sec, 2.30 GiB/sec
228.88 MiB: 0.10sec, 2.13 GiB/sec
228.88 MiB: 0.10sec, 2.35 GiB/sec
228.88 MiB: 0.09sec, 2.36 GiB/sec
228.88 MiB: 0.09sec, 2.36 GiB/sec
228.88 MiB: 0.10sec, 2.16 GiB/sec
228.88 MiB: 0.12sec, 1.81 GiB/sec
228.88 MiB: 0.10sec, 2.32 GiB/sec
228.88 MiB: 0.09sec, 2.38 GiB/sec
228.88 MiB: 0.09sec, 2.38 GiB/sec
228.88 MiB: 0.10sec, 2.14 GiB/sec
228.88 MiB: 0.10sec, 2.21 GiB/sec
228.88 MiB: 0.10sec, 2.35 GiB/sec
228.88 MiB: 0.10sec, 2.32 GiB/sec
228.88 MiB: 0.10sec, 2.31 GiB/sec
228.88 MiB: 0.10sec, 2.21 GiB/sec
228.88 MiB: 0.12sec, 1.81 GiB/sec
228.88 MiB: 0.10sec, 2.32 GiB/sec
228.88 MiB: 0.09sec, 2.37 GiB/sec
228.88 MiB: 0.10sec, 2.32 GiB/sec
228.88 MiB: 0.10sec, 2.32 GiB/sec
228.88 MiB: 0.10sec, 2.23 GiB/sec
228.88 MiB: 0.09sec, 2.36 GiB/sec
228.88 MiB: 0.10sec, 2.34 GiB/sec
228.88 MiB: 0.10sec, 2.32 GiB/sec
228.88 MiB: 0.10sec, 2.32 GiB/sec
228.88 MiB: 0.10sec, 2.29 GiB/sec
228.88 MiB: 0.12sec, 1.92 GiB/sec
228.88 MiB: 0.10sec, 2.30 GiB/sec
228.88 MiB: 0.10sec, 2.35 GiB/sec
228.88 MiB: 0.10sec, 2.31 GiB/sec
228.88 MiB: 0.10sec, 2.30 GiB/sec
228.88 MiB: 0.10sec, 2.34 GiB/sec
228.88 MiB: 0.10sec, 2.35 GiB/sec
228.88 MiB: 0.09sec, 2.39 GiB/sec
228.88 MiB: 0.09sec, 2.36 GiB/sec
228.88 MiB: 0.10sec, 2.22 GiB/sec
228.88 MiB: 0.13sec, 1.77 GiB/sec
228.88 MiB: 0.10sec, 2.26 GiB/sec
228.88 MiB: 0.10sec, 2.34 GiB/sec
228.88 MiB: 0.10sec, 2.30 GiB/sec
228.88 MiB: 0.10sec, 2.23 GiB/sec
228.88 MiB: 0.10sec, 2.35 GiB/sec
228.88 MiB: 0.10sec, 2.34 GiB/sec
228.88 MiB: 0.10sec, 2.35 GiB/sec
228.88 MiB: 0.10sec, 2.35 GiB/sec
228.88 MiB: 0.10sec, 2.31 GiB/sec
228.88 MiB: 0.09sec, 2.40 GiB/sec
228.88 MiB: 0.12sec, 1.92 GiB/sec
228.88 MiB: 0.09sec, 2.36 GiB/sec
228.88 MiB: 0.09sec, 2.37 GiB/sec
228.88 MiB: 0.10sec, 2.31 GiB/sec
228.88 MiB: 0.09sec, 2.36 GiB/sec
228.88 MiB: 0.09sec, 2.39 GiB/sec
228.88 MiB: 0.09sec, 2.38 GiB/sec
228.88 MiB: 0.10sec, 2.35 GiB/sec
228.88 MiB: 0.10sec, 2.28 GiB/sec
228.88 MiB: 0.10sec, 2.30 GiB/sec
228.88 MiB: 0.12sec, 1.87 GiB/sec
228.88 MiB: 0.10sec, 2.32 GiB/sec
228.88 MiB: 0.10sec, 2.35 GiB/sec
228.88 MiB: 0.10sec, 2.26 GiB/sec
228.88 MiB: 0.09sec, 2.36 GiB/sec
228.88 MiB: 0.09sec, 2.40 GiB/sec
228.88 MiB: 0.09sec, 2.37 GiB/sec
228.88 MiB: 0.09sec, 2.36 GiB/sec
228.88 MiB: 0.10sec, 2.32 GiB/sec
228.88 MiB: 0.10sec, 2.34 GiB/sec
228.88 MiB: 0.09sec, 2.36 GiB/sec
228.88 MiB: 0.09sec, 2.37 GiB/sec
228.88 MiB: 0.09sec, 2.36 GiB/sec
228.88 MiB: 0.10sec, 2.30 GiB/sec
228.88 MiB: 0.10sec, 2.29 GiB/sec
228.88 MiB: 0.10sec, 2.34 GiB/sec
228.88 MiB: 0.09sec, 2.39 GiB/sec
228.88 MiB: 0.09sec, 2.38 GiB/sec
228.88 MiB: 0.10sec, 2.35 GiB/sec
228.88 MiB: 0.10sec, 2.29 GiB/sec
228.88 MiB: 0.09sec, 2.36 GiB/sec
A: tcp://127.0.0.1:57650 B: tcp://127.0.0.1:57652 - performance from iperf3
B memory -> A memory
Connecting to host 127.0.0.1, port 5001
[  5] local 127.0.0.1 port 57689 connected to 127.0.0.1 port 5001
[ ID] Interval           Transfer     Bitrate
[  5]   0.00-1.00   sec  7.39 GBytes  7566 MBytes/sec                  
[  5]   1.00-2.00   sec  6.35 GBytes  6507 MBytes/sec                  
[  5]   2.00-3.00   sec  7.18 GBytes  7355 MBytes/sec                  
[  5]   3.00-4.00   sec  7.30 GBytes  7480 MBytes/sec                  
[  5]   4.00-5.00   sec  6.87 GBytes  7039 MBytes/sec                  
[  5]   5.00-6.00   sec  7.92 GBytes  8112 MBytes/sec                  
[  5]   6.00-7.00   sec  7.86 GBytes  8054 MBytes/sec                  
[  5]   7.00-8.00   sec  7.88 GBytes  8065 MBytes/sec                  
[  5]   8.00-9.00   sec  7.61 GBytes  7795 MBytes/sec                  
[  5]   9.00-10.00  sec  6.83 GBytes  6996 MBytes/sec                  
[  5]  10.00-11.00  sec  7.15 GBytes  7324 MBytes/sec                  
[  5]  11.00-12.00  sec  7.79 GBytes  7974 MBytes/sec                  
[  5]  12.00-13.00  sec  7.80 GBytes  7989 MBytes/sec                  
[  5]  13.00-14.00  sec  7.84 GBytes  8026 MBytes/sec                  
[  5]  14.00-15.00  sec  7.86 GBytes  8044 MBytes/sec                  
[  5]  15.00-16.00  sec  7.79 GBytes  7979 MBytes/sec                  
[  5]  16.00-17.00  sec  7.92 GBytes  8110 MBytes/sec                  
[  5]  17.00-18.00  sec  6.76 GBytes  6921 MBytes/sec                  
[  5]  18.00-19.00  sec  7.45 GBytes  7627 MBytes/sec                  
[  5]  19.00-20.00  sec  7.38 GBytes  7558 MBytes/sec                  
[  5]  20.00-21.00  sec  7.52 GBytes  7699 MBytes/sec                  
[  5]  21.00-22.00  sec  7.22 GBytes  7397 MBytes/sec                  
[  5]  22.00-23.00  sec  7.26 GBytes  7432 MBytes/sec                  
[  5]  23.00-24.00  sec  7.00 GBytes  7170 MBytes/sec                  
[  5]  24.00-25.00  sec  7.19 GBytes  7362 MBytes/sec                  
[  5]  25.00-26.00  sec  6.80 GBytes  6961 MBytes/sec                  
[  5]  26.00-27.00  sec  6.91 GBytes  7072 MBytes/sec                  
[  5]  27.00-28.00  sec  7.22 GBytes  7390 MBytes/sec                  
[  5]  28.00-29.00  sec  7.41 GBytes  7591 MBytes/sec                  
[  5]  29.00-30.00  sec  7.31 GBytes  7484 MBytes/sec                  
- - - - - - - - - - - - - - - - - - - - - - - - -
[ ID] Interval           Transfer     Bitrate
[  5]   0.00-30.00  sec   221 GBytes  7536 MBytes/sec                  sender
[  5]   0.00-30.00  sec   221 GBytes  7536 MBytes/sec                  receiver

iperf Done.

Dask lags further behind a very fast network, but it's still much faster than a slow network! Does it need to be faster? How often are we actually bandwidth-constrained (versus being constrained by event loop/GIL/worker threads/things that can do something useful with that bandwidth)?

@mrocklin
Member

Thanks for doing this @gjoseph92. This is interesting.

task dependencies: 180-280MiB/sec
comms handler: 290-330MiB/sec
iperf3 raw TCP: 590 MBytes/sec

So Dask's networking is only half as fast as raw TCP here. That's actually better than I expected.
Using comms handlers directly is faster, though not hugely. Also not surprising.

I actually am surprised by both of these.

For reference, my historical expectation is that on a full (non-virtual) machine I would expect the following:

  • 10GB/s memory bandwidth
  • similar magnitude but less for pandas serialization, maybe 2x less if using protocol <5?
  • around 800 MB/s for Tornado (software networking stacks have junk in them)

So adding bandwidths as one does in an inverse summation kind of way, I would expect us to get something like 700 MB/s through Dask comms. This is obviously hardware dependent though, and certainly VMs are not real "M"s.
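
Spelling out that inverse summation with the illustrative numbers above (a back-of-the-envelope check, not a measurement):

# Sequential stages add their time-per-byte, so bandwidths combine harmonically.
stage_bandwidths_mb_s = [10_000, 5_000, 800]  # memory copy, serialization (~2x less), Tornado
combined = 1 / sum(1 / b for b in stage_bandwidths_mb_s)
print(f"{combined:.0f} MB/s")  # ~645 MB/s, in the same ballpark as the ~700 MB/s estimate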

cc @quasiben @jakirkham

@mrocklin
Member

@gjoseph92 for the purposes of tuning shuffle computations, I think that we should build up some intuition around the size of dataframes when using comms. I would be curious to know the expected bandwidth as we send dataframes of varying sizes. Do we stay at the 300MB/s level if we shift from 100MB dataframes to 20MB? to 5MB? to 1MB? to 100kB? Understanding that response curve is probably useful for us.
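
A minimal sketch of such a sweep (a hypothetical bandwidth_sweep helper, not part of the script above), reusing the task-dependency approach since it's the easiest to drive from a client; the earlier caveat about scheduler overhead in these timings still applies:

import time

import numpy as np
import pandas as pd

import distributed
from dask.utils import format_bytes


def bandwidth_sweep(client: distributed.Client, nrows=(1_000, 10_000, 100_000), ncols=125):
    "Hypothetical helper: time worker-to-worker transfers for a few DataFrame sizes."
    a, b, *_ = client.scheduler_info()["workers"]
    for n in nrows:
        # Build the frame on worker `a` so the dependent task on `b` has to fetch it.
        made = client.submit(
            lambda n=n, ncols=ncols: pd.DataFrame(np.random.random((n, ncols))),
            workers=[a],
            pure=False,
        )
        size = client.submit(lambda df: df.memory_usage().sum(), made, workers=[a]).result()
        start = time.perf_counter()
        distributed.wait(client.submit(lambda df: None, made, workers=[b], pure=False))
        elapsed = time.perf_counter() - start
        print(f"{format_bytes(size)}: {elapsed:.2f}sec, {format_bytes(size / elapsed)}/sec")

Calling bandwidth_sweep(client) with the same two-worker client as in the script above should give a rough response curve; measuring with the comms-handler approach instead would isolate the comm itself from scheduling overhead.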

@mrocklin
Member

Dask lags further behind a very fast network, but it's still much faster than a slow network! Does it need to be faster? How often are we actually bandwidth-constrained (versus being constrained by event loop/GIL/worker threads/things that can do something useful with that bandwidth)?

Yeah, I'll admit that a 2 GB/s communication bandwidth feels fast to me for data processing workloads. I'm fine leaving this one alone for a while :)

@gjoseph92
Collaborator Author

For the curious, here are py-spy profiles of the sending and receiving workers running locally (go to MainThread on both, left-heavy view):

On the receive side, 30% is actual socket reading in Tornado, 25% is making the read buffer in TCP comms, 25% idle. On the send side, 30% is Tornado socket writing, 42% is idle.

@jakirkham
Member

Yeah, regarding the buffer allocation time: it is worth noting that built-in Python creation operations like bytearray (though not limited to it) will zero-initialize the memory. This takes additional time, which can make it quite slow, and since those zeros end up being overwritten it is also quite wasteful.

We could avoid this by using numpy.empty, which will not zero-initialize the memory. We will eventually pay a small cost when first writing to those pages, but I expect this to be smaller than Tornado's overhead, and it bypasses the cost of zero-initializing the buffer up front.

In [1]: import numpy

In [2]: %timeit bytearray(1_000_000)
18.3 µs ± 125 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

In [3]: %timeit numpy.empty((1_000_000,), dtype="u1")
630 ns ± 7.44 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

I've been thinking we would benefit here from collecting the routines for memory allocation somewhere and then using the most efficient ones when available ( #3970 ).
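
For illustration, here is a hedged sketch of what an uninitialized NumPy receive buffer could look like in a read path (read_exactly is a hypothetical helper, not distributed's actual comm code):

import socket

import numpy as np


def read_exactly(sock: socket.socket, nbytes: int) -> memoryview:
    "Read exactly nbytes from a blocking socket into an uninitialized NumPy buffer."
    # numpy.empty skips the zero-fill that bytearray(nbytes) performs; the pages
    # only get touched as recv_into writes into them.
    buf = np.empty(nbytes, dtype="u1")
    view = memoryview(buf)
    received = 0
    while received < nbytes:
        n = sock.recv_into(view[received:])
        if n == 0:
            raise EOFError("connection closed before the full message arrived")
        received += n
    return view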

@gjoseph92
Collaborator Author

Good to know. Not calloc-ing here definitely seems desirable.

I also just realized another discrepancy: iperf is using plain TCP, whereas my Coiled example was using TLS.

@jakirkham
Member

It goes a bit beyond not using calloc. NumPy will use Huge Pages on Linux if available at runtime ( numpy/numpy#14216 ). Also NumPy has its own caching allocator. In the future NumPy will allow the underlying allocator to be configurable ( numpy/numpy#17582 ). So I expect there will be other gains from using NumPy for memory allocations here.

@gjoseph92
Collaborator Author

Running our shuffles, I'm seeing something that looks to me like SSL reads within Tornado could be blocking the event loop. But I haven't dug into Tornado much before so I might be understanding this incorrectly.

Here's a py-spy profile of one worker:
https://www.speedscope.app/#profileURL=https%3a%2f%2fgistcdn.githack.com%2fgjoseph92%2fa32ffb1d9cf6248ed4fb56897d2b9208%2fraw%2f51e5bad24d3125ef7a861a33a15c91e12392c02e%2ftls-10_3_17_192-38675.json
(go to MainThread, left-heavy view)

The worker should constantly be both reading and writing about the same amount of data. But in left-heavy view, you see ssl.read takes up 58% of time, but ssl.send takes 11%.

In tracing through the code from Tornado BaseIOStream.read_into, I noticed this comment in _read_to_buffer_loop:

while not self.closed():
    # Read from the socket until we get EWOULDBLOCK or equivalent.
    # SSL sockets do some internal buffering, and if the data is
    # sitting in the SSL object's buffer select() and friends
    # can't see it; the only way to find out if it's there is to
    # try to read it.
    if self._read_to_buffer() == 0:
        break

To me that sounds like the only way to know reading from SSL is done is to try reading and see what happens. But what happens when there isn't data sitting in the SSL object's buffer?

It seems like that will eventually call ssl.read() with however many bytes of self._read_buffer are remaining.

Is it possible that if the SSL buffer was empty, the ssl.read() could block on the socket until more data arrived?

Again I'm not very Tornado- or SSL-literate and this was a pretty cursory read-through, so this might be off-base.

@jakirkham
Member

FWICT this is the underlying read used by Python's SSLSocket. Maybe it will be clearer what is going on after looking at that.

@gjoseph92
Collaborator Author

I took a py-spy profile of the same workload as above running on an equivalent cluster, but using TCP instead of TLS.

Anecdotally I noticed tasks just completed a lot faster, even though the dashboard reported similar average worker bandwidths. In the profile, the worker event loop only gets 15% idle time with TLS, vs 40% idle time with TCP. With TLS, ~60% of the event loop is spent on Tornado's read_from_fd; with TCP it's ~20%.

@gjoseph92
Collaborator Author

I ran a version of my script above on identical clusters using TCP vs TLS. This particular trial showed TCP ~1.5x faster; on others it's been up to 1.9x faster.

[charts: TLS vs TCP comms-handler bandwidth across message sizes; full numbers below]

Full data
(env) gabe dask-playground/shuffle-service » python network.py
Using existing cluster: 'tls'
AWS instance types:
{'tls://10.6.25.185:37333': 't3a.small', 'tls://10.6.28.157:44541': 't3a.small'}
send: tls://10.6.25.185:42549 recv: tls://10.6.28.157:37465 - performance over comms handler
124 B: p5 0.00sec, 49.08 kiB/sec
124 B: p50 0.00sec, 87.26 kiB/sec
124 B: p95 0.00sec, 106.21 kiB/sec

send: tls://10.6.25.185:45677 recv: tls://10.6.28.157:37907 - performance over comms handler
7.94 kiB: p5 0.00sec, 3.05 MiB/sec
7.94 kiB: p50 0.00sec, 5.43 MiB/sec
7.94 kiB: p95 0.00sec, 6.56 MiB/sec

send: tls://10.6.25.185:46735 recv: tls://10.6.28.157:45483 - performance over comms handler
93.88 kiB: p5 0.00sec, 31.51 MiB/sec
93.88 kiB: p50 0.00sec, 54.78 MiB/sec
93.88 kiB: p95 0.00sec, 66.58 MiB/sec

send: tls://10.6.25.185:42835 recv: tls://10.6.28.157:41033 - performance over comms handler
1.00 MiB: p5 0.01sec, 162.17 MiB/sec
1.00 MiB: p50 0.00sec, 241.55 MiB/sec
1.00 MiB: p95 0.00sec, 278.84 MiB/sec

send: tls://10.6.25.185:36965 recv: tls://10.6.28.157:44421 - performance over comms handler
3.00 MiB: p5 0.01sec, 236.60 MiB/sec
3.00 MiB: p50 0.01sec, 332.83 MiB/sec
3.00 MiB: p95 0.01sec, 379.63 MiB/sec

send: tls://10.6.25.185:35735 recv: tls://10.6.28.157:34113 - performance over comms handler
9.99 MiB: p5 0.03sec, 298.00 MiB/sec
9.99 MiB: p50 0.02sec, 407.80 MiB/sec
9.99 MiB: p95 0.02sec, 458.55 MiB/sec

send: tls://10.6.25.185:44853 recv: tls://10.6.28.157:36471 - performance over comms handler
100.00 MiB: p5 0.30sec, 331.10 MiB/sec
100.00 MiB: p50 0.28sec, 356.10 MiB/sec
100.00 MiB: p95 0.25sec, 394.01 MiB/sec

send: tls://10.6.25.185:44913 recv: tls://10.6.28.157:36359 - performance over comms handler
500.00 MiB: p5 1.47sec, 339.68 MiB/sec
500.00 MiB: p50 1.36sec, 366.40 MiB/sec
500.00 MiB: p95 1.29sec, 387.31 MiB/sec

(env) gabe dask-playground/shuffle-service » python network.py
Using existing cluster: 'tcp'
AWS instance types:
{'tcp://10.6.23.136:43093': 't3a.small', 'tcp://10.6.29.243:46371': 't3a.small'}
send: tcp://10.6.23.136:42379 recv: tcp://10.6.29.243:37361 - performance over comms handler
124 B: p5 0.00sec, 34.54 kiB/sec
124 B: p50 0.00sec, 69.10 kiB/sec
124 B: p95 0.00sec, 93.15 kiB/sec

send: tcp://10.6.23.136:42497 recv: tcp://10.6.29.243:36421 - performance over comms handler
7.94 kiB: p5 0.00sec, 2.42 MiB/sec
7.94 kiB: p50 0.00sec, 4.58 MiB/sec
7.94 kiB: p95 0.00sec, 5.94 MiB/sec

send: tcp://10.6.23.136:44721 recv: tcp://10.6.29.243:37403 - performance over comms handler
93.88 kiB: p5 0.00sec, 22.87 MiB/sec
93.88 kiB: p50 0.00sec, 45.35 MiB/sec
93.88 kiB: p95 0.00sec, 60.30 MiB/sec

send: tcp://10.6.23.136:42129 recv: tcp://10.6.29.243:35227 - performance over comms handler
1.00 MiB: p5 0.01sec, 155.52 MiB/sec
1.00 MiB: p50 0.00sec, 290.19 MiB/sec
1.00 MiB: p95 0.00sec, 392.86 MiB/sec

send: tcp://10.6.23.136:41833 recv: tcp://10.6.29.243:40797 - performance over comms handler
3.00 MiB: p5 0.01sec, 329.73 MiB/sec
3.00 MiB: p50 0.01sec, 466.97 MiB/sec
3.00 MiB: p95 0.01sec, 517.41 MiB/sec

send: tcp://10.6.23.136:43359 recv: tcp://10.6.29.243:33185 - performance over comms handler
9.99 MiB: p5 0.02sec, 461.39 MiB/sec
9.99 MiB: p50 0.02sec, 556.19 MiB/sec
9.99 MiB: p95 0.02sec, 568.35 MiB/sec

send: tcp://10.6.23.136:38937 recv: tcp://10.6.29.243:37997 - performance over comms handler
100.00 MiB: p5 0.24sec, 416.11 MiB/sec
100.00 MiB: p50 0.21sec, 467.91 MiB/sec
100.00 MiB: p95 0.21sec, 478.96 MiB/sec

send: tcp://10.6.23.136:45309 recv: tcp://10.6.29.243:46773 - performance over comms handler
500.00 MiB: p5 1.27sec, 395.25 MiB/sec
500.00 MiB: p50 1.09sec, 459.01 MiB/sec
500.00 MiB: p95 1.07sec, 465.86 MiB/sec

@jakirkham
Member

Since we are comparing low-level sends, maybe it is worth running some benchmarks with ery as well.

@gjoseph92
Collaborator Author

@quasiben I know you and others have looked at cloud network performance a bit already. Is there anyone who might be interested in looking at these performance issues so we don't repeat work?

@jakirkham from reading the SSLSocket read implementation and OpenSSL docs, I'm now not so sure of my theory. Assuming the socket is non-blocking (it is, right?), the SSL_read should return and set an SSL_ERROR_WANT_READ error, according to the docs. _ssl__SSLSocket_read_impl handles that, stops trying to read, and raises a Python exception with that code. Tornado's SSLIOStream catches that SSL_ERROR_WANT_READ error specifically, which ends the read_to_buffer_loop.
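
As a reference for that behavior, here is a stripped-down sketch of a non-blocking SSL read loop using the stdlib ssl module (illustrative only, not Tornado's actual implementation):

import ssl


def drain_ssl_reads(ssl_sock: ssl.SSLSocket, chunk_size: int = 1 << 16) -> bytes:
    "Read whatever is available from an SSL socket wrapped around a non-blocking socket."
    chunks = []
    while True:
        try:
            data = ssl_sock.recv(chunk_size)
        except ssl.SSLWantReadError:
            # The SSL object's internal buffer is drained and the underlying socket
            # would block: stop, as Tornado's _read_to_buffer_loop does on this error.
            break
        except ssl.SSLWantWriteError:
            # Renegotiation can require a write before the read can continue.
            break
        if not data:
            break  # peer closed the connection
        chunks.append(data)
    return b"".join(chunks)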

A different interesting thing from the OpenSSL docs though:

As at any time it's possible that non-application data needs to be sent, a read function can also cause write operations.

I don't know OpenSSL very well. Would an SSL write on the same socket through the same SSL object count as non-application data? Since this shuffle workload has each worker communicating with all others, there's a lot of concurrent sending and receiving. If reads could somehow be blocking on the writes, that might explain why we see so much event loop blocking in reads.

Maybe a good next step would be to have this test script do all-to-all transfers, instead of A->B. Also, I don't think I was py-spying in native mode; I'll try that and see if we can get traces for what's happening inside of read.

@gjoseph92
Collaborator Author

gjoseph92 commented Sep 8, 2021

Got a native profile from py-spy. First I just have to say: @benfred, so many thanks for this great tool. Being able to get native profiles integrated with the Python call stack with a single flag, and no mucking around in perf, is amazing!

Worker profile (see MainThread in left-heavy view):

[screenshot: py-spy worker profile, MainThread, left-heavy view]

So we can immediately see that 71% of the event loop's time is spent blocking on glibc pthread_cond_timedwait, invoked within CPython's SSL socket read implementation. This does not seem like what we want for non-blocking reads!

One odd thing is that py-spy's line numbers in _ssl.c seem to be off by one from what I'm seeing in the CPython 3.9.1 source on GitHub (@benfred you might be interested in this; I'm running a conda build of Python on this cluster, FYI). If you look at the SSL_read call, py-spy says it's called on _ssl.c:2588, which is one line below the SSL_read on GitHub. If you look at the part we're really interested in (the condition wait), py-spy says _ssl.c:2590. That line is just a struct assignment, but the line just above it is PySSL_END_ALLOW_THREADS. We can see that macro calls PyEval_RestoreThread, which calls take_gil.

So the problem is the classic https://bugs.python.org/issue7946: contention re-acquiring the GIL when returning from a socket.send in a multithreaded application. This has already been discussed in #4443 (comment).

But that thread was about the scheduler. It's interesting to see how much this affects workers' performance when transferring data. We have much more control to optimize the scheduler; for worker networking code, since workers are running user code, we should just assume there could always be high GIL contention.
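
To make the effect concrete, here is a small standalone experiment (not from this issue) in the spirit of bpo-7946: time a batch of socket sends with and without a pure-Python CPU-bound thread competing for the GIL.

import socket
import threading
import time


def cpu_work(stop: threading.Event):
    # Pure-Python loop: holds the GIL except at interpreter switch points.
    x = 0
    while not stop.is_set():
        for _ in range(10_000):
            x += 1


def drain(sock: socket.socket):
    # Keep reading so the sender's buffer never fills up.
    try:
        while sock.recv(1 << 20):
            pass
    except OSError:
        pass


def timed_sends(sock: socket.socket, payload: bytes, n: int = 1_000) -> float:
    start = time.perf_counter()
    for _ in range(n):
        sock.sendall(payload)  # each send releases and then re-acquires the GIL
    return time.perf_counter() - start


if __name__ == "__main__":
    a, b = socket.socketpair()
    threading.Thread(target=drain, args=(b,), daemon=True).start()
    payload = b"x" * 16_384

    print("idle:           ", timed_sends(a, payload))

    stop = threading.Event()
    threading.Thread(target=cpu_work, args=(stop,), daemon=True).start()
    # Usually much slower: each GIL re-acquisition can wait up to the interpreter
    # switch interval while the CPU-bound thread holds the GIL (the convoy effect).
    print("with CPU thread:", timed_sends(a, payload))
    stop.set()
    a.close()
    b.close()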

@jakirkham
Member

jakirkham commented Sep 8, 2021

Yeah, I was looking at the GIL release/acquisition lines earlier and was wondering if that was involved somehow.

That said, how much data are we receiving here? Currently we are receiving at most 2GB per read (due to an OpenSSL 1.0.2 bug; see https://bugs.python.org/issue42853 ). Are we filling more than 2GB, and if so, how much more?

@gjoseph92
Collaborator Author

Yeah I'm curious if releasing the GIL could actually be skipped for non-blocking sockets.

I think we're definitely getting less than 2GB per read but I can confirm later. Our sends aim to be ~5 MB for memory reasons.

@mrocklin
Member

mrocklin commented Sep 8, 2021

Yeah I'm curious if releasing the GIL could actually be skipped for non-blocking sockets.

How hard would this be to try? Is this a small change in cPython followed by a rebuild?

@gjoseph92
Collaborator Author

I think the issue is just that network IO is much more serial than we want it to be.

You can see in the same py-spy profile in Dask-DefaultThreads-62-4 and -2 that these threads are blocked 80% of the time in our maybe_pack function, acquiring a semaphore. maybe_pack is a function that processes some DataFrames, serializes them, and puts them on a queue for a pool of coroutines to send over the comm. When a coroutine has sent an item, it releases the semaphore. (So this is basically just a bounded queue, just with the blocking points shifted a little for memory reasons.)

This tells me that we can't write data out fast enough to the network. But looking at the event loop, we saw writes only take 7% of the time. The issue is that the sending coroutines don't get scheduled on the event loop often enough, because the read coroutines spend so much time blocking the event loop trying to re-acquire the GIL.
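
For context, the backpressure pattern described above looks roughly like this (simplified to pure asyncio; the names are hypothetical, not the actual shuffle code):

import asyncio


async def bounded_send_pool(frames, send, max_in_flight: int = 4, n_senders: int = 2):
    "Stage at most max_in_flight frames at once; each completed send frees a slot."
    slots = asyncio.Semaphore(max_in_flight)
    queue: asyncio.Queue = asyncio.Queue()

    async def sender():
        while True:
            frame = await queue.get()
            if frame is None:
                return
            await send(frame)  # e.g. an RPC / comm write to another worker
            slots.release()    # free a slot for the producer

    senders = [asyncio.create_task(sender()) for _ in range(n_senders)]
    for frame in frames:
        await slots.acquire()  # blocks, like maybe_pack, when the senders fall behind
        queue.put_nowait(frame)
    for _ in senders:
        queue.put_nowait(None)
    await asyncio.gather(*senders)

If the sender coroutines rarely get scheduled because reads hog the event loop, slots.acquire() is exactly where the producer stalls, which matches the 80%-blocked maybe_pack threads in the profile.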

@mrocklin
Member

mrocklin commented Sep 8, 2021 via email

@gjoseph92
Collaborator Author

How hard would this be to try? Is this a small change in cPython followed by a rebuild?

I think so? Alternatively, if we could get _ssl_locks_count (set here) to be 0, then it wouldn't release the GIL. I'm not sure of the consequences of letting SSL use threads without running PyEval_SaveThread; that's probably bad. Maybe we could make versions of PyEval_SaveThread/RestoreThread that hold onto the GIL, though.

Do we know why the GIL would be hard to reacquire here?

Probably just because our worker threads are mostly doing GIL-holding things when they're not blocked?

@jakirkham
Member

A much simpler test would be to turn SSL off. Have we already done this somewhere (guessing so)?

@gjoseph92
Collaborator Author

gjoseph92 commented Sep 8, 2021

See the profile in #5258 (comment).

I spent far too much time trying to get an equivalent profile with --native on TCP last night, but the workload kept running out of memory and crashing (for somewhat unrelated reasons #5250 (comment)).

Also TCP sockets still release and retake the GIL on every send, so is that the test we'd want?

@jakirkham
Member

jakirkham commented Sep 8, 2021

Just to add to what Gabe said above on this.

Do we know why the GIL would be hard to reacquire here?

I think this is the same answer that Antoine gave related to socket.send ( #4443 (comment) ), as the problem is the same. Quoting the relevant bit below:

acquire the GIL (slow if need to wait for some other thread to release it!)

So presumably something in Python picks up the slack while OpenSSL is doing work. However, once OpenSSL is ready to rejoin, it has to wait until there is a moment where it can reacquire the GIL.

@jcrist
Member

jcrist commented Sep 8, 2021

How hard would this be to try? Is this a small change in cPython followed by a rebuild?

My gut says this would be fairly quick to hack badly into CPython to see if things improve (just stop dropping the GIL for all socket calls). A proper patch would interact more with the asyncio event loop implementation (asyncio or uvloop) to only keep the GIL for async socket operations.

@gjoseph92
Collaborator Author

@jcrist agreed. Building CPython (and getting it into a Docker image if we wanted to test on our real shuffle workload) would be the slowest part. I would be very interested to see the results of this!

@jakirkham
Member

Have we looked at this again in light of the addition of asyncio comms ( #5450 )?

Also put together PR ( #5750 ), which optionally uses NumPy to allocate frames.

@jakirkham
Member

Saw PR ( python/cpython#31492 ) recently, which looks potentially interesting in this context.
