Remove dispatcher deadlock for distributed LDA #1817
Merged
Conversation
menshikh-iv
Thanks for the PR @darindf, I want to reproduce the deadlock. Can you write a plan for "how to reproduce the deadlock" here? (I understand your message, but want a more formal description.) And about the Pyro4 exception: as I understand, worker.ping should raise it (if a worker is unreachable), but what happens next in the master process?
darindf
The issue is the way the worker process gets jobs and publishes results. The relevant section of code is:
    job = None
    while job is None and not self.finished:
        try:
            job = self.dispatcher.getjob(self.myid)
        except Queue.Empty:
            # no new job: try again, unless we're finished with all work
            continue
    if job is not None:
        logger.info("worker #%s received job #%i", self.myid, self.jobsdone)
        self.processjob(job)
        self.dispatcher.jobdone(self.myid)
    else:
        logger.info("worker #%i stopping asking for jobs", self.myid)
So an easy way to reproduce this is to start the nameserver, the dispatcher, one worker, and the main LdaModel in distributed mode, and modify the above code snippet to include quit() after getjob():
    while job is None and not self.finished:
        try:
            job = self.dispatcher.getjob(self.myid)
        except Queue.Empty:
            # no new job: try again, unless we're finished with all work
            continue
    if job is not None:
        logger.info("worker #%s received job #%i", self.myid, self.jobsdone)
        quit()
        # simulate a failure: an issue with processjob,
        # network partitioning, or machine failure
        # self.dispatcher.jobdone(self.myid)
    else:
        logger.info("worker #%i stopping asking for jobs", self.myid)

The quit() call simulates the processjob failing to complete, a machine failure, a network partition, or any other failure that causes the worker to die or to fail to communicate back to the dispatcher.
The main program sequence that calls dispatcher.putjob will eventually call dispatcher.getstate(). The first operation puts a job into the dispatcher queue; the latter waits until all jobs are done, tracked with two counters (the number of jobs submitted and the number of jobs completed) in the dispatcher's getstate loop:
    while self._jobsdone < self._jobsreceived:
        time.sleep(0.5)  # check every half a second
Obviously, if the worker terminates as above, the _jobsdone counter never gets updated, so this loop never terminates and the process calling dispatcher.getstate() never returns.
The fix I introduced is to periodically check whether all the workers are still alive, and therefore whether we should keep waiting for them to call jobdone, i.e. an aliveness heartbeat.
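A minimal sketch of what such a heartbeat inside the getstate wait loop could look like, assuming the worker exposes the ping call discussed in this thread; the wait_for_jobs helper and the HEARTBEAT_EVERY interval are illustrative assumptions, not the merged implementation:

    import time

    HEARTBEAT_EVERY = 10  # number of 0.5 s sleeps between liveness checks (assumed value)

    def wait_for_jobs(dispatcher):
        """Block until all submitted jobs are done, pinging workers periodically (sketch)."""
        checks = 0
        while dispatcher._jobsdone < dispatcher._jobsreceived:
            time.sleep(0.5)  # check every half a second, as in the original loop
            checks += 1
            if checks >= HEARTBEAT_EVERY:
                checks = 0
                for workerid, worker in dispatcher.workers.items():
                    # if the worker died or the network is partitioned, ping() raises
                    # a Pyro4 communication error, so the LdaModel caller fails fast
                    # instead of deadlocking here
                    worker.ping()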
As the number of workers and machines increases, the likelihood of a failure increases, decreasing reliability, so this check should improve scalability. We are looking at using 4 to 6 machines running 4 to 7 workers.
There may still be a race condition if the worker's network becomes partitioned from the dispatcher, but that has two cases. If the worker calls jobdone while the network is partitioned, the call fails (a socket communication error occurs), the result is not posted back to the dispatcher, and the same deadlock would occur in the dispatcher's getstate.
The second case is if the network comes back and the worker is then able to call jobdone (because processjob took a long time to complete), but in the meantime the dispatcher has given up, having detected the network failure via the new keep-alive. In that case the LdaModel can be re-run and the error is known (the worker network went down), which is better than waiting endlessly for it to complete.
For this last case I'm working on job resiliency: if the dispatcher detects that a worker is not alive, the job is reposted to the job queue, and the worker is marked as dead and removed from the worker pool.
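A rough sketch of that re-posting idea; the in_progress bookkeeping dict is hypothetical (gensim's dispatcher does not track which job each worker holds), and dispatcher.jobs / dispatcher.workers stand in for the dispatcher's own job queue and worker registry:

    import Pyro4.errors

    def reap_dead_workers(dispatcher):
        """Re-queue jobs held by unreachable workers and drop those workers (sketch)."""
        for workerid in list(dispatcher.workers):
            worker = dispatcher.workers[workerid]
            try:
                worker.ping()
            except Pyro4.errors.CommunicationError:
                # hypothetical bookkeeping: in_progress maps worker id -> the job
                # that worker was handed
                job = dispatcher.in_progress.pop(workerid, None)
                if job is not None:
                    dispatcher.jobs.put(job)      # repost the job for another worker
                del dispatcher.workers[workerid]  # mark the worker dead, drop it from the pool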
An additional option I'm considering is to re-identify workers during dispatcher.getstate, performing an operation similar to dispatcher.initialize. The reasoning is that while an LDA model is being processed (i.e. the dispatcher is in the getstate method), new workers can still be added to the network to participate in the model evaluation, whereas the current logic only adds workers at the beginning of an LdaModel evaluation (the dispatcher.initialize call). Thus if a machine is rebooted in the middle of model evaluation, it would gracefully exit participation with the dispatcher and then rejoin the evaluation once it comes back online. (Note that other errors, e.g. a network cable being unplugged, would use the recovery strategy above.)
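A sketch of what that rediscovery could look like, reusing the Pyro4 nameserver lookup that dispatcher.initialize already performs; the function name, the nameserver prefix, and the way workers are keyed here are assumptions for illustration:

    import Pyro4
    import Pyro4.errors

    def rediscover_workers(dispatcher, prefix="gensim.lda_worker"):
        """Register workers that appeared on the nameserver after initialize() (sketch)."""
        with Pyro4.locateNS() as ns:
            for name, uri in ns.list(prefix=prefix).items():
                if name in dispatcher.workers:
                    continue  # already part of the pool
                worker = Pyro4.Proxy(uri)
                try:
                    worker.ping()  # only add workers that are actually reachable
                except Pyro4.errors.CommunicationError:
                    continue
                # assumption: the real dispatcher would also assign a worker id and
                # call the worker's initialize() before handing out jobs
                dispatcher.workers[name] = worker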
menshikh-iv changed the title from "Remove dispatcher deadlock" to "Remove dispatcher deadlock for distributed LDA" on Jan 18, 2018
Big thanks @darindf!
sj29-innovate pushed a commit to sj29-innovate/gensim that referenced this pull request on Feb 21, 2018:
* Updated to 3.1.i1 version for fix 1771
* Remove spurious print statement
* Fixed variable not used error
Deadlock is possible when a worker receives a job from the dispatcher but then fails to complete processjob and call back to the dispatcher (e.g. the machine died or the worker process died). In this case the dispatcher is stuck in an infinite loop, checking whether the number of jobs done has reached the number of jobs received.
Added a callback mechanism within this loop for the dispatcher to check the aliveness of each worker. If a worker is not reachable, an exception (from Pyro4) is thrown back so that the LdaModel call can fail, removing the deadlock condition.
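For illustration, a hedged sketch of what the caller now sees when a worker is unreachable; the corpus and dictionary paths are placeholders, and a running nameserver, dispatcher, and workers are assumed:

    import Pyro4.errors

    from gensim import corpora, models

    # placeholder inputs; any corpus/dictionary works, provided the distributed
    # cluster (nameserver, dispatcher, workers) has been started
    dictionary = corpora.Dictionary.load("wiki.dict")
    corpus = corpora.MmCorpus("wiki.mm")

    try:
        lda = models.LdaModel(corpus, id2word=dictionary, num_topics=100, distributed=True)
    except Pyro4.errors.PyroError as err:
        # with the aliveness check, an unreachable worker surfaces here instead of
        # the dispatcher hanging forever in getstate()
        print("distributed LDA failed, rerun once the workers are back: %s" % err)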