Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Bump OpenDAL 0.46, arrow 51, tonic 0.11, reqwest 0.12, hyper 1, http 1 #15442

Merged
merged 28 commits into from
May 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
4bcbb80
Save current work
Xuanwo May 7, 2024
47704d0
Refactor layer
Xuanwo May 7, 2024
a4edea5
Merge remote-tracking branch 'origin/main' into upgrade-opendal
Xuanwo May 8, 2024
f9dd7b3
Save work
Xuanwo May 8, 2024
517ffa0
Save current work
Xuanwo May 8, 2024
32748ae
Build pass
Xuanwo May 8, 2024
0129269
cargo fix
Xuanwo May 8, 2024
aed49b2
cargo check pass
Xuanwo May 8, 2024
fe0b61b
Merge branch 'main' into upgrade-opendal
Xuanwo May 8, 2024
f657fbd
Cleanup deps
Xuanwo May 9, 2024
545b4f6
Merge branch 'main' into upgrade-opendal
Xuanwo May 9, 2024
10cccc0
Format files
Xuanwo May 9, 2024
deaced2
Fix bytes reader use too small range
Xuanwo May 9, 2024
18161d5
Split read offset and consume offset
Xuanwo May 10, 2024
404f78d
Fix input pipeline
Xuanwo May 10, 2024
ae8cbd7
leave a todo here
Xuanwo May 10, 2024
61e583f
Fix eof not calculated correctly
Xuanwo May 10, 2024
b49d4db
Fix offset check
Xuanwo May 10, 2024
4c40a8b
Try concurrent load
Xuanwo May 10, 2024
a342551
format toml
Xuanwo May 10, 2024
f5031f5
Merge remote-tracking branch 'origin/main' into upgrade-opendal
Xuanwo May 10, 2024
4db149b
Let's try use opendal's async read
Xuanwo May 10, 2024
03ecd95
reduce to 2 concurrent
Xuanwo May 10, 2024
e5182bc
Also fix support for input pipeline
Xuanwo May 10, 2024
97275f8
Merge branch 'main' into upgrade-opendal
Xuanwo May 10, 2024
2747979
try 4 concurrent
Xuanwo May 10, 2024
ecc0846
Merge remote-tracking branch 'refs/remotes/xuanwo/upgrade-opendal' in…
Xuanwo May 10, 2024
46b8f31
Remove an extra head
Xuanwo May 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
834 changes: 538 additions & 296 deletions Cargo.lock

Large diffs are not rendered by default.

