
[GSoC 2018] Multistream API for vocabulary building in *2vec #2078

Merged
Changes from 41 commits

Commits (43)
92e6e22
multistream scan vocab for doc2vec, word2vec & fastText
Jun 4, 2018
2618a2e
fixes
Jun 4, 2018
7960af8
fix tags for doc2vec
Jun 4, 2018
b8da97a
fix tests
Jun 4, 2018
16be716
removed benchmark vocab
Jun 4, 2018
c2d674a
addressing comments
Jun 7, 2018
85e689c
make interfaces and documentation more pretty
Jun 7, 2018
0d5ae38
add word2vec multistream tests
Jun 7, 2018
df3ae5f
fix pep8
Jun 8, 2018
49357cb
iteritems -> items
Jun 8, 2018
0365eea
more precise test
Jun 8, 2018
812ab8c
add doc2vec tests
Jun 8, 2018
f11f44d
add fasttext tests
Jun 8, 2018
941dfd8
remove prints
Jun 8, 2018
36e7238
fix seed=42
Jun 8, 2018
fa57f7a
fixed tests
Jun 8, 2018
9ea007d
add build_vocab test for fasttext
Jun 8, 2018
aec68ea
fix
Jun 8, 2018
07f3fd4
change size from 10 to 5 in fasttext test because of appveyor memory …
Jun 8, 2018
8b49fb8
another test with memory error
Jun 8, 2018
d0c11d9
fix py3 tests
Jun 8, 2018
5974448
fix iteritems for py3
Jun 8, 2018
1419847
fix functools reduce
Jun 8, 2018
280e826
addressing comments
Jun 12, 2018
7d489f4
addressing @jayantj comments
Jun 13, 2018
49a1ee6
fix language
Jun 13, 2018
1cbad7f
add final vocab pruning in multistream modes
Jun 13, 2018
d024625
keys -> iterkeys
Jun 14, 2018
5e4de19
use heapq.nlargest
Jun 15, 2018
74e7b02
fix
Jun 15, 2018
0d12d8b
multistream flag to input_streams param
Jun 19, 2018
25d00cd
fix tests
Jun 19, 2018
2281265
fix flake 8
Jun 19, 2018
543a9e0
fix doc2vec docstrings
Jun 19, 2018
d520d68
fix merging streams
Jun 19, 2018
d11a0b8
fix doc2vec
Jun 19, 2018
ecd8f39
max_vocab_size -> max_vocab_size / workers
Jun 19, 2018
a96d5a4
fixed
Jun 19, 2018
0a327b0
/ -> // (py3 division)
Jun 19, 2018
62873fb
fix
Jun 19, 2018
5f61219
Merge branch 'develop' into feature/gsoc-multistream-vocab
Jun 20, 2018
c67f964
fix docstring
Jun 20, 2018
a16cec0
Merge branch 'develop' into feature/gsoc-multistream-vocab
Jun 24, 2018
63 changes: 46 additions & 17 deletions gensim/models/base_any2vec.py
@@ -43,6 +43,7 @@
from types import GeneratorType
from gensim.utils import deprecated
import warnings
import itertools

try:
from queue import Queue
@@ -130,6 +131,11 @@ def _check_training_sanity(self, epochs=None, total_examples=None, total_words=N
"""Check that the training parameters provided make sense. e.g. raise error if `epochs` not provided."""
raise NotImplementedError()

def _check_input_data_sanity(self, data_iterable=None, data_iterables=None):
"""Check that only one argument is not None."""
if not ((data_iterable is not None) ^ (data_iterables is not None)):
raise ValueError("You must provide only one of singlestream or multistream arguments.")

def _worker_loop(self, job_queue, progress_queue):
"""Train the model, lifting batches of data from the queue.

@@ -322,14 +328,16 @@ def _log_epoch_progress(self, progress_queue, job_queue, cur_epoch=0, total_exam
self.total_train_time += elapsed
return trained_word_count, raw_word_count, job_tally

def _train_epoch(self, data_iterable, cur_epoch=0, total_examples=None,
def _train_epoch(self, data_iterable=None, data_iterables=None, cur_epoch=0, total_examples=None,
total_words=None, queue_factor=2, report_delay=1.0):
"""Train the model for a single epoch.
gojomo (Collaborator):

Not a comment about the doc-improvements here, which all look good, but a side observation about this method (which I only noticed during related reviews last week): its strategy of re-launching a fresh 'producer' thread and fresh 'worker' threads for each epoch, as I believe was introduced in #1777, likely drives down overall throughput and CPU utilization compared to the prior strategy. The one potential advantage I'd see for adopting such a full-sync teardown & restart between epochs would be allowing the user to specify some callback for mid-training reporting at each epoch's end, but that hasn't yet been added.

piskvorky (Owner):

@gojomo why would that drive down throughput & CPU utilization?

gojomo (Collaborator):

When some threads have finished an epoch but others haven't, cores will be idle, not because of the GIL etc., but because there's no thread even trying to move forward onto the next epoch's data. Plus any overhead of re-launching threads (magnitude unclear). The old strategy launched exactly workers + 1 threads; this one launches epochs * (workers + 1) threads.

piskvorky (Owner), Jun 21, 2018:

If I understand correctly, you're worried that at the end of each epoch, some threads may be idle (while other threads are finishing their last job) until the next epoch starts.

Isn't that idleness infinitesimal, since any single job takes almost no time at all? I may be missing something, but this type of delay shouldn't even be measurable.

gojomo (Collaborator), Jun 21, 2018:

I'm not sure of the magnitude, only the direction: this means more idle cores every time an epoch rolls over. Of course only measuring could tell for sure, and the proportionate impact becomes smaller with larger corpora.

As the relative interplay of the existing GIL/queue-sync bottlenecks that have been preventing higher throughput is not yet clear to me (or else I would have tried harder to fix them), adding yet more thread launches, waits, and syncs against a not-yet-filled queue is something I'd have been wary of doing without measurement at the time. Even the coarse once-a-second progress logging tended to show slower throughput at the beginning of training; that slow start might now be repeated at each epoch, for example via GIL contention between the first few new worker threads getting a job and the new producer thread trying to get ahead of the workers again.
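[Editor's note] To make the thread-count arithmetic in this discussion concrete, here is a minimal, hypothetical sketch; it is not gensim's actual implementation, and launch, old_strategy and new_strategy are illustrative names only. It only counts how many threads each pattern launches and says nothing about the GIL or queue contention being debated.

import threading

def launch(n_threads, target):
    # Start `n_threads` threads running `target` and wait for all of them.
    threads = [threading.Thread(target=target) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return len(threads)

def old_strategy(workers, epochs, work=lambda: None):
    # One pool of `workers + 1` threads (workers plus one producer) covers all epochs.
    return launch(workers + 1, lambda: [work() for _ in range(epochs)])

def new_strategy(workers, epochs, work=lambda: None):
    # A fresh pool of `workers + 1` threads is created and torn down for each epoch,
    # so `epochs * (workers + 1)` threads are launched in total.
    return sum(launch(workers + 1, work) for _ in range(epochs))

print(old_strategy(workers=3, epochs=5))  # 4 threads launched overall
print(new_strategy(workers=3, epochs=5))  # 20 threads launched overall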


Parameters
----------
data_iterable : iterable of list of object
The input corpus. This will be split in chunks and these chunks will be pushed to the queue.
data_iterables : iterable of iterables of list of object
The iterable of input streams like `data_iterable`. Use this parameter in multistream mode.
cur_epoch : int, optional
The current training epoch, needed to compute the training parameters for each job.
For example in many implementations the learning rate would be dropping with the number of epochs.
@@ -353,6 +361,7 @@ def _train_epoch(self, data_iterable, cur_epoch=0, total_examples=None,
* Total word count used in training.

"""
self._check_input_data_sanity(data_iterable, data_iterables)
job_queue = Queue(maxsize=queue_factor * self.workers)
progress_queue = Queue(maxsize=(queue_factor + 1) * self.workers)

@@ -363,6 +372,9 @@ def _train_epoch(self, data_iterable, cur_epoch=0, total_examples=None,
for _ in xrange(self.workers)
]

# Chain all input streams into one, because multistream training is not supported yet.
if data_iterables is not None:
data_iterable = itertools.chain(*data_iterables)
workers.append(threading.Thread(
target=self._job_producer,
args=(data_iterable, job_queue),
@@ -378,7 +390,7 @@ def _train_epoch(self, data_iterable, cur_epoch=0, total_examples=None,

return trained_word_count, raw_word_count, job_tally

def train(self, data_iterable, epochs=None, total_examples=None,
def train(self, data_iterable=None, data_iterables=None, epochs=None, total_examples=None,
total_words=None, queue_factor=2, report_delay=1.0, callbacks=(), **kwargs):
"""Train the model for multiple epochs using multiple workers.

@@ -433,8 +445,9 @@ def train(self, data_iterable, epochs=None, total_examples=None,
callback.on_epoch_begin(self)

trained_word_count_epoch, raw_word_count_epoch, job_tally_epoch = self._train_epoch(
data_iterable, cur_epoch=cur_epoch, total_examples=total_examples, total_words=total_words,
queue_factor=queue_factor, report_delay=report_delay)
data_iterable=data_iterable, data_iterables=data_iterables, cur_epoch=cur_epoch,
total_examples=total_examples, total_words=total_words, queue_factor=queue_factor,
report_delay=report_delay)
trained_word_count += trained_word_count_epoch
raw_word_count += raw_word_count_epoch
job_tally += job_tally_epoch
@@ -525,8 +538,8 @@ def _do_train_job(self, data_iterable, job_parameters, thread_private_mem):
def _set_train_params(self, **kwargs):
raise NotImplementedError()

def __init__(self, sentences=None, workers=3, vector_size=100, epochs=5, callbacks=(), batch_words=10000,
trim_rule=None, sg=0, alpha=0.025, window=5, seed=1, hs=0, negative=5, cbow_mean=1,
def __init__(self, sentences=None, input_streams=None, workers=3, vector_size=100, epochs=5, callbacks=(),
batch_words=10000, trim_rule=None, sg=0, alpha=0.025, window=5, seed=1, hs=0, negative=5, cbow_mean=1,
min_alpha=0.0001, compute_loss=False, fast_version=0, **kwargs):
"""

@@ -623,13 +636,20 @@ def __init__(self, sentences=None, workers=3, vector_size=100, epochs=5, callbac
self.neg_labels = zeros(self.negative + 1)
self.neg_labels[0] = 1.

if sentences is not None:
if isinstance(sentences, GeneratorType):
if sentences is not None or input_streams is not None:
self._check_input_data_sanity(data_iterable=sentences, data_iterables=input_streams)
if input_streams is not None:
if not isinstance(input_streams, (tuple, list)):
raise TypeError("You must pass tuple or list as the input_streams argument.")
if any(isinstance(stream, GeneratorType) for stream in input_streams):
raise TypeError("You can't pass a generator as any of input streams. Try an iterator.")
elif isinstance(sentences, GeneratorType):
raise TypeError("You can't pass a generator as the sentences argument. Try an iterator.")
self.build_vocab(sentences, trim_rule=trim_rule)

self.build_vocab(sentences=sentences, input_streams=input_streams, trim_rule=trim_rule)
self.train(
sentences, total_examples=self.corpus_count, epochs=self.epochs, start_alpha=self.alpha,
end_alpha=self.min_alpha, compute_loss=compute_loss)
sentences=sentences, input_streams=input_streams, total_examples=self.corpus_count, epochs=self.epochs,
start_alpha=self.alpha, end_alpha=self.min_alpha, compute_loss=compute_loss)
else:
if trim_rule is not None:
logger.warning(
@@ -762,7 +782,8 @@ def __str__(self):
self.__class__.__name__, len(self.wv.index2word), self.vector_size, self.alpha
)

def build_vocab(self, sentences, update=False, progress_per=10000, keep_raw_vocab=False, trim_rule=None, **kwargs):
def build_vocab(self, sentences=None, input_streams=None, workers=None, update=False, progress_per=10000,
keep_raw_vocab=False, trim_rule=None, **kwargs):
"""Build vocabulary from a sequence of sentences (can be a once-only generator stream).

Parameters
@@ -772,7 +793,13 @@ def build_vocab(self, sentences, update=False, progress_per=10000, keep_raw_voca
consider an iterable that streams the sentences directly from disk/network.
See :class:`~gensim.models.word2vec.BrownCorpus`, :class:`~gensim.models.word2vec.Text8Corpus`
or :class:`~gensim.models.word2vec.LineSentence` module for such examples.
update : bool, optional
input_streams : list or tuple of iterable of iterables
The tuple or list of `sentences`-like arguments. Use it if you have multiple input streams. It is possible
to process streams in parallel, using `workers` parameter.
workers : int
Contributor:

Maybe it would be better to use a different name, like multistream_workers (to avoid a potential collision between parameter names), or does that make no sense, @persiyanov?

persiyanov (Contributor, PR author):

I think there is no point in that. In most cases, by setting the workers parameter the user means the same degree of parallelization for both scanning the vocab (multiprocessing) and training the model (multithreading), IMO.

Used if `input_streams` is passed. Determines how many processes to use for vocab building.
Actual number of workers is determined by `min(len(input_streams), workers)`.
update : bool
If true, the new words in `sentences` will be added to model's vocab.
progress_per : int, optional
Indicates how many words to process before showing/updating the progress.
@@ -796,8 +823,10 @@ def build_vocab(self, sentences, update=False, progress_per=10000, keep_raw_voca
Key word arguments propagated to `self.vocabulary.prepare_vocab`

"""
workers = workers or self.workers
total_words, corpus_count = self.vocabulary.scan_vocab(
sentences, progress_per=progress_per, trim_rule=trim_rule)
sentences=sentences, input_streams=input_streams, progress_per=progress_per, trim_rule=trim_rule,
workers=workers)
self.corpus_count = corpus_count
report_values = self.vocabulary.prepare_vocab(
self.hs, self.negative, self.wv, update=update, keep_raw_vocab=keep_raw_vocab,
@@ -886,7 +915,7 @@ def estimate_memory(self, vocab_size=None, report=None):
)
return report

def train(self, sentences, total_examples=None, total_words=None,
def train(self, sentences=None, input_streams=None, total_examples=None, total_words=None,
epochs=None, start_alpha=None, end_alpha=None, word_count=0,
queue_factor=2, report_delay=1.0, compute_loss=False, callbacks=()):
"""Train the model. If the hyper-parameters are passed, they override the ones set in the constructor.
@@ -932,8 +961,8 @@ def train(self, sentences, total_examples=None, total_words=None,
self.compute_loss = compute_loss
self.running_training_loss = 0.0
return super(BaseWordEmbeddingsModel, self).train(
sentences, total_examples=total_examples, total_words=total_words,
epochs=epochs, start_alpha=start_alpha, end_alpha=end_alpha, word_count=word_count,
data_iterable=sentences, data_iterables=input_streams, total_examples=total_examples,
total_words=total_words, epochs=epochs, start_alpha=start_alpha, end_alpha=end_alpha, word_count=word_count,
queue_factor=queue_factor, report_delay=report_delay, compute_loss=compute_loss, callbacks=callbacks)

def _get_job_params(self, cur_epoch):
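[Editor's note] For context, a hedged usage sketch of the input_streams parameter added in this PR, based only on the signatures shown in this diff; the file names are hypothetical, and as the _train_epoch change above notes, the streams are still chained into a single iterator for training, so the extra parallelism applies to vocabulary scanning.

from gensim.models.word2vec import Word2Vec, LineSentence

# Two independent corpus streams, e.g. two files on disk (hypothetical names).
stream_a = LineSentence('corpus_part_a.txt')
stream_b = LineSentence('corpus_part_b.txt')

# One-step form: pass `input_streams` instead of a single `sentences` iterable.
# Vocabulary scanning may use up to min(len(input_streams), workers) processes.
model = Word2Vec(input_streams=[stream_a, stream_b], workers=2, min_count=5)

# Two-step form, mirroring the updated build_vocab()/train() signatures above.
model = Word2Vec(workers=2, min_count=5)
model.build_vocab(input_streams=[stream_a, stream_b], workers=2)
model.train(
    input_streams=[stream_a, stream_b],
    total_examples=model.corpus_count,
    epochs=model.epochs,
)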