Fix and enable multithreading #32
Conversation
- Processes results within the scope of the multiprocessing pool
- Sets chunksize to address possible performance concerns
- Requests only available CPUs
- Removes now-redundant comments

- Removes unused parameters; uses consistent functions for imap_unordered
- Renames intermediate variables
I've merged the fixes in #33 and repeated the tests. I realised that my old tests hadn't configured the single-core runs correctly. In the new tests, running with a single CPU or with multithreading disabled takes roughly the same time, while running with extra cores reduces the processing time substantially; how significant the gain is depends on the data you're processing. Results are now consistent except for the very last step: the last file runs with

The differences here seem very significant, so I'd guess it's something to do with the order the floats are operated on, rather than some wrong calculation being done.
All looks okay. No need to refactor anything else for the moment, so we'll keep the same logic for naming the variables and functions; this could be done at a later stage if we have time.
The `chunksize` seems to do a reasonable job.
I'll take a look at the extracted spectra and let you know if we're fine to go ahead.
```diff
@@ -922,7 +915,7 @@ def xcorr_shift_all( packaged_args ):
             this_ref_array[numpy.where(this_init_x_array==item)] = \
                 numpy.max(final_lam_bes[loc-2:loc+3])

-    return [this_row,this_init_x_array,this_ref_array]
+    return [this_row, this_ref_array]
```
The new function and the refactoring look fine to me. I've also run some tests and they return what's expected. I'll take a look at the extracted spectra to double-check that all is okay, since some of the changes affect the wavelength solution and calibration.
Perfect. If there are any issues, I can try disabling different parts of the 'multithread' json to track down where the difference occurs.
I've undone "1. Use `os.sched_getaffinity(0)` instead of `multiprocessing.cpu_count()`", as it caused a crash on macOS. I've created an issue to fix that as a follow-up, though it may be low priority: #36.
There are a few more changes needed before enabling multithreading, but they're a little too much work for one PR, and we've hit the end of the year. For this PR, I'm disabling multithreading again. A little more work is needed for each section before enabling:
Quick update: (3) is now fixed, so the pipeline produces identical results whether running single-threaded or multi-threaded. The issue was the headers loaded by cube_gen. I'm still inclined to leave multithreading disabled by default until we address the other points. This PR should be good to go.
The 'multithread' config caused the program to freeze on OzStar. I've made one change to fix the freeze, plus three other changes to optimise a little.
The Freeze
The freeze was caused by creating a `multiprocessing.Pool`, making a list of jobs using `imap_unordered`, but only consuming the list after the pool was closed. The jobs returned by `imap_unordered` are lazily evaluated: the processes don't start running until the list is read, and each result is then yielded in the order it completes. Reading the list only after the pool closes therefore makes it wait forever. See https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.imap (with `imap_unordered` just below it) for more info.
I've moved the code that consumes the list to its own function (which needs some more renaming), and that solves the freeze.
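A minimal sketch of the working pattern (the worker function and data here are placeholders, not the pipeline's real code):

```python
import multiprocessing

def square(x):
    # Placeholder for the real per-item worker; the pipeline's actual
    # worker functions are different.
    return x * x

def run_jobs(items):
    with multiprocessing.Pool() as pool:
        # imap_unordered is lazy: workers only run as the iterator is
        # consumed, and results arrive in completion order, not
        # submission order. Consuming INSIDE the pool's scope avoids
        # the freeze.
        return list(pool.imap_unordered(square, items))

# The buggy pattern that froze:
#   pool = multiprocessing.Pool()
#   results = pool.imap_unordered(square, items)
#   pool.close(); pool.join()
#   list(results)   # iterating only after the pool is closed waits forever

if __name__ == "__main__":
    print(sorted(run_jobs(range(5))))  # [0, 1, 4, 9, 16]
```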
Other changes
The other changes are:

1. Use `os.sched_getaffinity(0)` instead of `multiprocessing.cpu_count()` to get the number of CPUs available to the job, rather than the number of CPUs on the machine.
2. Set `chunksize`, which sets the number of jobs to assign to & return from each CPU at a time.
3. Set `multithread` to true by default in the json configs. In a follow-up, we can change it programmatically instead.
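A sketch of the CPU-count detection, including the macOS fallback (the function name `available_cpus` is mine, not the pipeline's):

```python
import multiprocessing
import os

def available_cpus():
    # os.sched_getaffinity(0) counts the CPUs this process is allowed to
    # run on, which respects scheduler restrictions (e.g. a cluster job
    # allocation on OzStar). It is Linux-only: macOS has no
    # sched_getaffinity, which is the crash noted below, so fall back to
    # the machine-wide count there.
    try:
        return len(os.sched_getaffinity(0))
    except AttributeError:
        return multiprocessing.cpu_count()

print(available_cpus())
```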
`chunksize` is a bit of a complicated issue. I'm inclined to do what I've done here: set an optimistic chunksize, back it up with some small tests, and revisit later if needed. There were comments suggesting a similar approach. Here are the considerations:
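As a rough illustration of one option, an evenly dividing chunksize hands each CPU its whole share of jobs at once (the helper and numbers here are illustrative, not the pipeline's actual values):

```python
import math
import multiprocessing

def even_chunksize(n_jobs, n_workers):
    # One batch per worker: each CPU receives its whole share of jobs at
    # once, minimising inter-process hand-offs. max(1, ...) keeps the
    # value valid for very small runs.
    return max(1, math.ceil(n_jobs / n_workers))

def double(x):
    # Stand-in worker for the example.
    return 2 * x

if __name__ == "__main__":
    jobs = list(range(100))
    n_workers = 4
    with multiprocessing.Pool(n_workers) as pool:
        results = list(pool.imap_unordered(
            double, jobs, chunksize=even_chunksize(len(jobs), n_workers)))
    print(len(results), even_chunksize(len(jobs), n_workers))  # 100 25
```

The trade-off: large chunks reduce hand-off overhead but can leave CPUs idle if jobs take uneven time; small chunks balance load but add overhead.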
Tests
The pipeline already has an overall debug timer in the output file; search for `All done in`.
With multithreading enabled, the run completes thousands of seconds faster than running single-threaded, and hundreds of seconds faster again when running with additional cores.
With `chunksize` evenly dividing the jobs I saw a ~40 second improvement, though I'm unsure whether that's beyond the margin of error.
There are small differences in the logs before & after, but I think they're insignificant (e.g. differences from adding numbers in different orders). It might be worth running a test and plotting the results to be sure.
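For illustration, summing the same numbers in two different orders already produces this kind of last-bit difference (a standalone sketch, not pipeline code):

```python
import random

# Floating-point addition is not associative, so summing identical numbers
# in a different order (as imap_unordered naturally can) may change the
# last bits of the total without any calculation being wrong.
random.seed(1)
values = [random.uniform(0.0, 1.0) for _ in range(100_000)]

forward = sum(values)
reordered = sum(sorted(values))  # same numbers, different order

# The difference is tiny relative to the total (~5e4), often nonzero.
print(abs(forward - reordered))
```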