Better management of LastDeclareJobs - no more wrong fallback-system activations #904

Merged · 10 commits · May 21, 2024
16 changes: 13 additions & 3 deletions roles/jd-client/src/lib/downstream.rs
@@ -289,8 +289,14 @@ impl DownstreamMiningNode {
    // pool's job_id. The below return as soon as we have a pairable job id for the
    // template_id associated with this share.
    let last_template_id = self_mutex.safe_lock(|s| s.last_template_id).unwrap();
-   let job_id =
-       UpstreamMiningNode::get_job_id(&upstream_mutex, last_template_id).await;
+   let job_id_future =
+       UpstreamMiningNode::get_job_id(&upstream_mutex, last_template_id);
+   let job_id = match timeout(Duration::from_secs(10), job_id_future).await {
+       Ok(job_id) => job_id,
+       Err(_) => {
+           return;
+       }
+   };
    share.job_id = job_id;
    debug!(
        "Sending valid block solution upstream, with job_id {}",
@@ -643,7 +649,11 @@ impl ParseDownstreamCommonMessages<roles_logic_sv2::routing_logic::NoRouting>

use network_helpers_sv2::noise_connection_tokio::Connection;
use std::net::SocketAddr;
-use tokio::{net::TcpListener, task::AbortHandle};
+use tokio::{
+    net::TcpListener,
+    task::AbortHandle,
+    time::{timeout, Duration},
+};

/// Start listening for downstream mining nodes. Returns as soon as one downstream connects.
#[allow(clippy::too_many_arguments)]
4 changes: 3 additions & 1 deletion roles/jd-client/src/lib/job_declarator/message_handler.rs
@@ -54,7 +54,9 @@ impl ParseServerJobDeclarationMessages for JobDeclarator {
        message: ProvideMissingTransactions,
    ) -> Result<SendTo, Error> {
        let tx_list = self
-           .last_declare_mining_job_sent
+           .last_declare_mining_jobs_sent
+           .get(&message.request_id)
+           .unwrap()
            .clone()
            .unwrap()
            .tx_list
68 changes: 50 additions & 18 deletions roles/jd-client/src/lib/job_declarator/mod.rs
@@ -55,8 +55,9 @@ pub struct JobDeclarator {
    req_ids: Id,
    min_extranonce_size: u16,
    // (Sent DeclareMiningJob, is future, template id, merkle path)
-   last_declare_mining_job_sent: Option<LastDeclareJob>,
+   last_declare_mining_jobs_sent: HashMap<u32, Option<LastDeclareJob>>,
    last_set_new_prev_hash: Option<SetNewPrevHash<'static>>,
+   set_new_prev_hash_counter: u8,
    #[allow(clippy::type_complexity)]
    future_jobs: HashMap<
        u64,
@@ -114,34 +115,50 @@ impl JobDeclarator {
            allocated_tokens: vec![],
            req_ids: Id::new(),
            min_extranonce_size,
-           last_declare_mining_job_sent: None,
+           last_declare_mining_jobs_sent: HashMap::with_capacity(10),
            last_set_new_prev_hash: None,
            future_jobs: HashMap::with_hasher(BuildNoHashHasher::default()),
            up,
            task_collector,
            coinbase_tx_prefix: vec![].try_into().unwrap(),
            coinbase_tx_suffix: vec![].try_into().unwrap(),
+           set_new_prev_hash_counter: 0,
        }));

        Self::allocate_tokens(&self_, 2).await;
        Self::on_upstream_message(self_.clone());
        Ok(self_)
    }

-   fn get_last_declare_job_sent(self_mutex: &Arc<Mutex<Self>>) -> LastDeclareJob {
+   fn get_last_declare_job_sent(self_mutex: &Arc<Mutex<Self>>, request_id: u32) -> LastDeclareJob {
        self_mutex
            .safe_lock(|s| {
-               s.last_declare_mining_job_sent
+               s.last_declare_mining_jobs_sent
+                   .get(&request_id)
+                   .expect("LastDeclareJob not found")
Collaborator: We should never panic while holding a lock, otherwise the shared data becomes unusable for everyone. If something is obviously unreachable it is fine to use an expect, but when it is not clear why the code can never panic you should write an explanation. Otherwise it is much better to take the Result or the Option out of the safe_lock and, if needed, unwrap it outside.

Collaborator: Here you should also remove the job from the map, by the way.

Collaborator: Something like this:

    fn get_last_declare_job_sent(self_mutex: &Arc<Mutex<Self>>, request_id: u32) -> LastDeclareJob {
        let id = self_mutex
            .safe_lock(|s| {
                s.last_declare_mining_jobs_sent
                    .remove(&request_id)
                    .clone()
            })
            .unwrap();
        id.expect("Impossible to get last declare job sent").clone().expect("This is ok")
    }

Some considerations:

  1. Having a HashMap of Option does not make much sense; I would remove the inner Option so we do not need the last expect.
  2. Not having the required id is a real possibility, so the function should return an Option and the caller should handle the case of a downstream sending shares with a wrong id (by closing the connection, or the entire process).
  3. As I said below, it could be enough to have an array of 2 elements here rather than a hashmap; (1) and (2) still apply.
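A minimal sketch of what considerations (1) and (2) could look like: the inner Option is gone, the entry is removed while the lock is held without panicking, and the missing-id case is left to the caller. The types here are simplified stand-ins (std::sync::Mutex instead of SRI's safe_lock, a stub LastDeclareJob), not the actual project definitions:

    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};

    // Simplified stand-in for the real LastDeclareJob.
    struct LastDeclareJob {
        template_id: u64,
    }

    struct JobDeclarator {
        // No inner Option: a request id is either present with a job, or absent.
        last_declare_mining_jobs_sent: HashMap<u32, LastDeclareJob>,
    }

    impl JobDeclarator {
        // Remove the entry while the lock is held, but never panic inside the lock;
        // the caller decides what to do when the id is unknown.
        fn take_last_declare_job_sent(
            self_mutex: &Arc<Mutex<Self>>,
            request_id: u32,
        ) -> Option<LastDeclareJob> {
            self_mutex
                .lock()
                .ok()?
                .last_declare_mining_jobs_sent
                .remove(&request_id)
        }
    }

    fn main() {
        let jd = Arc::new(Mutex::new(JobDeclarator {
            last_declare_mining_jobs_sent: HashMap::from([(7, LastDeclareJob { template_id: 1 })]),
        }));
        match JobDeclarator::take_last_declare_job_sent(&jd, 7) {
            Some(job) => println!("declared job found for template {}", job.template_id),
            None => eprintln!("unknown request id: close the downstream connection"),
        }
    }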

                    .clone()
                    .expect("unreachable code")
            })
            .unwrap()
    }

-   fn update_last_declare_job_sent(self_mutex: &Arc<Mutex<Self>>, j: LastDeclareJob) {
+   fn update_last_declare_job_sent(
+       self_mutex: &Arc<Mutex<Self>>,
+       request_id: u32,
+       j: LastDeclareJob,
+   ) {
        self_mutex
-           .safe_lock(|s| s.last_declare_mining_job_sent = Some(j))
-           .unwrap()
+           .safe_lock(|s| {
+               // check the hashmap size so that it does not grow indefinitely
+               if s.last_declare_mining_jobs_sent.len() < 10 {
this number "10" really require a comment. Why is 10 and not 2 or 3? How many job we expect to have in the same time? I guess max is 2? If not why? And is ok having more than 3 jobs, if not we should just close the process cause something unexpected is happening, and we don't want people keep mining (paying electricity bill) when we are not sure that they are producing shares that will get payed.

Collaborator: Maybe it would be better to just have an array with [job-1, job0]. Are there cases where we would need to access an older job? If a downstream sends a share for a job that is 7 jobs old, I would say something is off either here or in the downstream, so the safest thing to do would be to close the connection. Maybe I'm missing something, but I would like to have it addressed in a comment.
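A minimal sketch of the two-element alternative suggested here, keeping only the previous and the current declared job and letting the caller treat any other request id as an error. DeclaredJobs and the stripped-down LastDeclareJob are illustrative, not part of the actual codebase:

    // Two-slot buffer: only the previous and the current declared job are kept.
    struct LastDeclareJob {
        request_id: u32,
    }

    #[derive(Default)]
    struct DeclaredJobs {
        // [job-1, job0]: slot 1 holds the most recent job, slot 0 the one before it.
        slots: [Option<LastDeclareJob>; 2],
    }

    impl DeclaredJobs {
        // Pushing a new job silently evicts the oldest of the two.
        fn push(&mut self, job: LastDeclareJob) {
            self.slots.swap(0, 1);
            self.slots[1] = Some(job);
        }

        // A request id that matches neither slot means the downstream is referring to a
        // job that is too old (or bogus), and the caller can drop the connection.
        fn take(&mut self, request_id: u32) -> Option<LastDeclareJob> {
            self.slots
                .iter_mut()
                .find(|s| s.as_ref().map(|j| j.request_id) == Some(request_id))
                .and_then(|s| s.take())
        }
    }

    fn main() {
        let mut jobs = DeclaredJobs::default();
        jobs.push(LastDeclareJob { request_id: 1 });
        jobs.push(LastDeclareJob { request_id: 2 });
        jobs.push(LastDeclareJob { request_id: 3 }); // request_id 1 is evicted here
        assert!(jobs.take(1).is_none()); // too old: the caller would close the connection
        assert!(jobs.take(3).is_some());
        println!("two-slot buffer behaves as expected");
    }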

+                   s.last_declare_mining_jobs_sent.insert(request_id, Some(j));
+               } else if let Some(min_key) = s.last_declare_mining_jobs_sent.keys().min().cloned()
+               {
+                   s.last_declare_mining_jobs_sent.remove(&min_key);
+                   s.last_declare_mining_jobs_sent.insert(request_id, Some(j));
+               }
+           })
+           .unwrap();
    }

#[async_recursion]
@@ -245,7 +262,7 @@ impl JobDeclarator {
            coinbase_pool_output,
            tx_list: tx_list_.clone(),
        };
-       Self::update_last_declare_job_sent(self_mutex, last_declare);
+       Self::update_last_declare_job_sent(self_mutex, id, last_declare);
        let frame: StdFrame =
            PoolMessages::JobDeclaration(JobDeclaration::DeclareMiningJob(declare_job))
                .try_into()
@@ -272,7 +289,8 @@
        match next_message_to_send {
            Ok(SendTo::None(Some(JobDeclaration::DeclareMiningJobSuccess(m)))) => {
                let new_token = m.new_mining_job_token;
-               let last_declare = Self::get_last_declare_job_sent(&self_mutex);
+               let last_declare =
+                   Self::get_last_declare_job_sent(&self_mutex, m.request_id);
                let mut last_declare_mining_job_sent = last_declare.declare_job;
                let is_future = last_declare.template.future_template;
                let id = last_declare.template.template_id;
@@ -355,21 +373,35 @@ impl JobDeclarator {
    ) {
        tokio::task::spawn(async move {
            let id = set_new_prev_hash.template_id;
+           let _ = self_mutex.safe_lock(|s| {
+               s.last_set_new_prev_hash = Some(set_new_prev_hash.clone());
+               s.set_new_prev_hash_counter += 1;
+           });
            let (job, up, merkle_path, template, mut pool_outs) = loop {
-               if let Some(future_job_tuple) = self_mutex
+               match self_mutex
                    .safe_lock(|s| {
-                       s.last_set_new_prev_hash = Some(set_new_prev_hash.clone());
-                       match s.future_jobs.remove(&id) {
-                           Some((job, merkle_path, template, pool_outs)) => {
-                               s.future_jobs = HashMap::with_hasher(BuildNoHashHasher::default());
-                               Some((job, s.up.clone(), merkle_path, template, pool_outs))
-                           }
-                           None => None,
+                       if s.set_new_prev_hash_counter > 1
+                           && s.last_set_new_prev_hash != Some(set_new_prev_hash.clone())
+                       // a new prev_hash arrived while the previous one hasn't exited the loop yet
+                       {
+                           s.set_new_prev_hash_counter -= 1;
+                           Some(None)
+                       } else {
+                           s.future_jobs.remove(&id).map(
+                               |(job, merkle_path, template, pool_outs)| {
+                                   s.future_jobs =
+                                       HashMap::with_hasher(BuildNoHashHasher::default());
+                                   s.set_new_prev_hash_counter -= 1;
+                                   Some((job, s.up.clone(), merkle_path, template, pool_outs))
+                               },
+                           )
+                       }
                        }
                    })
                    .unwrap()
                {
-                   break future_job_tuple;
+                   Some(Some(future_job_tuple)) => break future_job_tuple,
+                   Some(None) => return,
+                   None => {}
                };
                tokio::task::yield_now().await;
            };
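The counter added above lets a handler detect that a newer SetNewPrevHash arrived while it was still waiting for its future job, so it returns instead of acting on stale state. A stripped-down, synchronous sketch of that guard logic; State, register_prev_hash and is_stale are illustrative stand-ins, while the real code runs inside tokio tasks and uses SRI's safe_lock:

    use std::sync::{Arc, Mutex};

    // Minimal stand-in for the shared JobDeclarator state touched by the new guard.
    #[derive(Default)]
    struct State {
        last_set_new_prev_hash: Option<u64>, // stand-in for SetNewPrevHash
        set_new_prev_hash_counter: u8,
    }

    // Called once per incoming SetNewPrevHash, mirroring the first safe_lock above.
    fn register_prev_hash(state: &Arc<Mutex<State>>, prev_hash: u64) {
        let mut s = state.lock().unwrap();
        s.last_set_new_prev_hash = Some(prev_hash);
        s.set_new_prev_hash_counter += 1;
    }

    // Inside the waiting loop: if another prev hash arrived while we were waiting,
    // this one is stale, so step back and let the newer handler win.
    fn is_stale(state: &Arc<Mutex<State>>, prev_hash: u64) -> bool {
        let mut s = state.lock().unwrap();
        if s.set_new_prev_hash_counter > 1 && s.last_set_new_prev_hash != Some(prev_hash) {
            s.set_new_prev_hash_counter -= 1;
            true
        } else {
            false
        }
    }

    fn main() {
        let state = Arc::new(Mutex::new(State::default()));
        register_prev_hash(&state, 0xAA); // first prev hash, its handler starts waiting
        register_prev_hash(&state, 0xBB); // a newer one arrives before the first finishes
        assert!(is_stale(&state, 0xAA)); // the first handler bails out
        assert!(!is_stale(&state, 0xBB)); // the newest one keeps running
        println!("stale prev-hash guard behaves as expected");
    }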