Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(services/hdfs): Atomic write for hdfs #3875

Merged
merged 27 commits into from
Jan 3, 2024

Conversation

shbhmrzd
Copy link
Contributor

@shbhmrzd shbhmrzd commented Jan 1, 2024

For #3724
@Xuanwo Please help with an error I am facing during tests

error[E0277]: `(dyn futures::Future<Output = std::result::Result<(), types::error::Error>> + std::marker::Send + 'static)` cannot be shared between threads safely
   --> core/src/services/hdfs/writer.rs:58:21
    |
58  | impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
    |                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^ `(dyn futures::Future<Output = std::result::Result<(), types::error::Error>> + std::marker::Send + 'static)` cannot be shared between threads safely
    |
    = help: the trait `Sync` is not implemented for `(dyn futures::Future<Output = std::result::Result<(), types::error::Error>> + std::marker::Send + 'static)`
    = note: required for `Unique<(dyn futures::Future<Output = std::result::Result<(), types::error::Error>> + std::marker::Send + 'static)>` to implement `Sync`
note: required because it appears within the type `Box<dyn Future<Output = Result<(), Error>> + Send>`
   --> /Users/sraizada/.rustup/toolchains/stable-aarch64-apple-darwin/lib/rustlib/src/rust/library/alloc/src/boxed.rs:195:12
    |
195 | pub struct Box<
    |            ^^^
note: required because it appears within the type `Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>`
   --> /Users/sraizada/.rustup/toolchains/stable-aarch64-apple-darwin/lib/rustlib/src/rust/library/core/src/pin.rs:410:12
    |
