-
Notifications
You must be signed in to change notification settings - Fork 25
/
scheduler.rs
303 lines (282 loc) · 12.8 KB
/
scheduler.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
use std::{
cmp::min,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Mutex,
},
thread,
};
use smallvec::SmallVec;
use crate::{FinishExecFlags, IncarnationStatus, Task, TxIdx, TxStatus, TxVersion};
// The Pevm collaborative scheduler coordinates execution & validation
// tasks among work threads.
//
// To pick a task, threads increment the smaller of the (execution and
// validation) task counters until they find a task that is ready to be
// performed. To redo a task for a transaction, the thread updates the status
// and reduces the corresponding counter to the transaction index if it had a
// larger value.
//
// An incarnation may write to a memory location that was previously
// read by a higher transaction. Thus, when an incarnation finishes, new
// validation tasks are created for higher transactions.
//
// Validation tasks are scheduled optimistically and in parallel. Identifying
// validation failures and aborting incarnations as soon as possible is critical
// for performance, as any incarnation that reads values written by an
// incarnation that aborts also must abort.
// When an incarnation writes only to a subset of memory locations written
// by the previously completed incarnation of the same transaction, we schedule
// validation just for that incarnation. This is sufficient as the whole write
// set of the previous incarnation is marked as ESTIMATE during the abort.
// The abort leads to optimistically creating validation tasks for higher
// transactions. Threads that perform these tasks can already detect validation
// failure due to the ESTIMATE markers on memory locations, instead of waiting
// for a subsequent incarnation to finish.
/// Shared scheduler state, held once per block and shared by reference
/// across worker threads. Per-transaction state lives behind its own
/// [Mutex]; the cross-transaction cursors are atomics, so no outer lock
/// is needed.
#[derive(Debug)]
pub(crate) struct Scheduler {
    // The number of transactions in this block.
    block_size: usize,
    // The most up-to-date incarnation number (initially 0) and
    // the status of this incarnation, one slot per transaction.
    // TODO: Consider packing [TxStatus]s into atomics instead of
    // [Mutex] given how small they are.
    transactions_status: Vec<Mutex<TxStatus>>,
    // The list of dependent transactions to resume when the
    // key transaction is re-executed.
    transactions_dependents: Vec<Mutex<SmallVec<[TxIdx; 1]>>>,
    // The next transaction to try and execute.
    execution_idx: AtomicUsize,
    // The next transaction to try and validate.
    validation_idx: AtomicUsize,
    // The lowest transaction index that has ever required validation.
    // We won't validate until we find the first non-lazy transaction that
    // needs to read explicit values. We also skip the first transaction.
    min_validation_idx: AtomicUsize,
    // The number of validated transactions
    num_validated: AtomicUsize,
    // True if the scheduler has been aborted, likely due to fatal execution
    // errors.
    aborted: AtomicBool,
}
// TODO: Better error handling.
// Like returning errors instead of panicking on [unreachable]s.
impl Scheduler {
    /// Build a scheduler for [block_size] transactions, all at
    /// incarnation 0 and ready to execute. Both validation cursors start
    /// at [block_size]: no validation happens until some execution
    /// reports (via [finish_execution] flags) that it needs one.
    pub(crate) fn new(block_size: usize) -> Self {
        Self {
            block_size,
            execution_idx: AtomicUsize::new(0),
            transactions_status: (0..block_size)
                .map(|_| {
                    Mutex::new(TxStatus {
                        incarnation: 0,
                        status: IncarnationStatus::ReadyToExecute,
                    })
                })
                .collect(),
            transactions_dependents: (0..block_size).map(|_| Mutex::default()).collect(),
            // We won't validate until we find the first non-lazy transaction that
            // needs to read explicit values. We also skip the first transaction.
            validation_idx: AtomicUsize::new(block_size),
            min_validation_idx: AtomicUsize::new(block_size),
            num_validated: AtomicUsize::new(0),
            aborted: AtomicBool::new(false),
        }
    }

    /// Flag the scheduler as aborted (likely due to a fatal execution
    /// error); every thread's [next_task] loop observes the flag and
    /// returns [None].
    pub(crate) fn abort(&self) {
        self.aborted.store(true, Ordering::Release);
    }

    // Try to claim the execution of [tx_idx]: while holding its status
    // lock, flip [ReadyToExecute] into [Executing] and return the version
    // to run. Returns [None] when the index is out of bounds or the
    // transaction is not ready (another thread may have claimed it).
    fn try_execute(&self, tx_idx: TxIdx) -> Option<TxVersion> {
        if tx_idx < self.block_size {
            let mut tx = index_mutex!(self.transactions_status, tx_idx);
            if tx.status == IncarnationStatus::ReadyToExecute {
                tx.status = IncarnationStatus::Executing;
                return Some(TxVersion {
                    tx_idx,
                    tx_incarnation: tx.incarnation,
                });
            }
        }
        None
    }

