From 09d69003517a7ea84e5f58eca523e0ae58e7198d Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 5 Jul 2018 15:58:33 -0600 Subject: [PATCH] Remove exit variable from VerifyStage --- src/sigverify_stage.rs | 21 ++++++++++----------- src/tpu.rs | 5 ++--- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/src/sigverify_stage.rs b/src/sigverify_stage.rs index a6eeda8a2eaf23..e90ea238f6384b 100644 --- a/src/sigverify_stage.rs +++ b/src/sigverify_stage.rs @@ -7,11 +7,10 @@ use packet::SharedPackets; use rand::{thread_rng, Rng}; -use result::Result; +use result::{Error, Result}; use service::Service; use sigverify; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, Mutex}; use std::thread::{self, spawn, JoinHandle}; use std::time::Instant; @@ -24,11 +23,10 @@ pub struct SigVerifyStage { impl SigVerifyStage { pub fn new( - exit: Arc, packet_receiver: Receiver, ) -> (Self, Receiver)>>) { let (verified_sender, verified_receiver) = channel(); - let thread_hdls = Self::verifier_services(exit, packet_receiver, verified_sender); + let thread_hdls = Self::verifier_services(packet_receiver, verified_sender); (SigVerifyStage { thread_hdls }, verified_receiver) } @@ -75,27 +73,28 @@ impl SigVerifyStage { } fn verifier_service( - exit: Arc, packet_receiver: Arc>, verified_sender: Arc)>>>>, ) -> JoinHandle<()> { spawn(move || loop { - let e = Self::verifier(&packet_receiver.clone(), &verified_sender.clone()); - if e.is_err() && exit.load(Ordering::Relaxed) { - break; + if let Err(e) = Self::verifier(&packet_receiver.clone(), &verified_sender.clone()) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + _ => error!("{:?}", e), + } } }) } fn verifier_services( - exit: Arc, packet_receiver: PacketReceiver, verified_sender: Sender)>>, ) -> Vec> { let sender = Arc::new(Mutex::new(verified_sender)); let receiver = Arc::new(Mutex::new(packet_receiver)); (0..4) - .map(|_| Self::verifier_service(exit.clone(), receiver.clone(), sender.clone())) + .map(|_| Self::verifier_service(receiver.clone(), sender.clone())) .collect() } } diff --git a/src/tpu.rs b/src/tpu.rs index e3cacad2bc1e9c..ae213e888287fe 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -61,10 +61,9 @@ impl Tpu { let packet_recycler = PacketRecycler::default(); let (fetch_stage, packet_receiver) = - FetchStage::new(transactions_socket, exit.clone(), packet_recycler.clone()); + FetchStage::new(transactions_socket, exit, packet_recycler.clone()); - let (sigverify_stage, verified_receiver) = - SigVerifyStage::new(exit.clone(), packet_receiver); + let (sigverify_stage, verified_receiver) = SigVerifyStage::new(packet_receiver); let (banking_stage, signal_receiver) = BankingStage::new(bank.clone(), verified_receiver, packet_recycler.clone());