Skip to content

Commit

Permalink
feat(cli): spawn task downloaders (#1055)
Browse files Browse the repository at this point in the history
  • Loading branch information
rkrasiuk authored Jan 26, 2023
1 parent a9c75d2 commit b5dab61
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 27 deletions.
56 changes: 31 additions & 25 deletions bin/reth/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,41 +155,47 @@ impl Command {
info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");

let fetch_client = Arc::new(network.fetch_client().await?);

// Spawn headers downloader
let headers_downloader = headers::task::TaskDownloader::spawn(
headers::linear::LinearDownloadBuilder::default()
.request_limit(config.stages.headers.downloader_batch_size)
.stream_batch_size(config.stages.headers.commit_threshold as usize)
// NOTE: the head and target will be set from inside the stage before the
// downloader is called
.build(
consensus.clone(),
fetch_client.clone(),
Default::default(),
Default::default(),
),
);

// Spawn bodies downloader
let bodies_downloader = bodies::task::TaskDownloader::spawn(
bodies::concurrent::ConcurrentDownloaderBuilder::default()
.with_stream_batch_size(config.stages.bodies.downloader_stream_batch_size)
.with_request_limit(config.stages.bodies.downloader_request_limit)
.with_max_buffered_responses(config.stages.bodies.downloader_max_buffered_responses)
.with_concurrent_requests_range(
config.stages.bodies.downloader_min_concurrent_requests..=
config.stages.bodies.downloader_max_concurrent_requests,
)
.build(fetch_client.clone(), consensus.clone(), db.clone()),
);

let mut pipeline = reth_stages::Pipeline::default()
.with_sync_state_updater(network.clone())
.with_channel(sender)
.push(HeaderStage {
downloader: headers::linear::LinearDownloadBuilder::default()
.request_limit(config.stages.headers.downloader_batch_size)
.stream_batch_size(config.stages.headers.commit_threshold as usize)
// NOTE: the head and target will be set from inside the stage before the
// downloader is called
.build(
consensus.clone(),
fetch_client.clone(),
Default::default(),
Default::default(),
),
downloader: headers_downloader,
consensus: consensus.clone(),
sync_status_updates: network.clone(),
})
.push(TotalDifficultyStage {
commit_threshold: config.stages.total_difficulty.commit_threshold,
})
.push(BodyStage {
downloader: bodies::concurrent::ConcurrentDownloaderBuilder::default()
.with_stream_batch_size(config.stages.bodies.downloader_stream_batch_size)
.with_request_limit(config.stages.bodies.downloader_request_limit)
.with_max_buffered_responses(
config.stages.bodies.downloader_max_buffered_responses,
)
.with_concurrent_requests_range(
config.stages.bodies.downloader_min_concurrent_requests..=
config.stages.bodies.downloader_max_concurrent_requests,
)
.build(fetch_client.clone(), consensus.clone(), db.clone()),
consensus: consensus.clone(),
})
.push(BodyStage { downloader: bodies_downloader, consensus: consensus.clone() })
.push(SenderRecoveryStage {
batch_size: config.stages.sender_recovery.batch_size,
commit_threshold: config.stages.sender_recovery.commit_threshold,
Expand Down
2 changes: 0 additions & 2 deletions crates/net/downloaders/src/bodies/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,9 +348,7 @@ where
loop {
// Poll requests
while let Poll::Ready(Some(response)) = this.in_progress_queue.poll_next_unpin(cx) {
println!("RESPONSE LEN >> {}", response.len());
let response = OrderedBodiesResponse(response);
println!("RESPONSE RANGE >> {:?}", response.block_range());
this.buffered_responses.push(response);
}

Expand Down

0 comments on commit b5dab61

Please sign in to comment.