diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 5286a9d76f..7bbbd1d389 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -60,6 +60,7 @@ half = { "version" = "=2.2.1", default-features = false, features = [ "num-traits", ] } bytes = "1.4" +byteorder = "1.5" chrono = "0.4.23" criterion = { version = "0.5", features = ["async", "async_tokio"] } datafusion-common = "32.0" diff --git a/rust/lance-core/Cargo.toml b/rust/lance-core/Cargo.toml index 2bbda82273..d6c02130dd 100644 --- a/rust/lance-core/Cargo.toml +++ b/rust/lance-core/Cargo.toml @@ -6,7 +6,7 @@ authors.workspace = true license.workspace = true repository.workspace = true readme.workspace = true -description.workspace = true +description = "Lance Columnar Format -- Core Library" keywords.workspace = true categories.workspace = true rust-version.workspace = true @@ -20,8 +20,9 @@ arrow-cast.workspace = true arrow-data.workspace = true arrow-schema.workspace = true arrow-select.workspace = true -async-trait.workspace = true async-recursion.workspace = true +async-trait.workspace = true +byteorder.workspace = true bytes.workspace = true chrono.workspace = true datafusion-common.workspace = true diff --git a/rust/lance-core/src/io.rs b/rust/lance-core/src/io.rs index eb176d2c60..c26d5a8653 100644 --- a/rust/lance-core/src/io.rs +++ b/rust/lance-core/src/io.rs @@ -12,15 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. +//! IO utilities for Lance Columnar Format. +//! + use std::ops::{Range, RangeFrom, RangeFull, RangeTo}; use arrow_array::UInt32Array; pub mod local; +pub mod object_reader; +pub mod object_writer; mod stream; mod traits; mod utils; +pub use object_reader::CloudObjectReader; +pub use object_writer::ObjectWriter; pub use stream::{RecordBatchStream, RecordBatchStreamAdapter}; pub use traits::*; pub use utils::*; diff --git a/rust/lance-core/src/io/object_reader.rs b/rust/lance-core/src/io/object_reader.rs new file mode 100644 index 0000000000..13c26b0d6f --- /dev/null +++ b/rust/lance-core/src/io/object_reader.rs @@ -0,0 +1,68 @@ +// Copyright 2023 Lance Developers. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ops::Range; +use std::sync::Arc; + +use async_trait::async_trait; + +use bytes::Bytes; +use object_store::{path::Path, ObjectStore}; + +use super::Reader; +use crate::error::Result; + +/// Object Reader +/// +/// Object Store + Base Path +#[derive(Debug)] +pub struct CloudObjectReader { + // Object Store. + pub object_store: Arc, + // File path + pub path: Path, + + block_size: usize, +} + +impl CloudObjectReader { + /// Create an ObjectReader from URI + pub fn new(object_store: Arc, path: Path, block_size: usize) -> Result { + Ok(Self { + object_store, + path, + block_size, + }) + } +} + +#[async_trait] +impl Reader for CloudObjectReader { + fn path(&self) -> &Path { + &self.path + } + + fn block_size(&self) -> usize { + self.block_size + } + + /// Object/File Size. + async fn size(&self) -> Result { + Ok(self.object_store.head(&self.path).await?.size) + } + + async fn get_range(&self, range: Range) -> Result { + Ok(self.object_store.get_range(&self.path, range).await?) + } +} diff --git a/rust/lance/src/io/object_writer.rs b/rust/lance-core/src/io/object_writer.rs similarity index 85% rename from rust/lance/src/io/object_writer.rs rename to rust/lance-core/src/io/object_writer.rs index 80ec631b18..74e3d063ca 100644 --- a/rust/lance/src/io/object_writer.rs +++ b/rust/lance-core/src/io/object_writer.rs @@ -16,14 +16,15 @@ use std::pin::Pin; use std::task::{Context, Poll}; use async_trait::async_trait; -use lance_core::io::{WriteExt, Writer as LanceWrite}; -use object_store::{path::Path, MultipartId}; +use object_store::{path::Path, MultipartId, ObjectStore}; use pin_project::pin_project; use snafu::{location, Location}; use tokio::io::{AsyncWrite, AsyncWriteExt}; -use crate::io::ObjectStore; -use crate::{Error, Result}; +use crate::{ + io::{WriteExt, Writer}, + Error, Result, +}; /// AsyncWrite with the capability to tell the position the data is written. /// @@ -33,16 +34,16 @@ pub struct ObjectWriter { #[pin] writer: Box, - pub(crate) multipart_id: MultipartId, + // TODO: pub(crate) + pub multipart_id: MultipartId, cursor: usize, } impl ObjectWriter { - pub async fn new(object_store: &ObjectStore, path: &Path) -> Result { + pub async fn new(object_store: &dyn ObjectStore, path: &Path) -> Result { let (multipart_id, writer) = object_store - .inner .put_multipart(path) .await .map_err(|e| Error::IO { @@ -63,7 +64,7 @@ impl ObjectWriter { } #[async_trait] -impl LanceWrite for ObjectWriter { +impl Writer for ObjectWriter { async fn tell(&mut self) -> Result { Ok(self.cursor) } @@ -95,18 +96,21 @@ impl AsyncWrite for ObjectWriter { #[cfg(test)] mod tests { - use object_store::path::Path; + use std::sync::Arc; + + use object_store::{memory::InMemory, path::Path}; use tokio::io::AsyncWriteExt; - use crate::format::Metadata; - use crate::io::object_reader::{read_struct, CloudObjectReader}; - use crate::io::ObjectStore; + use crate::{ + format::Metadata, + io::{read_struct, CloudObjectReader}, + }; use super::*; #[tokio::test] async fn test_write() { - let store = ObjectStore::memory(); + let store = InMemory::new(); let mut object_writer = ObjectWriter::new(&store, &Path::from("/foo")) .await @@ -128,7 +132,7 @@ mod tests { #[tokio::test] async fn test_write_proto_structs() { - let store = ObjectStore::memory(); + let store = InMemory::new(); let path = Path::from("/foo"); let mut object_writer = ObjectWriter::new(&store, &path).await.unwrap(); @@ -144,7 +148,7 @@ mod tests { assert_eq!(pos, 0); object_writer.shutdown().await.unwrap(); - let object_reader = CloudObjectReader::new(&store, path, 1024).unwrap(); + let object_reader = CloudObjectReader::new(Arc::new(store), path, 1024).unwrap(); let actual: Metadata = read_struct(&object_reader, pos).await.unwrap(); assert_eq!(metadata, actual); } diff --git a/rust/lance-core/src/io/utils.rs b/rust/lance-core/src/io/utils.rs index fb8fc183d9..f89a21fc77 100644 --- a/rust/lance-core/src/io/utils.rs +++ b/rust/lance-core/src/io/utils.rs @@ -12,15 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::min; + use arrow_array::{ types::{BinaryType, LargeBinaryType, LargeUtf8Type, Utf8Type}, ArrayRef, }; use arrow_schema::DataType; +use byteorder::{ByteOrder, LittleEndian}; use lance_arrow::*; +use prost::Message; use snafu::{location, Location}; use crate::encodings::{binary::BinaryDecoder, plain::PlainDecoder, AsyncIndex, Decoder}; +use crate::format::ProtoStruct; use crate::io::{ReadBatchParams, Reader}; use crate::{Error, Result}; @@ -78,3 +83,44 @@ pub async fn read_fixed_stride_array( let decoder = PlainDecoder::new(reader, data_type, position, length)?; decoder.get(params.into()).await } + +/// Read a protobuf message at file position 'pos'. +// TODO: pub(crate) +pub async fn read_message(reader: &dyn Reader, pos: usize) -> Result { + let file_size = reader.size().await?; + if pos > file_size { + return Err(Error::IO { + message: "file size is too small".to_string(), + location: location!(), + }); + } + + let range = pos..min(pos + 4096, file_size); + let buf = reader.get_range(range.clone()).await?; + let msg_len = LittleEndian::read_u32(&buf) as usize; + + if msg_len + 4 > buf.len() { + let remaining_range = range.end..min(4 + pos + msg_len, file_size); + let remaining_bytes = reader.get_range(remaining_range).await?; + let buf = [buf, remaining_bytes].concat(); + assert!(buf.len() >= msg_len + 4); + Ok(M::decode(&buf[4..4 + msg_len])?) + } else { + Ok(M::decode(&buf[4..4 + msg_len])?) + } +} + +/// Read a Protobuf-backed struct at file position: `pos`. +// TODO: pub(crate) +pub async fn read_struct< + 'm, + M: Message + Default + 'static, + T: ProtoStruct + From, +>( + reader: &dyn Reader, + pos: usize, +) -> Result { + let msg = read_message::(reader, pos).await?; + let obj = T::from(msg); + Ok(obj) +} diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml index b57c2e65b6..cf0fea58c3 100644 --- a/rust/lance/Cargo.toml +++ b/rust/lance/Cargo.toml @@ -32,7 +32,7 @@ arrow-schema = { workspace = true } arrow-select = { workspace = true } async-recursion.workspace = true async-trait.workspace = true -byteorder = "1.4.3" +byteorder.workspace = true bytes.workspace = true chrono = "0.4.23" clap = { version = "4.1.1", features = ["derive"], optional = true } diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index b27a239f33..3597dd6779 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -33,7 +33,7 @@ use chrono::{prelude::*, Duration}; use futures::future::BoxFuture; use futures::stream::{self, StreamExt, TryStreamExt}; use futures::{Future, FutureExt}; -use lance_core::io::WriteExt; +use lance_core::io::{read_struct, WriteExt}; use log::warn; use object_store::path::Path; use tracing::instrument; @@ -64,7 +64,6 @@ use crate::index::vector::open_index; use crate::io::reader::read_manifest_indexes; use crate::io::{ commit::{commit_new_dataset, commit_transaction, CommitError}, - object_reader::read_struct, object_store::ObjectStoreParams, read_manifest, read_metadata_offset, write_manifest, ObjectStore, }; diff --git a/rust/lance/src/format.rs b/rust/lance/src/format.rs deleted file mode 100644 index e9a953eab0..0000000000 --- a/rust/lance/src/format.rs +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright 2023 Lance Developers. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -pub use lance_core::format::*; diff --git a/rust/lance/src/index/vector.rs b/rust/lance/src/index/vector.rs index 2516db88a5..66d1f325c4 100644 --- a/rust/lance/src/index/vector.rs +++ b/rust/lance/src/index/vector.rs @@ -30,6 +30,7 @@ pub mod pq; mod traits; mod utils; +use lance_core::io::{read_message, Reader}; use lance_index::vector::pq::{PQBuildParams, ProductQuantizer}; use lance_linalg::distance::*; use tracing::instrument; @@ -52,10 +53,7 @@ use crate::{ ivf::Ivf, }, }, - io::{ - object_reader::{read_message, ObjectReader}, - read_message_from_buf, read_metadata_offset, - }, + io::{read_message_from_buf, read_metadata_offset}, Error, Result, }; pub use traits::*; @@ -294,7 +292,7 @@ pub(crate) async fn open_index( let index_file = index_dir.child(INDEX_FILE_NAME); let object_store = dataset.object_store(); - let reader: Arc = object_store.open(&index_file).await?.into(); + let reader: Arc = object_store.open(&index_file).await?.into(); let file_size = reader.size().await?; let block_size = object_store.block_size(); diff --git a/rust/lance/src/index/vector/diskann/search.rs b/rust/lance/src/index/vector/diskann/search.rs index 445b235798..db0f8e9587 100644 --- a/rust/lance/src/index/vector/diskann/search.rs +++ b/rust/lance/src/index/vector/diskann/search.rs @@ -22,7 +22,7 @@ use std::{ use arrow_array::{ArrayRef, Float32Array, RecordBatch, UInt64Array}; use arrow_schema::{DataType, Field, Schema}; use async_trait::async_trait; -use lance_core::ROW_ID_FIELD; +use lance_core::{io::Reader, Error, Result, ROW_ID_FIELD}; use lance_index::vector::{Query, DIST_COL}; use object_store::path::Path; use ordered_float::OrderedFloat; @@ -30,6 +30,10 @@ use serde::Serialize; use tracing::instrument; use super::row_vertex::{RowVertex, RowVertexSerDe}; +use crate::index::{ + vector::graph::{Graph, VertexWithDistance}, + vector::VectorIndex, +}; use crate::{ dataset::Dataset, index::{ @@ -37,15 +41,6 @@ use crate::{ vector::graph::{GraphReadParams, PersistedGraph}, Index, }, - Result, -}; -use crate::{ - index::{ - vector::graph::{Graph, VertexWithDistance}, - vector::VectorIndex, - }, - io::object_reader::ObjectReader, - Error, }; /// DiskANN search state. @@ -260,7 +255,7 @@ impl VectorIndex for DiskANNIndex { async fn load( &self, - _reader: &dyn ObjectReader, + _reader: &dyn Reader, _offset: usize, _length: usize, ) -> Result> { diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index fe13e55c57..736d76ad55 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -31,7 +31,12 @@ use futures::{ TryStreamExt, }; use lance_arrow::*; -use lance_core::io::{local::to_local_path, Reader, WriteExt, Writer}; +use lance_core::io::{ + local::to_local_path, ObjectWriter, Reader, RecordBatchStream, WriteExt, Writer, +}; +use lance_core::{ + datatypes::Field, encodings::plain::PlainEncoder, format::Index as IndexMetadata, Error, Result, +}; use lance_index::vector::{ pq::{PQBuildParams, ProductQuantizer}, Query, DIST_COL, RESIDUAL_COLUMN, @@ -53,9 +58,6 @@ use super::{ }; use crate::{ dataset::Dataset, - datatypes::Field, - encodings::plain::PlainEncoder, - format::Index as IndexMetadata, index::{ pb, prefilter::PreFilter, @@ -65,11 +67,8 @@ use crate::{ }, Index, }, - io::RecordBatchStream, - io::{object_reader::ObjectReader, object_writer::ObjectWriter}, session::Session, }; -use crate::{Error, Result}; mod builder; mod io; @@ -310,7 +309,7 @@ impl VectorIndex for IVFIndex { async fn load( &self, - _reader: &dyn ObjectReader, + _reader: &dyn Reader, _offset: usize, _length: usize, ) -> Result> { @@ -828,7 +827,7 @@ impl RemapPageTask { impl RemapPageTask { async fn load_and_remap( mut self, - reader: &dyn ObjectReader, + reader: &dyn Reader, index: &IVFIndex, mapping: &HashMap>, ) -> Result { diff --git a/rust/lance/src/index/vector/traits.rs b/rust/lance/src/index/vector/traits.rs index 7ad9e22d3d..e173312357 100644 --- a/rust/lance/src/index/vector/traits.rs +++ b/rust/lance/src/index/vector/traits.rs @@ -20,14 +20,14 @@ use std::{collections::HashMap, sync::Arc}; use arrow_array::{types::Float32Type, RecordBatch}; use async_trait::async_trait; +use lance_core::{ + io::{object_writer::ObjectWriter, Reader}, + Result, +}; use lance_index::vector::Query; use lance_linalg::MatrixView; -use crate::{ - index::{pb::Transform, prefilter::PreFilter, Index}, - io::{object_reader::ObjectReader, object_writer::ObjectWriter}, - Result, -}; +use crate::index::{pb::Transform, prefilter::PreFilter, Index}; /// Vector Index for (Approximate) Nearest Neighbor (ANN) Search. #[async_trait] @@ -64,7 +64,7 @@ pub(crate) trait VectorIndex: Send + Sync + std::fmt::Debug + Index { /// Load the index from the reader on-demand. async fn load( &self, - reader: &dyn ObjectReader, + reader: &dyn Reader, offset: usize, length: usize, ) -> Result>; diff --git a/rust/lance/src/io.rs b/rust/lance/src/io.rs index 13bbc2de32..87bf659973 100644 --- a/rust/lance/src/io.rs +++ b/rust/lance/src/io.rs @@ -25,9 +25,7 @@ use tokio::io::{AsyncWrite, AsyncWriteExt}; pub mod commit; pub(crate) mod deletion; pub(crate) mod exec; -pub mod object_reader; pub mod object_store; -pub mod object_writer; pub(crate) mod reader; mod writer; @@ -47,7 +45,7 @@ pub trait AsyncWriteProtoExt { } #[async_trait] -impl AsyncWriteProtoExt for T { +impl AsyncWriteProtoExt for T { async fn write_footer(&mut self, offset: u64) -> Result<()> { self.write_u64_le(offset).await?; self.write_all(INDEX_MAGIC).await?; diff --git a/rust/lance/src/io/object_reader.rs b/rust/lance/src/io/object_reader.rs deleted file mode 100644 index 2d335f0853..0000000000 --- a/rust/lance/src/io/object_reader.rs +++ /dev/null @@ -1,121 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::cmp::min; - -use std::ops::Range; - -use async_trait::async_trait; -use byteorder::{ByteOrder, LittleEndian}; -use bytes::Bytes; -use lance_core::io::Reader; -use object_store::path::Path; -use prost::Message; -use snafu::{location, Location}; - -use crate::error::{Error, Result}; -use crate::format::ProtoStruct; -use crate::io::ObjectStore; - -pub use lance_core::io::Reader as ObjectReader; - -/// Object Reader -/// -/// Object Store + Base Path -#[derive(Debug)] -pub struct CloudObjectReader { - // Object Store. - // TODO: can we use reference instead? - pub object_store: ObjectStore, - // File path - pub path: Path, - - block_size: usize, -} - -impl<'a> CloudObjectReader { - /// Create an ObjectReader from URI - pub fn new(object_store: &'a ObjectStore, path: Path, block_size: usize) -> Result { - Ok(Self { - object_store: object_store.clone(), - path, - block_size, - }) - } -} - -#[async_trait] -impl ObjectReader for CloudObjectReader { - fn path(&self) -> &Path { - &self.path - } - - fn block_size(&self) -> usize { - self.block_size - } - - /// Object/File Size. - async fn size(&self) -> Result { - Ok(self.object_store.inner.head(&self.path).await?.size) - } - - async fn get_range(&self, range: Range) -> Result { - Ok(self.object_store.inner.get_range(&self.path, range).await?) - } -} - -/// Read a protobuf message at file position 'pos'. -pub(crate) async fn read_message( - reader: &dyn Reader, - pos: usize, -) -> Result { - let file_size = reader.size().await?; - if pos > file_size { - return Err(Error::IO { - message: "file size is too small".to_string(), - location: location!(), - }); - } - - let range = pos..min(pos + 4096, file_size); - let buf = reader.get_range(range.clone()).await?; - let msg_len = LittleEndian::read_u32(&buf) as usize; - - if msg_len + 4 > buf.len() { - let remaining_range = range.end..min(4 + pos + msg_len, file_size); - let remaining_bytes = reader.get_range(remaining_range).await?; - let buf = [buf, remaining_bytes].concat(); - assert!(buf.len() >= msg_len + 4); - Ok(M::decode(&buf[4..4 + msg_len])?) - } else { - Ok(M::decode(&buf[4..4 + msg_len])?) - } -} - -/// Read a Protobuf-backed struct at file position: `pos`. -pub(crate) async fn read_struct< - 'm, - M: Message + Default + 'static, - T: ProtoStruct + From, ->( - reader: &dyn Reader, - pos: usize, -) -> Result { - let msg = read_message::(reader, pos).await?; - let obj = T::from(msg); - Ok(obj) -} diff --git a/rust/lance/src/io/object_store.rs b/rust/lance/src/io/object_store.rs index d7223362e0..50e2b7a729 100644 --- a/rust/lance/src/io/object_store.rs +++ b/rust/lance/src/io/object_store.rs @@ -42,15 +42,12 @@ use snafu::{location, Location}; use tokio::{io::AsyncWriteExt, sync::RwLock}; use url::Url; -use crate::io::object_reader::CloudObjectReader; -use crate::io::object_writer::ObjectWriter; +use self::tracing::ObjectStoreTracingExt; use lance_core::{ error::{Error, Result}, - io::Reader, + io::{CloudObjectReader, ObjectWriter, Reader}, }; -use self::tracing::ObjectStoreTracingExt; - #[cfg(feature = "dynamodb")] use super::commit::external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore}; use super::commit::{CommitHandler, CommitLock, RenameCommitHandler, UnsafeCommitHandler}; @@ -598,7 +595,7 @@ impl ObjectStore { match self.scheme.as_str() { "file" => LocalObjectReader::open(path, self.block_size), _ => Ok(Box::new(CloudObjectReader::new( - self, + self.inner.clone(), path.clone(), self.block_size, )?)), @@ -621,7 +618,7 @@ impl ObjectStore { /// Create a new file. pub async fn create(&self, path: &Path) -> Result { - ObjectWriter::new(self, path).await + ObjectWriter::new(self.inner.as_ref(), path).await } /// A helper function to create a file and write content to it. diff --git a/rust/lance/src/io/reader.rs b/rust/lance/src/io/reader.rs index 5b041f9e8f..4fe2b17840 100644 --- a/rust/lance/src/io/reader.rs +++ b/rust/lance/src/io/reader.rs @@ -42,8 +42,8 @@ use lance_arrow::*; use lance_core::{ encodings::{dictionary::DictionaryDecoder, AsyncIndex}, io::{ - read_fixed_stride_array, ReadBatchParams, Reader, RecordBatchStream, - RecordBatchStreamAdapter, + read_fixed_stride_array, read_message, read_struct, ReadBatchParams, Reader, + RecordBatchStream, RecordBatchStreamAdapter, }, Error, Result, ROW_ID, ROW_ID_FIELD, }; @@ -54,9 +54,8 @@ use tracing::instrument; use super::deletion::{read_deletion_file, DeletionVector}; use super::deletion_file_path; -use super::object_reader::read_message; use crate::format::{pb, Fragment, Index, Manifest, Metadata, PageTable}; -use crate::io::{object_reader::read_struct, read_metadata_offset, read_struct_from_buf}; +use crate::io::{read_metadata_offset, read_struct_from_buf}; use crate::session::Session; use crate::{ datatypes::{Field, Schema}, diff --git a/rust/lance/src/io/writer.rs b/rust/lance/src/io/writer.rs index 9bb4066389..ef5ef3c64e 100644 --- a/rust/lance/src/io/writer.rs +++ b/rust/lance/src/io/writer.rs @@ -23,16 +23,19 @@ use arrow_array::{Array, ArrayRef, RecordBatch, StructArray}; use arrow_buffer::ArrowNativeType; use arrow_schema::DataType; use async_recursion::async_recursion; -use lance_core::io::{WriteExt, Writer}; +use lance_arrow::*; +use lance_core::{ + datatypes::{Field, Schema}, + encodings::{ + binary::BinaryEncoder, dictionary::DictionaryEncoder, plain::PlainEncoder, Encoder, + Encoding, + }, + io::{ObjectWriter, WriteExt, Writer}, +}; use object_store::path::Path; use snafu::{location, Location}; -use crate::arrow::*; -use crate::datatypes::{Field, Schema}; -use crate::encodings::dictionary::DictionaryEncoder; -use crate::encodings::{binary::BinaryEncoder, plain::PlainEncoder, Encoder, Encoding}; use crate::format::{pb, Index, Manifest, Metadata, PageInfo, PageTable, StatisticsMetadata}; -use crate::io::object_writer::ObjectWriter; use crate::{Error, Result}; use super::ObjectStore;