Skip to content

Commit

Permalink
feat(api): [#143] scaffolding for new API using Axum
Browse files Browse the repository at this point in the history
- Test scaffolding
- Dummy entrypoint
  • Loading branch information
josecelano committed Jan 2, 2023
1 parent 901bc34 commit cbf8837
Show file tree
Hide file tree
Showing 10 changed files with 279 additions and 69 deletions.
2 changes: 2 additions & 0 deletions src/apis/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod server;
pub mod routes;
7 changes: 7 additions & 0 deletions src/apis/routes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use axum::response::Json;
use serde_json::{json, Value};

#[allow(clippy::unused_async)]
pub async fn root() -> Json<Value> {
Json(json!({ "data": 42 }))
}
35 changes: 35 additions & 0 deletions src/apis/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use std::net::SocketAddr;
use std::sync::Arc;

use axum::routing::get;
use axum::Router;
use futures::Future;
use warp::hyper;

use super::routes::root;
use crate::tracker;

pub fn start(socket_addr: SocketAddr, _tracker: &Arc<tracker::Tracker>) -> impl Future<Output = hyper::Result<()>> {
let app = Router::new().route("/", get(root));

let server = axum::Server::bind(&socket_addr).serve(app.into_make_service());

server.with_graceful_shutdown(async move {
tokio::signal::ctrl_c().await.expect("Failed to listen to shutdown signal.");
})
}

pub fn start_tls(
socket_addr: SocketAddr,
_ssl_cert_path: &str,
_ssl_key_path: &str,
_tracker: &Arc<tracker::Tracker>,
) -> impl Future<Output = hyper::Result<()>> {
let app = Router::new().route("/", get(root));

let server = axum::Server::bind(&socket_addr).serve(app.into_make_service());

server.with_graceful_shutdown(async move {
tokio::signal::ctrl_c().await.expect("Failed to listen to shutdown signal.");
})
}
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub struct HttpTracker {
}

#[serde_as]
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
pub struct HttpApi {
pub enabled: bool,
pub bind_address: String,
Expand Down
1 change: 1 addition & 0 deletions src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pub mod http_tracker;
pub mod torrent_cleanup;
pub mod tracker_api;
pub mod udp_tracker;
pub mod tracker_apis;
54 changes: 54 additions & 0 deletions src/jobs/tracker_apis.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::sync::Arc;

use log::info;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;

use crate::apis::server;
use crate::config::HttpApi;
use crate::tracker;

#[derive(Debug)]
pub struct ApiServerJobStarted();

/// # Panics
///
/// It would panic if unable to send the `ApiServerJobStarted` notice.
pub async fn start_job(config: &HttpApi, tracker: Arc<tracker::Tracker>) -> JoinHandle<()> {
let bind_addr = config
.bind_address
.parse::<std::net::SocketAddr>()
.expect("Tracker API bind_address invalid.");
let ssl_enabled = config.ssl_enabled;
let ssl_cert_path = config.ssl_cert_path.clone();
let ssl_key_path = config.ssl_key_path.clone();

let (tx, rx) = oneshot::channel::<ApiServerJobStarted>();

// Run the API server
let join_handle = tokio::spawn(async move {
if !ssl_enabled {
info!("Starting Torrust APIs server on: http://{}", bind_addr);
let handle = server::start(bind_addr, &tracker);
tx.send(ApiServerJobStarted()).expect("the start job dropped");
if let Ok(()) = handle.await {
info!("Stopping Torrust APIs server on {} ...", bind_addr);
}
} else if ssl_enabled && ssl_cert_path.is_some() && ssl_key_path.is_some() {
info!("Starting Torrust APIs server on: https://{}", bind_addr);
let handle = server::start_tls(bind_addr, &ssl_cert_path.unwrap(), &ssl_key_path.unwrap(), &tracker);
tx.send(ApiServerJobStarted()).expect("the start job dropped");
if let Ok(()) = handle.await {
info!("Stopping Torrust APIs server on {} ...", bind_addr);
}
}
});

// Wait until the APIs server job is running
match rx.await {
Ok(_msg) => info!("Torrust APIs server started"),
Err(e) => panic!("the apis server was dropped: {e}"),
}

join_handle
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod setup;
pub mod stats;
pub mod tracker;
pub mod udp;
pub mod apis;

#[macro_use]
extern crate lazy_static;
Expand Down
19 changes: 18 additions & 1 deletion src/setup.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use std::net::SocketAddr;
use std::sync::Arc;

use log::warn;
use tokio::task::JoinHandle;

use crate::config::Configuration;
use crate::jobs::{http_tracker, torrent_cleanup, tracker_api, udp_tracker};
use crate::jobs::{http_tracker, torrent_cleanup, tracker_api, tracker_apis, udp_tracker};
use crate::tracker;

/// # Panics
///
/// Will panic if the socket address for API can't be parsed.
pub async fn setup(config: &Configuration, tracker: Arc<tracker::Tracker>) -> Vec<JoinHandle<()>> {
let mut jobs: Vec<JoinHandle<()>> = Vec::new();

Expand Down Expand Up @@ -52,6 +56,19 @@ pub async fn setup(config: &Configuration, tracker: Arc<tracker::Tracker>) -> Ve
jobs.push(tracker_api::start_job(&config.http_api, tracker.clone()).await);
}

// Start HTTP APIs server (multiple API versions)
if config.http_api.enabled {
// Temporarily running the new API in the 1313 port
let bind_address = config.http_api.bind_address.clone();
let mut bind_socket: SocketAddr = bind_address.parse().unwrap();
bind_socket.set_port(1313);

let mut http_apis_config = config.http_api.clone();
http_apis_config.bind_address = bind_socket.to_string();

jobs.push(tracker_apis::start_job(&http_apis_config, tracker.clone()).await);
}

// Remove torrents without peers, every interval
if config.inactive_peer_cleanup_interval > 0 {
jobs.push(torrent_cleanup::start_job(config, &tracker));
Expand Down
61 changes: 46 additions & 15 deletions tests/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::Arc;
use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes};
use reqwest::Response;
use torrust_tracker::config::Configuration;
use torrust_tracker::jobs::tracker_api;
use torrust_tracker::jobs::{tracker_api, tracker_apis};
use torrust_tracker::protocol::clock::DurationSinceUnixEpoch;
use torrust_tracker::protocol::info_hash::InfoHash;
use torrust_tracker::tracker::peer::{self, Peer};
Expand Down Expand Up @@ -67,16 +67,38 @@ impl ConnectionInfo {
}
}

pub async fn start_default_api_server() -> Server {
pub async fn start_default_api_server(version: &Version) -> Server {
let configuration = tracker_configuration();
start_custom_api_server(configuration.clone()).await
start_custom_api_server(configuration.clone(), version).await
}

pub async fn start_custom_api_server(configuration: Arc<Configuration>) -> Server {
start(configuration).await
pub async fn start_custom_api_server(configuration: Arc<Configuration>, version: &Version) -> Server {
match &version {
Version::Warp => start_warp_api(configuration).await,
Version::Axum => start_axum_api(configuration).await,
}
}

async fn start_warp_api(configuration: Arc<Configuration>) -> Server {
let server = start(&configuration);

// Start the HTTP API job
tracker_api::start_job(&configuration.http_api, server.tracker.clone()).await;

server
}

async fn start_axum_api(configuration: Arc<Configuration>) -> Server {
let server = start(&configuration);

// Start HTTP APIs server (multiple API versions)
// Temporarily run the new API on a port number after the current API port
tracker_apis::start_job(&configuration.http_api, server.tracker.clone()).await;

server
}

async fn start(configuration: Arc<Configuration>) -> Server {
fn start(configuration: &Arc<Configuration>) -> Server {
let connection_info = ConnectionInfo::authenticated(
&configuration.http_api.bind_address.clone(),
&configuration.http_api.access_tokens.get_key_value("admin").unwrap().1.clone(),
Expand All @@ -92,18 +114,15 @@ async fn start(configuration: Arc<Configuration>) -> Server {
let (stats_event_sender, stats_repository) = Keeper::new_active_instance();

// Initialize Torrust tracker
let tracker = match tracker::Tracker::new(&configuration.clone(), Some(stats_event_sender), stats_repository) {
let tracker = match tracker::Tracker::new(configuration, Some(stats_event_sender), stats_repository) {
Ok(tracker) => Arc::new(tracker),
Err(error) => {
panic!("{}", error)
}
};

// Initialize logging
logging::setup(&configuration);

// Start the HTTP API job
tracker_api::start_job(&configuration.http_api, tracker.clone()).await;
logging::setup(configuration);

Server {
tracker,
Expand Down Expand Up @@ -133,6 +152,7 @@ impl Server {

pub struct Client {
connection_info: ConnectionInfo,
base_path: String,
}

type ReqwestQuery = Vec<ReqwestQueryParam>;
Expand Down Expand Up @@ -194,9 +214,20 @@ impl From<QueryParam> for ReqwestQueryParam {
}
}

pub enum Version {
Warp,
Axum,
}

impl Client {
pub fn new(connection_info: ConnectionInfo) -> Self {
Self { connection_info }
pub fn new(connection_info: ConnectionInfo, version: &Version) -> Self {
Self {
connection_info,
base_path: match version {
Version::Warp => "/api/".to_string(),
Version::Axum => String::new(),
},
}
}

pub async fn generate_auth_key(&self, seconds_valid: i32) -> Response {
Expand Down Expand Up @@ -235,7 +266,7 @@ impl Client {
self.get("stats", Query::default()).await
}

async fn get(&self, path: &str, params: Query) -> Response {
pub async fn get(&self, path: &str, params: Query) -> Response {
let mut query: Query = params;

if let Some(token) = &self.connection_info.api_token {
Expand Down Expand Up @@ -271,7 +302,7 @@ impl Client {
}

fn base_url(&self, path: &str) -> String {
format!("http://{}/api/{path}", &self.connection_info.bind_address)
format!("http://{}{}{path}", &self.connection_info.bind_address, &self.base_path)
}

fn query_with_token(&self) -> Query {
Expand Down
Loading

0 comments on commit cbf8837

Please sign in to comment.