feat: Add ElastiCache and DynamoDB metadata to the cloud linter
Update the preview cloud linter command to query for ElastiCache and
DynamoDB metadata.

Serialize the metadata and print out the JSON in the command to show it
is working. File creation will come after the metrics are added.

Move the rate limiting code to a new utils file.
nand4011 committed Mar 29, 2024
1 parent 8955d18 commit 2c00934
Showing 10 changed files with 1,113 additions and 127 deletions.
763 changes: 704 additions & 59 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion momento-cli-opts/src/lib.rs
@@ -221,7 +221,10 @@ This command will be used to fetch information about your Elasticache clusters a
 to help find opportunities for optimizations with Momento.
 "
     )]
-    CloudLinter {},
+    CloudLinter {
+        #[arg(long, short, help = "The AWS region to examine")]
+        region: String,
+    },
 }

 #[derive(Debug, Parser)]
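Assuming clap's default kebab-case mapping for the CloudLinter variant (the surrounding subcommand definitions are not shown in this hunk), the new required flag would presumably be passed as `momento preview cloud-linter --region us-east-1`, with `-r` available as a short form per the #[arg] attribute.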
4 changes: 4 additions & 0 deletions momento/Cargo.toml
@@ -19,6 +19,10 @@ qrcode = "0.12.0"
 webbrowser = "^0.8.4"
 humantime = "2.1.0"
 governor = "0.6.3"
+aws-config = "1.1.8"
+aws-sdk-cloudwatch = "1.19.0"
+aws-sdk-dynamodb = "1.19.0"
+aws-sdk-elasticache = "1.18.0"

 [dev-dependencies]
 assert_cmd = "2.0.2"
162 changes: 162 additions & 0 deletions momento/src/commands/cloud_linter/dynamodb.rs
@@ -0,0 +1,162 @@
use std::sync::Arc;

use aws_config::SdkConfig;
use aws_sdk_dynamodb::types::{TimeToLiveDescription, TimeToLiveStatus};
use governor::DefaultDirectRateLimiter;
use serde::{Deserialize, Serialize};

use crate::commands::cloud_linter::utils::rate_limit;
use crate::error::CliError;
use crate::utils::console::console_info;

#[derive(Serialize, Deserialize)]
pub(crate) struct DynamoDbMetadata {
    avg_item_size_bytes: i64,
    billing_mode: Option<String>,
    gsi_count: i64,
    item_count: i64,
    ttl_enabled: bool,
    is_global_table: bool,
    lsi_count: i64,
    table_class: Option<String>,
    table_size_bytes: i64,
    p_throughput_decreases_day: Option<i64>,
    p_throughput_read_units: Option<i64>,
    p_throughput_write_units: Option<i64>,
}

pub(crate) async fn get_ddb_metadata(
    config: &SdkConfig,
    limiter: Arc<DefaultDirectRateLimiter>,
) -> Result<Vec<DynamoDbMetadata>, CliError> {
    let ddb_client = aws_sdk_dynamodb::Client::new(config);

    console_info!("Listing DynamoDB tables");
    let table_names = list_table_names(&ddb_client, Arc::clone(&limiter)).await?;

    console_info!("Describing tables");
    let mut table_info = Vec::with_capacity(table_names.len());
    for table_name in table_names {
        let metadata = describe_table(&ddb_client, &table_name, Arc::clone(&limiter)).await?;
        table_info.push(metadata);
    }

    Ok(table_info)
}

async fn list_table_names(
    ddb_client: &aws_sdk_dynamodb::Client,
    limiter: Arc<DefaultDirectRateLimiter>,
) -> Result<Vec<String>, CliError> {
    let mut table_names = Vec::new();
    let mut name_stream = ddb_client.list_tables().into_paginator().send();

    // Each page fetch waits on the shared client-side rate limiter before
    // issuing the request.
    while let Some(result) = rate_limit(Arc::clone(&limiter), || name_stream.next()).await {
        match result {
            Ok(result) => {
                if let Some(names) = result.table_names {
                    table_names.extend(names);
                }
            }
            Err(err) => {
                return Err(CliError {
                    msg: format!("Failed to list DynamoDB table names: {}", err),
                })
            }
        }
    }

    Ok(table_names)
}

async fn describe_table(
    ddb_client: &aws_sdk_dynamodb::Client,
    table_name: &str,
    limiter: Arc<DefaultDirectRateLimiter>,
) -> Result<DynamoDbMetadata, CliError> {
    let ttl = rate_limit(Arc::clone(&limiter), || async {
        ddb_client
            .describe_time_to_live()
            .table_name(table_name)
            .send()
            .await
    })
    .await?;

    let ttl_enabled = matches!(
        ttl.time_to_live_description,
        Some(TimeToLiveDescription {
            time_to_live_status: Some(TimeToLiveStatus::Enabled),
            ..
        })
    );

    let description = rate_limit(Arc::clone(&limiter), || async {
        ddb_client
            .describe_table()
            .table_name(table_name)
            .send()
            .await
    })
    .await?;

    let table = description.table.ok_or(CliError {
        msg: "Table description not found".to_string(),
    })?;

    // item_count and table_size_bytes are approximate values that DynamoDB
    // refreshes roughly every six hours.
    let item_count = table.item_count.unwrap_or_default();
    let table_size_bytes = table.table_size_bytes.unwrap_or_default();
    let avg_item_size_bytes = if item_count > 0 {
        table_size_bytes / item_count
    } else {
        0
    };

    let billing_mode = table
        .billing_mode_summary
        .and_then(|summary| summary.billing_mode)
        .map(|billing_mode| billing_mode.as_str().to_string());

    let table_class = table
        .table_class_summary
        .and_then(|summary| summary.table_class)
        .map(|class| class.as_str().to_string());

    let gsi_count = table
        .global_secondary_indexes
        .map(|gsi| gsi.len() as i64)
        .unwrap_or_default();

    let lsi_count = table
        .local_secondary_indexes
        .map(|lsi| lsi.len() as i64)
        .unwrap_or_default();

    let is_global_table = table.global_table_version.is_some();

    let (p_throughput_decreases_day, p_throughput_read_units, p_throughput_write_units) = table
        .provisioned_throughput
        .map(|p| {
            (
                p.number_of_decreases_today,
                p.read_capacity_units,
                p.write_capacity_units,
            )
        })
        .unwrap_or_default();

    Ok(DynamoDbMetadata {
        avg_item_size_bytes,
        billing_mode,
        gsi_count,
        item_count,
        ttl_enabled,
        is_global_table,
        lsi_count,
        table_class,
        table_size_bytes,
        p_throughput_decreases_day,
        p_throughput_read_units,
        p_throughput_write_units,
    })
}
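Since DynamoDbMetadata derives Serialize with no rename attribute, the emitted JSON keys mirror the snake_case field names. One serialized entry might look like the following sketch (all values are illustrative, not taken from a real account; a provisioned-capacity table would populate the p_throughput_* fields instead of nulls):

{
  "avg_item_size_bytes": 412,
  "billing_mode": "PAY_PER_REQUEST",
  "gsi_count": 1,
  "item_count": 25000,
  "ttl_enabled": false,
  "is_global_table": false,
  "lsi_count": 0,
  "table_class": "STANDARD",
  "table_size_bytes": 10300000,
  "p_throughput_decreases_day": null,
  "p_throughput_read_units": null,
  "p_throughput_write_units": null
}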
114 changes: 114 additions & 0 deletions momento/src/commands/cloud_linter/elasticache.rs
@@ -0,0 +1,114 @@
use std::sync::Arc;

use aws_config::SdkConfig;
use aws_sdk_elasticache::types::CacheCluster;
use governor::DefaultDirectRateLimiter;
use serde::{Deserialize, Serialize};

use crate::commands::cloud_linter::utils::rate_limit;
use crate::error::CliError;
use crate::utils::console::console_info;

#[derive(Serialize, Deserialize)]
pub(crate) struct ElastiCacheMetadata {
    cluster_id: String,
    engine: String,
    cache_node_type: String,
    preferred_az: String,
    cluster_mode_enabled: bool,
}

pub(crate) async fn get_elasticache_metadata(
    config: &SdkConfig,
    limiter: Arc<DefaultDirectRateLimiter>,
) -> Result<Vec<ElastiCacheMetadata>, CliError> {
    console_info!("Describing ElastiCache clusters");
    let elasticache_client = aws_sdk_elasticache::Client::new(config);
    describe_clusters(&elasticache_client, limiter)
        .await?
        .into_iter()
        .map(ElastiCacheMetadata::try_from)
        .collect()
}

async fn describe_clusters(
    elasticache_client: &aws_sdk_elasticache::Client,
    limiter: Arc<DefaultDirectRateLimiter>,
) -> Result<Vec<CacheCluster>, CliError> {
    let mut elasticache_clusters = Vec::new();
    let mut elasticache_stream = elasticache_client
        .describe_cache_clusters()
        .show_cache_node_info(true)
        .into_paginator()
        .send();

    while let Some(result) = rate_limit(Arc::clone(&limiter), || elasticache_stream.next()).await {
        match result {
            Ok(result) => {
                if let Some(clusters) = result.cache_clusters {
                    elasticache_clusters.extend(clusters);
                }
            }
            Err(err) => {
                return Err(CliError {
                    msg: format!("Failed to describe cache clusters: {}", err),
                })
            }
        }
    }

    Ok(elasticache_clusters)
}

impl TryFrom<CacheCluster> for ElastiCacheMetadata {
    type Error = CliError;

    fn try_from(value: CacheCluster) -> Result<Self, Self::Error> {
        let cache_cluster_id = value.cache_cluster_id.ok_or(CliError {
            msg: "ElastiCache cluster has no ID".to_string(),
        })?;
        let cache_node_type = value.cache_node_type.ok_or(CliError {
            msg: "ElastiCache cluster has no node type".to_string(),
        })?;
        let preferred_az = value.preferred_availability_zone.ok_or(CliError {
            msg: "ElastiCache cluster has no preferred availability zone".to_string(),
        })?;

        let engine = value.engine.ok_or(CliError {
            msg: "ElastiCache cluster has no engine".to_string(),
        })?;
        match engine.as_str() {
            "redis" => {
                let (cluster_id, cluster_mode_enabled) = value
                    .replication_group_id
                    .map(|replication_group_id| {
                        // Node IDs in a replication group look like
                        // "<group>-<shard>-<node>" with cluster mode enabled
                        // and "<group>-<node>" with it disabled, so two
                        // remaining dash-separated parts after trimming the
                        // group ID imply cluster mode.
                        let trimmed_cluster_id = cache_cluster_id
                            .trim_start_matches(&format!("{}-", replication_group_id));
                        let parts_len = trimmed_cluster_id.split('-').count();
                        (replication_group_id, parts_len == 2)
                    })
                    .unwrap_or_else(|| (cache_cluster_id, false));

                Ok(ElastiCacheMetadata {
                    cluster_id,
                    engine,
                    cache_node_type,
                    preferred_az,
                    cluster_mode_enabled,
                })
            }
            "memcached" => Ok(ElastiCacheMetadata {
                cluster_id: cache_cluster_id,
                engine,
                cache_node_type,
                preferred_az,
                cluster_mode_enabled: false,
            }),
            _ => Err(CliError {
                msg: format!("Unsupported engine: {}", engine),
            }),
        }
    }
}
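Design note: because Result implements FromIterator, the `.map(ElastiCacheMetadata::try_from).collect()` chain in get_elasticache_metadata short-circuits on the first cluster that fails conversion and returns that error, rather than silently skipping the cluster.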
84 changes: 21 additions & 63 deletions momento/src/commands/cloud_linter/linter_cli.rs
@@ -1,73 +1,31 @@
-use std::future::Future;
 use std::sync::Arc;
-use std::time::Duration;

-use governor::DefaultDirectRateLimiter;
+use aws_config::{BehaviorVersion, Region};
+use governor::{Quota, RateLimiter};

-#[allow(dead_code)] // remove after this is used outside a test
-async fn rate_limit<F, Fut, T>(func: F, limiter: Arc<DefaultDirectRateLimiter>) -> T
-where
-    F: Fn() -> Fut,
-    Fut: Future<Output=T>,
-{
-    loop {
-        let permit = limiter.check();
-        match permit {
-            Ok(_) => {
-                return func().await;
-            }
-            Err(_) => {
-                tokio::time::sleep(Duration::from_millis(100)).await;
-            }
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use governor::{Quota, RateLimiter};
-    use tokio::sync::Mutex;
-
-    use super::*;
-
-    #[tokio::test]
-    async fn test_rate_limit() {
-        let counter = Arc::new(Mutex::new(0));
+use crate::commands::cloud_linter::dynamodb::get_ddb_metadata;
+use crate::commands::cloud_linter::elasticache::get_elasticache_metadata;
+use crate::error::CliError;
+use crate::utils::console::console_info;

-        let quota =
-            Quota::per_second(core::num::NonZeroU32::new(10).expect("should create non-zero quota"));
-        let limiter = Arc::new(RateLimiter::direct(quota));
+pub async fn run_cloud_linter(region: String) -> Result<(), CliError> {
+    let config = aws_config::defaults(BehaviorVersion::latest())
+        .region(Region::new(region))
+        .load()
+        .await;

-        let test_func = {
-            let counter = Arc::clone(&counter);
-            move || {
-                let counter = Arc::clone(&counter);
-                async move {
-                    let mut count = counter.lock().await;
-                    *count += 1;
-                }
-            }
-        };
-        let start_time = tokio::time::Instant::now();
+    let quota =
+        Quota::per_second(core::num::NonZeroU32::new(1).expect("should create non-zero quota"));
+    let limiter = Arc::new(RateLimiter::direct(quota));

-        let mut tasks = Vec::new();
-        for _ in 0..20 {
-            let limiter = Arc::clone(&limiter);
-            let func = test_func.clone();
-            let task = tokio::spawn(async move {
-                rate_limit(func, limiter).await;
-            });
-            tasks.push(task);
-        }
+    let ddb_metadata = get_ddb_metadata(&config, Arc::clone(&limiter)).await?;

-        for task in tasks {
-            task.await.expect("increment task should succeed");
-        }
+    let ddb_json = serde_json::to_string_pretty(&ddb_metadata)?;
+    console_info!("DynamoDB metadata:\n{}", ddb_json);

-        let final_count = *counter.lock().await;
-        assert_eq!(final_count, 20);
+    let elasticache_metadata = get_elasticache_metadata(&config, Arc::clone(&limiter)).await?;
+    let elasticache_json = serde_json::to_string_pretty(&elasticache_metadata)?;
+    console_info!("ElastiCache metadata:\n{}", elasticache_json);

-        let expected_duration = Duration::from_secs(1);
-        assert!(start_time.elapsed() >= expected_duration);
-    }
+    Ok(())
 }
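Per the commit message, the rate-limiting helper moved to a new utils file, which is among the changed files not rendered in this view. Below is a minimal sketch of what momento/src/commands/cloud_linter/utils.rs plausibly contains, inferred from the implementation deleted above and from the new call sites (limiter first, closure second). The FnOnce bound is an assumption, chosen because callers pass closures such as `|| name_stream.next()` that mutably borrow a paginator and are invoked at most once.

// utils.rs (sketch; the actual committed file is not shown here)
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;

use governor::DefaultDirectRateLimiter;

// Poll the limiter until a permit is available, then run `func` once.
pub(crate) async fn rate_limit<F, Fut, T>(limiter: Arc<DefaultDirectRateLimiter>, func: F) -> T
where
    F: FnOnce() -> Fut,
    Fut: Future<Output = T>,
{
    loop {
        match limiter.check() {
            // A permit was available: consume it and run the operation.
            Ok(_) => return func().await,
            // Rate limit hit: back off briefly and try again.
            Err(_) => tokio::time::sleep(Duration::from_millis(100)).await,
        }
    }
}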
(The remaining changed files, including the new cloud_linter utils module, are not rendered in this view.)
