Skip to content

Commit

Permalink
feat: Deny list for repeatedly failing hosts (#1039)
Browse files Browse the repository at this point in the history
  • Loading branch information
loewenheim authored Feb 16, 2023
1 parent 4687cd2 commit 7a9a6b5
Show file tree
Hide file tree
Showing 10 changed files with 387 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- Introduce a `CacheKeyBuilder` and human-readable `CacheKey` metadata. ([#1033](https://github.com/getsentry/symbolicator/pull/1033), [#1036](https://github.com/getsentry/symbolicator/pull/1036))
- Use new `CacheKey` for writes and shared-cache. ([#1038](https://github.com/getsentry/symbolicator/pull/1038))
- Consolidate `CacheVersions` and bump to refresh `CacheKey` usage. ([#1041](https://github.com/getsentry/symbolicator/pull/1041), [#1042](https://github.com/getsentry/symbolicator/pull/1042))
- Automatically block downloads from unreliable hosts. ([#1039](https://github.com/getsentry/symbolicator/pull/1039))

## 0.7.0

Expand Down
23 changes: 23 additions & 0 deletions crates/symbolicator-service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,25 @@ pub struct Config {
#[serde(with = "humantime_serde")]
pub connect_timeout: Duration,

/// The time window for the host deny list.
///
/// Hosts will be put on the deny list if a certain number of downloads
/// fail within this time window.
#[serde(with = "humantime_serde")]
pub deny_list_time_window: Duration,

/// The granularity at which download failures are tracked in the host deny list.
#[serde(with = "humantime_serde")]
pub deny_list_bucket_size: Duration,

/// The number of failures that must occur in the configured time window for a
/// server to be put on the deny list.
pub deny_list_threshold: usize,

/// The duration for which a host will remain on the deny list.
#[serde(with = "humantime_serde")]
pub deny_list_block_time: Duration,

/// The timeout per GB for streaming downloads.
///
/// For downloads with a known size, this timeout applies per individual
Expand Down Expand Up @@ -475,6 +494,10 @@ impl Default for Config {
connect_timeout: Duration::from_secs(15),
// Allow a 4MB/s connection to download 1GB without timing out
streaming_timeout: Duration::from_secs(250),
deny_list_time_window: Duration::from_secs(60),
deny_list_bucket_size: Duration::from_secs(5),
deny_list_threshold: 20,
deny_list_block_time: Duration::from_secs(24 * 60 * 60),
max_concurrent_requests: Some(120),
shared_cache: None,
_crash_db: None,
Expand Down
185 changes: 180 additions & 5 deletions crates/symbolicator-service/src/services/download/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
//! The sources are described on
//! <https://getsentry.github.io/symbolicator/advanced/symbol-server-compatibility/>

use std::collections::VecDeque;
use std::convert::TryInto;
use std::error::Error;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant, SystemTime};

use ::sentry::SentryFutureExt;
use futures::prelude::*;
Expand Down Expand Up @@ -82,6 +83,115 @@ impl From<GcsError> for CacheError {
}
}

/// A record of the number of download failures that occurred within a single
/// time bucket (a slice of `bucket_size` length, not necessarily one second).
#[derive(Debug, Clone, Copy)]
struct FailureCount {
    /// The start of the bucket in which the failures occurred, measured in
    /// milliseconds since the Unix epoch and rounded down to a multiple of
    /// the configured bucket size.
    timestamp: u64,
    /// The number of failures recorded in this bucket.
    failures: usize,
}

/// A shared, thread-safe queue of per-bucket failure counts for a single host.
type CountedFailures = Arc<Mutex<VecDeque<FailureCount>>>;

/// A structure that keeps track of download failures in a given time interval
/// and puts hosts on a block list accordingly.
///
/// The logic works like this: if a host has at least `failure_threshold` download
/// failures in a sliding window of `time_window_millis` milliseconds, it will be
/// blocked for the duration configured as the `blocked_hosts` cache's time-to-live.
#[derive(Clone, Debug)]
struct HostDenyList {
    /// Length, in milliseconds, of the sliding window in which failures are counted.
    time_window_millis: u64,
    /// Granularity, in milliseconds, at which failures are bucketed.
    bucket_size_millis: u64,
    /// Number of failures within the window that causes a host to be blocked.
    failure_threshold: usize,
    /// Per-host queues of bucketed failure counts; an entry idles out once a
    /// host has had no failures for a full time window.
    failures: moka::sync::Cache<String, CountedFailures>,
    /// The set of currently blocked hosts; entries expire automatically after
    /// the configured block time.
    blocked_hosts: moka::sync::Cache<String, ()>,
}

impl HostDenyList {
    /// Creates an empty [`HostDenyList`].
    ///
    /// * `time_window`: the sliding window in which failures are counted.
    /// * `bucket_size`: the granularity at which failures are bucketed.
    /// * `failure_threshold`: the number of failures within the window that
    ///   causes a host to be blocked.
    /// * `block_time`: how long a blocked host stays blocked.
    fn new(
        time_window: Duration,
        bucket_size: Duration,
        failure_threshold: usize,
        block_time: Duration,
    ) -> Self {
        let time_window_millis = time_window.as_millis() as u64;
        // Clamp to at least 1ms: a sub-millisecond bucket size would truncate
        // to 0 and cause a modulo/division by zero below.
        let bucket_size_millis = (bucket_size.as_millis() as u64).max(1);
        Self {
            time_window_millis,
            bucket_size_millis,
            failure_threshold,
            // A host's failure queue is dropped once it has had no failures
            // for a full time window.
            failures: moka::sync::Cache::builder()
                .time_to_idle(time_window)
                .build(),
            // Blocked hosts are automatically unblocked after `block_time`.
            blocked_hosts: moka::sync::Cache::builder()
                .time_to_live(block_time)
                .build(),
        }
    }

    /// Rounds a duration down to a multiple of the configured `bucket_size`,
    /// returning the result in milliseconds.
    fn round_duration(&self, duration: Duration) -> u64 {
        let duration = duration.as_millis() as u64;

        duration - (duration % self.bucket_size_millis)
    }

    /// The maximum length of the failure queue for one host.
    fn max_queue_len(&self) -> usize {
        // Add one to protect against rounding issues if `time_window` is not a
        // multiple of `bucket_size`.
        (self.time_window_millis / self.bucket_size_millis) as usize + 1
    }

    /// Registers a download failure for the given `host`.
    ///
    /// If that puts the host over the threshold, it is added
    /// to the blocked servers.
    fn register_failure(&self, host: String) {
        // Current wall-clock time, rounded down to `bucket_size` granularity.
        let current_ts = SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default();

        let current_ts = self.round_duration(current_ts);
        let entry = self.failures.entry_by_ref(&host).or_default();

        let mut queue = entry.value().lock().unwrap();
        match queue.back_mut() {
            // Still within the same bucket as the last failure: bump its count.
            Some(last) if last.timestamp == current_ts => {
                last.failures += 1;
            }
            // First failure in a new bucket: start a fresh count.
            _ => {
                queue.push_back(FailureCount {
                    timestamp: current_ts,
                    failures: 1,
                });
            }
        }

        // Evict the oldest bucket once the queue outgrows its maximum length.
        if queue.len() > self.max_queue_len() {
            queue.pop_front();
        }

        // Saturate instead of plain subtraction: if the clock is within one
        // time window of the Unix epoch, `current_ts - time_window_millis`
        // would underflow and panic in debug builds.
        let cutoff = current_ts.saturating_sub(self.time_window_millis);
        let total_failures: usize = queue
            .iter()
            .filter(|failure_count| failure_count.timestamp >= cutoff)
            .map(|failure_count| failure_count.failures)
            .sum();

        if total_failures >= self.failure_threshold {
            self.blocked_hosts.insert(host, ());
        }
    }

    /// Returns true if the given `host` is currently blocked.
    fn is_blocked(&self, host: &str) -> bool {
        self.blocked_hosts.contains_key(host)
    }
}

/// A service which can download files from a [`SourceConfig`].
///
/// The service is rather simple on the outside but will one day control
Expand All @@ -95,6 +205,7 @@ pub struct DownloadService {
s3: s3::S3Downloader,
gcs: gcs::GcsDownloader,
fs: filesystem::FilesystemDownloader,
host_deny_list: HostDenyList,
}

impl DownloadService {
Expand All @@ -107,6 +218,10 @@ impl DownloadService {
connect_timeout,
streaming_timeout,
caches: CacheConfigs { ref in_memory, .. },
deny_list_time_window,
deny_list_bucket_size,
deny_list_threshold,
deny_list_block_time,
..
} = *config;

Expand All @@ -133,6 +248,12 @@ impl DownloadService {
*gcs_token_capacity,
),
fs: filesystem::FilesystemDownloader::new(),
host_deny_list: HostDenyList::new(
deny_list_time_window,
deny_list_bucket_size,
deny_list_threshold,
deny_list_block_time,
),
})
}

Expand Down Expand Up @@ -181,15 +302,43 @@ impl DownloadService {
source: RemoteFile,
destination: PathBuf,
) -> CacheEntry {
let host = source.host();

// Check whether `source` is an internal Sentry source. We don't ever
// want to put such sources on the block list.
let source_is_external = !host.starts_with("sentry:");

if source_is_external && self.host_deny_list.is_blocked(&host) {
return Err(CacheError::DownloadError(
"Server is temporarily blocked".to_string(),
));
}

let slf = self.clone();

let job = async move { slf.dispatch_download(&source, &destination).await };
let job = CancelOnDrop::new(self.runtime.spawn(job.bind_hub(::sentry::Hub::current())));
let job = tokio::time::timeout(self.max_download_timeout, job);
let job = measure("service.download", m::timed_result, job);

job.await
.map_err(|_| CacheError::Timeout(self.max_download_timeout))? // Timeout
.map_err(|_| CacheError::InternalError)? // Spawn error
let result = match job.await {
// Timeout
Err(_) => Err(CacheError::Timeout(self.max_download_timeout)),
// Spawn error
Ok(Err(_)) => Err(CacheError::InternalError),
Ok(Ok(res)) => res,
};

if source_is_external
&& matches!(
result,
Err(CacheError::DownloadError(_) | CacheError::Timeout(_))
)
{
self.host_deny_list.register_failure(host);
}

result
}

/// Returns all objects matching the [`ObjectId`] at the source.
Expand Down Expand Up @@ -617,4 +766,30 @@ mod tests {
// 1.5 GB
assert_eq!(timeout(one_gb * 3 / 2), timeout_per_gb.mul_f64(1.5));
}

#[test]
fn test_host_deny_list() {
    let deny_list = HostDenyList::new(
        Duration::from_secs(5),
        Duration::from_secs(1),
        2,
        Duration::from_millis(100),
    );
    let host = String::from("test");

    deny_list.register_failure(host.clone());

    // shouldn't be blocked after one failure
    assert!(!deny_list.is_blocked(&host));

    deny_list.register_failure(host.clone());

    // should be blocked after two failures
    assert!(deny_list.is_blocked(&host));

    // Sleep strictly longer than the 100ms block time: sleeping exactly the
    // TTL races with the cache's expiration clock and makes the test flaky.
    std::thread::sleep(Duration::from_millis(150));

    // should be unblocked after the block time has passed
    assert!(!deny_list.is_blocked(&host));
}
}
Loading

0 comments on commit 7a9a6b5

Please sign in to comment.