diff --git a/CHANGELOG.md b/CHANGELOG.md
index 91916ece22..4fe9c060e1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,7 @@ This release contains a major refactoring.
 
 ### :+1: Improvements
 
+* Refactor ldamulticore to serialize less data (PR [#2300](https://github.com/RaRe-Technologies/gensim/pull/2300), __[@horpto](https://github.com/horpto)__)
 * KeyedVectors & X2Vec API streamlining, consistency (PR [#2698](https://github.com/RaRe-Technologies/gensim/pull/2698), __[@gojomo](https://github.com/gojomo)__)
 * No more wheels for x32 platforms (if you need x32 binaries, please build them yourself). (__[menshikh-iv](https://github.com/menshikh-iv)__, [#6](https://github.com/RaRe-Technologies/gensim-wheels/pull/6))
diff --git a/gensim/models/ldamulticore.py b/gensim/models/ldamulticore.py
index d758b9e48e..a4a0c297de 100644
--- a/gensim/models/ldamulticore.py
+++ b/gensim/models/ldamulticore.py
@@ -277,7 +277,7 @@ def process_result_queue(force=False):
                     self.log_perplexity(chunk, total_docs=lencorpus)
 
         logger.info("training LDA model using %i processes", self.workers)
-        pool = Pool(self.workers, worker_e_step, (job_queue, result_queue,))
+        pool = Pool(self.workers, worker_e_step, (job_queue, result_queue, self))
         for pass_ in range(self.passes):
             queue_size, reallen = [0], 0
             other = LdaState(self.eta, self.state.sstats.shape)
@@ -289,7 +289,7 @@ def process_result_queue(force=False):
                 # put the chunk into the workers' input job queue
                 while True:
                     try:
-                        job_queue.put((chunk_no, chunk, self), block=False)
+                        job_queue.put((chunk_no, chunk, self.state), block=False)
                         queue_size[0] += 1
                         logger.info(
                             "PROGRESS: pass %i, dispatched chunk #%i = documents up to #%i/%i, "
@@ -316,7 +316,7 @@ def process_result_queue(force=False):
         pool.terminate()
 
 
-def worker_e_step(input_queue, result_queue):
+def worker_e_step(input_queue, result_queue, worker_lda):
     """Perform E-step for each job.
 
     Parameters
@@ -326,17 +326,20 @@ def worker_e_step(input_queue, result_queue):
        responsible for processing it.
    result_queue : queue of :class:`~gensim.models.ldamodel.LdaState`
        After the worker finished the job, the state of the resulting (trained) worker model is appended to this queue.
-
+    worker_lda : :class:`~gensim.models.ldamulticore.LdaMulticore`
+        LDA instance which performs the E-step.
     """
     logger.debug("worker process entering E-step loop")
     while True:
         logger.debug("getting a new job")
-        chunk_no, chunk, worker_lda = input_queue.get()
+        chunk_no, chunk, w_state = input_queue.get()
         logger.debug("processing chunk #%i of %i documents", chunk_no, len(chunk))
+        worker_lda.state = w_state
+        worker_lda.sync_state()
         worker_lda.state.reset()
         worker_lda.do_estep(chunk)  # TODO: auto-tune alpha?
         del chunk
         logger.debug("processed chunk, queuing the result")
         result_queue.put(worker_lda.state)
-        del worker_lda  # free up some memory
+        worker_lda.state = None
         logger.debug("result put")
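
Note on the change above: the whole `LdaMulticore` model is now serialized to each worker only once, as part of the `Pool` initializer arguments, while every per-chunk `job_queue.put(...)` serializes only the much smaller `LdaState`. The snippet below is a minimal, hypothetical sketch of that serialize-the-heavy-object-once pattern using plain `multiprocessing` and `pool.map`; it does not reproduce gensim's queue-draining `worker_e_step` loop, and all names in it are made up for illustration.

```python
import multiprocessing as mp

_worker_model = None  # populated once per worker process by the initializer


def init_worker(model):
    # Runs once in each worker: the large, read-mostly object is pickled
    # and shipped over a single time, not with every job.
    global _worker_model
    _worker_model = model


def process_chunk(job):
    # Only the small per-chunk payload (chunk number, chunk, light state)
    # travels through the pool's internal queue for each task.
    chunk_no, chunk, state = job
    return chunk_no, len(_worker_model) + sum(chunk) + state


if __name__ == "__main__":
    big_model = list(range(1_000_000))             # stand-in for a large LdaMulticore object
    jobs = [(i, [i, i + 1], 0) for i in range(4)]  # (chunk_no, chunk, small_state)
    with mp.Pool(processes=2, initializer=init_worker, initargs=(big_model,)) as pool:
        print(pool.map(process_chunk, jobs))
```

In the actual patch the same trade-off is achieved by adding `self` to the `Pool(...)` initargs and putting `self.state` (instead of `self`) on `job_queue`.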