diff --git a/.travis.yml b/.travis.yml index a4b2c640e0d3..5f2f4e508435 100644 --- a/.travis.yml +++ b/.travis.yml @@ -171,6 +171,10 @@ script: - python python/ray/rllib/test/test_optimizers.py - python python/ray/rllib/test/test_evaluators.py + # streaming library tests + - python -m pytest -v --durations=10 python/ray/tests/test_batched_queue.py + - python -m pytest -v --durations=10 python/ray/tests/test_logical_graph.py + # Python3.5+ only. Otherwise we will get `SyntaxError` regardless of how we set the tester. - python -c 'import sys;exit(sys.version_info>=(3,5))' || python -m pytest -v --durations=10 python/ray/experimental/test/async_test.py diff --git a/ci/travis/install-dependencies.sh b/ci/travis/install-dependencies.sh index fc1902432b9a..b848a0bc304d 100755 --- a/ci/travis/install-dependencies.sh +++ b/ci/travis/install-dependencies.sh @@ -25,7 +25,7 @@ if [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "linux" ]]; then bash miniconda.sh -b -p $HOME/miniconda export PATH="$HOME/miniconda/bin:$PATH" pip install -q cython==0.29.0 cmake gym==0.10.11 opencv-python-headless pyyaml pandas==0.23.4 requests \ - feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout mock flaky + feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout mock flaky networkx conda install -y scipy tensorflow elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then sudo apt-get update @@ -35,7 +35,7 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then bash miniconda.sh -b -p $HOME/miniconda export PATH="$HOME/miniconda/bin:$PATH" pip install -q cython==0.29.0 cmake gym opencv-python-headless pyyaml pandas==0.23.4 requests \ - feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout flaky + feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout flaky networkx conda install -y scipy tensorflow elif [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "macosx" ]]; then # check that brew is installed @@ -53,7 +53,7 @@ elif [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "macosx" ]]; then bash miniconda.sh -b -p $HOME/miniconda export PATH="$HOME/miniconda/bin:$PATH" pip install -q cython==0.29.0 cmake tensorflow gym==0.10.11 opencv-python-headless pyyaml pandas==0.23.4 requests \ - feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout mock flaky + feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout mock flaky networkx elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then # check that brew is installed which -s brew @@ -70,7 +70,7 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then bash miniconda.sh -b -p $HOME/miniconda export PATH="$HOME/miniconda/bin:$PATH" pip install -q cython==0.29.0 cmake tensorflow gym opencv-python-headless pyyaml pandas==0.23.4 requests \ - feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout flaky + feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout flaky networkx elif [[ "$LINT" == "1" ]]; then sudo apt-get update sudo apt-get install -y cmake build-essential autoconf curl libtool unzip diff --git a/python/ray/experimental/streaming/README.rst b/python/ray/experimental/streaming/README.rst new file mode 100644 index 000000000000..4daab3a8df3e --- /dev/null +++ b/python/ray/experimental/streaming/README.rst @@ -0,0 +1,16 @@ +Streaming Library +================= + +Dependencies: + +Install NetworkX: ``pip install networkx`` + +Examples: + +- simple.py: A simple 
example with stateless operators and different parallelism per stage. + +Run ``python simple.py --input-file toy.txt`` + +- wordcount.py: A streaming wordcount example with a stateful operator (rolling sum). + +Run ``python wordcount.py --titles-file articles.txt`` diff --git a/python/ray/experimental/streaming/__init__.py b/python/ray/experimental/streaming/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/python/ray/experimental/streaming/batched_queue.py b/python/ray/experimental/streaming/batched_queue.py new file mode 100644 index 000000000000..df4c16d83a40 --- /dev/null +++ b/python/ray/experimental/streaming/batched_queue.py @@ -0,0 +1,227 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import logging +import numpy as np +import threading +import time + +import ray +from ray.experimental import internal_kv + +logger = logging.getLogger(__name__) +logger.setLevel("INFO") + + +def plasma_prefetch(object_id): + """Tells plasma to prefetch the given object_id.""" + local_sched_client = ray.worker.global_worker.raylet_client + ray_obj_id = ray.ObjectID(object_id) + local_sched_client.fetch_or_reconstruct([ray_obj_id], True) + + +def plasma_get(object_id): + """Get an object directly from plasma without going through object table. + + Precondition: plasma_prefetch(object_id) has been called before. + """ + client = ray.worker.global_worker.plasma_client + plasma_id = ray.pyarrow.plasma.ObjectID(object_id) + while not client.contains(plasma_id): + pass + return client.get(plasma_id) + + +# TODO: doing the timer in Python land is a bit slow +class FlushThread(threading.Thread): + """A thread that flushes periodically to plasma. + + Attributes: + interval: The flush timeout per batch. + flush_fn: The flush function. + """ + + def __init__(self, interval, flush_fn): + threading.Thread.__init__(self) + self.interval = interval # Interval is the max_batch_time + self.flush_fn = flush_fn + self.daemon = True + + def run(self): + while True: + time.sleep(self.interval) # Flushing period + self.flush_fn() + + +class BatchedQueue(object): + """A batched queue for actor to actor communication. + + Attributes: + max_size (int): The maximum size of the queue in number of batches + (if exceeded, backpressure kicks in) + max_batch_size (int): The size of each batch in number of records. + max_batch_time (float): The flush timeout per batch. + prefetch_depth (int): The number of batches to prefetch from plasma. + background_flush (bool): Denotes whether a daemon flush thread should + be used (True) to flush batches to plasma. + base (ndarray): A unique signature for the queue. + read_ack_key (bytes): The signature of the queue in bytes. + prefetch_batch_offset (int): The number of the last read prefetched + batch. + read_batch_offset (int): The number of the last read batch. + read_item_offset (int): The number of the last read record inside a + batch. + write_batch_offset (int): The number of the last written batch. + write_item_offset (int): The numebr of the last written item inside a + batch. + write_buffer (list): The write buffer, i.e. an in-memory batch. + last_flush_time (float): The time the last flushing to plasma took + place. + cached_remote_offset (int): The number of the last read batch as + recorded by the writer after the previous flush. + flush_lock (RLock): A python lock used for flushing batches to plasma. 
+ flush_thread (Threading): The python thread used for flushing batches + to plasma. + """ + + def __init__(self, + max_size=999999, + max_batch_size=99999, + max_batch_time=0.01, + prefetch_depth=10, + background_flush=True): + self.max_size = max_size + self.max_batch_size = max_batch_size + self.max_batch_time = max_batch_time + self.prefetch_depth = prefetch_depth + self.background_flush = background_flush + + # Common queue metadata -- This serves as the unique id of the queue + self.base = np.random.randint(0, 2**32 - 1, size=5, dtype="uint32") + self.base[-2] = 0 + self.base[-1] = 0 + self.read_ack_key = np.ndarray.tobytes(self.base) + + # Reader state + self.prefetch_batch_offset = 0 + self.read_item_offset = 0 + self.read_batch_offset = 0 + self.read_buffer = [] + + # Writer state + self.write_item_offset = 0 + self.write_batch_offset = 0 + self.write_buffer = [] + self.last_flush_time = 0.0 + self.cached_remote_offset = 0 + + self.flush_lock = threading.RLock() + self.flush_thread = FlushThread(self.max_batch_time, + self._flush_writes) + + def __getstate__(self): + state = dict(self.__dict__) + del state["flush_lock"] + del state["flush_thread"] + del state["write_buffer"] + return state + + def __setstate__(self, state): + self.__dict__.update(state) + + # This is to enable writing functionality in + # case the queue is not created by the writer + # The reason is that python locks cannot be serialized + def enable_writes(self): + """Restores the state of the batched queue for writing.""" + self.write_buffer = [] + self.flush_lock = threading.RLock() + self.flush_thread = FlushThread(self.max_batch_time, + self._flush_writes) + + # Batch ids consist of a unique queue id used as prefix along with + # two numbers generated using the batch offset in the queue + def _batch_id(self, batch_offset): + oid = self.base.copy() + oid[-2] = batch_offset // 2**32 + oid[-1] = batch_offset % 2**32 + return np.ndarray.tobytes(oid) + + def _flush_writes(self): + with self.flush_lock: + if not self.write_buffer: + return + batch_id = self._batch_id(self.write_batch_offset) + ray.worker.global_worker.put_object( + ray.ObjectID(batch_id), self.write_buffer) + logger.debug("[writer] Flush batch {} offset {} size {}".format( + self.write_batch_offset, self.write_item_offset, + len(self.write_buffer))) + self.write_buffer = [] + self.write_batch_offset += 1 + self._wait_for_reader() + self.last_flush_time = time.time() + + def _wait_for_reader(self): + """Checks for backpressure by the downstream reader.""" + if self.max_size <= 0: # Unlimited queue + return + if self.write_item_offset - self.cached_remote_offset <= self.max_size: + return # Hasn't reached max size + remote_offset = internal_kv._internal_kv_get(self.read_ack_key) + if remote_offset is None: + # logger.debug("[writer] Waiting for reader to start...") + while remote_offset is None: + time.sleep(0.01) + remote_offset = internal_kv._internal_kv_get(self.read_ack_key) + remote_offset = int(remote_offset) + if self.write_item_offset - remote_offset > self.max_size: + logger.debug( + "[writer] Waiting for reader to catch up {} to {} - {}".format( + remote_offset, self.write_item_offset, self.max_size)) + while self.write_item_offset - remote_offset > self.max_size: + time.sleep(0.01) + remote_offset = int( + internal_kv._internal_kv_get(self.read_ack_key)) + self.cached_remote_offset = remote_offset + + def _read_next_batch(self): + while (self.prefetch_batch_offset < + self.read_batch_offset + self.prefetch_depth): + 
plasma_prefetch(self._batch_id(self.prefetch_batch_offset)) + self.prefetch_batch_offset += 1 + self.read_buffer = plasma_get(self._batch_id(self.read_batch_offset)) + self.read_batch_offset += 1 + logger.debug("[reader] Fetched batch {} offset {} size {}".format( + self.read_batch_offset, self.read_item_offset, + len(self.read_buffer))) + self._ack_reads(self.read_item_offset + len(self.read_buffer)) + + # Reader acks the key it reads so that writer knows reader's offset. + # This is to cap queue size and simulate backpressure + def _ack_reads(self, offset): + if self.max_size > 0: + internal_kv._internal_kv_put( + self.read_ack_key, offset, overwrite=True) + + def put_next(self, item): + with self.flush_lock: + if self.background_flush and not self.flush_thread.is_alive(): + logger.debug("[writer] Starting batch flush thread") + self.flush_thread.start() + self.write_buffer.append(item) + self.write_item_offset += 1 + if not self.last_flush_time: + self.last_flush_time = time.time() + delay = time.time() - self.last_flush_time + if (len(self.write_buffer) > self.max_batch_size + or delay > self.max_batch_time): + self._flush_writes() + + def read_next(self): + if not self.read_buffer: + self._read_next_batch() + assert self.read_buffer + self.read_item_offset += 1 + return self.read_buffer.pop(0) diff --git a/python/ray/experimental/streaming/benchmarks/micro/batched_queue_benchmark.py b/python/ray/experimental/streaming/benchmarks/micro/batched_queue_benchmark.py new file mode 100644 index 000000000000..4017f7891cfa --- /dev/null +++ b/python/ray/experimental/streaming/benchmarks/micro/batched_queue_benchmark.py @@ -0,0 +1,182 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import argparse +import logging +import time + +import ray +from ray.experimental.streaming.batched_queue import BatchedQueue + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + +parser = argparse.ArgumentParser() +parser.add_argument( + "--rounds", default=10, help="the number of experiment rounds") +parser.add_argument( + "--num-queues", default=1, help="the number of queues in the chain") +parser.add_argument( + "--queue-size", default=10000, help="the queue size in number of batches") +parser.add_argument( + "--batch-size", default=1000, help="the batch size in number of elements") +parser.add_argument( + "--flush-timeout", default=0.001, help="the timeout to flush a batch") +parser.add_argument( + "--prefetch-depth", + default=10, + help="the number of batches to prefetch from plasma") +parser.add_argument( + "--background-flush", + default=False, + help="whether to flush in the backrgound or not") +parser.add_argument( + "--max-throughput", + default="inf", + help="maximum read throughput (elements/s)") + + +@ray.remote +class Node(object): + """An actor that reads from an input queue and writes to an output queue. + + Attributes: + id (int): The id of the actor. + queue (BatchedQueue): The input queue. + out_queue (BatchedQueue): The output queue. + max_reads_per_second (int): The max read throughput (default: inf). + num_reads (int): Number of elements read. + num_writes (int): Number of elements written. 
+ """ + + def __init__(self, + id, + in_queue, + out_queue, + max_reads_per_second=float("inf")): + self.id = id + self.queue = in_queue + self.out_queue = out_queue + self.max_reads_per_second = max_reads_per_second + self.num_reads = 0 + self.num_writes = 0 + self.start = time.time() + + def read_write_forever(self): + debug_log = "[actor {}] Reads throttled to {} reads/s" + log = "" + if self.out_queue is not None: + self.out_queue.enable_writes() + log += "[actor {}] Reads/Writes per second {}" + else: # It's just a reader + log += "[actor {}] Reads per second {}" + # Start spinning + expected_value = 0 + while True: + start = time.time() + N = 100000 + for _ in range(N): + x = self.queue.read_next() + assert x == expected_value, (x, expected_value) + expected_value += 1 + self.num_reads += 1 + if self.out_queue is not None: + self.out_queue.put_next(x) + self.num_writes += 1 + while (self.num_reads / (time.time() - self.start) > + self.max_reads_per_second): + logger.debug( + debug_log.format(self.id, self.max_reads_per_second)) + time.sleep(0.1) + logger.info(log.format(self.id, N / (time.time() - start))) + # Flush any remaining elements + if self.out_queue is not None: + self.out_queue._flush_writes() + + +def test_max_throughput(rounds, + max_queue_size, + max_batch_size, + batch_timeout, + prefetch_depth, + background_flush, + num_queues, + max_reads_per_second=float("inf")): + assert num_queues >= 1 + first_queue = BatchedQueue( + max_size=max_queue_size, + max_batch_size=max_batch_size, + max_batch_time=batch_timeout, + prefetch_depth=prefetch_depth, + background_flush=background_flush) + previous_queue = first_queue + for i in range(num_queues): + # Construct the batched queue + in_queue = previous_queue + out_queue = None + if i < num_queues - 1: + out_queue = BatchedQueue( + max_size=max_queue_size, + max_batch_size=max_batch_size, + max_batch_time=batch_timeout, + prefetch_depth=prefetch_depth, + background_flush=background_flush) + + node = Node.remote(i, in_queue, out_queue, max_reads_per_second) + node.read_write_forever.remote() + previous_queue = out_queue + + value = 0 + # Feed the chain + for round in range(rounds): + logger.info("Round {}".format(round)) + N = 100000 + start = time.time() + for i in range(N): + first_queue.put_next(value) + value += 1 + log = "[writer] Puts per second {}" + logger.info(log.format(N / (time.time() - start))) + first_queue._flush_writes() + + +if __name__ == "__main__": + ray.init() + ray.register_custom_serializer(BatchedQueue, use_pickle=True) + + args = parser.parse_args() + + rounds = int(args.rounds) + max_queue_size = int(args.queue_size) + max_batch_size = int(args.batch_size) + batch_timeout = float(args.flush_timeout) + prefetch_depth = int(args.prefetch_depth) + background_flush = bool(args.background_flush) + num_queues = int(args.num_queues) + max_reads_per_second = float(args.max_throughput) + + logger.info("== Parameters ==") + logger.info("Rounds: {}".format(rounds)) + logger.info("Max queue size: {}".format(max_queue_size)) + logger.info("Max batch size: {}".format(max_batch_size)) + logger.info("Batch timeout: {}".format(batch_timeout)) + logger.info("Prefetch depth: {}".format(prefetch_depth)) + logger.info("Background flush: {}".format(background_flush)) + logger.info("Max read throughput: {}".format(max_reads_per_second)) + + # Estimate the ideal throughput + value = 0 + start = time.time() + for round in range(rounds): + N = 100000 + for _ in range(N): + value += 1 + logger.info("Ideal throughput: {}".format(value 
/ (time.time() - start))) + + logger.info("== Testing max throughput ==") + start = time.time() + test_max_throughput(rounds, max_queue_size, max_batch_size, batch_timeout, + prefetch_depth, background_flush, num_queues, + max_reads_per_second) + logger.info("Elapsed time: {}".format(time.time() - start)) diff --git a/python/ray/experimental/streaming/communication.py b/python/ray/experimental/streaming/communication.py new file mode 100644 index 000000000000..f626d1b38c5d --- /dev/null +++ b/python/ray/experimental/streaming/communication.py @@ -0,0 +1,359 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import hashlib +import logging +import sys + +from ray.experimental.streaming.operator import PStrategy +from ray.experimental.streaming.batched_queue import BatchedQueue + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + +# Forward and broadcast stream partitioning strategies +forward_broadcast_strategies = [PStrategy.Forward, PStrategy.Broadcast] + + +# Used to choose output channel in case of hash-based shuffling +def _hash(value): + if isinstance(value, int): + return value + try: + return int(hashlib.sha1(value.encode("utf-8")).hexdigest(), 16) + except AttributeError: + return int(hashlib.sha1(value).hexdigest(), 16) + + +# A data channel is a batched queue between two +# operator instances in a streaming environment +class DataChannel(object): + """A data channel for actor-to-actor communication. + + Attributes: + env (Environment): The environment the channel belongs to. + src_operator_id (UUID): The id of the source operator of the channel. + dst_operator_id (UUID): The id of the destination operator of the + channel. + src_instance_id (int): The id of the source instance. + dst_instance_id (int): The id of the destination instance. + queue (BatchedQueue): The batched queue used for data movement. + """ + + def __init__(self, env, src_operator_id, dst_operator_id, src_instance_id, + dst_instance_id): + self.env = env + self.src_operator_id = src_operator_id + self.dst_operator_id = dst_operator_id + self.src_instance_id = src_instance_id + self.dst_instance_id = dst_instance_id + self.queue = BatchedQueue( + max_size=self.env.config.queue_config.max_size, + max_batch_size=self.env.config.queue_config.max_batch_size, + max_batch_time=self.env.config.queue_config.max_batch_time, + prefetch_depth=self.env.config.queue_config.prefetch_depth, + background_flush=self.env.config.queue_config.background_flush) + + def __repr__(self): + return "({},{},{},{})".format( + self.src_operator_id, self.dst_operator_id, self.src_instance_id, + self.dst_instance_id) + + +# Pulls and merges data from multiple input channels +class DataInput(object): + """An input gate of an operator instance. + + The input gate pulls records from all input channels in a round-robin + fashion. + + Attributes: + input_channels (list): The list of input channels. + channel_index (int): The index of the next channel to pull from. + max_index (int): The number of input channels. + closed (list): A list of flags indicating whether an input channel + has been marked as 'closed'. + all_closed (bool): Denotes whether all input channels have been + closed (True) or not (False). 
+ """ + + def __init__(self, channels): + self.input_channels = channels + self.channel_index = 0 + self.max_index = len(channels) + self.closed = [False] * len( + self.input_channels) # Tracks the channels that have been closed + self.all_closed = False + + # Fetches records from input channels in a round-robin fashion + # TODO (john): Make sure the instance is not blocked on any of its input + # channels + # TODO (john): In case of input skew, it might be better to pull from + # the largest queue more often + def _pull(self): + while True: + if self.max_index == 0: + # TODO (john): We should detect this earlier + return None + # Channel to pull from + channel = self.input_channels[self.channel_index] + self.channel_index += 1 + if self.channel_index == self.max_index: # Reset channel index + self.channel_index = 0 + if self.closed[self.channel_index - 1]: + continue # Channel has been 'closed', check next + record = channel.queue.read_next() + logger.debug("Actor ({},{}) pulled '{}'.".format( + channel.src_operator_id, channel.src_instance_id, record)) + if record is None: + # Mark channel as 'closed' and pull from the next open one + self.closed[self.channel_index - 1] = True + self.all_closed = True + for flag in self.closed: + if flag is False: + self.all_closed = False + break + if not self.all_closed: + continue + # Returns 'None' iff all input channels are 'closed' + return record + + +# Selects output channel(s) and pushes data +class DataOutput(object): + """An output gate of an operator instance. + + The output gate pushes records to output channels according to the + user-defined partitioning scheme. + + Attributes: + partitioning_schemes (dict): A mapping from destination operator ids + to partitioning schemes (see: PScheme in operator.py). + forward_channels (list): A list of channels to forward records. + shuffle_channels (list(list)): A list of output channels to shuffle + records grouped by destination operator. + shuffle_key_channels (list(list)): A list of output channels to + shuffle records by a key grouped by destination operator. + shuffle_exists (bool): A flag indicating that there exists at least + one shuffle_channel. + shuffle_key_exists (bool): A flag indicating that there exists at + least one shuffle_key_channel. 
+ """ + + def __init__(self, channels, partitioning_schemes): + self.key_selector = None + self.round_robin_indexes = [0] + self.partitioning_schemes = partitioning_schemes + # Prepare output -- collect channels by type + self.forward_channels = [] # Forward and broadcast channels + slots = sum(1 for scheme in self.partitioning_schemes.values() + if scheme.strategy == PStrategy.RoundRobin) + self.round_robin_channels = [[]] * slots # RoundRobin channels + self.round_robin_indexes = [-1] * slots + slots = sum(1 for scheme in self.partitioning_schemes.values() + if scheme.strategy == PStrategy.Shuffle) + # Flag used to avoid hashing when there is no shuffling + self.shuffle_exists = slots > 0 + self.shuffle_channels = [[]] * slots # Shuffle channels + slots = sum(1 for scheme in self.partitioning_schemes.values() + if scheme.strategy == PStrategy.ShuffleByKey) + # Flag used to avoid hashing when there is no shuffling by key + self.shuffle_key_exists = slots > 0 + self.shuffle_key_channels = [[]] * slots # Shuffle by key channels + # Distinct shuffle destinations + shuffle_destinations = {} + # Distinct shuffle by key destinations + shuffle_by_key_destinations = {} + # Distinct round robin destinations + round_robin_destinations = {} + index_1 = 0 + index_2 = 0 + index_3 = 0 + for channel in channels: + p_scheme = self.partitioning_schemes[channel.dst_operator_id] + strategy = p_scheme.strategy + if strategy in forward_broadcast_strategies: + self.forward_channels.append(channel) + elif strategy == PStrategy.Shuffle: + pos = shuffle_destinations.setdefault(channel.dst_operator_id, + index_1) + self.shuffle_channels[pos].append(channel) + if pos == index_1: + index_1 += 1 + elif strategy == PStrategy.ShuffleByKey: + pos = shuffle_by_key_destinations.setdefault( + channel.dst_operator_id, index_2) + self.shuffle_key_channels[pos].append(channel) + if pos == index_2: + index_2 += 1 + elif strategy == PStrategy.RoundRobin: + pos = round_robin_destinations.setdefault( + channel.dst_operator_id, index_3) + self.round_robin_channels[pos].append(channel) + if pos == index_3: + index_3 += 1 + else: # TODO (john): Add support for other strategies + sys.exit("Unrecognized or unsupported partitioning strategy.") + # A KeyedDataStream can only be shuffled by key + assert not (self.shuffle_exists and self.shuffle_key_exists) + + # Flushes any remaining records in the output channels + # 'close' indicates whether we should also 'close' the channel (True) + # by propagating 'None' + # or just flush the remaining records to plasma (False) + def _flush(self, close=False): + """Flushes remaining output records in the output queues to plasma. + + None is used as special type of record that is propagated from sources + to sink to notify that the end of data in a stream. + + Attributes: + close (bool): A flag denoting whether the channel should be + also marked as 'closed' (True) or not (False) after flushing. 
+ """ + for channel in self.forward_channels: + if close is True: + channel.queue.put_next(None) + channel.queue._flush_writes() + for channels in self.shuffle_channels: + for channel in channels: + if close is True: + channel.queue.put_next(None) + channel.queue._flush_writes() + for channels in self.shuffle_key_channels: + for channel in channels: + if close is True: + channel.queue.put_next(None) + channel.queue._flush_writes() + for channels in self.round_robin_channels: + for channel in channels: + if close is True: + channel.queue.put_next(None) + channel.queue._flush_writes() + # TODO (john): Add more channel types + + # Returns all destination actor ids + def _destination_actor_ids(self): + destinations = [] + for channel in self.forward_channels: + destinations.append((channel.dst_operator_id, + channel.dst_instance_id)) + for channels in self.shuffle_channels: + for channel in channels: + destinations.append((channel.dst_operator_id, + channel.dst_instance_id)) + for channels in self.shuffle_key_channels: + for channel in channels: + destinations.append((channel.dst_operator_id, + channel.dst_instance_id)) + for channels in self.round_robin_channels: + for channel in channels: + destinations.append((channel.dst_operator_id, + channel.dst_instance_id)) + # TODO (john): Add more channel types + return destinations + + # Pushes the record to the output + # Each individual output queue flushes batches to plasma periodically + # based on 'batch_max_size' and 'batch_max_time' + def _push(self, record): + # Forward record + for channel in self.forward_channels: + logger.debug("[writer] Push record '{}' to channel {}".format( + record, channel)) + channel.queue.put_next(record) + # Forward record + index = 0 + for channels in self.round_robin_channels: + self.round_robin_indexes[index] += 1 + if self.round_robin_indexes[index] == len(channels): + self.round_robin_indexes[index] = 0 # Reset index + channel = channels[self.round_robin_indexes[index]] + logger.debug("[writer] Push record '{}' to channel {}".format( + record, channel)) + channel.queue.put_next(record) + index += 1 + # Hash-based shuffling by key + if self.shuffle_key_exists: + key, _ = record + h = _hash(key) + for channels in self.shuffle_key_channels: + num_instances = len(channels) # Downstream instances + channel = channels[h % num_instances] + logger.debug( + "[key_shuffle] Push record '{}' to channel {}".format( + record, channel)) + channel.queue.put_next(record) + elif self.shuffle_exists: # Hash-based shuffling per destination + h = _hash(record) + for channels in self.shuffle_channels: + num_instances = len(channels) # Downstream instances + channel = channels[h % num_instances] + logger.debug("[shuffle] Push record '{}' to channel {}".format( + record, channel)) + channel.queue.put_next(record) + else: # TODO (john): Handle rescaling + pass + + # Pushes a list of records to the output + # Each individual output queue flushes batches to plasma periodically + # based on 'batch_max_size' and 'batch_max_time' + def _push_all(self, records): + # Forward records + for record in records: + for channel in self.forward_channels: + logger.debug("[writer] Push record '{}' to channel {}".format( + record, channel)) + channel.queue.put_next(record) + # Hash-based shuffling by key per destination + if self.shuffle_key_exists: + for record in records: + key, _ = record + h = _hash(key) + for channels in self.shuffle_channels: + num_instances = len(channels) # Downstream instances + channel = channels[h % num_instances] + 
logger.debug( + "[key_shuffle] Push record '{}' to channel {}".format( + record, channel)) + channel.queue.put_next(record) + elif self.shuffle_exists: # Hash-based shuffling per destination + for record in records: + h = _hash(record) + for channels in self.shuffle_channels: + num_instances = len(channels) # Downstream instances + channel = channels[h % num_instances] + logger.debug( + "[shuffle] Push record '{}' to channel {}".format( + record, channel)) + channel.queue.put_next(record) + else: # TODO (john): Handle rescaling + pass + + +# Batched queue configuration +class QueueConfig(object): + """The configuration of a batched queue. + + Attributes: + max_size (int): The maximum size of the queue in number of batches + (if exceeded, backpressure kicks in). + max_batch_size (int): The size of each batch in number of records. + max_batch_time (float): The flush timeout per batch. + prefetch_depth (int): The number of batches to prefetch from plasma. + background_flush (bool): Denotes whether a daemon flush thread should + be used (True) to flush batches to plasma. + """ + + def __init__(self, + max_size=999999, + max_batch_size=99999, + max_batch_time=0.01, + prefetch_depth=10, + background_flush=False): + self.max_size = max_size + self.max_batch_size = max_batch_size + self.max_batch_time = max_batch_time + self.prefetch_depth = prefetch_depth + self.background_flush = background_flush diff --git a/python/ray/experimental/streaming/examples/articles.txt b/python/ray/experimental/streaming/examples/articles.txt new file mode 100644 index 000000000000..0bb455fd3f1d --- /dev/null +++ b/python/ray/experimental/streaming/examples/articles.txt @@ -0,0 +1,8 @@ +New York City +Berlin +London +Paris +United States +Germany +France +United Kingdom diff --git a/python/ray/experimental/streaming/examples/key_selectors.py b/python/ray/experimental/streaming/examples/key_selectors.py new file mode 100644 index 000000000000..762a63cce27c --- /dev/null +++ b/python/ray/experimental/streaming/examples/key_selectors.py @@ -0,0 +1,76 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import argparse +import logging +import time + +import ray +from ray.experimental.streaming.streaming import Environment +from ray.experimental.streaming.batched_queue import BatchedQueue +from ray.experimental.streaming.operator import OpType, PStrategy + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + +parser = argparse.ArgumentParser() +parser.add_argument("--input-file", required=True, help="the input text file") + + +# A class used to check attribute-based key selection +class Record(object): + def __init__(self, record): + k, _ = record + self.word = k + self.record = record + + +# Splits input line into words and outputs objects of type Record +# each one consisting of a key (word) and a tuple (word,1) +def splitter(line): + records = [] + words = line.split() + for w in words: + records.append(Record((w, 1))) + return records + + +# Receives an object of type Record and returns the actual tuple +def as_tuple(record): + return record.record + + +if __name__ == "__main__": + # Get program parameters + args = parser.parse_args() + input_file = str(args.input_file) + + ray.init() + ray.register_custom_serializer(Record, use_dict=True) + ray.register_custom_serializer(BatchedQueue, use_pickle=True) + ray.register_custom_serializer(OpType, use_pickle=True) + ray.register_custom_serializer(PStrategy, use_pickle=True) + + # A 
Ray streaming environment with the default configuration + env = Environment() + env.set_parallelism(2) # Each operator will be executed by two actors + + # 'key_by("word")' physically partitions the stream of records + # based on the hash value of the 'word' attribute (see Record class above) + # 'map(as_tuple)' maps a record of type Record into a tuple + # 'sum(1)' sums the 2nd element of the tuple, i.e. the word count + stream = env.read_text_file(input_file) \ + .round_robin() \ + .flat_map(splitter) \ + .key_by("word") \ + .map(as_tuple) \ + .sum(1) \ + .inspect(print) # Prints the content of the + # stream to stdout + start = time.time() + env_handle = env.execute() # Deploys and executes the dataflow + ray.get(env_handle) # Stay alive until execution finishes + end = time.time() + logger.info("Elapsed time: {} secs".format(end - start)) + logger.debug("Output stream id: {}".format(stream.id)) diff --git a/python/ray/experimental/streaming/examples/simple.py b/python/ray/experimental/streaming/examples/simple.py new file mode 100644 index 000000000000..26272cdc94dc --- /dev/null +++ b/python/ray/experimental/streaming/examples/simple.py @@ -0,0 +1,59 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import argparse +import logging +import time + +import ray +from ray.experimental.streaming.streaming import Environment +from ray.experimental.streaming.batched_queue import BatchedQueue +from ray.experimental.streaming.operator import OpType, PStrategy + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + +parser = argparse.ArgumentParser() +parser.add_argument("--input-file", required=True, help="the input text file") + + +# Test functions +def splitter(line): + return line.split() + + +def filter_fn(word): + if "f" in word: + return True + return False + + +if __name__ == "__main__": + + args = parser.parse_args() + + ray.init() + ray.register_custom_serializer(BatchedQueue, use_pickle=True) + ray.register_custom_serializer(OpType, use_pickle=True) + ray.register_custom_serializer(PStrategy, use_pickle=True) + + # A Ray streaming environment with the default configuration + env = Environment() + + # Stream represents the ouput of the filter and + # can be forked into other dataflows + stream = env.read_text_file(args.input_file) \ + .shuffle() \ + .flat_map(splitter) \ + .set_parallelism(4) \ + .filter(filter_fn) \ + .set_parallelism(2) \ + .inspect(print) # Prints the contents of the + # stream to stdout + start = time.time() + env_handle = env.execute() + ray.get(env_handle) # Stay alive until execution finishes + end = time.time() + logger.info("Elapsed time: {} secs".format(end - start)) + logger.debug("Output stream id: {}".format(stream.id)) diff --git a/python/ray/experimental/streaming/examples/toy.txt b/python/ray/experimental/streaming/examples/toy.txt new file mode 100644 index 000000000000..fabe58790d36 --- /dev/null +++ b/python/ray/experimental/streaming/examples/toy.txt @@ -0,0 +1,5 @@ +This is +a test file +to test if example +works +fine diff --git a/python/ray/experimental/streaming/examples/wordcount.py b/python/ray/experimental/streaming/examples/wordcount.py new file mode 100644 index 000000000000..9cc933ed44dc --- /dev/null +++ b/python/ray/experimental/streaming/examples/wordcount.py @@ -0,0 +1,113 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import argparse +import logging +import time +import 
wikipedia
+
+import ray
+from ray.experimental.streaming.streaming import Environment
+from ray.experimental.streaming.batched_queue import BatchedQueue
+from ray.experimental.streaming.operator import OpType, PStrategy
+
+logger = logging.getLogger(__name__)
+logging.basicConfig(level=logging.INFO)
+
+parser = argparse.ArgumentParser()
+parser.add_argument(
+    "--titles-file",
+    required=True,
+    help="the file containing the wikipedia titles to lookup")
+
+
+# A custom data source that reads articles from wikipedia
+# Custom data sources need to implement a get_next() method
+# that returns the next data element, in this case sentences
+class Wikipedia(object):
+    def __init__(self, title_file):
+        # Titles in this file will be used as queries
+        self.title_file = title_file
+        # TODO (john): Handle possible exception here
+        self.title_reader = iter(list(open(self.title_file, "r").readlines()))
+        self.done = False
+        self.article_done = True
+        self.sentences = iter([])
+
+    # Returns next sentence from a wikipedia article
+    def get_next(self):
+        if self.done:
+            return None  # Source exhausted
+        while True:
+            if self.article_done:
+                try:  # Try next title
+                    next_title = next(self.title_reader)
+                except StopIteration:
+                    self.done = True  # Source exhausted
+                    return None
+                # Get next article
+                logger.debug("Next article: {}".format(next_title))
+                article = wikipedia.page(next_title).content
+                # Split article into sentences
+                self.sentences = iter(article.split("."))
+                self.article_done = False
+            try:  # Try next sentence
+                sentence = next(self.sentences)
+                logger.debug("Next sentence: {}".format(sentence))
+                return sentence
+            except StopIteration:
+                self.article_done = True
+
+
+# Splits input line into words and
+# outputs records of the form (word,1)
+def splitter(line):
+    records = []
+    words = line.split()
+    for w in words:
+        records.append((w, 1))
+    return records
+
+
+# Returns the first attribute of a tuple
+def key_selector(tuple):
+    return tuple[0]
+
+
+# Returns the second attribute of a tuple
+def attribute_selector(tuple):
+    return tuple[1]
+
+
+if __name__ == "__main__":
+    # Get program parameters
+    args = parser.parse_args()
+    titles_file = str(args.titles_file)
+
+    ray.init()
+    ray.register_custom_serializer(BatchedQueue, use_pickle=True)
+    ray.register_custom_serializer(OpType, use_pickle=True)
+    ray.register_custom_serializer(PStrategy, use_pickle=True)
+
+    # A Ray streaming environment with the default configuration
+    env = Environment()
+    env.set_parallelism(2)  # Each operator will be executed by two actors
+
+    # The following dataflow is a simple streaming wordcount
+    # with a rolling sum operator.
+    # It reads articles from wikipedia, splits them into words,
+    # shuffles words, and counts the occurrences of each word.
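+    # For illustration only (hypothetical data): a sentence such as
+    # "Paris is the capital of France" is flat-mapped into records
+    # ("Paris", 1), ("is", 1), ("the", 1), ("capital", 1), ("of", 1),
+    # ("France", 1); key_by routes records with the same word to the same
+    # downstream instance, and the rolling sum emits running counts per key,
+    # e.g. ("the", 1), ("the", 2), ... as more occurrences arrive.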
+ stream = env.source(Wikipedia(titles_file)) \ + .round_robin() \ + .flat_map(splitter) \ + .key_by(key_selector) \ + .sum(attribute_selector) \ + .inspect(print) # Prints the contents of the + # stream to stdout + start = time.time() + env_handle = env.execute() # Deploys and executes the dataflow + ray.get(env_handle) # Stay alive until execution finishes + end = time.time() + logger.info("Elapsed time: {} secs".format(end - start)) + logger.debug("Output stream id: {}".format(stream.id)) diff --git a/python/ray/experimental/streaming/operator.py b/python/ray/experimental/streaming/operator.py new file mode 100644 index 000000000000..9f70a6450c02 --- /dev/null +++ b/python/ray/experimental/streaming/operator.py @@ -0,0 +1,105 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import enum +import logging + +logger = logging.getLogger(__name__) +logger.setLevel("DEBUG") + + +# Stream partitioning schemes +class PScheme(object): + def __init__(self, strategy, partition_fn=None): + self.strategy = strategy + self.partition_fn = partition_fn + + def __repr__(self): + return "({},{})".format(self.strategy, self.partition_fn) + + +# Partitioning strategies +class PStrategy(enum.Enum): + Forward = 0 # Default + Shuffle = 1 + Rescale = 2 + RoundRobin = 3 + Broadcast = 4 + Custom = 5 + ShuffleByKey = 6 + # ... + + +# Operator types +class OpType(enum.Enum): + Source = 0 + Map = 1 + FlatMap = 2 + Filter = 3 + TimeWindow = 4 + KeyBy = 5 + Sink = 6 + WindowJoin = 7 + Inspect = 8 + ReadTextFile = 9 + Reduce = 10 + Sum = 11 + # ... + + +# A logical dataflow operator +class Operator(object): + def __init__(self, + id, + type, + name="", + logic=None, + num_instances=1, + other=None, + state_actor=None): + self.id = id + self.type = type + self.name = name + self.logic = logic # The operator's logic + self.num_instances = num_instances + # One partitioning strategy per downstream operator (default: forward) + self.partitioning_strategies = {} + self.other_args = other # Depends on the type of the operator + self.state_actor = state_actor # Actor to query state + + # Sets the partitioning scheme for an output stream of the operator + def _set_partition_strategy(self, + stream_id, + partitioning_scheme, + dest_operator=None): + self.partitioning_strategies[stream_id] = (partitioning_scheme, + dest_operator) + + # Retrieves the partitioning scheme for the given + # output stream of the operator + # Returns None is no strategy has been defined for the particular stream + def _get_partition_strategy(self, stream_id): + return self.partitioning_strategies.get(stream_id) + + # Cleans metatada from all partitioning strategies that lack a + # destination operator + # Valid entries are re-organized as + # 'destination operator id -> partitioning scheme' + # Should be called only after the logical dataflow has been constructed + def _clean(self): + strategies = {} + for _, v in self.partitioning_strategies.items(): + strategy, destination_operator = v + if destination_operator is not None: + strategies.setdefault(destination_operator, strategy) + self.partitioning_strategies = strategies + + def print(self): + log = "Operator<\nID = {}\nName = {}\nType = {}\n" + log += "Logic = {}\nNumber_of_Instances = {}\n" + log += "Partitioning_Scheme = {}\nOther_Args = {}>\n" + logger.debug( + log.format(self.id, self.name, self.type, self.logic, + self.num_instances, self.partitioning_strategies, + self.other_args)) diff --git 
a/python/ray/experimental/streaming/operator_instance.py b/python/ray/experimental/streaming/operator_instance.py new file mode 100644 index 000000000000..ae36b143a1a1 --- /dev/null +++ b/python/ray/experimental/streaming/operator_instance.py @@ -0,0 +1,365 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import logging +import sys +import time +import types + +import ray + +logger = logging.getLogger(__name__) +logger.setLevel("DEBUG") + +# +# Each Ray actor corresponds to an operator instance in the physical dataflow +# Actors communicate using batched queues as data channels (no standing TCP +# connections) +# Currently, batched queues are based on Eric's implementation (see: +# batched_queue.py) + + +def _identity(element): + return element + + +# TODO (john): Specify the interface of state keepers +class OperatorInstance(object): + """A streaming operator instance. + + Attributes: + instance_id (UUID): The id of the instance. + input (DataInput): The input gate that manages input channels of + the instance (see: DataInput in communication.py). + input (DataOutput): The output gate that manages output channels of + the instance (see: DataOutput in communication.py). + state_keepers (list): A list of actor handlers to query the state of + the operator instance. + """ + + def __init__(self, instance_id, input_gate, output_gate, + state_keeper=None): + self.key_index = None # Index for key selection + self.key_attribute = None # Attribute name for key selection + self.instance_id = instance_id + self.input = input_gate + self.output = output_gate + # Handle(s) to one or more user-defined actors + # that can retrieve actor's state + self.state_keeper = state_keeper + # Enable writes + for channel in self.output.forward_channels: + channel.queue.enable_writes() + for channels in self.output.shuffle_channels: + for channel in channels: + channel.queue.enable_writes() + for channels in self.output.shuffle_key_channels: + for channel in channels: + channel.queue.enable_writes() + for channels in self.output.round_robin_channels: + for channel in channels: + channel.queue.enable_writes() + # TODO (john): Add more channel types here + + # Registers actor's handle so that the actor can schedule itself + def register_handle(self, actor_handle): + self.this_actor = actor_handle + + # Used for index-based key extraction, e.g. for tuples + def index_based_selector(self, record): + return record[self.key_index] + + # Used for attribute-based key extraction, e.g. for classes + def attribute_based_selector(self, record): + return vars(record)[self.key_attribute] + + # Starts the actor + def start(self): + pass + + +# A source actor that reads a text file line by line +@ray.remote +class ReadTextFile(OperatorInstance): + """A source operator instance that reads a text file line by line. + + Attributes: + filepath (string): The path to the input file. 
+ """ + + def __init__(self, + instance_id, + operator_metadata, + input_gate, + output_gate, + state_keepers=None): + OperatorInstance.__init__(self, instance_id, input_gate, output_gate, + state_keepers) + self.filepath = operator_metadata.other_args + # TODO (john): Handle possible exception here + self.reader = open(self.filepath, "r") + + # Read input file line by line + def start(self): + while True: + record = self.reader.readline() + # Reader returns empty string ('') on EOF + if not record: + # Flush any remaining records to plasma and close the file + self.output._flush(close=True) + self.reader.close() + return + self.output._push( + record[:-1]) # Push after removing newline characters + + +# Map actor +@ray.remote +class Map(OperatorInstance): + """A map operator instance that applies a user-defined + stream transformation. + + A map produces exactly one output record for each record in + the input stream. + + Attributes: + map_fn (function): The user-defined function. + """ + + def __init__(self, instance_id, operator_metadata, input_gate, + output_gate): + OperatorInstance.__init__(self, instance_id, input_gate, output_gate) + self.map_fn = operator_metadata.logic + + # Applies the mapper each record of the input stream(s) + # and pushes resulting records to the output stream(s) + def start(self): + start = time.time() + elements = 0 + while True: + record = self.input._pull() + if record is None: + self.output._flush(close=True) + logger.debug("[map {}] read/writes per second: {}".format( + self.instance_id, elements / (time.time() - start))) + return + self.output._push(self.map_fn(record)) + elements += 1 + + +# Flatmap actor +@ray.remote +class FlatMap(OperatorInstance): + """A map operator instance that applies a user-defined + stream transformation. + + A flatmap produces one or more output records for each record in + the input stream. + + Attributes: + flatmap_fn (function): The user-defined function. + """ + + def __init__(self, instance_id, operator_metadata, input_gate, + output_gate): + OperatorInstance.__init__(self, instance_id, input_gate, output_gate) + self.flatmap_fn = operator_metadata.logic + + # Applies the splitter to the records of the input stream(s) + # and pushes resulting records to the output stream(s) + def start(self): + while True: + record = self.input._pull() + if record is None: + self.output._flush(close=True) + return + self.output._push_all(self.flatmap_fn(record)) + + +# Filter actor +@ray.remote +class Filter(OperatorInstance): + """A filter operator instance that applies a user-defined filter to + each record of the stream. + + Output records are those that pass the filter, i.e. those for which + the filter function returns True. + + Attributes: + filter_fn (function): The user-defined boolean function. + """ + + def __init__(self, instance_id, operator_metadata, input_gate, + output_gate): + OperatorInstance.__init__(self, instance_id, input_gate, output_gate) + self.filter_fn = operator_metadata.logic + + # Applies the filter to the records of the input stream(s) + # and pushes resulting records to the output stream(s) + def start(self): + while True: + record = self.input._pull() + if record is None: # Close channel and return + self.output._flush(close=True) + return + if self.filter_fn(record): + self.output._push(record) + + +# Inspect actor +@ray.remote +class Inspect(OperatorInstance): + """A inspect operator instance that inspects the content of the stream. + + Inspect is useful for printing the records in the stream. 
+
+    Attributes:
+        inspect_fn (function): The user-defined inspect logic.
+    """
+
+    def __init__(self, instance_id, operator_metadata, input_gate,
+                 output_gate):
+        OperatorInstance.__init__(self, instance_id, input_gate, output_gate)
+        self.inspect_fn = operator_metadata.logic
+
+    # Applies the inspect logic (e.g. print) to the records of
+    # the input stream(s)
+    # and leaves stream unaffected by simply pushing the records to
+    # the output stream(s)
+    def start(self):
+        while True:
+            record = self.input._pull()
+            if record is None:
+                self.output._flush(close=True)
+                return
+            self.output._push(record)
+            self.inspect_fn(record)
+
+
+# Reduce actor
+@ray.remote
+class Reduce(OperatorInstance):
+    """A reduce operator instance that combines a new value for a key
+    with the last reduced one according to a user-defined logic.
+
+    Attributes:
+        reduce_fn (function): The user-defined reduce logic.
+        value_attribute (int): The index of the value to reduce
+            (assuming tuple records).
+        state (dict): A mapping from keys to values.
+    """
+
+    def __init__(self, instance_id, operator_metadata, input_gate,
+                 output_gate):
+        OperatorInstance.__init__(self, instance_id, input_gate, output_gate,
+                                  operator_metadata.state_actor)
+        self.reduce_fn = operator_metadata.logic
+        # Set the attribute selector
+        self.attribute_selector = operator_metadata.other_args
+        if self.attribute_selector is None:
+            self.attribute_selector = _identity
+        elif isinstance(self.attribute_selector, int):
+            self.key_index = self.attribute_selector
+            self.attribute_selector = self.index_based_selector
+        elif isinstance(self.attribute_selector, str):
+            self.key_attribute = self.attribute_selector
+            self.attribute_selector = self.attribute_based_selector
+        elif not isinstance(self.attribute_selector, types.FunctionType):
+            sys.exit("Unrecognized or unsupported attribute selector.")
+        self.state = {}  # key -> value
+
+    # Combines the input value for a key with the last reduced
+    # value for that key to produce a new value.
+    # Outputs the result as (key,new value)
+    def start(self):
+        while True:
+            record = self.input._pull()
+            if record is None:
+                self.output._flush(close=True)
+                del self.state
+                return
+            key, rest = record
+            new_value = self.attribute_selector(rest)
+            # TODO (john): Is there a way to update state with
+            # a single dictionary lookup?
+            try:
+                old_value = self.state[key]
+                new_value = self.reduce_fn(old_value, new_value)
+                self.state[key] = new_value
+            except KeyError:  # Key does not exist in state
+                self.state.setdefault(key, new_value)
+            self.output._push((key, new_value))
+
+    # Returns the state of the actor
+    def get_state(self):
+        return self.state
+
+
+@ray.remote
+class KeyBy(OperatorInstance):
+    """A key_by operator instance that physically partitions the
+    stream based on a key.
+
+    Attributes:
+        key_attribute (int): The index of the key used for partitioning
+            (assuming tuple records).
+ """ + + def __init__(self, instance_id, operator_metadata, input_gate, + output_gate): + OperatorInstance.__init__(self, instance_id, input_gate, output_gate) + # Set the key selector + self.key_selector = operator_metadata.other_args + if isinstance(self.key_selector, int): + self.key_index = self.key_selector + self.key_selector = self.index_based_selector + elif isinstance(self.key_selector, str): + self.key_attribute = self.key_selector + self.key_selector = self.attribute_based_selector + elif not isinstance(self.key_selector, types.FunctionType): + sys.exit("Unrecognized or unsupported key selector.") + + # The actual partitioning is done by the output gate + def start(self): + while True: + record = self.input._pull() + if record is None: + self.output._flush(close=True) + return + key = self.key_selector(record) + self.output._push((key, record)) + + +# A custom source actor +@ray.remote +class Source(OperatorInstance): + def __init__(self, instance_id, operator_metadata, input_gate, + output_gate): + OperatorInstance.__init__(self, instance_id, input_gate, output_gate) + # The user-defined source with a get_next() method + self.source = operator_metadata.other_args + + # Starts the source by calling get_next() repeatedly + def start(self): + start = time.time() + elements = 0 + while True: + next = self.source.get_next() + if next is None: + self.output._flush(close=True) + logger.debug("[writer {}] puts per second: {}".format( + self.instance_id, elements / (time.time() - start))) + return + self.output._push(next) + elements += 1 + + +# TODO(john): Time window actor (uses system time) +@ray.remote +class TimeWindow(OperatorInstance): + def __init__(self, queue, width): + self.width = width # In milliseconds + + def time_window(self): + while True: + pass diff --git a/python/ray/experimental/streaming/streaming.py b/python/ray/experimental/streaming/streaming.py new file mode 100644 index 000000000000..f9e4241fe9a1 --- /dev/null +++ b/python/ray/experimental/streaming/streaming.py @@ -0,0 +1,668 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import logging +import sys +import uuid + +import networkx as nx + +from ray.experimental.streaming.communication import DataChannel, DataInput +from ray.experimental.streaming.communication import DataOutput, QueueConfig +from ray.experimental.streaming.operator import Operator, OpType +from ray.experimental.streaming.operator import PScheme, PStrategy +import ray.experimental.streaming.operator_instance as operator_instance + +logger = logging.getLogger(__name__) +logger.setLevel("INFO") + + +# Generates UUIDs +def _generate_uuid(): + return uuid.uuid4() + + +# Rolling sum's logic +def _sum(value_1, value_2): + return value_1 + value_2 + + +# Partitioning strategies that require all-to-all instance communication +all_to_all_strategies = [ + PStrategy.Shuffle, PStrategy.ShuffleByKey, PStrategy.Broadcast, + PStrategy.RoundRobin +] + + +# Environment configuration +class Config(object): + """Environment configuration. + + This class includes all information about the configuration of the + streaming environment. 
+ + Attributes: + queue_config (QueueConfig): Batched Queue configuration + (see: communication.py) + A batched queue configuration includes the max queue size, + the size of each batch (in number of elements), the batch flush + timeout, and the number of batches to prefetch from plasma + parallelism (int): The number of isntances (actors) for each logical + dataflow operator (default: 1) + """ + + def __init__(self, parallelism=1): + self.queue_config = QueueConfig() + self.parallelism = parallelism + # ... + + +# The execution environment for a streaming job +class Environment(object): + """A streaming environment. + + This class is responsible for constructing the logical and the + physical dataflow. + + Attributes: + logical_topo (DiGraph): The user-defined logical topology in + NetworkX DiGRaph format. + (See: https://networkx.github.io) + physical_topo (DiGraph): The physical topology in NetworkX + DiGRaph format. The physical dataflow is constructed by the + environment based on logical_topo. + operators (dict): A mapping from operator ids to operator metadata + (See: Operator in operator.py). + config (Config): The environment's configuration. + topo_cleaned (bool): A flag that indicates whether the logical + topology is garbage collected (True) or not (False). + actor_handles (list): A list of all Ray actor handles that execute + the streaming dataflow. + """ + + def __init__(self, config=Config()): + self.logical_topo = nx.DiGraph() # DAG + self.physical_topo = nx.DiGraph() # DAG + self.operators = {} # operator id --> operator object + self.config = config # Environment's configuration + self.topo_cleaned = False + # Handles to all actors in the physical dataflow + self.actor_handles = [] + + # Constructs and deploys a Ray actor of a specific type + # TODO (john): Actor placement information should be specified in + # the environment's configuration + def __generate_actor(self, instance_id, operator, input, output): + """Generates an actor that will execute a particular instance of + the logical operator + + Attributes: + instance_id (UUID): The id of the instance the actor will execute. + operator (Operator): The metadata of the logical operator. + input (DataInput): The input gate that manages input channels of + the instance (see: DataInput in communication.py). + input (DataOutput): The output gate that manages output channels + of the instance (see: DataOutput in communication.py). 
+ """ + actor_id = (operator.id, instance_id) + # Record the physical dataflow graph (for debugging purposes) + self.__add_channel(actor_id, input, output) + # Select actor to construct + if operator.type == OpType.Source: + source = operator_instance.Source.remote(actor_id, operator, input, + output) + source.register_handle.remote(source) + return source.start.remote() + elif operator.type == OpType.Map: + map = operator_instance.Map.remote(actor_id, operator, input, + output) + map.register_handle.remote(map) + return map.start.remote() + elif operator.type == OpType.FlatMap: + flatmap = operator_instance.FlatMap.remote(actor_id, operator, + input, output) + flatmap.register_handle.remote(flatmap) + return flatmap.start.remote() + elif operator.type == OpType.Filter: + filter = operator_instance.Filter.remote(actor_id, operator, input, + output) + filter.register_handle.remote(filter) + return filter.start.remote() + elif operator.type == OpType.Reduce: + reduce = operator_instance.Reduce.remote(actor_id, operator, input, + output) + reduce.register_handle.remote(reduce) + return reduce.start.remote() + elif operator.type == OpType.TimeWindow: + pass + elif operator.type == OpType.KeyBy: + keyby = operator_instance.KeyBy.remote(actor_id, operator, input, + output) + keyby.register_handle.remote(keyby) + return keyby.start.remote() + elif operator.type == OpType.Sum: + sum = operator_instance.Reduce.remote(actor_id, operator, input, + output) + # Register target handle at state actor + state_actor = operator.state_actor + if state_actor is not None: + state_actor.register_target.remote(sum) + # Register own handle + sum.register_handle.remote(sum) + return sum.start.remote() + elif operator.type == OpType.Sink: + pass + elif operator.type == OpType.Inspect: + inspect = operator_instance.Inspect.remote(actor_id, operator, + input, output) + inspect.register_handle.remote(inspect) + return inspect.start.remote() + elif operator.type == OpType.ReadTextFile: + # TODO (john): Colocate the source with the input file + read = operator_instance.ReadTextFile.remote( + actor_id, operator, input, output) + read.register_handle.remote(read) + return read.start.remote() + else: # TODO (john): Add support for other types of operators + sys.exit("Unrecognized or unsupported {} operator type.".format( + operator.type)) + + # Constructs and deploys a Ray actor for each instance of + # the given operator + def __generate_actors(self, operator, upstream_channels, + downstream_channels): + """Generates one actor for each instance of the given logical + operator. + + Attributes: + operator (Operator): The logical operator metadata. + upstream_channels (list): A list of all upstream channels for + all instances of the operator. + downstream_channels (list): A list of all downstream channels + for all instances of the operator. + """ + num_instances = operator.num_instances + logger.info("Generating {} actors of type {}...".format( + num_instances, operator.type)) + in_channels = upstream_channels.pop( + operator.id) if upstream_channels else [] + handles = [] + for i in range(num_instances): + # Collect input and output channels for the particular instance + ip = [ + channel for channel in in_channels + if channel.dst_instance_id == i + ] if in_channels else [] + op = [ + channel for channels_list in downstream_channels.values() + for channel in channels_list if channel.src_instance_id == i + ] + log = "Constructed {} input and {} output channels " + log += "for the {}-th instance of the {} operator." 
+ logger.debug(log.format(len(ip), len(op), i, operator.type)) + input_gate = DataInput(ip) + output_gate = DataOutput(op, operator.partitioning_strategies) + handle = self.__generate_actor(i, operator, input_gate, + output_gate) + if handle: + handles.append(handle) + return handles + + # Adds a channel/edge to the physical dataflow graph + def __add_channel(self, actor_id, input, output): + for dest_actor_id in output._destination_actor_ids(): + self.physical_topo.add_edge(actor_id, dest_actor_id) + + # Generates all required data channels between an operator + # and its downstream operators + def _generate_channels(self, operator): + """Generates all output data channels + (see: DataChannel in communication.py) for all instances of + the given logical operator. + + The function constructs one data channel for each pair of + communicating operator instances (instance_1,instance_2), + where instance_1 is an instance of the given operator and instance_2 + is an instance of a direct downstream operator. + + The number of total channels generated depends on the partitioning + strategy specified by the user. + """ + channels = {} # destination operator id -> channels + strategies = operator.partitioning_strategies + for dst_operator, p_scheme in strategies.items(): + num_dest_instances = self.operators[dst_operator].num_instances + entry = channels.setdefault(dst_operator, []) + if p_scheme.strategy == PStrategy.Forward: + for i in range(operator.num_instances): + # ID of destination instance to connect + id = i % num_dest_instances + channel = DataChannel(self, operator.id, dst_operator, i, + id) + entry.append(channel) + elif p_scheme.strategy in all_to_all_strategies: + for i in range(operator.num_instances): + for j in range(num_dest_instances): + channel = DataChannel(self, operator.id, dst_operator, + i, j) + entry.append(channel) + else: + # TODO (john): Add support for other partitioning strategies + sys.exit("Unrecognized or unsupported partitioning strategy.") + return channels + + # An edge denotes a flow of data between logical operators + # and may correspond to multiple data channels in the physical dataflow + def _add_edge(self, source, destination): + self.logical_topo.add_edge(source, destination) + + # Cleans the logical dataflow graph to construct and + # deploy the physical dataflow + def _collect_garbage(self): + if self.topo_cleaned is True: + return + for node in self.logical_topo: + self.operators[node]._clean() + self.topo_cleaned = True + + # Sets the level of parallelism for a registered operator + # Overwrites the environment parallelism (if set) + def _set_parallelism(self, operator_id, level_of_parallelism): + self.operators[operator_id].num_instances = level_of_parallelism + + # Sets the same level of parallelism for all operators in the environment + def set_parallelism(self, parallelism): + self.config.parallelism = parallelism + + # Sets batched queue configuration for the environment + def set_queue_config(self, queue_config): + self.config.queue_config = queue_config + + # Creates and registers a user-defined data source + # TODO (john): There should be different types of sources, e.g. sources + # reading from Kafka, text files, etc. 
+ # TODO (john): Handle case where environment parallelism is set + def source(self, source): + source_id = _generate_uuid() + source_stream = DataStream(self, source_id) + self.operators[source_id] = Operator( + source_id, OpType.Source, "Source", other=source) + return source_stream + + # Creates and registers a new data source that reads a + # text file line by line + # TODO (john): There should be different types of sources, + # e.g. sources reading from Kafka, text files, etc. + # TODO (john): Handle case where environment parallelism is set + def read_text_file(self, filepath): + source_id = _generate_uuid() + source_stream = DataStream(self, source_id) + self.operators[source_id] = Operator( + source_id, OpType.ReadTextFile, "Read Text File", other=filepath) + return source_stream + + # Constructs and deploys the physical dataflow + def execute(self): + """Deploys and executes the physical dataflow.""" + self._collect_garbage() # Make sure everything is clean + # TODO (john): Check if dataflow has any 'logical inconsistencies' + # For example, if there is a forward partitioning strategy but + # the number of downstream instances is larger than the number of + # upstream instances, some of the downstream instances will not be + # used at all + + # Each operator instance is implemented as a Ray actor + # Actors are deployed in topological order, as we traverse the + # logical dataflow from sources to sinks. At each step, data + # producers wait for acknowledge from consumers before starting + # generating data. + upstream_channels = {} + for node in nx.topological_sort(self.logical_topo): + operator = self.operators[node] + # Generate downstream data channels + downstream_channels = self._generate_channels(operator) + # Instantiate Ray actors + handles = self.__generate_actors(operator, upstream_channels, + downstream_channels) + if handles: + self.actor_handles.extend(handles) + upstream_channels.update(downstream_channels) + logger.debug("Running...") + return self.actor_handles + + # Prints the logical dataflow graph + def print_logical_graph(self): + self._collect_garbage() + logger.info("==================================") + logger.info("======Logical Dataflow Graph======") + logger.info("==================================") + # Print operators in topological order + for node in nx.topological_sort(self.logical_topo): + downstream_neighbors = list(self.logical_topo.neighbors(node)) + logger.info("======Current Operator======") + operator = self.operators[node] + operator.print() + logger.info("======Downstream Operators======") + if len(downstream_neighbors) == 0: + logger.info("None\n") + for downstream_node in downstream_neighbors: + self.operators[downstream_node].print() + + # Prints the physical dataflow graph + def print_physical_graph(self): + logger.info("===================================") + logger.info("======Physical Dataflow Graph======") + logger.info("===================================") + # Print all data channels between operator instances + log = "(Source Operator ID,Source Operator Name,Source Instance ID)" + log += " --> " + log += "(Destination Operator ID,Destination Operator Name," + log += "Destination Instance ID)" + logger.info(log) + for src_actor_id, dst_actor_id in self.physical_topo.edges: + src_operator_id, src_instance_id = src_actor_id + dst_operator_id, dst_instance_id = dst_actor_id + logger.info("({},{},{}) --> ({},{},{})".format( + src_operator_id, self.operators[src_operator_id].name, + src_instance_id, dst_operator_id, + 
self.operators[dst_operator_id].name, dst_instance_id)) + + +# TODO (john): We also need KeyedDataStream and WindowedDataStream as +# subclasses of DataStream to prevent ill-defined logical dataflows + + +# A DataStream corresponds to an edge in the logical dataflow +class DataStream(object): + """A data stream. + + This class contains all information about a logical stream, i.e. an edge + in the logical topology. It is the main class exposed to the user. + + Attributes: + id (UUID): The id of the stream + env (Environment): The environment the stream belongs to. + src_operator_id (UUID): The id of the source operator of the stream. + dst_operator_id (UUID): The id of the destination operator of the + stream. + is_partitioned (bool): Denotes if there is a partitioning strategy + (e.g. shuffle) for the stream or not (default stategy: Forward). + """ + + def __init__(self, + environment, + source_id=None, + dest_id=None, + is_partitioned=False): + self.id = _generate_uuid() + self.env = environment + self.src_operator_id = source_id + self.dst_operator_id = dest_id + # True if a partitioning strategy for this stream exists, + # false otherwise + self.is_partitioned = is_partitioned + + # Generates a new stream after a data transformation is applied + def __expand(self): + stream = DataStream(self.env) + assert (self.dst_operator_id is not None) + stream.src_operator_id = self.dst_operator_id + stream.dst_operator_id = None + return stream + + # Assigns the partitioning strategy to a new 'open-ended' stream + # and returns the stream. At this point, the partitioning strategy + # is not associated with any destination operator. We expect this to + # be done later, as we continue assembling the dataflow graph + def __partition(self, strategy, partition_fn=None): + scheme = PScheme(strategy, partition_fn) + source_operator = self.env.operators[self.src_operator_id] + new_stream = DataStream( + self.env, source_id=source_operator.id, is_partitioned=True) + source_operator._set_partition_strategy(new_stream.id, scheme) + return new_stream + + # Registers the operator to the environment and returns a new + # 'open-ended' stream. The registered operator serves as the destination + # of the previously 'open' stream + def __register(self, operator): + """Registers the given logical operator to the environment and + connects it to its upstream operator (if any). + + A call to this function adds a new edge to the logical topology. + + Attributes: + operator (Operator): The metadata of the logical operator. 
+ """ + self.env.operators[operator.id] = operator + self.dst_operator_id = operator.id + logger.debug("Adding new dataflow edge ({},{}) --> ({},{})".format( + self.src_operator_id, + self.env.operators[self.src_operator_id].name, + self.dst_operator_id, + self.env.operators[self.dst_operator_id].name)) + # Update logical dataflow graphs + self.env._add_edge(self.src_operator_id, self.dst_operator_id) + # Keep track of the partitioning strategy and the destination operator + src_operator = self.env.operators[self.src_operator_id] + if self.is_partitioned is True: + partitioning, _ = src_operator._get_partition_strategy(self.id) + src_operator._set_partition_strategy(_generate_uuid(), + partitioning, operator.id) + elif src_operator.type == OpType.KeyBy: + # Set the output partitioning strategy to shuffle by key + partitioning = PScheme(PStrategy.ShuffleByKey) + src_operator._set_partition_strategy(_generate_uuid(), + partitioning, operator.id) + else: # No partitioning strategy has been defined - set default + partitioning = PScheme(PStrategy.Forward) + src_operator._set_partition_strategy(_generate_uuid(), + partitioning, operator.id) + return self.__expand() + + # Sets the level of parallelism for an operator, i.e. its total + # number of instances. Each operator instance corresponds to an actor + # in the physical dataflow + def set_parallelism(self, num_instances): + """Sets the number of instances for the source operator of the stream. + + Attributes: + num_instances (int): The level of parallelism for the source + operator of the stream. + """ + assert (num_instances > 0) + self.env._set_parallelism(self.src_operator_id, num_instances) + return self + + # Stream Partitioning Strategies # + # TODO (john): Currently, only forward (default), shuffle, + # and broadcast are supported + + # Hash-based record shuffling + def shuffle(self): + """Registers a shuffling partitioning strategy for the stream.""" + return self.__partition(PStrategy.Shuffle) + + # Broadcasts each record to all downstream instances + def broadcast(self): + """Registers a broadcast partitioning strategy for the stream.""" + return self.__partition(PStrategy.Broadcast) + + # Rescales load to downstream instances + def rescale(self): + """Registers a rescale partitioning strategy for the stream. + + Same as Flink's rescale (see: https://ci.apache.org/projects/flink/ + flink-docs-stable/dev/stream/operators/#physical-partitioning). + """ + return self.__partition(PStrategy.Rescale) + + # Round-robin partitioning + def round_robin(self): + """Registers a round-robin partitioning strategy for the stream.""" + return self.__partition(PStrategy.RoundRobin) + + # User-defined partitioning + def partition(self, partition_fn): + """Registers a user-defined partitioning strategy for the stream. + + Attributes: + partition_fn (function): The user-defined partitioning function. + """ + return self.__partition(PStrategy.Custom, partition_fn) + + # Data Trasnformations # + # TODO (john): Expand set of supported operators. + # TODO (john): To support event-time windows we need a mechanism for + # generating and processing watermarks + + # Registers map operator to the environment + def map(self, map_fn, name="Map"): + """Applies a map operator to the stream. + + Attributes: + map_fn (function): The user-defined logic of the map. 
+ """ + op = Operator( + _generate_uuid(), + OpType.Map, + name, + map_fn, + num_instances=self.env.config.parallelism) + return self.__register(op) + + # Registers flatmap operator to the environment + def flat_map(self, flatmap_fn): + """Applies a flatmap operator to the stream. + + Attributes: + flatmap_fn (function): The user-defined logic of the flatmap + (e.g. split()). + """ + op = Operator( + _generate_uuid(), + OpType.FlatMap, + "FlatMap", + flatmap_fn, + num_instances=self.env.config.parallelism) + return self.__register(op) + + # Registers keyBy operator to the environment + # TODO (john): This should returned a KeyedDataStream + def key_by(self, key_selector): + """Applies a key_by operator to the stream. + + Attributes: + key_attribute_index (int): The index of the key attributed + (assuming tuple records). + """ + op = Operator( + _generate_uuid(), + OpType.KeyBy, + "KeyBy", + other=key_selector, + num_instances=self.env.config.parallelism) + return self.__register(op) + + # Registers Reduce operator to the environment + def reduce(self, reduce_fn): + """Applies a rolling sum operator to the stream. + + Attributes: + sum_attribute_index (int): The index of the attribute to sum + (assuming tuple records). + """ + op = Operator( + _generate_uuid(), + OpType.Reduce, + "Sum", + reduce_fn, + num_instances=self.env.config.parallelism) + return self.__register(op) + + # Registers Sum operator to the environment + def sum(self, attribute_selector, state_keeper=None): + """Applies a rolling sum operator to the stream. + + Attributes: + sum_attribute_index (int): The index of the attribute to sum + (assuming tuple records). + """ + op = Operator( + _generate_uuid(), + OpType.Sum, + "Sum", + _sum, + other=attribute_selector, + state_actor=state_keeper, + num_instances=self.env.config.parallelism) + return self.__register(op) + + # Registers window operator to the environment. + # This is a system time window + # TODO (john): This should return a WindowedDataStream + def time_window(self, window_width_ms): + """Applies a system time window to the stream. + + Attributes: + window_width_ms (int): The length of the window in ms. + """ + op = Operator( + _generate_uuid(), + OpType.TimeWindow, + "TimeWindow", + num_instances=self.env.config.parallelism, + other=window_width_ms) + return self.__register(op) + + # Registers filter operator to the environment + def filter(self, filter_fn): + """Applies a filter to the stream. + + Attributes: + filter_fn (function): The user-defined filter function. + """ + op = Operator( + _generate_uuid(), + OpType.Filter, + "Filter", + filter_fn, + num_instances=self.env.config.parallelism) + return self.__register(op) + + # TODO (john): Registers window join operator to the environment + def window_join(self, other_stream, join_attribute, window_width): + op = Operator( + _generate_uuid(), + OpType.WindowJoin, + "WindowJoin", + num_instances=self.env.config.parallelism) + return self.__register(op) + + # Registers inspect operator to the environment + def inspect(self, inspect_logic): + """Inspects the content of the stream. + + Attributes: + inspect_logic (function): The user-defined inspect function. 
+ """ + op = Operator( + _generate_uuid(), + OpType.Inspect, + "Inspect", + inspect_logic, + num_instances=self.env.config.parallelism) + return self.__register(op) + + # Registers sink operator to the environment + # TODO (john): A sink now just drops records but it should be able to + # export data to other systems + def sink(self): + """Closes the stream with a sink operator.""" + op = Operator( + _generate_uuid(), + OpType.Sink, + "Sink", + num_instances=self.env.config.parallelism) + return self.__register(op) diff --git a/python/ray/tests/test_batched_queue.py b/python/ray/tests/test_batched_queue.py new file mode 100644 index 000000000000..62a7da234833 --- /dev/null +++ b/python/ray/tests/test_batched_queue.py @@ -0,0 +1,66 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import pytest +import time + +import ray +from ray.experimental.streaming.batched_queue import BatchedQueue + + +@pytest.fixture +def ray_start(): + # Start the Ray processes. + ray.init(num_cpus=2) + yield None + # The code after the yield will run as teardown code. + ray.shutdown() + + +@ray.remote +class Reader(object): + def __init__(self, queue): + self.queue = queue + self.num_reads = 0 + self.start = time.time() + + def read(self, read_slowly): + expected_value = 0 + for _ in range(1000): + x = self.queue.read_next() + assert x == expected_value, (x, expected_value) + expected_value += 1 + self.num_reads += 1 + if read_slowly: + time.sleep(0.001) + + +def test_batched_queue(ray_start): + # Batched queue parameters + max_queue_size = 10000 # Max number of batches in queue + max_batch_size = 1000 # Max number of elements per batch + batch_timeout = 0.001 # 1ms flush timeout + prefetch_depth = 10 # Number of batches to prefetch from plasma + background_flush = False # Don't use daemon thread for flushing + # Two tests: one with a big queue and slow reader, and + # a second one with a small queue and a faster reader + for read_slowly in [True, False]: + # Construct the batched queue + queue = BatchedQueue( + max_size=max_queue_size, + max_batch_size=max_batch_size, + max_batch_time=batch_timeout, + prefetch_depth=prefetch_depth, + background_flush=background_flush) + # Create and start the reader + reader = Reader.remote(queue) + object_id = reader.read.remote(read_slowly=read_slowly) + value = 0 + for _ in range(1000): + queue.put_next(value) + value += 1 + queue._flush_writes() + ray.get(object_id) + # Test once more with a very small queue size and a faster reader + max_queue_size = 10 diff --git a/python/ray/tests/test_logical_graph.py b/python/ray/tests/test_logical_graph.py new file mode 100644 index 000000000000..54578ae9c2d5 --- /dev/null +++ b/python/ray/tests/test_logical_graph.py @@ -0,0 +1,204 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from ray.experimental.streaming.streaming import Environment +from ray.experimental.streaming.operator import OpType, PStrategy + + +def test_parallelism(): + """Tests operator parallelism.""" + env = Environment() + # Try setting a common parallelism for all operators + env.set_parallelism(2) + stream = env.source(None).map(None).filter(None).flat_map(None) + env._collect_garbage() + for operator in env.operators.values(): + if operator.type == OpType.Source: + # TODO (john): Currently each source has only one instance + assert operator.num_instances == 1, (operator.num_instances, 1) + else: + assert operator.num_instances == 
2, (operator.num_instances, 2) + # Check again after adding an operator with different parallelism + stream.map(None, "Map1").shuffle().set_parallelism(3).map( + None, "Map2").set_parallelism(4) + env._collect_garbage() + for operator in env.operators.values(): + if operator.type == OpType.Source: + assert operator.num_instances == 1, (operator.num_instances, 1) + elif operator.name != "Map1" and operator.name != "Map2": + assert operator.num_instances == 2, (operator.num_instances, 2) + elif operator.name != "Map2": + assert operator.num_instances == 3, (operator.num_instances, 3) + else: + assert operator.num_instances == 4, (operator.num_instances, 4) + + +def test_partitioning(): + """Tests stream partitioning.""" + env = Environment() + # Try defining multiple partitioning strategies for the same stream + _ = env.source(None).shuffle().rescale().broadcast().map( + None).broadcast().shuffle() + env._collect_garbage() + for operator in env.operators.values(): + p_schemes = operator.partitioning_strategies + for scheme in p_schemes.values(): + # Only last defined strategy should be kept + if operator.type == OpType.Source: + assert scheme.strategy == PStrategy.Broadcast, ( + scheme.strategy, PStrategy.Broadcast) + else: + assert scheme.strategy == PStrategy.Shuffle, ( + scheme.strategy, PStrategy.Shuffle) + + +def test_forking(): + """Tests stream forking.""" + env = Environment() + # Try forking a stream + stream = env.source(None).map(None).set_parallelism(2) + # First branch with a shuffle partitioning strategy + _ = stream.shuffle().key_by(0).sum(1) + # Second branch with the default partitioning strategy + _ = stream.key_by(1).sum(2) + env._collect_garbage() + # Operator ids + source_id = None + map_id = None + keyby1_id = None + keyby2_id = None + sum1_id = None + sum2_id = None + # Collect ids + for id, operator in env.operators.items(): + if operator.type == OpType.Source: + source_id = id + elif operator.type == OpType.Map: + map_id = id + elif operator.type == OpType.KeyBy: + if operator.other_args == 0: + keyby1_id = id + else: + assert operator.other_args == 1, (operator.other_args, 1) + keyby2_id = id + elif operator.type == OpType.Sum: + if operator.other_args == 1: + sum1_id = id + else: + assert operator.other_args == 2, (operator.other_args, 2) + sum2_id = id + # Check generated streams and their partitioning + for source, destination in env.logical_topo.edges: + operator = env.operators[source] + if source == source_id: + assert destination == map_id, (destination, map_id) + elif source == map_id: + p_scheme = operator.partitioning_strategies[destination] + strategy = p_scheme.strategy + key_index = env.operators[destination].other_args + if key_index == 0: # This must be the first branch + assert strategy == PStrategy.Shuffle, (strategy, + PStrategy.Shuffle) + assert destination == keyby1_id, (destination, keyby1_id) + else: # This must be the second branch + assert key_index == 1, (key_index, 1) + assert strategy == PStrategy.Forward, (strategy, + PStrategy.Forward) + assert destination == keyby2_id, (destination, keyby2_id) + elif source == keyby1_id or source == keyby2_id: + p_scheme = operator.partitioning_strategies[destination] + strategy = p_scheme.strategy + key_index = env.operators[destination].other_args + if key_index == 1: # This must be the first branch + assert strategy == PStrategy.ShuffleByKey, ( + strategy, PStrategy.ShuffleByKey) + assert destination == sum1_id, (destination, sum1_id) + else: # This must be the second branch + assert key_index == 2, 
(key_index, 2) + assert strategy == PStrategy.ShuffleByKey, ( + strategy, PStrategy.ShuffleByKey) + assert destination == sum2_id, (destination, sum2_id) + else: # This must be a sum operator + assert operator.type == OpType.Sum, (operator.type, OpType.Sum) + + +def _test_shuffle_channels(): + """Tests shuffling connectivity.""" + env = Environment() + # Try defining a shuffle + _ = env.source(None).shuffle().map(None).set_parallelism(4) + expected = [(0, 0), (0, 1), (0, 2), (0, 3)] + _test_channels(env, expected) + + +def _test_forward_channels(): + """Tests forward connectivity.""" + env = Environment() + # Try the default partitioning strategy + _ = env.source(None).set_parallelism(4).map(None).set_parallelism(2) + expected = [(0, 0), (1, 1), (2, 0), (3, 1)] + _test_channels(env, expected) + + +def _test_broadcast_channels(): + """Tests broadcast connectivity.""" + env = Environment() + # Try broadcasting + _ = env.source(None).set_parallelism(4).broadcast().map( + None).set_parallelism(2) + expected = [(0, 0), (0, 1), (1, 0), (1, 1), (2, 0), (2, 1), (3, 0), (3, 1)] + _test_channels(env, expected) + + +def _test_round_robin_channels(): + """Tests round-robin connectivity.""" + env = Environment() + # Try broadcasting + _ = env.source(None).round_robin().map(None).set_parallelism(2) + expected = [(0, 0), (0, 1)] + _test_channels(env, expected) + + +def _test_channels(environment, expected_channels): + """Tests operator connectivity.""" + environment._collect_garbage() + map_id = None + # Get id + for id, operator in environment.operators.items(): + if operator.type == OpType.Map: + map_id = id + # Collect channels + channels_per_destination = [] + for operator in environment.operators.values(): + channels_per_destination.append( + environment._generate_channels(operator)) + # Check actual connectivity + actual = [] + for destination in channels_per_destination: + for channels in destination.values(): + for channel in channels: + src_instance_id = channel.src_instance_id + dst_instance_id = channel.dst_instance_id + connection = (src_instance_id, dst_instance_id) + assert channel.dst_operator_id == map_id, ( + channel.dst_operator_id, map_id) + actual.append(connection) + # Make sure connections are as expected + set_1 = set(expected_channels) + set_2 = set(actual) + assert set_1 == set_2, (set_1, set_2) + + +def test_channel_generation(): + """Tests data channel generation.""" + _test_shuffle_channels() + _test_broadcast_channels() + _test_round_robin_channels() + _test_forward_channels() + + +# TODO (john): Add simple wordcount test +def test_wordcount(): + """Tests a simple streaming wordcount.""" + pass
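
The patch leaves test_wordcount as a TODO stub. The snippet below is a hedged sketch (not part of the diff) of what such a test could verify while remaining a pure logical-graph test in the style of test_forking above: the function name test_wordcount_logical_graph and the exact assertions are illustrative and rely only on the Environment, OpType, and PStrategy APIs introduced in this patch.

from ray.experimental.streaming.streaming import Environment
from ray.experimental.streaming.operator import OpType, PStrategy


def test_wordcount_logical_graph():
    """Sketch: builds a wordcount-style dataflow and checks its logical graph."""
    env = Environment()
    # Source -> FlatMap (split lines) -> KeyBy(word) -> Sum(count) -> Sink
    _ = env.source(None).flat_map(None).key_by(0).sum(1).sink()
    env._collect_garbage()
    # The dataflow should contain exactly one operator of each expected type
    types = [operator.type for operator in env.operators.values()]
    for expected in [OpType.Source, OpType.FlatMap, OpType.KeyBy,
                     OpType.Sum, OpType.Sink]:
        assert types.count(expected) == 1, (types, expected)
    # key_by must induce a shuffle-by-key strategy towards the sum operator;
    # every other edge should use the default forward strategy
    for source, destination in env.logical_topo.edges:
        operator = env.operators[source]
        strategy = operator.partitioning_strategies[destination].strategy
        if operator.type == OpType.KeyBy:
            assert strategy == PStrategy.ShuffleByKey, (
                strategy, PStrategy.ShuffleByKey)
        else:
            assert strategy == PStrategy.Forward, (strategy,
                                                   PStrategy.Forward)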
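
For context, a minimal driver wiring the Environment and DataStream APIs from this patch might look as sketched below. This is an assumption-laden illustration, not part of the diff: RangeSource is a hypothetical source object (any object exposing get_next() works, per the Source actor in operator_instance.py), the per-record behavior of the Map/Filter/Inspect actors is not shown in this diff, shuffle() is used after the single-instance source so that both downstream instances receive input, and inspect() serves as the terminal step because the Sink operator is still a stub here.

from __future__ import print_function

import ray
from ray.experimental.streaming.streaming import Environment


# Hypothetical user-defined source: any object exposing get_next() that
# returns None when exhausted (see the Source actor in operator_instance.py).
class RangeSource(object):
    def __init__(self, count):
        self.current = 0
        self.count = count

    def get_next(self):
        if self.current >= self.count:
            return None  # None closes the stream; the writer flushes and exits
        self.current += 1
        return self.current


if __name__ == "__main__":
    ray.init()
    env = Environment()
    env.set_parallelism(2)  # Two instances per downstream operator
    # Source --shuffle--> Map --forward--> Filter --forward--> Inspect
    env.source(RangeSource(1000)) \
       .shuffle() \
       .map(lambda record: 2 * record) \
       .filter(lambda record: record % 4 == 0) \
       .inspect(lambda record: print(record))
    env.print_logical_graph()
    handles = env.execute()  # Deploys one Ray actor per operator instance
    ray.get(handles)  # Blocks until every actor's start() returns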