-
Notifications
You must be signed in to change notification settings - Fork 37
/
ipc.rs
554 lines (492 loc) Β· 21.7 KB
/
ipc.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
// Copyright (C) 2019-2021 Pierre Krieger
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::{
extrinsics::Extrinsics,
id_pool::IdPool,
module::Module,
scheduler::{
extrinsics::{self, ThreadAccessAccess as _},
vm,
},
InterfaceHash,
};
use alloc::vec::Vec;
use core::convert::TryFrom as _;
use crossbeam_queue::SegQueue;
use hashbrown::{hash_map::Entry, HashMap};
use redshirt_syscalls::{EncodedMessage, MessageId, Pid, ThreadId};
use spinning_top::Spinlock;
mod notifications_queue;
mod waiting_threads;
/// Handles scheduling processes and inter-process communications.
///
/// # State of messages
///
/// The possible states of a message are as follows:
///
/// - Generated by a program but not accepted yet. A [`CoreRunOutcome::InterfaceMessage`] is
///   emitted. The thread that has generated the message is now sleeping. If the message has the
///   "immediate-delivery" flag on, it can then be refused by calling
///   [`Core::reject_immediate_interface_message`] in which case the emitting thread is resumed
///   with an error.
///
/// - Accepted by calling [`Core::accept_interface_message`] on a not-accepted-yet message. The
///   thread that has emitted the message is resumed, and, if the message expects an answer, the
///   user is responsible for later answering the message with [`Core::answer_message`].
///
/// - Answer queued. After calling [`Core::answer_message`], the answer to the message is added
///   to the queue of notifications that the original emitter of the message can receive. The
///   threads of that process that are sleeping while waiting for notifications are then given a
///   chance to be resumed.
///
/// Note that when a program emits a message that doesn't need an answer, this message is assigned
/// a [`MessageId`] for API-related purposes. This [`MessageId`] isn't expected to ever reach a
/// program's user space. As soon as the message is accepted or refused, the [`MessageId`] is
/// discarded.
//
// # Implementation notes
//
// This struct synchronizes the following components without any global lock:
//
// - The underlying VMs, with processes and threads.
// - For each process, a list of answers waiting to be delivered.
// - For each process, a list of threads blocked waiting for answers and that we have failed to
//   resume in the past.
// - The lists of messages waiting to be accepted and waiting to be answered. Each of these two
//   maps is protected by its own short-lived spinlock.
//
// While each of these components is updated atomically, there exists no synchronization between
// them. As such, the implementation heavily relies on the fact that message IDs, process IDs,
// and thread IDs are unique.
//
// For example, delivering a message to an interface consists in atomically looking for the process
// that handles this interface, then atomically delivering it. If, in parallel, that process has
// terminated but has not yet been unregistered as the handler of the interface, then we know it
// from the fact that the process ID is no longer valid. This wouldn't be possible if process IDs
// were reused.
//
pub struct Core<TExt: Extrinsics> {
    /// Pool of identifiers where `MessageId`s are allocated.
    id_pool: IdPool,
    /// Queue of events to return in priority when `run` is called.
    pending_events: SegQueue<CoreRunOutcome>,
    /// List of running processes. Each process carries a [`Process`] as user data.
    processes: extrinsics::ProcessesCollectionExtrinsics<Process, (), TExt>,
    /// List of messages that have been emitted by a thread but haven't been accepted or refused
    /// yet. Stores the process and the thread that emitted the message.
    pending_accept_messages:
        Spinlock<HashMap<MessageId, (Pid, ThreadId), nohash_hasher::BuildNoHashHasher<u64>>>,
    /// List of messages that have been accepted but haven't been answered (or cancelled) yet.
    /// Stores the process that emitted the message, to which the answer must be delivered.
    pending_answer_messages:
        Spinlock<HashMap<MessageId, Pid, nohash_hasher::BuildNoHashHasher<u64>>>,
}
/// Prototype for a `Core` under construction.
///
/// Created with [`CoreBuilder::with_seed`], and turned into a [`Core`] with
/// [`CoreBuilder::build`].
pub struct CoreBuilder<TExt: Extrinsics> {
    /// Builder for the [`processes`][Core::processes] field in [`Core`].
    inner_builder: extrinsics::Builder<TExt>,
    /// Randomness seed used to initialize [`Core::id_pool`]. This is the second half of the
    /// seed passed to [`CoreBuilder::with_seed`].
    seed: [u8; 32],
}
/// Event returned by [`Core::run`].
///
/// Use [`ExecuteOut::or_run`] to turn either variant into an optional [`CoreRunOutcome`].
pub enum ExecuteOut<'a, TExt: Extrinsics> {
    /// Event directly generated, without executing any thread.
    Direct(CoreRunOutcome),
    /// Ready to execute a bit of a thread. Nothing is executed until [`ReadyToRun::run`] is
    /// called.
    ReadyToRun(ReadyToRun<'a, TExt>),
}
impl<'a, TExt: Extrinsics> ExecuteOut<'a, TExt> {
    /// Converts this value into a [`CoreRunOutcome`], executing the ready thread if needed.
    ///
    /// A [`ExecuteOut::Direct`] event is handed back as-is. For [`ExecuteOut::ReadyToRun`], the
    /// thread is executed immediately through [`ReadyToRun::run`]. The result is `None` if that
    /// execution didn't produce any event worth reporting.
    pub fn or_run(self) -> Option<CoreRunOutcome> {
        let ready = match self {
            ExecuteOut::ReadyToRun(ready) => ready,
            ExecuteOut::Direct(event) => return Some(event),
        };

        ready.run()
    }
}
/// Ready to resume one of the threads of a process.
///
/// Nothing happens until [`ReadyToRun::run`] is called.
#[must_use]
pub struct ReadyToRun<'a, TExt: Extrinsics> {
    /// Core that produced this object. It is used to process the outcome of the execution.
    core: &'a Core<TExt>,
    /// Thread ready to be executed, as provided by the underlying processes collection.
    inner: extrinsics::ReadyToRun<'a, Process, (), TExt>,
}
impl<'a, TExt: Extrinsics> ReadyToRun<'a, TExt> {
/// Performs the actual execution.
///
/// Returns `None` if the execution doesn't lead to any event in particular.
pub fn run(self) -> Option<CoreRunOutcome> {
let core = self.core;
self.inner.run().and_then(move |out| core.inner_event(out))
}
}
/// Outcome of calling [`run`](Core::run).
#[derive(Debug)]
pub enum CoreRunOutcome {
    /// A program has stopped, either because the main function has stopped or a problem has
    /// occurred.
    ProgramFinished {
        /// Id of the program that has stopped.
        pid: Pid,
        /// How the program ended. If `Ok`, it has gracefully terminated. If `Err`, something
        /// bad happened.
        // TODO: force Ok to i32?
        // TODO: don't expose wasmi in error
        outcome: Result<Option<crate::WasmValue>, wasmi::Trap>,
    },

