Support lambdas and local functions as callbacks in parallel ExternalSource #3269
Conversation
…kling module for ES callback Signed-off-by: Kamil Tokarski <[email protected]>
!build
CI MESSAGE: [2808327]: BUILD STARTED
CI MESSAGE: [2808327]: BUILD PASSED
if callback_pickler is None:
    callbacks_ = callbacks
else:
    callbacks_ = callback_pickler.dumps(callbacks)

So now callbacks contains either pickled or raw callbacks?
Yes, in the first case it creates a kind of additional layer of serialization, where the multiprocessing pickler just sees already-serialized callbacks. At first I simply set the DALI customized pickler to be used directly by multiprocessing, but unfortunately that is a global change for the whole process, and:
- it does not work if multiprocessing already started some actual work before DALI was able to modify the context
- there's a question of whether it would interfere with other packages using multiprocessing, like PyTorch (it seemed to work okay, but if we can avoid worrying about that, let's avoid it)
- for user-provided serialization packages I would still go for this kind of double serialization
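The double-serialization idea can be sketched like this; it is a minimal standalone illustration, and `CustomPickler`, `prepare_for_worker`, and `restore_in_worker` are hypothetical names, not DALI's actual API:

```python
import pickle

class CustomPickler:
    # Stands in for a user-provided serialization module exposing
    # dumps/loads (e.g. dill); plain pickle here for illustration.
    @staticmethod
    def dumps(obj):
        return pickle.dumps(obj)

    @staticmethod
    def loads(data):
        return pickle.loads(data)

def prepare_for_worker(callbacks, callback_pickler):
    # First layer: serialize the callbacks with the custom pickler.
    # multiprocessing's own pickler then only sees a bytes object,
    # which is trivially picklable (the second layer).
    if callback_pickler is None:
        return callbacks
    return callback_pickler.dumps(callbacks)

def restore_in_worker(callbacks_arg, callback_pickler):
    # Inside the worker process, undo the first layer if it was applied.
    if callback_pickler is None:
        return callbacks_arg
    return callback_pickler.loads(callbacks_arg)

cbs = [min, max]
blob = prepare_for_worker(cbs, CustomPickler)
restored = restore_in_worker(blob, CustomPickler)
```

This keeps the custom pickler a per-pipeline choice rather than a process-global change to multiprocessing's own pickler.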
@@ -202,10 +204,14 @@ def __init__(
    task_r, task_w = mp.Pipe(duplex=False)
    res_r, res_w = mp.Pipe(duplex=False)
    sock_reader, sock_writer = socket.socketpair()
    if callback_pickler is None:
        callbacks_ = callbacks

How about this?

-        callbacks_ = callbacks
+        callbacks_arg = callbacks

I'm not a fan of a trailing underscore, and a leading underscore has a meaning in Python, which I guess we should not abuse.
done
class CustomPickler:
Nitpick: weird empty line
dali/python/nvidia/dali/pickling.py
Outdated
def function_unpickle(name, qualname, code, closure):
    code = marshal.loads(code)
    globs = {'__builtins__': __builtins__}

Why "globs"?

-    globs = {'__builtins__': __builtins__}
+    builtins = {'__builtins__': __builtins__}
It is a dictionary containing the "module global scope" of the function, the one available later as the immutable fun.__globals__ attribute (you cannot replace the dict, but you can update its contents). If one called globals() inside the instantiated function, this is the object that would be returned. So it is rather a coincidence that this dictionary contains only builtins at this point: all other global references (if any) are added at the set-state stage to handle possible cyclic references. On the other hand, I didn't call it globals, to avoid shadowing globals() inside function_unpickle.
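For illustration, here is a simplified sketch of the by-value round trip being discussed, ignoring closures and non-builtin globals (this is not the PR's actual implementation, just the core marshal/FunctionType mechanism):

```python
import marshal
import types

def function_pickle(fun):
    # Capture only the code object; a full implementation would also
    # handle the closure and global references of the function.
    return (fun.__name__, fun.__qualname__, marshal.dumps(fun.__code__))

def function_unpickle(name, qualname, code):
    code = marshal.loads(code)
    # "globs" becomes the new function's __globals__ dict; it is named
    # this way to avoid shadowing the builtin globals() in here.
    globs = {'__builtins__': __builtins__}
    fun = types.FunctionType(code, globs, name)
    fun.__qualname__ = qualname
    return fun

# Round-trip a lambda, which plain pickle cannot serialize by reference.
restored = function_unpickle(*function_pickle(lambda x: x + 1))
```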
dali/python/nvidia/dali/pickling.py
Outdated
class DaliCallbackPickler(pickle.Pickler):

Nitpick: weird empty line
The one-line space between the class ...: line and the first method? I can remove it if you think it somehow clashes with the code formatting in DALI, though I tried to use it everywhere, both in this and other PRs. D:
Most of the classes I can find have docstrings, but those are followed by a single empty line too.
dali/python/nvidia/dali/pickling.py
Outdated
if isinstance(obj, type(dummy_lambda)) and obj.__name__ == dummy_lambda.__name__ or \
        getattr(obj, '_dali_pickle_by_value', False):
    return function_by_value_reducer(obj)
if '<locals>' in obj.__qualname__:

Does it work with regular local functions, such as:

def f():
    def g(): return 42

? g.__qualname__ == 'f.<locals>.g', but it's otherwise quite trivial.

How about switching the logic to:

try:
    pickle.dumps(obj)
except (whatever it raises):
    return function_by_value_reducer(obj)

?
Yep, g will be serialized by value, and plain pickling fails as pickle recursively steps down the attributes and raises AttributeError when encountering <locals>.

I'd rather limit this pickle.dumps test to the situations where I am nearly sure it is going to fail (or maybe get rid of the test altogether?), for efficiency reasons: if the test succeeded, it serialized something only for us to discard that result and proceed with a single step of serialization.

It would be nice if we could catch the exception by overriding the dump method of the pickler, catch AttributeError and only then fall back to obj.__dali_pickle_by_value = True. Unfortunately it doesn't seem to be (easily) doable, as the pickle.Pickler class is in fact a wrapper around the C implementation of pickle, and an overridden dump method would be called only once, for the top-level object; recursive steps for dependencies/contents would be serialized by C code unaware of the override.
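The failure mode under discussion is easy to reproduce with plain pickle (a standalone demonstration, not code from the PR):

```python
import pickle

def outer():
    # A local function: its qualified name contains '<locals>',
    # so pickle cannot look it up as a module attribute by name.
    def inner():
        return 42
    return inner

local_fun = outer()
# 'outer.<locals>.inner' marks this as a local function
has_locals_marker = '<locals>' in local_fun.__qualname__

try:
    pickle.dumps(local_fun)
    pickling_failed = False
except (AttributeError, pickle.PicklingError):
    # CPython typically raises AttributeError: Can't pickle local object ...
    pickling_failed = True
```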
def create_simple_pipeline(callback, py_callback_pickler, batch_size, parallel=True, py_num_workers=None):
    extra = {}
    if py_callback_pickler != UseDefault:

Why not:

-    if py_callback_pickler != UseDefault:
+    if py_callback_pickler is not None:

like we do elsewhere? I know it would work with different defaults, but it seems a bit over-engineered for the sake of just tests.
done
dali/python/nvidia/dali/pipeline.py
Outdated
@@ -135,13 +136,34 @@ class Pipeline(object):
    you will need to call :meth:`start_py_workers` before calling :meth:`build` of any
    of the pipelines. You can find more details and caveats of both methods in Python's
    ``multiprocessing`` module documentation.
`py_callback_pickler` : module or tuple, default = nvidia.dali.pickling
    If `py_start_method` is set to *spawn* callback passed to parallel ExternalSource must be picklable.
    If run in Python3.8 or newer, DALI uses customized pickle (`nvidia.dali.pickling`) when
Do we want to advertise this module to the users? I think it could be internal.
I decided to advertise it because, apart from serving as the default callback pickler, it also contains the @pickle_by_value decorator, whose whole purpose is to be available for manual usage by users.
1. What about moving everything apart from the decorator, loads and dumps methods to some new module inside _multproc?
2. Or renaming all the things that would be moved in 1. to start with an underscore?
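For context, such a decorator can be as simple as setting the `_dali_pickle_by_value` flag that the reducer in this PR checks; the exact decorator internals below are an assumption for illustration:

```python
def pickle_by_value(fun):
    # Mark the function so a custom reducer serializes it by value
    # (code object + state) instead of by module-qualified reference.
    fun._dali_pickle_by_value = True
    return fun

@pickle_by_value
def global_callback(sample_info):
    # A module-level function that would normally pickle by reference;
    # the decorator forces by-value serialization instead.
    return sample_info
```

This is why exposing the module to users makes sense: the decorator is an opt-in knob, not an implementation detail.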
dali/python/nvidia/dali/pipeline.py
Outdated
    external source callbacks.

-    external source callbacks.
+    external source callbacks. You can even pass a module directly as ``py_callback_pickler``::
+
+        import dill
+        src = fn.external_source(lambda sample_info: 42, batch=False, parallel=True, py_callback_pickler=dill)
Never mind, you do. :D
done
dali/test/python/test_utils.py
Outdated
def _test_omitted():
    print("Omitting tests suite for Python3.8+ serialization")

-def _test_omitted():
-    print("Omitting tests suite for Python3.8+ serialization")
+def _test_skipped(reason):
+    print("Test skipped." if reason is None else f"Test skipped: {reason}")
done
dali/test/python/test_utils.py
Outdated
if version_info.major > major or \
        (version_info.major == major and (minor is None or version_info.minor >= minor)):
    return test_case
return _test_omitted

-return _test_omitted
+return lambda: test_skipped(f"Insufficient Python version {version_info.major}.{version_info.minor} - required {major}.{minor}")
done
dali/python/nvidia/dali/pickling.py
Outdated
    raise ValueError("Unsupported py_callback_pickler value provided.")

    @classmethod
    def of_reducer(cls, reducer, dumps_kwargs=None, loads_kwargs=None):

This is a weird name. Is it required by some API? What does this "of" stand for?
Nah, it's not required by any API. I meant create_instance_of_this_class_from_reducer and for some reason used "of" rather than "from".
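The factory under discussion roughly wraps a dumps/loads provider together with extra kwargs. A self-contained sketch, with the method renamed to from_reducer as suggested (the class body here is illustrative, not DALI's actual code):

```python
import pickle

class CustomPickler:
    def __init__(self, dumps, loads, dumps_kwargs=None, loads_kwargs=None):
        self._dumps = dumps
        self._loads = loads
        self._dumps_kwargs = dumps_kwargs or {}
        self._loads_kwargs = loads_kwargs or {}

    @classmethod
    def from_reducer(cls, reducer, dumps_kwargs=None, loads_kwargs=None):
        # `reducer` is any module/object exposing dumps/loads
        # (e.g. pickle, dill)
        return cls(reducer.dumps, reducer.loads, dumps_kwargs, loads_kwargs)

    def dumps(self, obj):
        return self._dumps(obj, **self._dumps_kwargs)

    def loads(self, data):
        return self._loads(data, **self._loads_kwargs)

# Wrap stdlib pickle with an extra keyword argument for dumps.
p = CustomPickler.from_reducer(pickle, dumps_kwargs={'protocol': 2})
```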
dali/python/nvidia/dali/pipeline.py
Outdated
    that implements two methods: `dumps` and `loads` to make DALI use them to serialize
    external source callbacks.

    Valid value for `py_callback_pickler` is either a module/object implementing

-Valid value for `py_callback_pickler` is either a module/object implementing
+A valid value for `py_callback_pickler` is either a module/object implementing
done
dali/python/nvidia/dali/pipeline.py
Outdated
    external source callbacks.

    Valid value for `py_callback_pickler` is either a module/object implementing
    dumps and loads methods or a tuple where first item is the module/object and the next

-    dumps and loads methods or a tuple where first item is the module/object and the next
+    ``dumps`` and ``loads`` methods or a tuple where first item is the module/object and the next
done
dali/python/nvidia/dali/pipeline.py
Outdated
    two optional parameters are extra kwargs to be passed when calling dumps and loads respectively.
    Provided methods and kwargs must themselves be picklable.

    If you run Python3.8 or newer and use default `nvidia.dali.pickling` you can hint DALI to serialize

-    If you run Python3.8 or newer and use default `nvidia.dali.pickling` you can hint DALI to serialize
+    If you run Python3.8 or newer and use the default `nvidia.dali.pickling` you can hint DALI to serialize
done
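The accepted shapes for the parameter can be normalized with a small helper like this (a sketch; `normalize_pickler` is an illustrative name, not DALI's API, though the error message matches the one in the diff above):

```python
import pickle

def normalize_pickler(value):
    # Accept either a module/object with dumps/loads methods, or a tuple
    # (reducer, dumps_kwargs, loads_kwargs) with the trailing items optional.
    if isinstance(value, tuple):
        reducer, *rest = value
        dumps_kwargs = rest[0] if len(rest) > 0 else {}
        loads_kwargs = rest[1] if len(rest) > 1 else {}
    else:
        reducer, dumps_kwargs, loads_kwargs = value, {}, {}
    if not (callable(getattr(reducer, 'dumps', None))
            and callable(getattr(reducer, 'loads', None))):
        raise ValueError("Unsupported py_callback_pickler value provided.")
    return reducer, dumps_kwargs, loads_kwargs

# Tuple form: module plus extra kwargs for dumps and loads respectively.
reducer, dkw, lkw = normalize_pickler((pickle, {'protocol': 4}, {}))
```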
@@ -0,0 +1,24 @@
# Copyright (c) 2020-2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved.

-# Copyright (c) 2020-2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# Copyright (c) 2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
done
@@ -0,0 +1,432 @@
# Copyright (c) 2020-2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved.

-# Copyright (c) 2020-2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# Copyright (c) 2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
done
def dumps(obj, **kwargs):
    if kwargs.get('special_dumps_param') != 42:
        raise NoDumpsParam("Expected special_dumps_pram among kwargs, got {}".format(kwargs))

-        raise NoDumpsParam("Expected special_dumps_pram among kwargs, got {}".format(kwargs))
+        raise NoDumpsParam("Expected special_dumps_param among kwargs, got {}".format(kwargs))
done
@pipeline_def(batch_size=batch_size, num_threads=2, device_id=0, py_num_workers=py_num_workers,
              py_start_method="spawn", **extra)
def crate_pipline():

-def crate_pipline():
+def create_pipeline():
done
!build
CI MESSAGE: [2840850]: BUILD STARTED
CI MESSAGE: [2840850]: BUILD PASSED
jpeg_file = os.path.join(get_dali_extra_path(), 'db', 'single', 'jpeg', '510', 'ship-1083562_640.jpg')
sequence_lenght = 4

def create_callback_with_syntactically_nested_code_referencing_global_var():

Wow! That's a strong contender for the longest identifier in our code base!
Hoped to make the name self-explanatory
dali/test/python/test_utils.py
Outdated
print("Test skipped." if reason is None else f"Test skipped: {reason}")


def restrict_python_version(major, minor=None, reason=None):

-def restrict_python_version(major, minor=None, reason=None):
+def restrict_python_version(major, minor=None):

Not used?
fix'd
dali/python/nvidia/dali/pipeline.py
Outdated
    """
    def __init__(self, batch_size = -1, num_threads = -1, device_id = -1, seed = -1,
                 exec_pipelined=True, prefetch_queue_depth=2,
                 exec_async=True, bytes_per_sample=0,
                 set_affinity=False, max_streams=-1, default_cuda_stream_priority = 0,
                 *,
-                enable_memory_stats=False, py_num_workers=1, py_start_method="fork"):
+                enable_memory_stats=False, py_num_workers=1, py_start_method="fork",
+                py_callback_pickler=dali_pickle):

-                py_callback_pickler=dali_pickle):
+                py_callback_pickler=None):
done
!build
CI MESSAGE: [2848855]: BUILD STARTED
CI MESSAGE: [2848855]: BUILD PASSED
!build
CI MESSAGE: [2849616]: BUILD STARTED
CI MESSAGE: [2849616]: BUILD FAILED
!build
CI MESSAGE: [2850059]: BUILD STARTED
CI MESSAGE: [2850059]: BUILD PASSED
Description
What happened in this PR
It adds customized serialization of parallel external source callbacks, so that lambdas and local functions are supported and functions defined inside Jupyter notebooks are easier to use; it also lets the user specify an external package to be used to serialize the callback if more specialized serialization is needed.
Additional information
Affected modules and functionalities:
pythonic parts of parallel external source implementation
Key points relevant for the review:
Checklist
Tests
Documentation
DALI team only
Requirements
REQ IDs: N/A
JIRA TASK: DALI-2214