Skip to content

Commit

Permalink
Remove exit variable from VerifyStage
Browse files Browse the repository at this point in the history
  • Loading branch information
garious committed Jul 5, 2018
1 parent 2c71b3a commit 09d6900
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 14 deletions.
21 changes: 10 additions & 11 deletions src/sigverify_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@

use packet::SharedPackets;
use rand::{thread_rng, Rng};
use result::Result;
use result::{Error, Result};
use service::Service;
use sigverify;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, spawn, JoinHandle};
use std::time::Instant;
Expand All @@ -24,11 +23,10 @@ pub struct SigVerifyStage {

impl SigVerifyStage {
pub fn new(
exit: Arc<AtomicBool>,
packet_receiver: Receiver<SharedPackets>,
) -> (Self, Receiver<Vec<(SharedPackets, Vec<u8>)>>) {
let (verified_sender, verified_receiver) = channel();
let thread_hdls = Self::verifier_services(exit, packet_receiver, verified_sender);
let thread_hdls = Self::verifier_services(packet_receiver, verified_sender);
(SigVerifyStage { thread_hdls }, verified_receiver)
}

Expand Down Expand Up @@ -75,27 +73,28 @@ impl SigVerifyStage {
}

fn verifier_service(
exit: Arc<AtomicBool>,
packet_receiver: Arc<Mutex<PacketReceiver>>,
verified_sender: Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
) -> JoinHandle<()> {
spawn(move || loop {
let e = Self::verifier(&packet_receiver.clone(), &verified_sender.clone());
if e.is_err() && exit.load(Ordering::Relaxed) {
break;
if let Err(e) = Self::verifier(&packet_receiver.clone(), &verified_sender.clone()) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => error!("{:?}", e),
}
}
})
}

fn verifier_services(
exit: Arc<AtomicBool>,
packet_receiver: PacketReceiver,
verified_sender: Sender<Vec<(SharedPackets, Vec<u8>)>>,
) -> Vec<JoinHandle<()>> {
let sender = Arc::new(Mutex::new(verified_sender));
let receiver = Arc::new(Mutex::new(packet_receiver));
(0..4)
.map(|_| Self::verifier_service(exit.clone(), receiver.clone(), sender.clone()))
.map(|_| Self::verifier_service(receiver.clone(), sender.clone()))
.collect()
}
}
Expand Down
5 changes: 2 additions & 3 deletions src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,9 @@ impl Tpu {
let packet_recycler = PacketRecycler::default();

let (fetch_stage, packet_receiver) =
FetchStage::new(transactions_socket, exit.clone(), packet_recycler.clone());
FetchStage::new(transactions_socket, exit, packet_recycler.clone());

let (sigverify_stage, verified_receiver) =
SigVerifyStage::new(exit.clone(), packet_receiver);
let (sigverify_stage, verified_receiver) = SigVerifyStage::new(packet_receiver);

let (banking_stage, signal_receiver) =
BankingStage::new(bank.clone(), verified_receiver, packet_recycler.clone());
Expand Down

0 comments on commit 09d6900

Please sign in to comment.