Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Add Write::sink API #2440

Merged
merged 8 commits into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/docs/internals/accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@
//! vendors that provide s3-like RESTful APIs, and our s3 service is
//! implemented to support all of them, not just AWS S3.
//!
//! Obviously, we can use `duck` as scheme, let's add a new variant in [`Scheme`], and implement all reqired functions like `Scheme::from_str` and `Scheme::into_static`:
//! Obviously, we can use `duck` as scheme, let's add a new variant in [`Scheme`], and implement all required functions like `Scheme::from_str` and `Scheme::into_static`:
//!
//! ```ignore
//! pub enum Scheme {
Expand Down
21 changes: 21 additions & 0 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,27 @@ where
Ok(())
}

async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
if let Some(total_size) = self.size {
if self.written + size > total_size {
return Err(Error::new(
ErrorKind::ContentTruncated,
&format!(
"writer got too much data, expect: {size}, actual: {}",
self.written + size
),
));
}
}

let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;
w.sink(size, s).await?;
self.written += size;
Ok(())
}

async fn abort(&mut self) -> Result<()> {
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
Expand Down
4 changes: 4 additions & 0 deletions core/src/layers/concurrent_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,10 @@ impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
self.inner.abort().await
}

async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
self.inner.sink(size, s).await
}

async fn close(&mut self) -> Result<()> {
self.inner.close().await
}
Expand Down
8 changes: 8 additions & 0 deletions core/src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,14 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
})
}

async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
self.inner.sink(size, s).await.map_err(|err| {
err.with_operation(WriteOperation::Sink)
.with_context("service", self.scheme)
.with_context("path", &self.path)
})
}

async fn close(&mut self) -> Result<()> {
self.inner.close().await.map_err(|err| {
err.with_operation(WriteOperation::Close)
Expand Down
32 changes: 32 additions & 0 deletions core/src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1343,6 +1343,38 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
}
}

async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
match self.inner.sink(size, s).await {
Ok(_) => {
self.written += size;
trace!(
target: LOGGING_TARGET,
"service={} operation={} path={} written={} -> data sink {}B",
self.scheme,
WriteOperation::Sink,
self.path,
self.written,
size
);
Ok(())
}
Err(err) => {
if let Some(lvl) = self.failure_level {
log!(
target: LOGGING_TARGET,
lvl,
"service={} operation={} path={} written={} -> data sink failed: {err:?}",
self.scheme,
WriteOperation::Sink,
self.path,
self.written,
)
}
Err(err)
}
}
}

async fn abort(&mut self) -> Result<()> {
match self.inner.abort().await {
Ok(_) => {
Expand Down
7 changes: 7 additions & 0 deletions core/src/layers/madsim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,13 @@ impl oio::Write for MadsimWriter {
}
}

async fn sink(&mut self, size: u64, s: oio::Streamer) -> crate::Result<()> {
Err(Error::new(
ErrorKind::Unsupported,
"will be supported in the future",
))
}

async fn abort(&mut self) -> crate::Result<()> {
Err(Error::new(
ErrorKind::Unsupported,
Expand Down
11 changes: 11 additions & 0 deletions core/src/layers/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,17 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
})
}

async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
self.inner
.sink(size, s)
.await
.map(|_| self.bytes += size)
.map_err(|err| {
self.handle.increment_errors_total(self.op, err.kind());
err
})
}

async fn abort(&mut self) -> Result<()> {
self.inner.abort().await.map_err(|err| {
self.handle.increment_errors_total(self.op, err.kind());
Expand Down
10 changes: 10 additions & 0 deletions core/src/layers/minitrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,16 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
.await
}

async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
self.inner
.sink(size, s)
.in_span(Span::enter_with_parent(
WriteOperation::Sink.into_static(),
&self.span,
))
.await
}

async fn abort(&mut self) -> Result<()> {
self.inner
.abort()
Expand Down
4 changes: 4 additions & 0 deletions core/src/layers/oteltrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,10 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
self.inner.write(bs).await
}

async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
self.inner.sink(size, s).await
}

async fn abort(&mut self) -> Result<()> {
self.inner.abort().await
}
Expand Down
16 changes: 16 additions & 0 deletions core/src/layers/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,22 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
})
}

async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
self.inner
.sink(size, s)
.await
.map(|_| {
self.stats
.bytes_total
.with_label_values(&[&self.scheme, Operation::Write.into_static()])
.observe(size as f64)
})
.map_err(|err| {
self.stats.increment_errors_total(self.op, err.kind());
err
})
}

async fn abort(&mut self) -> Result<()> {
self.inner.abort().await.map_err(|err| {
self.stats.increment_errors_total(self.op, err.kind());
Expand Down
5 changes: 5 additions & 0 deletions core/src/layers/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,11 @@ impl<R: oio::Write> oio::Write for RetryWrapper<R> {
}
}

/// Sink will move the input stream, so we can't retry it.
async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
self.inner.sink(size, s).await
}

async fn abort(&mut self) -> Result<()> {
let mut backoff = self.builder.build();

Expand Down
13 changes: 13 additions & 0 deletions core/src/layers/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,19 @@ impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
})?
}

async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
let timeout = self.io_timeout(size);

tokio::time::timeout(timeout, self.inner.sink(size, s))
.await
.map_err(|_| {
Error::new(ErrorKind::Unexpected, "operation timeout")
.with_operation(WriteOperation::Sink)
.with_context("timeout", timeout.as_secs_f64().to_string())
.set_temporary()
})?
}

