From dfbb03ca62188bb8aa2afd5a83d194a394a2825e Mon Sep 17 00:00:00 2001
From: Sevenannn
Date: Fri, 9 Aug 2024 14:31:39 -0700
Subject: [PATCH 1/3] Initial commit

---
 Cargo.toml                          |   3 +
 Makefile                            |   6 +-
 tests/arrow_record_batch_gen/mod.rs | 456 ++++++++++++++++++++++++++++
 tests/docker/mod.rs                 | 246 +++++++++++++++
 tests/integration.rs                |  29 ++
 tests/postgres/common.rs            |  89 ++++++
 tests/postgres/mod.rs               | 148 +++++++++
 7 files changed, 976 insertions(+), 1 deletion(-)
 create mode 100644 tests/arrow_record_batch_gen/mod.rs
 create mode 100644 tests/docker/mod.rs
 create mode 100644 tests/integration.rs
 create mode 100644 tests/postgres/common.rs
 create mode 100644 tests/postgres/mod.rs

diff --git a/Cargo.toml b/Cargo.toml
index 5cc27d1..bee0037 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -47,6 +47,9 @@ datafusion-federation-sql = { git = "https://github.com/spiceai/datafusion-feder
 itertools = "0.13.0"
 
 [dev-dependencies]
+anyhow = "1.0.86"
+bollard = "0.16.1"
+rand = "0.8.5"
 reqwest = "0.12.5"
 secrecy = "0.8.0"
 tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
diff --git a/Makefile b/Makefile
index 3fbb992..5dfc086 100644
--- a/Makefile
+++ b/Makefile
@@ -7,4 +7,8 @@ test:
 
 .PHONY: lint
 lint:
-	cargo clippy --all-features
\ No newline at end of file
+	cargo clippy --all-features
+
+.PHONY: test-integration
+test-integration:
+	cargo test --test integration --features postgres -- --nocapture
\ No newline at end of file
diff --git a/tests/arrow_record_batch_gen/mod.rs b/tests/arrow_record_batch_gen/mod.rs
new file mode 100644
index 0000000..b1047bb
--- /dev/null
+++ b/tests/arrow_record_batch_gen/mod.rs
@@ -0,0 +1,456 @@
+use arrow::array::RecordBatch;
+use arrow::{
+    array::*,
+    datatypes::{
+        i256, DataType, Date32Type, Date64Type, Field, Fields, IntervalDayTime,
+        IntervalMonthDayNano, IntervalUnit, Schema, TimeUnit,
+    },
+};
+use chrono::NaiveDate;
+use std::sync::Arc;
+
+// Helper functions to create arrow record batches of different types
+
+// Binary/LargeBinary/FixedSizeBinary
+pub(crate) fn get_arrow_binary_record_batch() -> RecordBatch {
+    // Binary/LargeBinary/FixedSizeBinary Array
+    let values: Vec<&[u8]> = vec![b"one", b"two", b""];
+    let binary_array = BinaryArray::from_vec(values.clone());
+    let large_binary_array = LargeBinaryArray::from_vec(values);
+    let input_arg = vec![vec![1, 2], vec![3, 4], vec![5, 6]];
+    let fixed_size_binary_array =
+        FixedSizeBinaryArray::try_from_iter(input_arg.into_iter()).unwrap();
+
+    let schema = Schema::new(vec![
+        Field::new("binary", DataType::Binary, false),
+        Field::new("large_binary", DataType::LargeBinary, false),
+        Field::new("fixed_size_binary", DataType::FixedSizeBinary(2), false),
+    ]);
+
+    RecordBatch::try_new(
+        Arc::new(schema),
+        vec![
+            Arc::new(binary_array),
+            Arc::new(large_binary_array),
+            Arc::new(fixed_size_binary_array),
+        ],
+    )
+    .expect("Failed to create arrow binary record batch")
+}
+
+// All Int types
+pub(crate) fn get_arrow_int_recordbatch() -> RecordBatch {
+    // Arrow Integer Types
+    let int8_arr = Int8Array::from(vec![1, 2, 3]);
+    let int16_arr = Int16Array::from(vec![1, 2, 3]);
+    let int32_arr = Int32Array::from(vec![1, 2, 3]);
+    let int64_arr = Int64Array::from(vec![1, 2, 3]);
+    let uint8_arr = UInt8Array::from(vec![1, 2, 3]);
+    let uint16_arr = UInt16Array::from(vec![1, 2, 3]);
+    let uint32_arr = UInt32Array::from(vec![1, 2, 3]);
+    let uint64_arr = UInt64Array::from(vec![1, 2, 3]);
+
+    let schema = Schema::new(vec![
+        Field::new("int8", DataType::Int8, false),
+        Field::new("int16", DataType::Int16, false),
+        Field::new("int32", DataType::Int32, false),
+        Field::new("int64", DataType::Int64, false),
+        Field::new("uint8", DataType::UInt8, false),
+        Field::new("uint16", DataType::UInt16, false),
+        Field::new("uint32", DataType::UInt32, false),
+        Field::new("uint64", DataType::UInt64, false),
+    ]);
+
+    RecordBatch::try_new(
+        Arc::new(schema),
+        vec![
+            Arc::new(int8_arr),
+            Arc::new(int16_arr),
+            Arc::new(int32_arr),
+            Arc::new(int64_arr),
+            Arc::new(uint8_arr),
+            Arc::new(uint16_arr),
+            Arc::new(uint32_arr),
+            Arc::new(uint64_arr),
+        ],
+    )
+    .expect("Failed to create arrow int record batch")
+}
+
+// All Float Types
+pub(crate) fn get_arrow_float_record_batch() -> RecordBatch {
+    // Arrow Float Types
+    let float32_arr = Float32Array::from(vec![1.0, 2.0, 3.0]);
+    let float64_arr = Float64Array::from(vec![1.0, 2.0, 3.0]);
+
+    let schema = Schema::new(vec![
+        Field::new("float32", DataType::Float32, false),
+        Field::new("float64", DataType::Float64, false),
+    ]);
+
+    RecordBatch::try_new(
+        Arc::new(schema),
+        vec![Arc::new(float32_arr), Arc::new(float64_arr)],
+    )
+    .expect("Failed to create arrow float record batch")
+}
+
+// Utf8/LargeUtf8
+pub(crate) fn get_arrow_utf8_record_batch() -> RecordBatch {
+    // Utf8, LargeUtf8 Types
+    let string_arr = StringArray::from(vec!["foo", "bar", "baz"]);
+    let large_string_arr = LargeStringArray::from(vec!["foo", "bar", "baz"]);
+    let bool_arr: BooleanArray = vec![true, true, false].into();
+
+    let schema = Schema::new(vec![
+        Field::new("utf8", DataType::Utf8, false),
+        Field::new("largeutf8", DataType::LargeUtf8, false),
+        Field::new("boolean", DataType::Boolean, false),
+    ]);
+
+    RecordBatch::try_new(
+        Arc::new(schema),
+        vec![
+            Arc::new(string_arr),
+            Arc::new(large_string_arr),
+            Arc::new(bool_arr),
+        ],
+    )
+    .expect("Failed to create arrow utf8 record batch")
+}
+
+// Time32, Time64
+pub(crate) fn get_arrow_time_record_batch() -> RecordBatch {
+    // Time32, Time64 Types
+    let time32_milli_array: Time32MillisecondArray = vec![
+        (10 * 3600 + 30 * 60) * 1_000,
+        (10 * 3600 + 45 * 60 + 15) * 1_000,
+        (11 * 3600 + 0 * 60 + 15) * 1_000,
+    ]
+    .into();
+    let time32_sec_array: Time32SecondArray = vec![
+        (10 * 3600 + 30 * 60),
+        (10 * 3600 + 45 * 60 + 15),
+        (11 * 3600 + 0 * 60 + 15),
+    ]
+    .into();
+    let time64_micro_array: Time64MicrosecondArray = vec![
+        (10 * 3600 + 30 * 60) * 1_000_000,
+        (10 * 3600 + 45 * 60 + 15) * 1_000_000,
+        (11 * 3600 + 0 * 60 + 15) * 1_000_000,
+    ]
+    .into();
+    let time64_nano_array: Time64NanosecondArray = vec![
+        (10 * 3600 + 30 * 60) * 1_000_000_000,
+        (10 * 3600 + 45 * 60 + 15) * 1_000_000_000,
+        (11 * 3600 + 0 * 60 + 15) * 1_000_000_000,
+    ]
+    .into();
+
+    let schema = Schema::new(vec![
+        Field::new(
+            "time32_milli",
+            DataType::Time32(TimeUnit::Millisecond),
+            false,
+        ),
+        Field::new("time32_sec", DataType::Time32(TimeUnit::Second), false),
+        Field::new(
+            "time64_micro",
+            DataType::Time64(TimeUnit::Microsecond),
+            false,
+        ),
+        Field::new("time64_nano", DataType::Time64(TimeUnit::Nanosecond), false),
+    ]);
+
+    RecordBatch::try_new(
+        Arc::new(schema),
+        vec![
+            Arc::new(time32_milli_array),
+            Arc::new(time32_sec_array),
+            Arc::new(time64_micro_array),
+            Arc::new(time64_nano_array),
+        ],
+    )
+    .expect("Failed to create arrow time record batch")
+}
+
+// Timestamp (with/without TZ)
+pub(crate) fn get_arrow_timestamp_record_batch() -> RecordBatch {
+    // Timestamp Types
+    let timestamp_second_array =
+        TimestampSecondArray::from(vec![1_680_000_000, 1_680_040_000, 1_680_080_000]);
+    let timestamp_milli_array = TimestampMillisecondArray::from(vec![
+        1_680_000_000_000,
+        1_680_040_000_000,
+        1_680_080_000_000,
+    ])
+    .with_timezone("+10:00".to_string());
+    let timestamp_micro_array = TimestampMicrosecondArray::from(vec![
+        1_680_000_000_000_000,
+        1_680_040_000_000_000,
+        1_680_080_000_000_000,
+    ])
+    .with_timezone("+10:00".to_string());
+    let timestamp_nano_array = TimestampNanosecondArray::from(vec![
+        1_680_000_000_000_000_000,
+        1_680_040_000_000_000_000,
+        1_680_080_000_000_000_000,
+    ])
+    .with_timezone("+10:00".to_string());
+
+    let schema = Schema::new(vec![
+        Field::new(
+            "timestamp_second",
+            DataType::Timestamp(TimeUnit::Second, None),
+            false,
+        ),
+        Field::new(
+            "timestamp_milli",
+            DataType::Timestamp(TimeUnit::Millisecond, Some(Arc::from("+10:00".to_string()))),
+            false,
+        ),
+        Field::new(
+            "timestamp_micro",
+            DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("+10:00".to_string()))),
+            false,
+        ),
+        Field::new(
+            "timestamp_nano",
+            DataType::Timestamp(TimeUnit::Nanosecond, Some(Arc::from("+10:00".to_string()))),
+            false,
+        ),
+    ]);
+
+    RecordBatch::try_new(
+        Arc::new(schema),
+        vec![
+            Arc::new(timestamp_second_array),
+            Arc::new(timestamp_milli_array),
+            Arc::new(timestamp_micro_array),
+            Arc::new(timestamp_nano_array),
+        ],
+    )
+    .expect("Failed to create arrow timestamp record batch")
+}
+
+// Date32, Date64
+pub(crate) fn get_arrow_date_record_batch() -> RecordBatch {
+    let date32_array = Date32Array::from(vec![
+        Date32Type::from_naive_date(NaiveDate::from_ymd_opt(2015, 3, 14).unwrap_or_default()),
+        Date32Type::from_naive_date(NaiveDate::from_ymd_opt(2016, 1, 12).unwrap_or_default()),
+        Date32Type::from_naive_date(NaiveDate::from_ymd_opt(2017, 9, 17).unwrap_or_default()),
+    ]);
+    let date64_array = Date64Array::from(vec![
+        Date64Type::from_naive_date(NaiveDate::from_ymd_opt(2015, 3, 14).unwrap_or_default()),
+        Date64Type::from_naive_date(NaiveDate::from_ymd_opt(2016, 1, 12).unwrap_or_default()),
+        Date64Type::from_naive_date(NaiveDate::from_ymd_opt(2017, 9, 17).unwrap_or_default()),
+    ]);
+
+    let schema = Schema::new(vec![
+        Field::new("date32", DataType::Date32, false),
+        Field::new("date64", DataType::Date64, false),
+    ]);
+
+    RecordBatch::try_new(
+        Arc::new(schema),
+        vec![Arc::new(date32_array), Arc::new(date64_array)],
+    )
+    .expect("Failed to create arrow date record batch")
+}
+
+// struct
+pub(crate) fn get_arrow_struct_record_batch() -> RecordBatch {
+    let boolean = Arc::new(BooleanArray::from(vec![false, false, true, true]));
+    let int = Arc::new(Int32Array::from(vec![42, 28, 19, 31]));
+
+    let struct_array = StructArray::from(vec![
+        (
+            Arc::new(Field::new("b", DataType::Boolean, false)),
+            boolean.clone() as ArrayRef,
+        ),
+        (
+            Arc::new(Field::new("c", DataType::Int32, false)),
+            int.clone() as ArrayRef,
+        ),
+    ]);
+
+    let schema = Schema::new(vec![Field::new(
+        "struct",
+        DataType::Struct(Fields::from(vec![
+            Field::new("b", DataType::Boolean, false),
+            Field::new("c", DataType::Int32, false),
+        ])),
+        false,
+    )]);
+
+    RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)])
+        .expect("Failed to create arrow struct record batch")
+}
+
+// Decimal128/Decimal256
+pub(crate) fn get_arrow_decimal_record_batch() -> RecordBatch {
+    let decimal128_array =
+        Decimal128Array::from(vec![i128::from(123), i128::from(222), i128::from(321)]);
+    let decimal256_array =
+        Decimal256Array::from(vec![i256::from(123), i256::from(222), i256::from(321)]);
+
+    let schema = Schema::new(vec![
+        Field::new("decimal128", DataType::Decimal128(38, 10), false),
+        Field::new("decimal256", DataType::Decimal256(76, 10), false),
+    ]);
+
+    RecordBatch::try_new(
+        Arc::new(schema),
+        vec![Arc::new(decimal128_array), Arc::new(decimal256_array)],
+    )
+    .expect("Failed to create arrow decimal record batch")
+}
+
+// Duration
+pub(crate) fn get_arrow_duration_record_batch() -> RecordBatch {
+    let duration_nano_array = DurationNanosecondArray::from(vec![1, 2, 3]);
+    let duration_micro_array = DurationMicrosecondArray::from(vec![1, 2, 3]);
+    let duration_milli_array = DurationMillisecondArray::from(vec![1, 2, 3]);
+    let duration_sec_array = DurationSecondArray::from(vec![1, 2, 3]);
+
+    let schema = Schema::new(vec![
+        Field::new(
+            "duration_nano",
+            DataType::Duration(TimeUnit::Nanosecond),
+            false,
+        ),
+        Field::new(
+            "duration_micro",
+            DataType::Duration(TimeUnit::Microsecond),
+            false,
+        ),
+        Field::new(
+            "duration_milli",
+            DataType::Duration(TimeUnit::Millisecond),
+            false,
+        ),
+        Field::new("duration_sec", DataType::Duration(TimeUnit::Second), false),
+    ]);
+
+    RecordBatch::try_new(
+        Arc::new(schema),
+        vec![
+            Arc::new(duration_nano_array),
+            Arc::new(duration_micro_array),
+            Arc::new(duration_milli_array),
+            Arc::new(duration_sec_array),
+        ],
+    )
+    .expect("Failed to create arrow duration record batch")
+}
+
+// Interval
+pub(crate) fn get_arrow_interval_record_batch() -> RecordBatch {
+    let interval_daytime_array = IntervalDayTimeArray::from(vec![
+        IntervalDayTime::new(1, 1000),
+        IntervalDayTime::new(33, 0),
+        IntervalDayTime::new(0, 12 * 60 * 60 * 1000),
+    ]);
+    let interval_monthday_nano_array = IntervalMonthDayNanoArray::from(vec![
+        IntervalMonthDayNano::new(1, 2, 1000),
+        IntervalMonthDayNano::new(12, 1, 0),
+        IntervalMonthDayNano::new(0, 0, 12 * 1000 * 1000),
+    ]);
+    let interval_yearmonth_array = IntervalYearMonthArray::from(vec![2, 25, -1]);
+
+    let schema = Schema::new(vec![
+        Field::new(
+            "interval_daytime",
+            DataType::Interval(IntervalUnit::DayTime),
+            false,
+        ),
+        Field::new(
+            "interval_monthday_nano",
+            DataType::Interval(IntervalUnit::MonthDayNano),
+            false,
+        ),
+        Field::new(
+            "interval_yearmonth",
+            DataType::Interval(IntervalUnit::YearMonth),
+            false,
+        ),
+    ]);
+
+    RecordBatch::try_new(
+        Arc::new(schema),
+        vec![
+            Arc::new(interval_daytime_array),
+            Arc::new(interval_monthday_nano_array),
+            Arc::new(interval_yearmonth_array),
+        ],
+    )
+    .expect("Failed to create arrow interval record batch")
+}
+
+// List/FixedSizeList/LargeList
+pub(crate) fn get_arrow_list_record_batch() -> RecordBatch {
+    let mut list_builder = ListBuilder::new(Int32Builder::new());
+    list_builder.append_value([Some(1), Some(2), Some(3)]);
+    list_builder.append_value([Some(4)]);
+    list_builder.append_value([Some(6)]);
+    let list_array = list_builder.finish();
+
+    let mut large_list_builder = LargeListBuilder::new(Int32Builder::new());
+    large_list_builder.append_value([Some(1), Some(2), Some(3)]);
+    large_list_builder.append_value([Some(4)]);
+    large_list_builder.append_value([Some(6)]);
+    let large_list_array = large_list_builder.finish();
+
+    let mut fixed_size_list_builder = FixedSizeListBuilder::new(Int32Builder::new(), 3);
+    fixed_size_list_builder.values().append_value(0);
+    fixed_size_list_builder.values().append_value(1);
+    fixed_size_list_builder.values().append_value(2);
+    fixed_size_list_builder.append(true);
+    fixed_size_list_builder.values().append_value(3);
+    fixed_size_list_builder.values().append_value(4);
+    fixed_size_list_builder.values().append_value(5);
+    fixed_size_list_builder.append(true);
+    fixed_size_list_builder.values().append_value(6);
+    fixed_size_list_builder.values().append_value(7);
+    fixed_size_list_builder.values().append_value(8);
+    fixed_size_list_builder.append(true);
+    let fixed_size_list_array = fixed_size_list_builder.finish();
+
+    let schema = Schema::new(vec![
+        Field::new(
+            "list",
+            DataType::List(Field::new("item", DataType::Int32, true).into()),
+            false,
+        ),
+        Field::new(
+            "large_list",
+            DataType::LargeList(Field::new("item", DataType::Int32, true).into()),
+            false,
+        ),
+        Field::new(
+            "fixed_size_list",
+            DataType::FixedSizeList(Field::new("item", DataType::Int32, true).into(), 3),
+            false,
+        ),
+    ]);
+
+    RecordBatch::try_new(
+        Arc::new(schema),
+        vec![
+            Arc::new(list_array),
+            Arc::new(large_list_array),
+            Arc::new(fixed_size_list_array),
+        ],
+    )
+    .expect("Failed to create arrow list record batch")
+}
+
+// Null
+pub(crate) fn get_arrow_null_record_batch() -> RecordBatch {
+    let null_arr = Int8Array::from(vec![Some(1), None, Some(3)]);
+    let schema = Schema::new(vec![Field::new("int8", DataType::Int8, true)]);
+    RecordBatch::try_new(Arc::new(schema), vec![Arc::new(null_arr)])
+        .expect("Failed to create arrow null record batch")
+}
diff --git a/tests/docker/mod.rs b/tests/docker/mod.rs
new file mode 100644
index 0000000..47ee8e3
--- /dev/null
+++ b/tests/docker/mod.rs
@@ -0,0 +1,246 @@
+use std::collections::HashMap;
+
+use bollard::{
+    container::{Config, CreateContainerOptions, RemoveContainerOptions, StartContainerOptions},
+    image::CreateImageOptions,
+    secret::{
+        ContainerState, ContainerStateStatusEnum, Health, HealthConfig, HealthStatusEnum,
+        HostConfig, PortBinding,
+    },
+    Docker,
+};
+use futures::StreamExt;
+
+pub struct RunningContainer<'a> {
+    name: &'a str,
+    docker: Docker,
+}
+
+impl<'a> RunningContainer<'a> {
+    pub async fn remove(&self) -> Result<(), anyhow::Error> {
+        remove(&self.docker, self.name).await
+    }
+
+    pub async fn stop(&self) -> Result<(), anyhow::Error> {
+        stop(&self.docker, self.name).await
+    }
+
+    pub async fn start(&self) -> Result<(), anyhow::Error> {
+        start(&self.docker, self.name).await
+    }
+}
+
+pub async fn remove(docker: &Docker, name: &str) -> Result<(), anyhow::Error> {
+    Ok(docker
+        .remove_container(
+            name,
+            Some(RemoveContainerOptions {
+                force: true,
+                ..Default::default()
+            }),
+        )
+        .await?)
+}
+
+pub async fn stop(docker: &Docker, name: &str) -> Result<(), anyhow::Error> {
+    Ok(docker.stop_container(name, None).await?)
+}
+
+pub async fn start(docker: &Docker, name: &str) -> Result<(), anyhow::Error> {
+    Ok(docker
+        .start_container(name, None::<StartContainerOptions<String>>)
+        .await?)
+}
+
+pub struct ContainerRunnerBuilder<'a> {
+    name: &'a str,
+    image: Option<String>,
+    port_bindings: Vec<(u16, u16)>,
+    env_vars: Vec<(String, String)>,
+    healthcheck: Option<HealthConfig>,
+}
+
+impl<'a> ContainerRunnerBuilder<'a> {
+    pub fn new(name: &'a str) -> Self {
+        ContainerRunnerBuilder {
+            name,
+            image: None,
+            port_bindings: Vec::new(),
+            env_vars: Vec::new(),
+            healthcheck: None,
+        }
+    }
+
+    pub fn image(mut self, image: String) -> Self {
+        self.image = Some(image);
+        self
+    }
+
+    // Maps `container_port` inside the container to `host_port` on 127.0.0.1.
+    pub fn add_port_binding(mut self, container_port: u16, host_port: u16) -> Self {
+        self.port_bindings.push((container_port, host_port));
+        self
+    }
+
+    pub fn add_env_var(mut self, key: &str, value: &str) -> Self {
+        self.env_vars.push((key.to_string(), value.to_string()));
+        self
+    }
+
+    pub fn healthcheck(mut self, healthcheck: HealthConfig) -> Self {
+        self.healthcheck = Some(healthcheck);
+        self
+    }
+
+    pub fn build(self) -> Result<ContainerRunner<'a>, anyhow::Error> {
+        let image = self
+            .image
+            .ok_or_else(|| anyhow::anyhow!("Image must be set"))?;
+        Ok(ContainerRunner::<'a> {
+            name: self.name,
+            docker: Docker::connect_with_local_defaults()?,
+            image,
+            port_bindings: self.port_bindings,
+            env_vars: self.env_vars,
+            healthcheck: self.healthcheck,
+        })
+    }
+}
+
+pub struct ContainerRunner<'a> {
+    name: &'a str,
+    docker: Docker,
+    image: String,
+    port_bindings: Vec<(u16, u16)>,
+    env_vars: Vec<(String, String)>,
+    healthcheck: Option<HealthConfig>,
+}
+
+impl<'a> ContainerRunner<'a> {
+    pub async fn run(self) -> Result<RunningContainer<'a>, anyhow::Error> {
+        if self.is_container_running().await? {
+            remove(&self.docker, self.name).await?;
+        }
+
+        self.pull_image().await?;
+
+        let options = CreateContainerOptions {
+            name: self.name,
+            platform: None,
+        };
+
+        let mut port_bindings_map = HashMap::new();
+        for (container_port, host_port) in self.port_bindings {
+            port_bindings_map.insert(
+                format!("{container_port}/tcp"),
+                Some(vec![PortBinding {
+                    host_ip: Some("127.0.0.1".to_string()),
+                    host_port: Some(format!("{host_port}/tcp")),
+                }]),
+            );
+        }
+        tracing::debug!("Port bindings: {:?}", port_bindings_map);
+
+        let port_bindings = if port_bindings_map.is_empty() {
+            None
+        } else {
+            Some(port_bindings_map)
+        };
+
+        let host_config = Some(HostConfig {
+            port_bindings,
+            ..Default::default()
+        });
+
+        let env_vars: Vec<String> = self
+            .env_vars
+            .iter()
+            .map(|(k, v)| format!("{k}={v}"))
+            .collect();
+        let env_vars_str = env_vars.iter().map(String::as_str).collect::<Vec<&str>>();
+
+        let config = Config::<&str> {
+            image: Some(&self.image),
+            env: Some(env_vars_str),
+            host_config,
+            healthcheck: self.healthcheck,
+            ..Default::default()
+        };
+
+        let _ = self.docker.create_container(Some(options), config).await?;
+
+        self.docker
+            .start_container(self.name, None::<StartContainerOptions<String>>)
+            .await?;
+
+        let start_time = std::time::Instant::now();
+        loop {
+            let inspect_container = self.docker.inspect_container(self.name, None).await?;
+            tracing::trace!("Container status: {:?}", inspect_container.state);
+
+            if let Some(ContainerState {
+                status: Some(ContainerStateStatusEnum::RUNNING),
+                health:
+                    Some(Health {
+                        status: Some(HealthStatusEnum::HEALTHY),
+                        ..
+                    }),
+                ..
+            }) = inspect_container.state
+            {
+                tracing::debug!("Container running & healthy");
+                break;
+            }
+
+            if start_time.elapsed().as_secs() > 30 {
+                return Err(anyhow::anyhow!("Container failed to start"));
+            }
+
+            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+        }
+
+        Ok(RunningContainer::<'a> {
+            name: self.name,
+            docker: self.docker,
+        })
+    }
+
+    async fn pull_image(&self) -> Result<(), anyhow::Error> {
+        // Check if image is already pulled
+        let images = self.docker.list_images::<&str>(None).await?;
+        for image in images {
+            if image.repo_tags.iter().any(|t| t == &self.image) {
+                tracing::debug!("Docker image {} already pulled", self.image);
+                return Ok(());
+            }
+        }
+
+        let options = Some(CreateImageOptions::<&str> {
+            from_image: &self.image,
+            ..Default::default()
+        });
+
+        let mut pulling_stream = self.docker.create_image(options, None, None);
+        while let Some(event) = pulling_stream.next().await {
+            tracing::debug!("Pulling image: {:?}", event?);
+        }
+
+        Ok(())
+    }
+
+    async fn is_container_running(&self) -> Result<bool, anyhow::Error> {
+        let containers = self.docker.list_containers::<&str>(None).await?;
+        for container in containers {
+            let Some(names) = container.names else {
+                continue;
+            };
+            if names.iter().any(|n| {
+                tracing::debug!("Docker container: {n}");
+                n == self.name || n == &format!("/{}", self.name)
+            }) {
+                tracing::debug!("Docker container {} already running", self.name);
+                return Ok(true);
+            }
+        }
+
+        Ok(false)
+    }
+}
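+
+// Illustrative usage only: the image name, ports, and health command below are
+// hypothetical values, not taken from this test suite. A throwaway container
+// could be provisioned and torn down like so:
+//
+//     let running = ContainerRunnerBuilder::new("example-redis")
+//         .image("public.ecr.aws/docker/library/redis:latest".to_string())
+//         .add_port_binding(6379, 16379)
+//         .healthcheck(HealthConfig {
+//             test: Some(vec!["CMD-SHELL".to_string(), "redis-cli ping".to_string()]),
+//             interval: Some(250_000_000), // nanoseconds
+//             ..Default::default()
+//         })
+//         .build()?
+//         .run()
+//         .await?;
+//     running.remove().await?;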
diff --git a/tests/integration.rs b/tests/integration.rs
new file mode 100644
index 0000000..b5c74a7
--- /dev/null
+++ b/tests/integration.rs
@@ -0,0 +1,29 @@
+use arrow::array::RecordBatch;
+use tracing::subscriber::DefaultGuard;
+use tracing_subscriber::EnvFilter;
+
+mod arrow_record_batch_gen;
+mod docker;
+#[cfg(feature = "postgres")]
+mod postgres;
+
+fn init_tracing(default_level: Option<&str>) -> DefaultGuard {
+    let filter = match (default_level, std::env::var("SPICED_LOG").ok()) {
+        (_, Some(log)) => EnvFilter::new(log),
+        (Some(level), None) => EnvFilter::new(level),
+        _ => EnvFilter::new(
+            "runtime=TRACE,datafusion-federation=TRACE,datafusion-federation-sql=TRACE",
+        ),
+    };
+
+    let subscriber = tracing_subscriber::FmtSubscriber::builder()
+        .with_env_filter(filter)
+        .with_ansi(true)
+        .finish();
+    tracing::subscriber::set_default(subscriber)
+}
+
+fn container_registry() -> String {
+    std::env::var("CONTAINER_REGISTRY")
+        .unwrap_or_else(|_| "public.ecr.aws/docker/library/".to_string())
+}
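+
+// Illustrative: the default filter above can be overridden per run via the
+// SPICED_LOG environment variable read in init_tracing, e.g.
+// `SPICED_LOG=integration=debug cargo test --test integration --features postgres`.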
diff --git a/tests/postgres/common.rs b/tests/postgres/common.rs
new file mode 100644
index 0000000..5c9ab27
--- /dev/null
+++ b/tests/postgres/common.rs
@@ -0,0 +1,89 @@
+use bollard::secret::HealthConfig;
+#[cfg(feature = "postgres")]
+use datafusion_table_providers::sql::db_connection_pool::postgrespool::PostgresConnectionPool;
+use rand::Rng;
+use secrecy::SecretString;
+use std::collections::HashMap;
+use tracing::instrument;
+
+use crate::{
+    container_registry,
+    docker::{ContainerRunnerBuilder, RunningContainer},
+};
+
+const PG_PASSWORD: &str = "runtime-integration-test-pw";
+const PG_DOCKER_CONTAINER: &str = "runtime-integration-test-postgres";
+
+fn get_pg_params(port: usize) -> HashMap<String, SecretString> {
+    let mut params = HashMap::new();
+    params.insert(
+        "pg_host".to_string(),
+        SecretString::from("localhost".to_string()),
+    );
+    params.insert("pg_port".to_string(), SecretString::from(port.to_string()));
+    params.insert(
+        "pg_user".to_string(),
+        SecretString::from("postgres".to_string()),
+    );
+    params.insert(
+        "pg_pass".to_string(),
+        SecretString::from(PG_PASSWORD.to_string()),
+    );
+    params.insert(
+        "pg_db".to_string(),
+        SecretString::from("postgres".to_string()),
+    );
+    params.insert(
+        "pg_sslmode".to_string(),
+        SecretString::from("disable".to_string()),
+    );
+    params
+}
+
+pub(super) fn get_random_port() -> usize {
+    rand::thread_rng().gen_range(15432..65535)
+}
+
+#[instrument]
+pub(super) async fn start_postgres_docker_container(
+    port: usize,
+) -> Result<RunningContainer<'static>, anyhow::Error> {
+    let container_name = format!("{PG_DOCKER_CONTAINER}-{port}");
+    let container_name: &'static str = Box::leak(container_name.into_boxed_str());
+    let port = if let Ok(port) = port.try_into() {
+        port
+    } else {
+        15432
+    };
+
+    let running_container = ContainerRunnerBuilder::new(container_name)
+        .image(format!("{}postgres:latest", container_registry()))
+        .add_port_binding(5432, port)
+        .add_env_var("POSTGRES_PASSWORD", PG_PASSWORD)
+        .healthcheck(HealthConfig {
+            test: Some(vec![
+                "CMD-SHELL".to_string(),
+                "pg_isready -U postgres".to_string(),
+            ]),
+            interval: Some(250_000_000),     // 250ms
+            timeout: Some(100_000_000),      // 100ms
+            retries: Some(5),
+            start_period: Some(500_000_000), // 500ms
+            start_interval: None,
+        })
+        .build()?
+        .run()
+        .await?;
+
+    tokio::time::sleep(std::time::Duration::from_millis(5000)).await;
+    Ok(running_container)
+}
+
+#[instrument]
+pub(super) async fn get_postgres_connection_pool(
+    port: usize,
+) -> Result<PostgresConnectionPool, anyhow::Error> {
+    let pool = PostgresConnectionPool::new(get_pg_params(port)).await?;
+
+    Ok(pool)
+}
diff --git a/tests/postgres/mod.rs b/tests/postgres/mod.rs
new file mode 100644
index 0000000..1b0219b
--- /dev/null
+++ b/tests/postgres/mod.rs
@@ -0,0 +1,148 @@
+use super::*;
+use crate::arrow_record_batch_gen::*;
+use datafusion::execution::context::SessionContext;
+#[cfg(feature = "postgres")]
+use datafusion_table_providers::sql::arrow_sql_gen::statement::{
+    CreateTableBuilder, InsertBuilder,
+};
+#[cfg(feature = "postgres")]
+use datafusion_table_providers::{
+    postgres::DynPostgresConnectionPool, sql::sql_provider_datafusion::SqlTable,
+};
+use std::sync::Arc;
+mod common;
+
+async fn arrow_postgres_round_trip(
+    port: usize,
+    arrow_record: RecordBatch,
+    table_name: &str,
+) -> Result<(), String> {
+    tracing::debug!("Running tests on {table_name}");
+    let ctx = SessionContext::new();
+
+    let pool = common::get_postgres_connection_pool(port)
+        .await
+        .map_err(|e| format!("Failed to create postgres connection pool: {e}"))?;
+
+    let db_conn = pool
+        .connect_direct()
+        .await
+        .expect("connection can be established");
+
+    // Create postgres table from arrow records and insert arrow records
+    let schema = Arc::clone(&arrow_record.schema());
+    let create_table_stmts = CreateTableBuilder::new(schema, table_name).build_postgres();
+    let insert_table_stmt = InsertBuilder::new(table_name, vec![arrow_record.clone()])
+        .build_postgres(None)
+        .map_err(|e| format!("Unable to construct postgres insert statement: {e}"))?;
+
+    // Test arrow -> Postgres row coverage
+    for create_table_stmt in create_table_stmts {
+        let _ = db_conn
+            .conn
+            .execute(&create_table_stmt, &[])
+            .await
+            .map_err(|e| format!("Postgres table cannot be created: {e}"));
+    }
+    let _ = db_conn
+        .conn
+        .execute(&insert_table_stmt, &[])
+        .await
+        .map_err(|e| format!("Rows cannot be inserted into Postgres table: {e}"));
+
+    // Register datafusion table, test row -> arrow conversion
+    let sqltable_pool: Arc<DynPostgresConnectionPool> = Arc::new(pool);
+    let table = SqlTable::new("postgres", &sqltable_pool, table_name, None)
+        .await
+        .expect("table can be created");
+    ctx.register_table(table_name, Arc::new(table))
+        .expect("Table should be registered");
+    let sql = format!("SELECT * FROM {table_name}");
+    let df = ctx
+        .sql(&sql)
+        .await
+        .expect("DataFrame can be created from query");
+
+    let record_batch = df.collect().await.expect("RecordBatch can be collected");
+
+    // Log the original arrow record batch and the record batch read back from
+    // Postgres rows, then check that the shapes match
+    tracing::debug!("Original Arrow Record Batch: {:?}", arrow_record.columns());
+    tracing::debug!(
+        "Postgres returned Record Batch: {:?}",
+        record_batch[0].columns()
+    );
+
+    // Check results
+    assert_eq!(record_batch.len(), 1);
+    assert_eq!(record_batch[0].num_rows(), arrow_record.num_rows());
+    assert_eq!(record_batch[0].num_columns(), arrow_record.num_columns());
+
+    Ok(())
+}
+
+#[tokio::test]
+#[cfg(feature = "postgres")]
+async fn test_arrow_postgres_types_conversion() -> Result<(), String> {
+    let _tracing = init_tracing(Some("integration=debug,info"));
+    let port = common::get_random_port();
+    let _running_container = common::start_postgres_docker_container(port)
+        .await
+        .map_err(|e| format!("Failed to create postgres container: {e}"))?;
+
+    tracing::debug!("Container started");
+
+    arrow_postgres_round_trip(port, get_arrow_binary_record_batch(), "binary_types")
+        .await
+        .unwrap();
+
+    arrow_postgres_round_trip(port, get_arrow_int_recordbatch(), "int_types")
+        .await
+        .unwrap();
+
+    arrow_postgres_round_trip(port, get_arrow_float_record_batch(), "float_types")
+        .await
+        .unwrap();
+
+    arrow_postgres_round_trip(port, get_arrow_utf8_record_batch(), "utf8_types")
+        .await
+        .unwrap();
+
+    // arrow_postgres_round_trip(port, get_arrow_time_record_batch(), "time_types")
+    //     .await
+    //     .unwrap();
+
+    arrow_postgres_round_trip(port, get_arrow_timestamp_record_batch(), "timestamp_types")
+        .await
+        .unwrap();
+
+    // arrow_postgres_round_trip(port, get_arrow_date_record_batch(), "date_types")
+    //     .await
+    //     .unwrap();
+
+    arrow_postgres_round_trip(port, get_arrow_struct_record_batch(), "struct_types")
+        .await
+        .unwrap();
+
+    // arrow_postgres_round_trip(port, get_arrow_decimal_record_batch(), "decimal_types")
+    //     .await
+    //     .unwrap();
+
+    // arrow_postgres_round_trip(port, get_arrow_interval_record_batch(), "interval_types")
+    //     .await
+    //     .unwrap();
+
+    // arrow_postgres_round_trip(port, get_arrow_duration_record_batch(), "duration_types")
+    //     .await
+    //     .unwrap();
+
+    // arrow_postgres_round_trip(port, get_arrow_list_record_batch(), "list_types")
+    //     .await
+    //     .unwrap();
+
+    arrow_postgres_round_trip(port, get_arrow_null_record_batch(), "null_types")
+        .await
+        .unwrap();
+
+    Ok(())
+}

From da63e2941d072d85f3ad344579e43008e5f6d429 Mon Sep 17 00:00:00 2001
From: Sevenannn
Date: Tue, 13 Aug 2024 23:18:13 -0700
Subject: [PATCH 2/3] Add integration test to PR workflow, update Makefile, fix
 typo in postgres arrow_sql_gen

---
 .github/workflows/pr.yaml         | 101 +++++++++++++++---------------
 Makefile                          |   2 +-
 src/sql/arrow_sql_gen/postgres.rs |   2 +-
 3 files changed, 54 insertions(+), 51 deletions(-)

diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml
index ccb6b6b..1a1d439 100644
--- a/.github/workflows/pr.yaml
+++ b/.github/workflows/pr.yaml
@@ -1,50 +1,53 @@
 ---
-  name: pr
-
-  on:
-    pull_request:
-      branches:
-        - main
-
-  jobs:
-    lint:
-      name: Clippy
-      runs-on: ubuntu-latest
-
-      steps:
-        - uses: actions/checkout@v4
-
-        - uses: dtolnay/rust-toolchain@stable
-
-        - name: Install Protoc
-          uses: arduino/setup-protoc@v3
-          with:
-            repo-token: ${{ secrets.GITHUB_TOKEN }}
-
-        - run: cargo clippy --all-features -- -D warnings
-
-    build:
-      name: Build
-      runs-on: ubuntu-latest
-
-      steps:
-        - uses: actions/checkout@v4
-
-        - uses: dtolnay/rust-toolchain@stable
-
-        # Putting this into a GitHub Actions matrix will run a separate job per matrix item, whereas in theory
-        # this can re-use the existing build cache to go faster.
-        - name: Build without default features
-          run: cargo check --no-default-features
-
-        - name: Build with only duckdb
-          run: cargo check --no-default-features --features duckdb
-
-        - name: Build with only postgres
-          run: cargo check --no-default-features --features postgres
-
-        - name: Build with only sqlite
-          run: cargo check --no-default-features --features sqlite
-
-        - name: Build with only mysql
-          run: cargo check --no-default-features --features mysql
+name: pr
+
+on:
+  pull_request:
+    branches:
+      - main
+
+jobs:
+  lint:
+    name: Clippy
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - uses: dtolnay/rust-toolchain@stable
+
+      - name: Install Protoc
+        uses: arduino/setup-protoc@v3
+        with:
+          repo-token: ${{ secrets.GITHUB_TOKEN }}
+
+      - run: cargo clippy --all-features -- -D warnings
+
+  build:
+    name: Build
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - uses: dtolnay/rust-toolchain@stable
+
+      # Putting this into a GitHub Actions matrix will run a separate job per matrix item, whereas in theory
+      # this can re-use the existing build cache to go faster.
+      - name: Build without default features
+        run: cargo check --no-default-features
+
+      - name: Build with only duckdb
+        run: cargo check --no-default-features --features duckdb
+
+      - name: Build with only postgres
+        run: cargo check --no-default-features --features postgres
+
+      - name: Build with only sqlite
+        run: cargo check --no-default-features --features sqlite
+
+      - name: Build with only mysql
+        run: cargo check --no-default-features --features mysql
+
+      - name: Run integration test
+        run: cargo test --test integration --no-default-features --features postgres -- --nocapture
diff --git a/Makefile b/Makefile
index 5dfc086..3f8d543 100644
--- a/Makefile
+++ b/Makefile
@@ -11,4 +11,4 @@ lint:
 
 .PHONY: test-integration
 test-integration:
-	cargo test --test integration --features postgres -- --nocapture
\ No newline at end of file
+	cargo test --test integration --no-default-features --features postgres -- --nocapture
\ No newline at end of file
diff --git a/src/sql/arrow_sql_gen/postgres.rs b/src/sql/arrow_sql_gen/postgres.rs
index 21800e6..7a572a5 100644
--- a/src/sql/arrow_sql_gen/postgres.rs
+++ b/src/sql/arrow_sql_gen/postgres.rs
@@ -269,7 +269,7 @@ pub fn rows_to_arrow(rows: &[Row]) -> Result<RecordBatch> {
             };
             let v = row.try_get::<usize, Option<serde_json::Value>>(i).with_context(|_| {
                 FailedToGetRowValueSnafu {
-                    pg_type: Type::TIME,
+                    pg_type: Type::JSON,
                 }
             })?;

From 7e562a622437f3e44c7b804e50589525748f5b58 Mon Sep 17 00:00:00 2001
From: Sevenannn
Date: Tue, 13 Aug 2024 23:23:47 -0700
Subject: [PATCH 3/3] Update workflow

---
 .github/workflows/pr.yaml | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml
index 1a1d439..707bbcf 100644
--- a/.github/workflows/pr.yaml
+++ b/.github/workflows/pr.yaml
@@ -49,5 +49,14 @@ jobs:
       - name: Build with only mysql
         run: cargo check --no-default-features --features mysql
 
+  integration-test:
+    name: Integration Test
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - uses: dtolnay/rust-toolchain@stable
+
       - name: Run integration test
         run: cargo test --test integration --no-default-features --features postgres -- --nocapture
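
Running the suite locally requires a reachable Docker daemon, since the tests
provision a postgres:latest container through bollard on the local socket.
With Docker up, the Makefile target added in patch 1 (and updated in patch 2)
is the entry point:

    make test-integration
    # equivalently:
    cargo test --test integration --no-default-features --features postgres -- --nocapture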