Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: migrate object reader and object writer #1442

Merged
merged 6 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ half = { "version" = "=2.2.1", default-features = false, features = [
"num-traits",
] }
bytes = "1.4"
byteorder = "1.5"
chrono = "0.4.23"
criterion = { version = "0.5", features = ["async", "async_tokio"] }
datafusion-common = "32.0"
Expand Down
5 changes: 3 additions & 2 deletions rust/lance-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ authors.workspace = true
license.workspace = true
repository.workspace = true
readme.workspace = true
description.workspace = true
description = "Lance Columnar Format -- Core Library"
keywords.workspace = true
categories.workspace = true
rust-version.workspace = true
Expand All @@ -20,8 +20,9 @@ arrow-cast.workspace = true
arrow-data.workspace = true
arrow-schema.workspace = true
arrow-select.workspace = true
async-trait.workspace = true
async-recursion.workspace = true
async-trait.workspace = true
byteorder.workspace = true
bytes.workspace = true
chrono.workspace = true
datafusion-common.workspace = true
Expand Down
7 changes: 7 additions & 0 deletions rust/lance-core/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//! IO utilities for Lance Columnar Format.
//!

use std::ops::{Range, RangeFrom, RangeFull, RangeTo};

use arrow_array::UInt32Array;

pub mod local;
pub mod object_reader;
pub mod object_writer;
mod stream;
mod traits;
mod utils;

pub use object_reader::CloudObjectReader;
pub use object_writer::ObjectWriter;
pub use stream::{RecordBatchStream, RecordBatchStreamAdapter};
pub use traits::*;
pub use utils::*;
Expand Down
68 changes: 68 additions & 0 deletions rust/lance-core/src/io/object_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2023 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Range;
use std::sync::Arc;

use async_trait::async_trait;

use bytes::Bytes;
use object_store::{path::Path, ObjectStore};

use super::Reader;
use crate::error::Result;

/// Object Reader
///
/// Object Store + Base Path
///
/// A reader bound to a single object inside an [`ObjectStore`]: the store
/// handle and the object's path together identify what the `Reader` trait
/// methods operate on.
#[derive(Debug)]
pub struct CloudObjectReader {
    // Object Store. Shared handle to the backing store implementation
    // (e.g. cloud or in-memory, depending on what the caller supplies).
    pub object_store: Arc<dyn ObjectStore>,
    // File path of the object within the store.
    pub path: Path,

    // Preferred I/O granularity in bytes; surfaced to callers via
    // `Reader::block_size()`. Not used internally by this struct.
    block_size: usize,
}

impl CloudObjectReader {
    /// Create a reader for the object at `path` inside `object_store`.
    ///
    /// `block_size` is the preferred I/O granularity later reported through
    /// `Reader::block_size`. This constructor never fails in the current
    /// implementation; it returns `Result` so the signature stays stable.
    pub fn new(object_store: Arc<dyn ObjectStore>, path: Path, block_size: usize) -> Result<Self> {
        let reader = Self {
            object_store,
            path,
            block_size,
        };
        Ok(reader)
    }
}

#[async_trait]
impl Reader for CloudObjectReader {
    /// Path of the object this reader is bound to.
    fn path(&self) -> &Path {
        &self.path
    }

    /// Preferred I/O block size in bytes, as configured at construction.
    fn block_size(&self) -> usize {
        self.block_size
    }

    /// Object/File Size.
    ///
    /// Issues a metadata (`head`) request against the store and returns the
    /// object's total length in bytes.
    async fn size(&self) -> Result<usize> {
        let meta = self.object_store.head(&self.path).await?;
        Ok(meta.size)
    }

