Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace routerify in file transfer server with axum #2461

Merged
merged 15 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 24 additions & 21 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ az_mapper_ext = { path = "crates/extensions/az_mapper_ext" }
backoff = { version = "0.4", features = ["tokio"] }
base64 = "0.13"
batcher = { path = "crates/common/batcher" }
bytes = "1.4"
c8y_api = { path = "crates/core/c8y_api" }
c8y_config_manager = { path = "crates/extensions/c8y_config_manager" }
c8y_firmware_manager = { path = "crates/extensions/c8y_firmware_manager" }
Expand Down Expand Up @@ -114,7 +115,6 @@ rand = "0.8"
rcgen = { version = "0.9", features = ["pem", "zeroize"] }
regex = "1.4"
reqwest = { version = "0.11", default-features = false }
routerify = "3.0"
rpassword = "5.0"
rstest = "0.16.0"
rumqttc = "0.22"
Expand Down Expand Up @@ -156,14 +156,15 @@ tedge_timer_ext = { path = "crates/extensions/tedge_timer_ext" }
tedge_utils = { path = "crates/common/tedge_utils" }
tedge-watchdog = { path = "crates/core/tedge_watchdog" }
tokio-rustls = "0.24.1"
tower = "0.4.13"
tempfile = "3.5"
test-case = "2.2"
test-case = "3.2"
thiserror = "1.0"
time = "0.3"
tokio = { version = "1.23", default-features = false }
tokio-util = { version = "0.7", features = ["codec"] }
toml = "0.7"
tower = "0.4"
http-body = "0.4"
Bravo555 marked this conversation as resolved.
Show resolved Hide resolved
tracing = { version = "0.1", features = ["attributes", "log"] }
tracing-subscriber = { version = "0.3", features = ["time", "env-filter"] }
try-traits = "0.1"
Expand Down
7 changes: 5 additions & 2 deletions crates/core/tedge_agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ repository = { workspace = true }
[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
axum = { workspace = true }
camino = { workspace = true }
clap = { workspace = true }
flockfile = { workspace = true }
Expand All @@ -21,7 +22,6 @@ log = { workspace = true }
path-clean = { workspace = true }
plugin_sm = { workspace = true }
reqwest = { workspace = true }
routerify = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tedge_actors = { workspace = true }
Expand All @@ -39,13 +39,16 @@ tedge_utils = { workspace = true }
thiserror = { workspace = true }
time = { workspace = true, features = ["formatting"] }
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-util = { workspace = true }
toml = { workspace = true }
tracing = { workspace = true }
which = { workspace = true }

[dev-dependencies]
serial_test = { workspace = true }
bytes = { workspace = true }
http-body = { workspace = true }
tedge_actors = { workspace = true, features = ["test-helpers"] }
tedge_mqtt_ext = { workspace = true, features = ["test-helpers"] }
tedge_test_utils = { workspace = true }
test-case = { workspace = true }
tower = { workspace = true }
8 changes: 4 additions & 4 deletions crates/core/tedge_agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,10 @@ impl Agent {
pub fn init(&self) -> Result<(), anyhow::Error> {
// `config_dir` by default is `/etc/tedge` (or whatever the user sets with --config-dir)
create_directory_with_defaults(self.config.config_dir.join(".agent"))?;
create_directory_with_defaults(self.config.log_dir.clone())?;
create_directory_with_defaults(self.config.data_dir.clone())?;
create_directory_with_defaults(self.config.http_config.data_dir.file_transfer_dir())?;
create_directory_with_defaults(self.config.http_config.data_dir.cache_dir())?;
create_directory_with_defaults(&self.config.log_dir)?;
create_directory_with_defaults(&self.config.data_dir)?;
create_directory_with_defaults(&self.config.http_config.file_transfer_dir)?;
create_directory_with_defaults(self.config.data_dir.cache_dir())?;

Ok(())
}
Expand Down
34 changes: 14 additions & 20 deletions crates/core/tedge_agent/src/file_transfer_server/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl Actor for FileTransferServerActor {

async fn run(mut self) -> Result<(), RuntimeError> {
let http_config = self.config.clone();
let server = http_file_transfer_server(&http_config)?;
let server = http_file_transfer_server(http_config)?;

tokio::select! {
result = server => {
Expand Down Expand Up @@ -83,9 +83,12 @@ impl Builder<FileTransferServerActor> for FileTransferServerBuilder {
#[cfg(test)]
mod tests {
use super::*;
use anyhow::bail;
use anyhow::ensure;
use hyper::Body;
use hyper::Method;
use hyper::Request;
use std::time::Duration;
use tedge_test_utils::fs::TempTedgeDir;
use tokio::fs;

Expand Down Expand Up @@ -141,31 +144,22 @@ mod tests {
}

#[tokio::test]
#[serial_test::serial]
async fn check_server_does_not_panic_when_port_is_in_use() -> Result<(), anyhow::Error> {
async fn check_server_does_not_panic_when_port_is_in_use() -> anyhow::Result<()> {
let ttd = TempTedgeDir::new();

let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port_in_use = listener.local_addr().unwrap().port();

let http_config = HttpConfig::default()
.with_data_dir(ttd.utf8_path_buf().into())
.with_port(3746);
let config_clone = http_config.clone();

// Spawn HTTP file transfer server
// handle_one uses port 3000.
let builder_one = FileTransferServerBuilder::new(http_config);
let handle_one = tokio::spawn(async move { builder_one.build().run().await });
.with_port(port_in_use);

// handle_two will not be able to bind to the same port.
let builder_two = FileTransferServerBuilder::new(config_clone);
let handle_two = tokio::spawn(async move { builder_two.build().run().await });
let server = FileTransferServerBuilder::new(http_config).build().run();

// although the code inside handle_two throws an error it does not panic.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

// to check for the error, we assert that handle_one is still running
// while handle_two is finished.
assert!(!handle_one.is_finished());
assert!(handle_two.is_finished());
tokio::select! {
res = server => ensure!(res.is_err(), "expected server startup to fail with port binding error, but actor exited successfully"),
_ = tokio::time::sleep(Duration::from_secs(5)) => bail!("timed out waiting for actor to stop running"),
}

Ok(())
}
Expand Down
70 changes: 64 additions & 6 deletions crates/core/tedge_agent/src/file_transfer_server/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
use axum::extract::rejection::PathRejection;
use axum::response::IntoResponse;
use hyper::StatusCode;
use tedge_actors::RuntimeError;

use super::request_files::RequestPath;

#[derive(Debug, thiserror::Error)]
pub enum FileTransferError {
#[error(transparent)]
Expand All @@ -8,12 +13,6 @@ pub enum FileTransferError {
#[error(transparent)]
FromHyperError(#[from] hyper::Error),

#[error("Invalid URI: {value:?}")]
InvalidURI { value: String },

#[error(transparent)]
FromRouterServer(#[from] routerify::RouteError),

#[error(transparent)]
FromAddressParseError(#[from] std::net::AddrParseError),

Expand All @@ -24,8 +23,67 @@ pub enum FileTransferError {
BindingAddressInUse { address: std::net::SocketAddr },
}

#[derive(Debug, thiserror::Error)]
pub enum FileTransferRequestError {
#[error(transparent)]
FromIo(#[from] std::io::Error),

#[error("Cannot delete: {path:?} is a directory, not a file")]
CannotDeleteDirectory { path: RequestPath },

#[error("Cannot upload: {path:?} is a directory, not a file")]
CannotUploadDirectory { path: RequestPath },

#[error("Request to delete {path:?} failed: {source}")]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: We need to be careful with including source error message in parent error message.

I think the recommendation is to not include source error messages like this because proper error reporting/diagnostics will themselves call .source() to print messages for the entire error chain. However, for code that is not aware of this, and doesn't depend on any error reporting crates, it will most likely just grab outer error message. It depends where the error is used. For the FTS, I think the errors appear only in HTTP responses and logs, but 1) I'm not sure how they are exactly reported by axum 2) it's all internal so it doesn't matter that much.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm aware, but knew exactly where these error messages were going (which is only when we call .to_string()), and I didn't want to have to convert them to something like anyhow::Error just to get a backtrace. You're right though, a lot of thin-edge errors are in library code where they're eventually going to become anyhow::Error and in that case it's really annoying (reqwest is the example of a public crate that does this that springs to mind).

OtherDelete {
source: std::io::Error,
path: RequestPath,
},

#[error("Request to upload to {path:?} failed: {source:?}")]
OtherUpload {
source: anyhow::Error,
path: RequestPath,
},
Bravo555 marked this conversation as resolved.
Show resolved Hide resolved

#[error("Invalid file path: {path:?}")]
InvalidPath { path: RequestPath },

#[error("File not found: {0:?}")]
FileNotFound(RequestPath),

#[error("Path rejection: {0}")]
PathRejection(#[from] PathRejection),
}

impl From<FileTransferError> for RuntimeError {
fn from(error: FileTransferError) -> Self {
RuntimeError::ActorError(Box::new(error))
}
}

impl IntoResponse for FileTransferRequestError {
fn into_response(self) -> axum::response::Response {
use FileTransferRequestError::*;
let error_message = self.to_string();
match self {
PathRejection(err) => {
tracing::error!("{error_message}");
err.into_response()
}
FromIo(_) | OtherDelete { .. } | OtherUpload { .. } => {
tracing::error!("{error_message}");
(
StatusCode::INTERNAL_SERVER_ERROR,
"Internal error".to_owned(),
)
.into_response()
}
// All of these from an invalid URL, so `Not Found` is most appropriate response
InvalidPath { .. } | FileNotFound(_) | CannotDeleteDirectory { .. } => {
(StatusCode::NOT_FOUND, error_message).into_response()
}
CannotUploadDirectory { .. } => (StatusCode::CONFLICT, error_message).into_response(),
}
}
}
Loading
Loading