Skip to content

Commit

Permalink
Move process_result_queue out of the per-pass loop in LdaMulticore (#2358)
Browse files Browse the repository at this point in the history
* Move out `process_result_queue`

+ simplify inner check
+ add heuristic for case with large number of workers

* delete heuristic

* swap branches in if self.batch check
  • Loading branch information
horpto authored and menshikh-iv committed Jan 29, 2019
1 parent 9242a6b commit 59cf775
Showing 1 changed file with 27 additions and 28 deletions.
55 changes: 27 additions & 28 deletions gensim/models/ldamulticore.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,13 +225,14 @@ def update(self, corpus, chunks_as_numpy=False):

self.state.numdocs += lencorpus

if not self.batch:
updatetype = "online"
updateafter = self.chunksize * self.workers
else:
if self.batch:
updatetype = "batch"
updateafter = lencorpus
evalafter = min(lencorpus, (self.eval_every or 0) * updateafter)
else:
updatetype = "online"
updateafter = self.chunksize * self.workers
eval_every = self.eval_every or 0
evalafter = min(lencorpus, eval_every * updateafter)

updates_per_pass = max(1, lencorpus / updateafter)
logger.info(
Expand All @@ -257,47 +258,45 @@ def update(self, corpus, chunks_as_numpy=False):
def rho():
    """Return ``(offset + pass_ + num_updates / chunksize) ** -decay``.

    ``self`` and ``pass_`` are not parameters: they are read from the
    enclosing scope each time the function is called, so the result tracks
    the current pass and the current ``self.num_updates``.
    """
    base = self.offset + pass_ + (self.num_updates / self.chunksize)
    return base ** (-self.decay)

def process_result_queue(force=False):
"""
Clear the result queue, merging all intermediate results, and update the
LDA model if necessary.

Every item currently available on `result_queue` is merged into `other`,
and the outstanding-job counter `queue_size[0]` is decremented once per
merged item. `self.do_mstep` is then invoked when either `other` has
accumulated at least `updateafter` documents, or `force` is set, at least
one result was merged by this call, and no jobs remain outstanding.

Parameters
----------
force : bool, optional
If True, allow the model update even when fewer than `updateafter`
documents were accumulated, provided this call merged something and
`queue_size[0]` has dropped to zero.

Notes
-----
`result_queue`, `other`, `queue_size`, `updateafter`, `eval_every`,
`chunk`, `lencorpus`, `pass_` and `self` are not parameters; they are
looked up in the enclosing scope when the function is called.
"""
# Tracks whether this particular call pulled anything off the queue.
merged_new = False
# Drain whatever is available right now without blocking on an empty queue.
while not result_queue.empty():
other.merge(result_queue.get())
queue_size[0] -= 1
merged_new = True

# Update the model on a forced flush with nothing left in flight, or as soon
# as enough documents have been accumulated in `other`.
if (force and merged_new and queue_size[0] == 0) or (other.numdocs >= updateafter):
self.do_mstep(rho(), other, pass_ > 0)
# Start accumulating the next batch of results from a clean state.
other.reset()
# Log perplexity only when evaluation is enabled (eval_every > 0): either on a
# forced flush or when the update count lands on a multiple of eval_every.
# NOTE(review): indentation is not visible here, so it cannot be told whether
# this check is nested under the model-update branch above -- confirm.
if eval_every > 0 and (force or (self.num_updates / updateafter) % eval_every == 0):
self.log_perplexity(chunk, total_docs=lencorpus)

logger.info("training LDA model using %i processes", self.workers)
pool = Pool(self.workers, worker_e_step, (job_queue, result_queue,))
for pass_ in range(self.passes):
queue_size, reallen = [0], 0
other = LdaState(self.eta, self.state.sstats.shape)

def process_result_queue(force=False):
"""
Clear the result queue, merging all intermediate results, and update the
LDA model if necessary.

Every item currently available on `result_queue` is merged into `other`,
and the outstanding-job counter `queue_size[0]` is decremented once per
merged item. `self.do_mstep` is then invoked when either `self.batch` is
falsy and `other` has accumulated at least `updateafter` documents, or
`force` is set, at least one result was merged by this call, and no jobs
remain outstanding.

Parameters
----------
force : bool, optional
If True, allow the model update even when the document threshold was
not reached, provided this call merged something and `queue_size[0]`
has dropped to zero.

Notes
-----
`result_queue`, `other`, `queue_size`, `updateafter`, `chunk`,
`lencorpus`, `pass_` and `self` are not parameters; they are looked up in
the enclosing scope when the function is called.
"""
# Tracks whether this particular call pulled anything off the queue.
merged_new = False
# Drain whatever is available right now without blocking on an empty queue.
while not result_queue.empty():
other.merge(result_queue.get())
queue_size[0] -= 1
merged_new = True
# Update the model on a forced flush with nothing left in flight, or (when
# self.batch is falsy) as soon as enough documents have been accumulated.
if (force and merged_new and queue_size[0] == 0) or (not self.batch and (other.numdocs >= updateafter)):
self.do_mstep(rho(), other, pass_ > 0)
# Start accumulating the next batch of results from a clean state.
other.reset()
# Perplexity is logged only when eval_every is not None, and then either on a
# forced flush with no outstanding jobs, or when eval_every is non-zero and the
# update count lands on a multiple of it.
# NOTE(review): indentation is not visible here, so it cannot be told whether
# this check is nested under the model-update branch above -- confirm.
if self.eval_every is not None \
and ((force and queue_size[0] == 0)
or (self.eval_every != 0 and (self.num_updates / updateafter) % self.eval_every == 0)):
self.log_perplexity(chunk, total_docs=lencorpus)

chunk_stream = utils.grouper(corpus, self.chunksize, as_numpy=chunks_as_numpy)
for chunk_no, chunk in enumerate(chunk_stream):
reallen += len(chunk) # keep track of how many documents we've processed so far

# put the chunk into the workers' input job queue
chunk_put = False
while not chunk_put:
while True:
try:
job_queue.put((chunk_no, chunk, self), block=False, timeout=0.1)
chunk_put = True
job_queue.put((chunk_no, chunk, self), block=False)
queue_size[0] += 1
logger.info(
"PROGRESS: pass %i, dispatched chunk #%i = documents up to #%i/%i, "
"outstanding queue size %i",
pass_, chunk_no, chunk_no * self.chunksize + len(chunk), lencorpus, queue_size[0]
)
break
except queue.Full:
# in case the input job queue is full, keep clearing the
# result queue, to make sure we don't deadlock
Expand Down

0 comments on commit 59cf775

Please sign in to comment.