Run external source callback in parallel #2543

Merged
52 commits merged into NVIDIA:master from external_source_multichunks on Mar 3, 2021

Conversation

@stiepan (Member) commented Dec 11, 2020

Why do we need this PR?

It adds an option to run per-sample external source callbacks in process-based Python workers.

What happened in this PR?


  • What solution was applied:
    Added process-based workers using the multiprocessing module, added a custom wrapper around shared memory and mmap to avoid unnecessary copies when passing data between workers, utilized the no_copy mode of external source, and added prefetching of batches (a brief usage sketch follows below).
  • Affected modules and functionalities:
    Mostly the Python wrappers around the pipeline and external source. Added the shared_mem.cc utility.
  • Key points relevant for the review:
    [ Describe here what is the most important part that reviewers should focus on. ]
  • Validation and testing:
    Prepared a benchmark test comparing the parallelized external source with the CPU FileReader and with the sequential external source, both in training and in a plain pipeline that only augments the data.
  • Documentation (including examples):
    Added descriptions of the relevant parameters to ExternalSource and Pipeline. Documented the shared_mem, shared_batch, worker, and pool modules.

DALI-1651
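A minimal usage sketch of the feature added here, assuming the parallel external source parameters documented in this PR (parallel=True on external_source, py_num_workers and py_start_method on the pipeline); the callback, shapes and device_id are illustrative, not part of the PR:

import numpy as np
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn

# Hypothetical per-sample callback: it must be picklable so the process-based
# workers can run it; it receives a SampleInfo and returns a single sample.
def random_sample(sample_info):
    rng = np.random.default_rng(seed=sample_info.idx_in_epoch)
    return rng.integers(0, 255, size=(32, 32, 3), dtype=np.uint8)

@pipeline_def(batch_size=16, num_threads=2, device_id=0,
              py_num_workers=4, py_start_method="spawn")
def parallel_es_pipeline():
    # parallel=True offloads the callback to the Python worker pool.
    return fn.external_source(source=random_sample, batch=False, parallel=True)

pipe = parallel_es_pipeline()
pipe.build()
out, = pipe.run()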

@stiepan (Member, Author) commented Dec 11, 2020

!build

@dali-automaton (Collaborator):

CI MESSAGE: [1893889]: BUILD STARTED

lgtm-com bot commented Dec 11, 2020

This pull request introduces 7 alerts when merging 5d9b5f70e6e41c487b166ad3d09947177caab9d7 into 5d5844b - view on LGTM.com

new alerts:

  • 5 for Unused import
  • 2 for Module is imported with 'import' and 'import from'

@dali-automaton (Collaborator):

CI MESSAGE: [1893889]: BUILD FAILED

@stiepan (Member, Author) commented Dec 14, 2020

!build

@dali-automaton (Collaborator):

CI MESSAGE: [1900108]: BUILD STARTED

@dali-automaton (Collaborator):

CI MESSAGE: [1900108]: BUILD PASSED

@stiepan (Member, Author) commented Dec 14, 2020

!build

@dali-automaton (Collaborator):

CI MESSAGE: [1900572]: BUILD STARTED

@dali-automaton (Collaborator):

CI MESSAGE: [1900572]: BUILD PASSED

@@ -35,6 +35,9 @@ class DLL_PUBLIC DeviceGuard {
// for device id < 0 it is no-op
explicit DeviceGuard(int new_device);
~DeviceGuard();

bool has_old_context();
@mzient (Contributor), Dec 15, 2020:

I think this function pollutes a utility that otherwise has a very well-defined purpose. Moreover, this function will not work well if new_device is -1, so it sort of breaks the contract here.

@stiepan (Member, Author):

done

@@ -1117,6 +1118,22 @@ PYBIND11_MODULE(backend_impl, m) {

m.def("GetCxx11AbiFlag", &GetCxx11AbiFlag);

m.def("HasCudaContext", []{
return DeviceGuard{}.has_old_context();
Contributor:

Don't abuse DeviceGuard for this.

Suggested change:
-  return DeviceGuard{}.has_old_context();
+  DALI_ENFORCE(cuInitChecked(),
+               "Failed to load libcuda.so. "
+               "Check your library paths and if the driver is installed correctly.");
+  CUcontext ctx;
+  CUDA_CALL(cuCtxGetCurrent(&ctx));
+  return ctx != nullptr;

Contributor:

I'd say that if we didn't load CUDA, that's still OK, and we can return that we don't have a context. There is a CPU-only mode that may not want to load CUDA (because it's not there), so cuInitChecked() will return 0.

@stiepan (Member, Author):

done

@klecki klecki self-assigned this Dec 15, 2020
@stiepan (Member, Author) commented Dec 16, 2020

!build

@dali-automaton (Collaborator):

CI MESSAGE: [1914410]: BUILD STARTED

@dali-automaton (Collaborator):

CI MESSAGE: [1914410]: BUILD FAILED

@stiepan (Member, Author) commented Dec 16, 2020

!build

@klecki (Contributor) left a comment:

Leaving some comments and some questions.

I would be glad for some documentation; especially nice would be:

  • the purpose of what is stored in those wrappers like MemChunk, SharedBatchSerialized, etc.
  • what some of the indices mean: context_i, mem_chunk_id, chunk_version, etc.
  • maybe it would be really nice to have some overview of what a worker does - what part of the batch it is expected to process and how the batch is assembled back again. Alternatively, how the communication is handled -> task numbers going as processing requests through some pipes, info about ready data in other pipes, and the stop signal as None.

I will need to look a bit more at the communication; it looks mostly fine.

ShmFdWrapper::ShmFdWrapper() {
static std::minstd_rand rand_suffix((unsigned)time(NULL) * getpid());
std::string name;
int sanity_counter = 0;
Contributor:

I'm not sure how to feel about this.

@stiepan (Member, Author):

Yeah, I had doubts about going there, but the alternatives - failing on the first conflict or retrying in a while-true loop - didn't seem very nice either. It's up to you, I can change it in either direction.

@stiepan (Member, Author):

Using mkstemp instead of the loop.

if (fd < 0) {
throw std::runtime_error("shm_open call failed");
}
shm_unlink(name.c_str());
Contributor:

I think the behaviour of unlinking the file while still passing the fd around may not be obvious to a random reader; I would like to see a comment explaining why you unlink it here.

@stiepan (Member, Author):

Added a comment.


namespace dali {
namespace python {

Contributor:

This file needs some docstrings: what is meant to create handles, what allocates memory, and what just stores it.

@stiepan (Member, Author):

done

@@ -94,6 +98,23 @@ def reset_indices(self):
self.current_iter = 0
self.current_sample = 0

def schedule_batch(self, pipeline, pool, context_i, batch_size):
Contributor:

Hmm, I think most of the stuff in ExternalSource should be a private interface. Not sure how much we can move back, but it's certainly not a part of the public API other than init and call.

Contributor:

The entire _ExternalSourceGroup is private.

@stiepan (Member, Author):

OK, so I am leaving it as it is for now.

del self.batch_pool[batch.mem_chunk_id]
fd, shm_chunk = -1, None
try:
[fd] = multiprocessing.reduction.recvfds(sock, 1)
Contributor:

Where did you find this thing :P

os.close(fd)
raise
chunk = MemChunk(shm_chunk, batch.chunk_version, batch.capacity)
self.batch_pool[batch.mem_chunk_id] = chunk
Contributor:

What are mem_chunk_id and chunk_version?

@stiepan (Member, Author):

When a worker starts, it creates a pool (or rather a cyclic buffer) of shared memory chunks for each external source it serves. mem_chunk_id is a key identifying those chunks in the communication between the worker and the main process. This way the receiving process knows whether it already has a given chunk mmapped, or needs to receive a new file descriptor through the socket, etc.

I got rid of chunk_version when applying the review remarks.
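To make the above concrete, here is a rough, purely illustrative sketch of the mechanism described (not the actual DALI code; all names except multiprocessing.reduction.recvfds are made up): mmapped chunks are cached by mem_chunk_id, and a new file descriptor is fetched over the socket only for an unknown or resized chunk.

import mmap
import os
import multiprocessing.reduction

class ChunkCache:
    def __init__(self):
        self.chunks = {}  # mem_chunk_id -> (mmap object, capacity)

    def get(self, sock, mem_chunk_id, capacity):
        cached = self.chunks.get(mem_chunk_id)
        if cached is not None and cached[1] == capacity:
            return cached[0]
        # Unknown or resized chunk: receive its file descriptor over the Unix
        # socket, map it, and close the fd (the mapping keeps the memory alive).
        [fd] = multiprocessing.reduction.recvfds(sock, 1)
        try:
            buf = mmap.mmap(fd, capacity)
        finally:
            os.close(fd)
        self.chunks[mem_chunk_id] = (buf, capacity)
        return buf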

def add_mem_chunk(self, sock, batch):
chunk = self.batch_pool.get(batch.mem_chunk_id)
if chunk is not None:
if chunk.version == batch.chunk_version:
Contributor:

Do we receive the same batch several times?

@stiepan (Member, Author), Dec 17, 2020:

It doesn't relate to the batch but rather to the underlying shared memory chunk. Anyway, I got rid of this counter; I simply check whether the expected capacity of the chunk has changed - if so, the receiving process knows it needs to adjust.

self.rec_pipes = self.pool.res_pipes + [self.pool.from_tracker]

@classmethod
def from_groups(cls, groups, workers_no, init_method, keep_alive_queue_size, initial_chunk_size=1024 * 1024):
Contributor:

I guess at some point initial_chunk_size will be configurable, as I see it's not yet set in pipeline.py. Maybe let it stay as it is right now.

@stiepan (Member, Author):

Sure, it should be straightforward; I just wasn't sure if we need it, whether it should be a per-external-source parameter or a single one per pipeline, and what to call it.

batch_i, tasks = context.scheduled.popitem(last=False)
awaiting_batch = context.partially_received[batch_i]
while len(awaiting_batch) < len(tasks) and batch_i not in context.iter_ended:
self._receive_chunk()
Contributor:

Random question: can we somehow get starved/blocked by consuming batches after batch_i that come from other workers? I guess in the end the "lazy" worker will send us the last part of the batch, and when receiving the next batch we will already have most of it in that case, right?

@stiepan (Member, Author), Dec 17, 2020:

Yes, that's exactly the idea. Even if we receive some other parts along the way, they are simply stored, ready to be collected by the right external source group in the right order. The worker will eventually send its part or die; either way we should not wait forever.
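A condensed, purely illustrative sketch of that idea (the names mirror the snippet above but this is not the actual implementation): receiving is driven by the oldest scheduled batch, while parts of other batches that arrive in the meantime are simply stored for later.

def collect_batch(context, receive_chunk):
    # Take the oldest scheduled batch and wait until all of its parts arrived.
    batch_i, tasks = context.scheduled.popitem(last=False)
    while len(context.partially_received[batch_i]) < len(tasks):
        # May store parts of *any* batch, not only batch_i, so later batches
        # are already partially filled when their turn comes.
        receive_chunk()
    return context.partially_received.pop(batch_i)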

@dali-automaton (Collaborator):

CI MESSAGE: [1915700]: BUILD STARTED

@dali-automaton (Collaborator):

CI MESSAGE: [1915700]: BUILD PASSED

Comment on lines 1124 to 1126
DALI_ENFORCE(cuInitChecked(),
"Failed to load libcuda.so. "
"Check your library paths and if the driver is installed correctly.");
Contributor:

then:

Suggested change:
-  DALI_ENFORCE(cuInitChecked(),
-               "Failed to load libcuda.so. "
-               "Check your library paths and if the driver is installed correctly.");
+  if (!cuInitChecked())
+    return false;

?

@stiepan (Member, Author):

done

return False
call = getattr(x, "__call__", None)
return _is_generator_function(call)

def _accepted_args_count(callable):
Contributor:

Grammar Nazi attack!

Suggested change:
-  def _accepted_args_count(callable):
+  def _accepted_arg_count(callable):

@stiepan (Member, Author):

done
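For context, a hedged sketch of what a helper with this name could look like (the actual DALI implementation may differ): it counts the positional arguments a callable accepts, which can be used, for example, to decide whether a callback should be passed per-sample information.

import inspect

def _accepted_arg_count(callable):
    # Count parameters that can be passed positionally; for bound methods,
    # inspect.signature already excludes `self`.
    params = inspect.signature(callable).parameters.values()
    return sum(p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD) for p in params)

assert _accepted_arg_count(lambda: None) == 0
assert _accepted_arg_count(lambda sample_info: None) == 1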

.def_property_readonly("fd", &SharedMem::fd)
.def("buf",
[](SharedMem *shm) {
auto ptr = shm->ptr();
Contributor:

How do you know that shm != nullptr? I would add a check.

@stiepan (Member, Author):

Added the check.

int sanity_counter = 0;
do {
std::stringstream ss;
ss << "/nvidia_dali_" << rand_suffix();
Contributor:

@stiepan (Member, Author), Dec 17, 2020:

Hmm, sounds like exactly what I need. On the other hand, though, the BUGS section says never to use it. :D Don't know how to feel about it.

Contributor:

@stiepan (Member, Author):

It seems to work! The only thing that bothers me a little about this approach is that I need to pass mkstemp a full path under /dev/shm, which is where shm_open would create the file. I wonder if that's portable enough for us.

Contributor:

I think it should be fine.
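For illustration, the approach discussed above can be sketched like this in Python (a simplified analogue, not the actual shared-memory utility code from this PR): create a uniquely named file in /dev/shm with mkstemp and unlink it immediately, so that only the file descriptor is passed around and the memory is released once the last descriptor is closed.

import os
import tempfile

# Create a unique backing file in /dev/shm (the same place shm_open would use).
fd, path = tempfile.mkstemp(prefix="nvidia_dali_", dir="/dev/shm")
# Unlink right away: the name disappears, but the fd stays valid, can still be
# sent to other processes over a Unix socket, and no stale file is left behind
# if a process crashes.
os.unlink(path)
# Resize the chunk to the requested capacity before mmapping it.
os.ftruncate(fd, 1 << 20)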

};

class DLL_PUBLIC MapMemWrapper {
uint64_t size_;
Contributor:

I think that usually the private members go at the end.

@stiepan (Member, Author):

done

@klecki force-pushed the external_source_multichunks branch from 6deb921 to d036bd6 on March 2, 2021, 20:34
@dali-automaton (Collaborator):

CI MESSAGE: [2125074]: BUILD STARTED

if processed_task is None:
self.ready_queue.insert(0, None)
else:
# assert len(ready_queue) < prefetch_queue_depths[scheduled.context_i], "Worker queue size exceeded."
Contributor:

?

Contributor:

Hmm, yeah, the assert was there before the refactor, and after the refactor the data it needs got encapsulated away from it.

Contributor:

TBH, it doesn't make much sense, as the queue of the thread that sends back the data can be longer than the prefetch_queue_depth for a given callback (since it can accumulate data from several callbacks).

Contributor:

So maybe just remove it?

Contributor:

Removed.

self.tasks_cv = threading.Condition()
self.tasks_queue = []

def get_task(self):
Contributor:

It seems that get_task and _insert_task match what dispatch and _wait_for_processed do. They could be extracted into a common class. Just an idea.

Contributor:

I think it's easier to just repeat it here at this point. Otherwise, I feel, we would add more code for the wrapper and for how to use it. It's not like we want this to look like CRTP.
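As a side note, the pattern behind get_task/_insert_task (and dispatch/_wait_for_processed) boils down to a condition-guarded queue; a minimal, illustrative sketch, not the actual DALI worker code:

import threading

class TaskQueue:
    def __init__(self):
        self.tasks_cv = threading.Condition()
        self.tasks_queue = []

    def insert_task(self, task):
        # Append a task (or None as the stop signal) and wake up a waiting consumer.
        with self.tasks_cv:
            self.tasks_queue.append(task)
            self.tasks_cv.notify()

    def get_task(self):
        # Block until a task is available, then pop it in FIFO order.
        with self.tasks_cv:
            while not self.tasks_queue:
                self.tasks_cv.wait()
            return self.tasks_queue.pop(0)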

Comment on lines +153 to +160
# When initializing DALI, we do the following in order:
# * Discover the ops specified in Python, group the ExternalSources (_build_graph())
# * Start the Python workers pool (_start_py_workers())
# * Construct the C++ Pipeline backend and pass the graph to it (_init_pipeline_backend())
# * Build the pipeline. (_pipe.Build())
self._py_graph_built = False
self._py_pool_started = False
self._backend_prepared = False
Contributor:

Maybe instead of having a plethora of variables and checking all of them every time, we should have an entity that tracks the state and can be queried?

Contributor:

I tried to group them in one place. Do we want some kind of state machine for that? Maybe a good idea for a follow-up.

Contributor:

Yep, follow up it is.
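One possible shape for that follow-up (purely illustrative, not part of this PR): replace the boolean flags with a single ordered state, so checks become comparisons instead of testing several variables.

from enum import IntEnum

class BuildState(IntEnum):
    CREATED = 0
    PY_GRAPH_BUILT = 1
    PY_POOL_STARTED = 2
    BACKEND_PREPARED = 3
    BUILT = 4

state = BuildState.CREATED
# ... advance the state as the pipeline goes through _build_graph(),
# _start_py_workers(), _init_pipeline_backend() and _pipe.Build() ...
state = BuildState.PY_GRAPH_BUILT
if state >= BuildState.PY_GRAPH_BUILT:
    print("Python-side graph is ready")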

try:
self._to_tracker.send(None)
except BrokenPipeError:
"""workers already exited, tracker_thread finished its task and exited and closed the pipe"""
Contributor:

Is the docstring equivalent to pass?

Contributor:

Probably it is. I will turn it into a comment (and check that this is indeed what it does).
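A quick check of that point (illustrative, not the PR's code): a bare string literal in the except body is just an expression statement that is evaluated and discarded, so it behaves exactly like pass; a comment plus pass only makes the intent clearer.

try:
    raise BrokenPipeError
except BrokenPipeError:
    """workers already exited, tracker thread closed the pipe"""  # no-op, like pass

try:
    raise BrokenPipeError
except BrokenPipeError:
    # workers already exited, tracker thread closed the pipe
    pass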

@dali-automaton (Collaborator):

CI MESSAGE: [2125074]: BUILD FAILED

Signed-off-by: Krzysztof Lecki <[email protected]> (3 commits)
lgtm-com bot commented Mar 3, 2021

This pull request introduces 2 alerts when merging 0cddf7e into 8736bf4 - view on LGTM.com

new alerts:

  • 2 for Wrong number of arguments in a call

Signed-off-by: Krzysztof Lecki <[email protected]> (4 commits)
@dali-automaton (Collaborator):

CI MESSAGE: [2128207]: BUILD STARTED

@dali-automaton (Collaborator):

CI MESSAGE: [2128216]: BUILD STARTED

@dali-automaton (Collaborator):

CI MESSAGE: [2128216]: BUILD FAILED

@klecki force-pushed the external_source_multichunks branch from 02528c5 to faef7d0 on March 3, 2021, 17:01
@klecki (Contributor) commented Mar 3, 2021

!build

@dali-automaton (Collaborator):

CI MESSAGE: [2128464]: BUILD STARTED

@dali-automaton (Collaborator):

CI MESSAGE: [2128799]: BUILD STARTED

@dali-automaton (Collaborator):

CI MESSAGE: [2128464]: BUILD PASSED

@dali-automaton (Collaborator):

CI MESSAGE: [2128799]: BUILD PASSED

@klecki merged commit 06c803f into NVIDIA:master on Mar 3, 2021
@JanuszL mentioned this pull request on May 19, 2021