    /// Fetch the next execution or validation task for a worker thread.
    /// Returns [None] once the block is complete — i.e. both cursors have
    /// run past the block and every transaction from [min_validation_idx]
    /// up has been validated — or the scheduler was aborted.
    pub(crate) fn next_task(&self) -> Option<Task> {
        while !self.aborted.load(Ordering::Acquire) {
            let execution_idx = self.execution_idx.load(Ordering::Acquire);
            let validation_idx = self.validation_idx.load(Ordering::Acquire);
            // Both cursors have run past the block: either we are done, or
            // other threads are still finishing work that may pull the
            // cursors back down — yield and re-check instead of returning.
            if execution_idx >= self.block_size && validation_idx >= self.block_size {
                if self.num_validated.load(Ordering::Acquire)
                    >= self.block_size - self.min_validation_idx.load(Ordering::Acquire)
                {
                    break;
                }
                thread::yield_now();
                continue;
            }
            // Prioritize a validation task to minimize re-execution
            if validation_idx < execution_idx {
                // Claim the next validation slot; the claim is unconditional,
                // so each status below decides what this slot really becomes.
                let tx_idx = self.validation_idx.fetch_add(1, Ordering::Release);
                if tx_idx < self.block_size {
                    let mut tx = index_mutex!(self.transactions_status, tx_idx);
                    // "Steal" execution job while holding the lock
                    if tx.status == IncarnationStatus::ReadyToExecute {
                        tx.status = IncarnationStatus::Executing;
                        return Some(Task::Execution(TxVersion {
                            tx_idx,
                            tx_incarnation: tx.incarnation,
                        }));
                    }
                    // Start a typical validation task
                    if matches!(
                        tx.status,
                        IncarnationStatus::Executed | IncarnationStatus::Validated
                    ) {
                        return Some(Task::Validation(TxVersion {
                            tx_idx,
                            tx_incarnation: tx.incarnation,
                        }));
                    }
                    // Validation index is still catching up so continue a
                    // new loop iteration to refetch the latest indices
                    // before deciding again.
                    if tx.status == IncarnationStatus::Aborting {
                        continue;
                    }
                    // Fall back to execution job as this executing tx will
                    // decide if validation is needed when it's done. If it
                    // does, all validation tasks here would be redone anyway.
                }
            }
            // Prioritize execution task
            if let Some(tx_version) =
                self.try_execute(self.execution_idx.fetch_add(1, Ordering::Release))
            {
                return Some(Task::Execution(tx_version));
            }
        }
        None
    }

    // Add [tx_idx] as a dependent of [blocking_tx_idx] so [tx_idx] is
    // re-executed when the next [blocking_tx_idx] incarnation is executed.
    // Return [false] if we encounter a race condition when [blocking_tx_idx]
    // gets re-executed before the dependency can be added.
    pub(crate) fn add_dependency(&self, tx_idx: TxIdx, blocking_tx_idx: TxIdx) -> bool {
        // This is an important lock to prevent a race condition where the blocking
        // transaction completes re-execution before this dependency can be added.
        let blocking_tx = index_mutex!(self.transactions_status, blocking_tx_idx);
        // The blocker already finished — the caller should retry reading
        // from it instead of registering a stale dependency.
        if matches!(
            blocking_tx.status,
            IncarnationStatus::Executed | IncarnationStatus::Validated
        ) {
            return false;
        }
        // Park the current incarnation; [finish_execution] of the blocker
        // will flip it back to ready via [set_ready_status].
        let mut tx = index_mutex!(self.transactions_status, tx_idx);
        debug_assert_eq!(tx.status, IncarnationStatus::Executing);
        tx.status = IncarnationStatus::Aborting;
        let mut blocking_dependents = index_mutex!(self.transactions_dependents, blocking_tx_idx);
        blocking_dependents.push(tx_idx);
        true
    }

    // Transition an [Aborting] transaction to [ReadyToExecute] under its
    // status lock, bumping the incarnation number for the re-execution.
    fn set_ready_status(&self, tx_idx: TxIdx) {
        let mut tx = index_mutex!(self.transactions_status, tx_idx);
        debug_assert_eq!(tx.status, IncarnationStatus::Aborting);
        tx.status = IncarnationStatus::ReadyToExecute;
        tx.incarnation += 1;
    }

