Better management of LastDeclareJobs - no more wrong fallback-system activations (#904)

GitGab19 authored and Fi3 committed May 28, 2024
1 parent 5920b17 commit 339d914
Showing 3 changed files with 66 additions and 22 deletions.
16 changes: 13 additions & 3 deletions roles/jd-client/src/lib/downstream.rs
@@ -289,8 +289,14 @@ impl DownstreamMiningNode {
             // pool's job_id. The below return as soon as we have a pairable job id for the
             // template_id associated with this share.
             let last_template_id = self_mutex.safe_lock(|s| s.last_template_id).unwrap();
-            let job_id =
-                UpstreamMiningNode::get_job_id(&upstream_mutex, last_template_id).await;
+            let job_id_future =
+                UpstreamMiningNode::get_job_id(&upstream_mutex, last_template_id);
+            let job_id = match timeout(Duration::from_secs(10), job_id_future).await {
+                Ok(job_id) => job_id,
+                Err(_) => {
+                    return;
+                }
+            };
             share.job_id = job_id;
             debug!(
                 "Sending valid block solution upstream, with job_id {}",
@@ -643,7 +649,11 @@ impl ParseDownstreamCommonMessages<roles_logic_sv2::routing_logic::NoRouting>
 
 use network_helpers_sv2::noise_connection_tokio::Connection;
 use std::net::SocketAddr;
-use tokio::{net::TcpListener, task::AbortHandle};
+use tokio::{
+    net::TcpListener,
+    task::AbortHandle,
+    time::{timeout, Duration},
+};
 
 /// Strat listen for downstream mining node. Return as soon as one downstream connect.
 #[allow(clippy::too_many_arguments)]
4 changes: 3 additions & 1 deletion roles/jd-client/src/lib/job_declarator/message_handler.rs
@@ -54,7 +54,9 @@ impl ParseServerJobDeclarationMessages for JobDeclarator {
         message: ProvideMissingTransactions,
     ) -> Result<SendTo, Error> {
         let tx_list = self
-            .last_declare_mining_job_sent
+            .last_declare_mining_jobs_sent
+            .get(&message.request_id)
+            .unwrap()
             .clone()
             .unwrap()
             .tx_list
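Keying the sent jobs by `request_id` lets this handler pair a `ProvideMissingTransactions` message with the exact `DeclareMiningJob` it refers to, even with several declarations in flight; the old single `Option` could hand back the wrong job's `tx_list`. A rough sketch of the lookup with simplified stand-in types (`Job` here is not the real `LastDeclareJob`):

```rust
use std::collections::HashMap;

#[derive(Clone)]
struct Job {
    tx_list: Vec<Vec<u8>>, // serialized transactions (simplified)
}

struct Declarator {
    last_declare_mining_jobs_sent: HashMap<u32, Option<Job>>,
}

impl Declarator {
    // Resolve the job a message refers to via its request_id,
    // instead of assuming it is the last job that was sent.
    fn tx_list_for(&self, request_id: u32) -> Option<Vec<Vec<u8>>> {
        self.last_declare_mining_jobs_sent
            .get(&request_id)? // no entry for this request id
            .clone()
            .map(|job| job.tx_list)
    }
}
```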
68 changes: 50 additions & 18 deletions roles/jd-client/src/lib/job_declarator/mod.rs
@@ -55,8 +55,9 @@ pub struct JobDeclarator {
     req_ids: Id,
     min_extranonce_size: u16,
     // (Sent DeclareMiningJob, is future, template id, merkle path)
-    last_declare_mining_job_sent: Option<LastDeclareJob>,
+    last_declare_mining_jobs_sent: HashMap<u32, Option<LastDeclareJob>>,
     last_set_new_prev_hash: Option<SetNewPrevHash<'static>>,
+    set_new_prev_hash_counter: u8,
     #[allow(clippy::type_complexity)]
     future_jobs: HashMap<
         u64,
@@ -114,34 +115,50 @@ impl JobDeclarator {
                 allocated_tokens: vec![],
                 req_ids: Id::new(),
                 min_extranonce_size,
-                last_declare_mining_job_sent: None,
+                last_declare_mining_jobs_sent: HashMap::with_capacity(10),
                 last_set_new_prev_hash: None,
                 future_jobs: HashMap::with_hasher(BuildNoHashHasher::default()),
                 up,
                 task_collector,
                 coinbase_tx_prefix: vec![].try_into().unwrap(),
                 coinbase_tx_suffix: vec![].try_into().unwrap(),
+                set_new_prev_hash_counter: 0,
             }));
 
         Self::allocate_tokens(&self_, 2).await;
         Self::on_upstream_message(self_.clone());
         Ok(self_)
     }
 
-    fn get_last_declare_job_sent(self_mutex: &Arc<Mutex<Self>>) -> LastDeclareJob {
+    fn get_last_declare_job_sent(self_mutex: &Arc<Mutex<Self>>, request_id: u32) -> LastDeclareJob {
         self_mutex
             .safe_lock(|s| {
-                s.last_declare_mining_job_sent
+                s.last_declare_mining_jobs_sent
+                    .get(&request_id)
+                    .expect("LastDeclareJob not found")
                     .clone()
                     .expect("unreachable code")
             })
             .unwrap()
     }
 
-    fn update_last_declare_job_sent(self_mutex: &Arc<Mutex<Self>>, j: LastDeclareJob) {
+    fn update_last_declare_job_sent(
+        self_mutex: &Arc<Mutex<Self>>,
+        request_id: u32,
+        j: LastDeclareJob,
+    ) {
         self_mutex
-            .safe_lock(|s| s.last_declare_mining_job_sent = Some(j))
-            .unwrap()
+            .safe_lock(|s| {
+                // check the hashmap size so it does not grow indefinitely
+                if s.last_declare_mining_jobs_sent.len() < 10 {
+                    s.last_declare_mining_jobs_sent.insert(request_id, Some(j));
+                } else if let Some(min_key) = s.last_declare_mining_jobs_sent.keys().min().cloned()
+                {
+                    s.last_declare_mining_jobs_sent.remove(&min_key);
+                    s.last_declare_mining_jobs_sent.insert(request_id, Some(j));
+                }
+            })
+            .unwrap();
     }
 
     #[async_recursion]
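`update_last_declare_job_sent` also caps the map at ten entries so it cannot grow without bound: when full, the smallest request id is evicted first, which is the oldest declaration if request ids come from an incrementing counter (as `req_ids: Id` suggests). A self-contained sketch of that eviction policy under those assumptions:

```rust
use std::collections::HashMap;

const MAX_TRACKED_JOBS: usize = 10;

// Insert while keeping at most MAX_TRACKED_JOBS entries, dropping the
// entry with the smallest (i.e. oldest) request id when at capacity.
fn insert_bounded<V>(map: &mut HashMap<u32, V>, request_id: u32, value: V) {
    if map.len() >= MAX_TRACKED_JOBS {
        if let Some(oldest) = map.keys().min().copied() {
            map.remove(&oldest);
        }
    }
    map.insert(request_id, value);
}
```

Scanning `keys().min()` is O(n), which is harmless at a fixed capacity of ten; a `BTreeMap` would give the same oldest-first eviction without the scan if the bound ever grew.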
@@ -245,7 +262,7 @@ impl JobDeclarator {
             coinbase_pool_output,
             tx_list: tx_list_.clone(),
         };
-        Self::update_last_declare_job_sent(self_mutex, last_declare);
+        Self::update_last_declare_job_sent(self_mutex, id, last_declare);
         let frame: StdFrame =
             PoolMessages::JobDeclaration(JobDeclaration::DeclareMiningJob(declare_job))
                 .try_into()
@@ -272,7 +289,8 @@
                 match next_message_to_send {
                     Ok(SendTo::None(Some(JobDeclaration::DeclareMiningJobSuccess(m)))) => {
                         let new_token = m.new_mining_job_token;
-                        let last_declare = Self::get_last_declare_job_sent(&self_mutex);
+                        let last_declare =
+                            Self::get_last_declare_job_sent(&self_mutex, m.request_id);
                         let mut last_declare_mining_job_sent = last_declare.declare_job;
                         let is_future = last_declare.template.future_template;
                         let id = last_declare.template.template_id;
@@ -355,21 +373,35 @@
     ) {
         tokio::task::spawn(async move {
             let id = set_new_prev_hash.template_id;
+            let _ = self_mutex.safe_lock(|s| {
+                s.last_set_new_prev_hash = Some(set_new_prev_hash.clone());
+                s.set_new_prev_hash_counter += 1;
+            });
             let (job, up, merkle_path, template, mut pool_outs) = loop {
-                if let Some(future_job_tuple) = self_mutex
+                match self_mutex
                     .safe_lock(|s| {
-                        s.last_set_new_prev_hash = Some(set_new_prev_hash.clone());
-                        match s.future_jobs.remove(&id) {
-                            Some((job, merkle_path, template, pool_outs)) => {
-                                s.future_jobs = HashMap::with_hasher(BuildNoHashHasher::default());
-                                Some((job, s.up.clone(), merkle_path, template, pool_outs))
-                            }
-                            None => None,
-                        }
+                        if s.set_new_prev_hash_counter > 1
+                            && s.last_set_new_prev_hash != Some(set_new_prev_hash.clone())
+                        // a new prev_hash arrived while the previous one hasn't exited the loop yet
+                        {
+                            s.set_new_prev_hash_counter -= 1;
+                            Some(None)
+                        } else {
+                            s.future_jobs.remove(&id).map(
+                                |(job, merkle_path, template, pool_outs)| {
+                                    s.future_jobs =
+                                        HashMap::with_hasher(BuildNoHashHasher::default());
+                                    s.set_new_prev_hash_counter -= 1;
+                                    Some((job, s.up.clone(), merkle_path, template, pool_outs))
+                                },
+                            )
+                        }
                     })
                     .unwrap()
                 {
-                    break future_job_tuple;
+                    Some(Some(future_job_tuple)) => break future_job_tuple,
+                    Some(None) => return,
+                    None => {}
                 };
                 tokio::task::yield_now().await;
             };
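The counter is what stops the wrong fallback activations named in the commit title: every `SetNewPrevHash` increments it, and a task still waiting in the loop that sees a counter above one plus a `last_set_new_prev_hash` different from its own concludes that a newer prev hash has superseded it, so it returns instead of activating a stale job. A condensed sketch of one loop iteration, with types heavily simplified (the real tuple carries the job, merkle path, template, and pool outputs):

```rust
use std::collections::HashMap;

type FutureJob = String; // placeholder for the real job tuple

struct State {
    last_set_new_prev_hash: Option<u64>, // simplified to the prev-hash id
    set_new_prev_hash_counter: u8,
    future_jobs: HashMap<u64, FutureJob>,
}

// Outer Option: None = nothing yet, keep polling.
// Inner Option: Some(job) = activate it; None = superseded, stop.
fn poll_future_job(
    s: &mut State,
    my_prev_hash: u64,
    template_id: u64,
) -> Option<Option<FutureJob>> {
    if s.set_new_prev_hash_counter > 1 && s.last_set_new_prev_hash != Some(my_prev_hash) {
        // A newer SetNewPrevHash arrived while this task was still waiting.
        s.set_new_prev_hash_counter -= 1;
        Some(None)
    } else {
        s.future_jobs.remove(&template_id).map(|job| {
            s.future_jobs.clear(); // remaining future jobs are now stale
            s.set_new_prev_hash_counter -= 1;
            Some(job)
        })
    }
}
```

A caller loops over this just like the diff's `match`: `Some(Some(job))` breaks out and activates the job, `Some(None)` returns, and `None` yields to the scheduler and retries.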