    /// A process wants to emit a message on an interface.
    ///
    /// The emitting thread stays asleep until the message is accepted or refused.
    ///
    /// If `immediate` is `true`, either [`Core::accept_interface_message`]
    /// or [`Core::reject_immediate_interface_message`] should be called as soon as possible.
    ///
    /// If `immediate` is `false`, [`Core::accept_interface_message`] must be called,
    /// but this can be delayed indefinitely.
    InterfaceMessage {
        /// Id of the program that has emitted the message.
        pid: Pid,
        /// Identifier of the message that has been emitted.
        ///
        /// > **Note**: A [`MessageId`] is always generated for API-related purposes, even when no
        /// >           answer is expected.
        message_id: MessageId,
        /// True if the message is expecting an answer.
        needs_answer: bool,
        /// True if the caller requires an immediate answer by calling either
        /// [`Core::accept_interface_message`] or [`Core::reject_immediate_interface_message`].
        /// This is `true` when the emitting thread doesn't allow the delivery to be delayed.
        immediate: bool,
        /// Which interface the message has been emitted on.
        interface: InterfaceHash,
    },
}
/// Additional information about a process.
///
/// Stored as user data next to each process of the processes collection owned by [`Core`].
#[derive(Debug)]
struct Process {
    /// Notifications available for retrieval by the process by calling `next_notification`.
    /// Answers to messages emitted by this process are pushed here by [`Core::answer_message`].
    notifications_queue: notifications_queue::NotificationsQueue,