55 changes: 33 additions & 22 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,22 @@ members = [
# databend maintains
jsonb = { git = "https://github.com/datafuselabs/jsonb", rev = "3fe3acd" }

opendal = { version = "0.45.1", features = [
opendal = { version = "0.46.0", features = [
"layers-minitrace",
"layers-prometheus-client",
"layers-async-backtrace",
"services-s3",
"services-fs",
"services-gcs",
"services-cos",
"services-obs",
"services-oss",
"services-azblob",
"services-azdls",
"services-ipfs",
"services-http",
"services-moka",
"services-webhdfs",
"services-huggingface",
] }
sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafuse.1", default-features = false }
Expand All @@ -135,6 +145,7 @@ openraft = { version = "0.9.9", features = [
] }

# Core crates and utilities
base64 = "0.22"
async-backtrace = "0.2"
async-trait = { version = "0.1.77", package = "async-trait-fn" }
bincode = { version = "2.0.0-rc.3", features = ["serde", "std", "alloc"] }
Expand Down Expand Up @@ -164,23 +175,25 @@ poem = { version = "~1.3.57", features = ["rustls", "multipart", "compression"]
prometheus-client = "0.22"
rand = { version = "0.8.5", features = ["small_rng"] }
regex = "1.8.1"
reqwest = { version = "0.11.19", default-features = false, features = [
reqwest = { version = "0.12", default-features = false, features = [
"json",
"http2",
"rustls-tls",
"rustls-tls-native-roots",
] }
reqwest-hickory-resolver = "0.0.2"
reqwest-hickory-resolver = "0.1"
semver = "1.0.14"
serfig = "0.1.0"
tantivy = "0.22.0"
tokio = { version = "1.35.0", features = ["full"] }
tokio-stream = "0.1.11"
tonic = { version = "0.10.2", features = ["transport", "codegen", "prost", "tls-roots", "tls"] }
tonic-reflection = { version = "0.10.2" }
tonic = { version = "0.11.0", features = ["transport", "codegen", "prost", "tls-roots", "tls"] }
tonic-reflection = { version = "0.11.0" }
typetag = "0.2.3"
uuid = { version = "1.1.2", features = ["serde", "v4"] }
walkdir = "2.3.2"
derive-visitor = "0.3.0"
http = "1"

# Synchronization
dashmap = "5.4.0"
Expand All @@ -200,19 +213,19 @@ anyhow = { version = "1.0.65" }
thiserror = { version = "1" }

# Crates from arrow-rs
arrow = { version = "50" }
arrow-array = { version = "50" }
arrow-buffer = { version = "50" }
arrow-cast = { version = "50", features = ["prettyprint"] }
arrow-data = { version = "50" }
arrow-flight = { version = "50", features = ["flight-sql-experimental", "tls"] }
arrow = { version = "51" }
arrow-array = { version = "51" }
arrow-buffer = { version = "51" }
arrow-cast = { version = "51", features = ["prettyprint"] }
arrow-data = { version = "51" }
arrow-flight = { version = "51", features = ["flight-sql-experimental", "tls"] }
arrow-format = { version = "0.8.1", features = ["flight-data", "flight-service", "ipc"] }
arrow-ipc = { version = "50" }
arrow-ord = { version = "50" }
arrow-schema = { version = "50", features = ["serde"] }
arrow-select = { version = "50" }
parquet = { version = "50", features = ["async"] }
parquet_rs = { package = "parquet", version = "50" }
arrow-ipc = { version = "51" }
arrow-ord = { version = "51" }
arrow-schema = { version = "51", features = ["serde"] }
arrow-select = { version = "51" }
parquet = { version = "51", features = ["async"] }
parquet_rs = { package = "parquet", version = "51" }

# Crates from risingwavelabs
arrow-udf-js = { package = "arrow-udf-js", git = "https://github.com/datafuse-extras/arrow-udf", rev = "a8fdfdd" }
Expand Down Expand Up @@ -272,15 +285,13 @@ rpath = false

[patch.crates-io]
# If there are dependencies that need patching, they can be listed below.
arrow-format = { git = "https://github.com/everpcpc/arrow-format", rev = "ad8f2dd" }
# Changes to upstream: update to tonic 0.11
arrow-format = { git = "https://github.com/Xuanwo/arrow-format", rev = "be633a0" }
metrics = { git = "https://github.com/datafuse-extras/metrics.git", rev = "fc2ecd1" }
icelake = { git = "https://github.com/icelake-io/icelake", rev = "54fd72f" }
icelake = { git = "https://github.com/icelake-io/icelake", rev = "be8b2c2" }
micromarshal = { git = "https://github.com/ariesdevil/opensrv", rev = "6c96813" }
async-backtrace = { git = "https://github.com/zhang2014/async-backtrace.git", rev = "dea4553" }
z3 = { git = "https://github.com/prove-rs/z3.rs", rev = "247d308" }
z3-sys = { git = "https://github.com/prove-rs/z3.rs", rev = "247d308" }
geozero = { git = "https://github.com/georust/geozero", rev = "1d78b36" }
# Hot fix for cos, remove this during opendal 0.46 upgrade.
opendal = { git = "https://github.com/Xuanwo/opendal", rev = "53377ca" }
reqsign = { git = "https://github.com/Xuanwo/reqsign", rev = "122dc12" }
# proj = { git = "https://github.com/ariesdevil/proj", rev = "51e1c60" }
10 changes: 5 additions & 5 deletions src/binaries/tool/table_meta_inspector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use serde::Deserialize;
use serde::Serialize;
use serfig::collectors::from_file;
use serfig::parsers::Toml;
use tokio::io::AsyncReadExt;

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Parser)]
#[clap(about, version = &**DATABEND_COMMIT_VERSION, author)]
Expand Down Expand Up @@ -78,7 +77,6 @@ fn parse_output(config: &InspectorConfig) -> Result<Box<dyn Write>> {
}

async fn parse_input_data(config: &InspectorConfig) -> Result<Vec<u8>> {
let mut buffer: Vec<u8> = vec![];
match &config.input {
Some(input) => {
let op = match &config.config {
Expand All @@ -98,15 +96,17 @@ async fn parse_input_data(config: &InspectorConfig) -> Result<Vec<u8>> {
Operator::new(builder)?.finish()
}
};
op.reader(input).await?.read_to_end(&mut buffer).await?;
let buf = op.read(input).await?.to_vec();
Ok(buf)
}
None => {
let mut buffer: Vec<u8> = vec![];
let stdin = io::stdin();
let handle = stdin.lock();
io::BufReader::new(handle).read_to_end(&mut buffer)?;
Ok(buffer)
}
};
Ok(buffer)
}
}

async fn run(config: &InspectorConfig) -> Result<()> {
Expand Down
1 change: 1 addition & 0 deletions src/common/arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ bytes = "^1"
indexmap = "2.2.3"
log = { workspace = true }
num = { version = "0.4", default-features = false, features = ["std"] }
opendal = { workspace = true }
ordered-float = "3.7.0"
ringbuffer = "0.14.2"
roaring = "0.10.1"
Expand Down
2 changes: 1 addition & 1 deletion src/common/arrow/src/arrow/datatypes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ impl From<arrow_schema::DataType> for DataType {
}
DataType::Decimal128(precision, scale) => Self::Decimal(precision as _, scale as _),
DataType::Decimal256(precision, scale) => Self::Decimal256(precision as _, scale as _),
DataType::RunEndEncoded(_, _) => panic!("Run-end encoding not supported by arrow2"),
v => panic!("{:?} encoding not supported by arrow2", v),
}
}
}
Expand Down
10 changes: 2 additions & 8 deletions src/common/arrow/src/arrow/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ pub use deserialize::NestedState;
pub use deserialize::StructIterator;
pub use file::FileReader;
pub use file::RowGroupReader;
#[cfg(feature = "io_parquet_async")]
use futures::AsyncRead;
#[cfg(feature = "io_parquet_async")]
use futures::AsyncSeek;
pub use parquet2::error::Error as ParquetError;
pub use parquet2::fallible_streaming_iterator;
pub use parquet2::metadata::ColumnChunkMetaData;
Expand Down Expand Up @@ -103,10 +99,8 @@ pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetaData> {
/// Reads parquets' metadata asynchronously.
#[cfg(feature = "io_parquet_async")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_parquet_async")))]
pub async fn read_metadata_async<R: AsyncRead + AsyncSeek + Send + Unpin>(
reader: &mut R,
) -> Result<FileMetaData> {
Ok(_read_metadata_async(reader).await?)
pub async fn read_metadata_async(reader: opendal::Reader, file_size: u64) -> Result<FileMetaData> {
Ok(_read_metadata_async(reader, file_size).await?)
}

fn convert_days_ms(value: &[u8]) -> crate::arrow::types::days_ms {
Expand Down
63 changes: 34 additions & 29 deletions src/common/arrow/src/arrow/temporal_conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use chrono::format::parse;
use chrono::format::Parsed;
use chrono::format::StrftimeItems;
use chrono::DateTime;
use chrono::Datelike;
use chrono::Duration;
use chrono::FixedOffset;
Expand Down Expand Up @@ -56,7 +57,7 @@ pub fn date32_to_datetime(v: i32) -> NaiveDateTime {
/// converts a `i32` representing a `date32` to [`NaiveDateTime`]
#[inline]
pub fn date32_to_datetime_opt(v: i32) -> Option<NaiveDateTime> {
NaiveDateTime::from_timestamp_opt(v as i64 * SECONDS_IN_DAY, 0)
DateTime::from_timestamp(v as i64 * SECONDS_IN_DAY, 0).map(|v| v.naive_utc())
}

/// converts a `i32` representing a `date32` to [`NaiveDate`]
Expand All @@ -74,13 +75,14 @@ pub fn date32_to_date_opt(days: i32) -> Option<NaiveDate> {
/// converts a `i64` representing a `date64` to [`NaiveDateTime`]
#[inline]
pub fn date64_to_datetime(v: i64) -> NaiveDateTime {
NaiveDateTime::from_timestamp_opt(
DateTime::from_timestamp(
// extract seconds from milliseconds
v / MILLISECONDS,
// discard extracted seconds and convert milliseconds to nanoseconds
(v % MILLISECONDS * MICROSECONDS) as u32,
)
.expect("invalid or out-of-range datetime")
.naive_utc()
}

/// converts a `i64` representing a `date64` to [`NaiveDate`]
Expand Down Expand Up @@ -175,7 +177,7 @@ pub fn timestamp_s_to_datetime(seconds: i64) -> NaiveDateTime {
/// converts a `i64` representing a `timestamp(s)` to [`NaiveDateTime`]
#[inline]
pub fn timestamp_s_to_datetime_opt(seconds: i64) -> Option<NaiveDateTime> {
NaiveDateTime::from_timestamp_opt(seconds, 0)
DateTime::from_timestamp(seconds, 0).map(|v| v.naive_utc())
}

/// converts a `i64` representing a `timestamp(ms)` to [`NaiveDateTime`]
Expand All @@ -187,8 +189,8 @@ pub fn timestamp_ms_to_datetime(v: i64) -> NaiveDateTime {
/// converts a `i64` representing a `timestamp(ms)` to [`NaiveDateTime`]
#[inline]
pub fn timestamp_ms_to_datetime_opt(v: i64) -> Option<NaiveDateTime> {
if v >= 0 {
NaiveDateTime::from_timestamp_opt(
let t = if v >= 0 {
DateTime::from_timestamp(
// extract seconds from milliseconds
v / MILLISECONDS,
// discard extracted seconds and convert milliseconds to nanoseconds
Expand All @@ -198,16 +200,18 @@ pub fn timestamp_ms_to_datetime_opt(v: i64) -> Option<NaiveDateTime> {
let secs_rem = (v / MILLISECONDS, v % MILLISECONDS);
if secs_rem.1 == 0 {
// whole/integer seconds; no adjustment required
NaiveDateTime::from_timestamp_opt(secs_rem.0, 0)
DateTime::from_timestamp(secs_rem.0, 0)
} else {
// negative values with fractional seconds require 'div_floor' rounding behaviour.
// (which isn't yet stabilised: https://github.com/rust-lang/rust/issues/88581)
NaiveDateTime::from_timestamp_opt(
DateTime::from_timestamp(
secs_rem.0 - 1,
(NANOSECONDS + (v % MILLISECONDS * MICROSECONDS)) as u32,
)
}
}
};

t.map(|v| v.naive_utc())
}

/// converts a `i64` representing a `timestamp(us)` to [`NaiveDateTime`]
Expand All @@ -219,8 +223,8 @@ pub fn timestamp_us_to_datetime(v: i64) -> NaiveDateTime {
/// converts a `i64` representing a `timestamp(us)` to [`NaiveDateTime`]
#[inline]
pub fn timestamp_us_to_datetime_opt(v: i64) -> Option<NaiveDateTime> {
if v >= 0 {
NaiveDateTime::from_timestamp_opt(
let t = if v >= 0 {
DateTime::from_timestamp(
// extract seconds from microseconds
v / MICROSECONDS,
// discard extracted seconds and convert microseconds to nanoseconds
Expand All @@ -230,16 +234,18 @@ pub fn timestamp_us_to_datetime_opt(v: i64) -> Option<NaiveDateTime> {
let secs_rem = (v / MICROSECONDS, v % MICROSECONDS);
if secs_rem.1 == 0 {
// whole/integer seconds; no adjustment required
NaiveDateTime::from_timestamp_opt(secs_rem.0, 0)
DateTime::from_timestamp(secs_rem.0, 0)
} else {
// negative values with fractional seconds require 'div_floor' rounding behaviour.
// (which isn't yet stabilised: https://github.com/rust-lang/rust/issues/88581)
NaiveDateTime::from_timestamp_opt(
DateTime::from_timestamp(
secs_rem.0 - 1,
(NANOSECONDS + (v % MICROSECONDS * MILLISECONDS)) as u32,
)
}
}
};

t.map(|v| v.naive_utc())
}

/// converts a `i64` representing a `timestamp(ns)` to [`NaiveDateTime`]
Expand All @@ -251,8 +257,8 @@ pub fn timestamp_ns_to_datetime(v: i64) -> NaiveDateTime {
/// converts a `i64` representing a `timestamp(ns)` to [`NaiveDateTime`]
#[inline]
pub fn timestamp_ns_to_datetime_opt(v: i64) -> Option<NaiveDateTime> {
if v >= 0 {
NaiveDateTime::from_timestamp_opt(
let t = if v >= 0 {
DateTime::from_timestamp(
// extract seconds from nanoseconds
v / NANOSECONDS,
// discard extracted seconds
Expand All @@ -262,16 +268,15 @@ pub fn timestamp_ns_to_datetime_opt(v: i64) -> Option<NaiveDateTime> {
let secs_rem = (v / NANOSECONDS, v % NANOSECONDS);
if secs_rem.1 == 0 {
// whole/integer seconds; no adjustment required
NaiveDateTime::from_timestamp_opt(secs_rem.0, 0)
DateTime::from_timestamp(secs_rem.0, 0)
} else {
// negative values with fractional seconds require 'div_floor' rounding behaviour.
// (which isn't yet stabilised: https://github.com/rust-lang/rust/issues/88581)
NaiveDateTime::from_timestamp_opt(
secs_rem.0 - 1,
(NANOSECONDS + (v % NANOSECONDS)) as u32,
)
DateTime::from_timestamp(secs_rem.0 - 1, (NANOSECONDS + (v % NANOSECONDS)) as u32)
}
}
};

t.map(|v| v.naive_utc())
}

/// Converts a timestamp in `time_unit` and `timezone` into [`chrono::DateTime`].
Expand Down Expand Up @@ -405,10 +410,10 @@ pub fn utf8_to_naive_timestamp_scalar(value: &str, fmt: &str, tu: &TimeUnit) ->
parsed
.to_naive_datetime_with_offset(0)
.map(|x| match tu {
TimeUnit::Second => x.timestamp(),
TimeUnit::Millisecond => x.timestamp_millis(),
TimeUnit::Microsecond => x.timestamp_micros(),
TimeUnit::Nanosecond => x.timestamp_nanos_opt().unwrap(),
TimeUnit::Second => x.and_utc().timestamp(),
TimeUnit::Millisecond => x.and_utc().timestamp_millis(),
TimeUnit::Microsecond => x.and_utc().timestamp_micros(),
TimeUnit::Nanosecond => x.and_utc().timestamp_nanos_opt().unwrap(),
})
.ok()
}
Expand Down Expand Up @@ -531,10 +536,10 @@ pub fn add_naive_interval(timestamp: i64, time_unit: TimeUnit, interval: months_

// convert back to the target unit
match time_unit {
TimeUnit::Second => new_datetime_tz.timestamp_millis() / 1000,
TimeUnit::Millisecond => new_datetime_tz.timestamp_millis(),
TimeUnit::Microsecond => new_datetime_tz.timestamp_nanos_opt().unwrap() / 1000,
TimeUnit::Nanosecond => new_datetime_tz.timestamp_nanos_opt().unwrap(),
TimeUnit::Second => new_datetime_tz.and_utc().timestamp_millis() / 1000,
TimeUnit::Millisecond => new_datetime_tz.and_utc().timestamp_millis(),
TimeUnit::Microsecond => new_datetime_tz.and_utc().timestamp_nanos_opt().unwrap() / 1000,
TimeUnit::Nanosecond => new_datetime_tz.and_utc().timestamp_nanos_opt().unwrap(),
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/common/arrow/src/native/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ impl<R: std::io::Read> NativeReadBuf for BufReader<R> {
}
}

impl<R: bytes::Buf> NativeReadBuf for bytes::buf::Reader<R> {
fn buffer_bytes(&self) -> &[u8] {
self.get_ref().chunk()
}
}

impl NativeReadBuf for &[u8] {
fn buffer_bytes(&self) -> &[u8] {
self
Expand Down
7 changes: 0 additions & 7 deletions src/common/arrow/src/native/read/read_basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
use std::convert::TryInto;
use std::io::Read;

use futures::AsyncRead;
use futures::AsyncReadExt;
use parquet2::encoding::hybrid_rle::BitmapIter;
use parquet2::encoding::hybrid_rle::Decoder;
use parquet2::encoding::hybrid_rle::HybridEncoded;
Expand Down Expand Up @@ -188,8 +186,3 @@ pub fn read_u64<R: Read>(r: &mut R, buf: &mut [u8]) -> Result<u64> {
r.read_exact(buf)?;
Ok(u64::from_le_bytes(buf.try_into().unwrap()))
}

pub async fn read_u32_async<R: AsyncRead + Unpin>(r: &mut R, buf: &mut [u8]) -> Result<u32> {
r.read_exact(buf).await?;
Ok(u32::from_le_bytes(buf.try_into().unwrap()))
}
Loading
Loading