Add the Numeric metric type and add some documentation and tooling
rtyler committed Sep 4, 2024
1 parent c7b8054 commit ef00ee4
Showing 9 changed files with 169 additions and 3 deletions.
22 changes: 22 additions & 0 deletions Makefile
@@ -0,0 +1,22 @@

.PHONY: help
help: ## Show this help
@egrep -h '\s##\s' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'

.PHONY: all build build-release check test clean
all: check build test ## Perform all the checks, builds, and testing

check: ## Ensure that the crate meets the basic formatting and structure
cargo fmt --check
cargo clippy

build: ## Build the crate with each set of features
./ci/build.sh

build-release: check test ## Build the release versions of Lambdas
./ci/build-release.sh

test: ## Run the crate's tests with each set of features
./ci/test.sh

clean: ## Clean up resources from build
cargo clean
7 changes: 7 additions & 0 deletions ci/build-release.sh
@@ -0,0 +1,7 @@
#!/bin/sh

if [ -f "${HOME}/.cargo/env" ]; then
. "${HOME}/.cargo/env"
fi;

exec cargo lambda build --release --output-format zip
11 changes: 11 additions & 0 deletions ci/build.sh
@@ -0,0 +1,11 @@
#!/bin/sh

if [ -f "${HOME}/.cargo/env" ]; then
. "${HOME}/.cargo/env"
fi;

set -xe

cargo fmt --check

exec cargo build
6 changes: 6 additions & 0 deletions ci/test.sh
@@ -0,0 +1,6 @@
#!/bin/sh
if [ -f "${HOME}/.cargo/env" ]; then
. "${HOME}/.cargo/env"
fi;

exec cargo test --verbose
2 changes: 1 addition & 1 deletion lambdas/query-metrics/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "query-metrics"
version = "0.3.0"
version = "0.4.0"
edition = "2021"

[[bin]]
50 changes: 50 additions & 0 deletions lambdas/query-metrics/README.md
@@ -0,0 +1,50 @@

# query-metrics

This Lambda executes DataFusion queries defined in a YAML file and submits
the results to CloudWatch Metrics, where they can be alerted on or forwarded
to other tools.



## Types

### Count

This is the simplest query type: it records the number of rows returned by the query, e.g.:

```sql
SELECT id FROM source WHERE id > 1000 AND id <= 2000
```

This would consistently produce a count metric value of `1000`.


### Numeric

Numeric is likely the most common and easiest to understand query type. The result set should contain only one row, and all of its values should be numeric, e.g.:

```sql
SELECT COUNT(*) AS total, SUM(CASE WHEN (id > 1000 AND id <= 2000) THEN 1 ELSE 0 END) AS valid_ids FROM source
```

This will produce a result set of:

```
+-------+-----------+
| total | valid_ids |
+-------+-----------+
| 4096  | 1000      |
+-------+-----------+
```

This will produce metric values of:

* `total` 4096
* `valid_ids` 1000


### Dimensional Count

The dimensional count is the most advanced query type and can be used to
produce dimensional (or tagged) metrics in CloudWatch.
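
For illustration only (the `status` column and the query below are hypothetical; the exact result-set shape is determined by the Lambda's dimensional count handling), such a query would typically group by the column that should become the metric dimension:

```sql
SELECT status, COUNT(*) AS total FROM source GROUP BY status
```

Rather than a single aggregate number, this would report a count per distinct `status` value, with `status` attached as a dimension on the resulting CloudWatch metric.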
28 changes: 28 additions & 0 deletions lambdas/query-metrics/src/cli.rs
@@ -47,6 +47,34 @@ async fn main() -> anyhow::Result<()> {
let count = df.count().await.expect("Failed to collect batches");
println!("Counted {count} rows");
}
config::Measurement::Numeric => {
println!("Need to run numeric query");
let batches = df.collect().await.expect("Failed to collect batches");
let _ = print_batches(&batches);

println!("I see this many batches: {}", batches.len());
// Sum each numeric column across all rows, keyed by column name
let mut values: HashMap<String, i64> = HashMap::new();
for batch in batches.iter().filter(|b| b.num_rows() > 0) {
let schema = batch.schema();
let fields = schema.fields();
for row in 0..batch.num_rows() {
for (idx, column) in batch.columns().iter().enumerate() {
let field = &fields[idx];
let name = field.name();

if !values.contains_key(name) {
values.insert(name.to_string(), 0);
}
let current = values.get(name).expect("Failed to retrieve");
let arr: &PrimitiveArray<Int64Type> =
arrow::array::cast::as_primitive_array(&column);
let count = arr.value(row);
values.insert(name.to_string(), count + current);
}
}
}
println!("results: {values:?}");
}
config::Measurement::DimensionalCount => {
println!("Need to run dimensional count");
let batches = df.collect().await.expect("Failed to collect batches");
1 change: 1 addition & 0 deletions lambdas/query-metrics/src/config.rs
@@ -38,6 +38,7 @@ pub struct Gauge {
#[serde(rename_all = "lowercase")]
pub enum Measurement {
Count,
Numeric,
DimensionalCount,
}

45 changes: 43 additions & 2 deletions lambdas/query-metrics/src/main.rs
@@ -29,7 +29,7 @@ async fn function_handler(_event: LambdaEvent<CloudWatchEvent>) -> Result<(), Er
std::env::var("MANIFEST_B64").expect("The `MANIFEST_B64` variable was not defined"),
)
.expect("The `MANIFEST_B64` environment variable does not contain a valid manifest yml");
debug!("Configuration loaded: {conf:?}");
info!("Configuration loaded: {conf:?}");

for (name, gauges) in conf.gauges.iter() {
for gauge in gauges.iter() {
@@ -41,7 +41,7 @@ async fn function_handler(_event: LambdaEvent<CloudWatchEvent>) -> Result<(), Er
ctx.register_table("source", Arc::new(table))
.expect("Failed to register table with datafusion");

debug!("Running query: {}", gauge.query);
info!("Running query: {}", gauge.query);

let df = ctx
.sql(&gauge.query)
@@ -68,6 +68,47 @@ async fn function_handler(_event: LambdaEvent<CloudWatchEvent>) -> Result<(), Er
.await?;
debug!("Result of CloudWatch send: {res:?}");
}
config::Measurement::Numeric => {
let batches = df.collect().await.expect("Failed to collect batches");
let mut values: HashMap<String, i64> = HashMap::new();

for batch in batches.iter().filter(|b| b.num_rows() > 0) {
let schema = batch.schema();
let fields = schema.fields();
for row in 0..batch.num_rows() {
for (idx, column) in batch.columns().iter().enumerate() {
let field = &fields[idx];
let name = field.name();

if !values.contains_key(name) {
values.insert(name.to_string(), 0);
}
let current = values.get(name).expect("Failed to retrieve");
let arr: &PrimitiveArray<Int64Type> =
arrow::array::cast::as_primitive_array(&column);
let count = arr.value(row);
values.insert(name.to_string(), count + current);
}
}
}
info!("results: {values:?}");
for (key, value) in values.into_iter() {
let datum = MetricDatum::builder()
.metric_name(&key)
.timestamp(DateTime::from(SystemTime::now()))
.unit(StandardUnit::Count)
.value(value as f64)
.build();

let res = cloudwatch
.put_metric_data()
.namespace(format!("DataLake/{name}"))
.metric_data(datum)
.send()
.await?;
info!("submitting {key} to cloudwatch: {res:?}");
}
}
config::Measurement::DimensionalCount => {
let batches = df.collect().await.expect("Failed to collect batches");
debug!("I see this many batches: {}", batches.len());
