diff --git a/annif/backend/nn_ensemble.py b/annif/backend/nn_ensemble.py
index 476cd87b3..8766580fb 100644
--- a/annif/backend/nn_ensemble.py
+++ b/annif/backend/nn_ensemble.py
@@ -14,6 +14,7 @@
 from tensorflow.keras.utils import Sequence
 import tensorflow.keras.backend as K
 import annif.corpus
+import annif.parallel
 import annif.util
 from annif.exception import NotInitializedException, NotSupportedException
 from annif.suggestion import VectorSuggestionResult
@@ -174,25 +175,45 @@ def _train(self, corpus, params, jobs=0):
         self._fit_model(
             corpus,
             epochs=int(params['epochs']),
-            lmdb_map_size=int(params['lmdb_map_size']))
+            lmdb_map_size=int(params['lmdb_map_size']),
+            n_jobs=jobs)
 
-    def _corpus_to_vectors(self, corpus, seq):
+    def _corpus_to_vectors(self, corpus, seq, n_jobs):
         # pass corpus through all source projects
-        sources = [(self.project.registry.get_project(project_id), weight)
-                   for project_id, weight
-                   in annif.util.parse_sources(self.params['sources'])]
-
-        for doc in corpus.documents:
-            doc_scores = []
-            for source_project, weight in sources:
-                hits = source_project.suggest(doc.text)
-                vector = hits.as_vector(source_project.subjects)
-                doc_scores.append(np.sqrt(vector) * weight * len(sources))
-            score_vector = np.array(doc_scores,
-                                    dtype=np.float32).transpose()
-            subjects = annif.corpus.SubjectSet((doc.uris, doc.labels))
-            true_vector = subjects.as_vector(self.project.subjects)
-            seq.add_sample(score_vector, true_vector)
+        sources = dict(
+            annif.util.parse_sources(self.params['sources']))
+
+        # initialize the source projects before forking, to save memory
+        self.info(
+            f"Initializing source projects: {', '.join(sources.keys())}")
+        for project_id in sources.keys():
+            project = self.project.registry.get_project(project_id)
+            project.initialize(parallel=True)
+
+        psmap = annif.parallel.ProjectSuggestMap(
+            self.project.registry,
+            list(sources.keys()),
+            backend_params=None,
+            limit=None,
+            threshold=0.0)
+
+        jobs, pool_class = annif.parallel.get_pool(n_jobs)
+
+        self.info("Processing training documents...")
+        with pool_class(jobs) as pool:
+            for hits, uris, labels in pool.imap_unordered(
+                    psmap.suggest, corpus.documents):
+                doc_scores = []
+                for project_id, p_hits in hits.items():
+                    vector = p_hits.as_vector(self.project.subjects)
+                    doc_scores.append(np.sqrt(vector)
+                                      * sources[project_id]
+                                      * len(sources))
+                score_vector = np.array(doc_scores,
+                                        dtype=np.float32).transpose()
+                subjects = annif.corpus.SubjectSet((uris, labels))
+                true_vector = subjects.as_vector(self.project.subjects)
+                seq.add_sample(score_vector, true_vector)
 
     def _open_lmdb(self, cached, lmdb_map_size):
         lmdb_path = os.path.join(self.datadir, self.LMDB_FILE)
@@ -200,7 +221,7 @@
             shutil.rmtree(lmdb_path)
         return lmdb.open(lmdb_path, map_size=lmdb_map_size, writemap=True)
 
-    def _fit_model(self, corpus, epochs, lmdb_map_size):
+    def _fit_model(self, corpus, epochs, lmdb_map_size, n_jobs=1):
         env = self._open_lmdb(corpus == 'cached', lmdb_map_size)
         if corpus != 'cached':
             if corpus.is_empty():
@@ -208,10 +229,11 @@
                 'Cannot train nn_ensemble project with no documents')
             with env.begin(write=True, buffers=True) as txn:
                 seq = LMDBSequence(txn, batch_size=32)
-                self._corpus_to_vectors(corpus, seq)
+                self._corpus_to_vectors(corpus, seq, n_jobs)
         else:
             self.info("Reusing cached training data from previous run.")
         # fit the model using a read-only view of the LMDB
+        self.info("Training neural network model...")
         with env.begin(buffers=True) as txn:
             seq = LMDBSequence(txn, batch_size=32)
             self._model.fit(seq, verbose=True, epochs=epochs)