    /// Fetch the bytes covered by `range` (byte offsets into the object).
    async fn get_range(&self, range: Range<usize>) -> Result<Bytes> {
        let data = self.object_store.get_range(&self.path, range).await?;
        Ok(data)
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ use std::pin::Pin;
use std::task::{Context, Poll};

use async_trait::async_trait;
use lance_core::io::{WriteExt, Writer as LanceWrite};
use object_store::{path::Path, MultipartId};
use object_store::{path::Path, MultipartId, ObjectStore};
use pin_project::pin_project;
use snafu::{location, Location};
use tokio::io::{AsyncWrite, AsyncWriteExt};

use crate::io::ObjectStore;
use crate::{Error, Result};
use crate::{
io::{WriteExt, Writer},
Error, Result,
};

/// AsyncWrite with the capability to tell the position the data is written.
///
Expand All @@ -33,16 +34,16 @@ pub struct ObjectWriter {
#[pin]
writer: Box<dyn AsyncWrite + Send + Unpin>,

pub(crate) multipart_id: MultipartId,
// TODO: pub(crate)
pub multipart_id: MultipartId,

cursor: usize,
}

impl ObjectWriter {
pub async fn new(object_store: &ObjectStore, path: &Path) -> Result<Self> {
pub async fn new(object_store: &dyn ObjectStore, path: &Path) -> Result<Self> {
let (multipart_id, writer) =
object_store
.inner
.put_multipart(path)
.await
.map_err(|e| Error::IO {
Expand All @@ -63,7 +64,7 @@ impl ObjectWriter {
}

#[async_trait]
impl LanceWrite for ObjectWriter {
impl Writer for ObjectWriter {
async fn tell(&mut self) -> Result<usize> {
Ok(self.cursor)
}
Expand Down Expand Up @@ -95,18 +96,21 @@ impl AsyncWrite for ObjectWriter {

#[cfg(test)]
mod tests {
use object_store::path::Path;
use std::sync::Arc;

use object_store::{memory::InMemory, path::Path};
use tokio::io::AsyncWriteExt;

use crate::format::Metadata;
use crate::io::object_reader::{read_struct, CloudObjectReader};
use crate::io::ObjectStore;
use crate::{
format::Metadata,
io::{read_struct, CloudObjectReader},
};

use super::*;

#[tokio::test]
async fn test_write() {
let store = ObjectStore::memory();
let store = InMemory::new();

let mut object_writer = ObjectWriter::new(&store, &Path::from("/foo"))
.await
Expand All @@ -128,7 +132,7 @@ mod tests {

#[tokio::test]
async fn test_write_proto_structs() {
let store = ObjectStore::memory();
let store = InMemory::new();
let path = Path::from("/foo");

let mut object_writer = ObjectWriter::new(&store, &path).await.unwrap();
Expand All @@ -144,7 +148,7 @@ mod tests {
assert_eq!(pos, 0);
object_writer.shutdown().await.unwrap();

let object_reader = CloudObjectReader::new(&store, path, 1024).unwrap();
let object_reader = CloudObjectReader::new(Arc::new(store), path, 1024).unwrap();
let actual: Metadata = read_struct(&object_reader, pos).await.unwrap();
assert_eq!(metadata, actual);
}
Expand Down
46 changes: 46 additions & 0 deletions rust/lance-core/src/io/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::min;

use arrow_array::{
types::{BinaryType, LargeBinaryType, LargeUtf8Type, Utf8Type},
ArrayRef,
};
use arrow_schema::DataType;
use byteorder::{ByteOrder, LittleEndian};
use lance_arrow::*;
use prost::Message;
use snafu::{location, Location};

use crate::encodings::{binary::BinaryDecoder, plain::PlainDecoder, AsyncIndex, Decoder};
use crate::format::ProtoStruct;
use crate::io::{ReadBatchParams, Reader};
use crate::{Error, Result};

Expand Down Expand Up @@ -78,3 +83,44 @@ pub async fn read_fixed_stride_array(
let decoder = PlainDecoder::new(reader, data_type, position, length)?;
decoder.get(params.into()).await
}

/// Read a protobuf message at file position 'pos'.
///
/// The on-disk layout is a little-endian `u32` length prefix followed by
/// that many bytes of protobuf-encoded message data.
// TODO: pub(crate)
pub async fn read_message<M: Message + Default>(reader: &dyn Reader, pos: usize) -> Result<M> {
    let file_size = reader.size().await?;
    // The 4-byte length prefix must fit inside the file. Checking only
    // `pos > file_size` is not enough: with fewer than 4 readable bytes,
    // `LittleEndian::read_u32` below would panic instead of returning an
    // error.
    if pos + 4 > file_size {
        return Err(Error::IO {
            message: "file size is too small".to_string(),
            location: location!(),
        });
    }

    // Speculatively read up to 4 KiB so that most messages need a single
    // round trip to the reader.
    let range = pos..min(pos + 4096, file_size);
    let buf = reader.get_range(range.clone()).await?;
    let msg_len = LittleEndian::read_u32(&buf) as usize;

    if msg_len + 4 > buf.len() {
        // Message extends past the speculative read; fetch the remainder
        // and decode from the concatenated buffer.
        let remaining_range = range.end..min(4 + pos + msg_len, file_size);
        let remaining_bytes = reader.get_range(remaining_range).await?;
        let buf = [buf, remaining_bytes].concat();
        assert!(buf.len() >= msg_len + 4);
        Ok(M::decode(&buf[4..4 + msg_len])?)
    } else {
        Ok(M::decode(&buf[4..4 + msg_len])?)
    }
}

/// Read a Protobuf-backed struct at file position: `pos`.
///
/// Decodes the protobuf message `M` via [`read_message`] and converts it
/// into `T` through its `From<M>` impl.
// TODO: pub(crate)
// NOTE: the `'m` lifetime parameter is unused in the body but is kept to
// preserve the public signature for existing callers.
pub async fn read_struct<
    'm,
    M: Message + Default + 'static,
    T: ProtoStruct<Proto = M> + From<M>,
>(
    reader: &dyn Reader,
    pos: usize,
) -> Result<T> {
    read_message::<M>(reader, pos).await.map(T::from)
}
2 changes: 1 addition & 1 deletion rust/lance/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ arrow-schema = { workspace = true }
arrow-select = { workspace = true }
async-recursion.workspace = true
async-trait.workspace = true
byteorder = "1.4.3"
byteorder.workspace = true
bytes.workspace = true
chrono = "0.4.23"
clap = { version = "4.1.1", features = ["derive"], optional = true }
Expand Down
3 changes: 1 addition & 2 deletions rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use chrono::{prelude::*, Duration};
use futures::future::BoxFuture;
use futures::stream::{self, StreamExt, TryStreamExt};
use futures::{Future, FutureExt};
use lance_core::io::WriteExt;
use lance_core::io::{read_struct, WriteExt};
use log::warn;
use object_store::path::Path;
use tracing::instrument;
Expand Down Expand Up @@ -64,7 +64,6 @@ use crate::index::vector::open_index;
use crate::io::reader::read_manifest_indexes;
use crate::io::{
commit::{commit_new_dataset, commit_transaction, CommitError},
object_reader::read_struct,
object_store::ObjectStoreParams,
read_manifest, read_metadata_offset, write_manifest, ObjectStore,
};
Expand Down
15 changes: 0 additions & 15 deletions rust/lance/src/format.rs

This file was deleted.

8 changes: 3 additions & 5 deletions rust/lance/src/index/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub mod pq;
mod traits;
mod utils;

use lance_core::io::{read_message, Reader};
use lance_index::vector::pq::{PQBuildParams, ProductQuantizer};
use lance_linalg::distance::*;
use tracing::instrument;
Expand All @@ -52,10 +53,7 @@ use crate::{
ivf::Ivf,
},
},
io::{
object_reader::{read_message, ObjectReader},
read_message_from_buf, read_metadata_offset,
},
io::{read_message_from_buf, read_metadata_offset},
Error, Result,
};
pub use traits::*;
Expand Down Expand Up @@ -294,7 +292,7 @@ pub(crate) async fn open_index(
let index_file = index_dir.child(INDEX_FILE_NAME);

let object_store = dataset.object_store();
let reader: Arc<dyn ObjectReader> = object_store.open(&index_file).await?.into();
let reader: Arc<dyn Reader> = object_store.open(&index_file).await?.into();

let file_size = reader.size().await?;
let block_size = object_store.block_size();
Expand Down
17 changes: 6 additions & 11 deletions rust/lance/src/index/vector/diskann/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,25 @@ use std::{
use arrow_array::{ArrayRef, Float32Array, RecordBatch, UInt64Array};
use arrow_schema::{DataType, Field, Schema};
use async_trait::async_trait;
use lance_core::ROW_ID_FIELD;
use lance_core::{io::Reader, Error, Result, ROW_ID_FIELD};
use lance_index::vector::{Query, DIST_COL};
use object_store::path::Path;
use ordered_float::OrderedFloat;
use serde::Serialize;
use tracing::instrument;

use super::row_vertex::{RowVertex, RowVertexSerDe};
use crate::index::{
vector::graph::{Graph, VertexWithDistance},
vector::VectorIndex,
};
use crate::{
dataset::Dataset,
index::{
prefilter::PreFilter,
vector::graph::{GraphReadParams, PersistedGraph},
Index,
},
Result,
};
use crate::{
index::{
vector::graph::{Graph, VertexWithDistance},
vector::VectorIndex,
},
io::object_reader::ObjectReader,
Error,
};

/// DiskANN search state.
Expand Down Expand Up @@ -260,7 +255,7 @@ impl VectorIndex for DiskANNIndex {

async fn load(
&self,
_reader: &dyn ObjectReader,
_reader: &dyn Reader,
_offset: usize,
_length: usize,
) -> Result<Box<dyn VectorIndex>> {
Expand Down
Loading