Make greptime buildable with opendal 0.46 (#5)
Signed-off-by: Xuanwo <[email protected]>
Xuanwo authored May 25, 2024
1 parent b13a1cc commit 8fe327b
Showing 23 changed files with 375 additions and 250 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 4 additions & 1 deletion src/common/datasource/src/file_format.rs
@@ -36,6 +36,7 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use object_store::ObjectStore;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncWriteCompatExt;

use self::csv::CsvFormat;
use self::json::JsonFormat;
@@ -146,7 +147,8 @@ pub fn open_with_decoder<T: ArrowDecoder, F: Fn() -> DataFusionResult<T>>(
let reader = object_store
.reader(&path)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
.map_err(|e| DataFusionError::External(Box::new(e)))?
.into_bytes_stream(..);

let mut upstream = compression_type.convert_stream(reader).fuse();

@@ -203,6 +205,7 @@ pub async fn stream_to_file<T: DfRecordBatchEncoder, U: Fn(SharedBuffer) -> T>(
.writer_with(&path)
.concurrent(concurrency)
.await
.map(|v| v.into_futures_async_write().compat_write())
.context(error::WriteObjectSnafu { path })
});

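The two adapter calls introduced in this file are the heart of the opendal 0.46 migration: a `Reader` becomes a stream of `Bytes` via `into_bytes_stream(..)`, and a `Writer` becomes a tokio `AsyncWrite` via `into_futures_async_write().compat_write()`. A minimal sketch of both idioms, assuming `object_store` re-exports the opendal operator as it does elsewhere in this repo; the helper names, concurrency value, and error handling are illustrative only:

```rust
use futures::TryStreamExt;
use object_store::ObjectStore;
use tokio::io::AsyncWriteExt;
use tokio_util::compat::FuturesAsyncWriteCompatExt;

/// Hypothetical helper: consume an object as a stream of `Bytes` chunks.
async fn count_bytes(store: &ObjectStore, path: &str) -> Result<usize, Box<dyn std::error::Error>> {
    // `into_bytes_stream(..)` takes a byte range; `..` means the whole object.
    // Pin the stream so it can be polled with `try_next`.
    let mut stream = Box::pin(store.reader(path).await?.into_bytes_stream(..));
    let mut total = 0;
    while let Some(chunk) = stream.try_next().await? {
        total += chunk.len();
    }
    Ok(total)
}

/// Hypothetical helper: obtain a tokio `AsyncWrite` from an opendal writer.
async fn write_object(store: &ObjectStore, path: &str, data: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
    let mut writer = store
        .writer_with(path)
        .concurrent(4)
        .await?
        .into_futures_async_write()
        .compat_write();
    writer.write_all(data).await?;
    // Finalize the write; until this completes the object is not committed.
    writer.shutdown().await?;
    Ok(())
}
```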
9 changes: 8 additions & 1 deletion src/common/datasource/src/file_format/csv.rs
@@ -29,6 +29,7 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use derive_builder::Builder;
use object_store::ObjectStore;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio_util::io::SyncIoBridge;

use super::stream_to_file;
@@ -164,10 +165,16 @@ impl FileOpener for CsvOpener {
#[async_trait]
impl FileFormat for CsvFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
let meta = store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?;
.context(error::ReadObjectSnafu { path })?
.into_futures_async_read(0..meta.content_length())
.compat();

let decoded = self.compression_type.convert_async_read(reader);

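`infer_schema` now calls `stat` first because opendal 0.46 readers are range-based: `into_futures_async_read` needs explicit byte bounds, and `compat()` from `tokio_util` bridges the resulting futures-io reader back to the tokio `AsyncRead` the decoders expect. The json, orc, and parquet hunks below apply the same pattern. A condensed sketch under those assumptions, with hypothetical helper names:

```rust
use object_store::ObjectStore;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio_util::compat::FuturesAsyncReadCompatExt;

/// Hypothetical helper: open an object as a tokio `AsyncRead` bounded by its length.
async fn bounded_tokio_reader(
    store: &ObjectStore,
    path: &str,
) -> Result<impl AsyncRead + Unpin, Box<dyn std::error::Error>> {
    // The object length must be known up front to bound the range-based reader.
    let meta = store.stat(path).await?;
    let reader = store
        .reader(path)
        .await?
        .into_futures_async_read(0..meta.content_length())
        .compat();
    Ok(reader)
}

/// Example use: peek at up to the first 1 KiB, e.g. for format sniffing.
async fn peek_head(store: &ObjectStore, path: &str) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    let mut reader = bounded_tokio_reader(store, path).await?;
    let mut head = vec![0u8; 1024];
    let n = reader.read(&mut head).await?;
    head.truncate(n);
    Ok(head)
}
```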
9 changes: 8 additions & 1 deletion src/common/datasource/src/file_format/json.rs
@@ -31,6 +31,7 @@ use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::physical_plan::SendableRecordBatchStream;
use object_store::ObjectStore;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio_util::io::SyncIoBridge;

use super::stream_to_file;
@@ -82,10 +83,16 @@ impl Default for JsonFormat {
#[async_trait]
impl FileFormat for JsonFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
let meta = store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?;
.context(error::ReadObjectSnafu { path })?
.into_futures_async_read(0..meta.content_length())
.compat();

let decoded = self.compression_type.convert_async_read(reader);

14 changes: 12 additions & 2 deletions src/common/datasource/src/file_format/orc.rs
@@ -25,13 +25,15 @@ use orc_rust::arrow_reader::ArrowReaderBuilder;
use orc_rust::async_arrow_reader::ArrowStreamReader;
use snafu::ResultExt;
use tokio::io::{AsyncRead, AsyncSeek};
use tokio_util::compat::FuturesAsyncReadCompatExt;

use crate::error::{self, Result};
use crate::file_format::FileFormat;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct OrcFormat;

/// TODO: it's better to avoid AsyncRead + AsyncSeek, use range based read instead.
pub async fn new_orc_stream_reader<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
reader: R,
) -> Result<ArrowStreamReader<R>> {
@@ -51,10 +53,16 @@ pub async fn infer_orc_schema<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>
#[async_trait]
impl FileFormat for OrcFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
let meta = store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?;
.context(error::ReadObjectSnafu { path })?
.into_futures_async_read(0..meta.content_length())
.compat();

let schema = infer_orc_schema(reader).await?;

@@ -100,7 +108,9 @@ impl FileOpener for OrcOpener {
let reader = object_store
.reader(meta.location().to_string().as_str())
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
.map_err(|e| DataFusionError::External(Box::new(e)))?
.into_futures_async_read(0..meta.object_meta.size as u64)
.compat();

let stream_reader = new_orc_stream_reader(reader)
.await
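The new TODO notes that the ORC path still goes through an `AsyncRead + AsyncSeek` adapter. With the 0.46 `Reader`, callers can instead fetch explicit byte ranges; a rough sketch of that direction (this is not what the commit does, and the helper and tail length are hypothetical):

```rust
use object_store::ObjectStore;

/// Hypothetical: fetch only the tail of an object (e.g. a file footer) by byte range
/// instead of seeking through a full `AsyncRead + AsyncSeek` adapter.
async fn read_tail(
    store: &ObjectStore,
    path: &str,
    tail_len: u64,
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    let len = store.stat(path).await?.content_length();
    let start = len.saturating_sub(tail_len);
    // `Reader::read` takes a byte range and returns an opendal `Buffer`.
    let buf = store.reader(path).await?.read(start..len).await?;
    Ok(buf.to_vec())
}
```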
29 changes: 22 additions & 7 deletions src/common/datasource/src/file_format/parquet.rs
@@ -29,10 +29,11 @@ use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::future::BoxFuture;
use futures::StreamExt;
use object_store::{ObjectStore, Reader, Writer};
use object_store::{FuturesAsyncReader, ObjectStore};
use parquet::basic::{Compression, ZstdLevel};
use parquet::file::properties::WriterProperties;
use snafu::ResultExt;
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};

use crate::buffered_writer::{ArrowWriterCloser, DfRecordBatchEncoder, LazyBufferedWriter};
use crate::error::{self, Result};
@@ -45,10 +46,16 @@ pub struct ParquetFormat {}
#[async_trait]
impl FileFormat for ParquetFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
let meta = store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let mut reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?;
.context(error::ReadObjectSnafu { path })?
.into_futures_async_read(0..meta.content_length())
.compat();

let metadata = reader
.get_metadata()
@@ -98,7 +105,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {

pub struct LazyParquetFileReader {
object_store: ObjectStore,
reader: Option<Reader>,
reader: Option<Compat<FuturesAsyncReader>>,
path: String,
}

@@ -114,7 +121,13 @@ impl LazyParquetFileReader {
/// Must initialize the reader, or throw an error from the future.
async fn maybe_initialize(&mut self) -> result::Result<(), object_store::Error> {
if self.reader.is_none() {
let reader = self.object_store.reader(&self.path).await?;
let meta = self.object_store.stat(&self.path).await?;
let reader = self
.object_store
.reader(&self.path)
.await?
.into_futures_async_read(0..meta.content_length())
.compat();
self.reader = Some(reader);
}

@@ -167,23 +180,25 @@ pub struct BufferedWriter {
}

type InnerBufferedWriter = LazyBufferedWriter<
object_store::Writer,
Compat<object_store::FuturesAsyncWriter>,
ArrowWriter<SharedBuffer>,
impl Fn(String) -> BoxFuture<'static, Result<Writer>>,
impl Fn(String) -> BoxFuture<'static, Result<Compat<object_store::FuturesAsyncWriter>>>,
>;

impl BufferedWriter {
fn make_write_factory(
store: ObjectStore,
concurrency: usize,
) -> impl Fn(String) -> BoxFuture<'static, Result<Writer>> {
) -> impl Fn(String) -> BoxFuture<'static, Result<Compat<object_store::FuturesAsyncWriter>>>
{
move |path| {
let store = store.clone();
Box::pin(async move {
store
.writer_with(&path)
.concurrent(concurrency)
.await
.map(|v| v.into_futures_async_write().compat_write())
.context(error::WriteObjectSnafu { path })
})
}
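`LazyParquetFileReader` now caches a `Compat<FuturesAsyncReader>` instead of opendal's `Reader`, and the writer factory hands back a `Compat<FuturesAsyncWriter>`. The compat reader still satisfies tokio's `AsyncRead + AsyncSeek + Unpin + Send`, which is why the `get_metadata` call above keeps working. A sketch of schema inference over such a reader; it assumes the `parquet` crate's blanket `AsyncFileReader` impl for those bounds, and the helper name and return type are illustrative:

```rust
use object_store::ObjectStore;
use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
use tokio_util::compat::FuturesAsyncReadCompatExt;

/// Hypothetical: infer a parquet schema through the tokio-compat reader.
async fn parquet_schema(
    store: &ObjectStore,
    path: &str,
) -> Result<arrow_schema::SchemaRef, Box<dyn std::error::Error>> {
    let meta = store.stat(path).await?;
    let reader = store
        .reader(path)
        .await?
        .into_futures_async_read(0..meta.content_length())
        .compat();

    // The compat wrapper is AsyncRead + AsyncSeek + Unpin + Send, so the parquet
    // async builder can read the footer and metadata from it directly.
    let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
    Ok(builder.schema().clone())
}
```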
4 changes: 2 additions & 2 deletions src/common/datasource/src/test_util.rs
@@ -120,7 +120,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi

let written = tmp_store.read(&output_path).await.unwrap();
let origin = store.read(origin_path).await.unwrap();
assert_eq_lines(written, origin);
assert_eq_lines(written.to_vec(), origin.to_vec());
}

pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usize) -> usize) {
@@ -158,7 +158,7 @@ pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usize) -> usize) {

let written = tmp_store.read(&output_path).await.unwrap();
let origin = store.read(origin_path).await.unwrap();
assert_eq_lines(written, origin);
assert_eq_lines(written.to_vec(), origin.to_vec());
}

// Ignore the CRLF difference across operating systems.
84 changes: 43 additions & 41 deletions src/datanode/src/store.rs
@@ -20,16 +20,14 @@ mod gcs;
mod oss;
mod s3;

use std::sync::Arc;
use std::time::Duration;
use std::{env, path};
use std::path;

use common_base::readable_size::ReadableSize;
use common_telemetry::info;
use object_store::layers::{LruCacheLayer, RetryLayer};
use object_store::services::Fs;
use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
use object_store::{HttpClient, ObjectStore, ObjectStoreBuilder};
use object_store::{HttpClient, ObjectStore};
use snafu::prelude::*;

use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
@@ -107,13 +105,14 @@ async fn create_object_store_with_cache(
if let Some(path) = cache_path {
let atomic_temp_dir = join_dir(path, ".tmp/");
clean_temp_dir(&atomic_temp_dir)?;
let cache_store = Fs::default()
.root(path)
.atomic_write_dir(&atomic_temp_dir)
.build()
.context(error::InitBackendSnafu)?;

let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
let mut builder = Fs::default();
builder.root(path);
builder.atomic_write_dir(&atomic_temp_dir);
let cache_store = ObjectStore::new(builder)
.context(error::InitBackendSnafu)?
.finish();

let cache_layer = LruCacheLayer::new(cache_store, cache_capacity.0 as usize)
.await
.context(error::InitBackendSnafu)?;

@@ -138,35 +137,38 @@ pub(crate) fn clean_temp_dir(dir: &str) -> Result<()> {
Ok(())
}

/// FIXME: we need to use reqwest 0.12 here.
pub(crate) fn build_http_client() -> Result<HttpClient> {
let http_builder = {
let mut builder = reqwest::ClientBuilder::new();

// Pool max idle per host controls connection pool size.
// Default to no limit, set to `0` for disable it.
let pool_max_idle_per_host = env::var("_GREPTIMEDB_HTTP_POOL_MAX_IDLE_PER_HOST")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.unwrap_or(usize::MAX);
builder = builder.pool_max_idle_per_host(pool_max_idle_per_host);

// Connect timeout default to 30s.
let connect_timeout = env::var("_GREPTIMEDB_HTTP_CONNECT_TIMEOUT")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(30);
builder = builder.connect_timeout(Duration::from_secs(connect_timeout));

// Pool connection idle timeout default to 90s.
let idle_timeout = env::var("_GREPTIMEDB_HTTP_POOL_IDLE_TIMEOUT")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(90);

builder = builder.pool_idle_timeout(Duration::from_secs(idle_timeout));

builder
};

HttpClient::build(http_builder).context(error::InitBackendSnafu)
// let http_builder = {
// let mut builder = reqwest::ClientBuilder::new();
//
// // Pool max idle per host controls connection pool size.
// // Default to no limit, set to `0` for disable it.
// let pool_max_idle_per_host = env::var("_GREPTIMEDB_HTTP_POOL_MAX_IDLE_PER_HOST")
// .ok()
// .and_then(|v| v.parse::<usize>().ok())
// .unwrap_or(usize::MAX);
// builder = builder.pool_max_idle_per_host(pool_max_idle_per_host);
//
// // Connect timeout default to 30s.
// let connect_timeout = env::var("_GREPTIMEDB_HTTP_CONNECT_TIMEOUT")
// .ok()
// .and_then(|v| v.parse::<u64>().ok())
// .unwrap_or(30);
// builder = builder.connect_timeout(Duration::from_secs(connect_timeout));
//
// // Pool connection idle timeout default to 90s.
// let idle_timeout = env::var("_GREPTIMEDB_HTTP_POOL_IDLE_TIMEOUT")
// .ok()
// .and_then(|v| v.parse::<u64>().ok())
// .unwrap_or(90);
//
// builder = builder.pool_idle_timeout(Duration::from_secs(idle_timeout));
//
// builder
// };
//
// HttpClient::build(http_builder).context(error::InitBackendSnafu)

HttpClient::new().context(error::InitBackendSnafu)
}
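Two things change in `store.rs`: the reqwest-tuned HTTP client is parked behind a FIXME in favor of `HttpClient::new()`, and the filesystem cache store is built with the 0.46 builder idiom, where service builders are configured through `&mut self` setters and turned into an operator with `ObjectStore::new(..)?.finish()`. A minimal sketch of the latter, with illustrative paths and no cache layer:

```rust
use object_store::services::Fs;
use object_store::ObjectStore;

/// Hypothetical: build a filesystem-backed operator for a local cache directory.
fn build_fs_store(root: &str, atomic_dir: &str) -> Result<ObjectStore, object_store::Error> {
    // 0.46 builders are mutated in place rather than chained by value.
    let mut builder = Fs::default();
    builder.root(root);
    builder.atomic_write_dir(atomic_dir);
    // `new` validates the configuration; `finish` produces the operator.
    Ok(ObjectStore::new(builder)?.finish())
}
```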
3 changes: 2 additions & 1 deletion src/file-engine/src/manifest.rs
@@ -71,7 +71,8 @@ impl FileRegionManifest {
let bs = object_store
.read(path)
.await
.context(LoadRegionManifestSnafu { region_id })?;
.context(LoadRegionManifestSnafu { region_id })?
.to_vec();
Self::decode(bs.as_slice())
}

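`read` now returns an opendal `Buffer` rather than `Vec<u8>`, so callers that need contiguous bytes convert explicitly, as the manifest load above and the `test_util.rs` assertions do. A tiny sketch with a hypothetical helper:

```rust
use object_store::ObjectStore;

/// Hypothetical: load a whole object into owned, contiguous bytes.
async fn load_bytes(store: &ObjectStore, path: &str) -> Result<Vec<u8>, object_store::Error> {
    // `read` returns a `Buffer` (possibly non-contiguous chunks); `to_vec` copies it out.
    Ok(store.read(path).await?.to_vec())
}
```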
19 changes: 10 additions & 9 deletions src/mito2/src/cache/file_cache.rs
@@ -112,6 +112,10 @@ impl FileCache {
self.memory_index.insert(key, value).await;
}

pub(crate) async fn get(&self, key: IndexKey) -> Option<IndexValue> {
self.memory_index.get(&key).await
}

/// Reads a file from the cache.
pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> {
// We must use `get()` to update the estimator of the cache.
@@ -372,7 +376,6 @@ fn parse_index_key(name: &str) -> Option<IndexKey> {
#[cfg(test)]
mod tests {
use common_test_util::temp_dir::create_temp_dir;
use futures::AsyncReadExt;
use object_store::services::Fs;

use super::*;
@@ -451,10 +454,9 @@ mod tests {
.await;

// Read file content.
let mut reader = cache.reader(key).await.unwrap();
let mut buf = String::new();
reader.read_to_string(&mut buf).await.unwrap();
assert_eq!("hello", buf);
let reader = cache.reader(key).await.unwrap();
let buf = reader.read(..).await.unwrap().to_vec();
assert_eq!("hello", String::from_utf8(buf).unwrap());

// Get weighted size.
cache.memory_index.run_pending_tasks().await;
@@ -549,10 +551,9 @@

for (i, file_id) in file_ids.iter().enumerate() {
let key = IndexKey::new(region_id, *file_id, file_type);
let mut reader = cache.reader(key).await.unwrap();
let mut buf = String::new();
reader.read_to_string(&mut buf).await.unwrap();
assert_eq!(i.to_string(), buf);
let reader = cache.reader(key).await.unwrap();
let buf = reader.read(..).await.unwrap().to_vec();
assert_eq!(i.to_string(), String::from_utf8(buf).unwrap());
}
}

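The cache tests stop pulling bytes through `AsyncReadExt::read_to_string` and instead use the range-based `Reader::read(..)`, which returns a `Buffer` for the requested range. A small sketch of that read path, with a hypothetical helper:

```rust
use object_store::Reader;

/// Hypothetical: read an entire cached object and decode it as UTF-8.
async fn read_cached_text(reader: Reader) -> Result<String, Box<dyn std::error::Error>> {
    // `..` requests the full object; the returned `Buffer` is copied into a `Vec<u8>`.
    let bytes = reader.read(..).await?.to_vec();
    Ok(String::from_utf8(bytes)?)
}
```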