This repository has been archived by the owner on Oct 17, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 67
/
restarter.rs
128 lines (112 loc) · 4.89 KB
/
restarter.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::{Node, NodeStorage};
use arc_swap::ArcSwap;
use config::{Committee, Parameters};
use crypto::{traits::KeyPair as _, KeyPair};
use executor::{ExecutionState, ExecutorOutput};
use futures::future::join_all;
use network::{PrimaryToWorkerNetwork, ReliableNetwork, UnreliableNetwork, WorkerToPrimaryNetwork};
use prometheus::Registry;
use std::{fmt::Debug, path::PathBuf, sync::Arc};
use tokio::sync::mpsc::{Receiver, Sender};
use types::{PrimaryWorkerMessage, ReconfigureNotification, WorkerPrimaryMessage};
/// Starts a node (primary, workers and default consensus), keeps it running, and restarts it
/// every time the committee changes.
pub struct NodeRestarter;
impl NodeRestarter {
    /// Runs a node and restarts it every time a new `(KeyPair, Committee)` pair is received.
    ///
    /// For every epoch this:
    /// 1. opens a fresh store under `storage_base_path/epoch{N}`;
    /// 2. spawns the primary (with consensus enabled) and a single worker with id `0`;
    /// 3. waits for the next `(KeyPair, Committee)` on `rx_reconfigure`;
    /// 4. sends `ReconfigureNotification::Shutdown` to our primary (reliable send, awaited) and
    ///    to our workers (unreliable broadcast), cleans up the worker network using the diff
    ///    between the old and new committees, waits for every spawned task to exit, and sleeps
    ///    one extra second;
    /// 5. switches to the new keypair and committee and starts over.
    ///
    /// # Arguments
    /// * `keypair` - keypair of this authority for the first epoch.
    /// * `committee` - committee of the first epoch.
    /// * `storage_base_path` - directory under which a per-epoch store is opened.
    /// * `execution_state` - execution state handed to the primary of every epoch.
    /// * `parameters` - node parameters shared by every epoch.
    /// * `rx_reconfigure` - delivers the keypair and committee of the next epoch.
    /// * `tx_output` - executor output channel handed to the primary of every epoch.
    /// * `registry` - metrics registry handed to the spawned components.
    ///
    /// The function returns once `rx_reconfigure` is closed. The tasks of the epoch running at
    /// that point are not shut down or awaited; their handles are simply dropped
    /// (NOTE(review): presumably tokio `JoinHandle`s, which detach rather than abort on drop —
    /// confirm).
    ///
    /// # Panics
    /// * if `Node::spawn_primary` returns an error;
    /// * if our public key is not part of the current committee;
    /// * if the shutdown notification to our primary fails.
    pub async fn watch<State>(
        keypair: KeyPair,
        committee: &Committee,
        storage_base_path: PathBuf,
        execution_state: Arc<State>,
        parameters: Parameters,
        mut rx_reconfigure: Receiver<(KeyPair, Committee)>,
        tx_output: Sender<ExecutorOutput<State>>,
        registry: &Registry,
    ) where
        State: ExecutionState + Send + Sync + 'static,
        State::Outcome: Send + 'static,
        State::Error: Debug,
    {
        let mut keypair = keypair;
        let mut name = keypair.public().clone();
        let mut committee = committee.clone();
        let mut handles = Vec::new();

        // Network clients live across epochs; only the worker network is pruned on
        // reconfiguration (see the `cleanup` call below).
        let mut primary_network = WorkerToPrimaryNetwork::default();
        let mut worker_network = PrimaryToWorkerNetwork::default();

        // Listen for new committees.
        loop {
            tracing::info!("Starting epoch E{}", committee.epoch());

            // Get a fresh store for the new epoch.
            let store_path = storage_base_path.join(format!("epoch{}", committee.epoch()));
            let store = NodeStorage::reopen(store_path);

            // Restart the relevant components.
            let primary_handles = Node::spawn_primary(
                keypair,
                Arc::new(ArcSwap::new(Arc::new(committee.clone()))),
                &store,
                parameters.clone(),
                /* consensus */ true,
                execution_state.clone(),
                tx_output.clone(),
                registry,
            )
            .await
            .expect("Failed to spawn the primary");

            let worker_handles = Node::spawn_workers(
                name.clone(),
                /* worker_ids */ vec![0],
                Arc::new(ArcSwap::new(Arc::new(committee.clone()))),
                &store,
                parameters.clone(),
                registry,
            );

            handles.extend(primary_handles);
            handles.extend(worker_handles);

            // Wait for a committee change. A closed channel means no further reconfiguration
            // will ever be requested, so stop supervising (the current tasks keep running).
            let (new_keypair, new_committee) = match rx_reconfigure.recv().await {
                Some(update) => update,
                None => {
                    tracing::info!(
                        "Reconfiguration channel closed; no longer watching epoch E{}",
                        committee.epoch()
                    );
                    break;
                }
            };
            // Report the committee we are switching *to*, not the outgoing one.
            tracing::info!("Starting reconfiguration with committee {new_committee}");

            // Shutdown all relevant components.
            let address = committee
                .primary(&name)
                .expect("Our key is not in the committee")
                .worker_to_primary;
            let message = WorkerPrimaryMessage::Reconfigure(ReconfigureNotification::Shutdown);
            let primary_cancel_handle = primary_network.send(address, &message).await;

            let addresses = committee
                .our_workers(&name)
                .expect("Our key is not in the committee")
                .into_iter()
                .map(|x| x.primary_to_worker)
                .collect();
            let message = PrimaryWorkerMessage::Reconfigure(ReconfigureNotification::Shutdown);
            let worker_cancel_handles = worker_network
                .unreliable_broadcast(addresses, &message)
                .await;

            // Ensure the message has been received. The primary notification is mandatory;
            // the worker broadcast is best-effort, so its results are deliberately ignored.
            primary_cancel_handle
                .await
                .expect("Failed to notify primary");
            join_all(worker_cancel_handles).await;
            tracing::debug!("Committee reconfiguration message successfully sent");

            // Cleanup the network.
            worker_network.cleanup(committee.network_diff(&new_committee));

            // Wait for the components to shut down.
            join_all(handles.drain(..)).await;
            tracing::debug!("All tasks successfully exited");

            // Give it an extra second in case the last task to exit is a network server. The OS
            // may need a moment to make the TCP ports available again.
            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            tracing::debug!("Epoch E{} terminated", committee.epoch());

            // Update the settings for the next epoch.
            keypair = new_keypair;
            name = keypair.public().clone();
            committee = new_committee;
        }
    }
}