Add support for dimensional counts #1

Merged: 4 commits, Feb 12, 2024
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
@@ -83,6 +83,6 @@ jobs:
GITHUB_TOKEN: ${{ github.token }}
with:
upload_url: ${{ steps.create_release.outputs.upload_url }}
asset_path: ./target/lambda/query-metrics/bootstrap.zip
asset_path: ./target/lambda/query-metrics-lambda/bootstrap.zip
asset_name: query-metrics-bootstrap-${{ github.ref_name }}.zip
asset_content_type: application/zip
22 changes: 10 additions & 12 deletions lambdas/query-metrics/Cargo.toml
@@ -1,18 +1,15 @@
[package]
name = "query-metrics"
version = "0.1.0"
version = "0.2.0"
edition = "2021"

# Starting in Rust 1.62 you can use `cargo add` to add dependencies
# to your project.
#
# If you're using an older Rust version,
# download cargo-edit(https://github.com/killercup/cargo-edit#installation)
# to install the `add` subcommand.
#
# Running `cargo add DEPENDENCY_NAME` will
# add the latest version of a dependency to the list,
# and it will keep the alphabetic ordering for you.
[[bin]]
name = "query-metrics"
path = "src/cli.rs"

[[bin]]
name = "query-metrics-lambda"
path = "src/main.rs"

[dependencies]
anyhow = "1.0.79"
@@ -21,7 +18,8 @@ aws-sdk-cloudwatch = "1.11.0"
aws-sdk-config = "1.11.0"
aws_lambda_events = { version = "0.12.0" }
base64 = "0.21.7"
deltalake = { version = "0.16.5", features = ["datafusion", "s3"] }
deltalake-core = { version = "0.17.0", features = ["datafusion"] }
deltalake-aws = { version = "0.1.0" }

lambda_runtime = "0.8.3"
serde = { version = "1.0.195", features = ["derive"] }
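For context on the dependency change above: deltalake 0.17 splits the old monolithic crate into deltalake-core plus per-store crates, so S3 support now has to be registered explicitly at startup, which is why both binaries in this PR call deltalake_aws::register_handlers. A minimal sketch of that pattern, using a placeholder S3 path rather than a table from this repository:

// Minimal sketch of the deltalake-core + deltalake-aws split adopted by this PR.
// The table URL below is a placeholder.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // The S3 object store now lives in deltalake-aws and must be registered
    // before any s3:// table URL can be opened.
    deltalake_aws::register_handlers(None);

    let table = deltalake_core::open_table("s3://example-bucket/path/to/table").await?;
    println!("Opened table at version {}", table.version());
    Ok(())
}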
97 changes: 97 additions & 0 deletions lambdas/query-metrics/src/cli.rs
@@ -0,0 +1,97 @@
///
/// The CLI helps test a manifest
///
use std::collections::HashMap;
use std::sync::Arc;

use deltalake_core::arrow::util::pretty::print_batches;
use deltalake_core::arrow::{array::PrimitiveArray, datatypes::Int64Type};
use deltalake_core::datafusion::common::*;
use deltalake_core::datafusion::execution::context::SessionContext;
use tracing::log::*;

mod config;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
deltalake_aws::register_handlers(None);

tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_target(true)
.init();

let conf = config::Configuration::from_file("prod-manifest.yml");

for (name, gauges) in conf.gauges.iter() {
for gauge in gauges.iter() {
println!("Querying the {name} table");
let ctx = SessionContext::new();
let table = deltalake_core::open_table(&gauge.url)
.await
.expect("Failed to register table");
println!("table opened");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
ctx.register_table("source", Arc::new(table))
.expect("Failed to register table with datafusion");

println!("Running query: {}", gauge.query);

let df = ctx
.sql(&gauge.query)
.await
.expect("Failed to execute query");

match gauge.measurement_type {
config::Measurement::Count => {
let count = df.count().await.expect("Failed to collect batches");
println!("Counted {count} rows");
}
config::Measurement::DimensionalCount => {
println!("Need to run dimensional count");
let batches = df.collect().await.expect("Failed to collect batches");
//let batches = df.explain(false, false).unwrap().collect().await.expect("Failed to collect batches");
let _ = print_batches(&batches);

println!("I see this many batches: {}", batches.len());
// Interestingly the collect produces a lot of zero row batches
for batch in batches.iter().filter(|b| b.num_rows() > 0) {
if let Some(_counts) = batch.column_by_name("count") {
// Fetching the count column just to ensure that it exists before doing
// any more computation
let schema = batch.schema();
let fields = schema.fields();

for row in 0..batch.num_rows() {
let mut dimensions: HashMap<String, String> = HashMap::new();
let mut counted = false;
let mut count = 0;

for (idx, column) in batch.columns().iter().enumerate() {
let field = &fields[idx];
let name = field.name();
if name == "count" {
let arr: &PrimitiveArray<Int64Type> =
arrow::array::cast::as_primitive_array(&column);
count = arr.value(row);
counted = true;
} else {
let arr = arrow::array::cast::as_string_array(&column);
dimensions.insert(name.into(), arr.value(row).into());
}
}

if counted {
println!("{count}: {dimensions:?}");
}
}
} else {
error!("The result set must have a column named `count`");
}
}
}
}
}
}
Ok(())
}
4 changes: 2 additions & 2 deletions lambdas/query-metrics/src/config.rs
@@ -13,8 +13,7 @@ pub struct Configuration {
}

impl Configuration {
#[cfg(test)]
fn from_file<S: Into<String> + AsRef<Path>>(location: S) -> Self {
pub fn from_file<S: Into<String> + AsRef<Path>>(location: S) -> Self {
serde_yaml::from_reader(File::open(location).expect("Failed to open manifest"))
.expect("Failed to deserialize")
}
@@ -39,6 +38,7 @@ pub struct Gauge {
#[serde(rename_all = "lowercase")]
pub enum Measurement {
Count,
DimensionalCount,
}

#[cfg(test)]
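With `rename_all = "lowercase"` on the enum, the new variant deserializes from the string `dimensionalcount`. A hypothetical manifest entry might look like the sketch below; the `gauges` map and the `name`, `url`, `query`, and `measurement_type` keys are inferred from the code in this PR (the full Gauge struct is not visible in this diff), so treat the exact key names as assumptions. What is clear from both binaries is that the query must produce an Int64 column named `count` plus string-typed dimension columns, and that it runs against the table registered as `source`.

# Hypothetical sketch of a prod-manifest.yml entry; key names are inferred from
# config.rs / main.rs and may not match the real schema exactly.
gauges:
  orders:
    - name: orders_by_status
      url: s3://example-bucket/databases/orders
      query: "SELECT status, COUNT(*) AS count FROM source GROUP BY status"
      measurement_type: dimensionalcount

The new query-metrics CLI binary reads prod-manifest.yml from its working directory, so a manifest along these lines can presumably be exercised locally before redeploying the lambda.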
74 changes: 70 additions & 4 deletions lambdas/query-metrics/src/main.rs
@@ -5,19 +5,23 @@
use aws_lambda_events::event::cloudwatch_events::CloudWatchEvent;
use aws_sdk_cloudwatch::{
primitives::DateTime,
types::{MetricDatum, StandardUnit},
types::{Dimension, MetricDatum, StandardUnit},
};
use deltalake::datafusion::common::*;
use deltalake::datafusion::execution::context::SessionContext;
use deltalake_core::arrow::{array::PrimitiveArray, datatypes::Int64Type};
use deltalake_core::datafusion::common::*;
use deltalake_core::datafusion::execution::context::SessionContext;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
use tracing::log::*;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::SystemTime;

mod config;

async fn function_handler(_event: LambdaEvent<CloudWatchEvent>) -> Result<(), Error> {
deltalake_aws::register_handlers(None);

let aws_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
let cloudwatch = aws_sdk_cloudwatch::Client::new(&aws_config);

@@ -31,7 +35,7 @@ async fn function_handler(_event: LambdaEvent<CloudWatchEvent>) -> Result<(), Er
for gauge in gauges.iter() {
debug!("Querying the {name} table");
let ctx = SessionContext::new();
let table = deltalake::open_table(&gauge.url)
let table = deltalake_core::open_table(&gauge.url)
.await
.expect("Failed to register table");
ctx.register_table("source", Arc::new(table))
@@ -64,6 +68,68 @@ async fn function_handler(_event: LambdaEvent<CloudWatchEvent>) -> Result<(), Er
.await?;
debug!("Result of CloudWatch send: {res:?}");
}
config::Measurement::DimensionalCount => {
let batches = df.collect().await.expect("Failed to collect batches");
debug!("I see this many batches: {}", batches.len());

// Interestingly the collect produces a lot of zero row batches
for batch in batches.iter().filter(|b| b.num_rows() > 0) {
if let Some(_counts) = batch.column_by_name("count") {
// Fetching the count column just to ensure that it exists before doing
// any more computation
let schema = batch.schema();
let fields = schema.fields();

for row in 0..batch.num_rows() {
let mut dimensions: HashMap<String, String> = HashMap::new();
let mut counted = false;
let mut count = 0;

for (idx, column) in batch.columns().iter().enumerate() {
let field = &fields[idx];
let name = field.name();
if name == "count" {
let arr: &PrimitiveArray<Int64Type> =
arrow::array::cast::as_primitive_array(&column);
count = arr.value(row);
counted = true;
} else {
let arr = arrow::array::cast::as_string_array(&column);
dimensions.insert(name.into(), arr.value(row).into());
}
}

if counted {
debug!("{count}: {dimensions:?}");
let mut dims: Vec<Dimension> = vec![];

for (key, value) in dimensions.iter() {
dims.push(
Dimension::builder().name(key).value(value).build(),
);
}
let datum = MetricDatum::builder()
.metric_name(&gauge.name)
.timestamp(DateTime::from(SystemTime::now()))
.set_dimensions(Some(dims))
.value(count as f64)
.unit(StandardUnit::Count)
.build();

let res = cloudwatch
.put_metric_data()
.namespace(format!("DataLake/{name}"))
.metric_data(datum)
.send()
.await?;
debug!("Result of CloudWatch send: {res:?}");
}
}
} else {
error!("The result set must have a column named `count`");
}
}
}
}
}
}