diff --git a/Cargo.lock b/Cargo.lock index 135236c..3e3a7d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -720,7 +720,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.58", ] [[package]] @@ -947,6 +947,16 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" +[[package]] +name = "duplicate" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de78e66ac9061e030587b2a2e75cc88f22304913c907b11307bca737141230cb" +dependencies = [ + "heck 0.4.1", + "proc-macro-error", +] + [[package]] name = "either" version = "1.8.1" @@ -1114,7 +1124,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.58", ] [[package]] @@ -1536,9 +1546,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.5" +version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fad582f4b9e86b6caa621cabeb0963332d92eea04729ab12892c2533951e6440" +checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" [[package]] name = "jni" @@ -1729,6 +1739,7 @@ dependencies = [ "reqwest", "serde", "serde_json", + "struson", "tempdir", "tokio", "toml", @@ -1983,7 +1994,7 @@ dependencies = [ "phf_shared", "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.58", ] [[package]] @@ -2079,6 +2090,30 @@ dependencies = [ "syn 1.0.107", ] +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn 1.0.107", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + [[package]] name = "proc-macro2" version = "1.0.79" @@ -2170,9 +2205,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.28" +version = "1.0.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b9ab9c7eadfd8df19006f1cf1a4aed13540ed5cbc047010ece5826e10825488" +checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" dependencies = [ "proc-macro2", ] @@ -2475,9 +2510,9 @@ checksum = "5583e89e108996506031660fe09baa5011b9dd0341b89029313006d1fb508d70" [[package]] name = "ryu" -version = "1.0.12" +version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b4b9743ed687d4b4bcedf9ff5eaa7398495ae14e61cba0a295704edbc7decde" +checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" [[package]] name = "same-file" @@ -2550,22 +2585,22 @@ checksum = "92d43fe69e652f3df9bdc2b85b2854a0825b86e4fb76bc44d945137d053639ca" [[package]] name = "serde" -version = "1.0.152" +version = "1.0.198" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb" +checksum = "9846a40c979031340571da2545a4e5b7c4163bdae79b301d5f86d03979451fcc" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.152" +version = "1.0.198" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e" +checksum = "e88edab869b01783ba905e7d0153f9fc1a6505a96e4ad3018011eedb838566d9" dependencies = [ "proc-macro2", "quote", - "syn 1.0.107", + "syn 2.0.58", ] [[package]] @@ -2699,6 +2734,40 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ee073c9e4cd00e28217186dbe12796d692868f432bf2e97ee73bed0c56dfa01" +[[package]] +name = "strum" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d8cec3501a5194c432b2b7976db6b7d10ec95c253208b45f83f7136aa985e29" +dependencies = [ + "strum_macros", +] + +[[package]] +name = "strum_macros" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6cf59daf282c0a494ba14fd21610a0325f9f90ec9d1231dea26bcb1d696c946" +dependencies = [ + "heck 0.4.1", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.58", +] + +[[package]] +name = "struson" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd1ac2aafe0de8ac5f0daf57eab7d8e986724237e03b3092dbb47b867b9c4a76" +dependencies = [ + "duplicate", + "serde", + "strum", + "thiserror", +] + [[package]] name = "subtle" version = "2.5.0" @@ -2718,9 +2787,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.18" +version = "2.0.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32d41677bcbe24c20c52e7c70b0d8db04134c5d1066bf98662e2871ad200ea3e" +checksum = "44cfb93f38070beee36b3fef7d4f5a16f27751d94b187b666a5cc5e9b0d30687" dependencies = [ "proc-macro2", "quote", @@ -2774,22 +2843,22 @@ checksum = "95059e91184749cb66be6dc994f67f182b6d897cb3df74a5bf66b5e709295fd8" [[package]] name = "thiserror" -version = "1.0.38" +version = "1.0.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a9cd18aa97d5c45c6603caea1da6628790b37f7a34b6ca89522331c5180fed0" +checksum = "03468839009160513471e86a034bb2c5c0e4baae3b43f79ffc55c4a5427b3297" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.38" +version = "1.0.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f" +checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7" dependencies = [ "proc-macro2", "quote", - "syn 1.0.107", + "syn 2.0.58", ] [[package]] diff --git a/momento/Cargo.toml b/momento/Cargo.toml index f46defc..042df5e 100644 --- a/momento/Cargo.toml +++ b/momento/Cargo.toml @@ -25,6 +25,7 @@ aws-sdk-dynamodb = "1.19.0" aws-sdk-elasticache = "1.18.0" indicatif = "0.17.8" flate2 = "1.0.28" +struson = { version = "0.5.0", features = ["serde"] } [dev-dependencies] assert_cmd = "2.0.2" @@ -63,7 +64,7 @@ version = "1" features = [ "full",] [dependencies.serde] -version = "1.0" +version = "1.0.198" features = [ "derive",] [dependencies.serde_json] diff --git a/momento/src/commands/cloud_linter/dynamodb.rs b/momento/src/commands/cloud_linter/dynamodb.rs index 9d06bc5..80607cb 100644 --- a/momento/src/commands/cloud_linter/dynamodb.rs +++ b/momento/src/commands/cloud_linter/dynamodb.rs @@ -8,8 +8,11 @@ use governor::DefaultDirectRateLimiter; use indicatif::{ProgressBar, ProgressStyle}; use phf::{phf_map, Map}; use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc::Sender; -use crate::commands::cloud_linter::metrics::{Metric, MetricTarget, ResourceWithMetrics}; +use crate::commands::cloud_linter::metrics::{ + 
AppendMetrics, Metric, MetricTarget, ResourceWithMetrics, +}; use crate::commands::cloud_linter::resource::{DynamoDbResource, Resource, ResourceType}; use crate::commands::cloud_linter::utils::rate_limit; use crate::error::CliError; @@ -67,7 +70,7 @@ const DDB_GSI_METRICS: Map<&'static str, &'static [&'static str]> = phf_map! { ], }; -#[derive(Serialize, Clone)] +#[derive(Serialize, Clone, Debug)] pub(crate) struct DynamoDbMetadata { #[serde(rename = "avgItemSizeBytes")] avg_item_size_bytes: i64, @@ -81,6 +84,8 @@ pub(crate) struct DynamoDbMetadata { ttl_enabled: bool, #[serde(rename = "isGlobalTable")] is_global_table: bool, + #[serde(rename = "deleteProtectionEnabled")] + delete_protection_enabled: bool, #[serde(rename = "lsiCount")] lsi_count: i64, #[serde(rename = "tableClass")] @@ -105,7 +110,7 @@ impl DynamoDbMetadata { } } -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub(crate) struct GsiMetadata { #[serde(rename = "gsiName")] gsi_name: String, @@ -164,91 +169,41 @@ impl ResourceWithMetrics for DynamoDbResource { } } -pub(crate) async fn append_ttl_to_appropriate_ddb_resources( +pub(crate) async fn process_ddb_resources( config: &SdkConfig, - mut resources: Vec, - limiter: Arc, -) -> Result, CliError> { - log::debug!("describing ttl for dynamodb tables"); - let ddb_client = aws_sdk_dynamodb::Client::new(config); - let describe_ddb_ttl_bar = - ProgressBar::new(resources.len() as u64).with_message("Describing Dynamo DB Ttl"); - describe_ddb_ttl_bar - .set_style(ProgressStyle::with_template(" {msg} {bar} {eta}").expect("invalid template")); - for resource in &mut resources { - describe_ddb_ttl_bar.inc(1); - match resource { - Resource::DynamoDb(r) => { - if r.resource_type == ResourceType::DynamoDbGsi { - continue; - } - let consumed_write_ops_index = r.metrics.iter().position(|p| { - p.name - .eq_ignore_ascii_case("consumedwritecapacityunits_sum") - }); - match consumed_write_ops_index { - Some(index) => { - let consumed_write_capacity = - r.metrics.get(index).expect("index should exist"); - let sum: f64 = consumed_write_capacity.values.iter().sum(); - // a basic heuristic around whether or not we care to check to see if a ttl exists on a ddb table. If the dynamodb table - // has less than 10 tps average, then we don't care to check if ttl is enabled or not. 
- if sum < 10.0 * 60.0 * 60.0 * 24.0 * 30.0 { - log::debug!("skipping ttl check for table {}", r.id); - continue; - } - log::debug!("querying ttl for table {}", r.id); - let ttl_enabled = - is_ddb_ttl_enabled(&ddb_client, &r.id, Arc::clone(&limiter)).await?; - r.metadata.ttl_enabled = ttl_enabled; - } - // we did not find that metric, and therefore we assume that there are no consumed capacity units, meaning we don't care to - // check for a ttl on the ddb table - None => { - continue; - } - } - } - Resource::ElastiCache(_) => { - continue; - } - }; - } - describe_ddb_ttl_bar.finish(); - Ok(resources) -} - -pub(crate) async fn get_ddb_resources( - config: &SdkConfig, - limiter: Arc, -) -> Result, CliError> { + control_plane_limiter: Arc, + metrics_limiter: Arc, + describe_ttl_limiter: Arc, + sender: Sender, +) -> Result<(), CliError> { let ddb_client = aws_sdk_dynamodb::Client::new(config); + let metrics_client = aws_sdk_cloudwatch::Client::new(config); let list_ddb_tables_bar = ProgressBar::new_spinner().with_message("Listing Dynamo DB tables"); list_ddb_tables_bar.enable_steady_tick(Duration::from_millis(100)); - let table_names = list_table_names(&ddb_client, Arc::clone(&limiter)).await?; + let table_names = list_table_names(&ddb_client, Arc::clone(&control_plane_limiter)).await?; list_ddb_tables_bar.finish(); let describe_ddb_tables_bar = - ProgressBar::new(table_names.len() as u64).with_message("Describing Dynamo DB tables"); + ProgressBar::new(table_names.len() as u64).with_message("Processing Dynamo DB tables"); describe_ddb_tables_bar .set_style(ProgressStyle::with_template(" {msg} {bar} {eta}").expect("invalid template")); - let mut ddb_resources: Vec = Vec::new(); for table_name in table_names { - let instances = fetch_ddb_resources(&ddb_client, &table_name, Arc::clone(&limiter)).await?; - for instance in instances { - ddb_resources.push(instance); - } + process_table_resources( + &ddb_client, + &metrics_client, + &table_name, + Arc::clone(&control_plane_limiter), + Arc::clone(&metrics_limiter), + Arc::clone(&describe_ttl_limiter), + sender.clone(), + ) + .await?; describe_ddb_tables_bar.inc(1); } - describe_ddb_tables_bar.finish(); - - let wrapped_resources = ddb_resources - .into_iter() - .map(Resource::DynamoDb) - .collect::>(); - Ok(wrapped_resources) + describe_ddb_tables_bar.finish(); + Ok(()) } async fn list_table_names( @@ -278,13 +233,38 @@ async fn list_table_names( async fn is_ddb_ttl_enabled( ddb_client: &aws_sdk_dynamodb::Client, - table_name: &str, + resource: &DynamoDbResource, limiter: Arc, ) -> Result { + if resource.resource_type == ResourceType::DynamoDbGsi { + return Ok(false); + }; + let consumed_write_ops_index = resource.metrics.iter().position(|p| { + p.name + .eq_ignore_ascii_case("consumedwritecapacityunits_sum") + }); + match consumed_write_ops_index { + Some(index) => { + let consumed_write_capacity = resource.metrics.get(index).expect("index should exist"); + let sum: f64 = consumed_write_capacity.values.iter().sum(); + // a basic heuristic around whether we care to check to see if a ttl exists on a ddb table. If the dynamodb table + // has less than 10 tps average, then we don't care to check if ttl is enabled or not. 
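+        // (10 writes/sec * 60 sec * 60 min * 24 hr * 30 days = 25,920,000 consumed write units over the metric window)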
+ if sum < 10.0 * 60.0 * 60.0 * 24.0 * 30.0 { + log::debug!("skipping ttl check for table {}", resource.id); + return Ok(false); + } + } + // we did not find that metric, and therefore we assume that there are no consumed capacity units, meaning we don't care to + // check for a ttl on the ddb table + None => { + return Ok(false); + } + } + log::debug!("querying ttl for table {}", resource.id); let ttl = rate_limit(Arc::clone(&limiter), || async { ddb_client .describe_time_to_live() - .table_name(table_name) + .table_name(&resource.id) .send() .await }) @@ -300,11 +280,15 @@ async fn is_ddb_ttl_enabled( Ok(ttl_enabled) } -async fn fetch_ddb_resources( +async fn process_table_resources( ddb_client: &aws_sdk_dynamodb::Client, + metrics_client: &aws_sdk_cloudwatch::Client, table_name: &str, - limiter: Arc, -) -> Result, CliError> { + control_plane_limiter: Arc, + metrics_limiter: Arc, + describe_ttl_limiter: Arc, + sender: Sender, +) -> Result<(), CliError> { let region = ddb_client .config() .region() @@ -313,7 +297,7 @@ async fn fetch_ddb_resources( msg: "No region configured for client".to_string(), })?; - let description = rate_limit(Arc::clone(&limiter), || async { + let description = rate_limit(Arc::clone(&control_plane_limiter), || async { ddb_client .describe_table() .table_name(table_name) @@ -383,6 +367,7 @@ async fn fetch_ddb_resources( p_throughput_read_units, p_throughput_write_units, gsi: None, + delete_protection_enabled: table.deletion_protection_enabled.unwrap_or_default(), }; let mut resources = table @@ -460,5 +445,20 @@ async fn fetch_ddb_resources( metric_period_seconds: 0, }); - Ok(resources) + for mut resource in resources { + resource + .append_metrics(metrics_client, Arc::clone(&metrics_limiter)) + .await?; + let ttl_enabled = + is_ddb_ttl_enabled(ddb_client, &resource, Arc::clone(&describe_ttl_limiter)).await?; + resource.metadata.ttl_enabled = ttl_enabled; + sender + .send(Resource::DynamoDb(resource)) + .await + .map_err(|err| CliError { + msg: format!("Failed to stream dynamodb resource to file: {}", err), + })?; + } + + Ok(()) } diff --git a/momento/src/commands/cloud_linter/elasticache.rs b/momento/src/commands/cloud_linter/elasticache.rs index a5c75ef..92e219f 100644 --- a/momento/src/commands/cloud_linter/elasticache.rs +++ b/momento/src/commands/cloud_linter/elasticache.rs @@ -8,12 +8,15 @@ use governor::DefaultDirectRateLimiter; use indicatif::ProgressBar; use phf::{phf_map, Map}; use serde::Serialize; +use tokio::sync::mpsc::Sender; use crate::commands::cloud_linter::metrics::{Metric, MetricTarget, ResourceWithMetrics}; use crate::commands::cloud_linter::resource::{ElastiCacheResource, Resource, ResourceType}; use crate::commands::cloud_linter::utils::rate_limit; use crate::error::CliError; +use super::metrics::AppendMetrics; + pub(crate) const CACHE_METRICS: Map<&'static str, &'static [&'static str]> = phf_map! 
{ "Sum" => &[ "NetworkBytesIn", @@ -50,7 +53,7 @@ pub(crate) const CACHE_METRICS: Map<&'static str, &'static [&'static str]> = phf ], }; -#[derive(Serialize, Clone)] +#[derive(Serialize, Clone, Debug)] pub(crate) struct ElastiCacheMetadata { #[serde(rename = "clusterId")] cluster_id: String, @@ -100,19 +103,21 @@ impl ResourceWithMetrics for ElastiCacheResource { } } -pub(crate) async fn get_elasticache_resources( +pub(crate) async fn process_elasticache_resources( config: &SdkConfig, - limiter: Arc, -) -> Result, CliError> { - log::debug!("describing elasticache resources"); + control_plane_limiter: Arc, + metrics_limiter: Arc, + sender: Sender, +) -> Result<(), CliError> { let region = config.region().map(|r| r.as_ref()).ok_or(CliError { msg: "No region configured for client".to_string(), })?; let elasticache_client = aws_sdk_elasticache::Client::new(config); - let clusters = describe_clusters(&elasticache_client, limiter).await?; + let clusters = describe_clusters(&elasticache_client, control_plane_limiter).await?; - convert_to_resources(clusters, region).await + write_resources(clusters, config, region, sender, metrics_limiter).await?; + Ok(()) } async fn describe_clusters( @@ -147,10 +152,14 @@ async fn describe_clusters( Ok(elasticache_clusters) } -async fn convert_to_resources( +async fn write_resources( clusters: Vec, + config: &SdkConfig, region: &str, -) -> Result, CliError> { + sender: Sender, + metrics_limiter: Arc, +) -> Result<(), CliError> { + let metrics_client = aws_sdk_cloudwatch::Client::new(config); let mut resources: Vec = Vec::new(); for cluster in clusters { @@ -165,7 +174,7 @@ async fn convert_to_resources( })?; let engine = cluster.engine.ok_or(CliError { - msg: "ElastiCache cluster has no node type".to_string(), + msg: "ElastiCache cluster has no engine type".to_string(), })?; match engine.as_str() { "redis" => { @@ -232,5 +241,25 @@ async fn convert_to_resources( } }; } - Ok(resources) + + for resource in resources { + match resource { + Resource::ElastiCache(mut er) => { + er.append_metrics(&metrics_client, Arc::clone(&metrics_limiter)) + .await?; + sender + .send(Resource::ElastiCache(er)) + .await + .map_err(|err| CliError { + msg: format!("Failed to send elasticache resource: {}", err), + })?; + } + _ => { + return Err(CliError { + msg: "Invalid resource type".to_string(), + }); + } + } + } + Ok(()) } diff --git a/momento/src/commands/cloud_linter/linter_cli.rs b/momento/src/commands/cloud_linter/linter_cli.rs index d794e6b..70bd3d8 100644 --- a/momento/src/commands/cloud_linter/linter_cli.rs +++ b/momento/src/commands/cloud_linter/linter_cli.rs @@ -1,42 +1,73 @@ -use std::io::Write; +use std::io::{copy, BufReader}; use std::path::Path; use std::sync::Arc; -use std::time::Duration; use aws_config::{BehaviorVersion, Region}; use flate2::write::GzEncoder; use flate2::Compression; use governor::{Quota, RateLimiter}; -use indicatif::ProgressBar; +use struson::writer::{JsonStreamWriter, JsonWriter}; use tokio::fs::{metadata, File}; -use tokio::io::AsyncWriteExt; +use tokio::sync::mpsc::{self, Sender}; -use crate::commands::cloud_linter::dynamodb::get_ddb_resources; -use crate::commands::cloud_linter::elasticache::get_elasticache_resources; -use crate::commands::cloud_linter::metrics::append_metrics_to_resources; -use crate::commands::cloud_linter::resource::DataFormat; +use crate::commands::cloud_linter::dynamodb::process_ddb_resources; +use crate::commands::cloud_linter::serverless_elasticache::process_serverless_elasticache_resources; use 
crate::commands::cloud_linter::utils::check_aws_credentials; use crate::error::CliError; -use super::dynamodb::append_ttl_to_appropriate_ddb_resources; +use super::elasticache::process_elasticache_resources; +use super::resource::Resource; pub async fn run_cloud_linter(region: String) -> Result<(), CliError> { + let (tx, mut rx) = mpsc::channel::(32); + let file_path = "linter_results.json"; + // first we check to make sure we have perms to write files to the current directory + check_output_is_writable(file_path).await?; + + // here we write the unzipped json file, containing all the linter results + let unzipped_tokio_file = File::create(file_path).await?; + let mut unzipped_file = unzipped_tokio_file.into_std().await; + let mut json_writer = JsonStreamWriter::new(&mut unzipped_file); + json_writer.begin_object()?; + json_writer.name("resources")?; + json_writer.begin_array()?; + tokio::spawn(async move { + let _ = process_data(region, tx).await; + }); + while let Some(message) = rx.recv().await { + let _ = json_writer.serialize_value(&message); + } + json_writer.end_array()?; + json_writer.end_object()?; + json_writer.finish_document()?; + + // now we compress the json into a .gz file for the customer to upload + let opened_file_tokio = File::open(file_path).await?; + let opened_file = opened_file_tokio.into_std().await; + let mut unzipped_file = BufReader::new(opened_file); + let zipped_file_output_tokio = File::create("linter_results.json.gz").await?; + let zipped_file_output = zipped_file_output_tokio.into_std().await; + let mut gz = GzEncoder::new(zipped_file_output, Compression::default()); + copy(&mut unzipped_file, &mut gz)?; + gz.finish()?; + + Ok(()) +} + +async fn process_data(region: String, sender: Sender) -> Result<(), CliError> { let config = aws_config::defaults(BehaviorVersion::latest()) .region(Region::new(region)) .load() .await; check_aws_credentials(&config).await?; - let output_file_path = "linter_results.json.gz"; - check_output_is_writable(output_file_path).await?; - let control_plane_quota = Quota::per_second( core::num::NonZeroU32::new(10).expect("should create non-zero control_plane_quota"), ); let control_plane_limiter = Arc::new(RateLimiter::direct(control_plane_quota)); let describe_ttl_quota = Quota::per_second( - core::num::NonZeroU32::new(1).expect("should create non-zero describe_ttl_quota"), + core::num::NonZeroU32::new(3).expect("should create non-zero describe_ttl_quota"), ); let describe_ttl_limiter = Arc::new(RateLimiter::direct(describe_ttl_quota)); @@ -44,44 +75,30 @@ pub async fn run_cloud_linter(region: String) -> Result<(), CliError> { Quota::per_second(core::num::NonZeroU32::new(20).expect("should create non-zero quota")); let metrics_limiter = Arc::new(RateLimiter::direct(metrics_quota)); - let mut resources = get_ddb_resources(&config, Arc::clone(&control_plane_limiter)).await?; - - let mut elasticache_resources = - get_elasticache_resources(&config, Arc::clone(&control_plane_limiter)).await?; - resources.append(&mut elasticache_resources); - - let resources = - append_metrics_to_resources(&config, Arc::clone(&metrics_limiter), resources).await?; - - let resources = append_ttl_to_appropriate_ddb_resources( + process_ddb_resources( &config, - resources, + Arc::clone(&control_plane_limiter), + Arc::clone(&metrics_limiter), Arc::clone(&describe_ttl_limiter), + sender.clone(), ) .await?; - let data_format = DataFormat { resources }; - - write_data_to_file(data_format, output_file_path).await?; - - Ok(()) -} - -async fn 
write_data_to_file(data_format: DataFormat, file_path: &str) -> Result<(), CliError> { - let bar = ProgressBar::new_spinner().with_message("Writing data to file"); - bar.enable_steady_tick(Duration::from_millis(100)); - - let data_format_json = serde_json::to_string(&data_format)?; - - let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); - encoder.write_all(data_format_json.as_bytes())?; - - let compressed_json = encoder.finish()?; - - let mut file = File::create(file_path).await?; - file.write_all(&compressed_json).await?; + process_elasticache_resources( + &config, + Arc::clone(&control_plane_limiter), + Arc::clone(&metrics_limiter), + sender.clone(), + ) + .await?; - bar.finish(); + process_serverless_elasticache_resources( + &config, + Arc::clone(&control_plane_limiter), + Arc::clone(&metrics_limiter), + sender.clone(), + ) + .await?; Ok(()) } diff --git a/momento/src/commands/cloud_linter/metrics.rs b/momento/src/commands/cloud_linter/metrics.rs index 20c5f4f..897d1ae 100644 --- a/momento/src/commands/cloud_linter/metrics.rs +++ b/momento/src/commands/cloud_linter/metrics.rs @@ -1,22 +1,19 @@ use std::collections::HashMap; use std::sync::Arc; -use aws_config::SdkConfig; use aws_sdk_cloudwatch::primitives::DateTime; use aws_sdk_cloudwatch::types::Metric as CloudwatchMetric; use aws_sdk_cloudwatch::types::{Dimension, MetricDataQuery, MetricStat}; use aws_sdk_cloudwatch::Client; use chrono::{Duration, Utc}; use governor::DefaultDirectRateLimiter; -use indicatif::{ProgressBar, ProgressStyle}; use phf::Map; use serde::{Deserialize, Serialize}; -use crate::commands::cloud_linter::resource::Resource; use crate::commands::cloud_linter::utils::rate_limit; use crate::error::CliError; -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] pub(crate) struct Metric { pub name: String, pub values: Vec, @@ -63,35 +60,6 @@ where } } -pub(crate) async fn append_metrics_to_resources( - config: &SdkConfig, - limiter: Arc, - mut resources: Vec, -) -> Result, CliError> { - let bar = ProgressBar::new(resources.len() as u64).with_message("Querying metrics"); - bar.set_style(ProgressStyle::with_template(" {msg} {bar} {eta}").expect("invalid template")); - let metrics_client = Client::new(config); - - for resource in &mut resources { - match resource { - Resource::DynamoDb(dynamodb_resource) => { - dynamodb_resource - .append_metrics(&metrics_client, Arc::clone(&limiter)) - .await?; - } - Resource::ElastiCache(elasticache_resource) => { - elasticache_resource - .append_metrics(&metrics_client, Arc::clone(&limiter)) - .await?; - } - } - bar.inc(1); - } - bar.finish(); - - Ok(resources) -} - async fn query_metrics_for_target( client: &Client, limiter: Arc, diff --git a/momento/src/commands/cloud_linter/mod.rs b/momento/src/commands/cloud_linter/mod.rs index 5f1d37b..726985e 100644 --- a/momento/src/commands/cloud_linter/mod.rs +++ b/momento/src/commands/cloud_linter/mod.rs @@ -3,4 +3,5 @@ mod elasticache; pub mod linter_cli; mod metrics; mod resource; +mod serverless_elasticache; mod utils; diff --git a/momento/src/commands/cloud_linter/resource.rs b/momento/src/commands/cloud_linter/resource.rs index 78e3cd6..fcb67b4 100644 --- a/momento/src/commands/cloud_linter/resource.rs +++ b/momento/src/commands/cloud_linter/resource.rs @@ -3,12 +3,14 @@ use serde::Serialize; use crate::commands::cloud_linter::dynamodb::DynamoDbMetadata; use crate::commands::cloud_linter::elasticache::ElastiCacheMetadata; use crate::commands::cloud_linter::metrics::Metric; +use 
crate::commands::cloud_linter::serverless_elasticache::ServerlessElastiCacheMetadata; -#[derive(Serialize)] +#[derive(Serialize, Debug)] #[serde(untagged)] pub(crate) enum Resource { DynamoDb(DynamoDbResource), ElastiCache(ElastiCacheResource), + ServerlessElastiCache(ServerlessElastiCacheResource), } #[derive(Debug, Serialize, PartialEq)] @@ -21,9 +23,11 @@ pub(crate) enum ResourceType { ElastiCacheRedisNode, #[serde(rename = "AWS::Elasticache::MemcachedNode")] ElastiCacheMemcachedNode, + #[serde(rename = "AWS::Elasticache::Serverless")] + ServerlessElastiCache, } -#[derive(Serialize)] +#[derive(Serialize, Debug)] pub(crate) struct DynamoDbResource { #[serde(rename = "type")] pub(crate) resource_type: ResourceType, @@ -35,7 +39,7 @@ pub(crate) struct DynamoDbResource { pub(crate) metadata: DynamoDbMetadata, } -#[derive(Serialize)] +#[derive(Serialize, Debug)] pub(crate) struct ElastiCacheResource { #[serde(rename = "type")] pub(crate) resource_type: ResourceType, @@ -47,7 +51,19 @@ pub(crate) struct ElastiCacheResource { pub(crate) metadata: ElastiCacheMetadata, } -#[derive(Serialize)] +#[derive(Serialize, Debug)] +pub(crate) struct ServerlessElastiCacheResource { + #[serde(rename = "type")] + pub(crate) resource_type: ResourceType, + pub(crate) region: String, + pub(crate) id: String, + pub(crate) metrics: Vec, + #[serde(rename = "metricPeriodSeconds")] + pub(crate) metric_period_seconds: i32, + pub(crate) metadata: ServerlessElastiCacheMetadata, +} + +#[derive(Serialize, Debug)] pub(crate) struct DataFormat { pub(crate) resources: Vec, } diff --git a/momento/src/commands/cloud_linter/serverless_elasticache.rs b/momento/src/commands/cloud_linter/serverless_elasticache.rs new file mode 100644 index 0000000..fce88f2 --- /dev/null +++ b/momento/src/commands/cloud_linter/serverless_elasticache.rs @@ -0,0 +1,238 @@ +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; + +use aws_config::SdkConfig; +use aws_sdk_elasticache::types::{ + CacheUsageLimits, DataStorage, DataStorageUnit, EcpuPerSecond, ServerlessCache, +}; +use governor::DefaultDirectRateLimiter; +use indicatif::ProgressBar; +use phf::{phf_map, Map}; +use serde::Serialize; +use tokio::sync::mpsc::Sender; + +use crate::commands::cloud_linter::metrics::{Metric, MetricTarget, ResourceWithMetrics}; +use crate::commands::cloud_linter::resource::{ + Resource, ResourceType, ServerlessElastiCacheResource, +}; +use crate::commands::cloud_linter::utils::rate_limit; +use crate::error::CliError; + +use super::metrics::AppendMetrics; + +pub(crate) const SERVERLESS_CACHE_METRICS: Map<&'static str, &'static [&'static str]> = phf_map! 
{ + "Sum" => &[ + "NetworkBytesIn", + "NetworkBytesOut", + "GeoSpatialBasedCmds", + "EvalBasedCmds", + "EvalBasedCmdsECPUs", + "GetTypeCmds", + "GetTypeCmdsECPUs" , + "HashBasedCmds", + "HashBasedCmdsECPUs", + "JsonBasedCmds", + "JsonBasedCmdsECPUs", + "KeyBasedCmds", + "KeyBasedCmdsECPUs", + "ListBasedCmds", + "ListBasedCmdsECPUs", + "SetBasedCmds", + "SetBasedCmdsECPUs", + "SetTypeCmds", + "SetTypeCmdsECPUs", + "StringBasedCmds", + "StringBasedCmdsECPUs", + "PubSubBasedCmds", + "PubSubBasedCmdsECPUs", + "SortedSetBasedCmds", + "SortedSetBasedCmdsECPUs", + "StreamBasedCmds", + "StreamBasedCmdsECPUs", + "ElastiCacheProcessingUnits" + ], + "Average" => &[ + "DB0AverageTTL", + "ElastiCacheProcessingUnits" + ], + "Maximum" => &[ + "CurrConnections", + "NewConnections", + "EngineCPUUtilization", + "CPUUtilization", + "FreeableMemory", + "BytesUsedForCache", + "DatabaseMemoryUsagePercentage", + "CurrItems", + "KeysTracked", + "Evictions", + "CacheHitRate", + ], +}; + +#[derive(Serialize, Clone, Debug)] +pub(crate) struct ServerlessElastiCacheMetadata { + name: String, + engine: String, + #[serde(rename = "maxDataStorageGB")] + max_data_storage_gb: i32, + #[serde(rename = "dataStorageUnit")] + data_storage_unit: String, + #[serde(rename = "maxEcpuPerSecond")] + max_ecpu_per_second: i32, + #[serde(rename = "snapshotRetentionLimit")] + snapshot_retention_limit: i32, + #[serde(rename = "dailySnapshotTime")] + daily_snapshot_time: String, + #[serde(rename = "userGroupId")] + user_group_id: String, + #[serde(rename = "engineVersion")] + engine_version: String, +} + +impl ResourceWithMetrics for ServerlessElastiCacheResource { + fn create_metric_target(&self) -> Result { + match self.resource_type { + ResourceType::ServerlessElastiCache => Ok(MetricTarget { + namespace: "AWS/ElastiCache".to_string(), + dimensions: HashMap::from([ + // the cache id for a serverless elasticache cluster is just the cache name + ("CacheClusterId".to_string(), self.id.clone()), + ]), + targets: SERVERLESS_CACHE_METRICS, + }), + _ => Err(CliError { + msg: "Invalid resource type".to_string(), + }), + } + } + + fn set_metrics(&mut self, metrics: Vec) { + self.metrics = metrics; + } + + fn set_metric_period_seconds(&mut self, period: i32) { + self.metric_period_seconds = period; + } +} + +pub(crate) async fn process_serverless_elasticache_resources( + config: &SdkConfig, + control_plane_limiter: Arc, + metrics_limiter: Arc, + sender: Sender, +) -> Result<(), CliError> { + let region = config.region().map(|r| r.as_ref()).ok_or(CliError { + msg: "No region configured for client".to_string(), + })?; + + let elasticache_client = aws_sdk_elasticache::Client::new(config); + let clusters = describe_clusters(&elasticache_client, control_plane_limiter).await?; + + write_resources(clusters, config, region, sender, metrics_limiter).await?; + Ok(()) +} + +async fn describe_clusters( + elasticache_client: &aws_sdk_elasticache::Client, + limiter: Arc, +) -> Result, CliError> { + let bar = + ProgressBar::new_spinner().with_message("Describing Serverless ElastiCache resources"); + bar.enable_steady_tick(Duration::from_millis(100)); + let mut serverless_elasticache = Vec::new(); + let mut elasticache_stream = elasticache_client + .describe_serverless_caches() + .into_paginator() + .send(); + + while let Some(result) = rate_limit(Arc::clone(&limiter), || elasticache_stream.next()).await { + match result { + Ok(result) => { + if let Some(caches) = result.serverless_caches { + serverless_elasticache.extend(caches); + } + } + Err(err) => { + 
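+                // Wrap the SDK error in a CliError so it propagates to the caller with context.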
return Err(CliError { + msg: format!("Failed to describe serverless caches: {}", err), + }); + } + } + } + bar.finish(); + + Ok(serverless_elasticache) +} + +async fn write_resources( + caches: Vec, + config: &SdkConfig, + region: &str, + sender: Sender, + metrics_limiter: Arc, +) -> Result<(), CliError> { + let metrics_client = aws_sdk_cloudwatch::Client::new(config); + + for cache in caches { + let cache_name = cache.serverless_cache_name.unwrap_or_default(); + let engine = cache.engine.unwrap_or_default(); + let user_group_id = cache.user_group_id.unwrap_or_default(); + let snapshot_retention_limit = cache.snapshot_retention_limit.unwrap_or(0); + let daily_snapshot_time = cache.daily_snapshot_time.unwrap_or_default(); + + let cache_usage_limits = cache + .cache_usage_limits + .unwrap_or(CacheUsageLimits::builder().build()); + // By default, every Serverless cache can scale to a maximum of 5 TBs of data storage and 15,000,000 ECPUs per second. To control costs, you can choose to set lower usage limits so that your cache will scale to a lower maximum. + // + // When a maximum Memory usage limit is set and your cache hits that limit, then ElastiCache Serverless will begin to evict data, to reject new writes with an Out of Memory error, or both. + // + // When a maximum ECPUs/second limit is set and your cache hits that limit, then ElastiCache Serverless will begin throttling or rejecting requests. + let data_storage = cache_usage_limits.data_storage.unwrap_or( + DataStorage::builder() + .set_maximum(Some(5_000)) + .set_unit(Some(DataStorageUnit::Gb)) + .build(), + ); + + let ecpu = cache_usage_limits.ecpu_per_second.unwrap_or( + EcpuPerSecond::builder() + .set_maximum(Some(15_000_000)) + .build(), + ); + let max_data_storage_gb = data_storage.maximum.unwrap_or(5_000); + let data_storage_unit = data_storage.unit.unwrap_or(DataStorageUnit::Gb); + + let metadata = ServerlessElastiCacheMetadata { + name: cache_name.clone(), + engine, + max_data_storage_gb, + max_ecpu_per_second: ecpu.maximum.unwrap_or_default(), + snapshot_retention_limit, + daily_snapshot_time, + user_group_id, + data_storage_unit: data_storage_unit.to_string(), + engine_version: cache.full_engine_version.unwrap_or_default(), + }; + + let mut serverless_ec_resource = ServerlessElastiCacheResource { + resource_type: ResourceType::ServerlessElastiCache, + region: region.to_string(), + id: cache_name, + metrics: vec![], + metric_period_seconds: 0, + metadata, + }; + serverless_ec_resource + .append_metrics(&metrics_client, Arc::clone(&metrics_limiter)) + .await?; + + let resource = Resource::ServerlessElastiCache(serverless_ec_resource); + sender.send(resource).await.map_err(|err| CliError { + msg: format!("Failed to send serverless elasticache resource: {}", err), + })?; + } + Ok(()) +}
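
The central pattern in this change is worth seeing in isolation: producers stream Resource values over a bounded tokio mpsc channel while the receiver serializes each one into a JSON array with struson's JsonStreamWriter, so the full result set never has to sit in memory. A minimal sketch of that pattern — the Item struct, channel capacity, and in-memory Vec<u8> sink are illustrative stand-ins, while the struson and tokio calls are the same ones the diff uses:

use serde::Serialize;
use struson::writer::{JsonStreamWriter, JsonWriter};
use tokio::sync::mpsc;

#[derive(Serialize, Debug)]
struct Item {
    id: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (tx, mut rx) = mpsc::channel::<Item>(32);

    // Producer: sends items as they are discovered instead of
    // accumulating them all in memory first.
    tokio::spawn(async move {
        for i in 0..3 {
            let _ = tx.send(Item { id: format!("resource-{i}") }).await;
        }
        // tx is dropped here, which closes the channel and ends the recv loop.
    });

    // Consumer: streams each item straight into the JSON document.
    let mut out = Vec::new();
    let mut json_writer = JsonStreamWriter::new(&mut out);
    json_writer.begin_object()?;
    json_writer.name("resources")?;
    json_writer.begin_array()?;
    while let Some(item) = rx.recv().await {
        json_writer.serialize_value(&item)?;
    }
    json_writer.end_array()?;
    json_writer.end_object()?;
    json_writer.finish_document()?;

    println!("{}", String::from_utf8(out)?);
    Ok(())
}

Bounding the channel at 32 also provides natural backpressure: if JSON writing falls behind, the AWS-querying producers pause at send rather than buffering without limit.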
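All three limiters in the diff (control plane at 10 rps, metrics at 20 rps, DescribeTimeToLive raised from 1 to 3 rps) are built the same way with governor. A sketch of the gating pattern — until_ready is governor's stock API; the crate's own rate_limit helper is not shown in this diff, so fetch_page here is a hypothetical stand-in for the AWS SDK calls it wraps:

use std::num::NonZeroU32;
use std::sync::Arc;

use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};

// Hypothetical stand-in for one paginated AWS SDK call.
async fn fetch_page(page: u32) -> Vec<String> {
    vec![format!("table-{page}")]
}

#[tokio::main]
async fn main() {
    // 10 requests per second, mirroring the control-plane quota in the diff.
    let quota = Quota::per_second(NonZeroU32::new(10).expect("non-zero quota"));
    let limiter: Arc<DefaultDirectRateLimiter> = Arc::new(RateLimiter::direct(quota));

    for page in 0..5 {
        // Wait until the limiter permits another request before calling the API.
        limiter.until_ready().await;
        let names = fetch_page(page).await;
        println!("page {page}: {names:?}");
    }
}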
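The final compression step converts the tokio file handles to std ones and streams the JSON through flate2 rather than buffering the whole document. A standalone sketch of the same copy-through-encoder approach, reusing the file names from the diff:

use std::fs::File;
use std::io::{copy, BufReader};

use flate2::write::GzEncoder;
use flate2::Compression;

fn main() -> std::io::Result<()> {
    // Stream the uncompressed JSON through the gzip encoder chunk by chunk.
    let mut input = BufReader::new(File::open("linter_results.json")?);
    let output = File::create("linter_results.json.gz")?;
    let mut gz = GzEncoder::new(output, Compression::default());
    copy(&mut input, &mut gz)?;
    // finish() flushes the gzip trailer; silently dropping the encoder
    // would risk a truncated archive.
    gz.finish()?;
    Ok(())
}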
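One serialization detail that keeps the streamed output uniform: Resource is #[serde(untagged)], so each variant serializes as its inner struct, and the struct's own "type" field acts as the discriminator. A small sketch (one variant, trimmed fields) showing the resulting flat JSON:

use serde::Serialize;

// Mirrors the diff's pattern: an untagged enum whose variants carry their
// own "type" field, so no enum wrapper appears in the JSON.
#[derive(Serialize, Debug)]
#[serde(untagged)]
enum Resource {
    DynamoDb(DynamoDbResource),
}

#[derive(Serialize, Debug)]
struct DynamoDbResource {
    #[serde(rename = "type")]
    resource_type: String,
    id: String,
}

fn main() {
    let r = Resource::DynamoDb(DynamoDbResource {
        resource_type: "AWS::DynamoDB::Table".to_string(),
        id: "my-table".to_string(),
    });
    // Prints: {"type":"AWS::DynamoDB::Table","id":"my-table"}
    println!("{}", serde_json::to_string(&r).expect("serializable"));
}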