Better management of LastDeclareJobs - no more wrong fallback-system activations #904

Merged: 10 commits, May 21, 2024
4 changes: 3 additions & 1 deletion roles/jd-client/src/lib/job_declarator/message_handler.rs
@@ -54,7 +54,9 @@ impl ParseServerJobDeclarationMessages for JobDeclarator {
message: ProvideMissingTransactions,
) -> Result<SendTo, Error> {
let tx_list = self
.last_declare_mining_job_sent
.last_declare_mining_jobs_sent
.get(&message.request_id)
.unwrap()
.clone()
.unwrap()
.tx_list
120 changes: 81 additions & 39 deletions roles/jd-client/src/lib/job_declarator/mod.rs
@@ -55,8 +55,9 @@ pub struct JobDeclarator {
req_ids: Id,
min_extranonce_size: u16,
// (Sent DeclareMiningJob, is future, template id, merkle path)
last_declare_mining_job_sent: Option<LastDeclareJob>,
last_declare_mining_jobs_sent: HashMap<u32, Option<LastDeclareJob>>,
last_set_new_prev_hash: Option<SetNewPrevHash<'static>>,
set_new_prev_hash_counter: u8,
#[allow(clippy::type_complexity)]
future_jobs: HashMap<
u64,
@@ -114,34 +115,50 @@ impl JobDeclarator {
allocated_tokens: vec![],
req_ids: Id::new(),
min_extranonce_size,
last_declare_mining_job_sent: None,
last_declare_mining_jobs_sent: HashMap::with_capacity(10),
last_set_new_prev_hash: None,
future_jobs: HashMap::with_hasher(BuildNoHashHasher::default()),
up,
task_collector,
coinbase_tx_prefix: vec![].try_into().unwrap(),
coinbase_tx_suffix: vec![].try_into().unwrap(),
set_new_prev_hash_counter: 0,
}));

Self::allocate_tokens(&self_, 2).await;
Self::on_upstream_message(self_.clone());
Ok(self_)
}

fn get_last_declare_job_sent(self_mutex: &Arc<Mutex<Self>>) -> LastDeclareJob {
fn get_last_declare_job_sent(self_mutex: &Arc<Mutex<Self>>, request_id: u32) -> LastDeclareJob {
self_mutex
.safe_lock(|s| {
s.last_declare_mining_job_sent
s.last_declare_mining_jobs_sent
.get(&request_id)
.expect("LastDeclareJob not found")
Collaborator:

We should never panic while we are holding a lock, otherwise the shared data becomes unusable for everyone. If something is obviously unreachable, an `expect` is fine, but when it is not clear why the code can never panic you should write an explanation. Otherwise, it would be a lot better to take the `Result` or the `Option` out of the `safe_lock` and, if needed, unwrap it outside.
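
A minimal sketch of that suggestion, reusing the types already in this file (`Arc`, `Mutex`, `safe_lock`, `LastDeclareJob`): the closure only returns the `Option`, and any unwrap or error handling happens after the lock is released.

    fn get_last_declare_job_sent(
        self_mutex: &Arc<Mutex<Self>>,
        request_id: u32,
    ) -> Option<LastDeclareJob> {
        // Only read/remove inside the lock; never panic while holding it.
        let entry = self_mutex
            .safe_lock(|s| s.last_declare_mining_jobs_sent.remove(&request_id))
            .unwrap();
        // The map currently stores Option<LastDeclareJob>, so flatten the two
        // levels and let the caller decide what a missing id means.
        entry.flatten()
    }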

Collaborator:

By the way, here you should also remove the job from the map.

Collaborator:

Something like this:

    fn get_last_declare_job_sent(self_mutex: &Arc<Mutex<Self>>, request_id: u32) -> LastDeclareJob {
        let id = self_mutex
            .safe_lock(|s| {
                s.last_declare_mining_jobs_sent
                    .remove(&request_id)
                    .clone()
            })
            .unwrap();
        id.expect("Impossible to get last declare job sent").clone().expect("This is ok")
    }

Some considerations (a sketch combining these points follows below):

  1. Having a HashMap of Option does not make much sense; I would remove the inner Option so we do not need the last expect.
  2. Not having the required id is a possibility: the function should return an Option and the caller should handle the case of the downstream sending shares with a wrong id (by closing the connection, or the entire process).
  3. As I said below, an array of 2 elements could be enough rather than a HashMap here; (1) and (2) still apply.
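
A hedged sketch of those points together (not the code merged in this PR; the field shape and the caller shown are assumptions): the map stores `LastDeclareJob` directly, the getter removes the entry and returns an `Option`, and the caller treats an unknown request id as a reason to shut down.

    // Assumed field shape, with the inner Option removed:
    // last_declare_mining_jobs_sent: HashMap<u32, LastDeclareJob>,

    fn get_last_declare_job_sent(
        self_mutex: &Arc<Mutex<Self>>,
        request_id: u32,
    ) -> Option<LastDeclareJob> {
        // Remove the job inside the lock; a missing id is reported, not unwrapped.
        self_mutex
            .safe_lock(|s| s.last_declare_mining_jobs_sent.remove(&request_id))
            .unwrap()
    }

    // Caller side (sketch):
    // match Self::get_last_declare_job_sent(&self_mutex, m.request_id) {
    //     Some(last_declare) => { /* build and send the custom job */ }
    //     None => { /* wrong request id from downstream: close the connection or process */ }
    // }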

.clone()
.expect("unreachable code")
})
.unwrap()
}

fn update_last_declare_job_sent(self_mutex: &Arc<Mutex<Self>>, j: LastDeclareJob) {
fn update_last_declare_job_sent(
self_mutex: &Arc<Mutex<Self>>,
request_id: u32,
j: LastDeclareJob,
) {
self_mutex
.safe_lock(|s| s.last_declare_mining_job_sent = Some(j))
.unwrap()
.safe_lock(|s| {
// check hashmap size in order to not let it grow indefinitely
if s.last_declare_mining_jobs_sent.len() < 10 {
Collaborator:

this number "10" really require a comment. Why is 10 and not 2 or 3? How many job we expect to have in the same time? I guess max is 2? If not why? And is ok having more than 3 jobs, if not we should just close the process cause something unexpected is happening, and we don't want people keep mining (paying electricity bill) when we are not sure that they are producing shares that will get payed.
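
A minimal sketch of the "fail loudly instead of growing" behaviour suggested here; the bound of 2 and the shutdown path are assumptions, not code from this PR:

    fn update_last_declare_job_sent(
        self_mutex: &Arc<Mutex<Self>>,
        request_id: u32,
        j: LastDeclareJob,
    ) {
        // Assumption: at most two DeclareMiningJob messages are in flight at once.
        const MAX_IN_FLIGHT_DECLARE_JOBS: usize = 2;

        let overflowed = self_mutex
            .safe_lock(|s| {
                if s.last_declare_mining_jobs_sent.len() >= MAX_IN_FLIGHT_DECLARE_JOBS {
                    return true;
                }
                s.last_declare_mining_jobs_sent.insert(request_id, Some(j));
                false
            })
            .unwrap();
        if overflowed {
            // Something unexpected is happening: stop the process instead of
            // letting miners keep paying for work that may never be rewarded.
            eprintln!("too many in-flight DeclareMiningJob requests, shutting down");
            std::process::exit(1);
        }
    }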

Collaborator:

Maybe it would be better to just have an array with [job-1, job0]. Are there cases where we would need to access an older job? If the downstream sends a share for a job that is 7 jobs old, I would say that something is off either here or in the downstream, so the safest thing to do would be to close the connection. Maybe I'm missing something, but I would like to have it addressed in a comment.
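
A hedged sketch of that alternative, keeping only the previous and the current declared job; the field name and the two helpers are hypothetical, not part of this PR:

    // Hypothetical field replacing the HashMap, oldest job first:
    // last_declare_jobs_sent: [Option<(u32, LastDeclareJob)>; 2],

    fn push_declare_job(s: &mut JobDeclarator, request_id: u32, j: LastDeclareJob) {
        // Shift: the current job becomes job-1, the new one becomes job0.
        s.last_declare_jobs_sent[0] = s.last_declare_jobs_sent[1].take();
        s.last_declare_jobs_sent[1] = Some((request_id, j));
    }

    fn find_declare_job(s: &JobDeclarator, request_id: u32) -> Option<&LastDeclareJob> {
        // Anything older than these two slots is suspicious; the caller should
        // treat None as a reason to close the connection.
        s.last_declare_jobs_sent
            .iter()
            .flatten()
            .find(|(id, _)| *id == request_id)
            .map(|(_, job)| job)
    }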

s.last_declare_mining_jobs_sent.insert(request_id, Some(j));
} else if let Some(min_key) = s.last_declare_mining_jobs_sent.keys().min().cloned()
{
s.last_declare_mining_jobs_sent.remove(&min_key);
s.last_declare_mining_jobs_sent.insert(request_id, Some(j));
}
})
.unwrap();
}

#[async_recursion]
@@ -245,7 +262,7 @@ impl JobDeclarator {
coinbase_pool_output,
tx_list: tx_list_.clone(),
};
Self::update_last_declare_job_sent(self_mutex, last_declare);
Self::update_last_declare_job_sent(self_mutex, id, last_declare);
let frame: StdFrame =
PoolMessages::JobDeclaration(JobDeclaration::DeclareMiningJob(declare_job))
.try_into()
@@ -272,7 +289,8 @@
match next_message_to_send {
Ok(SendTo::None(Some(JobDeclaration::DeclareMiningJobSuccess(m)))) => {
let new_token = m.new_mining_job_token;
let last_declare = Self::get_last_declare_job_sent(&self_mutex);
let last_declare =
Self::get_last_declare_job_sent(&self_mutex, m.request_id);
let mut last_declare_mining_job_sent = last_declare.declare_job;
let is_future = last_declare.template.future_template;
let id = last_declare.template.template_id;
@@ -355,43 +373,67 @@
) {
tokio::task::spawn(async move {
let id = set_new_prev_hash.template_id;
let (job, up, merkle_path, template, mut pool_outs) = loop {
if let Some(future_job_tuple) = self_mutex
let _ = self_mutex.safe_lock(|s| {
s.last_set_new_prev_hash = Some(set_new_prev_hash.clone());
s.set_new_prev_hash_counter += 1;
});

let mut future_job_tuple_opt = None;

loop {
match self_mutex
.safe_lock(|s| {
s.last_set_new_prev_hash = Some(set_new_prev_hash.clone());
match s.future_jobs.remove(&id) {
Some((job, merkle_path, template, pool_outs)) => {
s.future_jobs = HashMap::with_hasher(BuildNoHashHasher::default());
Some((job, s.up.clone(), merkle_path, template, pool_outs))
}
None => None,
if s.set_new_prev_hash_counter > 1
&& s.last_set_new_prev_hash != Some(set_new_prev_hash.clone())
{
s.set_new_prev_hash_counter -= 1;
Some(None)
} else {
s.future_jobs.remove(&id).map(
|(job, merkle_path, template, pool_outs)| {
s.future_jobs =
HashMap::with_hasher(BuildNoHashHasher::default());
s.set_new_prev_hash_counter -= 1;
Some((job, s.up.clone(), merkle_path, template, pool_outs))
},
)
}
})
.unwrap()
{
break future_job_tuple;
Some(Some(future_job_tuple)) => {
future_job_tuple_opt = Some(future_job_tuple);
break;
}
Some(None) => {
break;
}
None => {}
};
tokio::task::yield_now().await;
};
let signed_token = job.mining_job_token.clone();
let mut template_outs = template.coinbase_tx_outputs.to_vec();
pool_outs.append(&mut template_outs);
Upstream::set_custom_jobs(
&up,
job,
set_new_prev_hash,
merkle_path,
signed_token,
template.coinbase_tx_version,
template.coinbase_prefix,
template.coinbase_tx_input_sequence,
template.coinbase_tx_value_remaining,
pool_outs,
template.coinbase_tx_locktime,
template.template_id,
)
.await
.unwrap();
}

if let Some((job, up, merkle_path, template, mut pool_outs)) = future_job_tuple_opt {
let signed_token = job.mining_job_token.clone();
let mut template_outs = template.coinbase_tx_outputs.to_vec();
pool_outs.append(&mut template_outs);
Upstream::set_custom_jobs(
&up,
job,
set_new_prev_hash,
merkle_path,
signed_token,
template.coinbase_tx_version,
template.coinbase_prefix,
template.coinbase_tx_input_sequence,
template.coinbase_tx_value_remaining,
pool_outs,
template.coinbase_tx_locktime,
template.template_id,
)
.await
.unwrap();
}
});
}
