diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 57bcf6b..50d4ca8 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -83,6 +83,6 @@ jobs:
           GITHUB_TOKEN: ${{ github.token }}
         with:
           upload_url: ${{ steps.create_release.outputs.upload_url }}
-          asset_path: ./target/lambda/query-metrics/bootstrap.zip
+          asset_path: ./target/lambda/query-metrics-lambda/bootstrap.zip
           asset_name: query-metrics-bootstrap-${{ github.ref_name }}.zip
           asset_content_type: application/zip
diff --git a/lambdas/query-metrics/Cargo.toml b/lambdas/query-metrics/Cargo.toml
index 70d3e10..525c59b 100644
--- a/lambdas/query-metrics/Cargo.toml
+++ b/lambdas/query-metrics/Cargo.toml
@@ -1,18 +1,15 @@
 [package]
 name = "query-metrics"
-version = "0.1.0"
+version = "0.2.0"
 edition = "2021"
 
-# Starting in Rust 1.62 you can use `cargo add` to add dependencies
-# to your project.
-#
-# If you're using an older Rust version,
-# download cargo-edit(https://github.com/killercup/cargo-edit#installation)
-# to install the `add` subcommand.
-#
-# Running `cargo add DEPENDENCY_NAME` will
-# add the latest version of a dependency to the list,
-# and it will keep the alphabetic ordering for you.
+[[bin]]
+name = "query-metrics"
+path = "src/cli.rs"
+
+[[bin]]
+name = "query-metrics-lambda"
+path = "src/main.rs"
 
 [dependencies]
 anyhow = "1.0.79"
@@ -21,7 +18,8 @@
 aws-sdk-cloudwatch = "1.11.0"
 aws-sdk-config = "1.11.0"
 aws_lambda_events = { version = "0.12.0" }
 base64 = "0.21.7"
-deltalake = { version = "0.16.5", features = ["datafusion", "s3"] }
+deltalake-core = { version = "0.17.0", features = ["datafusion"] }
+deltalake-aws = { version = "0.1.0" }
 lambda_runtime = "0.8.3"
 serde = { version = "1.0.195", features = ["derive"] }
diff --git a/lambdas/query-metrics/src/cli.rs b/lambdas/query-metrics/src/cli.rs
new file mode 100644
index 0000000..c0a42a7
--- /dev/null
+++ b/lambdas/query-metrics/src/cli.rs
@@ -0,0 +1,97 @@
+///
+/// The CLI helps test a manifest
+///
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use deltalake_core::arrow::util::pretty::print_batches;
+use deltalake_core::arrow::{array::PrimitiveArray, datatypes::Int64Type};
+use deltalake_core::datafusion::common::*;
+use deltalake_core::datafusion::execution::context::SessionContext;
+use tracing::log::*;
+
+mod config;
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    deltalake_aws::register_handlers(None);
+
+    tracing_subscriber::fmt()
+        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+        .with_target(true)
+        .init();
+
+    let conf = config::Configuration::from_file("prod-manifest.yml");
+
+    for (name, gauges) in conf.gauges.iter() {
+        for gauge in gauges.iter() {
+            println!("Querying the {name} table");
+            let ctx = SessionContext::new();
+            let table = deltalake_core::open_table(&gauge.url)
+                .await
+                .expect("Failed to register table");
+            println!("table opened");
+            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+            ctx.register_table("source", Arc::new(table))
+                .expect("Failed to register table with datafusion");
+
+            println!("Running query: {}", gauge.query);
+
+            let df = ctx
+                .sql(&gauge.query)
+                .await
+                .expect("Failed to execute query");
+
+            match gauge.measurement_type {
+                config::Measurement::Count => {
+                    let count = df.count().await.expect("Failed to collect batches");
+                    println!("Counted {count} rows");
+                }
+                config::Measurement::DimensionalCount => {
+                    println!("Need to run dimensional count");
+                    let batches = df.collect().await.expect("Failed to collect batches");
+                    //let batches = df.explain(false, false).unwrap().collect().await.expect("Failed to collect batches");
+                    let _ = print_batches(&batches);
+
+                    println!("I see this many batches: {}", batches.len());
+                    // Interestingly the collect produces a lot of zero row batches
+                    for batch in batches.iter().filter(|b| b.num_rows() > 0) {
+                        if let Some(_counts) = batch.column_by_name("count") {
+                            // Fetching the count column just to ensure that it exists before doing
+                            // any more computation
+                            let schema = batch.schema();
+                            let fields = schema.fields();
+
+                            for row in 0..batch.num_rows() {
+                                let mut dimensions: HashMap<String, String> = HashMap::new();
+                                let mut counted = false;
+                                let mut count = 0;
+
+                                for (idx, column) in batch.columns().iter().enumerate() {
+                                    let field = &fields[idx];
+                                    let name = field.name();
+                                    if name == "count" {
+                                        let arr: &PrimitiveArray<Int64Type> =
+                                            arrow::array::cast::as_primitive_array(&column);
+                                        count = arr.value(row);
+                                        counted = true;
+                                    } else {
+                                        let arr = arrow::array::cast::as_string_array(&column);
+                                        dimensions.insert(name.into(), arr.value(row).into());
+                                    }
+                                }
+
+                                if counted {
+                                    println!("{count}: {dimensions:?}");
+                                }
+                            }
+                        } else {
+                            error!("The result set must have a column named `count`");
+                        }
+                    }
+                }
+            }
+        }
+    }
+    Ok(())
+}
diff --git a/lambdas/query-metrics/src/config.rs b/lambdas/query-metrics/src/config.rs
index ac7ea76..247e52d 100644
--- a/lambdas/query-metrics/src/config.rs
+++ b/lambdas/query-metrics/src/config.rs
@@ -13,8 +13,7 @@ pub struct Configuration {
 }
 
 impl Configuration {
-    #[cfg(test)]
-    fn from_file<S: Into<String> + AsRef<std::path::Path>>(location: S) -> Self {
+    pub fn from_file<S: Into<String> + AsRef<std::path::Path>>(location: S) -> Self {
         serde_yaml::from_reader(File::open(location).expect("Failed to open manifest"))
             .expect("Failed to deserialize")
     }
@@ -39,6 +38,7 @@ pub struct Gauge {
 #[serde(rename_all = "lowercase")]
 pub enum Measurement {
     Count,
+    DimensionalCount,
 }
 
 #[cfg(test)]
diff --git a/lambdas/query-metrics/src/main.rs b/lambdas/query-metrics/src/main.rs
index 5b07fe8..ac3472a 100644
--- a/lambdas/query-metrics/src/main.rs
+++ b/lambdas/query-metrics/src/main.rs
@@ -5,19 +5,23 @@
 use aws_lambda_events::event::cloudwatch_events::CloudWatchEvent;
 use aws_sdk_cloudwatch::{
     primitives::DateTime,
-    types::{MetricDatum, StandardUnit},
+    types::{Dimension, MetricDatum, StandardUnit},
 };
-use deltalake::datafusion::common::*;
-use deltalake::datafusion::execution::context::SessionContext;
+use deltalake_core::arrow::{array::PrimitiveArray, datatypes::Int64Type};
+use deltalake_core::datafusion::common::*;
+use deltalake_core::datafusion::execution::context::SessionContext;
 use lambda_runtime::{run, service_fn, Error, LambdaEvent};
 use tracing::log::*;
 
+use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::SystemTime;
 
 mod config;
 
 async fn function_handler(_event: LambdaEvent<CloudWatchEvent>) -> Result<(), Error> {
+    deltalake_aws::register_handlers(None);
+
     let aws_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
     let cloudwatch = aws_sdk_cloudwatch::Client::new(&aws_config);
 
@@ -31,7 +35,7 @@ async fn function_handler(_event: LambdaEvent<CloudWatchEvent>) -> Result<(), Er
         for gauge in gauges.iter() {
             debug!("Querying the {name} table");
             let ctx = SessionContext::new();
-            let table = deltalake::open_table(&gauge.url)
+            let table = deltalake_core::open_table(&gauge.url)
                 .await
                 .expect("Failed to register table");
             ctx.register_table("source", Arc::new(table))
@@ -64,6 +68,68 @@ async fn function_handler(_event: LambdaEvent<CloudWatchEvent>) -> Result<(), Er
                         .await?;
                     debug!("Result of CloudWatch send: {res:?}");
                 }
+                config::Measurement::DimensionalCount => {
+                    let batches = df.collect().await.expect("Failed to collect batches");
+                    debug!("I see this many batches: {}", batches.len());
+
+                    // Interestingly the collect produces a lot of zero row batches
+                    for batch in batches.iter().filter(|b| b.num_rows() > 0) {
+                        if let Some(_counts) = batch.column_by_name("count") {
+                            // Fetching the count column just to ensure that it exists before doing
+                            // any more computation
+                            let schema = batch.schema();
+                            let fields = schema.fields();
+
+                            for row in 0..batch.num_rows() {
+                                let mut dimensions: HashMap<String, String> = HashMap::new();
+                                let mut counted = false;
+                                let mut count = 0;
+
+                                for (idx, column) in batch.columns().iter().enumerate() {
+                                    let field = &fields[idx];
+                                    let name = field.name();
+                                    if name == "count" {
+                                        let arr: &PrimitiveArray<Int64Type> =
+                                            arrow::array::cast::as_primitive_array(&column);
+                                        count = arr.value(row);
+                                        counted = true;
+                                    } else {
+                                        let arr = arrow::array::cast::as_string_array(&column);
+                                        dimensions.insert(name.into(), arr.value(row).into());
+                                    }
+                                }
+
+                                if counted {
+                                    debug!("{count}: {dimensions:?}");
+                                    let mut dims: Vec<Dimension> = vec![];
+
+                                    for (key, value) in dimensions.iter() {
+                                        dims.push(
+                                            Dimension::builder().name(key).value(value).build(),
+                                        );
+                                    }
+                                    let datum = MetricDatum::builder()
+                                        .metric_name(&gauge.name)
+                                        .timestamp(DateTime::from(SystemTime::now()))
+                                        .set_dimensions(Some(dims))
+                                        .value(count as f64)
+                                        .unit(StandardUnit::Count)
+                                        .build();
+
+                                    let res = cloudwatch
+                                        .put_metric_data()
+                                        .namespace(format!("DataLake/{name}"))
+                                        .metric_data(datum)
+                                        .send()
+                                        .await?;
+                                    debug!("Result of CloudWatch send: {res:?}");
+                                }
+                            }
+                        } else {
+                            error!("The result set must have a column named `count`");
+                        }
+                    }
+                }
             }
         }
     }
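Note (not part of the diff): the DimensionalCount arms above assume the gauge's SQL query yields exactly one Int64 column named `count` plus any number of string-typed dimension columns, e.g. something like `SELECT region, count(*) AS count FROM source GROUP BY region`. The standalone sketch below illustrates that row-walk outside the lambda; the query shape, column names, and a direct `arrow` crate dependency are assumptions here (the code above already calls `arrow::array::cast` directly), not something this patch adds.

// Standalone sketch: build a RecordBatch shaped like the expected query result,
// then pull `count` per row and treat every other column as a CloudWatch dimension.
use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::{Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical result of `SELECT region, count(*) AS count FROM source GROUP BY region`
    let schema = Arc::new(Schema::new(vec![
        Field::new("region", DataType::Utf8, false),
        Field::new("count", DataType::Int64, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(StringArray::from(vec!["us-east-1", "us-west-2"])),
            Arc::new(Int64Array::from(vec![42_i64, 7])),
        ],
    )?;

    let schema = batch.schema();
    let fields = schema.fields();
    for row in 0..batch.num_rows() {
        let mut dimensions: HashMap<String, String> = HashMap::new();
        let mut count = 0_i64;
        for (idx, column) in batch.columns().iter().enumerate() {
            let name = fields[idx].name();
            if name == "count" {
                let arr: &Int64Array = arrow::array::cast::as_primitive_array(column);
                count = arr.value(row);
            } else {
                let arr = arrow::array::cast::as_string_array(column);
                dimensions.insert(name.clone(), arr.value(row).to_string());
            }
        }
        // In main.rs, each such row becomes one MetricDatum carrying these dimensions.
        println!("{count}: {dimensions:?}");
    }
    Ok(())
}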