From b5dab614df1654fde40703ebc89bb4f85490e17f Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Thu, 26 Jan 2023 19:20:13 +0200 Subject: [PATCH] feat(cli): spawn task downloaders (#1055) --- bin/reth/src/node/mod.rs | 56 ++++++++++--------- .../net/downloaders/src/bodies/concurrent.rs | 2 - 2 files changed, 31 insertions(+), 27 deletions(-) diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index a2b9fc35054f..f8153b8cb671 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -155,41 +155,47 @@ impl Command { info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network"); let fetch_client = Arc::new(network.fetch_client().await?); + + // Spawn headers downloader + let headers_downloader = headers::task::TaskDownloader::spawn( + headers::linear::LinearDownloadBuilder::default() + .request_limit(config.stages.headers.downloader_batch_size) + .stream_batch_size(config.stages.headers.commit_threshold as usize) + // NOTE: the head and target will be set from inside the stage before the + // downloader is called + .build( + consensus.clone(), + fetch_client.clone(), + Default::default(), + Default::default(), + ), + ); + + // Spawn bodies downloader + let bodies_downloader = bodies::task::TaskDownloader::spawn( + bodies::concurrent::ConcurrentDownloaderBuilder::default() + .with_stream_batch_size(config.stages.bodies.downloader_stream_batch_size) + .with_request_limit(config.stages.bodies.downloader_request_limit) + .with_max_buffered_responses(config.stages.bodies.downloader_max_buffered_responses) + .with_concurrent_requests_range( + config.stages.bodies.downloader_min_concurrent_requests..= + config.stages.bodies.downloader_max_concurrent_requests, + ) + .build(fetch_client.clone(), consensus.clone(), db.clone()), + ); + let mut pipeline = reth_stages::Pipeline::default() .with_sync_state_updater(network.clone()) .with_channel(sender) .push(HeaderStage { - downloader: headers::linear::LinearDownloadBuilder::default() - .request_limit(config.stages.headers.downloader_batch_size) - .stream_batch_size(config.stages.headers.commit_threshold as usize) - // NOTE: the head and target will be set from inside the stage before the - // downloader is called - .build( - consensus.clone(), - fetch_client.clone(), - Default::default(), - Default::default(), - ), + downloader: headers_downloader, consensus: consensus.clone(), sync_status_updates: network.clone(), }) .push(TotalDifficultyStage { commit_threshold: config.stages.total_difficulty.commit_threshold, }) - .push(BodyStage { - downloader: bodies::concurrent::ConcurrentDownloaderBuilder::default() - .with_stream_batch_size(config.stages.bodies.downloader_stream_batch_size) - .with_request_limit(config.stages.bodies.downloader_request_limit) - .with_max_buffered_responses( - config.stages.bodies.downloader_max_buffered_responses, - ) - .with_concurrent_requests_range( - config.stages.bodies.downloader_min_concurrent_requests..= - config.stages.bodies.downloader_max_concurrent_requests, - ) - .build(fetch_client.clone(), consensus.clone(), db.clone()), - consensus: consensus.clone(), - }) + .push(BodyStage { downloader: bodies_downloader, consensus: consensus.clone() }) .push(SenderRecoveryStage { batch_size: config.stages.sender_recovery.batch_size, commit_threshold: config.stages.sender_recovery.commit_threshold, diff --git a/crates/net/downloaders/src/bodies/concurrent.rs b/crates/net/downloaders/src/bodies/concurrent.rs index a350e5350c03..77794b5f898f 100644 --- a/crates/net/downloaders/src/bodies/concurrent.rs +++ b/crates/net/downloaders/src/bodies/concurrent.rs @@ -348,9 +348,7 @@ where loop { // Poll requests while let Poll::Ready(Some(response)) = this.in_progress_queue.poll_next_unpin(cx) { - println!("RESPONSE LEN >> {}", response.len()); let response = OrderedBodiesResponse(response); - println!("RESPONSE RANGE >> {:?}", response.block_range()); this.buffered_responses.push(response); }