    /// List of threads that are frozen waiting for new notifications and that could not be
    /// resumed immediately when they started waiting.
    wait_notifications_threads: waiting_threads::WaitingThreads,
}
/// Access to a process within the core.
///
/// Obtained through [`Core::process_by_id`] or [`Core::execute`].
pub struct CoreProcess<'a, TExt: Extrinsics> {
    /// Access to the process within the inner collection.
    process: extrinsics::ProcAccess<'a, Process, (), TExt>,
}
impl<TExt: Extrinsics> Core<TExt> {
    /// Run the core once.
    ///
    /// Returns either an event that is immediately available, or a [`ReadyToRun`] that lets
    /// the caller execute a thread. Internal events that don't need to be reported to the
    /// caller are handled transparently, and this method keeps waiting until something
    /// reportable is produced.
    pub async fn run<'a>(&'a self) -> ExecuteOut<'a, TExt> {
        loop {
            if let Some(ev) = self.run_inner().await {
                break ev;
            }
        }
    }

    /// Same as [`Core::run`]. Returns `None` if no event should be returned and we should loop
    /// again.
    async fn run_inner<'a>(&'a self) -> Option<ExecuteOut<'a, TExt>> {
        if let Some(ev) = self.pending_events.pop() {
            return Some(ExecuteOut::Direct(ev));
        }

        match self.processes.run().await {
            extrinsics::ExecuteOut::Direct(event) => {
                // Events fully handled by `inner_event` translate into `None`.
                self.inner_event(event).map(ExecuteOut::Direct)
            }
            extrinsics::ExecuteOut::ReadyToRun(ready) => Some(ExecuteOut::ReadyToRun(ReadyToRun {
                core: self,
                inner: ready,
            })),
        }
    }

    /// Processes an event generated by the underlying processes collection.
    ///
    /// Returns `Some` if the event must be reported to the user of the [`Core`], or `None` if
    /// it has been entirely handled internally.
    fn inner_event(
        &self,
        run_outcome: extrinsics::RunOneOutcome<Process, (), TExt>,
    ) -> Option<CoreRunOutcome> {
        match run_outcome {
            extrinsics::RunOneOutcome::ProcessFinished { pid, outcome, .. } => {
                Some(CoreRunOutcome::ProgramFinished { pid, outcome })
            }

            extrinsics::RunOneOutcome::ThreadFinished { .. } => {
                // TODO: report
                None
            }

            extrinsics::RunOneOutcome::ThreadWaitNotification(thread) => {
                // A thread has asked for new incoming notifications.
                // Immediately try to resume the thread in case a suitable notification is
                // available.
                //
                // If no suitable notification is available, add the thread to a list for later.
                //
                // The process of checking whether any notification is available, and if not
                // adding the thread to a list, is subject to race conditions: a notification
                // can arrive after the check has failed but before the thread has been added
                // to the list, in which case nothing would ever wake the thread up. In order to
                // solve this problem, we compare the total number of notifications pushed to
                // this process' queue before and after the operations. If it has changed, we
                // try to wake up the waiting threads again.
                let total_notifications_pushed_before = thread
                    .process_user_data()
                    .notifications_queue
                    .total_notifications_pushed();

                if let Err(thread) = try_resume_notification_wait_thread(thread) {
                    // The thread couldn't be resumed.
                    let tid = thread.tid();
                    let process = thread.into_process();

                    // It is important for the lock to the thread to have been dropped at this
                    // point (i.e. `thread` is destroyed), otherwise entries to still-locked
                    // threads could be found in the list.
                    process.user_data().wait_notifications_threads.push(tid);

                    // Try resume the thread again if a notification has been pushed in-between.
                    let total_notifications_pushed_after = process
                        .user_data()
                        .notifications_queue
                        .total_notifications_pushed();
                    if total_notifications_pushed_before != total_notifications_pushed_after {
                        self.try_resume_notification_wait(process);
                    }
                }

                None
            }

            extrinsics::RunOneOutcome::ThreadEmitMessage(mut thread) => {
                let emitter_pid = thread.pid();
                let interface = thread.emit_interface().clone();
                let needs_answer = thread.needs_answer();

                // The message stays in `pending_accept_messages` until it is either accepted
                // or refused by the user.
                let message_id = self.id_pool.assign();
                self.pending_accept_messages
                    .lock()
                    .insert(message_id, (emitter_pid, thread.tid()));

                Some(CoreRunOutcome::InterfaceMessage {
                    pid: emitter_pid,
                    message_id,
                    needs_answer,
                    immediate: !thread.allow_delay(),
                    interface,
                })
            }

            extrinsics::RunOneOutcome::ThreadCancelMessage {
                message_id,
                process,
                ..
            } => {
                // Only the process that emitted the message is allowed to cancel it.
                let mut pending_answer_messages = self.pending_answer_messages.lock();
                if let Entry::Occupied(entry) = pending_answer_messages.entry(message_id) {
                    if *entry.get() == process.pid() {
                        entry.remove();
                    }
                }

                None
            }
        }
    }

    /// Returns an object granting access to a process, if it exists.
    pub fn process_by_id(&self, pid: Pid) -> Option<CoreProcess<TExt>> {
        let p = self.processes.process_by_id(pid)?;
        Some(CoreProcess { process: p })
    }

    /// After [`CoreRunOutcome::InterfaceMessage`] is generated, use this method to accept the
    /// message and resume the thread that is emitting the message.
    ///
    /// If the message [expects an answer](`CoreRunOutcome::InterfaceMessage::needs_answer`), it
    /// must later be answered with [`Core::answer_message`]. If it doesn't, its [`MessageId`]
    /// is discarded by this call.
    ///
    /// Returns `None` if the message doesn't exist or no longer exists, which can typically
    /// happen if the program has been aborted in parallel.
    pub fn accept_interface_message(&self, message_id: MessageId) -> Option<(Pid, EncodedMessage)> {
        let (pid, tid) = self.pending_accept_messages.lock().remove(&message_id)?;

        match self.processes.interrupted_thread_by_id(tid) {
            Ok(extrinsics::ThreadAccess::EmitMessage(mut thread)) => {
                let message = if thread.needs_answer() {
                    // Register the message as waiting for an answer *before* resuming the
                    // thread, so that a cancellation emitted by the resumed thread always
                    // finds the entry.
                    self.pending_answer_messages.lock().insert(message_id, pid);
                    thread.accept_emit(Some(message_id))
                } else {
                    // No answer will ever be delivered for this message. Registering it in
                    // `pending_answer_messages` would leak the entry forever.
                    thread.accept_emit(None)
                };
                Some((pid, message))
            }
            // The emitting process has been aborted in parallel.
            Err(extrinsics::ThreadByIdErr::RunningOrDead) => None,
            _ => unreachable!(),
        }
    }

    /// After [`CoreRunOutcome::InterfaceMessage`] is generated where `immediate` is true, use
    /// this method to notify that the message cannot be accepted at the moment.
    ///
    /// Does nothing if the message doesn't exist anymore, for example because the emitting
    /// process has been killed in-between.
    ///
    /// # Panics
    ///
    /// Panics if [`CoreRunOutcome::InterfaceMessage::immediate`] was false.
    /// Might panic if the message is in the wrong state.
    ///
    pub fn reject_immediate_interface_message(&self, message_id: MessageId) {
        let (_, tid) = match self.pending_accept_messages.lock().remove(&message_id) {
            Some(v) => v,
            None => return, // Process might have been killed in-between.
        };

        match self.processes.interrupted_thread_by_id(tid) {
            Ok(extrinsics::ThreadAccess::EmitMessage(mut thread)) => {
                assert!(!thread.allow_delay());
                thread.refuse_emit();
            }
            Err(extrinsics::ThreadByIdErr::RunningOrDead) => {}
            _ => unreachable!(),
        }
    }

    /// Set the answer to a message previously passed to [`Core::accept_interface_message`].
    ///
    /// This pushes a notification to the process that emitted the message, then tries to
    /// resume the threads of that process that are waiting for notifications. Does nothing if
    /// the message is unknown, cancelled, or if its emitter no longer exists.
    pub fn answer_message(&self, message_id: MessageId, response: Result<EncodedMessage, ()>) {
        let emitter_pid = match self.pending_answer_messages.lock().remove(&message_id) {
            Some(pid) => pid,
            None => {
                // Should happen if and only if the process that emitted the message has been
                // aborted. MessageIds are never reused, therefore guaranteeing that this answer
                // cannot reach the wrong message by accident.
                return;
            }
        };

        if let Some(process) = self.processes.process_by_id(emitter_pid) {
            process
                .user_data()
                .notifications_queue
                .push(message_id, response);
            self.try_resume_notification_wait(process);
        } else {
            // It is possible for the emitter of the message to have stopped or crashed, and we
            // had not updated `pending_answer_messages` yet.
        }
    }

    /// Start executing the module passed as parameter.
    ///
    /// Each import of the [`Module`](crate::module::Module) is resolved.
    pub fn execute(&self, module: &Module) -> Result<(CoreProcess<TExt>, ThreadId), vm::NewErr> {
        let proc_metadata = Process {
            notifications_queue: notifications_queue::NotificationsQueue::new(),
            wait_notifications_threads: waiting_threads::WaitingThreads::new(),
        };

        let (process, main_tid) = self.processes.execute(module, proc_metadata, ())?;

        Ok((CoreProcess { process }, main_tid))
    }

    /// Tries to resume all the threads of the process that are waiting for a notification.
    ///
    /// Threads that are successfully resumed are removed from the waiting list.
    fn try_resume_notification_wait(&self, process: extrinsics::ProcAccess<Process, (), TExt>) {
        // The actual work being done here is actually quite complicated in order to ensure that
        // each `ThreadId` is only accessed once at a time, but the exposed API is very simple.
        for thread_access in process.user_data().wait_notifications_threads.access() {
            let thread = match self
                .processes
                .interrupted_thread_by_id(thread_access.thread_id())
            {
                Ok(extrinsics::ThreadAccess::WaitNotification(thread)) => thread,
                _ => unreachable!(),
            };

            if try_resume_notification_wait_thread(thread).is_ok() {
                thread_access.remove();
            }
        }
    }
}
impl<'a, TExt: Extrinsics> CoreProcess<'a, TExt> {
    /// Returns the identifier of the underlying process.
    pub fn pid(&self) -> Pid {
        self.process.pid()
    }

