streaming: frequent timeout in end-to-end tests #6617

Closed · Tracked by #6640
BugenZhao opened this issue Nov 28, 2022 · 3 comments

Assignees: BugenZhao
Labels: component/streaming (Stream processing related issue.) · priority/critical · type/bug (Something isn't working)

Comments

BugenZhao (Member) commented Nov 28, 2022

Recently we have been hitting timeouts frequently in end-to-end tests on CI. Here are some samples from the main workflow. I'll investigate with async-stack-trace.

Possibly related:

Investigation

I've run all e2e tests locally for hours (😄) and finally reproduced the hang. Here's the async stack trace: async-stack-trace.txt

Env: risedev ci-start ci-3cn-3fe, ci-release, parallel e2e

  1. We're stuck when running the SQL SELECT 1 AS v1 FROM m16 AS mm16 JOIN m16 ON mm16.v1 = m16.v1, which is in mv_on_mv.slt.
[captured 503.54091ms ago]
Actor 505053: `SELECT 1 AS v1 FROM m16 AS mm16 JOIN m16 ON mm16.v1 = m16.v1` [378.999355834s]
...
  2. Some actors cannot receive a new barrier for ~370 secs. The queue of concurrent (in-flight) barriers is likely full. (See the sketch after the trace below.)
>> Actor 505036
[captured 571.610386ms ago]
Actor 505036: `` [379.003356032s]
  Epoch 3433058452635648 [!!! 370.139258916s]
    SourceExecutor 7B4CC00000006 (actor 505036, executor 6) [!!! 370.139258916s]
      source_recv_barrier [!!! 370.139258916s]
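To make that backpressure concrete, here is a minimal, hedged sketch of the mechanism described above; the bounded channel, the Barrier struct, and the capacity of 4 are stand-ins, not RisingWave's actual barrier manager. Once previously injected barriers are never collected, the in-flight queue fills up and the next injection blocks indefinitely, which is what a source_recv_barrier span stuck for ~370 s looks like.

// Hedged model only: a bounded channel standing in for the in-flight barrier queue.
use std::time::Duration;
use tokio::sync::mpsc;

struct Barrier {
    epoch: u64,
}

#[tokio::main]
async fn main() {
    // Capacity models the maximum number of in-flight (uncollected) barriers.
    let (tx, rx) = mpsc::channel::<Barrier>(4);

    // Inject barriers without ever collecting (receiving) them downstream.
    for epoch in 0..4 {
        tx.send(Barrier { epoch }).await.unwrap();
    }

    // The next injection can only proceed once an old barrier is collected.
    // With nothing draining the queue, it waits forever.
    tokio::select! {
        _ = tx.send(Barrier { epoch: 4 }) => unreachable!("nothing drains the queue"),
        _ = tokio::time::sleep(Duration::from_secs(1)) => {
            println!("barrier injection blocked: in-flight barrier queue is full");
        }
    }

    // Keep the receiver alive so the send above blocks instead of erroring.
    drop(rx);
}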
  3. This is because the earliest barrier, with epoch 3433057878016000, cannot be collected, according to the RPC traces.
--- RPC Traces ---
>> RPC 127.0.0.1:5687 (1044022)
[captured 467.23672ms ago]
/stream_service.StreamService/BarrierComplete:1044022 [378.999355588s]
  collect_barrier (epoch 3433057878016000) [!!! 378.999355588s]
  4. So why can't we collect this barrier? Searching for 3433057878016000, we find that most actors from Actor 505048 to Actor 505059 (4*3 = 12 parallelisms of one fragment) have a root span of Epoch 3433057878016000, which means the barrier has been collected there. But two actors behave strangely: they are still on the <initial> epoch. 🤔
>> Actor 505054
[captured 379.501681106s ago]
Actor 505054: `SELECT 1 AS v1 FROM m16 AS mm16 JOIN m16 ON mm16.v1 = m16.v1` [4.000044ms]
  Epoch <initial> [4.000044ms]
    MaterializeExecutor 7B4DE0000003F (actor 505054, executor 63) [4.000044ms]
      expect_first_barrier [4.000044ms]
        ProjectExecutor 7B4DE0000003E (actor 505054, executor 62) [4.000044ms]
          HashJoinExecutor 7B4DE0000003C (actor 505054, executor 60) [4.000044ms]
            expect_first_barrier [4.000044ms]
              MergeExecutor 7B4DE00000039 (actor 505054, executor 57) [4.000044ms]
                LocalInput (actor 505076) [4.000044ms]
                LocalInput (actor 505077) [4.000044ms]
                LocalInput (actor 505078) [4.000044ms]
                LocalInput (actor 505079) [4.000044ms]
              MergeExecutor 7B4DE0000003A (actor 505054, executor 58) [4.000044ms]
                LocalInput (actor 505064) [4.000044ms]
                LocalInput (actor 505065) [4.000044ms]
                LocalInput (actor 505066) [4.000044ms]
                LocalInput (actor 505067) [4.000044ms]
  5. The second line tells us the answer to the mystery: the stack trace of this actor was captured 379.501681106s ago. The stack trace is reported periodically from the same tokio task as the actor (joined with futures::join), since the trace is thread-local. (See the sketch below.)
    Normally, we only miss the reporting period and get a stale trace when the thread is busy with some CPU-intensive work. That shouldn't be the case here: we're stuck, and the CPU tends to be idle.
    So why can't the tokio task be scheduled? It's likely because this worker thread has been parked at the OS level and never woken, so tokio cannot schedule the async tasks on it either.
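As a hedged illustration of that reporting arrangement (not the actual async-stack-trace implementation; actor_work, report_trace, and run_actor are made-up names), the reporter is just another future joined onto the same task, so it only makes progress when that task is polled:

// Sketch: the trace reporter shares a task with the actor. If the worker
// thread is parked and the task never runs, the published trace grows stale.
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

async fn actor_work() {
    // Stand-in for the real actor loop.
    loop {
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

async fn report_trace(last_capture: Arc<Mutex<Instant>>) {
    let mut ticker = tokio::time::interval(Duration::from_millis(500));
    loop {
        ticker.tick().await;
        // Capture and publish the (thread-local) trace here.
        *last_capture.lock().unwrap() = Instant::now();
    }
}

async fn run_actor(last_capture: Arc<Mutex<Instant>>) {
    // Both futures live on one task: if this task is never polled,
    // `last_capture` stops advancing and observers see "captured N s ago".
    futures::future::join(actor_work(), report_trace(last_capture)).await;
}

#[tokio::main]
async fn main() {
    let last_capture = Arc::new(Mutex::new(Instant::now()));
    // Run briefly; in the real system this task lives as long as the actor.
    let _ = tokio::time::timeout(Duration::from_secs(2), run_actor(last_capture.clone())).await;
    println!("last trace captured {:?} ago", last_capture.lock().unwrap().elapsed());
}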
  6. Let's check whether there are other tasks that haven't been scheduled for a long time, using the regex captured 3.+\ds ago. We find 8 such actors, and all of them are scheduled on the second compute node at :5688. So there must be a problem in that process.
  7. To verify the "parked thread" theory, let's check the "sync" stack trace with gdb -batch -ex 'thread apply all bt' -p 3786162: sync-stack-trace.txt
    It shows that most threads are actively parked by the tokio runtime, since there's no task to schedule.
Thread 29 (Thread 0x7fac16bfb640 (LWP 3786211) "compute-node"):
#0  syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
#1  0x0000557a37f72e15 in parking_lot_core::thread_parker::imp::ThreadParker::futex_wait (self=0x7fac16bfb4b8, ts=<error reading variable: Cannot access memory at address 0x0>) at /home/bugenzhao/.cargo/registry/src/github.com-1ecc6299db9ec823/parking_lot_core-0.9.4/src/thread_parker/linux.rs:112
#2  parking_lot_core::thread_parker::imp::{impl#0}::park (self=0x7fac16bfb4b8) at /home/bugenzhao/.cargo/registry/src/github.com-1ecc6299db9ec823/parking_lot_core-0.9.4/src/thread_parker/linux.rs:66
#3  parking_lot_core::parking_lot::park::{closure#0}<parking_lot::condvar::{impl#1}::wait_until_internal::{closure_env#0}, parking_lot::condvar::{impl#1}::wait_until_internal::{closure_env#1}, parking_lot::condvar::{impl#1}::wait_until_internal::{closure_env#2}> (thread_data=0x7fac16bfb458) at /home/bugenzhao/.cargo/registry/src/github.com-1ecc6299db9ec823/parking_lot_core-0.9.4/src/parking_lot.rs:635
#4  parking_lot_core::parking_lot::with_thread_data<parking_lot_core::parking_lot::ParkResult, parking_lot_core::parking_lot::park::{closure_env#0}<parking_lot::condvar::{impl#1}::wait_until_internal::{closure_env#0}, parking_lot::condvar::{impl#1}::wait_until_internal::{closure_env#1}, parking_lot::condvar::{impl#1}::wait_until_internal::{closure_env#2}>> () at /home/bugenzhao/.cargo/registry/src/github.com-1ecc6299db9ec823/parking_lot_core-0.9.4/src/parking_lot.rs:600
#5  parking_lot_core::parking_lot::park<parking_lot::condvar::{impl#1}::wait_until_internal::{closure_env#0}, parking_lot::condvar::{impl#1}::wait_until_internal::{closure_env#1}, parking_lot::condvar::{impl#1}::wait_until_internal::{closure_env#2}> (park_token=..., key=<optimized out>, validate=..., before_sleep=..., timed_out=..., timeout=<error reading variable: Cannot access memory at address 0x8>) at /home/bugenzhao/.cargo/registry/src/github.com-1ecc6299db9ec823/parking_lot_core-0.9.4/src/parking_lot.rs:600
#6  parking_lot::condvar::Condvar::wait_until_internal (self=<optimized out>, mutex=<optimized out>, timeout=...) at src/condvar.rs:333
#7  0x0000557a37f3cade in parking_lot::condvar::Condvar::wait<()> () at src/loom/std/parking_lot.rs:150
#8  tokio::loom::std::parking_lot::Condvar::wait<()> (self=0x7fac491bdfa8, guard=...) at src/loom/std/parking_lot.rs:150
#9  tokio::runtime::scheduler::multi_thread::park::Inner::park_condvar (self=0x7fac491bdfa0) at src/runtime/scheduler/multi_thread/park.rs:153
#10 tokio::runtime::scheduler::multi_thread::park::Inner::park (self=0x7fac491bdfa0, handle=<optimized out>) at src/runtime/scheduler/multi_thread/park.rs:124
#11 tokio::runtime::scheduler::multi_thread::park::Parker::park (self=<optimized out>, handle=<optimized out>) at src/runtime/scheduler/multi_thread/park.rs:68
#12 0x0000557a37f3140f in tokio::runtime::scheduler::multi_thread::worker::Context::park_timeout (self=0x7fac16bf64b8, core=0x7fac7c60ff00, duration=...) at src/runtime/scheduler/multi_thread/worker.rs:535
#13 0x0000557a37f3095f in tokio::runtime::scheduler::multi_thread::worker::Context::park (self=0x7fac16bf64b8, core=0x7fac7c60ff00) at src/runtime/scheduler/multi_thread/worker.rs:506
  8. However, there are some exceptions: a few threads are parked on the parking_lot lock in risingwave_common::cache::LruCache. This is abnormal, since the mutex guard cannot be held across an await point and the critical section is assumed to be lightweight. Digging deeper, we find a different stack trace.
Thread 78 (Thread 0x7fab49bfc640 (LWP 3786260) "compute-node"):
#0  syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
#1  0x0000557a37f6c039 in parking_lot_core::thread_parker::imp::ThreadParker::futex_wait (self=0x7fab49bfc4b8, ts=<error reading variable: Cannot access memory at address 0x0>) at /home/bugenzhao/.cargo/registry/src/github.com-1ecc6299db9ec823/parking_lot_core-0.9.4/src/thread_parker/linux.rs:112
#2  parking_lot_core::thread_parker::imp::{impl#0}::park (self=0x80) at /home/bugenzhao/.cargo/registry/src/github.com-1ecc6299db9ec823/parking_lot_core-0.9.4/src/thread_parker/linux.rs:66
#3  parking_lot_core::parking_lot::park::{closure#0}<parking_lot::raw_mutex::{impl#3}::lock_slow::{closure_env#0}, parking_lot::raw_mutex::{impl#3}::lock_slow::{closure_env#1}, parking_lot::raw_mutex::{impl#3}::lock_slow::{closure_env#2}> (thread_data=0x7fab49bfc458) at /home/bugenzhao/.cargo/registry/src/github.com-1ecc6299db9ec823/parking_lot_core-0.9.4/src/parking_lot.rs:635
#4  parking_lot_core::parking_lot::with_thread_data<parking_lot_core::parking_lot::ParkResult, parking_lot_core::parking_lot::park::{closure_env#0}<parking_lot::raw_mutex::{impl#3}::lock_slow::{closure_env#0}, parking_lot::raw_mutex::{impl#3}::lock_slow::{closure_env#1}, parking_lot::raw_mutex::{impl#3}::lock_slow::{closure_env#2}>> () at /home/bugenzhao/.cargo/registry/src/github.com-1ecc6299db9ec823/parking_lot_core-0.9.4/src/parking_lot.rs:600
#5  parking_lot_core::parking_lot::park<parking_lot::raw_mutex::{impl#3}::lock_slow::{closure_env#0}, parking_lot::raw_mutex::{impl#3}::lock_slow::{closure_env#1}, parking_lot::raw_mutex::{impl#3}::lock_slow::{closure_env#2}> (key=140378797998464, park_token=..., timeout=..., validate=..., before_sleep=..., timed_out=...) at /home/bugenzhao/.cargo/registry/src/github.com-1ecc6299db9ec823/parking_lot_core-0.9.4/src/parking_lot.rs:600
#6  parking_lot::raw_mutex::RawMutex::lock_slow (self=<optimized out>, timeout=...) at src/raw_mutex.rs:262
#7  0x0000557a35519c2c in parking_lot::raw_mutex::{impl#0}::lock (self=0x7fac7c638180) at /home/bugenzhao/.cargo/registry/src/github.com-1ecc6299db9ec823/parking_lot-0.12.1/src/raw_mutex.rs:72
#8  lock_api::mutex::Mutex<parking_lot::raw_mutex::RawMutex, risingwave_common::cache::LruCacheShard<u64, alloc::boxed::Box<risingwave_storage::hummock::sstable::Sstable, alloc::alloc::Global>>>::lock<parking_lot::raw_mutex::RawMutex, risingwave_common::cache::LruCacheShard<u64, alloc::boxed::Box<risingwave_storage::hummock::sstable::Sstable, alloc::alloc::Global>>> () at src/common/src/cache.rs:660
#9  risingwave_common::cache::LruCache<u64, alloc::boxed::Box<risingwave_storage::hummock::sstable::Sstable, alloc::alloc::Global>>::release<u64, alloc::boxed::Box<risingwave_storage::hummock::sstable::Sstable, alloc::alloc::Global>> (self=0x7fac7c6b22a0, handle=0x7faa7ff43e40) at src/common/src/cache.rs:660
#10 risingwave_common::cache::{impl#15}::drop<u64, alloc::boxed::Box<risingwave_storage::hummock::sstable::Sstable, alloc::alloc::Global>> (self=<optimized out>) at src/common/src/cache.rs:865
#11 0x0000557a356b1d56 in core::ptr::drop_in_place<risingwave_common::cache::CacheableEntry<u64, alloc::boxed::Box<risingwave_storage::hummock::sstable::Sstable, alloc::alloc::Global>>> () at /rustc/b8c35ca26b191bb9a9ac669a4b3f4d3d52d97fb1/library/core/src/ptr/mod.rs:491
#12 core::ptr::drop_in_place<core::option::Option<risingwave_common::cache::CacheableEntry<u64, alloc::boxed::Box<risingwave_storage::hummock::sstable::Sstable, alloc::alloc::Global>>>> () at /rustc/b8c35ca26b191bb9a9ac669a4b3f4d3d52d97fb1/library/core/src/ptr/mod.rs:491
#13 core::ptr::drop_in_place<core::cell::UnsafeCell<core::option::Option<risingwave_common::cache::CacheableEntry<u64, alloc::boxed::Box<risingwave_storage::hummock::sstable::Sstable, alloc::alloc::Global>>>>> () at /rustc/b8c35ca26b191bb9a9ac669a4b3f4d3d52d97fb1/library/core/src/ptr/mod.rs:491
#14 core::ptr::drop_in_place<tokio::loom::std::unsafe_cell::UnsafeCell<core::option::Option<risingwave_common::cache::CacheableEntry<u64, alloc::boxed::Box<risingwave_storage::hummock::sstable::Sstable, alloc::alloc::Global>>>>> () at /rustc/b8c35ca26b191bb9a9ac669a4b3f4d3d52d97fb1/library/core/src/ptr/mod.rs:491
#15 core::ptr::drop_in_place<tokio::sync::oneshot::Inner<risingwave_common::cache::CacheableEntry<u64, alloc::boxed::Box<risingwave_storage::hummock::sstable::Sstable, alloc::alloc::Global>>>> () at /rustc/b8c35ca26b191bb9a9ac669a4b3f4d3d52d97fb1/library/core/src/ptr/mod.rs:491
#16 alloc::sync::Arc<tokio::sync::oneshot::Inner<risingwave_common::cache::CacheableEntry<u64, alloc::boxed::Box<risingwave_storage::hummock::sstable::Sstable, alloc::alloc::Global>>>>::drop_slow<tokio::sync::oneshot::Inner<risingwave_common::cache::CacheableEntry<u64, alloc::boxed::Box<risingwave_storage::hummock::sstable::Sstable, alloc::alloc::Global>>>> (self=<optimized out>) at /rustc/b8c35ca26b191bb9a9ac669a4b3f4d3d52d97fb1/library/alloc/src/sync.rs:1114
#17 0x0000557a35798ce2 in alloc::sync::{impl#27}::drop<tokio::sync::oneshot::Inner<risingwave_common::cache::CacheableEntry<u64, alloc::boxed::Box<risingwave_storage::hummock::sstable::Sstable, alloc::alloc::Global>>>> (self=0x7fab49bf6170) at /rustc/b8c35ca26b191bb9a9ac669a4b3f4d3d52d97fb1/library/alloc/src/sync.rs:1711
#18 core::ptr::drop_in_place<alloc::sync::Arc<tokio::sync::oneshot::Inner<risingwave_common::cache::CacheableEntry<u64, alloc::boxed::Box<risingwave_storage::hummock::sstable::Sstable, alloc::alloc::Global>>>>> () at /rustc/b8c35ca26b191bb9a9ac669a4b3f4d3d52d97fb1/library/core/src/ptr/mod.rs:491
#19 tokio::sync::oneshot::Sender<risingwave_common::cache::CacheableEntry<u64, alloc::boxed::Box<risingwave_storage::hummock::sstable::Sstable, alloc::alloc::Global>>>::send<risingwave_common::cache::CacheableEntry<u64, alloc::boxed::Box<risingwave_storage::hummock::sstable::Sstable, alloc::alloc::Global>>> (self=..., t=...) at /home/bugenzhao/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.22.0/src/sync/oneshot.rs:626
#20 0x0000557a35509899 in risingwave_common::cache::LruCache<u64, alloc::boxed::Box<risingwave_storage::hummock::sstable::Sstable, alloc::alloc::Global>>::insert<u64, alloc::boxed::Box<risingwave_storage::hummock::sstable::Sstable, alloc::alloc::Global>> (self=0x7fa9d45c5218, key=31664, hash=31664, charge=226, value=<optimized out>) at src/common/src/cache.rs:692
#21 0x0000557a355a82e6 in risingwave_common::cache::{impl#11}::lookup_with_request_dedup::{async_fn#0}::{async_block#0}<u64, alloc::boxed::Box<risingwave_storage::hummock::sstable::Sstable, alloc::alloc::Global>, risingwave_storage::hummock::sstable_store::{impl#3}::sstable::{async_fn#0}::{closure_env#0}, risingwave_storage::hummock::error::HummockError, core::future::from_generator::GenFuture<risingwave_storage::hummock::sstable_store::{impl#3}::sstable::{async_fn#0}::{closure#0}::{async_block_env#0}>> () at src/common/src/cache.rs:827
  9. This leads to a recent change: fix(cache): do not drop cache-entry inside lock #6315. In fact, we encounter the same deadlock here. The Drop of CacheableEntry acquires the lock of its shard, so dropping it inside the scope of a lock guard leads to a deadlock. #6315 tried to fix this by deallocating the entries that are returned on a send error, i.e., when the oneshot channel receiver has been dropped.
    let handle = unsafe {
        let mut shard = self.shards[self.shard(hash)].lock();
        let pending_request = shard.write_request.remove(&key);
        let ptr = shard.insert(key, hash, charge, value, &mut to_delete);
        debug_assert!(!ptr.is_null());
        if let Some(que) = pending_request {
            for sender in que {
                (*ptr).add_ref();
                if let Err(e) = sender.send(CacheableEntry {
                    cache: self.clone(),
                    handle: ptr,
                }) {
                    errs.push(e);
                }
            }
        }
        CacheableEntry {
            cache: self.clone(),
            handle: ptr,
        }
    };

    However, this is not the only case: if the receiver is dropped right after we check whether it's dropped, then send won't return Err, yet the Inner wrapping the value will still be dropped before send returns Ok.
pub fn send(mut self, t: T) -> Result<(), T> {
    let inner = self.inner.take().unwrap();

    inner.value.with_mut(|ptr| unsafe {
        *ptr = Some(t);
    });

    if !inner.complete() {
        unsafe {
            return Err(inner.consume_value().unwrap());
        }
    }

    // <- If the receiver is dropped right here, the sender's `Arc<Inner>` (the
    // local `inner`) becomes the last reference. Dropping it at the end of this
    // function drops the stored value, i.e. the `CacheableEntry`, inside `send`,
    // before `Ok(())` is returned to the caller.

    Ok(())
}
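To make the hazard concrete, here is a minimal sketch of the pattern with hypothetical Cache/Shard/Entry types, not the actual RisingWave cache code: a handle whose Drop re-acquires a non-reentrant parking_lot mutex deadlocks the thread if it is dropped, even implicitly inside send, while a guard for that same mutex is still alive. The general remedy, as #6315's title already says, is to make sure such drops happen outside the critical section.

// Hedged sketch of the deadlock pattern; all types here are stand-ins.
use parking_lot::Mutex;
use std::sync::Arc;

struct Shard {
    live_handles: usize, // stand-in for the cache's refcount bookkeeping
}

struct Cache {
    shard: Mutex<Shard>,
}

struct Entry {
    cache: Arc<Cache>,
}

impl Drop for Entry {
    fn drop(&mut self) {
        // Releasing an entry must take the shard lock, just like
        // `CacheableEntry::drop` calls `LruCache::release`.
        let mut shard = self.cache.shard.lock();
        shard.live_handles -= 1;
    }
}

// BUGGY: the entry is dropped while the shard guard is still held, so
// `Entry::drop` blocks forever on the same non-reentrant mutex.
fn insert_buggy(cache: &Arc<Cache>) {
    let mut shard = cache.shard.lock();
    shard.live_handles += 1;
    let entry = Entry { cache: cache.clone() };
    drop(entry); // deadlocks here; in the issue, this drop is hidden inside `send`
}

// PATTERN OF THE FIX: end the critical section first, then let entries drop.
fn insert_fixed(cache: &Arc<Cache>) {
    let entry = {
        let mut shard = cache.shard.lock();
        shard.live_handles += 1;
        Entry { cache: cache.clone() }
    }; // shard guard released here
    drop(entry); // safe: `Entry::drop` can take the lock now
}

fn main() {
    let cache = Arc::new(Cache { shard: Mutex::new(Shard { live_handles: 0 }) });
    insert_fixed(&cache);
    // insert_buggy(&cache); // uncomment to observe the hang
}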

Considerations

This explains why the issue is hard to reproduce:

  • The backfill executor makes dropping an unfinished storage iterator more frequent. When the iterator is dropped, a pending async LRU cache request is likely canceled, which drops the oneshot receiver. (See the sketch below.)
  • The race window is really narrow and subtle.
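For illustration, here is a hedged sketch of that cancellation path using plain tokio primitives; wait_for_cached_entry and the timeout are made up and not the actual code path. Cancelling the waiting caller drops the oneshot receiver, so the cache's later send races with that drop.

// Sketch: cancellation of the waiting side drops the receiver before `send`.
use std::time::Duration;
use tokio::sync::oneshot;

async fn wait_for_cached_entry(rx: oneshot::Receiver<String>) -> Option<String> {
    rx.await.ok()
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();

    // The "iterator" side: a caller waiting for a deduplicated cache lookup,
    // cancelled (dropped) before the entry arrives, e.g. because the backfill
    // executor dropped an unfinished storage iterator.
    let waiter = tokio::time::timeout(Duration::from_millis(10), wait_for_cached_entry(rx));
    let _ = waiter.await; // times out; the receiver is dropped here

    // The "cache" side: by the time it sends, the receiver is already gone.
    // Usually this surfaces as `Err(value)` and the value is dropped by the
    // caller of `send`; the subtle case in this issue is the narrow window
    // where `send` still returns `Ok` but drops the value internally, which
    // therefore must also not happen while the shard lock is held.
    match tx.send("sstable-entry".to_string()) {
        Ok(()) => {}
        Err(value) => drop(value), // the value's Drop runs here, outside any cache lock
    }
}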

Some thoughts:

@BugenZhao BugenZhao added type/bug Something isn't working component/streaming Stream processing related issue. priority/critical labels Nov 28, 2022
@BugenZhao BugenZhao self-assigned this Nov 28, 2022
@github-actions github-actions bot added this to the release-0.1.15 milestone Nov 28, 2022
@BugenZhao BugenZhao changed the title streaming: investigate timeout in end-to-end tests streaming: frequent timeout in end-to-end tests Nov 28, 2022
BugenZhao (Member, Author) commented:

This is caused by a parking_lot mutex deadlock in the LRU cache, which is not fully covered by #6315. I'll refine the context later. cc @chenzl25 @Little-Wallace

BugenZhao (Member, Author) commented:

Updated. cc @Little-Wallace, could you please take a look?

BugenZhao (Member, Author) commented:

No more reproductions after #6634. I believe this is resolved.