async fn abort(&mut self) -> Result<()> {
tokio::time::timeout(self.timeout, self.inner.abort())
.await
Expand Down
8 changes: 8 additions & 0 deletions core/src/layers/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,14 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
self.inner.write(bs).await
}

#[tracing::instrument(
parent = &self.span,
level = "trace",
skip_all)]
async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
self.inner.sink(size, s).await
}

#[tracing::instrument(
parent = &self.span,
level = "trace",
Expand Down
7 changes: 7 additions & 0 deletions core/src/raw/adapters/kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,13 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
Ok(())
}

async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
))
}

async fn abort(&mut self) -> Result<()> {
Err(Error::new(
ErrorKind::Unsupported,
Expand Down
7 changes: 7 additions & 0 deletions core/src/raw/adapters/typed_kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,13 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
Ok(())
}

async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
))
}

async fn abort(&mut self) -> Result<()> {
self.buf.clear();

Expand Down
9 changes: 4 additions & 5 deletions core/src/raw/http_util/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use std::task::Poll;
use bytes::Buf;
use bytes::BufMut;
use bytes::Bytes;
use futures::Stream;
use futures::StreamExt;

use crate::raw::*;
Expand All @@ -41,10 +40,10 @@ pub enum AsyncBody {
Empty,
/// Body with bytes.
Bytes(Bytes),
/// Body with stream.
Stream(oio::Streamer),
}

type BytesStream = Box<dyn Stream<Item = Result<Bytes>> + Send + Sync + Unpin>;

/// IncomingAsyncBody carries the content returned by remote servers.
///
/// # Notes
Expand All @@ -58,15 +57,15 @@ pub struct IncomingAsyncBody {
///
/// After [TAIT](https://rust-lang.github.io/rfcs/2515-type_alias_impl_trait.html)
/// has been stable, we can change `IncomingAsyncBody` into `IncomingAsyncBody<S>`.
inner: BytesStream,
inner: oio::Streamer,
size: Option<u64>,
consumed: u64,
chunk: Option<Bytes>,
}

impl IncomingAsyncBody {
/// Construct a new incoming async body
pub fn new(s: BytesStream, size: Option<u64>) -> Self {
pub fn new(s: oio::Streamer, size: Option<u64>) -> Self {
Self {
inner: s,
size,
Expand Down
7 changes: 6 additions & 1 deletion core/src/raw/http_util/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use http::Response;
use super::body::IncomingAsyncBody;
use super::parse_content_length;
use super::AsyncBody;
use crate::raw::oio::into_stream;
use crate::Error;
use crate::ErrorKind;
use crate::Result;
Expand Down Expand Up @@ -93,6 +94,7 @@ impl HttpClient {
req_builder = match body {
AsyncBody::Empty => req_builder.body(reqwest::Body::from("")),
AsyncBody::Bytes(bs) => req_builder.body(reqwest::Body::from(bs)),
AsyncBody::Stream(s) => req_builder.body(reqwest::Body::wrap_stream(s)),
};

let mut resp = req_builder.send().await.map_err(|err| {
Expand Down Expand Up @@ -143,7 +145,10 @@ impl HttpClient {
.set_source(err)
});

let body = IncomingAsyncBody::new(Box::new(stream), content_length);
let body = IncomingAsyncBody::new(
Box::new(into_stream::from_futures_stream(stream)),
content_length,
);

let resp = hr.body(body).expect("response must build succeed");

Expand Down
2 changes: 1 addition & 1 deletion core/src/raw/http_util/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub fn new_request_build_error(err: http::Error) -> Error {
pub fn new_request_credential_error(err: anyhow::Error) -> Error {
Error::new(
ErrorKind::Unexpected,
"loading credentail to sign http request",
"loading credential to sign http request",
)
.set_temporary()
.with_operation("reqsign::LoadCredential")
Expand Down
10 changes: 7 additions & 3 deletions core/src/raw/http_util/multipart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use http::Version;
use super::new_request_build_error;
use super::AsyncBody;
use super::IncomingAsyncBody;
use crate::raw::oio::into_stream;
use crate::*;

/// Multipart is a builder for multipart/form-data.
Expand Down Expand Up @@ -250,7 +251,7 @@ pub struct MixedPart {
}

impl MixedPart {
/// Create a new mixed part with gien uri.
/// Create a new mixed part with given uri.
pub fn new(uri: &str) -> Self {
let mut part_headers = HeaderMap::new();
part_headers.insert(CONTENT_TYPE, "application/http".parse().unwrap());
Expand Down Expand Up @@ -283,6 +284,7 @@ impl MixedPart {
let content = match body {
AsyncBody::Empty => Bytes::new(),
AsyncBody::Bytes(bs) => bs,
AsyncBody::Stream(_) => panic!("multipart request can't contain stream body"),
};

Self {
Expand Down Expand Up @@ -317,8 +319,10 @@ impl MixedPart {

let bs: Bytes = self.content;
let length = bs.len();
let body =
IncomingAsyncBody::new(Box::new(stream::iter(vec![Ok(bs)])), Some(length as u64));
let body = IncomingAsyncBody::new(
Box::new(into_stream::from_futures_stream(stream::iter(vec![Ok(bs)]))),
Some(length as u64),
);

builder
.body(body)
Expand Down
Loading