    /// Spawns an additional thread in this process.
    ///
    /// The new thread begins by calling the function found at `fn_index`, with `params` as its
    /// arguments. Consumes this access object.
    pub fn start_thread(
        self,
        fn_index: u32,
        params: Vec<crate::WasmValue>,
    ) -> Result<(), vm::StartErr> {
        self.process
            .start_thread(fn_index, params, ())
            .map(|_new_thread| ())
            .map_err(From::from)
    }

    /// Starts killing the process.
    ///
    /// NOTE(review): this only initiates the termination. Presumably the end of the process is
    /// later reported through [`CoreRunOutcome::ProgramFinished`]; confirm against the inner
    /// collection.
    pub fn abort(&self) {
        self.process.abort();
    }
}
impl<TExt: Extrinsics> CoreBuilder<TExt> {
    /// Creates a [`CoreBuilder`] from a 64-byte random seed.
    ///
    /// The first 32 bytes seed the processes collection, and the last 32 bytes seed the pool
    /// from which [`MessageId`]s are allocated. The seed is used to determine how [`Pid`]s and
    /// [`MessageId`]s are generated: the same seed results in the same sequence of [`Pid`]s
    /// and [`MessageId`]s.
    pub fn with_seed(seed: [u8; 64]) -> CoreBuilder<TExt> {
        let (processes_seed, message_ids_seed) = seed.split_at(32);
        // Both halves are exactly 32 bytes long, so these conversions can't fail.
        let processes_seed = <[u8; 32]>::try_from(processes_seed).unwrap();
        let message_ids_seed = <[u8; 32]>::try_from(message_ids_seed).unwrap();

        CoreBuilder {
            inner_builder: extrinsics::Builder::with_seed(processes_seed),
            seed: message_ids_seed,
        }
    }

