Initial Skeleton for Streaming API #4126

Merged · 6 commits · Feb 26, 2019
4 changes: 4 additions & 0 deletions .travis.yml
@@ -171,6 +171,10 @@ script:
- python python/ray/rllib/test/test_optimizers.py
- python python/ray/rllib/test/test_evaluators.py

# streaming library tests
- python -m pytest -v --durations=10 python/ray/tests/test_batched_queue.py
- python -m pytest -v --durations=10 python/ray/tests/test_logical_graph.py

# Python 3.5+ only. Older interpreters raise a `SyntaxError` on the async syntax regardless of how the test is configured.
- python -c 'import sys;exit(sys.version_info>=(3,5))' || python -m pytest -v --durations=10 python/ray/experimental/test/async_test.py

8 changes: 4 additions & 4 deletions ci/travis/install-dependencies.sh
@@ -25,7 +25,7 @@ if [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "linux" ]]; then
bash miniconda.sh -b -p $HOME/miniconda
export PATH="$HOME/miniconda/bin:$PATH"
pip install -q cython==0.29.0 cmake gym==0.10.11 opencv-python-headless pyyaml pandas==0.23.4 requests \
feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout mock flaky
feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout mock flaky networkx
conda install -y scipy tensorflow
elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then
sudo apt-get update
@@ -35,7 +35,7 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then
bash miniconda.sh -b -p $HOME/miniconda
export PATH="$HOME/miniconda/bin:$PATH"
pip install -q cython==0.29.0 cmake gym opencv-python-headless pyyaml pandas==0.23.4 requests \
feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout flaky
feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout flaky networkx
conda install -y scipy tensorflow
elif [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "macosx" ]]; then
# check that brew is installed
@@ -53,7 +53,7 @@ elif [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "macosx" ]]; then
bash miniconda.sh -b -p $HOME/miniconda
export PATH="$HOME/miniconda/bin:$PATH"
pip install -q cython==0.29.0 cmake tensorflow gym==0.10.11 opencv-python-headless pyyaml pandas==0.23.4 requests \
feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout mock flaky
feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout mock flaky networkx
elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then
# check that brew is installed
which -s brew
@@ -70,7 +70,7 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then
bash miniconda.sh -b -p $HOME/miniconda
export PATH="$HOME/miniconda/bin:$PATH"
pip install -q cython==0.29.0 cmake tensorflow gym opencv-python-headless pyyaml pandas==0.23.4 requests \
feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout flaky
feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout flaky networkx
elif [[ "$LINT" == "1" ]]; then
sudo apt-get update
sudo apt-get install -y cmake build-essential autoconf curl libtool unzip
16 changes: 16 additions & 0 deletions python/ray/experimental/streaming/README.rst
@@ -0,0 +1,16 @@
Streaming Library
=================

Dependencies:

Install NetworkX: ``pip install networkx``

Examples:

- simple.py: A simple example with stateless operators and different parallelism per stage.

Run ``python simple.py --input-file toy.txt``

- wordcount.py: A streaming wordcount example with a stateful operator (rolling sum).

Run ``python wordcount.py --titles-file articles.txt``
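
The library also ships a low-level batched queue (``batched_queue.py``)
for actor-to-actor communication. A minimal single-process sketch
(illustrative only; the parameter values below are arbitrary)::

    import ray
    from ray.experimental.streaming.batched_queue import BatchedQueue

    ray.init()

    queue = BatchedQueue(max_size=100, max_batch_size=10, max_batch_time=0.05)
    for i in range(20):
        queue.put_next(i)
    queue._flush_writes()  # force out the final partial batch
    print(queue.read_next())  # -> 0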
227 changes: 227 additions & 0 deletions python/ray/experimental/streaming/batched_queue.py
@@ -0,0 +1,227 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import logging
import numpy as np
import threading
import time

import ray
from ray.experimental import internal_kv

logger = logging.getLogger(__name__)
logger.setLevel("INFO")


def plasma_prefetch(object_id):
"""Tells plasma to prefetch the given object_id."""
local_sched_client = ray.worker.global_worker.raylet_client
ray_obj_id = ray.ObjectID(object_id)
local_sched_client.fetch_or_reconstruct([ray_obj_id], True)


def plasma_get(object_id):
"""Get an object directly from plasma without going through object table.

Precondition: plasma_prefetch(object_id) has been called before.
"""
client = ray.worker.global_worker.plasma_client
plasma_id = ray.pyarrow.plasma.ObjectID(object_id)
while not client.contains(plasma_id):
pass
return client.get(plasma_id)
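
# Illustrative calling pattern for the two helpers above: the reader
# prefetches a window of upcoming batch ids, then blocks on the batch it
# is about to consume, e.g.:
#
#   plasma_prefetch(batch_id)     # hint the raylet to pull the object
#   batch = plasma_get(batch_id)  # spin until the object is local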


# TODO: doing the timer in Python land is a bit slow
class FlushThread(threading.Thread):
"""A thread that flushes periodically to plasma.

Attributes:
interval: The flush timeout per batch.
flush_fn: The flush function.
"""

def __init__(self, interval, flush_fn):
threading.Thread.__init__(self)
self.interval = interval # Interval is the max_batch_time
self.flush_fn = flush_fn
self.daemon = True

def run(self):
while True:
time.sleep(self.interval) # Flushing period
self.flush_fn()


class BatchedQueue(object):
"""A batched queue for actor to actor communication.

Attributes:
max_size (int): The maximum size of the queue in number of records
(if exceeded, backpressure kicks in)
max_batch_size (int): The size of each batch in number of records.
max_batch_time (float): The flush timeout per batch.
prefetch_depth (int): The number of batches to prefetch from plasma.
background_flush (bool): Denotes whether a daemon flush thread should
be used (True) to flush batches to plasma.
base (ndarray): A unique signature for the queue.
read_ack_key (bytes): The signature of the queue in bytes.
prefetch_batch_offset (int): The number of the last read prefetched
batch.
read_batch_offset (int): The number of the last read batch.
read_item_offset (int): The number of the last read record inside a
batch.
write_batch_offset (int): The number of the last written batch.
write_item_offset (int): The number of the last written item inside a
batch.
write_buffer (list): The write buffer, i.e. an in-memory batch.
last_flush_time (float): The time the last flushing to plasma took
place.
cached_remote_offset (int): The number of the last read batch as
recorded by the writer after the previous flush.
flush_lock (RLock): A Python lock used for flushing batches to plasma.
flush_thread (Thread): The Python thread used for flushing batches
to plasma.
"""

def __init__(self,
max_size=999999,
max_batch_size=99999,
max_batch_time=0.01,
prefetch_depth=10,
background_flush=True):
self.max_size = max_size
self.max_batch_size = max_batch_size
self.max_batch_time = max_batch_time
self.prefetch_depth = prefetch_depth
self.background_flush = background_flush

# Common queue metadata -- This serves as the unique id of the queue
self.base = np.random.randint(0, 2**32 - 1, size=5, dtype="uint32")
self.base[-2] = 0
self.base[-1] = 0
self.read_ack_key = np.ndarray.tobytes(self.base)

# Reader state
self.prefetch_batch_offset = 0
self.read_item_offset = 0
self.read_batch_offset = 0
self.read_buffer = []

# Writer state
self.write_item_offset = 0
self.write_batch_offset = 0
self.write_buffer = []
self.last_flush_time = 0.0
self.cached_remote_offset = 0

self.flush_lock = threading.RLock()
self.flush_thread = FlushThread(self.max_batch_time,
self._flush_writes)

def __getstate__(self):
state = dict(self.__dict__)
del state["flush_lock"]
del state["flush_thread"]
del state["write_buffer"]
return state

def __setstate__(self, state):
self.__dict__.update(state)

# This is needed to enable writes in case the queue was not created by
# the writer: Python locks and threads cannot be serialized, so they
# are stripped in __getstate__ and must be re-created here.
def enable_writes(self):
"""Restores the state of the batched queue for writing."""
self.write_buffer = []
self.flush_lock = threading.RLock()
self.flush_thread = FlushThread(self.max_batch_time,
self._flush_writes)

# Batch ids consist of a unique queue id used as prefix along with
# two numbers generated using the batch offset in the queue
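# Example (illustrative): batch_offset = 2**32 + 5 maps to oid[-2] == 1
# and oid[-1] == 5, giving each batch a distinct, reproducible object id
# under the queue's unique prefix.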
def _batch_id(self, batch_offset):
oid = self.base.copy()
oid[-2] = batch_offset // 2**32
oid[-1] = batch_offset % 2**32
return np.ndarray.tobytes(oid)

def _flush_writes(self):
with self.flush_lock:
if not self.write_buffer:
return
batch_id = self._batch_id(self.write_batch_offset)
ray.worker.global_worker.put_object(
ray.ObjectID(batch_id), self.write_buffer)
logger.debug("[writer] Flush batch {} offset {} size {}".format(
self.write_batch_offset, self.write_item_offset,
len(self.write_buffer)))
self.write_buffer = []
self.write_batch_offset += 1
self._wait_for_reader()
self.last_flush_time = time.time()

def _wait_for_reader(self):
"""Checks for backpressure by the downstream reader."""
if self.max_size <= 0: # Unlimited queue
return
if self.write_item_offset - self.cached_remote_offset <= self.max_size:
return # Hasn't reached max size
remote_offset = internal_kv._internal_kv_get(self.read_ack_key)
if remote_offset is None:
# logger.debug("[writer] Waiting for reader to start...")
while remote_offset is None:
time.sleep(0.01)
remote_offset = internal_kv._internal_kv_get(self.read_ack_key)
remote_offset = int(remote_offset)
if self.write_item_offset - remote_offset > self.max_size:
logger.debug(
"[writer] Waiting for reader to catch up {} to {} - {}".format(
remote_offset, self.write_item_offset, self.max_size))
while self.write_item_offset - remote_offset > self.max_size:
time.sleep(0.01)
remote_offset = int(
internal_kv._internal_kv_get(self.read_ack_key))
self.cached_remote_offset = remote_offset

def _read_next_batch(self):
while (self.prefetch_batch_offset <
self.read_batch_offset + self.prefetch_depth):
plasma_prefetch(self._batch_id(self.prefetch_batch_offset))
self.prefetch_batch_offset += 1
self.read_buffer = plasma_get(self._batch_id(self.read_batch_offset))
self.read_batch_offset += 1
logger.debug("[reader] Fetched batch {} offset {} size {}".format(
self.read_batch_offset, self.read_item_offset,
len(self.read_buffer)))
self._ack_reads(self.read_item_offset + len(self.read_buffer))

# The reader acks the offset it has read under the queue's key so that
# the writer knows how far behind the reader is. This is used to cap the
# queue size and apply backpressure.
def _ack_reads(self, offset):
if self.max_size > 0:
internal_kv._internal_kv_put(
self.read_ack_key, offset, overwrite=True)

def put_next(self, item):
with self.flush_lock:
if self.background_flush and not self.flush_thread.is_alive():
logger.debug("[writer] Starting batch flush thread")
self.flush_thread.start()
self.write_buffer.append(item)
self.write_item_offset += 1
if not self.last_flush_time:
self.last_flush_time = time.time()
delay = time.time() - self.last_flush_time
if (len(self.write_buffer) > self.max_batch_size
or delay > self.max_batch_time):
self._flush_writes()

def read_next(self):
if not self.read_buffer:
self._read_next_batch()
assert self.read_buffer
self.read_item_offset += 1
return self.read_buffer.pop(0)
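
For context, the queue's intended actor-to-actor pattern looks roughly like
this (a hypothetical sketch, not code from this PR; the Writer and Reader
actors and the driver wiring are assumptions based on the API above):

import ray
from ray.experimental.streaming.batched_queue import BatchedQueue

ray.init()


@ray.remote
class Writer(object):
    def __init__(self, queue):
        self.queue = queue
        # Locks and threads are stripped on serialization, so the writer
        # must re-create them on its copy of the queue.
        self.queue.enable_writes()

    def run(self, num_records):
        for i in range(num_records):
            self.queue.put_next(i)
        self.queue._flush_writes()  # flush the final partial batch


@ray.remote
class Reader(object):
    def __init__(self, queue):
        self.queue = queue

    def run(self, num_records):
        return [self.queue.read_next() for _ in range(num_records)]


queue = BatchedQueue(max_size=100, max_batch_size=10, max_batch_time=0.01)
writer = Writer.remote(queue)
reader = Reader.remote(queue)
writer.run.remote(100)
assert ray.get(reader.run.remote(100)) == list(range(100))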