From b3b5c2d9ff85847b554dcf5b2262836ae4c35c63 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 7 May 2020 15:48:01 -0700 Subject: [PATCH 01/18] Assign `header` and `frames` before returning --- distributed/protocol/serialize.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/distributed/protocol/serialize.py b/distributed/protocol/serialize.py index 4d02bc6520..0467f71b93 100644 --- a/distributed/protocol/serialize.py +++ b/distributed/protocol/serialize.py @@ -52,7 +52,9 @@ def dask_loads(header, frames): def pickle_dumps(x): - return {"serializer": "pickle"}, [pickle.dumps(x)] + header = {"serializer": "pickle"} + frames = [pickle.dumps(x)] + return header, frames def pickle_loads(header, frames): From b3dfdb206b08ba3622b6e35247f635b6e8958497 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 7 May 2020 15:48:03 -0700 Subject: [PATCH 02/18] Import `HIGHEST_PROTOCOL` at top-level --- distributed/protocol/pickle.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/distributed/protocol/pickle.py b/distributed/protocol/pickle.py index 9a1f135444..4774a5c41b 100644 --- a/distributed/protocol/pickle.py +++ b/distributed/protocol/pickle.py @@ -1,5 +1,6 @@ import logging import pickle +from pickle import HIGHEST_PROTOCOL import cloudpickle @@ -31,20 +32,20 @@ def dumps(x): 3. If it is long, then first check type, then check __main__ """ try: - result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL) + result = pickle.dumps(x, protocol=HIGHEST_PROTOCOL) if len(result) < 1000: if b"__main__" in result: - return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL) + return cloudpickle.dumps(x, protocol=HIGHEST_PROTOCOL) else: return result else: if _always_use_pickle_for(x) or b"__main__" not in result: return result else: - return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL) + return cloudpickle.dumps(x, protocol=HIGHEST_PROTOCOL) except Exception: try: - return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL) + return cloudpickle.dumps(x, protocol=HIGHEST_PROTOCOL) except Exception as e: logger.info("Failed to serialize %s. Exception: %s", x, e) raise From 4ad42d488249e694e22b6c1152ea4152edab6e28 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 7 May 2020 15:48:04 -0700 Subject: [PATCH 03/18] Collect keyword arguments to `*dumps` --- distributed/protocol/pickle.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/distributed/protocol/pickle.py b/distributed/protocol/pickle.py index 4774a5c41b..568163c3b5 100644 --- a/distributed/protocol/pickle.py +++ b/distributed/protocol/pickle.py @@ -31,21 +31,22 @@ def dumps(x): 2. If it is short then check if it contains __main__ 3. If it is long, then first check type, then check __main__ """ + dump_kwargs = {"protocol": HIGHEST_PROTOCOL} try: - result = pickle.dumps(x, protocol=HIGHEST_PROTOCOL) + result = pickle.dumps(x, **dump_kwargs) if len(result) < 1000: if b"__main__" in result: - return cloudpickle.dumps(x, protocol=HIGHEST_PROTOCOL) + return cloudpickle.dumps(x, **dump_kwargs) else: return result else: if _always_use_pickle_for(x) or b"__main__" not in result: return result else: - return cloudpickle.dumps(x, protocol=HIGHEST_PROTOCOL) + return cloudpickle.dumps(x, **dump_kwargs) except Exception: try: - return cloudpickle.dumps(x, protocol=HIGHEST_PROTOCOL) + return cloudpickle.dumps(x, **dump_kwargs) except Exception as e: logger.info("Failed to serialize %s. Exception: %s", x, e) raise From f5380c132249e1ecbf7734f4ef34d796a309595f Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 7 May 2020 15:48:05 -0700 Subject: [PATCH 04/18] Assign `result` and `return` once at end --- distributed/protocol/pickle.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/distributed/protocol/pickle.py b/distributed/protocol/pickle.py index 568163c3b5..3f4c44a77b 100644 --- a/distributed/protocol/pickle.py +++ b/distributed/protocol/pickle.py @@ -24,7 +24,7 @@ def _always_use_pickle_for(x): return False -def dumps(x): +def dumps(x, *, buffer_callback=None): """ Manage between cloudpickle and pickle 1. Try pickle @@ -36,25 +36,25 @@ def dumps(x): result = pickle.dumps(x, **dump_kwargs) if len(result) < 1000: if b"__main__" in result: - return cloudpickle.dumps(x, **dump_kwargs) - else: - return result + result = cloudpickle.dumps(x, **dump_kwargs) else: - if _always_use_pickle_for(x) or b"__main__" not in result: - return result - else: - return cloudpickle.dumps(x, **dump_kwargs) + if not (_always_use_pickle_for(x) or b"__main__" not in result): + result = cloudpickle.dumps(x, **dump_kwargs) except Exception: try: - return cloudpickle.dumps(x, **dump_kwargs) + result = cloudpickle.dumps(x, **dump_kwargs) except Exception as e: logger.info("Failed to serialize %s. Exception: %s", x, e) raise + return result -def loads(x): +def loads(x, *, buffers=()): try: - return pickle.loads(x) + if buffers: + return pickle.loads(x, buffers=buffers) + else: + return pickle.loads(x) except Exception: logger.info("Failed to deserialize %s", x[:10000], exc_info=True) raise From ed54236f4e780217d7b135d6c733417d141458a5 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 7 May 2020 15:48:06 -0700 Subject: [PATCH 05/18] Support out-of-band buffer serialization --- distributed/protocol/pickle.py | 2 ++ distributed/protocol/serialize.py | 7 +++++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/distributed/protocol/pickle.py b/distributed/protocol/pickle.py index 3f4c44a77b..082210f875 100644 --- a/distributed/protocol/pickle.py +++ b/distributed/protocol/pickle.py @@ -32,6 +32,8 @@ def dumps(x, *, buffer_callback=None): 3. If it is long, then first check type, then check __main__ """ dump_kwargs = {"protocol": HIGHEST_PROTOCOL} + if HIGHEST_PROTOCOL >= 5: + dump_kwargs["buffer_callback"] = buffer_callback try: result = pickle.dumps(x, **dump_kwargs) if len(result) < 1000: diff --git a/distributed/protocol/serialize.py b/distributed/protocol/serialize.py index 0467f71b93..e4fba2b7ba 100644 --- a/distributed/protocol/serialize.py +++ b/distributed/protocol/serialize.py @@ -53,12 +53,15 @@ def dask_loads(header, frames): def pickle_dumps(x): header = {"serializer": "pickle"} - frames = [pickle.dumps(x)] + frames = [None] + buffer_callback = lambda f: frames.append(memoryview(f)) + frames[0] = pickle.dumps(x, buffer_callback=buffer_callback) return header, frames def pickle_loads(header, frames): - return pickle.loads(b"".join(frames)) + x, buffers = frames[0], frames[1:] + return pickle.loads(x, buffers=buffers) def msgpack_dumps(x): From d374832254afc0a9209e2ddf7ad0f329753c75dd Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 7 May 2020 15:48:07 -0700 Subject: [PATCH 06/18] Require `cloudpickle` version `1.3.0` Needed for out-of-band buffer handling. --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index b0d20cdb1e..95a681d66c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ click >= 6.6 -cloudpickle >= 0.2.2 +cloudpickle >= 1.3.0 contextvars;python_version<'3.7' dask >= 2.9.0 msgpack >= 0.6.0 From 198204989483d0042801f6a1c076f624ce686769 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 7 May 2020 15:48:08 -0700 Subject: [PATCH 07/18] Test Pickle with out-of-band buffers --- distributed/protocol/tests/test_pickle.py | 39 ++++++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/distributed/protocol/tests/test_pickle.py b/distributed/protocol/tests/test_pickle.py index 681992ef84..2fc6a29521 100644 --- a/distributed/protocol/tests/test_pickle.py +++ b/distributed/protocol/tests/test_pickle.py @@ -6,7 +6,7 @@ import pytest -from distributed.protocol.pickle import dumps, loads +from distributed.protocol.pickle import HIGHEST_PROTOCOL, dumps, loads def test_pickle_data(): @@ -15,6 +15,37 @@ def test_pickle_data(): assert loads(dumps(d)) == d +def test_pickle_out_of_band(): + try: + from pickle import PickleBuffer + except ImportError: + pass + + class MemoryviewHolder: + def __init__(self, mv): + self.mv = memoryview(mv) + + def __reduce_ex__(self, protocol): + if protocol >= 5: + return MemoryviewHolder, (PickleBuffer(self.mv),) + else: + return MemoryviewHolder, (self.mv.tobytes(),) + + mv = memoryview(b"123") + mvh = MemoryviewHolder(mv) + + if HIGHEST_PROTOCOL >= 5: + l = [] + d = dumps(mvh, buffer_callback=l.append) + mvh2 = loads(d, buffers=l) + else: + mvh2 = loads(dumps(mvh)) + + assert isinstance(mvh2, MemoryviewHolder) + assert isinstance(mvh2.mv, memoryview) + assert mvh2.mv == mv + + def test_pickle_numpy(): np = pytest.importorskip("numpy") x = np.ones(5) @@ -23,6 +54,12 @@ def test_pickle_numpy(): x = np.ones(5000) assert (loads(dumps(x)) == x).all() + if HIGHEST_PROTOCOL >= 5: + x = np.ones(5000) + l = [] + d = dumps(x, buffer_callback=l.append) + assert (loads(d, buffers=l) == x).all() + @pytest.mark.xfail( sys.version_info[:2] == (3, 8), From 4b54ba57b668cf793c1ca55132113659ffa2310c Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Sun, 10 May 2020 20:15:07 -0700 Subject: [PATCH 08/18] Import `PickleBuffer` (if available) --- distributed/protocol/tests/test_pickle.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/distributed/protocol/tests/test_pickle.py b/distributed/protocol/tests/test_pickle.py index 2fc6a29521..370c2852a9 100644 --- a/distributed/protocol/tests/test_pickle.py +++ b/distributed/protocol/tests/test_pickle.py @@ -8,6 +8,11 @@ from distributed.protocol.pickle import HIGHEST_PROTOCOL, dumps, loads +try: + from pickle import PickleBuffer +except ImportError: + pass + def test_pickle_data(): data = [1, b"123", "123", [123], {}, set()] @@ -16,11 +21,6 @@ def test_pickle_data(): def test_pickle_out_of_band(): - try: - from pickle import PickleBuffer - except ImportError: - pass - class MemoryviewHolder: def __init__(self, mv): self.mv = memoryview(mv) From f5aed042bc46d1fea7422ae83d765cf73fe695ef Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Sun, 10 May 2020 20:15:11 -0700 Subject: [PATCH 09/18] Test `serialize`/`deserialize` with `pickle` --- distributed/protocol/tests/test_pickle.py | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/distributed/protocol/tests/test_pickle.py b/distributed/protocol/tests/test_pickle.py index 370c2852a9..5d548db321 100644 --- a/distributed/protocol/tests/test_pickle.py +++ b/distributed/protocol/tests/test_pickle.py @@ -6,6 +6,7 @@ import pytest +from distributed.protocol import deserialize, serialize from distributed.protocol.pickle import HIGHEST_PROTOCOL, dumps, loads try: @@ -18,6 +19,7 @@ def test_pickle_data(): data = [1, b"123", "123", [123], {}, set()] for d in data: assert loads(dumps(d)) == d + assert deserialize(*serialize(d, serializers=("pickle",))) == d def test_pickle_out_of_band(): @@ -45,20 +47,29 @@ def __reduce_ex__(self, protocol): assert isinstance(mvh2.mv, memoryview) assert mvh2.mv == mv + mvh3 = deserialize(*serialize(mvh, serializers=("pickle",))) + + assert isinstance(mvh3, MemoryviewHolder) + assert isinstance(mvh3.mv, memoryview) + assert mvh3.mv == mv + def test_pickle_numpy(): np = pytest.importorskip("numpy") x = np.ones(5) assert (loads(dumps(x)) == x).all() + assert (deserialize(*serialize(x, serializers=("pickle",))) == x).all() x = np.ones(5000) assert (loads(dumps(x)) == x).all() + assert (deserialize(*serialize(x, serializers=("pickle",))) == x).all() if HIGHEST_PROTOCOL >= 5: x = np.ones(5000) l = [] d = dumps(x, buffer_callback=l.append) assert (loads(d, buffers=l) == x).all() + assert (deserialize(*serialize(x, serializers=("pickle",))) == x).all() @pytest.mark.xfail( @@ -82,10 +93,17 @@ def funcs(): for func in funcs(): wr = weakref.ref(func) + func2 = loads(dumps(func)) wr2 = weakref.ref(func2) assert func2(1) == func(1) - del func, func2 + + func3 = deserialize(*serialize(func, serializers=("pickle",))) + wr3 = weakref.ref(func3) + assert func3(1) == func(1) + + del func, func2, func3 gc.collect() assert wr() is None assert wr2() is None + assert wr3() is None From 924dd45293252358780d5fdd5cbfd20ccb720a2b Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Sun, 10 May 2020 20:15:16 -0700 Subject: [PATCH 10/18] Check serialized header + frames --- distributed/protocol/tests/test_pickle.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/distributed/protocol/tests/test_pickle.py b/distributed/protocol/tests/test_pickle.py index 5d548db321..3cd794739e 100644 --- a/distributed/protocol/tests/test_pickle.py +++ b/distributed/protocol/tests/test_pickle.py @@ -47,12 +47,22 @@ def __reduce_ex__(self, protocol): assert isinstance(mvh2.mv, memoryview) assert mvh2.mv == mv - mvh3 = deserialize(*serialize(mvh, serializers=("pickle",))) + h, f = serialize(mvh, serializers=("pickle",)) + mvh3 = deserialize(h, f) assert isinstance(mvh3, MemoryviewHolder) assert isinstance(mvh3.mv, memoryview) assert mvh3.mv == mv + if HIGHEST_PROTOCOL >= 5: + assert len(f) == 2 + assert isinstance(f[0], bytes) + assert isinstance(f[1], memoryview) + assert f[1] == mv + else: + assert len(f) == 1 + assert isinstance(f[0], bytes) + def test_pickle_numpy(): np = pytest.importorskip("numpy") @@ -66,10 +76,18 @@ def test_pickle_numpy(): if HIGHEST_PROTOCOL >= 5: x = np.ones(5000) + l = [] d = dumps(x, buffer_callback=l.append) + assert len(l) == 1 + assert isinstance(l[0], PickleBuffer) assert (loads(d, buffers=l) == x).all() - assert (deserialize(*serialize(x, serializers=("pickle",))) == x).all() + + h, f = serialize(x, serializers=("pickle",)) + assert len(f) == 2 + assert isinstance(f[0], bytes) + assert isinstance(f[1], memoryview) + assert (deserialize(h, f) == x).all() @pytest.mark.xfail( From 6efa0acd6abaaf05b7eb1f44f2cb6abb9a366166 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Sun, 10 May 2020 20:23:31 -0700 Subject: [PATCH 11/18] Check out-of-band buffers' content --- distributed/protocol/tests/test_pickle.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/distributed/protocol/tests/test_pickle.py b/distributed/protocol/tests/test_pickle.py index 3cd794739e..99791bae0a 100644 --- a/distributed/protocol/tests/test_pickle.py +++ b/distributed/protocol/tests/test_pickle.py @@ -40,6 +40,10 @@ def __reduce_ex__(self, protocol): l = [] d = dumps(mvh, buffer_callback=l.append) mvh2 = loads(d, buffers=l) + + assert len(l) == 1 + assert isinstance(l[0], PickleBuffer) + assert l[0].raw() == mv else: mvh2 = loads(dumps(mvh)) @@ -81,6 +85,7 @@ def test_pickle_numpy(): d = dumps(x, buffer_callback=l.append) assert len(l) == 1 assert isinstance(l[0], PickleBuffer) + assert l[0].raw() == memoryview(x).cast("B") assert (loads(d, buffers=l) == x).all() h, f = serialize(x, serializers=("pickle",)) From 70b7f41df381229771af17ee7cf98fb9163f0726 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Sun, 10 May 2020 20:27:17 -0700 Subject: [PATCH 12/18] Take `memoryview` of `PickleBuffer` for testing --- distributed/protocol/tests/test_pickle.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/protocol/tests/test_pickle.py b/distributed/protocol/tests/test_pickle.py index 99791bae0a..f4a4ec7f8e 100644 --- a/distributed/protocol/tests/test_pickle.py +++ b/distributed/protocol/tests/test_pickle.py @@ -43,7 +43,7 @@ def __reduce_ex__(self, protocol): assert len(l) == 1 assert isinstance(l[0], PickleBuffer) - assert l[0].raw() == mv + assert memoryview(l[0]) == mv else: mvh2 = loads(dumps(mvh)) @@ -85,7 +85,7 @@ def test_pickle_numpy(): d = dumps(x, buffer_callback=l.append) assert len(l) == 1 assert isinstance(l[0], PickleBuffer) - assert l[0].raw() == memoryview(x).cast("B") + assert memoryview(l[0]) == memoryview(x) assert (loads(d, buffers=l) == x).all() h, f = serialize(x, serializers=("pickle",)) From 8038bbaa4711913c13cc41ce6f601ebcb9f51773 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 18 May 2020 15:59:44 -0700 Subject: [PATCH 13/18] Collect buffers internally first Before calling the user provided buffer callback, collect buffers in an internal list. That way if the mechanism of pickling needs to be changed, the internal list can be purged before handing these to the user. At the end of pickling, make sure the user's buffer callback is called on each buffer in order. --- distributed/protocol/pickle.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/distributed/protocol/pickle.py b/distributed/protocol/pickle.py index 082210f875..45145b03a9 100644 --- a/distributed/protocol/pickle.py +++ b/distributed/protocol/pickle.py @@ -31,23 +31,30 @@ def dumps(x, *, buffer_callback=None): 2. If it is short then check if it contains __main__ 3. If it is long, then first check type, then check __main__ """ + buffers = [] dump_kwargs = {"protocol": HIGHEST_PROTOCOL} if HIGHEST_PROTOCOL >= 5: - dump_kwargs["buffer_callback"] = buffer_callback + dump_kwargs["buffer_callback"] = buffers.append try: + del buffers[:] result = pickle.dumps(x, **dump_kwargs) if len(result) < 1000: if b"__main__" in result: + del buffers[:] result = cloudpickle.dumps(x, **dump_kwargs) else: if not (_always_use_pickle_for(x) or b"__main__" not in result): + del buffers[:] result = cloudpickle.dumps(x, **dump_kwargs) except Exception: try: + del buffers[:] result = cloudpickle.dumps(x, **dump_kwargs) except Exception as e: logger.info("Failed to serialize %s. Exception: %s", x, e) raise + for b in buffers: + buffer_callback(b) return result From 0578b6a4f064cb8852f1135d923c6d8b9e32594b Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 18 May 2020 16:06:05 -0700 Subject: [PATCH 14/18] Only collect buffers if `buffer_callback` exists --- distributed/protocol/pickle.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/protocol/pickle.py b/distributed/protocol/pickle.py index 45145b03a9..ff261e4dc7 100644 --- a/distributed/protocol/pickle.py +++ b/distributed/protocol/pickle.py @@ -33,7 +33,7 @@ def dumps(x, *, buffer_callback=None): """ buffers = [] dump_kwargs = {"protocol": HIGHEST_PROTOCOL} - if HIGHEST_PROTOCOL >= 5: + if HIGHEST_PROTOCOL >= 5 and buffer_callback is not None: dump_kwargs["buffer_callback"] = buffers.append try: del buffers[:] From 1f3033c277b4602f725308fa61e63d7d5415343f Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Wed, 20 May 2020 14:15:49 -0700 Subject: [PATCH 15/18] Use `elif` instead for simplicity --- distributed/protocol/pickle.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/distributed/protocol/pickle.py b/distributed/protocol/pickle.py index ff261e4dc7..026c53c5a7 100644 --- a/distributed/protocol/pickle.py +++ b/distributed/protocol/pickle.py @@ -42,10 +42,9 @@ def dumps(x, *, buffer_callback=None): if b"__main__" in result: del buffers[:] result = cloudpickle.dumps(x, **dump_kwargs) - else: - if not (_always_use_pickle_for(x) or b"__main__" not in result): - del buffers[:] - result = cloudpickle.dumps(x, **dump_kwargs) + elif not (_always_use_pickle_for(x) or b"__main__" not in result): + del buffers[:] + result = cloudpickle.dumps(x, **dump_kwargs) except Exception: try: del buffers[:] From 3878f02a0b5e86d8fd2599402215ac7f79d9dd34 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Wed, 20 May 2020 14:17:55 -0700 Subject: [PATCH 16/18] Use De Morgan's law to simplify logic --- distributed/protocol/pickle.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/protocol/pickle.py b/distributed/protocol/pickle.py index 026c53c5a7..319d498321 100644 --- a/distributed/protocol/pickle.py +++ b/distributed/protocol/pickle.py @@ -42,7 +42,7 @@ def dumps(x, *, buffer_callback=None): if b"__main__" in result: del buffers[:] result = cloudpickle.dumps(x, **dump_kwargs) - elif not (_always_use_pickle_for(x) or b"__main__" not in result): + elif not _always_use_pickle_for(x) and b"__main__" in result: del buffers[:] result = cloudpickle.dumps(x, **dump_kwargs) except Exception: From 6745c7fc51c2a2463fe881b8ac55603965592968 Mon Sep 17 00:00:00 2001 From: jakirkham Date: Wed, 20 May 2020 14:48:53 -0700 Subject: [PATCH 17/18] Check `buffer_callback` before calling it Co-authored-by: Jim Crist-Harif --- distributed/protocol/pickle.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/distributed/protocol/pickle.py b/distributed/protocol/pickle.py index 319d498321..a3864b25a2 100644 --- a/distributed/protocol/pickle.py +++ b/distributed/protocol/pickle.py @@ -52,8 +52,9 @@ def dumps(x, *, buffer_callback=None): except Exception as e: logger.info("Failed to serialize %s. Exception: %s", x, e) raise - for b in buffers: - buffer_callback(b) + if buffer_callback is not None: + for b in buffers: + buffer_callback(b) return result From 195924f85eb2f4929c7a9b545139d61e8c98e90c Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Wed, 20 May 2020 14:50:16 -0700 Subject: [PATCH 18/18] Use `buffer.clear()` instead of `del buffer[:]` --- distributed/protocol/pickle.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/distributed/protocol/pickle.py b/distributed/protocol/pickle.py index a3864b25a2..9774202e4f 100644 --- a/distributed/protocol/pickle.py +++ b/distributed/protocol/pickle.py @@ -36,18 +36,18 @@ def dumps(x, *, buffer_callback=None): if HIGHEST_PROTOCOL >= 5 and buffer_callback is not None: dump_kwargs["buffer_callback"] = buffers.append try: - del buffers[:] + buffers.clear() result = pickle.dumps(x, **dump_kwargs) if len(result) < 1000: if b"__main__" in result: - del buffers[:] + buffers.clear() result = cloudpickle.dumps(x, **dump_kwargs) elif not _always_use_pickle_for(x) and b"__main__" in result: - del buffers[:] + buffers.clear() result = cloudpickle.dumps(x, **dump_kwargs) except Exception: try: - del buffers[:] + buffers.clear() result = cloudpickle.dumps(x, **dump_kwargs) except Exception as e: logger.info("Failed to serialize %s. Exception: %s", x, e)