[GSoC 2018] Multistream API for vocabulary building in *2vec (#2078)
* multistream scan vocab for doc2vec, word2vec & fastText

* fixes

* fix tags for doc2vec

* fix tests

* removed benchmark vocab

* addressing comments

* make interfaces and documentation more pretty

* add word2vec multistream tests

* fix pep8

* iteritems -> items

* more precise test

* add doc2vec tests

* add fasttext tests

* remove prints

* fix seed=42

* fixed tests

* add build_vocab test for fasttext

* fix

* change size from 10 to 5 in fasttext test because of appveyor memory limits

* another test with memory error

* fix py3 tests

* fix iteritems for py3

* fix functools reduce

* addressing comments

* addressing @jayantj comments

* fix language

* add final vocab pruning in multistream modes

* keys -> iterkeys

* use heapq.nlargest

* fix

* multistream flag to input_streams param

* fix tests

* fix flake8

* fix doc2vec docstrings

* fix merging streams

* fix doc2vec

* max_vocab_size -> max_vocab_size / workers

* fixed

* / -> // (py3 division)

* fix

* fix docstring
persiyanov authored and menshikh-iv committed Jul 12, 2018
1 parent 19e725d commit 408a714
Showing 9 changed files with 541 additions and 121 deletions.
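The change, in short: every *2vec model gains an optional `input_streams` parameter that accepts a list or tuple of `sentences`-like iterables and is mutually exclusive with `sentences`. A minimal usage sketch, assuming the `Word2Vec` frontend wires the parameter through exactly as the base class below does (the corpus paths are hypothetical placeholders):

    from gensim.models import Word2Vec
    from gensim.models.word2vec import LineSentence

    # Two independent corpus streams; each yields one list of tokens per sentence.
    stream_a = LineSentence('corpus_part_a.txt')  # hypothetical file
    stream_b = LineSentence('corpus_part_b.txt')  # hypothetical file

    # Single-stream mode, unchanged:
    model = Word2Vec(sentences=stream_a, workers=4)

    # Multistream mode: the vocabulary scan can spread the streams over
    # several processes. Passing both `sentences` and `input_streams`
    # raises ValueError.
    model = Word2Vec(input_streams=[stream_a, stream_b], workers=4)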
65 changes: 47 additions & 18 deletions gensim/models/base_any2vec.py
@@ -43,6 +43,7 @@
from types import GeneratorType
from gensim.utils import deprecated
import warnings
+import itertools

try:
from queue import Queue
@@ -130,6 +131,11 @@ def _check_training_sanity(self, epochs=None, total_examples=None, total_words=N
"""Check that the training parameters provided make sense. e.g. raise error if `epochs` not provided."""
raise NotImplementedError()

+def _check_input_data_sanity(self, data_iterable=None, data_iterables=None):
+    """Check that only one argument is not None."""
+    if not ((data_iterable is not None) ^ (data_iterables is not None)):
+        raise ValueError("You must provide only one of singlestream or multistream arguments.")
+
def _worker_loop(self, job_queue, progress_queue):
"""Train the model, lifting batches of data from the queue.
@@ -322,14 +328,16 @@ def _log_epoch_progress(self, progress_queue, job_queue, cur_epoch=0, total_exam
self.total_train_time += elapsed
return trained_word_count, raw_word_count, job_tally

-def _train_epoch(self, data_iterable, cur_epoch=0, total_examples=None,
+def _train_epoch(self, data_iterable=None, data_iterables=None, cur_epoch=0, total_examples=None,
total_words=None, queue_factor=2, report_delay=1.0):
"""Train the model for a single epoch.
Parameters
----------
data_iterable : iterable of list of object
The input corpus. This will be split in chunks and these chunks will be pushed to the queue.
+data_iterables : iterable of iterables of list of object
+    The iterable of input streams like `data_iterable`. Use this parameter in multistream mode.
cur_epoch : int, optional
The current training epoch, needed to compute the training parameters for each job.
For example in many implementations the learning rate would be dropping with the number of epochs.
@@ -353,6 +361,7 @@ def _train_epoch(self, data_iterable, cur_epoch=0, total_examples=None,
* Total word count used in training.
"""
+self._check_input_data_sanity(data_iterable, data_iterables)
job_queue = Queue(maxsize=queue_factor * self.workers)
progress_queue = Queue(maxsize=(queue_factor + 1) * self.workers)

@@ -363,6 +372,9 @@ def _train_epoch(self, data_iterable, cur_epoch=0, total_examples=None,
for _ in xrange(self.workers)
]

+# Chain all input streams into one, because multistream training is not supported yet.
+if data_iterables is not None:
+    data_iterable = itertools.chain(*data_iterables)
workers.append(threading.Thread(
target=self._job_producer,
args=(data_iterable, job_queue),
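Note the fallback above: multistream training is not implemented yet, so the streams are flattened into a single iterable before they reach the job producer. The chaining step behaves like this self-contained snippet:

    import itertools

    streams = [
        [['first', 'sentence']],                     # stream 1
        [['second', 'sentence'], ['third', 'one']],  # stream 2
    ]
    merged = itertools.chain(*streams)
    print(list(merged))
    # [['first', 'sentence'], ['second', 'sentence'], ['third', 'one']]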
@@ -378,7 +390,7 @@ def _train_epoch(self, data_iterable, cur_epoch=0, total_examples=None,

return trained_word_count, raw_word_count, job_tally

-def train(self, data_iterable, epochs=None, total_examples=None,
+def train(self, data_iterable=None, data_iterables=None, epochs=None, total_examples=None,
total_words=None, queue_factor=2, report_delay=1.0, callbacks=(), **kwargs):
"""Train the model for multiple epochs using multiple workers.
@@ -433,8 +445,9 @@ def train(self, data_iterable, epochs=None, total_examples=None,
callback.on_epoch_begin(self)

trained_word_count_epoch, raw_word_count_epoch, job_tally_epoch = self._train_epoch(
-    data_iterable, cur_epoch=cur_epoch, total_examples=total_examples, total_words=total_words,
-    queue_factor=queue_factor, report_delay=report_delay)
+    data_iterable=data_iterable, data_iterables=data_iterables, cur_epoch=cur_epoch,
+    total_examples=total_examples, total_words=total_words, queue_factor=queue_factor,
+    report_delay=report_delay)
trained_word_count += trained_word_count_epoch
raw_word_count += raw_word_count_epoch
job_tally += job_tally_epoch
@@ -525,9 +538,9 @@ def _do_train_job(self, data_iterable, job_parameters, thread_private_mem):
def _set_train_params(self, **kwargs):
raise NotImplementedError()

-def __init__(self, sentences=None, workers=3, vector_size=100, epochs=5, callbacks=(), batch_words=10000,
-             trim_rule=None, sg=0, alpha=0.025, window=5, seed=1, hs=0, negative=5, ns_exponent=0.75, cbow_mean=1,
-             min_alpha=0.0001, compute_loss=False, fast_version=0, **kwargs):
+def __init__(self, sentences=None, input_streams=None, workers=3, vector_size=100, epochs=5, callbacks=(),
+             batch_words=10000, trim_rule=None, sg=0, alpha=0.025, window=5, seed=1, hs=0, negative=5,
+             ns_exponent=0.75, cbow_mean=1, min_alpha=0.0001, compute_loss=False, fast_version=0, **kwargs):
"""
Parameters
@@ -624,13 +637,20 @@ def __init__(self, sentences=None, workers=3, vector_size=100, epochs=5, callbac
self.neg_labels = zeros(self.negative + 1)
self.neg_labels[0] = 1.

-if sentences is not None:
-    if isinstance(sentences, GeneratorType):
+if sentences is not None or input_streams is not None:
+    self._check_input_data_sanity(data_iterable=sentences, data_iterables=input_streams)
+    if input_streams is not None:
+        if not isinstance(input_streams, (tuple, list)):
+            raise TypeError("You must pass tuple or list as the input_streams argument.")
+        if any(isinstance(stream, GeneratorType) for stream in input_streams):
+            raise TypeError("You can't pass a generator as any of input streams. Try an iterator.")
+    elif isinstance(sentences, GeneratorType):
raise TypeError("You can't pass a generator as the sentences argument. Try an iterator.")
-self.build_vocab(sentences, trim_rule=trim_rule)

+self.build_vocab(sentences=sentences, input_streams=input_streams, trim_rule=trim_rule)
self.train(
-    sentences, total_examples=self.corpus_count, epochs=self.epochs, start_alpha=self.alpha,
-    end_alpha=self.min_alpha, compute_loss=compute_loss)
+    sentences=sentences, input_streams=input_streams, total_examples=self.corpus_count, epochs=self.epochs,
+    start_alpha=self.alpha, end_alpha=self.min_alpha, compute_loss=compute_loss)
else:
if trim_rule is not None:
logger.warning(
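The constructor checks above reject generators because vocabulary building and multi-epoch training each need to iterate the corpus from the start, which a generator can do only once. A sketch of the resulting behavior (toy in-memory corpus; `min_count=1` keeps the tiny vocabulary non-empty):

    from gensim.models import Word2Vec

    def token_stream():
        yield ['only', 'one', 'pass']

    # Word2Vec(sentences=token_stream())          # TypeError: generator as sentences
    # Word2Vec(input_streams=[token_stream()])    # TypeError: generator inside input_streams
    # Word2Vec(input_streams=(s for s in []))     # TypeError: input_streams itself must be tuple/list

    # Restartable iterables are fine:
    model = Word2Vec(input_streams=[[['only', 'one', 'pass']]], min_count=1)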
@@ -763,7 +783,8 @@ def __str__(self):
self.__class__.__name__, len(self.wv.index2word), self.vector_size, self.alpha
)

-def build_vocab(self, sentences, update=False, progress_per=10000, keep_raw_vocab=False, trim_rule=None, **kwargs):
+def build_vocab(self, sentences=None, input_streams=None, workers=None, update=False, progress_per=10000,
+                keep_raw_vocab=False, trim_rule=None, **kwargs):
"""Build vocabulary from a sequence of sentences (can be a once-only generator stream).
Parameters
@@ -773,7 +794,13 @@ def build_vocab(self, sentences, update=False, progress_per=10000, keep_raw_voca
consider an iterable that streams the sentences directly from disk/network.
See :class:`~gensim.models.word2vec.BrownCorpus`, :class:`~gensim.models.word2vec.Text8Corpus`
or :class:`~gensim.models.word2vec.LineSentence` module for such examples.
-update : bool, optional
+input_streams : list or tuple of iterable of iterables
+    The tuple or list of `sentences`-like arguments. Use it if you have multiple input streams. It is possible
+    to process streams in parallel, using `workers` parameter.
+workers : int
+    Used if `input_streams` is passed. Determines how many processes to use for vocab building.
+    Actual number of workers is determined by `min(len(input_streams), workers)`.
+update : bool
If true, the new words in `sentences` will be added to model's vocab.
progress_per : int, optional
Indicates how many words to process before showing/updating the progress.
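The worker-count rule deserves a concrete example: the vocabulary scan parallelizes across streams, so the number of useful processes is capped by the number of streams:

    n_streams = 3  # e.g. len(input_streams)
    workers = 8
    effective_workers = min(n_streams, workers)  # 3 -- at most one process per stream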
@@ -797,8 +824,10 @@ def build_vocab(self, sentences=None, input_streams=None, workers=None, update=False, progress_per=10000,
Key word arguments propagated to `self.vocabulary.prepare_vocab`
"""
+workers = workers or self.workers
total_words, corpus_count = self.vocabulary.scan_vocab(
-    sentences, progress_per=progress_per, trim_rule=trim_rule)
+    sentences=sentences, input_streams=input_streams, progress_per=progress_per, trim_rule=trim_rule,
+    workers=workers)
self.corpus_count = corpus_count
report_values = self.vocabulary.prepare_vocab(
self.hs, self.negative, self.wv, update=update, keep_raw_vocab=keep_raw_vocab,
@@ -887,7 +916,7 @@ def estimate_memory(self, vocab_size=None, report=None):
)
return report

-def train(self, sentences, total_examples=None, total_words=None,
+def train(self, sentences=None, input_streams=None, total_examples=None, total_words=None,
epochs=None, start_alpha=None, end_alpha=None, word_count=0,
queue_factor=2, report_delay=1.0, compute_loss=False, callbacks=()):
"""Train the model. If the hyper-parameters are passed, they override the ones set in the constructor.
@@ -933,8 +962,8 @@ def train(self, sentences=None, input_streams=None, total_examples=None, total_words=None,
self.compute_loss = compute_loss
self.running_training_loss = 0.0
return super(BaseWordEmbeddingsModel, self).train(
-    sentences, total_examples=total_examples, total_words=total_words,
-    epochs=epochs, start_alpha=start_alpha, end_alpha=end_alpha, word_count=word_count,
+    data_iterable=sentences, data_iterables=input_streams, total_examples=total_examples,
+    total_words=total_words, epochs=epochs, start_alpha=start_alpha, end_alpha=end_alpha, word_count=word_count,
queue_factor=queue_factor, report_delay=report_delay, compute_loss=compute_loss, callbacks=callbacks)

def _get_job_params(self, cur_epoch):
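Taken together, the frontend keeps the word2vec-style names and maps them onto the base-class ones (`sentences` becomes `data_iterable`, `input_streams` becomes `data_iterables`). A sketch of the explicit build-then-train flow, under the same assumptions and hypothetical file names as the example at the top:

    from gensim.models import Word2Vec
    from gensim.models.word2vec import LineSentence

    streams = [LineSentence('corpus_part_a.txt'),
               LineSentence('corpus_part_b.txt')]

    model = Word2Vec(min_count=1)
    model.build_vocab(input_streams=streams, workers=2)
    model.train(input_streams=streams,
                total_examples=model.corpus_count, epochs=model.epochs)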