    /// Allocates a `Pid` that will not be used by any process.
    ///
    /// > **Note**: As of the writing of this comment, this feature is only ever used to allocate
    /// >           `Pid`s that last forever. There is therefore no corresponding "unreserve_pid"
    /// >           method that frees such an allocated `Pid`. If there is ever a need to free
    /// >           these `Pid`s, such a method should be added.
    pub fn reserve_pid(&mut self) -> Pid {
        self.inner_builder.reserve_pid()
    }

    /// Consumes the builder and produces the corresponding [`Core`].
    pub fn build(self) -> Core<TExt> {
        let processes = self.inner_builder.build();
        let id_pool = IdPool::with_seed(self.seed);

        Core {
            id_pool,
            pending_events: SegQueue::new(),
            processes,
            pending_accept_messages: Spinlock::new(HashMap::default()),
            pending_answer_messages: Spinlock::new(HashMap::default()),
        }
    }
}
/// If the given thread is waiting for a notification to arrive, checks the queue and tries to
/// resume said thread.
///
/// Looks in the process' notifications queue for a notification matching one of the thread's
/// wait entries:
///
/// - If one is found and its size is within the thread's allowed notification size, it is
///   extracted from the queue and delivered to the thread.
/// - If one is found but is too large, the thread is resumed with only the size of that
///   notification. The notification itself is not extracted.
/// - If none is found and the thread doesn't want to block, the thread is resumed without any
///   notification.
///
/// Returns back the thread within an `Err` if it couldn't be resumed. This only happens when no
/// matching notification exists and the thread wants to block.
fn try_resume_notification_wait_thread<TExt: Extrinsics>(
    mut thread: extrinsics::ThreadWaitNotif<Process, (), TExt>,
) -> Result<(), extrinsics::ThreadWaitNotif<Process, (), TExt>> {
    // Note that the code below is a bit weird and unelegant, but this is to bypass spurious
    // borrowing errors: the queue entry borrows `thread`, so everything needed from the entry
    // is computed inside this inner scope, and `thread` is only resumed once that borrow ends.
    let (entry_size, index_and_notif) = {
        // Try to find a notification in the queue that matches something the user is waiting for.
        // TODO: don't alloc a Vec
        let messages = thread.wait_entries().collect::<Vec<_>>();
        let entry = thread
            .process_user_data()
            .notifications_queue
            .find(&messages);
        let entry = match entry {
            Some(e) => e,
            None => {
                // No notification found.
                // `entry` is `None` here. Dropping it explicitly presumably ends its borrow of
                // `thread` before `thread` is used below.
                drop(entry);
                if !thread.block() {
                    // The thread doesn't want to block: resume it right away without a
                    // notification.
                    thread.resume_no_notification();
                    return Ok(());
                } else {
                    return Err(thread);
                }
            }
        };

        let entry_size = entry.size();
        let index_and_notif = if entry_size <= thread.allowed_notification_size() {
            // Pop the notification from the queue for delivery.
            let index_in_msg_ids = entry.index_in_msg_ids();
            let notification = entry.extract();
            Some((index_in_msg_ids, notification))
        } else {
            // Too big to be delivered: the notification is not extracted from the queue.
            None
        };

        (entry_size, index_and_notif)
    };

    if let Some((index_in_msg_ids, notification)) = index_and_notif {
        thread.resume_notification(index_in_msg_ids, notification)
    } else {
        // Report only the size of the notification to the thread.
        thread.resume_notification_too_big(entry_size)
    }

    Ok(())
}