Skip to content

Commit

Permalink
feat(raw/oio): Use Buffer as cache in RangeWrite
Browse files Browse the repository at this point in the history
  • Loading branch information
reswqa committed Apr 12, 2024
1 parent 62fe9b0 commit ea31eb5
Showing 1 changed file with 16 additions and 16 deletions.
32 changes: 16 additions & 16 deletions core/src/raw/oio/write/range_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub trait RangeWrite: Send + Sync + Unpin + 'static {
/// WritePartResult is the result returned by [`WriteRangeFuture`].
///
/// The error part will carries input `(offset, bytes, err)` so caller can retry them.
type WriteRangeResult = std::result::Result<(), (u64, Bytes, Error)>;
type WriteRangeResult = std::result::Result<(), (u64, Buffer, Error)>;

struct WriteRangeFuture(BoxedStaticFuture<WriteRangeResult>);

Expand All @@ -115,16 +115,16 @@ impl Future for WriteRangeFuture {
}

impl WriteRangeFuture {
pub fn new<W: RangeWrite>(w: Arc<W>, location: Arc<String>, offset: u64, bytes: Bytes) -> Self {
pub fn new<W: RangeWrite>(
w: Arc<W>,
location: Arc<String>,
offset: u64,
bytes: Buffer,
) -> Self {
let fut = async move {
w.write_range(
&location,
offset,
bytes.len() as u64,
Buffer::from(bytes.clone()),
)
.await
.map_err(|err| (offset, bytes, err))
w.write_range(&location, offset, bytes.len() as u64, bytes.clone())
.await
.map_err(|err| (offset, bytes, err))
};

WriteRangeFuture(Box::pin(fut))
Expand All @@ -135,7 +135,7 @@ impl WriteRangeFuture {
pub struct RangeWriter<W: RangeWrite> {
location: Option<Arc<String>>,
next_offset: u64,
buffer: Option<Bytes>,
buffer: Option<Buffer>,
futures: ConcurrentFutures<WriteRangeFuture>,

w: Arc<W>,
Expand All @@ -154,7 +154,7 @@ impl<W: RangeWrite> RangeWriter<W> {
}
}

fn fill_cache(&mut self, bs: Bytes) -> usize {
fn fill_cache(&mut self, bs: Buffer) -> usize {
let size = bs.len();
assert!(self.buffer.is_none());
self.buffer = Some(bs);
Expand All @@ -169,7 +169,7 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
None => {
// Fill cache with the first write.
if self.buffer.is_none() {
let size = self.fill_cache(bs.to_bytes());
let size = self.fill_cache(bs);
return Ok(size);
}

Expand All @@ -192,7 +192,7 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
cache,
));

let size = self.fill_cache(bs.to_bytes());
let size = self.fill_cache(bs);
return Ok(size);
}

Expand All @@ -211,7 +211,7 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
async fn close(&mut self) -> Result<()> {
let Some(location) = self.location.clone() else {
let (size, body) = match self.buffer.clone() {
Some(cache) => (cache.len(), Buffer::from(cache)),
Some(cache) => (cache.len(), cache),
None => (0, Buffer::new()),
};
// Call write_once if there is no data in buffer and no location.
Expand All @@ -235,7 +235,7 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
if let Some(buffer) = self.buffer.clone() {
let offset = self.next_offset;
self.w
.complete_range(&location, offset, buffer.len() as u64, Buffer::from(buffer))
.complete_range(&location, offset, buffer.len() as u64, buffer)
.await?;
self.buffer = None;
}
Expand Down

0 comments on commit ea31eb5

Please sign in to comment.