Perform suggest operations in parallel using multiprocessing in nn_ensemble #568

Merged 3 commits on Feb 11, 2022
Changes from all commits
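This change fans the per-document suggest calls out to worker processes instead of running them sequentially in the training loop. Below is a minimal, illustrative sketch of that pattern using the standard multiprocessing module; the function and variable names are placeholders for illustration, not the Annif API (the actual change uses annif.parallel.get_pool and ProjectSuggestMap, as shown in the diff).

import multiprocessing

def suggest_for_document(doc_text):
    # Placeholder for the per-document work done in each worker process;
    # in the PR this is psmap.suggest, which queries every source project.
    return len(doc_text)

if __name__ == '__main__':
    documents = ["first document", "second document", "third document"]
    n_jobs = 2  # analogous to the new n_jobs parameter threaded through _fit_model
    with multiprocessing.Pool(n_jobs) as pool:
        # Results arrive as workers finish, in arbitrary order,
        # matching the pool.imap_unordered call used in _corpus_to_vectors.
        for result in pool.imap_unordered(suggest_for_document, documents):
            print(result)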
60 changes: 41 additions & 19 deletions annif/backend/nn_ensemble.py
@@ -14,6 +14,7 @@
from tensorflow.keras.utils import Sequence
import tensorflow.keras.backend as K
import annif.corpus
import annif.parallel
import annif.util
from annif.exception import NotInitializedException, NotSupportedException
from annif.suggestion import VectorSuggestionResult
@@ -174,44 +175,65 @@ def _train(self, corpus, params, jobs=0):
self._fit_model(
corpus,
epochs=int(params['epochs']),
lmdb_map_size=int(params['lmdb_map_size']))
lmdb_map_size=int(params['lmdb_map_size']),
n_jobs=jobs)

def _corpus_to_vectors(self, corpus, seq):
def _corpus_to_vectors(self, corpus, seq, n_jobs):
# pass corpus through all source projects
sources = [(self.project.registry.get_project(project_id), weight)
for project_id, weight
in annif.util.parse_sources(self.params['sources'])]

for doc in corpus.documents:
doc_scores = []
for source_project, weight in sources:
hits = source_project.suggest(doc.text)
vector = hits.as_vector(source_project.subjects)
doc_scores.append(np.sqrt(vector) * weight * len(sources))
score_vector = np.array(doc_scores,
dtype=np.float32).transpose()
subjects = annif.corpus.SubjectSet((doc.uris, doc.labels))
true_vector = subjects.as_vector(self.project.subjects)
seq.add_sample(score_vector, true_vector)
sources = dict(
annif.util.parse_sources(self.params['sources']))

# initialize the source projects before forking, to save memory
self.info(
f"Initializing source projects: {', '.join(sources.keys())}")
for project_id in sources.keys():
project = self.project.registry.get_project(project_id)
project.initialize(parallel=True)

psmap = annif.parallel.ProjectSuggestMap(
self.project.registry,
list(sources.keys()),
backend_params=None,
limit=None,
threshold=0.0)

jobs, pool_class = annif.parallel.get_pool(n_jobs)

self.info("Processing training documents...")
with pool_class(jobs) as pool:
for hits, uris, labels in pool.imap_unordered(
psmap.suggest, corpus.documents):
doc_scores = []
for project_id, p_hits in hits.items():
vector = p_hits.as_vector(self.project.subjects)
doc_scores.append(np.sqrt(vector)
* sources[project_id]
* len(sources))
score_vector = np.array(doc_scores,
dtype=np.float32).transpose()
subjects = annif.corpus.SubjectSet((uris, labels))
true_vector = subjects.as_vector(self.project.subjects)
seq.add_sample(score_vector, true_vector)

def _open_lmdb(self, cached, lmdb_map_size):
lmdb_path = os.path.join(self.datadir, self.LMDB_FILE)
if not cached and os.path.exists(lmdb_path):
shutil.rmtree(lmdb_path)
return lmdb.open(lmdb_path, map_size=lmdb_map_size, writemap=True)

def _fit_model(self, corpus, epochs, lmdb_map_size):
def _fit_model(self, corpus, epochs, lmdb_map_size, n_jobs=1):
env = self._open_lmdb(corpus == 'cached', lmdb_map_size)
if corpus != 'cached':
if corpus.is_empty():
raise NotSupportedException(
'Cannot train nn_ensemble project with no documents')
with env.begin(write=True, buffers=True) as txn:
seq = LMDBSequence(txn, batch_size=32)
self._corpus_to_vectors(corpus, seq)
self._corpus_to_vectors(corpus, seq, n_jobs)
else:
self.info("Reusing cached training data from previous run.")
# fit the model using a read-only view of the LMDB
self.info("Training neural network model...")
with env.begin(buffers=True) as txn:
seq = LMDBSequence(txn, batch_size=32)
self._model.fit(seq, verbose=True, epochs=epochs)