From 766fbc04e2bc1bf58f938b42882240d805dc57f7 Mon Sep 17 00:00:00 2001 From: Roman Valls Guimera Date: Mon, 6 Feb 2023 21:26:45 +1100 Subject: [PATCH] Remove s3-server and dependencies (#150) * fix(search) Bump up s3s-* crates as suggested in https://github.com/umccr/htsget-rs/issues/131#issuecomment-1413125841 * test(search): fix tests related to improperly formatted `folder` in URLs for S3 mock filesystem * fix(search): check that a key exists before formatting a URL, ensuring that pre-signed URLs are not generated for non-existent keys * fix(search): use KeyNotFound error when a key is not found * fix(search): use KeyNotFound for head and get object in AwsS3Storage * test(search): add set of tests targeting non-existent keys * build(search): remove unused dependencies --------- Co-authored-by: Marko Malenic --- htsget-lambda/Cargo.toml | 1 - htsget-search/Cargo.toml | 15 +- htsget-search/src/htsget/bam_search.rs | 168 +++++++++++++++++------ htsget-search/src/htsget/bcf_search.rs | 166 ++++++++++++++++------ htsget-search/src/htsget/cram_search.rs | 107 ++++++++++++++- htsget-search/src/htsget/from_storage.rs | 57 ++++---- htsget-search/src/htsget/mod.rs | 17 ++- htsget-search/src/htsget/search.rs | 9 ++ htsget-search/src/htsget/vcf_search.rs | 134 ++++++++++++++---- htsget-search/src/storage/aws.rs | 153 +++++++++++---------- htsget-search/src/storage/data_server.rs | 5 +- htsget-search/src/storage/local.rs | 20 ++- htsget-search/src/storage/mod.rs | 10 +- 13 files changed, 629 insertions(+), 233 deletions(-) diff --git a/htsget-lambda/Cargo.toml b/htsget-lambda/Cargo.toml index cecc86a5d..ba802c1cb 100644 --- a/htsget-lambda/Cargo.toml +++ b/htsget-lambda/Cargo.toml @@ -15,7 +15,6 @@ s3-storage = ["htsget-config/s3-storage", "htsget-search/s3-storage", "htsget-ht default = ["s3-storage"] [dependencies] -envy = "0.4" tokio = { version = "1.24", features = ["macros", "rt-multi-thread"] } tower-http = { version = "0.3", features = ["cors"] } lambda_http = { version = "0.7" } diff --git a/htsget-search/Cargo.toml b/htsget-search/Cargo.toml index ca4d2e926..e8940e0e2 100644 --- a/htsget-search/Cargo.toml +++ b/htsget-search/Cargo.toml @@ -41,8 +41,8 @@ noodles-vcf = { version = "0.23", features = ["async"] } # Amazon S3 bytes = { version = "1.2", optional = true } -aws-sdk-s3 = { version = "0.22", optional = true } -aws-config = { version = "0.52", optional = true } +aws-sdk-s3 = { version = "0.24", optional = true, features = ["test-util"] } +aws-config = { version = "0.54", optional = true } # Error control, tracing, config thiserror = "1.0" @@ -55,13 +55,16 @@ serde = "1.0" htsget-test = { path = "../htsget-test", features = ["cors-tests"], default-features = false } tempfile = "3.3" data-url = "0.2" +once_cell = "1.17" -# Aws S3 storage dependencies. +# Aws S3 storage. anyhow = "1.0" -s3-server = "0.2" -aws-types = { version = "0.52", features = ["hardcoded-credentials"] } +aws-credential-types = { version = "0.54", features = ["test-util"] } +s3s = { version = "0.3" } +s3s-fs = { version = "0.3" } +s3s-aws = { version = "0.3" } -# Axum server dependencies +# Axum server reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] } criterion = { version = "0.4", features = ["async_tokio"] } diff --git a/htsget-search/src/htsget/bam_search.rs b/htsget-search/src/htsget/bam_search.rs index c1bd7f883..3a9b9e552 100644 --- a/htsget-search/src/htsget/bam_search.rs +++ b/htsget-search/src/htsget/bam_search.rs @@ -178,16 +178,18 @@ pub(crate) mod tests { use htsget_test::util::expected_bgzf_eof_data_url; - use crate::htsget::from_storage::tests::{ - with_local_storage as with_local_storage_path, - with_local_storage_tmp as with_local_storage_tmp_path, - }; - use crate::htsget::{Class::Body, Class::Header, Headers, Response, Url}; + #[cfg(feature = "s3-storage")] + use crate::htsget::from_storage::tests::with_aws_storage_fn; + use crate::htsget::from_storage::tests::with_local_storage_fn; + use crate::htsget::{Class::Body, Class::Header, Headers, HtsGetError::NotFound, Response, Url}; use crate::storage::data_server::HttpTicketFormatter; use crate::storage::local::LocalStorage; use super::*; + const DATA_LOCATION: &str = "data/bam"; + const INDEX_FILE_LOCATION: &str = "htsnexus_test_NA12878.bam.bai"; + #[tokio::test] async fn search_all_reads() { with_local_storage(|storage| async move { @@ -354,29 +356,33 @@ pub(crate) mod tests { #[tokio::test] async fn search_no_gzi() { - with_local_storage_tmp(|storage| async move { - let search = BamSearch::new(storage.clone()); - let query = Query::new("htsnexus_test_NA12878", Format::Bam) - .with_reference_name("11") - .with_start(5015000) - .with_end(5050000); - let response = search.search(query).await; - println!("{response:#?}"); - - let expected_response = Ok(Response::new( - Format::Bam, - vec![ - Url::new(expected_url()) - .with_headers(Headers::default().with_header("Range", "bytes=0-4667")) - .with_class(Header), - Url::new(expected_url()) - .with_headers(Headers::default().with_header("Range", "bytes=256721-1065951")) - .with_class(Body), - Url::new(expected_bgzf_eof_data_url()).with_class(Body), - ], - )); - assert_eq!(response, expected_response) - }) + with_local_storage_fn( + |storage| async move { + let search = BamSearch::new(storage.clone()); + let query = Query::new("htsnexus_test_NA12878", Format::Bam) + .with_reference_name("11") + .with_start(5015000) + .with_end(5050000); + let response = search.search(query).await; + println!("{response:#?}"); + + let expected_response = Ok(Response::new( + Format::Bam, + vec![ + Url::new(expected_url()) + .with_headers(Headers::default().with_header("Range", "bytes=0-4667")) + .with_class(Header), + Url::new(expected_url()) + .with_headers(Headers::default().with_header("Range", "bytes=256721-1065951")) + .with_class(Body), + Url::new(expected_bgzf_eof_data_url()).with_class(Body), + ], + )); + assert_eq!(response, expected_response) + }, + DATA_LOCATION, + &["htsnexus_test_NA12878.bam", INDEX_FILE_LOCATION], + ) .await } @@ -399,25 +405,105 @@ pub(crate) mod tests { .await; } - pub(crate) async fn with_local_storage(test: F) - where - F: FnOnce(Arc>) -> Fut, - Fut: Future, - { - with_local_storage_path(test, "data/bam").await + #[tokio::test] + async fn search_non_existent_id_reference_name() { + with_local_storage_fn( + |storage| async move { + let search = BamSearch::new(storage.clone()); + let query = Query::new("htsnexus_test_NA12878", Format::Bam); + let response = search.search(query).await; + assert!(matches!(response, Err(NotFound(_)))); + }, + DATA_LOCATION, + &[INDEX_FILE_LOCATION], + ) + .await + } + + #[tokio::test] + async fn search_non_existent_id_all_reads() { + with_local_storage_fn( + |storage| async move { + let search = BamSearch::new(storage.clone()); + let query = Query::new("htsnexus_test_NA12878", Format::Bam).with_reference_name("20"); + let response = search.search(query).await; + assert!(matches!(response, Err(NotFound(_)))); + }, + DATA_LOCATION, + &[INDEX_FILE_LOCATION], + ) + .await } - async fn with_local_storage_tmp(test: F) + #[tokio::test] + async fn search_non_existent_id_header() { + with_local_storage_fn( + |storage| async move { + let search = BamSearch::new(storage.clone()); + let query = Query::new("htsnexus_test_NA12878", Format::Bam).with_class(Header); + let response = search.search(query).await; + assert!(matches!(response, Err(NotFound(_)))); + }, + DATA_LOCATION, + &[INDEX_FILE_LOCATION], + ) + .await + } + + #[cfg(feature = "s3-storage")] + #[tokio::test] + async fn search_non_existent_id_reference_name_aws() { + with_aws_storage_fn( + |storage| async move { + let search = BamSearch::new(storage); + let query = Query::new("htsnexus_test_NA12878", Format::Bam); + let response = search.search(query).await; + assert!(matches!(response, Err(_))); + }, + DATA_LOCATION, + &[INDEX_FILE_LOCATION], + ) + .await + } + + #[cfg(feature = "s3-storage")] + #[tokio::test] + async fn search_non_existent_id_all_reads_aws() { + with_aws_storage_fn( + |storage| async move { + let search = BamSearch::new(storage); + let query = Query::new("htsnexus_test_NA12878", Format::Bam).with_reference_name("20"); + let response = search.search(query).await; + assert!(matches!(response, Err(_))); + }, + DATA_LOCATION, + &[INDEX_FILE_LOCATION], + ) + .await + } + + #[cfg(feature = "s3-storage")] + #[tokio::test] + async fn search_non_existent_id_header_aws() { + with_aws_storage_fn( + |storage| async move { + let search = BamSearch::new(storage); + let query = Query::new("htsnexus_test_NA12878", Format::Bam).with_class(Header); + let response = search.search(query).await; + assert!(matches!(response, Err(_))); + }, + DATA_LOCATION, + &[INDEX_FILE_LOCATION], + ) + .await + } + + pub(crate) async fn with_local_storage(test: F) where F: FnOnce(Arc>) -> Fut, Fut: Future, { - with_local_storage_tmp_path( - test, - "data/bam", - &["htsnexus_test_NA12878.bam", "htsnexus_test_NA12878.bam.bai"], - ) - .await + with_local_storage_fn(test, DATA_LOCATION, &[]).await } pub(crate) fn expected_url() -> String { diff --git a/htsget-search/src/htsget/bcf_search.rs b/htsget-search/src/htsget/bcf_search.rs index ae2c103bf..90bd157a5 100644 --- a/htsget-search/src/htsget/bcf_search.rs +++ b/htsget-search/src/htsget/bcf_search.rs @@ -132,16 +132,18 @@ mod tests { use htsget_test::util::expected_bgzf_eof_data_url; - use crate::htsget::from_storage::tests::{ - with_local_storage as with_local_storage_path, - with_local_storage_tmp as with_local_storage_tmp_path, - }; - use crate::htsget::{Class, Headers, Response, Url}; + #[cfg(feature = "s3-storage")] + use crate::htsget::from_storage::tests::with_aws_storage_fn; + use crate::htsget::from_storage::tests::with_local_storage_fn; + use crate::htsget::{Class::Header, Headers, HtsGetError::NotFound, Response, Url}; use crate::storage::data_server::HttpTicketFormatter; use crate::storage::local::LocalStorage; use super::*; + const DATA_LOCATION: &str = "data/bcf"; + const INDEX_FILE_LOCATION: &str = "vcf-spec-v4.3.bcf.csi"; + #[tokio::test] async fn search_all_variants() { with_local_storage(|storage| async move { @@ -206,12 +208,127 @@ mod tests { #[tokio::test] async fn search_no_gzi() { - with_local_storage_tmp(|storage| async move { - test_reference_sequence_with_seq_range(storage).await + with_local_storage_fn( + |storage| async move { test_reference_sequence_with_seq_range(storage).await }, + DATA_LOCATION, + &["sample1-bcbio-cancer.bcf", "sample1-bcbio-cancer.bcf.csi"], + ) + .await + } + + #[tokio::test] + async fn search_header() { + with_local_storage(|storage| async move { + let search = BcfSearch::new(storage.clone()); + let filename = "vcf-spec-v4.3"; + let query = Query::new(filename, Format::Bcf).with_class(Header); + let response = search.search(query).await; + println!("{response:#?}"); + + let expected_response = Ok(Response::new( + Format::Bcf, + vec![Url::new(expected_url(filename)) + .with_headers(Headers::default().with_header("Range", "bytes=0-949")) + .with_class(Header)], + )); + assert_eq!(response, expected_response) }) .await } + #[tokio::test] + async fn search_non_existent_id_reference_name() { + with_local_storage_fn( + |storage| async move { + let search = BcfSearch::new(storage.clone()); + let query = Query::new("vcf-spec-v4.3", Format::Bcf); + let response = search.search(query).await; + assert!(matches!(response, Err(NotFound(_)))); + }, + DATA_LOCATION, + &[INDEX_FILE_LOCATION], + ) + .await + } + + #[tokio::test] + async fn search_non_existent_id_all_reads() { + with_local_storage_fn( + |storage| async move { + let search = BcfSearch::new(storage.clone()); + let query = Query::new("vcf-spec-v4.3", Format::Bcf).with_reference_name("chrM"); + let response = search.search(query).await; + assert!(matches!(response, Err(NotFound(_)))); + }, + DATA_LOCATION, + &[INDEX_FILE_LOCATION], + ) + .await + } + + #[tokio::test] + async fn search_non_existent_id_header() { + with_local_storage_fn( + |storage| async move { + let search = BcfSearch::new(storage.clone()); + let query = Query::new("vcf-spec-v4.3", Format::Bcf).with_class(Header); + let response = search.search(query).await; + assert!(matches!(response, Err(NotFound(_)))); + }, + DATA_LOCATION, + &[INDEX_FILE_LOCATION], + ) + .await + } + + #[cfg(feature = "s3-storage")] + #[tokio::test] + async fn search_non_existent_id_reference_name_aws() { + with_aws_storage_fn( + |storage| async move { + let search = BcfSearch::new(storage); + let query = Query::new("vcf-spec-v4.3", Format::Bcf); + let response = search.search(query).await; + assert!(matches!(response, Err(_))); + }, + DATA_LOCATION, + &[INDEX_FILE_LOCATION], + ) + .await + } + + #[cfg(feature = "s3-storage")] + #[tokio::test] + async fn search_non_existent_id_all_reads_aws() { + with_aws_storage_fn( + |storage| async move { + let search = BcfSearch::new(storage); + let query = Query::new("vcf-spec-v4.3", Format::Bcf).with_reference_name("chrM"); + let response = search.search(query).await; + assert!(matches!(response, Err(_))); + }, + DATA_LOCATION, + &[INDEX_FILE_LOCATION], + ) + .await + } + + #[cfg(feature = "s3-storage")] + #[tokio::test] + async fn search_non_existent_id_header_aws() { + with_aws_storage_fn( + |storage| async move { + let search = BcfSearch::new(storage); + let query = Query::new("vcf-spec-v4.3", Format::Bcf).with_class(Header); + let response = search.search(query).await; + assert!(matches!(response, Err(_))); + }, + DATA_LOCATION, + &[INDEX_FILE_LOCATION], + ) + .await + } + async fn test_reference_sequence_with_seq_range(storage: Arc>) { let search = BcfSearch::new(storage.clone()); let filename = "sample1-bcbio-cancer"; @@ -237,45 +354,12 @@ mod tests { ) } - #[tokio::test] - async fn search_header() { - with_local_storage(|storage| async move { - let search = BcfSearch::new(storage.clone()); - let filename = "vcf-spec-v4.3"; - let query = Query::new(filename, Format::Bcf).with_class(Class::Header); - let response = search.search(query).await; - println!("{response:#?}"); - - let expected_response = Ok(Response::new( - Format::Bcf, - vec![Url::new(expected_url(filename)) - .with_headers(Headers::default().with_header("Range", "bytes=0-949")) - .with_class(Class::Header)], - )); - assert_eq!(response, expected_response) - }) - .await - } - async fn with_local_storage(test: F) where F: FnOnce(Arc>) -> Fut, Fut: Future, { - with_local_storage_path(test, "data/bcf").await - } - - async fn with_local_storage_tmp(test: F) - where - F: FnOnce(Arc>) -> Fut, - Fut: Future, - { - with_local_storage_tmp_path( - test, - "data/bcf", - &["sample1-bcbio-cancer.bcf", "sample1-bcbio-cancer.bcf.csi"], - ) - .await + with_local_storage_fn(test, "data/bcf", &[]).await } fn expected_url(name: &str) -> String { diff --git a/htsget-search/src/htsget/cram_search.rs b/htsget-search/src/htsget/cram_search.rs index 91cd39074..d1bd1265b 100644 --- a/htsget-search/src/htsget/cram_search.rs +++ b/htsget-search/src/htsget/cram_search.rs @@ -7,7 +7,6 @@ use std::sync::Arc; use async_trait::async_trait; use futures::StreamExt; use futures_util::stream::FuturesOrdered; -use htsget_config::Interval; use noodles::core::Position; use noodles::cram; use noodles::cram::crai; @@ -17,6 +16,8 @@ use tokio::io::{AsyncRead, BufReader}; use tokio::{io, select}; use tracing::{instrument, trace}; +use htsget_config::Interval; + use crate::htsget::search::{Search, SearchAll, SearchReads}; use crate::htsget::Class::Body; use crate::htsget::{Format, HtsGetError, Query, Result}; @@ -269,13 +270,18 @@ mod tests { use htsget_test::util::expected_cram_eof_data_url; - use crate::htsget::from_storage::tests::with_local_storage as with_local_storage_path; - use crate::htsget::{Class::Body, Class::Header, Headers, Response, Url}; + #[cfg(feature = "s3-storage")] + use crate::htsget::from_storage::tests::with_aws_storage_fn; + use crate::htsget::from_storage::tests::with_local_storage_fn; + use crate::htsget::{Class::Header, Headers, HtsGetError::NotFound, Response, Url}; use crate::storage::data_server::HttpTicketFormatter; use crate::storage::local::LocalStorage; use super::*; + const DATA_LOCATION: &str = "data/cram"; + const INDEX_FILE_LOCATION: &str = "htsnexus_test_NA12878.cram.crai"; + #[tokio::test] async fn search_all_reads() { with_local_storage(|storage| async move { @@ -434,12 +440,105 @@ mod tests { .await; } + #[tokio::test] + async fn search_non_existent_id_reference_name() { + with_local_storage_fn( + |storage| async move { + let search = CramSearch::new(storage.clone()); + let query = Query::new("htsnexus_test_NA12878", Format::Cram); + let response = search.search(query).await; + assert!(matches!(response, Err(NotFound(_)))); + }, + DATA_LOCATION, + &[INDEX_FILE_LOCATION], + ) + .await + } + + #[tokio::test] + async fn search_non_existent_id_all_reads() { + with_local_storage_fn( + |storage| async move { + let search = CramSearch::new(storage.clone()); + let query = Query::new("htsnexus_test_NA12878", Format::Cram).with_reference_name("20"); + let response = search.search(query).await; + assert!(matches!(response, Err(NotFound(_)))); + }, + DATA_LOCATION, + &[INDEX_FILE_LOCATION], + ) + .await + } + + #[tokio::test] + async fn search_non_existent_id_header() { + with_local_storage_fn( + |storage| async move { + let search = CramSearch::new(storage.clone()); + let query = Query::new("htsnexus_test_NA12878", Format::Cram).with_class(Header); + let response = search.search(query).await; + assert!(matches!(response, Err(NotFound(_)))); + }, + DATA_LOCATION, + &[INDEX_FILE_LOCATION], + ) + .await + } + + #[cfg(feature = "s3-storage")] + #[tokio::test] + async fn search_non_existent_id_reference_name_aws() { + with_aws_storage_fn( + |storage| async move { + let search = CramSearch::new(storage); + let query = Query::new("htsnexus_test_NA12878", Format::Cram); + let response = search.search(query).await; + assert!(matches!(response, Err(_))); + }, + DATA_LOCATION, + &[INDEX_FILE_LOCATION], + ) + .await + } + + #[cfg(feature = "s3-storage")] + #[tokio::test] + async fn search_non_existent_id_all_reads_aws() { + with_aws_storage_fn( + |storage| async move { + let search = CramSearch::new(storage); + let query = Query::new("htsnexus_test_NA12878", Format::Cram).with_reference_name("20"); + let response = search.search(query).await; + assert!(matches!(response, Err(_))); + }, + DATA_LOCATION, + &[INDEX_FILE_LOCATION], + ) + .await + } + + #[cfg(feature = "s3-storage")] + #[tokio::test] + async fn search_non_existent_id_header_aws() { + with_aws_storage_fn( + |storage| async move { + let search = CramSearch::new(storage); + let query = Query::new("htsnexus_test_NA12878", Format::Cram).with_class(Header); + let response = search.search(query).await; + assert!(matches!(response, Err(_))); + }, + DATA_LOCATION, + &[INDEX_FILE_LOCATION], + ) + .await + } + async fn with_local_storage(test: F) where F: FnOnce(Arc>) -> Fut, Fut: Future, { - with_local_storage_path(test, "data/cram").await + with_local_storage_fn(test, "data/cram", &[]).await } fn expected_url() -> String { diff --git a/htsget-search/src/htsget/from_storage.rs b/htsget-search/src/htsget/from_storage.rs index fb7fdc214..b432dc210 100644 --- a/htsget-search/src/htsget/from_storage.rs +++ b/htsget-search/src/htsget/from_storage.rs @@ -5,11 +5,12 @@ use std::path::Path; use std::sync::Arc; use async_trait::async_trait; -use htsget_config::regex_resolver::{Resolver, StorageType}; use tokio::io::AsyncRead; use tracing::debug; use tracing::instrument; +use htsget_config::regex_resolver::{Resolver, StorageType}; + use crate::htsget::search::Search; use crate::htsget::{Format, HtsGetError}; #[cfg(feature = "s3-storage")] @@ -111,12 +112,13 @@ impl HtsGetFromStorage> { #[cfg(test)] pub(crate) mod tests { use std::fs; + use std::fs::create_dir; use std::future::Future; use std::path::PathBuf; - use htsget_config::config::cors::CorsConfig; use tempfile::TempDir; + use htsget_config::config::cors::CorsConfig; use htsget_test::util::expected_bgzf_eof_data_url; use crate::htsget::bam_search::tests::{ @@ -126,6 +128,8 @@ pub(crate) mod tests { expected_url as vcf_expected_url, with_local_storage as with_vcf_local_storage, }; use crate::htsget::{Headers, Url}; + #[cfg(feature = "s3-storage")] + use crate::storage::aws::tests::with_aws_s3_storage_fn; use crate::storage::data_server::HttpTicketFormatter; use super::*; @@ -173,24 +177,30 @@ pub(crate) mod tests { .await; } - async fn with_local_storage_fn(test: F, path: &str, file_names: Option<&[&str]>) - where - F: FnOnce(Arc>) -> Fut, - Fut: Future, - { + async fn copy_files(from_path: &str, to_path: &Path, file_names: &[&str]) -> PathBuf { let mut base_path = std::env::current_dir() .unwrap() .parent() .unwrap() - .join(path); + .join(from_path); - let tmp_dir = TempDir::new().unwrap(); - if let Some(file_names) = file_names { - for file_name in file_names { - fs::copy(base_path.join(file_name), tmp_dir.path().join(file_name)).unwrap(); - } - base_path = PathBuf::from(tmp_dir.path()); + for file_name in file_names { + fs::copy(base_path.join(file_name), to_path.join(file_name)).unwrap(); } + if !file_names.is_empty() { + base_path = PathBuf::from(to_path); + } + + base_path + } + + pub(crate) async fn with_local_storage_fn(test: F, path: &str, file_names: &[&str]) + where + F: FnOnce(Arc>) -> Fut, + Fut: Future, + { + let tmp_dir = TempDir::new().unwrap(); + let base_path = copy_files(path, tmp_dir.path(), file_names).await; test(Arc::new( LocalStorage::new( @@ -202,19 +212,18 @@ pub(crate) mod tests { .await } - pub(crate) async fn with_local_storage(test: F, path: &str) + #[cfg(feature = "s3-storage")] + pub(crate) async fn with_aws_storage_fn(test: F, path: &str, file_names: &[&str]) where - F: FnOnce(Arc>) -> Fut, + F: FnOnce(Arc) -> Fut, Fut: Future, { - with_local_storage_fn(test, path, None).await; - } + let tmp_dir = TempDir::new().unwrap(); + let to_path = tmp_dir.into_path().join("folder"); + create_dir(&to_path).unwrap(); - pub(crate) async fn with_local_storage_tmp(test: F, path: &str, file_names: &[&str]) - where - F: FnOnce(Arc>) -> Fut, - Fut: Future, - { - with_local_storage_fn(test, path, Some(file_names)).await; + let base_path = copy_files(path, &to_path, file_names).await; + + with_aws_s3_storage_fn(test, "folder".to_string(), base_path.parent().unwrap()).await; } } diff --git a/htsget-search/src/htsget/mod.rs b/htsget-search/src/htsget/mod.rs index 842142ea7..fe6b54fb0 100644 --- a/htsget-search/src/htsget/mod.rs +++ b/htsget-search/src/htsget/mod.rs @@ -8,11 +8,12 @@ use std::io; use std::io::ErrorKind; use async_trait::async_trait; -use htsget_config::{Class, Format, Query}; use serde::{Deserialize, Serialize}; use thiserror::Error; use tokio::task::JoinError; +use htsget_config::{Class, Format, Query}; + use crate::storage::StorageError; pub mod bam_search; @@ -105,10 +106,10 @@ impl From for io::Error { impl From for HtsGetError { fn from(err: StorageError) -> Self { match err { - err @ (StorageError::InvalidKey(_) | StorageError::InvalidInput(_)) => { - Self::InvalidInput(err.to_string()) + err @ StorageError::InvalidInput(_) => Self::InvalidInput(err.to_string()), + err @ (StorageError::KeyNotFound(_) | StorageError::InvalidKey(_)) => { + Self::NotFound(err.to_string()) } - err @ StorageError::KeyNotFound(_) => Self::NotFound(err.to_string()), err @ StorageError::IoError(_, _) => Self::IoError(err.to_string()), err @ (StorageError::DataServerError(_) | StorageError::InvalidUri(_) @@ -230,10 +231,12 @@ impl Response { #[cfg(test)] mod tests { - use super::*; - use htsget_config::{Fields, NoTags, TaggedTypeAll, Tags}; use std::collections::HashSet; + use htsget_config::{Fields, NoTags, TaggedTypeAll, Tags}; + + use super::*; + #[test] fn htsget_error_not_found() { let result = HtsGetError::not_found("error"); @@ -285,7 +288,7 @@ mod tests { #[test] fn htsget_error_from_storage_invalid_key() { let result = HtsGetError::from(StorageError::InvalidKey("error".to_string())); - assert!(matches!(result, HtsGetError::InvalidInput(_))); + assert!(matches!(result, HtsGetError::NotFound(_))); } #[test] diff --git a/htsget-search/src/htsget/search.rs b/htsget-search/src/htsget/search.rs index fe3f275aa..1513bf90e 100644 --- a/htsget-search/src/htsget/search.rs +++ b/htsget-search/src/htsget/search.rs @@ -269,6 +269,12 @@ where self.build_response(&query, blocks).await } Class::Header => { + // Check to see if the key exists. + self + .get_storage() + .head(query.format().fmt_file(query.id())) + .await?; + let index = self.read_index(&query).await?; let header_byte_ranges = self.get_byte_ranges_for_header(&index).await?; @@ -292,6 +298,7 @@ where DataBlock::Range(range) => { let storage = self.get_storage(); let query_owned = query.clone(); + storage_futures.push_back(tokio::spawn(async move { storage .range_url( @@ -306,6 +313,7 @@ where } } } + let mut urls = Vec::new(); loop { select! { @@ -313,6 +321,7 @@ where else => break } } + return Ok(Response::new(query.format(), urls)); } diff --git a/htsget-search/src/htsget/vcf_search.rs b/htsget-search/src/htsget/vcf_search.rs index 3317f7aa7..99278171c 100644 --- a/htsget-search/src/htsget/vcf_search.rs +++ b/htsget-search/src/htsget/vcf_search.rs @@ -134,16 +134,18 @@ pub(crate) mod tests { use htsget_test::util::expected_bgzf_eof_data_url; - use crate::htsget::from_storage::tests::{ - with_local_storage as with_local_storage_path, - with_local_storage_tmp as with_local_storage_tmp_path, - }; - use crate::htsget::{Class, Headers, Response, Url}; + #[cfg(feature = "s3-storage")] + use crate::htsget::from_storage::tests::with_aws_storage_fn; + use crate::htsget::from_storage::tests::with_local_storage_fn; + use crate::htsget::{Class::Header, Headers, HtsGetError::NotFound, Response, Url}; use crate::storage::data_server::HttpTicketFormatter; use crate::storage::local::LocalStorage; use super::*; + const VCF_LOCATION: &str = "data/vcf"; + const INDEX_FILE_LOCATION: &str = "spec-v4.3.vcf.gz.tbi"; + #[tokio::test] async fn search_all_variants() { with_local_storage(|storage| async move { @@ -207,8 +209,13 @@ pub(crate) mod tests { #[tokio::test] async fn search_no_gzi() { - with_local_storage_tmp( + with_local_storage_fn( |storage| async move { test_reference_name_with_seq_range(storage).await }, + VCF_LOCATION, + &[ + "sample1-bcbio-cancer.vcf.gz", + "sample1-bcbio-cancer.vcf.gz.tbi", + ], ) .await; } @@ -218,7 +225,7 @@ pub(crate) mod tests { with_local_storage(|storage| async move { let search = VcfSearch::new(storage.clone()); let filename = "spec-v4.3"; - let query = Query::new(filename, Format::Vcf).with_class(Class::Header); + let query = Query::new(filename, Format::Vcf).with_class(Header); let response = search.search(query).await; println!("{response:#?}"); @@ -226,13 +233,106 @@ pub(crate) mod tests { Format::Vcf, vec![Url::new(expected_url(filename)) .with_headers(Headers::default().with_header("Range", "bytes=0-822")) - .with_class(Class::Header)], + .with_class(Header)], )); assert_eq!(response, expected_response) }) .await; } + #[tokio::test] + async fn search_non_existent_id_reference_name() { + with_local_storage_fn( + |storage| async move { + let search = VcfSearch::new(storage.clone()); + let query = Query::new("vcf-spec-v4.3", Format::Vcf); + let response = search.search(query).await; + assert!(matches!(response, Err(NotFound(_)))); + }, + VCF_LOCATION, + &[INDEX_FILE_LOCATION], + ) + .await + } + + #[tokio::test] + async fn search_non_existent_id_all_reads() { + with_local_storage_fn( + |storage| async move { + let search = VcfSearch::new(storage.clone()); + let query = Query::new("vcf-spec-v4.3", Format::Vcf).with_reference_name("chrM"); + let response = search.search(query).await; + assert!(matches!(response, Err(NotFound(_)))); + }, + VCF_LOCATION, + &[INDEX_FILE_LOCATION], + ) + .await + } + + #[tokio::test] + async fn search_non_existent_id_header() { + with_local_storage_fn( + |storage| async move { + let search = VcfSearch::new(storage.clone()); + let query = Query::new("vcf-spec-v4.3", Format::Vcf).with_class(Header); + let response = search.search(query).await; + assert!(matches!(response, Err(NotFound(_)))); + }, + VCF_LOCATION, + &[INDEX_FILE_LOCATION], + ) + .await + } + + #[cfg(feature = "s3-storage")] + #[tokio::test] + async fn search_non_existent_id_reference_name_aws() { + with_aws_storage_fn( + |storage| async move { + let search = VcfSearch::new(storage); + let query = Query::new("vcf-spec-v4.3", Format::Vcf); + let response = search.search(query).await; + assert!(matches!(response, Err(_))); + }, + VCF_LOCATION, + &[INDEX_FILE_LOCATION], + ) + .await + } + + #[cfg(feature = "s3-storage")] + #[tokio::test] + async fn search_non_existent_id_all_reads_aws() { + with_aws_storage_fn( + |storage| async move { + let search = VcfSearch::new(storage); + let query = Query::new("vcf-spec-v4.3", Format::Vcf).with_reference_name("chrM"); + let response = search.search(query).await; + assert!(matches!(response, Err(_))); + }, + VCF_LOCATION, + &[INDEX_FILE_LOCATION], + ) + .await + } + + #[cfg(feature = "s3-storage")] + #[tokio::test] + async fn search_non_existent_id_header_aws() { + with_aws_storage_fn( + |storage| async move { + let search = VcfSearch::new(storage); + let query = Query::new("vcf-spec-v4.3", Format::Vcf).with_class(Header); + let response = search.search(query).await; + assert!(matches!(response, Err(_))); + }, + VCF_LOCATION, + &[INDEX_FILE_LOCATION], + ) + .await + } + async fn test_reference_name_with_seq_range(storage: Arc>) { let search = VcfSearch::new(storage.clone()); let filename = "sample1-bcbio-cancer"; @@ -263,23 +363,7 @@ pub(crate) mod tests { F: FnOnce(Arc>) -> Fut, Fut: Future, { - with_local_storage_path(test, "data/vcf").await - } - - async fn with_local_storage_tmp(test: F) - where - F: FnOnce(Arc>) -> Fut, - Fut: Future, - { - with_local_storage_tmp_path( - test, - "data/vcf", - &[ - "sample1-bcbio-cancer.vcf.gz", - "sample1-bcbio-cancer.vcf.gz.tbi", - ], - ) - .await + with_local_storage_fn(test, "data/vcf", &[]).await } pub(crate) fn expected_url(name: &str) -> String { diff --git a/htsget-search/src/storage/aws.rs b/htsget-search/src/storage/aws.rs index a764260c7..57f62a3a0 100644 --- a/htsget-search/src/storage/aws.rs +++ b/htsget-search/src/storage/aws.rs @@ -1,4 +1,6 @@ //! Module providing an implementation for the [Storage] trait using Amazon's S3 object storage service. +//! + use std::fmt::Debug; use std::io; use std::io::ErrorKind::Other; @@ -6,10 +8,11 @@ use std::time::Duration; use async_trait::async_trait; use aws_sdk_s3::client::fluent_builders; +use aws_sdk_s3::error::{GetObjectError, GetObjectErrorKind, HeadObjectErrorKind}; use aws_sdk_s3::model::StorageClass; use aws_sdk_s3::output::HeadObjectOutput; use aws_sdk_s3::presigning::config::PresigningConfig; -use aws_sdk_s3::types::ByteStream; +use aws_sdk_s3::types::{ByteStream, SdkError}; use aws_sdk_s3::Client; use bytes::Bytes; use fluent_builders::GetObject; @@ -19,7 +22,7 @@ use tracing::instrument; use crate::htsget::Url; use crate::storage::aws::Retrieval::{Delayed, Immediate}; -use crate::storage::StorageError::AwsS3Error; +use crate::storage::StorageError::{AwsS3Error, KeyNotFound}; use crate::storage::{BytesPosition, StorageError}; use crate::storage::{BytesRange, Storage}; @@ -53,6 +56,8 @@ impl AwsS3Storage { AwsS3Storage::new(Client::new(&aws_config::load_from_env().await), bucket) } + /// Return an S3 pre-signed URL of the key. This function does not check that the key exists, + /// so this should be checked before calling it. pub async fn s3_presign_url + Send>( &self, key: K, @@ -71,13 +76,14 @@ impl AwsS3Storage { .map_err(|err| AwsS3Error(err.to_string(), key.as_ref().to_string()))?, ) .await - .map_err(|err| AwsS3Error(err.to_string(), key.as_ref().to_string()))? + .map_err(|err| Self::map_get_error(key, err))? .uri() .to_string(), ) } async fn s3_head + Send>(&self, key: K) -> Result { + println!("{:#?}", self.client.list_buckets().send().await.unwrap()); self .client .head_object() @@ -85,7 +91,14 @@ impl AwsS3Storage { .key(key.as_ref()) .send() .await - .map_err(|err| AwsS3Error(err.to_string(), key.as_ref().to_string())) + .map_err(|err| { + let err = err.into_service_error(); + if let HeadObjectErrorKind::NotFound(_) = err.kind { + KeyNotFound(key.as_ref().to_string()) + } else { + AwsS3Error(err.to_string(), key.as_ref().to_string()) + } + }) } /// Returns the retrieval type of the object stored with the key. @@ -130,6 +143,7 @@ impl AwsS3Storage { } } + /// Get the key from S3 storage as a `ByteStream`. pub async fn get_content + Send>( &self, key: K, @@ -152,7 +166,7 @@ impl AwsS3Storage { response .send() .await - .map_err(|err| AwsS3Error(err.to_string(), key.as_ref().to_string()))? + .map_err(|err| Self::map_get_error(key, err))? .body, ) } @@ -165,6 +179,18 @@ impl AwsS3Storage { let response = self.get_content(key, options).await?; Ok(StreamReader::new(response)) } + + fn map_get_error(key: K, error: SdkError) -> StorageError + where + K: AsRef + Send, + { + let error = error.into_service_error(); + if let GetObjectErrorKind::NoSuchKey(_) = error.kind { + KeyNotFound(key.as_ref().to_string()) + } else { + AwsS3Error(error.to_string(), key.as_ref().to_string()) + } + } } #[async_trait] @@ -184,7 +210,8 @@ impl Storage for AwsS3Storage { self.create_stream_reader(key, options).await } - /// Returns a S3-presigned htsget URL + /// Return an S3 pre-signed htsget URL. This function does not check that the key exists, so this + /// should be checked before calling it. #[instrument(level = "trace", skip(self))] async fn range_url + Send + Debug>( &self, @@ -218,20 +245,16 @@ impl Storage for AwsS3Storage { } #[cfg(test)] -mod tests { +pub(crate) mod tests { use std::future::Future; - use std::net::TcpListener; use std::path::Path; + use std::sync::Arc; - use aws_sdk_s3::{Client, Endpoint}; - use aws_types::credentials::SharedCredentialsProvider; - use aws_types::region::Region; - use aws_types::{Credentials, SdkConfig}; - use futures::future; - use hyper::service::make_service_fn; - use hyper::Server; - use s3_server::storages::fs::FileSystem; - use s3_server::{S3Service, SimpleAuth}; + use aws_config::SdkConfig; + use aws_credential_types::provider::SharedCredentialsProvider; + use aws_sdk_s3::{Client, Credentials, Region}; + use s3s::service::S3Service; + use s3s_aws; use crate::htsget::Headers; use crate::storage::aws::AwsS3Storage; @@ -239,55 +262,58 @@ mod tests { use crate::storage::StorageError; use crate::storage::{BytesPosition, GetOptions, RangeUrlOptions, Storage}; - async fn with_s3_test_server(server_base_path: &Path, test: F) + pub(crate) async fn with_s3_test_server(server_base_path: &Path, test: F) where F: FnOnce(Client) -> Fut, Fut: Future, { - // Setup s3-server. - let fs = FileSystem::new(server_base_path).unwrap(); - let mut auth = SimpleAuth::new(); - auth.register(String::from("access_key"), String::from("secret_key")); - let mut service = S3Service::new(fs); - service.set_auth(auth); - - // Spawn hyper Server instance. - let service = service.into_shared(); - let listener = TcpListener::bind(("localhost", 0)).unwrap(); - let bound_addr = format!("http://localhost:{}", listener.local_addr().unwrap().port()); - let make_service: _ = - make_service_fn(move |_| future::ready(Ok::<_, anyhow::Error>(service.clone()))); - tokio::spawn(Server::from_tcp(listener).unwrap().serve(make_service)); - - // Create S3Config. - let config = SdkConfig::builder() - .region(Region::new("ap-southeast-2")) - .credentials_provider(SharedCredentialsProvider::new(Credentials::from_keys( - "access_key", - "secret_key", - None, - ))) - .build(); - let ep = Endpoint::immutable(bound_addr).unwrap(); - let s3_conf = aws_sdk_s3::config::Builder::from(&config) - .endpoint_resolver(ep) + const DOMAIN_NAME: &str = "localhost:8014"; + const REGION: &str = "ap-southeast-2"; + + let cred = Credentials::for_tests(); + + let conn = { + let fs = s3s_fs::FileSystem::new(server_base_path).unwrap(); + + let auth = s3s::SimpleAuth::from_single(cred.access_key_id(), cred.secret_access_key()); + + let mut service = S3Service::new(Box::new(fs)); + service.set_auth(Box::new(auth)); + service.set_base_domain(DOMAIN_NAME); + + s3s_aws::Connector::from(service.into_shared()) + }; + + let sdk_config = SdkConfig::builder() + .credentials_provider(SharedCredentialsProvider::new(cred)) + .http_connector(conn) + .region(Region::new(REGION)) + .endpoint_url(format!("http://{DOMAIN_NAME}")) .build(); - test(Client::from_conf(s3_conf)); + test(Client::new(&sdk_config)).await; } - async fn with_aws_s3_storage(test: F) + pub(crate) async fn with_aws_s3_storage_fn(test: F, folder_name: String, base_path: &Path) where - F: FnOnce(AwsS3Storage) -> Fut, + F: FnOnce(Arc) -> Fut, Fut: Future, { - let (folder_name, base_path) = create_local_test_files().await; - with_s3_test_server(base_path.path(), |client| async move { - test(AwsS3Storage::new(client, folder_name)); + with_s3_test_server(base_path, |client| async move { + test(Arc::new(AwsS3Storage::new(client, folder_name))).await; }) .await; } + async fn with_aws_s3_storage(test: F) + where + F: FnOnce(Arc) -> Fut, + Fut: Future, + { + let (folder_name, base_path) = create_local_test_files().await; + with_aws_s3_storage_fn(test, folder_name, base_path.path()).await; + } + #[tokio::test] async fn existing_key() { with_aws_s3_storage(|storage| async move { @@ -306,17 +332,6 @@ mod tests { .await; } - #[tokio::test] - async fn url_of_non_existing_key() { - with_aws_s3_storage(|storage| async move { - let result = storage - .range_url("non-existing-key", RangeUrlOptions::default()) - .await; - assert!(matches!(result, Err(StorageError::AwsS3Error(_, _)))); - }) - .await; - } - #[tokio::test] async fn url_of_existing_key() { with_aws_s3_storage(|storage| async move { @@ -324,9 +339,7 @@ mod tests { .range_url("key2", RangeUrlOptions::default()) .await .unwrap(); - assert!(result - .url - .starts_with(&format!("http://localhost:8014/{}/{}", "folder", "key2"))); + assert!(result.url.starts_with("http://folder.localhost:8014/key2")); assert!(result.url.contains(&format!( "Amz-Expires={}", AwsS3Storage::PRESIGNED_REQUEST_EXPIRY @@ -345,9 +358,7 @@ mod tests { ) .await .unwrap(); - assert!(result - .url - .starts_with(&format!("http://localhost:8014/{}/{}", "folder", "key2"))); + assert!(result.url.starts_with("http://folder.localhost:8014/key2")); assert!(result.url.contains(&format!( "Amz-Expires={}", AwsS3Storage::PRESIGNED_REQUEST_EXPIRY @@ -355,7 +366,7 @@ mod tests { assert!(result.url.contains("range")); assert_eq!( result.headers, - Some(Headers::default().with_header("Range", "bytes=7-9")) + Some(Headers::default().with_header("Range", "bytes=7-8")) ); }) .await; @@ -371,9 +382,7 @@ mod tests { ) .await .unwrap(); - assert!(result - .url - .starts_with(&format!("http://localhost:8014/{}/{}", "folder", "key2"))); + assert!(result.url.starts_with("http://folder.localhost:8014/key2")); assert!(result.url.contains(&format!( "Amz-Expires={}", AwsS3Storage::PRESIGNED_REQUEST_EXPIRY diff --git a/htsget-search/src/storage/data_server.rs b/htsget-search/src/storage/data_server.rs index 62c85a41a..e94952120 100644 --- a/htsget-search/src/storage/data_server.rs +++ b/htsget-search/src/storage/data_server.rs @@ -15,8 +15,6 @@ use axum::http; use axum::Router; use axum_extra::routing::SpaRouter; use futures_util::future::poll_fn; -use htsget_config::config::cors::CorsConfig; -use htsget_config::config::DataServerConfig; use http::uri::Scheme; use hyper::server::accept::Accept; use hyper::server::conn::{AddrIncoming, Http}; @@ -29,6 +27,9 @@ use tower_http::trace::TraceLayer; use tracing::instrument; use tracing::{info, trace}; +use htsget_config::config::cors::CorsConfig; +use htsget_config::config::DataServerConfig; + use crate::storage::StorageError::{DataServerError, IoError}; use crate::storage::{configure_cors, UrlFormatter}; diff --git a/htsget-search/src/storage/local.rs b/htsget-search/src/storage/local.rs index e92cb6c7d..214c476b1 100644 --- a/htsget-search/src/storage/local.rs +++ b/htsget-search/src/storage/local.rs @@ -2,6 +2,7 @@ //! use std::fmt::Debug; +use std::io::ErrorKind; use std::path::{Path, PathBuf}; use async_trait::async_trait; @@ -45,7 +46,13 @@ impl LocalStorage { .base_path .join(key) .canonicalize() - .map_err(|_| StorageError::InvalidKey(key.to_string())) + .map_err(|err| { + if let ErrorKind::NotFound = err.kind() { + StorageError::KeyNotFound(key.to_string()) + } else { + StorageError::InvalidKey(key.to_string()) + } + }) .and_then(|path| { path .starts_with(&self.base_path) @@ -118,11 +125,12 @@ pub(crate) mod tests { use std::future::Future; use std::matches; - use htsget_config::config::cors::CorsConfig; use tempfile::TempDir; use tokio::fs::{create_dir, File}; use tokio::io::AsyncWriteExt; + use htsget_config::config::cors::CorsConfig; + use crate::htsget::{Headers, Url}; use crate::storage::data_server::HttpTicketFormatter; use crate::storage::{BytesPosition, GetOptions, RangeUrlOptions, StorageError}; @@ -133,7 +141,7 @@ pub(crate) mod tests { async fn get_non_existing_key() { with_local_storage(|storage| async move { let result = storage.get("non-existing-key").await; - assert!(matches!(result, Err(StorageError::InvalidKey(msg)) if msg == "non-existing-key")); + assert!(matches!(result, Err(StorageError::KeyNotFound(msg)) if msg == "non-existing-key")); }) .await; } @@ -152,7 +160,7 @@ pub(crate) mod tests { with_local_storage(|storage| async move { let result = Storage::get(&storage, "folder/../../passwords", GetOptions::default()).await; assert!( - matches!(result, Err(StorageError::InvalidKey(msg)) if msg == "folder/../../passwords") + matches!(result, Err(StorageError::KeyNotFound(msg)) if msg == "folder/../../passwords") ); }) .await; @@ -172,7 +180,7 @@ pub(crate) mod tests { with_local_storage(|storage| async move { let result = Storage::range_url(&storage, "non-existing-key", RangeUrlOptions::default()).await; - assert!(matches!(result, Err(StorageError::InvalidKey(msg)) if msg == "non-existing-key")); + assert!(matches!(result, Err(StorageError::KeyNotFound(msg)) if msg == "non-existing-key")); }) .await; } @@ -196,7 +204,7 @@ pub(crate) mod tests { ) .await; assert!( - matches!(result, Err(StorageError::InvalidKey(msg)) if msg == "folder/../../passwords") + matches!(result, Err(StorageError::KeyNotFound(msg)) if msg == "folder/../../passwords") ); }) .await; diff --git a/htsget-search/src/storage/mod.rs b/htsget-search/src/storage/mod.rs index b8d61e71c..e8034d6ce 100644 --- a/htsget-search/src/storage/mod.rs +++ b/htsget-search/src/storage/mod.rs @@ -9,15 +9,16 @@ use std::time::Duration; use async_trait::async_trait; use base64::encode; -use htsget_config::config::cors::CorsConfig; -use htsget_config::regex_resolver::{LocalResolver, Scheme}; -use htsget_config::Class; use http::{uri, HeaderValue}; use thiserror::Error; use tokio::io::AsyncRead; use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer, ExposeHeaders}; use tracing::instrument; +use htsget_config::config::cors::CorsConfig; +use htsget_config::regex_resolver::{LocalResolver, Scheme}; +use htsget_config::Class; + use crate::htsget::{Headers, Url}; #[cfg(feature = "s3-storage")] @@ -40,7 +41,8 @@ pub trait Storage { options: GetOptions, ) -> Result; - /// Get the url of the object represented by the key using a bytes range. + /// Get the url of the object represented by the key using a bytes range. It is not required for + /// this function to check for the existent of the key, so this should be ensured beforehand. async fn range_url + Send + Debug>( &self, key: K,