From ec84fbf97ea795a328e3d7d47c853fa8bcc3f86d Mon Sep 17 00:00:00 2001 From: Matt Straathof Date: Thu, 30 May 2024 13:54:18 -0700 Subject: [PATCH] feat: speed up resource collection, add resource specific cli flags --- .gitignore | 4 + momento-cli-opts/src/lib.rs | 25 ++++ .../src/commands/cloud_linter/api_gateway.rs | 2 +- momento/src/commands/cloud_linter/dynamodb.rs | 68 +++++++--- .../src/commands/cloud_linter/elasticache.rs | 2 +- .../src/commands/cloud_linter/linter_cli.rs | 86 +++++++++++- momento/src/commands/cloud_linter/metrics.rs | 29 +++- momento/src/commands/cloud_linter/resource.rs | 14 +- momento/src/commands/cloud_linter/s3.rs | 127 +++++++++++------- .../cloud_linter/serverless_elasticache.rs | 2 +- momento/src/main.rs | 15 ++- 11 files changed, 288 insertions(+), 86 deletions(-) diff --git a/.gitignore b/.gitignore index dcc6f397..e6aa644b 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,7 @@ client-cli.iml bin/ momento.exe obj/ + +linter_results.json +linter_results.json.gz + diff --git a/momento-cli-opts/src/lib.rs b/momento-cli-opts/src/lib.rs index 6d310409..7b63c159 100644 --- a/momento-cli-opts/src/lib.rs +++ b/momento-cli-opts/src/lib.rs @@ -224,9 +224,34 @@ to help find opportunities for optimizations with Momento. CloudLinter { #[arg(long, short, help = "The AWS region to examine")] region: String, + #[arg( + long = "enable-ddb-ttl-check", + help = "Opt in to check whether ddb tables have ttl enabled. If there are lots of tables, could slow down data collection" + )] + enable_ddb_ttl_check: bool, + #[arg( + value_enum, + long = "resource", + help = "Pass in a specific resource type to only collect data on that resource. Example: --resource dynamo" + )] + resource: Option, + #[arg( + long = "metric-collection-rate", + help = "tps at which to invoke the aws `get-metric-data` api", + default_value = "20" + )] + metric_collection_rate: u32, }, } +#[derive(clap::ValueEnum, PartialEq, Eq, Debug, Clone, Copy)] +pub enum CloudLinterResources { + ApiGateway, + S3, + Dynamo, + ElastiCache, +} + #[derive(Debug, Parser)] pub enum CloudSignupCommand { #[command(about = SIGNUP_DEPRECATED_MSG)] diff --git a/momento/src/commands/cloud_linter/api_gateway.rs b/momento/src/commands/cloud_linter/api_gateway.rs index 9b0ba791..0bcdecd8 100644 --- a/momento/src/commands/cloud_linter/api_gateway.rs +++ b/momento/src/commands/cloud_linter/api_gateway.rs @@ -38,7 +38,7 @@ const API_GATEWAY_METRICS: Map<&'static str, &'static [&'static str]> = phf_map! ], }; -#[derive(Serialize, Clone, Debug)] +#[derive(Serialize, Clone, Debug, PartialEq, Eq)] pub(crate) struct ApiGatewayMetadata { #[serde(rename = "name")] name: String, diff --git a/momento/src/commands/cloud_linter/dynamodb.rs b/momento/src/commands/cloud_linter/dynamodb.rs index 65aecfa3..e9a355a9 100644 --- a/momento/src/commands/cloud_linter/dynamodb.rs +++ b/momento/src/commands/cloud_linter/dynamodb.rs @@ -4,6 +4,7 @@ use std::time::Duration; use aws_config::SdkConfig; use aws_sdk_dynamodb::types::{TimeToLiveDescription, TimeToLiveStatus}; +use futures::stream::FuturesUnordered; use governor::DefaultDirectRateLimiter; use indicatif::{ProgressBar, ProgressStyle}; use phf::{phf_map, Map}; @@ -70,7 +71,7 @@ const DDB_GSI_METRICS: Map<&'static str, &'static [&'static str]> = phf_map! { ], }; -#[derive(Serialize, Clone, Debug)] +#[derive(Serialize, Clone, Debug, PartialEq, Eq)] pub(crate) struct DynamoDbMetadata { #[serde(rename = "avgItemSizeBytes")] avg_item_size_bytes: i64, @@ -110,7 +111,7 @@ impl DynamoDbMetadata { } } -#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] pub(crate) struct GsiMetadata { #[serde(rename = "gsiName")] gsi_name: String, @@ -177,6 +178,7 @@ pub(crate) async fn process_ddb_resources( metrics_limiter: Arc, describe_ttl_limiter: Arc, sender: Sender, + enable_ddb_ttl_check: bool, ) -> Result<(), CliError> { let ddb_client = aws_sdk_dynamodb::Client::new(config); let metrics_client = aws_sdk_cloudwatch::Client::new(config); @@ -190,18 +192,47 @@ pub(crate) async fn process_ddb_resources( ProgressBar::new(table_names.len() as u64).with_message("Processing Dynamo DB tables"); describe_ddb_tables_bar .set_style(ProgressStyle::with_template(" {msg} {bar} {eta}").expect("invalid template")); + + let futures = FuturesUnordered::new(); + for table_name in table_names { - process_table_resources( - &ddb_client, - &metrics_client, - &table_name, - Arc::clone(&control_plane_limiter), - Arc::clone(&metrics_limiter), - Arc::clone(&describe_ttl_limiter), - sender.clone(), - ) - .await?; - describe_ddb_tables_bar.inc(1); + let sender_clone = sender.clone(); + let ddb_client_clone = ddb_client.clone(); + let metrics_client_clone = metrics_client.clone(); + let table_name_clone = table_name.clone(); + let control_plane_limiter_clone = control_plane_limiter.clone(); + let metrics_limiter_clone = metrics_limiter.clone(); + let describe_ttl_limiter_clone = describe_ttl_limiter.clone(); + let progress_bar_clone = describe_ddb_tables_bar.clone(); + let spawn = tokio::spawn(async move { + let res = process_table_resources( + &ddb_client_clone, + &metrics_client_clone, + &table_name_clone, + control_plane_limiter_clone, + metrics_limiter_clone, + describe_ttl_limiter_clone, + sender_clone, + enable_ddb_ttl_check, + ) + .await; + progress_bar_clone.inc(1); + res + }); + futures.push(spawn); + } + + let all_results = futures::future::join_all(futures).await; + for result in all_results { + match result { + // bubble up any cli errors that we came across + Ok(res) => res?, + Err(_) => { + return Err(CliError { + msg: "failed to wait for all dynamo resources to collect data".to_string(), + }) + } + } } describe_ddb_tables_bar.finish(); @@ -282,6 +313,7 @@ async fn is_ddb_ttl_enabled( Ok(ttl_enabled) } +#[allow(clippy::too_many_arguments)] async fn process_table_resources( ddb_client: &aws_sdk_dynamodb::Client, metrics_client: &aws_sdk_cloudwatch::Client, @@ -290,6 +322,7 @@ async fn process_table_resources( metrics_limiter: Arc, describe_ttl_limiter: Arc, sender: Sender, + enable_ddb_ttl_check: bool, ) -> Result<(), CliError> { let region = ddb_client .config() @@ -451,8 +484,13 @@ async fn process_table_resources( resource .append_metrics(metrics_client, Arc::clone(&metrics_limiter)) .await?; - let ttl_enabled = - is_ddb_ttl_enabled(ddb_client, &resource, Arc::clone(&describe_ttl_limiter)).await?; + let ttl_enabled = match enable_ddb_ttl_check { + true => { + is_ddb_ttl_enabled(ddb_client, &resource, Arc::clone(&describe_ttl_limiter)).await? + } + false => false, + }; + resource.metadata.ttl_enabled = ttl_enabled; sender .send(Resource::DynamoDb(resource)) diff --git a/momento/src/commands/cloud_linter/elasticache.rs b/momento/src/commands/cloud_linter/elasticache.rs index 6ffdfefb..234d209f 100644 --- a/momento/src/commands/cloud_linter/elasticache.rs +++ b/momento/src/commands/cloud_linter/elasticache.rs @@ -53,7 +53,7 @@ pub(crate) const CACHE_METRICS: Map<&'static str, &'static [&'static str]> = phf ], }; -#[derive(Serialize, Clone, Debug)] +#[derive(Serialize, Clone, Debug, PartialEq, Eq)] pub(crate) struct ElastiCacheMetadata { #[serde(rename = "clusterId")] cluster_id: String, diff --git a/momento/src/commands/cloud_linter/linter_cli.rs b/momento/src/commands/cloud_linter/linter_cli.rs index 49612489..69518c44 100644 --- a/momento/src/commands/cloud_linter/linter_cli.rs +++ b/momento/src/commands/cloud_linter/linter_cli.rs @@ -7,6 +7,7 @@ use aws_config::{BehaviorVersion, Region}; use flate2::write::GzEncoder; use flate2::Compression; use governor::{Quota, RateLimiter}; +use momento_cli_opts::CloudLinterResources; use struson::writer::{JsonStreamWriter, JsonWriter}; use tokio::fs::{metadata, File}; use tokio::sync::mpsc::{self, Sender}; @@ -20,7 +21,12 @@ use crate::error::CliError; use super::elasticache::process_elasticache_resources; use super::resource::Resource; -pub async fn run_cloud_linter(region: String) -> Result<(), CliError> { +pub async fn run_cloud_linter( + region: String, + enable_ddb_ttl_check: bool, + only_collect_for_resource: Option, + metric_collection_rate: u32, +) -> Result<(), CliError> { let (tx, mut rx) = mpsc::channel::(32); let file_path = "linter_results.json"; // first we check to make sure we have perms to write files to the current directory @@ -34,7 +40,14 @@ pub async fn run_cloud_linter(region: String) -> Result<(), CliError> { json_writer.name("resources")?; json_writer.begin_array()?; tokio::spawn(async move { - let _ = process_data(region, tx).await; + let _ = process_data( + region, + tx, + enable_ddb_ttl_check, + only_collect_for_resource, + metric_collection_rate, + ) + .await; }); while let Some(message) = rx.recv().await { let _ = json_writer.serialize_value(&message); @@ -56,7 +69,13 @@ pub async fn run_cloud_linter(region: String) -> Result<(), CliError> { Ok(()) } -async fn process_data(region: String, sender: Sender) -> Result<(), CliError> { +async fn process_data( + region: String, + sender: Sender, + enable_ddb_ttl_check: bool, + only_collect_for_resource: Option, + metric_collection_rate: u32, +) -> Result<(), CliError> { let config = aws_config::defaults(BehaviorVersion::latest()) .region(Region::new(region)) .load() @@ -73,10 +92,66 @@ async fn process_data(region: String, sender: Sender) -> Result<(), Cl ); let describe_ttl_limiter = Arc::new(RateLimiter::direct(describe_ttl_quota)); - let metrics_quota = - Quota::per_second(core::num::NonZeroU32::new(20).expect("should create non-zero quota")); + let metrics_quota = Quota::per_second( + core::num::NonZeroU32::new(metric_collection_rate).expect("should create non-zero quota"), + ); let metrics_limiter = Arc::new(RateLimiter::direct(metrics_quota)); + if let Some(resource) = only_collect_for_resource { + match resource { + CloudLinterResources::ApiGateway => { + process_api_gateway_resources( + &config, + Arc::clone(&control_plane_limiter), + Arc::clone(&metrics_limiter), + sender.clone(), + ) + .await?; + return Ok(()); + } + CloudLinterResources::S3 => { + process_s3_resources( + &config, + Arc::clone(&control_plane_limiter), + Arc::clone(&metrics_limiter), + sender.clone(), + ) + .await?; + return Ok(()); + } + CloudLinterResources::Dynamo => { + process_ddb_resources( + &config, + Arc::clone(&control_plane_limiter), + Arc::clone(&metrics_limiter), + Arc::clone(&describe_ttl_limiter), + sender.clone(), + enable_ddb_ttl_check, + ) + .await?; + return Ok(()); + } + CloudLinterResources::ElastiCache => { + process_elasticache_resources( + &config, + Arc::clone(&control_plane_limiter), + Arc::clone(&metrics_limiter), + sender.clone(), + ) + .await?; + + process_serverless_elasticache_resources( + &config, + Arc::clone(&control_plane_limiter), + Arc::clone(&metrics_limiter), + sender.clone(), + ) + .await?; + return Ok(()); + } + } + }; + process_s3_resources( &config, Arc::clone(&control_plane_limiter), @@ -99,6 +174,7 @@ async fn process_data(region: String, sender: Sender) -> Result<(), Cl Arc::clone(&metrics_limiter), Arc::clone(&describe_ttl_limiter), sender.clone(), + enable_ddb_ttl_check, ) .await?; diff --git a/momento/src/commands/cloud_linter/metrics.rs b/momento/src/commands/cloud_linter/metrics.rs index 42eb2bfe..e44771bf 100644 --- a/momento/src/commands/cloud_linter/metrics.rs +++ b/momento/src/commands/cloud_linter/metrics.rs @@ -6,6 +6,8 @@ use aws_sdk_cloudwatch::types::Metric as CloudwatchMetric; use aws_sdk_cloudwatch::types::{Dimension, MetricDataQuery, MetricStat}; use aws_sdk_cloudwatch::Client; use chrono::{Duration, Utc}; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use governor::DefaultDirectRateLimiter; use phf::Map; use serde::{Deserialize, Serialize}; @@ -13,7 +15,7 @@ use serde::{Deserialize, Serialize}; use crate::commands::cloud_linter::utils::rate_limit; use crate::error::CliError; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, PartialEq)] pub(crate) struct Metric { pub name: String, pub values: Vec, @@ -54,11 +56,30 @@ where ) -> Result<(), CliError> { let metric_targets = self.create_metric_targets()?; let mut metrics: Vec> = Vec::new(); + let mut futures = FuturesUnordered::new(); + for target in metric_targets { - metrics.push( - query_metrics_for_target(metrics_client, Arc::clone(&limiter), target).await?, - ); + let client = metrics_client.clone(); + let moved_limiter = Arc::clone(&limiter); + let spawn = tokio::spawn(async move { + query_metrics_for_target(&client, moved_limiter, target).await + }); + futures.push(spawn); + } + while let Some(finished_future) = futures.next().await { + match finished_future { + Err(_e) => { + return Err(CliError { + msg: "failed to retrieve metrics from cloudwatch".to_string(), + }) + } + Ok(result) => { + let resource_metrics = result?; + metrics.push(resource_metrics); + } + } } + self.set_metrics(metrics.into_iter().flatten().collect()); self.set_metric_period_seconds(60 * 60 * 24); diff --git a/momento/src/commands/cloud_linter/resource.rs b/momento/src/commands/cloud_linter/resource.rs index 3f556a08..3d468146 100644 --- a/momento/src/commands/cloud_linter/resource.rs +++ b/momento/src/commands/cloud_linter/resource.rs @@ -7,7 +7,7 @@ use crate::commands::cloud_linter::metrics::Metric; use crate::commands::cloud_linter::s3::S3Metadata; use crate::commands::cloud_linter::serverless_elasticache::ServerlessElastiCacheMetadata; -#[derive(Serialize, Debug)] +#[derive(Serialize, Debug, PartialEq)] #[serde(untagged)] pub(crate) enum Resource { ApiGateway(ApiGatewayResource), @@ -17,7 +17,7 @@ pub(crate) enum Resource { S3(S3Resource), } -#[derive(Debug, Serialize, PartialEq)] +#[derive(Debug, Serialize, PartialEq, Eq)] pub(crate) enum ResourceType { #[serde(rename = "AWS::ApiGateway::API")] ApiGateway, @@ -35,7 +35,7 @@ pub(crate) enum ResourceType { S3, } -#[derive(Serialize, Debug)] +#[derive(Serialize, Debug, PartialEq)] pub(crate) struct DynamoDbResource { #[serde(rename = "type")] pub(crate) resource_type: ResourceType, @@ -47,7 +47,7 @@ pub(crate) struct DynamoDbResource { pub(crate) metadata: DynamoDbMetadata, } -#[derive(Serialize, Debug)] +#[derive(Serialize, Debug, PartialEq)] pub(crate) struct ElastiCacheResource { #[serde(rename = "type")] pub(crate) resource_type: ResourceType, @@ -59,7 +59,7 @@ pub(crate) struct ElastiCacheResource { pub(crate) metadata: ElastiCacheMetadata, } -#[derive(Serialize, Debug)] +#[derive(Serialize, Debug, PartialEq)] pub(crate) struct ServerlessElastiCacheResource { #[serde(rename = "type")] pub(crate) resource_type: ResourceType, @@ -71,7 +71,7 @@ pub(crate) struct ServerlessElastiCacheResource { pub(crate) metadata: ServerlessElastiCacheMetadata, } -#[derive(Serialize, Debug)] +#[derive(Serialize, Debug, PartialEq)] pub(crate) struct S3Resource { #[serde(rename = "type")] pub(crate) resource_type: ResourceType, @@ -83,7 +83,7 @@ pub(crate) struct S3Resource { pub(crate) metadata: S3Metadata, } -#[derive(Serialize, Debug)] +#[derive(Serialize, Debug, PartialEq)] pub(crate) struct ApiGatewayResource { #[serde(rename = "type")] pub(crate) resource_type: ResourceType, diff --git a/momento/src/commands/cloud_linter/s3.rs b/momento/src/commands/cloud_linter/s3.rs index 8beb8602..04996263 100644 --- a/momento/src/commands/cloud_linter/s3.rs +++ b/momento/src/commands/cloud_linter/s3.rs @@ -7,6 +7,7 @@ use crate::error::CliError; use aws_config::SdkConfig; use aws_sdk_s3::error::ProvideErrorMetadata; use aws_sdk_s3::types::MetricsConfiguration; +use futures::stream::FuturesUnordered; use governor::DefaultDirectRateLimiter; use indicatif::{ProgressBar, ProgressStyle}; use phf::{phf_map, Map}; @@ -64,7 +65,7 @@ const S3_METRICS_REQUEST: Map<&'static str, &'static [&'static str]> = phf_map! ], }; -#[derive(Serialize, Clone, Debug)] +#[derive(Serialize, Clone, Debug, PartialEq, Eq)] pub(crate) struct S3Metadata { #[serde(rename = "requestMetricsFilter")] request_metrics_filter: String, @@ -226,9 +227,7 @@ async fn try_get_bucket_metrics_filter( } } Err(err) => { - return Err(CliError { - msg: format!("{}", err), - }); + return Err(err); } } Ok("".to_string()) @@ -243,68 +242,96 @@ async fn process_buckets( metrics_limiter: Arc, control_plane_limiter: Arc, ) -> Result<(), CliError> { - let mut resources: Vec = Vec::new(); - let process_buckets_bar = ProgressBar::new((buckets.len() * 2) as u64).with_message("Processing S3 Buckets"); process_buckets_bar .set_style(ProgressStyle::with_template(" {msg} {bar} {eta}").expect("invalid template")); + let futures = FuturesUnordered::new(); for bucket in buckets { - let filter_id = try_get_bucket_metrics_filter( - s3client.clone(), - bucket.clone(), - Arc::clone(&control_plane_limiter), - ) - .await; - let filter_id = match filter_id { - Ok(filter_id) => filter_id, - Err(err) => { - eprint!("{}", err); - continue; - } - }; - - let metadata = S3Metadata { - request_metrics_filter: filter_id, - }; + let s3_client_clone = s3client.clone(); + let metrics_client_clone = metrics_client.clone(); + let sender_clone = sender.clone(); + let metrics_limiter_clone = metrics_limiter.clone(); + let control_plane_limiter_clone = control_plane_limiter.clone(); + let region_clone = region.to_string().clone(); + let progress_bar_clone = process_buckets_bar.clone(); + let spawn = tokio::spawn(async move { + progress_bar_clone.inc(1); + let res = process_bucket( + s3_client_clone, + bucket, + region_clone.as_str(), + sender_clone, + &metrics_client_clone, + metrics_limiter_clone, + control_plane_limiter_clone, + ) + .await; + progress_bar_clone.inc(1); + res + }); - let s3_resource = S3Resource { - resource_type: ResourceType::S3, - region: region.to_string(), - id: bucket.clone(), - metrics: vec![], - metric_period_seconds: 0, - metadata, - }; - resources.push(Resource::S3(s3_resource)); - process_buckets_bar.inc(1); + futures.push(spawn); } - for resource in resources { - match resource { - Resource::S3(mut my_resource) => { - my_resource - .append_metrics(metrics_client, Arc::clone(&metrics_limiter)) - .await?; - sender - .send(Resource::S3(my_resource)) - .await - .map_err(|_| CliError { - msg: "Failed to send S3 resource".to_string(), - })?; - process_buckets_bar.inc(1); - } - _ => { + let all_results = futures::future::join_all(futures).await; + for result in all_results { + match result { + // bubble up any cli errors that we came across + Ok(res) => res?, + Err(_) => { return Err(CliError { - msg: "Invalid resource type".to_string(), - }); + msg: "failed to wait for all s3 resources to collect data".to_string(), + }) } } } + process_buckets_bar.finish(); Ok(()) } +async fn process_bucket( + s3client: aws_sdk_s3::Client, + bucket: String, + region: &str, + sender: Sender, + metrics_client: &aws_sdk_cloudwatch::Client, + metrics_limiter: Arc, + control_plane_limiter: Arc, +) -> Result<(), CliError> { + let filter_id = try_get_bucket_metrics_filter( + s3client.clone(), + bucket.clone(), + Arc::clone(&control_plane_limiter), + ) + .await?; + + let metadata = S3Metadata { + request_metrics_filter: filter_id, + }; + + let mut s3_resource = S3Resource { + resource_type: ResourceType::S3, + region: region.to_string(), + id: bucket.clone(), + metrics: vec![], + metric_period_seconds: 0, + metadata, + }; + s3_resource + .append_metrics(metrics_client, Arc::clone(&metrics_limiter)) + .await?; + sender + .send(Resource::S3(s3_resource)) + .await + .map_err(|_| CliError { + msg: "Failed to send S3 resource".to_string(), + })?; + + Ok(()) +} + async fn list_buckets(s3_client: &aws_sdk_s3::Client) -> Result, CliError> { let mut bucket_names = Vec::new(); let resp = s3_client.list_buckets().send().await?; diff --git a/momento/src/commands/cloud_linter/serverless_elasticache.rs b/momento/src/commands/cloud_linter/serverless_elasticache.rs index ed0f0748..7650dc6f 100644 --- a/momento/src/commands/cloud_linter/serverless_elasticache.rs +++ b/momento/src/commands/cloud_linter/serverless_elasticache.rs @@ -71,7 +71,7 @@ pub(crate) const SERVERLESS_CACHE_METRICS: Map<&'static str, &'static [&'static ], }; -#[derive(Serialize, Clone, Debug)] +#[derive(Serialize, Clone, Debug, PartialEq, Eq)] pub(crate) struct ServerlessElastiCacheMetadata { name: String, engine: String, diff --git a/momento/src/main.rs b/momento/src/main.rs index ca08d2bf..8b84b573 100644 --- a/momento/src/main.rs +++ b/momento/src/main.rs @@ -263,8 +263,19 @@ async fn run_momento_command(args: momento_cli_opts::Momento) -> Result<(), CliE ) .await?; } - PreviewCommand::CloudLinter { region } => { - commands::cloud_linter::linter_cli::run_cloud_linter(region).await?; + PreviewCommand::CloudLinter { + region, + enable_ddb_ttl_check, + resource, + metric_collection_rate, + } => { + commands::cloud_linter::linter_cli::run_cloud_linter( + region, + enable_ddb_ttl_check, + resource, + metric_collection_rate, + ) + .await?; } }, }