Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New console command: random seeder #458

Merged
merged 7 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 205 additions & 25 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ version = "3.0.0-alpha.3-develop"
opt-level = 3

[dependencies]
anyhow = "1.0.79"
argon2 = "0"
async-trait = "0"
axum = { version = "0", features = ["multipart"] }
binascii = "0"
bytes = "1"
chrono = { version = "0", default-features = false, features = ["clock"] }
clap = { version = "4.4.18", features = ["derive", "env"]}
config = "0"
derive_more = "0"
email_address = "0"
Expand All @@ -53,6 +55,7 @@ lazy_static = "1.4.0"
lettre = { version = "0", features = ["builder", "smtp-transport", "tokio1", "tokio1-native-tls", "tokio1-rustls-tls"] }
log = "0"
pbkdf2 = { version = "0", features = ["simple"] }
rand = "0"
rand_core = { version = "0", features = ["std"] }
regex = "1"
reqwest = { version = "0", features = ["json", "multipart"] }
Expand All @@ -76,7 +79,6 @@ urlencoding = "2"
uuid = { version = "1", features = ["v4"] }

[dev-dependencies]
rand = "0"
tempfile = "3"
uuid = { version = "1", features = ["v4"] }
which = "5"
4 changes: 2 additions & 2 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::services::torrent::{
use crate::services::user::{self, DbBannedUserList, DbUserProfileRepository, DbUserRepository};
use crate::services::{proxy, settings, torrent};
use crate::tracker::statistics_importer::StatisticsImporter;
use crate::web::api::v1::auth::Authentication;
use crate::web::api::server::v1::auth::Authentication;
use crate::web::api::Version;
use crate::{console, mailer, tracker, web};

Expand Down Expand Up @@ -159,7 +159,7 @@ pub async fn run(configuration: Configuration, api_version: &Version) -> Running

// Start cronjob to import tracker torrent data and updating
// seeders and leechers info.
let tracker_statistics_importer_handle = console::tracker_statistics_importer::start(
let tracker_statistics_importer_handle = console::cronjobs::tracker_statistics_importer::start(
importer_port,
importer_torrent_info_update_interval,
&tracker_statistics_importer,
Expand Down
6 changes: 3 additions & 3 deletions src/bin/import_tracker_statistics.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
//! Import Tracker Statistics command.
//!
//! It imports the number of seeders and leechers for all torrent from the linked tracker.
//! It imports the number of seeders and leechers for all torrents from the linked tracker.
//!
//! You can execute it with: `cargo run --bin import_tracker_statistics`
use torrust_index::console::commands::import_tracker_statistics::run_importer;
use torrust_index::console::commands::tracker_statistics_importer::app::run;

#[tokio::main]
async fn main() {
run_importer().await;
run().await;
}
7 changes: 7 additions & 0 deletions src/bin/seeder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
//! Program to upload random torrents to a live Index API.
use torrust_index::console::commands::seeder::app;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
app::run().await
}
2 changes: 1 addition & 1 deletion src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::services::torrent::{
use crate::services::user::{self, DbBannedUserList, DbUserProfileRepository, DbUserRepository};
use crate::services::{proxy, settings, torrent};
use crate::tracker::statistics_importer::StatisticsImporter;
use crate::web::api::v1::auth::Authentication;
use crate::web::api::server::v1::auth::Authentication;
use crate::{mailer, tracker};
pub type Username = String;

Expand Down
3 changes: 2 additions & 1 deletion src/console/commands/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod import_tracker_statistics;
pub mod seeder;
pub mod tracker_statistics_importer;
108 changes: 108 additions & 0 deletions src/console/commands/seeder/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use log::debug;
use thiserror::Error;

use crate::web::api::client::v1::client::Client;
use crate::web::api::client::v1::contexts::category::forms::AddCategoryForm;
use crate::web::api::client::v1::contexts::category::responses::{ListItem, ListResponse};
use crate::web::api::client::v1::contexts::torrent::forms::UploadTorrentMultipartForm;
use crate::web::api::client::v1::contexts::torrent::responses::{UploadedTorrent, UploadedTorrentResponse};
use crate::web::api::client::v1::contexts::user::forms::LoginForm;
use crate::web::api::client::v1::contexts::user::responses::{LoggedInUserData, SuccessfulLoginResponse};
use crate::web::api::client::v1::responses::TextResponse;

#[derive(Error, Debug)]
pub enum Error {
#[error("Torrent with the same info-hash already exist in the database")]
TorrentInfoHashAlreadyExists,
#[error("Torrent with the same title already exist in the database")]
TorrentTitleAlreadyExists,
}

/// It uploads a torrent file to the Torrust Index.
///
/// # Errors
///
/// It returns an error if the torrent already exists in the database.
///
/// # Panics
///
/// Panics if the response body is not a valid JSON.
pub async fn upload_torrent(client: &Client, upload_torrent_form: UploadTorrentMultipartForm) -> Result<UploadedTorrent, Error> {
let categories = get_categories(client).await;

if !contains_category_with_name(&categories, &upload_torrent_form.category) {
add_category(client, &upload_torrent_form.category).await;
}

let response = client.upload_torrent(upload_torrent_form.into()).await;

debug!(target:"seeder", "response: {}", response.status);

if response.status == 400 {
if response.body.contains("This torrent already exists in our database") {
return Err(Error::TorrentInfoHashAlreadyExists);
}

if response.body.contains("This torrent title has already been used") {
return Err(Error::TorrentTitleAlreadyExists);
}
}

assert!(response.is_json_and_ok(), "Error uploading torrent: {}", response.body);

let uploaded_torrent_response: UploadedTorrentResponse =
serde_json::from_str(&response.body).expect("a valid JSON response should be returned from the Torrust Index API");

Ok(uploaded_torrent_response.data)
}

/// It logs in the user and returns the user data.
///
/// # Panics
///
/// Panics if the response body is not a valid JSON.
pub async fn login(client: &Client, username: &str, password: &str) -> LoggedInUserData {
let response = client
.login_user(LoginForm {
login: username.to_owned(),
password: password.to_owned(),
})
.await;

let res: SuccessfulLoginResponse = serde_json::from_str(&response.body).unwrap_or_else(|_| {
panic!(
"a valid JSON response should be returned after login. Received: {}",
response.body
)
});

res.data
}

/// It returns all the index categories.
///
/// # Panics
///
/// Panics if the response body is not a valid JSON.
pub async fn get_categories(client: &Client) -> Vec<ListItem> {
let response = client.get_categories().await;

let res: ListResponse = serde_json::from_str(&response.body).unwrap();

res.data
}

/// It adds a new category.
pub async fn add_category(client: &Client, name: &str) -> TextResponse {
client
.add_category(AddCategoryForm {
name: name.to_owned(),
icon: None,
})
.await
}

/// It checks if the category list contains the given category.
fn contains_category_with_name(items: &[ListItem], category_name: &str) -> bool {
items.iter().any(|item| item.name == category_name)
}
132 changes: 132 additions & 0 deletions src/console/commands/seeder/app.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
//! Program to upload random torrent to a live Index API.
//!
//! Run with:
//!
//! ```text
//! cargo run --bin seeder -- --api-base-url <API_BASE_URL> --number-of-torrents <NUMBER_OF_TORRENTS> --user <USER> --password <PASSWORD> --interval <INTERVAL>
//! ```
//!
//! For example:
//!
//! ```text
//! cargo run --bin seeder -- --api-base-url "localhost:3001" --number-of-torrents 1000 --user admin --password 12345678 --interval 0
//! ```
//!
//! That command would upload 1000 random torrents to the Index using the user
//! account admin with password 123456 and waiting 1 second between uploads.
use std::thread::sleep;
use std::time::Duration;

use anyhow::Context;
use clap::Parser;
use log::{debug, info, LevelFilter};
use text_colorizer::Colorize;
use uuid::Uuid;

use super::api::Error;
use crate::console::commands::seeder::api::{login, upload_torrent};
use crate::console::commands::seeder::logging;
use crate::services::torrent_file::generate_random_torrent;
use crate::utils::parse_torrent;
use crate::web::api::client::v1::client::Client;
use crate::web::api::client::v1::contexts::torrent::forms::{BinaryFile, UploadTorrentMultipartForm};
use crate::web::api::client::v1::contexts::torrent::responses::UploadedTorrent;
use crate::web::api::client::v1::contexts::user::responses::LoggedInUserData;

#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
struct Args {
#[arg(short, long)]
api_base_url: String,

#[arg(short, long)]
number_of_torrents: i32,

#[arg(short, long)]
user: String,

#[arg(short, long)]
password: String,

#[arg(short, long)]
interval: u64,
}

/// # Errors
///
/// Will not return any errors for the time being.
pub async fn run() -> anyhow::Result<()> {
logging::setup(LevelFilter::Info);

let args = Args::parse();

let api_user = login_index_api(&args.api_base_url, &args.user, &args.password).await;

let api_client = Client::authenticated(&args.api_base_url, &api_user.token);

info!(target:"seeder", "Uploading { } random torrents to the Torrust Index with a { } seconds interval...", args.number_of_torrents.to_string().yellow(), args.interval.to_string().yellow());

for i in 1..=args.number_of_torrents {
info!(target:"seeder", "Uploading torrent #{} ...", i.to_string().yellow());

match upload_random_torrent(&api_client).await {
Ok(uploaded_torrent) => {
debug!(target:"seeder", "Uploaded torrent {uploaded_torrent:?}");

let json = serde_json::to_string(&uploaded_torrent).context("failed to serialize upload response into JSON")?;

info!(target:"seeder", "Uploaded torrent: {}", json.yellow());
}
Err(err) => print!("Error uploading torrent {err:?}"),
};

if i != args.number_of_torrents {
sleep(Duration::from_secs(args.interval));
}
}

Ok(())
}

/// It logs in a user in the Index API.
pub async fn login_index_api(api_url: &str, username: &str, password: &str) -> LoggedInUserData {
let unauthenticated_client = Client::unauthenticated(api_url);

info!(target:"seeder", "Trying to login with username: {} ...", username.yellow());

let user: LoggedInUserData = login(&unauthenticated_client, username, password).await;

if user.admin {
info!(target:"seeder", "Logged as admin with account: {} ", username.yellow());
} else {
info!(target:"seeder", "Logged as {} ", username.yellow());
}

user
}

async fn upload_random_torrent(api_client: &Client) -> Result<UploadedTorrent, Error> {
let uuid = Uuid::new_v4();

info!(target:"seeder", "Uploading torrent with uuid: {} ...", uuid.to_string().yellow());

let torrent_file = generate_random_torrent_file(uuid);

let upload_form = UploadTorrentMultipartForm {
title: format!("title-{uuid}"),
description: format!("description-{uuid}"),
category: "test".to_string(),
torrent_file,
};

upload_torrent(api_client, upload_form).await
}

/// It returns the bencoded binary data of the torrent meta file.
fn generate_random_torrent_file(uuid: Uuid) -> BinaryFile {
let torrent = generate_random_torrent(uuid);

let bytes = parse_torrent::encode_torrent(&torrent).expect("msg:the torrent should be bencoded");

BinaryFile::from_bytes(torrent.info.name, bytes)
}
25 changes: 25 additions & 0 deletions src/console/commands/seeder/logging.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use log::{debug, LevelFilter};

/// # Panics
///
///
pub fn setup(level: LevelFilter) {
if let Err(_err) = fern::Dispatch::new()
.format(|out, message, record| {
out.finish(format_args!(
"{} [{}][{}] {}",
chrono::Local::now().format("%+"),
record.target(),
record.level(),
message
));
})
.level(level)
.chain(std::io::stdout())
.apply()
{
panic!("Failed to initialize logging.")
}

debug!("logging initialized.");
}
3 changes: 3 additions & 0 deletions src/console/commands/seeder/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod api;
pub mod app;
pub mod logging;
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ fn print_usage() {
/// # Panics
///
/// Panics if arguments cannot be parsed.
pub async fn run_importer() {
pub async fn run() {
parse_args().expect("unable to parse command arguments");
import().await;
}
Expand Down
1 change: 1 addition & 0 deletions src/console/commands/tracker_statistics_importer/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod app;
1 change: 1 addition & 0 deletions src/console/cronjobs/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod tracker_statistics_importer;
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ struct ImporterState {
pub torrent_info_update_interval: u64,
}

/// # Panics
///
/// Will panic if it can't start the tracker statistics importer API
#[must_use]
pub fn start(
importer_port: u16,
torrent_info_update_interval: u64,
Expand Down Expand Up @@ -60,7 +64,7 @@ pub fn start(

let addr = format!("{IMPORTER_API_IP}:{importer_port}");

info!("Tracker statistics importer API server listening on http://{}", addr);
info!("Tracker statistics importer API server listening on http://{}", addr); // # DevSkim: ignore DS137138

axum::Server::bind(&addr.parse().unwrap())
.serve(app.into_make_service())
Expand Down Expand Up @@ -122,7 +126,7 @@ async fn heartbeat_handler(State(state): State<Arc<ImporterState>>) -> Json<Valu
/// Send a heartbeat from the importer cronjob to the importer API.
async fn send_heartbeat(importer_port: u16) -> Result<(), reqwest::Error> {
let client = reqwest::Client::new();
let url = format!("http://{IMPORTER_API_IP}:{importer_port}/heartbeat");
let url = format!("http://{IMPORTER_API_IP}:{importer_port}/heartbeat"); // # DevSkim: ignore DS137138

client.post(url).send().await?;

Expand Down
Loading
Loading