[Data] Intermittent test_streaming_fault_tolerance failures #43798

Closed
omatthew98 opened this issue Mar 7, 2024 · 2 comments
Labels
bug (Something that is supposed to be working; but isn't) · data (Ray Data-related issues) · P2 (Important issue, but not time-critical)

Comments

@omatthew98
Contributor

What happened + What you expected to happen

The test_streaming_fault_tolerance test is flaky, causing intermittent failures in the test_streaming_integration test suite. Below are two example stack traces from these failures.

❯ pytest test_streaming_integration.py -k test_streaming_fault_tolerance
Test session starts (platform: darwin, Python 3.8.18, pytest 7.0.1, pytest-sugar 0.9.5)
rootdir: /Users/mowen/code/ray, configfile: pytest.ini
plugins: asyncio-0.16.0, docker-tools-3.1.3, sphinx-0.5.1.dev0, forked-1.4.0, repeat-0.9.3, sugar-0.9.5, timeout-2.1.0, shutil-1.7.0, rerunfailures-11.1.2, virtualenv-1.7.0, anyio-3.7.1, lazy-fixture-0.6.3, httpserver-1.0.6
timeout: 180.0s
timeout method: signal
timeout func_only: False
collecting ...

――――――――――――――――――――――――――――――――――――― test_streaming_fault_tolerance ――――――――――――――――――――――――――――――――――――――

ray_start_10_cpus_shared = RayContext(dashboard_url='', python_version='3.8.18', ray_version='3.0.0.dev0', ray_commit='{{RAY_COMMIT_SHA}}')
restore_data_context = None

    def test_streaming_fault_tolerance(ray_start_10_cpus_shared, restore_data_context):
        class RandomExit:
            def __call__(self, x):
                import os

                if random.random() > 0.9:
                    print("force exit")
                    os._exit(1)
                return x

        # Test recover.
        base = ray.data.range(1000, override_num_blocks=100)
        ds1 = base.map_batches(
            RandomExit, compute=ray.data.ActorPoolStrategy(size=4), max_task_retries=999
        )
>       ds1.take_all()

test_streaming_integration.py:586:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../dataset.py:2417: in take_all
    for row in self.iter_rows():
../iterator.py:241: in _wrapped_iterator
    for batch in batch_iterable:
../iterator.py:185: in _create_iterator
    for batch in iterator:
../_internal/block_batching/iter_batches.py:176: in iter_batches
    next_batch = next(async_batch_iter)
../_internal/util.py:939: in make_async_gen
    next_item = output_queue.get()
../_internal/util.py:841: in get
    self._consumer_semaphore.acquire()
/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py:433: in acquire
    self._cond.wait(timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Condition(<unlocked _thread.lock object at 0x169cc2d50>, 0)>, timeout = None

    def wait(self, timeout=None):
        """Wait until notified or until a timeout occurs.

        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.

        This method releases the underlying lock, and then blocks until it is
        awakened by a notify() or notify_all() call for the same condition
        variable in another thread, or until the optional timeout occurs. Once
        awakened or timed out, it re-acquires the lock and returns.

        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof).

        When the underlying lock is an RLock, it is not released using its
        release() method, since this may not actually unlock the lock when it
        was acquired multiple times recursively. Instead, an internal interface
        of the RLock class is used, which really unlocks it even when it has
        been recursively acquired several times. Another internal interface is
        then used to restore the recursion level when the lock is reacquired.

        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
>               waiter.acquire()
E               Failed: Timeout >180.0s

/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py:302: Failed
------------------------------------------ Captured stderr setup ------------------------------------------
2024-03-07 14:53:36,022	INFO worker.py:1752 -- Started a local Ray instance.
------------------------------------------ Captured stdout call -------------------------------------------
(raylet) A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: ffffffffffffffff40ea52dfdfff82295a14bd5c01000000 Worker ID: dd6213e14975ef4a7581e8264e64200d458d0d2d75f81efad6a5bfc0 Node ID: 8a20e13dcf051a4198a68cc43f14571844989590c6a2c2f532722271 Worker IP address: 127.0.0.1 Worker port: 61204 Worker PID: 5843 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
(MapWorker(MapBatches(RandomExit)) pid=5843) force exit
------------------------------------------ Captured stderr call -------------------------------------------
2024-03-07 14:53:38,147	WARNING util.py:561 -- The argument ``compute`` is deprecated in Ray 2.9. Please specify argument ``concurrency`` instead. For more information, see https://docs.ray.io/en/master/data/transforming-data.html#stateful-transforms.
2024-03-07 14:53:38,154	INFO streaming_executor.py:115 -- Starting execution of Dataset. Full log is in /tmp/ray/session_2024-03-07_14-53-34_167105_5632/logs/ray-data.log
2024-03-07 14:53:38,154	INFO streaming_executor.py:116 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange] -> ActorPoolMapOperator[MapBatches(RandomExit)]

Running 0:   0%|          | 0/100 [00:00<?, ?it/s]
[Interleaved tqdm progress-bar output elided: ReadRange completes 100/100 within a few seconds, while MapBatches(RandomExit) stalls at roughly 95-96/100 with 1 CPU still in use until the timeout fires.]

+++++++++++++++++++++++++++++++++++ Timeout ++++++++++++++++++++++++++++++++++++
~~~~~~~~~~~~~~~~~~~~~~~ Stack of Thread-3 (11443417088) ~~~~~~~~~~~~~~~~~~~~~~~~
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/mowen/code/ray/python/ray/data/_internal/util.py", line 918, in execute_computation
    for item in fn(thread_safe_generator):
  File "/Users/mowen/code/ray/python/ray/data/_internal/block_batching/iter_batches.py", line 167, in _async_iter_batches
    yield from extract_data_from_batch(batch_iter)
  File "/Users/mowen/code/ray/python/ray/data/_internal/block_batching/util.py", line 211, in extract_data_from_batch
    for batch in batch_iter:
  File "/Users/mowen/code/ray/python/ray/data/_internal/block_batching/iter_batches.py", line 306, in restore_original_order
    for batch in batch_iter:
  File "/Users/mowen/code/ray/python/ray/data/_internal/block_batching/iter_batches.py", line 218, in threadpool_computations_format_collate
    yield from formatted_batch_iter
  File "/Users/mowen/code/ray/python/ray/data/_internal/block_batching/util.py", line 159, in format_batches
    for batch in block_iter:
  File "/Users/mowen/code/ray/python/ray/data/_internal/block_batching/util.py", line 118, in blocks_to_batches
    for block in block_iter:
  File "/Users/mowen/code/ray/python/ray/data/_internal/block_batching/util.py", line 55, in resolve_block_refs
    for block_ref in block_ref_iter:
  File "/Users/mowen/code/ray/python/ray/data/_internal/block_batching/iter_batches.py", line 254, in prefetch_batches_locally
    for block_ref, metadata in block_ref_iter:
  File "/Users/mowen/code/ray/python/ray/data/_internal/util.py", line 898, in __next__
    return next(self.it)
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/legacy_compat.py", line 45, in execute_to_legacy_block_iterator
    for bundle in bundle_iter:
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
    return self.get_next()
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor.py", line 152, in get_next
    item = self._outer._output_node.get_output_blocking(
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 280, in get_output_blocking
    time.sleep(0.01)

~~~~~~~~~~~~~~~~~~~~~~ Stack of Prefetcher (11426590720) ~~~~~~~~~~~~~~~~~~~~~~~
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/mowen/code/ray/python/ray/data/_internal/block_batching/util.py", line 243, in _run
    self._condition.wait()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 302, in wait
    waiter.acquire()

~~~~~~~~~~~~~~~~~~~~~~~ Stack of Thread-2 (11409764352) ~~~~~~~~~~~~~~~~~~~~~~~~
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/mowen/code/ray/python/ray/data/_internal/stats.py", line 569, in _run_update_loop
    time.sleep(StatsManager.STATS_ACTOR_UPDATE_INTERVAL_SECONDS)

~~ Stack of StreamingExecutor-39bb3fc939db4a51a53ed88f23b1bd1a (11392937984) ~~~
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor.py", line 222, in run
    continue_sched = self._scheduling_loop_step(self._topology)
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor.py", line 281, in _scheduling_loop_step
    num_errored_blocks = process_completed_tasks(
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 392, in process_completed_tasks
    ready, _ = ray.wait(
  File "/Users/mowen/code/ray/python/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/Users/mowen/code/ray/python/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/Users/mowen/code/ray/python/ray/_private/worker.py", line 2893, in wait
    ready_ids, remaining_ids = worker.core_worker.wait(

~~~~~~~~~~~~~~~~~~~~~~~ Stack of Thread-1 (11376111616) ~~~~~~~~~~~~~~~~~~~~~~~~
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/site-packages/tqdm/_monitor.py", line 60, in run
    self.was_killed.wait(self.sleep_interval)
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 558, in wait
    signaled = self._cond.wait(timeout)
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 306, in wait
    gotit = waiter.acquire(True, timeout)

~~~~~~~~~~~~~~~~~~~~ Stack of ray_print_logs (11357564928) ~~~~~~~~~~~~~~~~~~~~~
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/mowen/code/ray/python/ray/_private/worker.py", line 901, in print_logs
    data = subscriber.poll()

~~~~~~~~~~~~~~~ Stack of ray_listen_error_messages (11340738560) ~~~~~~~~~~~~~~~
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/mowen/code/ray/python/ray/_private/worker.py", line 2139, in listen_error_messages
    _, error_data = worker.gcs_error_subscriber.poll()

+++++++++++++++++++++++++++++++++++ Timeout ++++++++++++++++++++++++++++++++++++
❯ pytest test_streaming_integration.py -k test_streaming_fault_tolerance
Test session starts (platform: darwin, Python 3.8.18, pytest 7.0.1, pytest-sugar 0.9.5)
rootdir: /Users/mowen/code/ray, configfile: pytest.ini
plugins: asyncio-0.16.0, docker-tools-3.1.3, sphinx-0.5.1.dev0, forked-1.4.0, repeat-0.9.3, sugar-0.9.5, timeout-2.1.0, shutil-1.7.0, rerunfailures-11.1.2, virtualenv-1.7.0, anyio-3.7.1, lazy-fixture-0.6.3, httpserver-1.0.6
timeout: 180.0s
timeout method: signal
timeout func_only: False
collecting ...

――――――――――――――――――――――――――――――――――――― test_streaming_fault_tolerance ―――――――――――――――――――――――――――――――――――――

ray_start_10_cpus_shared = RayContext(dashboard_url='', python_version='3.8.18', ray_version='3.0.0.dev0', ray_commit='{{RAY_COMMIT_SHA}}')
restore_data_context = None

    def test_streaming_fault_tolerance(ray_start_10_cpus_shared, restore_data_context):
        class RandomExit:
            def __call__(self, x):
                import os

                if random.random() > 0.9:
                    print("force exit")
                    os._exit(1)
                return x

        # Test recover.
        base = ray.data.range(1000, override_num_blocks=100)
        ds1 = base.map_batches(
            RandomExit, compute=ray.data.ActorPoolStrategy(size=4), max_task_retries=999
        )
>       ds1.take_all()

test_streaming_integration.py:586:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../dataset.py:2417: in take_all
    for row in self.iter_rows():
../iterator.py:241: in _wrapped_iterator
    for batch in batch_iterable:
../iterator.py:185: in _create_iterator
    for batch in iterator:
../_internal/block_batching/iter_batches.py:176: in iter_batches
    next_batch = next(async_batch_iter)
../_internal/util.py:939: in make_async_gen
    next_item = output_queue.get()
../_internal/util.py:841: in get
    self._consumer_semaphore.acquire()
/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py:433: in acquire
    self._cond.wait(timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Condition(<unlocked _thread.lock object at 0x16aea1480>, 0)>, timeout = None

    def wait(self, timeout=None):
        """Wait until notified or until a timeout occurs.

        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.

        This method releases the underlying lock, and then blocks until it is
        awakened by a notify() or notify_all() call for the same condition
        variable in another thread, or until the optional timeout occurs. Once
        awakened or timed out, it re-acquires the lock and returns.

        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof).

        When the underlying lock is an RLock, it is not released using its
        release() method, since this may not actually unlock the lock when it
        was acquired multiple times recursively. Instead, an internal interface
        of the RLock class is used, which really unlocks it even when it has
        been recursively acquired several times. Another internal interface is
        then used to restore the recursion level when the lock is reacquired.

        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
>               waiter.acquire()
E               Failed: Timeout >180.0s

/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py:302: Failed
----------------------------------------- Captured stderr setup ------------------------------------------
2024-03-07 14:54:58,512	INFO worker.py:1752 -- Started a local Ray instance.
------------------------------------------ Captured stdout call ------------------------------------------
(raylet) A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: ffffffffffffffffac7f6bf64de51a664888c44901000000 Worker ID: 9bda3b0ba9d741da336d15d5fce8a7f1488ddfd24b2fb5d7f2873234 Node ID: 59db3c3b41503230398b7914a94ffe2d37f0c2bc21dbafdae6a27013 Worker IP address: 127.0.0.1 Worker port: 53034 Worker PID: 8563 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
(MapWorker(MapBatches(RandomExit)) pid=8563) force exit
------------------------------------------ Captured stderr call ------------------------------------------
2024-03-07 14:54:59,796	WARNING util.py:561 -- The argument ``compute`` is deprecated in Ray 2.9. Please specify argument ``concurrency`` instead. For more information, see https://docs.ray.io/en/master/data/transforming-data.html#stateful-transforms.
2024-03-07 14:54:59,803	INFO streaming_executor.py:115 -- Starting execution of Dataset. Full log is in /tmp/ray/session_2024-03-07_14-54-56_794698_8469/logs/ray-data.log
2024-03-07 14:54:59,803	INFO streaming_executor.py:116 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange] -> ActorPoolMapOperator[MapBatches(RandomExit)]

Running 0:   0%|          | 0/100 [00:00<?, ?it/s]
[Interleaved tqdm progress-bar output elided: ReadRange completes 100/100 within a few seconds, while MapBatches(RandomExit) stalls at roughly 81-84/100 with 4 CPUs still in use until the timeout fires.]

+++++++++++++++++++++++++++++++++++ Timeout ++++++++++++++++++++++++++++++++++++

~~~~~~~~~~~~~~~~~~~~~~~ Stack of Thread-3 (11398295552) ~~~~~~~~~~~~~~~~~~~~~~~~
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/mowen/code/ray/python/ray/data/_internal/util.py", line 918, in execute_computation
    for item in fn(thread_safe_generator):
  File "/Users/mowen/code/ray/python/ray/data/_internal/block_batching/iter_batches.py", line 167, in _async_iter_batches
    yield from extract_data_from_batch(batch_iter)
  File "/Users/mowen/code/ray/python/ray/data/_internal/block_batching/util.py", line 211, in extract_data_from_batch
    for batch in batch_iter:
  File "/Users/mowen/code/ray/python/ray/data/_internal/block_batching/iter_batches.py", line 306, in restore_original_order
    for batch in batch_iter:
  File "/Users/mowen/code/ray/python/ray/data/_internal/block_batching/iter_batches.py", line 218, in threadpool_computations_format_collate
    yield from formatted_batch_iter
  File "/Users/mowen/code/ray/python/ray/data/_internal/block_batching/util.py", line 159, in format_batches
    for batch in block_iter:
  File "/Users/mowen/code/ray/python/ray/data/_internal/block_batching/util.py", line 118, in blocks_to_batches
    for block in block_iter:
  File "/Users/mowen/code/ray/python/ray/data/_internal/block_batching/util.py", line 55, in resolve_block_refs
    for block_ref in block_ref_iter:
  File "/Users/mowen/code/ray/python/ray/data/_internal/block_batching/iter_batches.py", line 254, in prefetch_batches_locally
    for block_ref, metadata in block_ref_iter:
  File "/Users/mowen/code/ray/python/ray/data/_internal/util.py", line 898, in __next__
    return next(self.it)
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/legacy_compat.py", line 45, in execute_to_legacy_block_iterator
    for bundle in bundle_iter:
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
    return self.get_next()
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor.py", line 152, in get_next
    item = self._outer._output_node.get_output_blocking(
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 280, in get_output_blocking
    time.sleep(0.01)

~~~~~~~~~~~~~~~~~~~~~~ Stack of Prefetcher (11381469184) ~~~~~~~~~~~~~~~~~~~~~~~
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/mowen/code/ray/python/ray/data/_internal/block_batching/util.py", line 243, in _run
    self._condition.wait()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 302, in wait
    waiter.acquire()

~~~~~~~~~~~~~~~~~~~~~~~ Stack of Thread-2 (11364642816) ~~~~~~~~~~~~~~~~~~~~~~~~
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/mowen/code/ray/python/ray/data/_internal/stats.py", line 569, in _run_update_loop
    time.sleep(StatsManager.STATS_ACTOR_UPDATE_INTERVAL_SECONDS)

~~ Stack of StreamingExecutor-ff4dad507a3e4ff688a03708b29f56a1 (11347816448) ~~~
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor.py", line 222, in run
    continue_sched = self._scheduling_loop_step(self._topology)
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor.py", line 281, in _scheduling_loop_step
    num_errored_blocks = process_completed_tasks(
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 392, in process_completed_tasks
    ready, _ = ray.wait(
  File "/Users/mowen/code/ray/python/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/Users/mowen/code/ray/python/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/Users/mowen/code/ray/python/ray/_private/worker.py", line 2893, in wait
    ready_ids, remaining_ids = worker.core_worker.wait(

~~~~~~~~~~~~~~~~~~~~~~~ Stack of Thread-1 (11330990080) ~~~~~~~~~~~~~~~~~~~~~~~~
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/site-packages/tqdm/_monitor.py", line 60, in run
    self.was_killed.wait(self.sleep_interval)
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 558, in wait
    signaled = self._cond.wait(timeout)
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 306, in wait
    gotit = waiter.acquire(True, timeout)

~~~~~~~~~~~~~~~~~~~~ Stack of ray_print_logs (11314163712) ~~~~~~~~~~~~~~~~~~~~~
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/mowen/code/ray/python/ray/_private/worker.py", line 901, in print_logs
    data = subscriber.poll()

~~~~~~~~~~~~~~~ Stack of ray_listen_error_messages (11000901632) ~~~~~~~~~~~~~~~
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/Users/mowen/miniforge3/envs/ray/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/mowen/code/ray/python/ray/_private/worker.py", line 2139, in listen_error_messages
    _, error_data = worker.gcs_error_subscriber.poll()

+++++++++++++++++++++++++++++++++++ Timeout ++++++++++++++++++++++++++++++++++++

 python/ray/data/tests/test_streaming_integration.py ⨯                                     100% ██████████
======================================== short test summary info =========================================
FAILED test_streaming_integration.py::test_streaming_fault_tolerance - Failed: Timeout >180.0s

Results (181.60s):
       1 failed
         - python/ray/data/tests/test_streaming_integration.py:571 test_streaming_fault_tolerance
      15 deselected

Versions / Dependencies

Current master has this issue, but it seems to have been present for some time.

Reproduction script

cd python/ray/data/tests
pytest test_streaming_integration.py -k test_streaming_fault_tolerance
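
The same hang can also be reproduced outside pytest with a standalone script adapted from the test body in the traceback above (a sketch; ray.init(num_cpus=10) stands in for the ray_start_10_cpus_shared fixture and is an assumption, not part of the original test):

  import random

  import ray

  class RandomExit:
      def __call__(self, x):
          import os

          if random.random() > 0.9:
              print("force exit")
              os._exit(1)
          return x

  # Assumption: 10 CPUs, mirroring the ray_start_10_cpus_shared fixture.
  ray.init(num_cpus=10)

  base = ray.data.range(1000, override_num_blocks=100)
  ds1 = base.map_batches(
      RandomExit, compute=ray.data.ActorPoolStrategy(size=4), max_task_retries=999
  )
  ds1.take_all()  # intermittently hangs here instead of completing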

Note: the test does not fail every time, and running multiple instances of it concurrently seems to increase the failure rate.
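
Since pytest-repeat is already in the plugin list above, one quick way to stress the test and raise the odds of hitting the hang is to repeat it and stop at the first failure (a suggestion, not part of the original report):

cd python/ray/data/tests
pytest test_streaming_integration.py -k test_streaming_fault_tolerance -x --count=20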

Issue Severity

Low: It annoys or frustrates me.

omatthew98 added the bug (Something that is supposed to be working; but isn't), P2 (Important issue, but not time-critical), and data (Ray Data-related issues) labels, and removed the triage (Needs triage) label, on Mar 7, 2024
@omatthew98
Contributor Author

After more debugging, the issue seems to come from this section of the test:

  base = ray.data.range(1000, override_num_blocks=100)
  ds1 = base.map_batches(
      RandomExit, compute=ray.data.ActorPoolStrategy(size=4), max_task_retries=999
  )
  ds1.take_all()

Some of the time (likely due to the forced worker exits), tasks never become ready, causing the process to hang. The call to ray.wait in process_completed_tasks (streaming_executor_state.py:392 in the stack traces above) is where the hung tasks never become ready.
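
For context, a heavily condensed, hypothetical sketch of the two loops visible in the stack traces above (illustrative only, not the actual streaming executor code): the StreamingExecutor thread blocks in ray.wait inside process_completed_tasks (streaming_executor_state.py:392), while the consumer thread polls in get_output_blocking (streaming_executor_state.py:280). If a task lost to a forced worker exit is never reported ready, neither loop advances and pytest's 180 s timeout eventually fires.

  import time

  import ray

  # Hypothetical condensation of the pattern in the stack traces; names and
  # structure are illustrative, not the real implementation.
  def scheduling_loop_step(pending_task_refs, output_queue):
      # StreamingExecutor thread (streaming_executor_state.py:392): blocks until
      # some outstanding task ref becomes ready. If the task lost with the
      # killed actor never resolves, this never returns and no output is ever
      # handed to the consumer.
      ready, _ = ray.wait(pending_task_refs, num_returns=1)
      for ref in ready:
          pending_task_refs.remove(ref)
          output_queue.append(ref)

  def get_output_blocking(output_queue):
      # Consumer thread (streaming_executor_state.py:280): spins until the
      # scheduling loop produces an output, so it hangs right along with it.
      while not output_queue:
          time.sleep(0.01)
      return output_queue.pop(0)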

@raulchen
Contributor

Depends on #43914.