    /// Record that [tx_version] finished executing: wake its dependents,
    /// update the validation cursors per [flags], and possibly hand the
    /// caller an immediate validation task for this same transaction.
    pub(crate) fn finish_execution(
        &self,
        tx_version: TxVersion,
        flags: FinishExecFlags,
    ) -> Option<Task> {
        let mut tx = index_mutex!(self.transactions_status, tx_version.tx_idx);
        debug_assert_eq!(tx.status, IncarnationStatus::Executing);
        debug_assert_eq!(tx.incarnation, tx_version.tx_incarnation);
        // Resume dependent transactions
        let mut dependents = index_mutex!(self.transactions_dependents, tx_version.tx_idx);
        for tx_idx in dependents.drain(..) {
            self.set_ready_status(tx_idx);
            // Pull the execution cursor back so the resumed tx gets re-run.
            self.execution_idx.fetch_min(tx_idx, Ordering::Release);
        }
        // TODO: Simplify or better document this logic.
        // Decide where to validate from next
        let min_validation_idx = if flags.contains(FinishExecFlags::NeedValidation) {
            // [fetch_min] returns the PREVIOUS value, so take the min with
            // our own index to obtain the up-to-date global minimum.
            min(
                self.min_validation_idx
                    .fetch_min(tx_version.tx_idx, Ordering::Release),
                tx_version.tx_idx,
            )
        } else {
            self.min_validation_idx.load(Ordering::Acquire)
        };
        // Have found a min validation index to even bother
        if min_validation_idx < self.block_size {
            // Must re-validate from min as this transaction is lower
            if tx_version.tx_idx < min_validation_idx {
                if flags.contains(FinishExecFlags::WroteNewLocation) {
                    self.validation_idx
                        .fetch_min(min_validation_idx, Ordering::Release);
                }
            }
            // Validate from this transaction as it's in between min and the current
            // validation index.
            else if tx_version.tx_idx < self.validation_idx.load(Ordering::Acquire) {
                if flags.contains(FinishExecFlags::WroteNewLocation) {
                    // Higher transactions may have read the new writes.
                    self.validation_idx
                        .fetch_min(tx_version.tx_idx + 1, Ordering::Release);
                }
                if flags.contains(FinishExecFlags::NeedValidation) {
                    // Hand this transaction's validation straight back to
                    // the caller instead of going through [next_task].
                    tx.status = IncarnationStatus::Executed;
                    return Some(Task::Validation(tx_version));
                } else {
                    tx.status = IncarnationStatus::Validated;
                    self.num_validated.fetch_add(1, Ordering::Release);
                }
            }
            // Don't need to validate anything if the current validation index is
            // lower or equal -- it will catch up later.
        }
        if flags.contains(FinishExecFlags::NeedValidation) {
            tx.status = IncarnationStatus::Executed;
        } else {
            // No explicit validation needed: count it as validated right away.
            tx.status = IncarnationStatus::Validated;
            self.num_validated.fetch_add(1, Ordering::Release);
        }
        None
    }

    // Return whether the abort was successful. A successful abort leads to
    // scheduling the transaction for re-execution and the higher transactions
    // for validation during [finish_validation]. The scheduler ensures that only
    // one failing validation per version can lead to a successful abort.
    pub(crate) fn try_validation_abort(&self, tx_version: &TxVersion) -> bool {
        let mut tx = index_mutex!(self.transactions_status, tx_version.tx_idx);
        // Undo this transaction's contribution to the completion counter.
        if tx.status == IncarnationStatus::Validated {
            self.num_validated.fetch_sub(1, Ordering::Release);
        }
        // Only an [Executed]/[Validated] incarnation can be aborted; any
        // other status means another thread won the abort race (or the tx
        // is already being re-executed).
        let aborting = matches!(
            tx.status,
            IncarnationStatus::Executed | IncarnationStatus::Validated
        );
        if aborting {
            tx.status = IncarnationStatus::Aborting;
        }
        aborting
    }

    // When there is a successful abort, schedule the transaction for re-execution
    // and the higher transactions for validation. The re-execution task is returned
    // for the aborted transaction.
    pub(crate) fn finish_validation(&self, tx_version: &TxVersion, aborted: bool) -> Option<Task> {
        if aborted {
            // New incarnation, ready to run; revalidate everything above it.
            self.set_ready_status(tx_version.tx_idx);
            self.validation_idx
                .fetch_min(tx_version.tx_idx + 1, Ordering::Release);
            // If the execution cursor is already past this index, claim the
            // re-execution ourselves instead of waiting for [next_task].
            if self.execution_idx.load(Ordering::Acquire) > tx_version.tx_idx {
                return self.try_execute(tx_version.tx_idx).map(Task::Execution);
            }
        } else {
            let mut tx = index_mutex!(self.transactions_status, tx_version.tx_idx);
            // Guard on [Executed]: a concurrent abort may have already moved
            // the status on, in which case this validation result is stale.
            if tx.status == IncarnationStatus::Executed {
                tx.status = IncarnationStatus::Validated;
                self.num_validated.fetch_add(1, Ordering::Release);
            }
        }
        None
    }
}