From a5d5b5718a76d547d029bc92c6087ebb6c9fef99 Mon Sep 17 00:00:00 2001 From: clarkzinzow Date: Tue, 5 Dec 2023 14:01:42 -0800 Subject: [PATCH] PR feedback. --- Cargo.lock | 12 ++ Cargo.toml | 1 + daft/table/table_io.py | 6 +- src/daft-compression/Cargo.toml | 9 + .../src/compression.rs | 0 src/daft-compression/src/lib.rs | 4 + src/daft-csv/Cargo.toml | 1 + src/daft-csv/src/metadata.rs | 3 +- src/daft-csv/src/read.rs | 109 ++++++------ src/daft-decoding/src/deserialize.rs | 7 +- src/daft-decoding/src/lib.rs | 1 - src/daft-json/Cargo.toml | 1 + src/daft-json/src/inference.rs | 52 +++--- src/daft-json/src/lib.rs | 2 +- src/daft-json/src/python.rs | 2 +- src/daft-json/src/read.rs | 120 ++++++------- src/daft-json/src/{metadata.rs => schema.rs} | 159 +++++++++--------- src/daft-json/test/dtypes.jsonl | 4 +- src/daft-scan/src/glob.rs | 2 +- 19 files changed, 254 insertions(+), 241 deletions(-) create mode 100644 src/daft-compression/Cargo.toml rename src/{daft-decoding => daft-compression}/src/compression.rs (100%) create mode 100644 src/daft-compression/src/lib.rs rename src/daft-json/src/{metadata.rs => schema.rs} (80%) diff --git a/Cargo.lock b/Cargo.lock index bf3918a74b..956658982a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1092,6 +1092,7 @@ dependencies = [ name = "daft" version = "0.1.10" dependencies = [ + "daft-compression", "daft-core", "daft-csv", "daft-dsl", @@ -1109,6 +1110,15 @@ dependencies = [ "tikv-jemallocator", ] +[[package]] +name = "daft-compression" +version = "0.1.10" +dependencies = [ + "async-compression", + "tokio", + "url", +] + [[package]] name = "daft-core" version = "0.1.10" @@ -1153,6 +1163,7 @@ dependencies = [ "chrono-tz", "common-error", "csv-async", + "daft-compression", "daft-core", "daft-decoding", "daft-io", @@ -1262,6 +1273,7 @@ dependencies = [ "chrono", "chrono-tz", "common-error", + "daft-compression", "daft-core", "daft-decoding", "daft-io", diff --git a/Cargo.toml b/Cargo.toml index e48c2714b5..9feb6153e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,4 +1,5 @@ [dependencies] +daft-compression = {path = "src/daft-compression", default-features = false} daft-core = {path = "src/daft-core", default-features = false} daft-csv = {path = "src/daft-csv", default-features = false} daft-dsl = {path = "src/daft-dsl", default-features = false} diff --git a/daft/table/table_io.py b/daft/table/table_io.py index e587bf5778..6774ae5175 100644 --- a/daft/table/table_io.py +++ b/daft/table/table_io.py @@ -152,7 +152,7 @@ def read_parquet( if isinstance(config, NativeStorageConfig): assert isinstance( file, (str, pathlib.Path) - ), "Native downloader only works on string inputs to read_parquet" + ), "Native downloader only works on string or Path inputs to read_parquet" tbl = MicroPartition.read_parquet( str(file), columns=read_options.column_names, @@ -245,7 +245,9 @@ def read_csv( if storage_config is not None: config = storage_config.config if isinstance(config, NativeStorageConfig): - assert isinstance(file, (str, pathlib.Path)), "Native downloader only works on string inputs to read_csv" + assert isinstance( + file, (str, pathlib.Path) + ), "Native downloader only works on string or Path inputs to read_csv" has_header = csv_options.header_index is not None csv_convert_options = CsvConvertOptions( limit=read_options.num_rows, diff --git a/src/daft-compression/Cargo.toml b/src/daft-compression/Cargo.toml new file mode 100644 index 0000000000..6b695535e5 --- /dev/null +++ b/src/daft-compression/Cargo.toml @@ -0,0 +1,9 @@ +[dependencies] +async-compression = {workspace = true} +tokio = {workspace = true} +url = {workspace = true} + +[package] +edition = {workspace = true} +name = "daft-compression" +version = {workspace = true} diff --git a/src/daft-decoding/src/compression.rs b/src/daft-compression/src/compression.rs similarity index 100% rename from src/daft-decoding/src/compression.rs rename to src/daft-compression/src/compression.rs diff --git a/src/daft-compression/src/lib.rs b/src/daft-compression/src/lib.rs new file mode 100644 index 0000000000..9b869b3345 --- /dev/null +++ b/src/daft-compression/src/lib.rs @@ -0,0 +1,4 @@ +//! Utilities for async decompression of data. +pub mod compression; + +pub use compression::CompressionCodec; diff --git a/src/daft-csv/Cargo.toml b/src/daft-csv/Cargo.toml index d0e164b7d9..620316e8d0 100644 --- a/src/daft-csv/Cargo.toml +++ b/src/daft-csv/Cargo.toml @@ -9,6 +9,7 @@ chrono = {workspace = true} chrono-tz = {workspace = true} common-error = {path = "../common/error", default-features = false} csv-async = "1.2.6" +daft-compression = {path = "../daft-compression", default-features = false} daft-core = {path = "../daft-core", default-features = false} daft-decoding = {path = "../daft-decoding"} daft-io = {path = "../daft-io", default-features = false} diff --git a/src/daft-csv/src/metadata.rs b/src/daft-csv/src/metadata.rs index 8e669e4708..f45f25946b 100644 --- a/src/daft-csv/src/metadata.rs +++ b/src/daft-csv/src/metadata.rs @@ -15,7 +15,8 @@ use tokio::{ use tokio_util::io::StreamReader; use crate::{schema::merge_schema, CsvParseOptions}; -use daft_decoding::{compression::CompressionCodec, inference::infer}; +use daft_compression::CompressionCodec; +use daft_decoding::inference::infer; const DEFAULT_COLUMN_PREFIX: &str = "column_"; diff --git a/src/daft-csv/src/read.rs b/src/daft-csv/src/read.rs index 9f941645b6..772bbca00b 100644 --- a/src/daft-csv/src/read.rs +++ b/src/daft-csv/src/read.rs @@ -27,7 +27,8 @@ use tokio_util::io::StreamReader; use crate::ArrowSnafu; use crate::{metadata::read_csv_schema_single, CsvConvertOptions, CsvParseOptions, CsvReadOptions}; -use daft_decoding::{compression::CompressionCodec, deserialize::deserialize_column}; +use daft_compression::CompressionCodec; +use daft_decoding::deserialize::deserialize_column; trait ByteRecordChunkStream = Stream>>; trait ColumnArrayChunkStream = Stream< @@ -81,64 +82,58 @@ pub fn read_csv_bulk( ) -> DaftResult> { let runtime_handle = get_runtime(multithreaded_io)?; let _rt_guard = runtime_handle.enter(); - let tables = runtime_handle - .block_on(async move { - // Launch a read task per URI, throttling the number of concurrent file reads to num_parallel tasks. - let task_stream = futures::stream::iter(uris.iter().enumerate().map(|(i, uri)| { - let (uri, convert_options, parse_options, read_options, io_client, io_stats) = ( - uri.to_string(), - convert_options.clone(), - parse_options.clone(), - read_options.clone(), - io_client.clone(), - io_stats.clone(), - ); - tokio::task::spawn(async move { - let table = read_csv_single_into_table( - uri.as_str(), - convert_options, - parse_options, - read_options, - io_client, - io_stats, - max_chunks_in_flight, - ) - .await?; - Ok((i, table)) - }) - })); - let mut remaining_rows = convert_options - .as_ref() - .and_then(|opts| opts.limit.map(|limit| limit as i64)); - task_stream - // Each task is annotated with its position in the output, so we can use unordered buffering to help mitigate stragglers - // and sort the task results at the end. - .buffer_unordered(num_parallel_tasks) - // Terminate the stream if we have already reached the row limit. With the upstream buffering, we will still read up to - // num_parallel_tasks redundant files. - .try_take_while(|result| { - match (result, remaining_rows) { - // Limit has been met, early-teriminate. - (_, Some(rows_left)) if rows_left <= 0 => futures::future::ready(Ok(false)), - // Limit has not yet been met, update remaining limit slack and continue. - (Ok((_, table)), Some(rows_left)) => { - remaining_rows = Some(rows_left - table.len() as i64); - futures::future::ready(Ok(true)) - } - // (1) No limit, never early-terminate. - // (2) Encountered error, propagate error to try_collect to allow it to short-circuit. - (_, None) | (Err(_), _) => futures::future::ready(Ok(true)), - } - }) - .try_collect::>() + let tables = runtime_handle.block_on(async move { + // Launch a read task per URI, throttling the number of concurrent file reads to num_parallel tasks. + let task_stream = futures::stream::iter(uris.iter().map(|uri| { + let (uri, convert_options, parse_options, read_options, io_client, io_stats) = ( + uri.to_string(), + convert_options.clone(), + parse_options.clone(), + read_options.clone(), + io_client.clone(), + io_stats.clone(), + ); + tokio::task::spawn(async move { + read_csv_single_into_table( + uri.as_str(), + convert_options, + parse_options, + read_options, + io_client, + io_stats, + max_chunks_in_flight, + ) .await - }) - .context(super::JoinSnafu {})?; + }) + .context(super::JoinSnafu {}) + })); + let mut remaining_rows = convert_options + .as_ref() + .and_then(|opts| opts.limit.map(|limit| limit as i64)); + task_stream + // Limit the number of file reads we have in flight at any given time. + .buffered(num_parallel_tasks) + // Terminate the stream if we have already reached the row limit. With the upstream buffering, we will still read up to + // num_parallel_tasks redundant files. + .try_take_while(|result| { + match (result, remaining_rows) { + // Limit has been met, early-teriminate. + (_, Some(rows_left)) if rows_left <= 0 => futures::future::ready(Ok(false)), + // Limit has not yet been met, update remaining limit slack and continue. + (Ok(table), Some(rows_left)) => { + remaining_rows = Some(rows_left - table.len() as i64); + futures::future::ready(Ok(true)) + } + // (1) No limit, never early-terminate. + // (2) Encountered error, propagate error to try_collect to allow it to short-circuit. + (_, None) | (Err(_), _) => futures::future::ready(Ok(true)), + } + }) + .try_collect::>() + .await + })?; - // Sort the task results by task index, yielding tables whose order matches the input URI order. - let mut collected = tables.into_iter().collect::>>()?; - collected.sort_by_key(|(idx, _)| *idx); - Ok(collected.into_iter().map(|(_, v)| v).collect()) + tables.into_iter().collect::>>() } async fn read_csv_single_into_table( diff --git a/src/daft-decoding/src/deserialize.rs b/src/daft-decoding/src/deserialize.rs index 4a0762b254..a57a2c2ac2 100644 --- a/src/daft-decoding/src/deserialize.rs +++ b/src/daft-decoding/src/deserialize.rs @@ -12,6 +12,8 @@ use csv_async::ByteRecord; pub(crate) const ISO8601: &str = "%+"; pub(crate) const ISO8601_NO_TIME_ZONE: &str = "%Y-%m-%dT%H:%M:%S%.f"; pub(crate) const ISO8601_NO_TIME_ZONE_NO_FRACTIONAL: &str = "%Y-%m-%dT%H:%M:%S"; +pub(crate) const ISO8601_DATE: &str = "%Y-%m-%d"; +pub(crate) const ISO8601_DATE_SLASHES: &str = "%Y/%m/%d"; pub(crate) const RFC3339_WITH_SPACE: &str = "%Y-%m-%d %H:%M:%S%.f%:z"; pub(crate) const RFC3339_WITH_SPACE_NO_TIME_ZONE: &str = "%Y-%m-%d %H:%M:%S%.f"; pub(crate) const RFC3339_WITH_SPACE_NO_TIME_ZONE_NO_FRACTIONAL: &str = "%Y-%m-%d %H:%M:%S"; @@ -20,11 +22,10 @@ pub(crate) const ALL_NAIVE_TIMESTAMP_FMTS: &[&str] = &[ ISO8601_NO_TIME_ZONE_NO_FRACTIONAL, RFC3339_WITH_SPACE_NO_TIME_ZONE, RFC3339_WITH_SPACE_NO_TIME_ZONE_NO_FRACTIONAL, + ISO8601_DATE, + ISO8601_DATE_SLASHES, ]; pub(crate) const ALL_TIMESTAMP_FMTS: &[&str] = &[ISO8601, RFC3339_WITH_SPACE]; - -pub(crate) const ISO8601_DATE: &str = "%Y-%m-%d"; -pub(crate) const ISO8601_DATE_SLASHES: &str = "%Y/%m/%d"; pub(crate) const ALL_NAIVE_DATE_FMTS: &[&str] = &[ISO8601_DATE, ISO8601_DATE_SLASHES]; // Ideally this trait should not be needed and both `csv` and `csv_async` crates would share diff --git a/src/daft-decoding/src/lib.rs b/src/daft-decoding/src/lib.rs index a7ecfe2241..831ff13429 100644 --- a/src/daft-decoding/src/lib.rs +++ b/src/daft-decoding/src/lib.rs @@ -1,4 +1,3 @@ //! Utilities for decoding data from various sources into both array data and metadata (e.g. schema inference) -pub mod compression; pub mod deserialize; pub mod inference; diff --git a/src/daft-json/Cargo.toml b/src/daft-json/Cargo.toml index 01ac5dff97..74060528e0 100644 --- a/src/daft-json/Cargo.toml +++ b/src/daft-json/Cargo.toml @@ -8,6 +8,7 @@ bytes = {workspace = true} chrono = {workspace = true} chrono-tz = {workspace = true} common-error = {path = "../common/error", default-features = false} +daft-compression = {path = "../daft-compression", default-features = false} daft-core = {path = "../daft-core", default-features = false} daft-decoding = {path = "../daft-decoding"} daft-io = {path = "../daft-io", default-features = false} diff --git a/src/daft-json/src/inference.rs b/src/daft-json/src/inference.rs index eb59470f54..4afac92836 100644 --- a/src/daft-json/src/inference.rs +++ b/src/daft-json/src/inference.rs @@ -1,6 +1,6 @@ use std::{borrow::Borrow, collections::HashSet}; -use arrow2::datatypes::{DataType, Field, Metadata, Schema}; +use arrow2::datatypes::{DataType, Field, Metadata, Schema, TimeUnit}; use arrow2::error::{Error, Result}; use indexmap::IndexMap; use json_deserializer::{Number, Value}; @@ -70,7 +70,6 @@ fn infer_array(values: &[Value]) -> Result { .collect::>>()?; let dt = if !types.is_empty() { - let types = types.into_iter().collect::>(); coerce_data_type(types) } else { DataType::Null @@ -99,9 +98,8 @@ pub(crate) fn column_types_map_to_fields( column_types .into_iter() .map(|(name, dtype_set)| { - let dtypes = dtype_set.into_iter().collect::>(); // Get consolidated dtype for column. - let dtype = coerce_data_type(dtypes); + let dtype = coerce_data_type(dtype_set); arrow2::datatypes::Field::new(name, dtype, true) }) .collect::>() @@ -113,20 +111,16 @@ pub(crate) fn column_types_map_to_fields( /// * Lists and scalars are coerced to a list of a compatible scalar /// * Structs contain the union of all fields /// * All other types are coerced to `Utf8` -pub(crate) fn coerce_data_type(datatypes: Vec) -> DataType { +pub(crate) fn coerce_data_type(mut datatypes: HashSet) -> DataType { // Drop null dtype from the dtype set. - let datatypes = datatypes - .into_iter() - .filter(|dt| !matches!((*dt).borrow(), DataType::Null)) - .collect::>(); + datatypes.remove(&DataType::Null); if datatypes.is_empty() { return DataType::Null; } - let are_all_equal = datatypes.windows(2).all(|w| w[0] == w[1]); - - if are_all_equal { + // All equal. + if datatypes.len() == 1 { return datatypes.into_iter().next().unwrap(); } @@ -162,10 +156,7 @@ pub(crate) fn coerce_data_type(datatypes: Vec) -> DataType { // Coerce dtype set for each field. let fields = fields .into_iter() - .map(|(name, dts)| { - let dts = dts.into_iter().collect::>(); - Field::new(name, coerce_data_type(dts), true) - }) + .map(|(name, dts)| Field::new(name, coerce_data_type(dts), true)) .collect(); return DataType::Struct(fields); } @@ -177,11 +168,11 @@ pub(crate) fn coerce_data_type(datatypes: Vec) -> DataType { (DataType::Utf8, _) | (_, DataType::Utf8) => DataType::Utf8, (DataType::List(lhs), DataType::List(rhs)) => { let inner = - coerce_data_type(vec![lhs.data_type().clone(), rhs.data_type().clone()]); + coerce_data_type([lhs.data_type().clone(), rhs.data_type().clone()].into()); DataType::List(Box::new(Field::new(ITEM_NAME, inner, true))) } (scalar, DataType::List(list)) | (DataType::List(list), scalar) => { - let inner = coerce_data_type(vec![scalar, list.data_type().clone()]); + let inner = coerce_data_type([scalar, list.data_type().clone()].into()); DataType::List(Box::new(Field::new(ITEM_NAME, inner, true))) } (DataType::Float64, DataType::Int64) | (DataType::Int64, DataType::Float64) => { @@ -191,9 +182,9 @@ pub(crate) fn coerce_data_type(datatypes: Vec) -> DataType { DataType::Int64 } (DataType::Time32(left_tu), DataType::Time32(right_tu)) => { - // Set unified time unit to the highest granularity time unit. + // Set unified time unit to the lowest granularity time unit. let unified_tu = if left_tu == right_tu - || time_unit_to_ordinal(&left_tu) > time_unit_to_ordinal(&right_tu) + || time_unit_to_ordinal(&left_tu) < time_unit_to_ordinal(&right_tu) { left_tu } else { @@ -205,31 +196,34 @@ pub(crate) fn coerce_data_type(datatypes: Vec) -> DataType { DataType::Timestamp(left_tu, left_tz), DataType::Timestamp(right_tu, right_tz), ) => { - // Set unified time unit to the highest granularity time unit. + // Set unified time unit to the lowest granularity time unit. let unified_tu = if left_tu == right_tu - || time_unit_to_ordinal(&left_tu) > time_unit_to_ordinal(&right_tu) + || time_unit_to_ordinal(&left_tu) < time_unit_to_ordinal(&right_tu) { left_tu } else { right_tu }; // Set unified time zone to UTC. - let unified_tz = if left_tz == right_tz { - left_tz.clone() - } else { - Some("Z".to_string()) + let unified_tz = match (&left_tz, &right_tz) { + (None, None) => None, + (None, _) | (_, None) => return DataType::Utf8, + (Some(l), Some(r)) if l == r => left_tz, + (Some(_), Some(_)) => Some("Z".to_string()), }; DataType::Timestamp(unified_tu, unified_tz) } + (DataType::Timestamp(_, None), DataType::Date32) + | (DataType::Date32, DataType::Timestamp(_, None)) => { + DataType::Timestamp(TimeUnit::Second, None) + } (_, _) => DataType::Utf8, } }) .unwrap() } -fn time_unit_to_ordinal(tu: &arrow2::datatypes::TimeUnit) -> usize { - use arrow2::datatypes::TimeUnit; - +fn time_unit_to_ordinal(tu: &TimeUnit) -> usize { match tu { TimeUnit::Second => 0, TimeUnit::Millisecond => 1, diff --git a/src/daft-json/src/lib.rs b/src/daft-json/src/lib.rs index b5ad345d75..667a81469c 100644 --- a/src/daft-json/src/lib.rs +++ b/src/daft-json/src/lib.rs @@ -8,11 +8,11 @@ use snafu::Snafu; mod decoding; mod inference; -pub mod metadata; pub mod options; #[cfg(feature = "python")] pub mod python; pub mod read; +pub mod schema; // pub use metadata::read_json_schema_bulk; pub use options::{JsonConvertOptions, JsonParseOptions, JsonReadOptions}; diff --git a/src/daft-json/src/python.rs b/src/daft-json/src/python.rs index 788cc01bf3..139ba4a531 100644 --- a/src/daft-json/src/python.rs +++ b/src/daft-json/src/python.rs @@ -57,7 +57,7 @@ pub mod pylib { multithreaded_io.unwrap_or(true), io_config.unwrap_or_default().config.into(), )?; - let schema = crate::metadata::read_json_schema( + let schema = crate::schema::read_json_schema( uri, parse_options, max_bytes, diff --git a/src/daft-json/src/read.rs b/src/daft-json/src/read.rs index 2e1642f28a..c696d9ac2f 100644 --- a/src/daft-json/src/read.rs +++ b/src/daft-json/src/read.rs @@ -19,9 +19,9 @@ use tokio_util::io::StreamReader; use crate::{decoding::deserialize_records, ArrowSnafu, ChunkSnafu}; use crate::{ - metadata::read_json_schema_single, JsonConvertOptions, JsonParseOptions, JsonReadOptions, + schema::read_json_schema_single, JsonConvertOptions, JsonParseOptions, JsonReadOptions, }; -use daft_decoding::compression::CompressionCodec; +use daft_compression::CompressionCodec; trait LineChunkStream = Stream>>; trait ColumnArrayChunkStream = Stream< @@ -75,64 +75,63 @@ pub fn read_json_bulk( ) -> DaftResult> { let runtime_handle = get_runtime(multithreaded_io)?; let _rt_guard = runtime_handle.enter(); - let tables = runtime_handle - .block_on(async move { - // Launch a read task per URI, throttling the number of concurrent file reads to num_parallel tasks. - let task_stream = futures::stream::iter(uris.iter().enumerate().map(|(i, uri)| { - let (uri, convert_options, parse_options, read_options, io_client, io_stats) = ( - uri.to_string(), - convert_options.clone(), - parse_options.clone(), - read_options.clone(), - io_client.clone(), - io_stats.clone(), - ); - tokio::task::spawn(async move { - let table = read_json_single_into_table( - uri.as_str(), - convert_options, - parse_options, - read_options, - io_client, - io_stats, - max_chunks_in_flight, - ) - .await?; - Ok((i, table)) - }) - })); - let mut remaining_rows = convert_options - .as_ref() - .and_then(|opts| opts.limit.map(|limit| limit as i64)); - task_stream - // Each task is annotated with its position in the output, so we can use unordered buffering to help mitigate stragglers - // and sort the task results at the end. - .buffer_unordered(num_parallel_tasks) - // Terminate the stream if we have already reached the row limit. With the upstream buffering, we will still read up to - // num_parallel_tasks redundant files. - .try_take_while(|result| { - match (result, remaining_rows) { - // Limit has been met, early-teriminate. - (_, Some(rows_left)) if rows_left <= 0 => futures::future::ready(Ok(false)), - // Limit has not yet been met, update remaining limit slack and continue. - (Ok((_, table)), Some(rows_left)) => { - remaining_rows = Some(rows_left - table.len() as i64); - futures::future::ready(Ok(true)) - } - // (1) No limit, never early-terminate. - // (2) Encountered error, propagate error to try_collect to allow it to short-circuit. - (_, None) | (Err(_), _) => futures::future::ready(Ok(true)), + let tables = runtime_handle.block_on(async move { + // Launch a read task per URI, throttling the number of concurrent file reads to num_parallel tasks. + let task_stream = futures::stream::iter(uris.iter().map(|uri| { + let (uri, convert_options, parse_options, read_options, io_client, io_stats) = ( + uri.to_string(), + convert_options.clone(), + parse_options.clone(), + read_options.clone(), + io_client.clone(), + io_stats.clone(), + ); + tokio::task::spawn(async move { + let table = read_json_single_into_table( + uri.as_str(), + convert_options, + parse_options, + read_options, + io_client, + io_stats, + max_chunks_in_flight, + ) + .await?; + DaftResult::Ok(table) + }) + .context(crate::JoinSnafu) + })); + let mut remaining_rows = convert_options + .as_ref() + .and_then(|opts| opts.limit.map(|limit| limit as i64)); + task_stream + // Limit the number of file reads we have in flight at any given time. + .buffered(num_parallel_tasks) + // Terminate the stream if we have already reached the row limit. With the upstream buffering, we will still read up to + // num_parallel_tasks redundant files. + .try_take_while(|result| { + match (result, remaining_rows) { + // Limit has been met, early-teriminate. + (_, Some(rows_left)) if rows_left <= 0 => futures::future::ready(Ok(false)), + // Limit has not yet been met, update remaining limit slack and continue. + (Ok(table), Some(rows_left)) => { + remaining_rows = Some(rows_left - table.len() as i64); + futures::future::ready(Ok(true)) } - }) - .try_collect::>() - .await - }) - .context(super::JoinSnafu {})?; + // (1) No limit, never early-terminate. + // (2) Encountered error, propagate error to try_collect to allow it to short-circuit. + (_, None) | (Err(_), _) => futures::future::ready(Ok(true)), + } + }) + .try_collect::>() + .await + })?; + tables.into_iter().collect::>>() // Sort the task results by task index, yielding tables whose order matches the input URI order. - let mut collected = tables.into_iter().collect::>>()?; - collected.sort_by_key(|(idx, _)| *idx); - Ok(collected.into_iter().map(|(_, v)| v).collect()) + // let mut collected = tables.into_iter().collect::>>()?; + // collected.sort_by_key(|(idx, _)| *idx); + // Ok(collected.into_iter().map(|(_, v)| v).collect()) } async fn read_json_single_into_table( @@ -548,14 +547,15 @@ mod tests { Field::new("null", DataType::Null), Field::new("date", DataType::Date), // TODO(Clark): Add coverage for time parsing once we add support for representing time series in Daft. - // // Timezone should be finest granularity found in file, i.e. nanoseconds. + // // Time unit should be coarest granularity found in file, i.e. seconds. // Field::new("time", DataType::Time(TimeUnit::Nanoseconds)), - // Timezone should be finest granularity found in file, i.e. microseconds. + // Time unit should be coarsest granularity found in file, i.e. seconds due to naive date inclusion. Field::new( "naive_timestamp", - DataType::Timestamp(TimeUnit::Microseconds, None) + DataType::Timestamp(TimeUnit::Seconds, None) ), // Timezone should be UTC due to field having multiple different timezones across records. + // Time unit should be coarsest granularity found in file, i.e. milliseconds. Field::new( "timestamp", DataType::Timestamp(TimeUnit::Milliseconds, Some("Z".to_string())) diff --git a/src/daft-json/src/metadata.rs b/src/daft-json/src/schema.rs similarity index 80% rename from src/daft-json/src/metadata.rs rename to src/daft-json/src/schema.rs index 10cd46108e..4b0c1c4c75 100644 --- a/src/daft-json/src/metadata.rs +++ b/src/daft-json/src/schema.rs @@ -17,7 +17,7 @@ use crate::{ inference::{column_types_map_to_fields, infer_records_schema}, ArrowSnafu, JsonParseOptions, StdIOSnafu, }; -use daft_decoding::compression::CompressionCodec; +use daft_compression::CompressionCodec; #[derive(Debug, Clone)] pub struct JsonReadStats { @@ -280,14 +280,15 @@ mod tests { Field::new("null", DataType::Null), Field::new("date", DataType::Date), // TODO(Clark): Add coverage for time parsing once we add support for representing time series in Daft. - // // Timezone should be finest granularity found in file, i.e. nanoseconds. + // // Time unit should be coarest granularity found in file, i.e. seconds. // Field::new("time", DataType::Time(TimeUnit::Nanoseconds)), - // Timezone should be finest granularity found in file, i.e. microseconds. + // Time unit should be coarsest granularity found in file, i.e. seconds due to naive date inclusion. Field::new( "naive_timestamp", - DataType::Timestamp(TimeUnit::Microseconds, None) + DataType::Timestamp(TimeUnit::Seconds, None) ), // Timezone should be UTC due to field having multiple different timezones across records. + // Time unit should be coarsest granularity found in file, i.e. milliseconds. Field::new( "timestamp", DataType::Timestamp(TimeUnit::Milliseconds, Some("Z".to_string())) @@ -374,83 +375,75 @@ mod tests { Ok(()) } - // #[test] - // fn test_json_schema_local_max_bytes() -> DaftResult<()> { - // let file = format!("{}/test/iris_tiny.jsonl", env!("CARGO_MANIFEST_DIR"),); - - // let mut io_config = IOConfig::default(); - // io_config.s3.anonymous = true; - // let io_client = Arc::new(IOClient::new(io_config.into())?); - - // let (schema, read_stats) = - // read_json_schema(file.as_ref(), None, Some(100), io_client.clone(), None)?; - // assert_eq!( - // schema, - // Schema::new(vec![ - // Field::new("sepalLength", DataType::Float64), - // Field::new("sepalWidth", DataType::Float64), - // Field::new("petalLength", DataType::Float64), - // Field::new("petalWidth", DataType::Float64), - // Field::new("species", DataType::Utf8), - // ])?, - // ); - // // Max bytes doesn't include header, so add 15 bytes to upper bound. - // assert!( - // read_stats.total_bytes_read <= 100 + 15, - // "{}", - // read_stats.total_bytes_read - // ); - // assert!( - // read_stats.total_records_read <= 10, - // "{}", - // read_stats.total_records_read - // ); - - // Ok(()) - // } - - // #[rstest] - // fn test_json_schema_s3( - // #[values( - // // Uncompressed - // None, - // // brotli - // Some("br"), - // // bzip2 - // Some("bz2"), - // // deflate - // Some("deflate"), - // // gzip - // Some("gz"), - // // lzma - // Some("lzma"), - // // xz - // Some("xz"), - // // zlib - // Some("zl"), - // // zstd - // Some("zst"), - // )] - // compression: Option<&str>, - // ) -> DaftResult<()> { - // let file = format!( - // "s3://daft-public-data/test_fixtures/json-dev/mvp.json{}", - // compression.map_or("".to_string(), |ext| format!(".{}", ext)) - // ); - - // let mut io_config = IOConfig::default(); - // io_config.s3.anonymous = true; - // let io_client = Arc::new(IOClient::new(io_config.into())?); - - // let (schema, _) = read_json_schema(file.as_ref(), None, None, io_client.clone(), None)?; - // assert_eq!( - // schema, - // Schema::new(vec![ - // Field::new("a", DataType::Int64), - // Field::new("b", DataType::Utf8) - // ])? - // ); - - // Ok(()) - // } + #[test] + fn test_json_schema_local_max_bytes() -> DaftResult<()> { + let file = format!("{}/test/iris_tiny.jsonl", env!("CARGO_MANIFEST_DIR"),); + + let mut io_config = IOConfig::default(); + io_config.s3.anonymous = true; + let io_client = Arc::new(IOClient::new(io_config.into())?); + + let schema = read_json_schema(file.as_ref(), None, Some(100), io_client.clone(), None)?; + assert_eq!( + schema, + Schema::new(vec![ + Field::new("sepalLength", DataType::Float64), + Field::new("sepalWidth", DataType::Float64), + Field::new("petalLength", DataType::Float64), + Field::new("petalWidth", DataType::Float64), + Field::new("species", DataType::Utf8), + ])?, + ); + + Ok(()) + } + + #[rstest] + fn test_json_schema_s3( + #[values( + // Uncompressed + None, + // brotli + Some("br"), + // bzip2 + Some("bz2"), + // TODO(Clark): Add deflate compressed JSON file to test data fixtures. + // deflate + // Some("deflate"), + // gzip + Some("gz"), + // lzma + Some("lzma"), + // xz + Some("xz"), + // zlib + Some("zl"), + // zstd + Some("zst"), + )] + compression: Option<&str>, + ) -> DaftResult<()> { + let file = format!( + "s3://daft-public-data/test_fixtures/json-dev/iris_tiny.jsonl{}", + compression.map_or("".to_string(), |ext| format!(".{}", ext)) + ); + + let mut io_config = IOConfig::default(); + io_config.s3.anonymous = true; + let io_client = Arc::new(IOClient::new(io_config.into())?); + + let schema = read_json_schema(file.as_ref(), None, None, io_client.clone(), None)?; + assert_eq!( + schema, + Schema::new(vec![ + Field::new("sepalLength", DataType::Float64), + Field::new("sepalWidth", DataType::Float64), + Field::new("petalLength", DataType::Float64), + Field::new("petalWidth", DataType::Float64), + Field::new("species", DataType::Utf8), + ])? + ); + + Ok(()) + } } diff --git a/src/daft-json/test/dtypes.jsonl b/src/daft-json/test/dtypes.jsonl index 118f19d73c..3986d97356 100644 --- a/src/daft-json/test/dtypes.jsonl +++ b/src/daft-json/test/dtypes.jsonl @@ -1,4 +1,4 @@ -{"int": 1, "float": 2.3, "bool": false, "str": "foo", "null": null, "date": "2023-11-29", "naive_timestamp": "2023-11-29T06:31:52", "timestamp": "2023-11-29T06:31:52Z", "list": [1, 2, 3], "obj": {"a": 1, "b": false}, "nested_list": [[{"a": "foo"}]], "nested_obj": {"obj": {"a": 4}, "list": [1, 2]}} -{"int": 2, "float": 3.3, "bool": true, "str": "bar", "null": null, "date": "2023/11/28", "naive_timestamp": "2023-11-29T06:31:52.342", "timestamp": "2023-11-29T06:31:52.342+07:00", "list": [4, 5], "obj": {"a": 2, "b": false}, "nested_list": [[{"a": "bar"}]], "nested_obj": {"obj": {"a": 6}, "list": [3, 4]}} +{"int": 1, "float": 2.3, "bool": false, "str": "foo", "null": null, "date": "2023-11-29", "naive_timestamp": "2023-11-29T06:31:52.342", "timestamp": "2023-11-29T06:31:52.342567Z", "list": [1, 2, 3], "obj": {"a": 1, "b": false}, "nested_list": [[{"a": "foo"}]], "nested_obj": {"obj": {"a": 4}, "list": [1, 2]}} +{"int": 2, "float": 3.3, "bool": true, "str": "bar", "null": null, "date": "2023/11/28", "naive_timestamp": "2023-11-29", "timestamp": "2023-11-29T06:31:52.342+07:00", "list": [4, 5], "obj": {"a": 2, "b": false}, "nested_list": [[{"a": "bar"}]], "nested_obj": {"obj": {"a": 6}, "list": [3, 4]}} {"int": null, "float": null, "bool": null, "str": null, "null": null, "date": null, "naive_timestamp": null, "timestamp": null, "list": null, "obj": null, "nested_list": null, "nested_obj": null} {"int": 3, "float": 4.3, "bool": false, "str": "baz", "null": null, "date": "2023-11-27", "naive_timestamp": "2023-11-29 06:31:52.342567", "timestamp": "2023-11-29 06:31:52.342-07:00", "list": [6, 7, null, 9], "obj": {"a": null, "b": false}, "nested_list": [[{"a": null}]], "nested_obj": {"obj": {"a": null}, "list": [5, null]}} diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index fb0e1f9345..0175ef003c 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -190,7 +190,7 @@ impl GlobScanOperator { )?; schema } - FileFormatConfig::Json(_) => daft_json::metadata::read_json_schema( + FileFormatConfig::Json(_) => daft_json::schema::read_json_schema( first_filepath.as_str(), None, None,