feat: serverless elasticache (#320)
bruuuuuuuce authored May 6, 2024
1 parent cbed92f commit 27f4074
Showing 6 changed files with 276 additions and 3 deletions.
momento/src/commands/cloud_linter/dynamodb.rs (3 changes: 3 additions & 0 deletions)
@@ -84,6 +84,8 @@ pub(crate) struct DynamoDbMetadata {
    ttl_enabled: bool,
    #[serde(rename = "isGlobalTable")]
    is_global_table: bool,
+    #[serde(rename = "deleteProtectionEnabled")]
+    delete_protection_enabled: bool,
    #[serde(rename = "lsiCount")]
    lsi_count: i64,
    #[serde(rename = "tableClass")]
@@ -365,6 +367,7 @@ async fn process_table_resources(
        p_throughput_read_units,
        p_throughput_write_units,
        gsi: None,
+        delete_protection_enabled: table.deletion_protection_enabled.unwrap_or_default(),
    };

let mut resources = table
momento/src/commands/cloud_linter/elasticache.rs (12 changes: 9 additions & 3 deletions)
@@ -174,7 +174,7 @@ async fn write_resources(
    })?;

    let engine = cluster.engine.ok_or(CliError {
-        msg: "ElastiCache cluster has no node type".to_string(),
+        msg: "ElastiCache cluster has no engine type".to_string(),
    })?;
    match engine.as_str() {
        "redis" => {
@@ -244,14 +244,20 @@ async fn write_resources(

    for resource in resources {
        match resource {
-            Resource::DynamoDb(_) => todo!(),
            Resource::ElastiCache(mut er) => {
                er.append_metrics(&metrics_client, Arc::clone(&metrics_limiter))
                    .await?;
                sender
                    .send(Resource::ElastiCache(er))
                    .await
-                    .expect("TODO: panic message");
+                    .map_err(|err| CliError {
+                        msg: format!("Failed to send elasticache resource: {}", err),
+                    })?;
            }
+            _ => {
+                return Err(CliError {
+                    msg: "Invalid resource type".to_string(),
+                });
+            }
        }
    }
momento/src/commands/cloud_linter/linter_cli.rs (9 changes: 9 additions & 0 deletions)
@@ -11,6 +11,7 @@ use tokio::fs::{metadata, File};
use tokio::sync::mpsc::{self, Sender};

use crate::commands::cloud_linter::dynamodb::process_ddb_resources;
+use crate::commands::cloud_linter::serverless_elasticache::process_serverless_elasticache_resources;
use crate::commands::cloud_linter::utils::check_aws_credentials;
use crate::error::CliError;

@@ -91,6 +92,14 @@ async fn process_data(region: String, sender: Sender<Resource>) -> Result<(), CliError> {
    )
    .await?;

+    process_serverless_elasticache_resources(
+        &config,
+        Arc::clone(&control_plane_limiter),
+        Arc::clone(&metrics_limiter),
+        sender.clone(),
+    )
+    .await?;
+
    Ok(())
}

momento/src/commands/cloud_linter/mod.rs (1 change: 1 addition & 0 deletions)
@@ -3,4 +3,5 @@ mod elasticache;
pub mod linter_cli;
mod metrics;
mod resource;
+mod serverless_elasticache;
mod utils;
momento/src/commands/cloud_linter/resource.rs (16 changes: 16 additions & 0 deletions)
@@ -3,12 +3,14 @@ use serde::Serialize;
use crate::commands::cloud_linter::dynamodb::DynamoDbMetadata;
use crate::commands::cloud_linter::elasticache::ElastiCacheMetadata;
use crate::commands::cloud_linter::metrics::Metric;
+use crate::commands::cloud_linter::serverless_elasticache::ServerlessElastiCacheMetadata;

#[derive(Serialize, Debug)]
#[serde(untagged)]
pub(crate) enum Resource {
    DynamoDb(DynamoDbResource),
    ElastiCache(ElastiCacheResource),
+    ServerlessElastiCache(ServerlessElastiCacheResource),
}

#[derive(Debug, Serialize, PartialEq)]
@@ -21,6 +23,8 @@ pub(crate) enum ResourceType {
    ElastiCacheRedisNode,
    #[serde(rename = "AWS::Elasticache::MemcachedNode")]
    ElastiCacheMemcachedNode,
+    #[serde(rename = "AWS::Elasticache::Serverless")]
+    ServerlessElastiCache,
}

#[derive(Serialize, Debug)]
@@ -47,6 +51,18 @@ pub(crate) struct ElastiCacheResource {
    pub(crate) metadata: ElastiCacheMetadata,
}

+#[derive(Serialize, Debug)]
+pub(crate) struct ServerlessElastiCacheResource {
+    #[serde(rename = "type")]
+    pub(crate) resource_type: ResourceType,
+    pub(crate) region: String,
+    pub(crate) id: String,
+    pub(crate) metrics: Vec<Metric>,
+    #[serde(rename = "metricPeriodSeconds")]
+    pub(crate) metric_period_seconds: i32,
+    pub(crate) metadata: ServerlessElastiCacheMetadata,
+}
+
#[derive(Serialize, Debug)]
pub(crate) struct DataFormat {
    pub(crate) resources: Vec<Resource>,
momento/src/commands/cloud_linter/serverless_elasticache.rs (238 changes: 238 additions & 0 deletions, new file)
@@ -0,0 +1,238 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use aws_config::SdkConfig;
use aws_sdk_elasticache::types::{
    CacheUsageLimits, DataStorage, DataStorageUnit, EcpuPerSecond, ServerlessCache,
};
use governor::DefaultDirectRateLimiter;
use indicatif::ProgressBar;
use phf::{phf_map, Map};
use serde::Serialize;
use tokio::sync::mpsc::Sender;

use crate::commands::cloud_linter::metrics::{Metric, MetricTarget, ResourceWithMetrics};
use crate::commands::cloud_linter::resource::{
    Resource, ResourceType, ServerlessElastiCacheResource,
};
use crate::commands::cloud_linter::utils::rate_limit;
use crate::error::CliError;

use super::metrics::AppendMetrics;

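// CloudWatch metrics collected for each serverless cache, keyed by the
// statistic (Sum, Average, Maximum) they are queried with. Note that
// ElastiCacheProcessingUnits appears under both Sum and Average, so it is
// collected with both statistics.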
pub(crate) const SERVERLESS_CACHE_METRICS: Map<&'static str, &'static [&'static str]> = phf_map! {
    "Sum" => &[
        "NetworkBytesIn",
        "NetworkBytesOut",
        "GeoSpatialBasedCmds",
        "EvalBasedCmds",
        "EvalBasedCmdsECPUs",
        "GetTypeCmds",
        "GetTypeCmdsECPUs",
        "HashBasedCmds",
        "HashBasedCmdsECPUs",
        "JsonBasedCmds",
        "JsonBasedCmdsECPUs",
        "KeyBasedCmds",
        "KeyBasedCmdsECPUs",
        "ListBasedCmds",
        "ListBasedCmdsECPUs",
        "SetBasedCmds",
        "SetBasedCmdsECPUs",
        "SetTypeCmds",
        "SetTypeCmdsECPUs",
        "StringBasedCmds",
        "StringBasedCmdsECPUs",
        "PubSubBasedCmds",
        "PubSubBasedCmdsECPUs",
        "SortedSetBasedCmds",
        "SortedSetBasedCmdsECPUs",
        "StreamBasedCmds",
        "StreamBasedCmdsECPUs",
        "ElastiCacheProcessingUnits"
    ],
    "Average" => &[
        "DB0AverageTTL",
        "ElastiCacheProcessingUnits"
    ],
    "Maximum" => &[
        "CurrConnections",
        "NewConnections",
        "EngineCPUUtilization",
        "CPUUtilization",
        "FreeableMemory",
        "BytesUsedForCache",
        "DatabaseMemoryUsagePercentage",
        "CurrItems",
        "KeysTracked",
        "Evictions",
        "CacheHitRate",
    ],
};

#[derive(Serialize, Clone, Debug)]
pub(crate) struct ServerlessElastiCacheMetadata {
    name: String,
    engine: String,
    #[serde(rename = "maxDataStorageGB")]
    max_data_storage_gb: i32,
    #[serde(rename = "dataStorageUnit")]
    data_storage_unit: String,
    #[serde(rename = "maxEcpuPerSecond")]
    max_ecpu_per_second: i32,
    #[serde(rename = "snapshotRetentionLimit")]
    snapshot_retention_limit: i32,
    #[serde(rename = "dailySnapshotTime")]
    daily_snapshot_time: String,
    #[serde(rename = "userGroupId")]
    user_group_id: String,
    #[serde(rename = "engineVersion")]
    engine_version: String,
}

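// Plugs the serverless cache type into the shared metrics machinery:
// create_metric_target supplies the CloudWatch namespace, dimensions, and
// metric names to query for this resource.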
impl ResourceWithMetrics for ServerlessElastiCacheResource {
    fn create_metric_target(&self) -> Result<MetricTarget, CliError> {
        match self.resource_type {
            ResourceType::ServerlessElastiCache => Ok(MetricTarget {
                namespace: "AWS/ElastiCache".to_string(),
                dimensions: HashMap::from([
                    // the cache id for a serverless elasticache cluster is just the cache name
                    ("CacheClusterId".to_string(), self.id.clone()),
                ]),
                targets: SERVERLESS_CACHE_METRICS,
            }),
            _ => Err(CliError {
                msg: "Invalid resource type".to_string(),
            }),
        }
    }

    fn set_metrics(&mut self, metrics: Vec<Metric>) {
        self.metrics = metrics;
    }

    fn set_metric_period_seconds(&mut self, period: i32) {
        self.metric_period_seconds = period;
    }
}

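// Entry point, called from linter_cli::process_data: discovers serverless
// caches in the configured region and forwards them, with metrics attached,
// over the resource channel.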
pub(crate) async fn process_serverless_elasticache_resources(
    config: &SdkConfig,
    control_plane_limiter: Arc<DefaultDirectRateLimiter>,
    metrics_limiter: Arc<DefaultDirectRateLimiter>,
    sender: Sender<Resource>,
) -> Result<(), CliError> {
    let region = config.region().map(|r| r.as_ref()).ok_or(CliError {
        msg: "No region configured for client".to_string(),
    })?;

    let elasticache_client = aws_sdk_elasticache::Client::new(config);
    let clusters = describe_clusters(&elasticache_client, control_plane_limiter).await?;

    write_resources(clusters, config, region, sender, metrics_limiter).await?;
    Ok(())
}

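// Pages through DescribeServerlessCaches, acquiring a rate-limiter permit
// before requesting each page so control-plane API limits are respected.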
async fn describe_clusters(
    elasticache_client: &aws_sdk_elasticache::Client,
    limiter: Arc<DefaultDirectRateLimiter>,
) -> Result<Vec<ServerlessCache>, CliError> {
    let bar =
        ProgressBar::new_spinner().with_message("Describing Serverless ElastiCache resources");
    bar.enable_steady_tick(Duration::from_millis(100));
    let mut serverless_elasticache = Vec::new();
    let mut elasticache_stream = elasticache_client
        .describe_serverless_caches()
        .into_paginator()
        .send();

    while let Some(result) = rate_limit(Arc::clone(&limiter), || elasticache_stream.next()).await {
        match result {
            Ok(result) => {
                if let Some(caches) = result.serverless_caches {
                    serverless_elasticache.extend(caches);
                }
            }
            Err(err) => {
                return Err(CliError {
                    msg: format!("Failed to describe serverless caches: {}", err),
                });
            }
        }
    }
    bar.finish();

    Ok(serverless_elasticache)
}

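// For each cache: derive metadata (falling back to the documented account
// defaults when no usage limits are set), fetch CloudWatch metrics, and send
// the finished resource over the channel.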
async fn write_resources(
    caches: Vec<ServerlessCache>,
    config: &SdkConfig,
    region: &str,
    sender: Sender<Resource>,
    metrics_limiter: Arc<DefaultDirectRateLimiter>,
) -> Result<(), CliError> {
    let metrics_client = aws_sdk_cloudwatch::Client::new(config);

    for cache in caches {
        let cache_name = cache.serverless_cache_name.unwrap_or_default();
        let engine = cache.engine.unwrap_or_default();
        let user_group_id = cache.user_group_id.unwrap_or_default();
        let snapshot_retention_limit = cache.snapshot_retention_limit.unwrap_or(0);
        let daily_snapshot_time = cache.daily_snapshot_time.unwrap_or_default();

        let cache_usage_limits = cache
            .cache_usage_limits
            .unwrap_or(CacheUsageLimits::builder().build());
        // By default, every serverless cache can scale to a maximum of 5 TB of data
        // storage and 15,000,000 ECPUs per second. To control costs, you can set lower
        // usage limits so that the cache scales to a lower maximum.
        //
        // When a maximum memory usage limit is set and the cache hits it, ElastiCache
        // Serverless begins to evict data, to reject new writes with an Out of Memory
        // error, or both.
        //
        // When a maximum ECPUs/second limit is set and the cache hits it, ElastiCache
        // Serverless begins throttling or rejecting requests.
        let data_storage = cache_usage_limits.data_storage.unwrap_or(
            DataStorage::builder()
                .set_maximum(Some(5_000))
                .set_unit(Some(DataStorageUnit::Gb))
                .build(),
        );

        let ecpu = cache_usage_limits.ecpu_per_second.unwrap_or(
            EcpuPerSecond::builder()
                .set_maximum(Some(15_000_000))
                .build(),
        );
        let max_data_storage_gb = data_storage.maximum.unwrap_or(5_000);
        let data_storage_unit = data_storage.unit.unwrap_or(DataStorageUnit::Gb);

        let metadata = ServerlessElastiCacheMetadata {
            name: cache_name.clone(),
            engine,
            max_data_storage_gb,
            max_ecpu_per_second: ecpu.maximum.unwrap_or_default(),
            snapshot_retention_limit,
            daily_snapshot_time,
            user_group_id,
            data_storage_unit: data_storage_unit.to_string(),
            engine_version: cache.full_engine_version.unwrap_or_default(),
        };

        let mut serverless_ec_resource = ServerlessElastiCacheResource {
            resource_type: ResourceType::ServerlessElastiCache,
            region: region.to_string(),
            id: cache_name,
            metrics: vec![],
            metric_period_seconds: 0,
            metadata,
        };
        serverless_ec_resource
            .append_metrics(&metrics_client, Arc::clone(&metrics_limiter))
            .await?;

        let resource = Resource::ServerlessElastiCache(serverless_ec_resource);
        sender.send(resource).await.map_err(|err| CliError {
            msg: format!("Failed to send serverless elasticache resource: {}", err),
        })?;
    }
    Ok(())
}
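For orientation, given the serde renames above and the untagged Resource enum (which serializes the inner struct directly), a serverless cache resource should serialize roughly as follows. This is a sketch with illustrative values: the id, region, engine, and engine version are made up, metrics are left empty, and DataStorageUnit::Gb is assumed to render as "GB".

    {
      "type": "AWS::Elasticache::Serverless",
      "region": "us-east-1",
      "id": "my-serverless-cache",
      "metrics": [],
      "metricPeriodSeconds": 0,
      "metadata": {
        "name": "my-serverless-cache",
        "engine": "redis",
        "maxDataStorageGB": 5000,
        "dataStorageUnit": "GB",
        "maxEcpuPerSecond": 15000000,
        "snapshotRetentionLimit": 0,
        "dailySnapshotTime": "",
        "userGroupId": "",
        "engineVersion": "7.1"
      }
    }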
