Commit

Merge pull request #1 from scribd/dimensionsa
Add support for dimensional counts
rtyler authored Feb 12, 2024
2 parents fa0608d + 55fd3c3 commit 826ba5a
Showing 5 changed files with 180 additions and 19 deletions.
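For context, a dimensional-count gauge could be declared in the manifest along these lines (a hypothetical sketch: the bucket, table, and query are invented, the field names assume the `Gauge` struct in `config.rs` uses no field-level serde renames, and the enum's `lowercase` rename maps `DimensionalCount` to `dimensionalcount`; the query must return a `count` column alongside string-typed dimension columns):

    gauges:
      pageviews:
        - name: pageviews_by_country
          url: s3://example-bucket/delta/pageviews
          query: SELECT country, COUNT(*) AS count FROM source GROUP BY country
          measurement_type: dimensionalcount

The CLI binary added in this commit (`cargo run --bin query-metrics`) reads `prod-manifest.yml` from the working directory and prints the resulting counts.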
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
@@ -83,6 +83,6 @@ jobs:
          GITHUB_TOKEN: ${{ github.token }}
        with:
          upload_url: ${{ steps.create_release.outputs.upload_url }}
-          asset_path: ./target/lambda/query-metrics/bootstrap.zip
+          asset_path: ./target/lambda/query-metrics-lambda/bootstrap.zip
          asset_name: query-metrics-bootstrap-${{ github.ref_name }}.zip
          asset_content_type: application/zip
22 changes: 10 additions & 12 deletions lambdas/query-metrics/Cargo.toml
@@ -1,18 +1,15 @@
[package]
name = "query-metrics"
version = "0.1.0"
version = "0.2.0"
edition = "2021"

-# Starting in Rust 1.62 you can use `cargo add` to add dependencies
-# to your project.
-#
-# If you're using an older Rust version,
-# download cargo-edit (https://github.com/killercup/cargo-edit#installation)
-# to install the `add` subcommand.
-#
-# Running `cargo add DEPENDENCY_NAME` will
-# add the latest version of a dependency to the list,
-# and it will keep the alphabetic ordering for you.
+[[bin]]
+name = "query-metrics"
+path = "src/cli.rs"
+
+[[bin]]
+name = "query-metrics-lambda"
+path = "src/main.rs"

[dependencies]
anyhow = "1.0.79"
@@ -21,7 +18,8 @@ aws-sdk-cloudwatch = "1.11.0"
aws-sdk-config = "1.11.0"
aws_lambda_events = { version = "0.12.0" }
base64 = "0.21.7"
-deltalake = { version = "0.16.5", features = ["datafusion", "s3"] }
+deltalake-core = { version = "0.17.0", features = ["datafusion"] }
+deltalake-aws = { version = "0.1.0" }

lambda_runtime = "0.8.3"
serde = { version = "1.0.195", features = ["derive"] }
97 changes: 97 additions & 0 deletions lambdas/query-metrics/src/cli.rs
@@ -0,0 +1,97 @@
//!
//! The CLI helps test a manifest
//!
use std::collections::HashMap;
use std::sync::Arc;

use deltalake_core::arrow::util::pretty::print_batches;
use deltalake_core::arrow::{array::PrimitiveArray, datatypes::Int64Type};
use deltalake_core::datafusion::common::*;
use deltalake_core::datafusion::execution::context::SessionContext;
use tracing::log::*;

mod config;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
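    // Register the AWS (S3) storage handlers from deltalake-aws; since deltalake 0.17
    // S3 support lives in that separate crate and must be registered before any
    // s3:// table can be opened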
    deltalake_aws::register_handlers(None);

    tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .with_target(true)
        .init();

    let conf = config::Configuration::from_file("prod-manifest.yml");

    for (name, gauges) in conf.gauges.iter() {
        for gauge in gauges.iter() {
            println!("Querying the {name} table");
            let ctx = SessionContext::new();
            let table = deltalake_core::open_table(&gauge.url)
                .await
                .expect("Failed to open table");
            println!("table opened");
            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            ctx.register_table("source", Arc::new(table))
                .expect("Failed to register table with datafusion");

            println!("Running query: {}", gauge.query);

            let df = ctx
                .sql(&gauge.query)
                .await
                .expect("Failed to execute query");

            match gauge.measurement_type {
                config::Measurement::Count => {
                    let count = df.count().await.expect("Failed to collect batches");
                    println!("Counted {count} rows");
                }
                config::Measurement::DimensionalCount => {
                    println!("Need to run dimensional count");
                    let batches = df.collect().await.expect("Failed to collect batches");
                    //let batches = df.explain(false, false).unwrap().collect().await.expect("Failed to collect batches");
                    let _ = print_batches(&batches);

                    println!("I see this many batches: {}", batches.len());
                    // Interestingly, the collect produces a lot of zero-row batches, so only
                    // the non-empty ones are worth inspecting
                    for batch in batches.iter().filter(|b| b.num_rows() > 0) {
                        if let Some(_counts) = batch.column_by_name("count") {
                            // Fetching the count column just to ensure that it exists before doing
                            // any more computation
                            let schema = batch.schema();
                            let fields = schema.fields();

                            for row in 0..batch.num_rows() {
                                let mut dimensions: HashMap<String, String> = HashMap::new();
                                let mut counted = false;
                                let mut count = 0;

                                for (idx, column) in batch.columns().iter().enumerate() {
                                    let field = &fields[idx];
                                    let name = field.name();
                                    if name == "count" {
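                                        // DataFusion's COUNT() aggregate yields an Int64 column,
                                        // so this downcast panics if `count` has a different type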
                                        let arr: &PrimitiveArray<Int64Type> =
                                            arrow::array::cast::as_primitive_array(&column);
                                        count = arr.value(row);
                                        counted = true;
                                    } else {
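                                        // Every other column is treated as a string dimension;
                                        // as_string_array will panic on non-string columns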
                                        let arr = arrow::array::cast::as_string_array(&column);
                                        dimensions.insert(name.into(), arr.value(row).into());
                                    }
                                }

                                if counted {
                                    println!("{count}: {dimensions:?}");
                                }
                            }
                        } else {
                            error!("The result set must have a column named `count`");
                        }
                    }
                }
            }
        }
    }
    Ok(())
}
4 changes: 2 additions & 2 deletions lambdas/query-metrics/src/config.rs
@@ -13,8 +13,7 @@ pub struct Configuration {
}

impl Configuration {
-    #[cfg(test)]
-    fn from_file<S: Into<String> + AsRef<Path>>(location: S) -> Self {
+    pub fn from_file<S: Into<String> + AsRef<Path>>(location: S) -> Self {
        serde_yaml::from_reader(File::open(location).expect("Failed to open manifest"))
            .expect("Failed to deserialize")
    }
@@ -39,6 +38,7 @@ pub struct Gauge {
#[serde(rename_all = "lowercase")]
pub enum Measurement {
    Count,
+    DimensionalCount,
}

#[cfg(test)]
74 changes: 70 additions & 4 deletions lambdas/query-metrics/src/main.rs
@@ -5,19 +5,23 @@
use aws_lambda_events::event::cloudwatch_events::CloudWatchEvent;
use aws_sdk_cloudwatch::{
    primitives::DateTime,
-    types::{MetricDatum, StandardUnit},
+    types::{Dimension, MetricDatum, StandardUnit},
};
-use deltalake::datafusion::common::*;
-use deltalake::datafusion::execution::context::SessionContext;
+use deltalake_core::arrow::{array::PrimitiveArray, datatypes::Int64Type};
+use deltalake_core::datafusion::common::*;
+use deltalake_core::datafusion::execution::context::SessionContext;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
use tracing::log::*;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::SystemTime;

mod config;

async fn function_handler(_event: LambdaEvent<CloudWatchEvent>) -> Result<(), Error> {
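    // Register the S3 handlers from deltalake-aws before any Delta tables are opened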
    deltalake_aws::register_handlers(None);

    let aws_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
    let cloudwatch = aws_sdk_cloudwatch::Client::new(&aws_config);

@@ -31,7 +35,7 @@ async fn function_handler(_event: LambdaEvent<CloudWatchEvent>) -> Result<(), Er
        for gauge in gauges.iter() {
            debug!("Querying the {name} table");
            let ctx = SessionContext::new();
-            let table = deltalake::open_table(&gauge.url)
+            let table = deltalake_core::open_table(&gauge.url)
                .await
                .expect("Failed to open table");
            ctx.register_table("source", Arc::new(table))
@@ -64,6 +68,68 @@ async fn function_handler(_event: LambdaEvent<CloudWatchEvent>) -> Result<(), Er
                        .await?;
                    debug!("Result of CloudWatch send: {res:?}");
                }
                config::Measurement::DimensionalCount => {
                    let batches = df.collect().await.expect("Failed to collect batches");
                    debug!("I see this many batches: {}", batches.len());

                    // Interestingly, the collect produces a lot of zero-row batches, so only
                    // the non-empty ones are worth inspecting
                    for batch in batches.iter().filter(|b| b.num_rows() > 0) {
                        if let Some(_counts) = batch.column_by_name("count") {
                            // Fetching the count column just to ensure that it exists before doing
                            // any more computation
                            let schema = batch.schema();
                            let fields = schema.fields();

                            for row in 0..batch.num_rows() {
                                let mut dimensions: HashMap<String, String> = HashMap::new();
                                let mut counted = false;
                                let mut count = 0;

                                for (idx, column) in batch.columns().iter().enumerate() {
                                    let field = &fields[idx];
                                    let name = field.name();
                                    if name == "count" {
                                        let arr: &PrimitiveArray<Int64Type> =
                                            arrow::array::cast::as_primitive_array(&column);
                                        count = arr.value(row);
                                        counted = true;
                                    } else {
                                        let arr = arrow::array::cast::as_string_array(&column);
                                        dimensions.insert(name.into(), arr.value(row).into());
                                    }
                                }

                                if counted {
                                    debug!("{count}: {dimensions:?}");
                                    let mut dims: Vec<Dimension> = vec![];

                                    for (key, value) in dimensions.iter() {
                                        dims.push(
                                            Dimension::builder().name(key).value(value).build(),
                                        );
                                    }
                                    let datum = MetricDatum::builder()
                                        .metric_name(&gauge.name)
                                        .timestamp(DateTime::from(SystemTime::now()))
                                        .set_dimensions(Some(dims))
                                        .value(count as f64)
                                        .unit(StandardUnit::Count)
                                        .build();

                                    let res = cloudwatch
                                        .put_metric_data()
                                        .namespace(format!("DataLake/{name}"))
                                        .metric_data(datum)
                                        .send()
                                        .await?;
                                    debug!("Result of CloudWatch send: {res:?}");
                                }
                            }
                        } else {
                            error!("The result set must have a column named `count`");
                        }
                    }
                }
            }
        }
    }