410 | pub struct Pin<P> {
    |            ^^^
note: required because it appears within the type `Option<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>`
   --> /Users/sraizada/.rustup/toolchains/stable-aarch64-apple-darwin/lib/rustlib/src/rust/library/core/src/option.rs:569:10
    |
569 | pub enum Option<T> {
    |          ^^^^^^
note: required because it appears within the type `HdfsWriter<AsyncFile>`
   --> core/src/services/hdfs/writer.rs:32:12
    |
32  | pub struct HdfsWriter<F> {
    |            ^^^^^^^^^^
note: required by a bound in `raw::oio::write::api::Write`
   --> core/src/raw/oio/write/api.rs:77:33
    |
77  | pub trait Write: Unpin + Send + Sync {
    |                                 ^^^^ required by this bound in `Write`

error[E0277]: `(dyn futures::Future<Output = std::result::Result<(), types::error::Error>> + std::marker::Send + 'static)` cannot be shared between threads safely
   --> core/src/services/hdfs/writer.rs:101:29
    |
101 | impl oio::BlockingWrite for HdfsWriter<hdrs::File> {
    |                             ^^^^^^^^^^^^^^^^^^^^^^ `(dyn futures::Future<Output = std::result::Result<(), types::error::Error>> + std::marker::Send + 'static)` cannot be shared between threads safely
    |
    = help: the trait `Sync` is not implemented for `(dyn futures::Future<Output = std::result::Result<(), types::error::Error>> + std::marker::Send + 'static)`
    = note: required for `Unique<(dyn futures::Future<Output = std::result::Result<(), types::error::Error>> + std::marker::Send + 'static)>` to implement `Sync`
note: required because it appears within the type `Box<dyn Future<Output = Result<(), Error>> + Send>`
   --> /Users/sraizada/.rustup/toolchains/stable-aarch64-apple-darwin/lib/rustlib/src/rust/library/alloc/src/boxed.rs:195:12
    |
195 | pub struct Box<
    |            ^^^
note: required because it appears within the type `Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>`
   --> /Users/sraizada/.rustup/toolchains/stable-aarch64-apple-darwin/lib/rustlib/src/rust/library/core/src/pin.rs:410:12
    |
410 | pub struct Pin<P> {
    |            ^^^
note: required because it appears within the type `Option<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>`
   --> /Users/sraizada/.rustup/toolchains/stable-aarch64-apple-darwin/lib/rustlib/src/rust/library/core/src/option.rs:569:10
    |
569 | pub enum Option<T> {
    |          ^^^^^^
note: required because it appears within the type `HdfsWriter<File>`
   --> core/src/services/hdfs/writer.rs:32:12
    |
32  | pub struct HdfsWriter<F> {
    |            ^^^^^^^^^^
note: required by a bound in `raw::oio::write::api::BlockingWrite`
   --> core/src/raw/oio/write/api.rs:213:33
    |
213 | pub trait BlockingWrite: Send + Sync + 'static {
    |                                 ^^^^ required by this bound in `BlockingWrite`

error[E0277]: `(dyn futures::Future<Output = std::result::Result<(), types::error::Error>> + std::marker::Send + 'static)` cannot be shared between threads safely
   --> core/src/services/hdfs/backend.rs:249:19
    |
249 |     type Writer = HdfsWriter<hdrs::AsyncFile>;
    |                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^ `(dyn futures::Future<Output = std::result::Result<(), types::error::Error>> + std::marker::Send + 'static)` cannot be shared between threads safely
    |
    = help: the trait `Sync` is not implemented for `(dyn futures::Future<Output = std::result::Result<(), types::error::Error>> + std::marker::Send + 'static)`
    = help: the following other types implement trait `raw::oio::write::api::Write`:
              Box<T>
              ErrorContextWrapper<T>
              CompleteWriter<W>
              ConcurrentLimitWrapper<R>
              LoggingWriter<W>
              TimeoutWrapper<R>
              RetryWrapper<R, I>
              enum_utils::TwoWays<ONE, TWO>
            and 12 others
    = note: required for `Unique<(dyn futures::Future<Output = std::result::Result<(), types::error::Error>> + std::marker::Send + 'static)>` to implement `Sync`
note: required because it appears within the type `Box<dyn Future<Output = Result<(), Error>> + Send>`
   --> /Users/sraizada/.rustup/toolchains/stable-aarch64-apple-darwin/lib/rustlib/src/rust/library/alloc/src/boxed.rs:195:12
    |
195 | pub struct Box<
    |            ^^^
note: required because it appears within the type `Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>`
   --> /Users/sraizada/.rustup/toolchains/stable-aarch64-apple-darwin/lib/rustlib/src/rust/library/core/src/pin.rs:410:12
    |
410 | pub struct Pin<P> {
    |            ^^^
note: required because it appears within the type `Option<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>`
   --> /Users/sraizada/.rustup/toolchains/stable-aarch64-apple-darwin/lib/rustlib/src/rust/library/core/src/option.rs:569:10
    |
569 | pub enum Option<T> {
    |          ^^^^^^
note: required because it appears within the type `HdfsWriter<AsyncFile>`
   --> core/src/services/hdfs/writer.rs:32:12
    |
32  | pub struct HdfsWriter<F> {
    |            ^^^^^^^^^^
note: required for `<HdfsBackend as accessor::Accessor>::Writer` to implement `raw::oio::write::api::Write`
   --> core/src/raw/oio/write/api.rs:77:11
    |
77  | pub trait Write: Unpin + Send + Sync {
    |           ^^^^^
note: required by a bound in `accessor::Accessor::Writer`
   --> core/src/raw/accessor.rs:65:18
    |
65  |     type Writer: oio::Write;
    |                  ^^^^^^^^^^ required by this bound in `Accessor::Writer`

error[E0277]: `(dyn futures::Future<Output = std::result::Result<(), types::error::Error>> + std::marker::Send + 'static)` cannot be shared between threads safely
   --> core/src/services/hdfs/backend.rs:250:27
    |
250 |     type BlockingWriter = HdfsWriter<hdrs::File>;
    |                           ^^^^^^^^^^^^^^^^^^^^^^ `(dyn futures::Future<Output = std::result::Result<(), types::error::Error>> + std::marker::Send + 'static)` cannot be shared between threads safely
    |
    = help: the trait `Sync` is not implemented for `(dyn futures::Future<Output = std::result::Result<(), types::error::Error>> + std::marker::Send + 'static)`
    = help: the following other types implement trait `raw::oio::write::api::BlockingWrite`:
              Box<T>
              ErrorContextWrapper<T>
              CompleteWriter<W>
              ConcurrentLimitWrapper<R>
              LoggingWriter<W>
              BlockingWrapper<I>
              RetryWrapper<R, I>
              kv::backend::KvWriter<S>
            and 4 others
    = note: required for `Unique<(dyn futures::Future<Output = std::result::Result<(), types::error::Error>> + std::marker::Send + 'static)>` to implement `Sync`
note: required because it appears within the type `Box<dyn Future<Output = Result<(), Error>> + Send>`
   --> /Users/sraizada/.rustup/toolchains/stable-aarch64-apple-darwin/lib/rustlib/src/rust/library/alloc/src/boxed.rs:195:12
    |
195 | pub struct Box<
    |            ^^^
note: required because it appears within the type `Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>`
   --> /Users/sraizada/.rustup/toolchains/stable-aarch64-apple-darwin/lib/rustlib/src/rust/library/core/src/pin.rs:410:12
    |
410 | pub struct Pin<P> {
    |            ^^^
note: required because it appears within the type `Option<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>`
   --> /Users/sraizada/.rustup/toolchains/stable-aarch64-apple-darwin/lib/rustlib/src/rust/library/core/src/option.rs:569:10
    |
569 | pub enum Option<T> {
    |          ^^^^^^
note: required because it appears within the type `HdfsWriter<File>`
   --> core/src/services/hdfs/writer.rs:32:12
    |
32  | pub struct HdfsWriter<F> {
    |            ^^^^^^^^^^
note: required for `<HdfsBackend as accessor::Accessor>::BlockingWriter` to implement `raw::oio::write::api::BlockingWrite`
   --> core/src/raw/oio/write/api.rs:213:11
    |
213 | pub trait BlockingWrite: Send + Sync + 'static {
    |           ^^^^^^^^^^^^^
note: required by a bound in `accessor::Accessor::BlockingWriter`
   --> core/src/raw/accessor.rs:68:26
    |
68  |     type BlockingWriter: oio::BlockingWrite;
    |                          ^^^^^^^^^^^^^^^^^^ required by this bound in `Accessor::BlockingWriter`
    ```

@shbhmrzd shbhmrzd marked this pull request as draft January 1, 2024 00:18
@shbhmrzd
Copy link
Contributor Author

shbhmrzd commented Jan 1, 2024

Pushed a fix for the above error but the following test case is now failing

thread '<unnamed>' panicked at core/tests/behavior/append.rs:59:5:
  2024-01-01T01:11:05.357805Z DEBUG opendal::services: service=hdfs operation=stat path=8133b9ea-130a-44e2-ae1e-908ba963adcc -> finished: RpStat { meta: Metadata { metakey: FlagSet(Complete | Mode | ContentLength | LastModified), mode: FILE, cache_control: None, content_disposition: None, content_length: Some(0), content_md5: None, content_range: None, content_type: None, etag: None, last_modified: Some(2024-01-01T01:11:05Z), version: None } }
assertion `left == right` failed
    at core/src/layers/logging.rs:466
  left: 0

 right: 1352042
stack backtrace:
  2024-01-01T01:11:05.361107Z TRACE opendal::services: service=hdfs operation=BlockingWriter::write path=f4fe7491-e576-4ad2-aae2-99e63715c046 written=37770186B -> input data 652913B, write 652913B
    at core/src/layers/logging.rs:1372

  2024-01-01T01:11:05.404846Z TRACE opendal::services: service=hdfs operation=BlockingWriter::write path=f4fe7491-e576-4ad2-aae2-99e63715c046 written=38117071B -> input data 346885B, write 346885B
    at core/src/layers/logging.rs:1372

  2024-01-01T01:11:05.422626Z TRACE opendal::services: service=hdfs operation=BlockingWriter::write path=f4fe7491-e576-4ad2-aae2-99e63715c046 written=38200726B -> input data 83655B, write 83655B
    at core/src/layers/logging.rs:1372

   0: rust_begin_unwind
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:645:5
   1: core::panicking::panic_fmt
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/panicking.rs:72:14
   2: core::panicking::assert_failed_inner
   3: core::panicking::assert_failed
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/panicking.rs:279:5
   4: behavior::append::test_append_create_append::{{closure}}
             at ./tests/behavior/append.rs:59:5
   5: tokio::runtime::park::CachedParkThread::block_on::{{closure}}
             at /home/runner/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.34.0/src/runtime/park.rs:282:63
   6: tokio::runtime::coop::with_budget
             at /home/runner/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.34.0/src/runtime/coop.rs:107:5
   7: tokio::runtime::coop::budget
             at /home/runner/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.34.0/src/runtime/coop.rs:73:5
   8: tokio::runtime::park::CachedParkThread::block_on
             at /home/runner/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.34.0/src/runtime/park.rs:282:31
   9: tokio::runtime::context::blocking::BlockingRegionGuard::block_on
             at /home/runner/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.34.0/src/runtime/context/blocking.rs:66:9
  10: tokio::runtime::handle::Handle::block_on::{{closure}}
             at /home/runner/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.34.0/src/runtime/handle.rs:310:13
  11: tokio::runtime::context::runtime::enter_runtime
             at /home/runner/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.34.0/src/runtime/context/runtime.rs:65:16
  12: tokio::runtime::handle::Handle::block_on
             at /home/runner/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.34.0/src/runtime/handle.rs:309:9
  13: behavior::utils::build_async_trial::{{closure}}
             at ./tests/behavior/utils.rs:77:9
  14: libtest_mimic::Trial::test::{{closure}}
             at /home/runner/.cargo/registry/src/index.crates.io-6f17d22bba15001f/libtest-mimic-0.6.1/src/lib.rs:112:54
  15: core::ops::function::FnOnce::call_once{{vtable.shim}}
             at /rustc/82e1608dfa6e0b556923[2559](https://github.com/apache/incubator-opendal/actions/runs/7373845953/job/20063555522?pr=3875#step:7:2560)e3d385fea5a93112/library/core/src/ops/function.rs:250:5
  16: <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/alloc/src/boxed.rs:2007:9
  17: libtest_mimic::run_single::{{closure}}
             at /home/runner/.cargo/registry/src/index.crates.io-6f17d22bba15001f/libtest-mimic-0.6.1/src/lib.rs:505:43
  18: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/panic/unwind_safe.rs:272:9
  19: std::panicking::try::do_call
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:552:40
  20: __rust_try
  21: std::panicking::try
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:516:19
  22: std::panic::catch_unwind
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panic.rs:142:14
  23: libtest_mimic::run_single
             at /home/runner/.cargo/registry/src/index.crates.io-6f17d22bba15001f/libtest-mimic-0.6.1/src/lib.rs:505:5
  24: libtest_mimic::run::{{closure}}
             at /home/runner/.cargo/registry/src/index.crates.io-6f17d22bba15001f/libtest-mimic-0.6.1/src/lib.rs:476:35
  25: <F as threadpool::FnBox>::call_box
             at /home/runner/.cargo/registry/src/index.crates.io-6f17d22bba15001f/threadpool-1.8.1/src/lib.rs:95:9
  26: threadpool::spawn_in_pool::{{closure}}
             at /home/runner/.cargo/registry/src/index.crates.io-6f17d22bba15001f/threadpool-1.8.1/src/lib.rs:769:17
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
test behavior::test_append_create_append                     ... FAILED

@shbhmrzd
Copy link
Contributor Author

shbhmrzd commented Jan 1, 2024

@Xuanwo
I need your help as I am unable to find a fix for the failing test cases. Will take a break and get at it again. Meanwhile, please take a look at the failing test cases for hdfs-cluster and hdfs-default let me know if there are any suggestions.
Thank you!

.env.example Outdated Show resolved Hide resolved
core/src/services/hdfs/writer.rs Outdated Show resolved Hide resolved
core/src/services/hdfs/writer.rs Outdated Show resolved Hide resolved
core/src/services/hdfs/writer.rs Outdated Show resolved Hide resolved
core/src/services/hdfs/writer.rs Outdated Show resolved Hide resolved
@shbhmrzd
Copy link
Contributor Author

shbhmrzd commented Jan 2, 2024

@Xuanwo
Please take a look now.
Some test cases are still failing. I am not sure, what is going wrong. I would need your assistance.
Thank you!

core/src/services/hdfs/writer.rs Outdated Show resolved Hide resolved
.github/workflows/service_test_hdfs.yml Outdated Show resolved Hide resolved
@shbhmrzd shbhmrzd marked this pull request as ready for review January 2, 2024 16:52
@shbhmrzd
Copy link
Contributor Author

shbhmrzd commented Jan 2, 2024

Please review @Xuanwo
Thank you!

core/src/services/hdfs/writer.rs Outdated Show resolved Hide resolved
core/src/services/hdfs/writer.rs Outdated Show resolved Hide resolved
Copy link
Member

@Xuanwo Xuanwo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perfect👍, thanks a lot!

@Xuanwo Xuanwo merged commit 52562e8 into apache:main Jan 3, 2024
41 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants