From f4357f0ff9d39411f22504fcc20db6bd5dec6ddb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Fri, 9 Oct 2020 19:08:10 +0200 Subject: [PATCH] refactor: Worker is not a Future (#7895) This commit rewrites deno::Worker to not implement Future trait. Instead there are two separate methods: - Worker::poll_event_loop() - does single tick of event loop - Worker::run_event_loop() - runs event loop to completion Additionally some cleanup to Worker's field visibility was done. --- cli/coverage.rs | 4 +- cli/main.rs | 22 ++-- cli/ops/worker_host.rs | 3 +- cli/repl.rs | 11 +- cli/worker.rs | 235 ++++++++++++++++++++--------------------- 5 files changed, 131 insertions(+), 144 deletions(-) diff --git a/cli/coverage.rs b/cli/coverage.rs index 97344b589bccda..85ba3f55929e11 100644 --- a/cli/coverage.rs +++ b/cli/coverage.rs @@ -1,7 +1,6 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use crate::colors; -use crate::inspector::DenoInspector; use crate::inspector::InspectorSession; use deno_core::error::AnyError; use deno_core::serde_json; @@ -14,8 +13,7 @@ pub struct CoverageCollector { } impl CoverageCollector { - pub fn new(inspector_ptr: *mut DenoInspector) -> Self { - let session = InspectorSession::new(inspector_ptr); + pub fn new(session: Box) -> Self { Self { session } } diff --git a/cli/main.rs b/cli/main.rs index fa755b783d4d09..b6b92d7ba9f44f 100644 --- a/cli/main.rs +++ b/cli/main.rs @@ -275,7 +275,7 @@ async fn eval_command( debug!("main_module {}", &main_module); worker.execute_module(&main_module).await?; worker.execute("window.dispatchEvent(new Event('load'))")?; - (&mut *worker).await?; + worker.run_event_loop().await?; worker.execute("window.dispatchEvent(new Event('unload'))")?; Ok(()) } @@ -423,7 +423,7 @@ async fn run_repl(flags: Flags) -> Result<(), AnyError> { ModuleSpecifier::resolve_url_or_path("./$deno$repl.ts").unwrap(); let global_state = GlobalState::new(flags)?; let mut worker = MainWorker::new(&global_state, main_module.clone()); - (&mut *worker).await?; + worker.run_event_loop().await?; repl::run(&global_state, worker).await } @@ -454,7 +454,7 @@ async fn run_from_stdin(flags: Flags) -> Result<(), AnyError> { debug!("main_module {}", main_module); worker.execute_module(&main_module).await?; worker.execute("window.dispatchEvent(new Event('load'))")?; - (&mut *worker).await?; + worker.run_event_loop().await?; worker.execute("window.dispatchEvent(new Event('unload'))")?; Ok(()) } @@ -500,7 +500,7 @@ async fn run_with_watch(flags: Flags, script: String) -> Result<(), AnyError> { debug!("main_module {}", main_module); worker.execute_module(&main_module).await?; worker.execute("window.dispatchEvent(new Event('load'))")?; - (&mut *worker).await?; + worker.run_event_loop().await?; worker.execute("window.dispatchEvent(new Event('unload'))")?; Ok(()) } @@ -525,7 +525,7 @@ async fn run_command(flags: Flags, script: String) -> Result<(), AnyError> { debug!("main_module {}", main_module); worker.execute_module(&main_module).await?; worker.execute("window.dispatchEvent(new Event('load'))")?; - (&mut *worker).await?; + worker.run_event_loop().await?; worker.execute("window.dispatchEvent(new Event('unload'))")?; Ok(()) } @@ -578,12 +578,8 @@ async fn test_command( .save_source_file_in_cache(&main_module, source_file); let mut maybe_coverage_collector = if flags.coverage { - let inspector = worker - .inspector - .as_mut() - .expect("Inspector is not created."); - - let mut coverage_collector = CoverageCollector::new(&mut **inspector); + let session = worker.create_inspector_session(); + let mut coverage_collector = CoverageCollector::new(session); coverage_collector.start_collecting().await?; Some(coverage_collector) @@ -594,9 +590,9 @@ async fn test_command( let execute_result = worker.execute_module(&main_module).await; execute_result?; worker.execute("window.dispatchEvent(new Event('load'))")?; - (&mut *worker).await?; + worker.run_event_loop().await?; worker.execute("window.dispatchEvent(new Event('unload'))")?; - (&mut *worker).await?; + worker.run_event_loop().await?; if let Some(coverage_collector) = maybe_coverage_collector.as_mut() { let coverages = coverage_collector.collect().await?; diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs index 6d74bb9f4d3e98..0b36e2c470b68a 100644 --- a/cli/ops/worker_host.rs +++ b/cli/ops/worker_host.rs @@ -173,7 +173,8 @@ fn run_worker_thread( // TODO(bartlomieju): this thread should return result of event loop // that means that we should store JoinHandle to thread to ensure // that it actually terminates. - rt.block_on(worker).expect("Panic in event loop"); + rt.block_on(worker.run_event_loop()) + .expect("Panic in event loop"); debug!("Worker thread shuts down {}", &name); })?; diff --git a/cli/repl.rs b/cli/repl.rs index fbc37fac599ada..c5107d5af6dc8a 100644 --- a/cli/repl.rs +++ b/cli/repl.rs @@ -47,7 +47,7 @@ async fn post_message_and_poll( return result } - _ = &mut *worker => { + _ = worker.run_event_loop() => { // A zero delay is long enough to yield the thread in order to prevent the loop from // running hot for messages that are taking longer to resolve like for example an // evaluation of top level await. @@ -75,7 +75,7 @@ async fn read_line_and_poll( result = &mut line => { return result.unwrap(); } - _ = &mut *worker, if poll_worker => { + _ = worker.run_event_loop(), if poll_worker => { poll_worker = false; } _ = &mut timeout => { @@ -92,12 +92,7 @@ pub async fn run( // Our inspector is unable to default to the default context id so we have to specify it here. let context_id: u32 = 1; - let inspector = worker - .inspector - .as_mut() - .expect("Inspector is not created."); - - let mut session = InspectorSession::new(&mut **inspector); + let mut session = worker.create_inspector_session(); let history_file = global_state.dir.root.join("deno_history.txt"); diff --git a/cli/worker.rs b/cli/worker.rs index ea9362a6b4dfc6..4af3638256c829 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -3,6 +3,7 @@ use crate::fmt_errors::JsError; use crate::global_state::GlobalState; use crate::inspector::DenoInspector; +use crate::inspector::InspectorSession; use crate::js; use crate::metrics::Metrics; use crate::ops; @@ -11,6 +12,7 @@ use crate::permissions::Permissions; use crate::state::CliModuleLoader; use deno_core::error::AnyError; use deno_core::futures::channel::mpsc; +use deno_core::futures::future::poll_fn; use deno_core::futures::future::FutureExt; use deno_core::futures::stream::StreamExt; use deno_core::futures::task::AtomicWaker; @@ -22,10 +24,8 @@ use deno_core::ModuleSpecifier; use deno_core::RuntimeOptions; use deno_core::Snapshot; use std::env; -use std::future::Future; use std::ops::Deref; use std::ops::DerefMut; -use std::pin::Pin; use std::rc::Rc; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; @@ -95,13 +95,15 @@ fn create_channels() -> (WorkerChannelsInternal, WorkerHandle) { /// - `MainWorker` /// - `WebWorker` pub struct Worker { - pub name: String, - pub js_runtime: JsRuntime, - pub inspector: Option>, - pub waker: AtomicWaker, - pub(crate) internal_channels: WorkerChannelsInternal, external_channels: WorkerHandle, + inspector: Option>, + // Following fields are pub because they are accessed + // when creating a new WebWorker instance. + pub(crate) internal_channels: WorkerChannelsInternal, + pub(crate) js_runtime: JsRuntime, + pub(crate) name: String, should_break_on_first_statement: bool, + waker: AtomicWaker, } impl Worker { @@ -147,13 +149,13 @@ impl Worker { let (internal_channels, external_channels) = create_channels(); Self { - name, - js_runtime, + external_channels, inspector, - waker: AtomicWaker::new(), internal_channels, - external_channels, + js_runtime, + name, should_break_on_first_statement, + waker: AtomicWaker::new(), } } @@ -221,39 +223,35 @@ impl Worker { .wait_for_session_and_break_on_next_statement() } } -} - -impl Drop for Worker { - fn drop(&mut self) { - // The Isolate object must outlive the Inspector object, but this is - // currently not enforced by the type system. - self.inspector.take(); - } -} -impl Future for Worker { - type Output = Result<(), AnyError>; + /// Create new inspector session. This function panics if Worker + /// was not configured to create inspector. + pub fn create_inspector_session(&mut self) -> Box { + let inspector = self.inspector.as_mut().unwrap(); - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let inner = self.get_mut(); + InspectorSession::new(&mut **inspector) + } + pub fn poll_event_loop( + &mut self, + cx: &mut Context, + ) -> Poll> { // We always poll the inspector if it exists. - let _ = inner.inspector.as_mut().map(|i| i.poll_unpin(cx)); - inner.waker.register(cx.waker()); - inner.js_runtime.poll_event_loop(cx) + let _ = self.inspector.as_mut().map(|i| i.poll_unpin(cx)); + self.waker.register(cx.waker()); + self.js_runtime.poll_event_loop(cx) } -} -impl Deref for Worker { - type Target = JsRuntime; - fn deref(&self) -> &Self::Target { - &self.js_runtime + pub async fn run_event_loop(&mut self) -> Result<(), AnyError> { + poll_fn(|cx| self.poll_event_loop(cx)).await } } -impl DerefMut for Worker { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.js_runtime +impl Drop for Worker { + fn drop(&mut self) { + // The Isolate object must outlive the Inspector object, but this is + // currently not enforced by the type system. + self.inspector.take(); } } @@ -278,45 +276,46 @@ impl MainWorker { loader, true, ); + let js_runtime = &mut worker.js_runtime; { // All ops registered in this function depend on these { - let op_state = worker.op_state(); + let op_state = js_runtime.op_state(); let mut op_state = op_state.borrow_mut(); op_state.put::(Default::default()); op_state.put::>(global_state.clone()); op_state.put::(global_state.permissions.clone()); } - ops::runtime::init(&mut worker, main_module); - ops::fetch::init(&mut worker, global_state.flags.ca_file.as_deref()); - ops::timers::init(&mut worker); - ops::worker_host::init(&mut worker); - ops::random::init(&mut worker, global_state.flags.seed); - ops::reg_json_sync(&mut worker, "op_close", deno_core::op_close); - ops::reg_json_sync(&mut worker, "op_resources", deno_core::op_resources); + ops::runtime::init(js_runtime, main_module); + ops::fetch::init(js_runtime, global_state.flags.ca_file.as_deref()); + ops::timers::init(js_runtime); + ops::worker_host::init(js_runtime); + ops::random::init(js_runtime, global_state.flags.seed); + ops::reg_json_sync(js_runtime, "op_close", deno_core::op_close); + ops::reg_json_sync(js_runtime, "op_resources", deno_core::op_resources); ops::reg_json_sync( - &mut worker, + js_runtime, "op_domain_to_ascii", deno_web::op_domain_to_ascii, ); - ops::errors::init(&mut worker); - ops::fs_events::init(&mut worker); - ops::fs::init(&mut worker); - ops::io::init(&mut worker); - ops::net::init(&mut worker); - ops::os::init(&mut worker); - ops::permissions::init(&mut worker); - ops::plugin::init(&mut worker); - ops::process::init(&mut worker); - ops::runtime_compiler::init(&mut worker); - ops::signal::init(&mut worker); - ops::tls::init(&mut worker); - ops::tty::init(&mut worker); - ops::websocket::init(&mut worker); + ops::errors::init(js_runtime); + ops::fs_events::init(js_runtime); + ops::fs::init(js_runtime); + ops::io::init(js_runtime); + ops::net::init(js_runtime); + ops::os::init(js_runtime); + ops::permissions::init(js_runtime); + ops::plugin::init(js_runtime); + ops::process::init(js_runtime); + ops::runtime_compiler::init(js_runtime); + ops::signal::init(js_runtime); + ops::tls::init(js_runtime); + ops::tty::init(js_runtime); + ops::websocket::init(js_runtime); } { - let op_state = worker.op_state(); + let op_state = js_runtime.op_state(); let mut op_state = op_state.borrow_mut(); let t = &mut op_state.resource_table; let (stdin, stdout, stderr) = get_stdio(); @@ -449,49 +448,45 @@ impl WebWorker { { let handle = web_worker.thread_safe_handle(); let sender = web_worker.worker.internal_channels.sender.clone(); - + let js_runtime = &mut web_worker.js_runtime; // All ops registered in this function depend on these { - let op_state = web_worker.op_state(); + let op_state = js_runtime.op_state(); let mut op_state = op_state.borrow_mut(); op_state.put::(Default::default()); op_state.put::>(global_state.clone()); op_state.put::(permissions); } - ops::web_worker::init(&mut web_worker, sender, handle); - ops::runtime::init(&mut web_worker, main_module); - ops::fetch::init(&mut web_worker, global_state.flags.ca_file.as_deref()); - ops::timers::init(&mut web_worker); - ops::worker_host::init(&mut web_worker); - ops::reg_json_sync(&mut web_worker, "op_close", deno_core::op_close); - ops::reg_json_sync( - &mut web_worker, - "op_resources", - deno_core::op_resources, - ); + ops::web_worker::init(js_runtime, sender, handle); + ops::runtime::init(js_runtime, main_module); + ops::fetch::init(js_runtime, global_state.flags.ca_file.as_deref()); + ops::timers::init(js_runtime); + ops::worker_host::init(js_runtime); + ops::reg_json_sync(js_runtime, "op_close", deno_core::op_close); + ops::reg_json_sync(js_runtime, "op_resources", deno_core::op_resources); ops::reg_json_sync( - &mut web_worker, + js_runtime, "op_domain_to_ascii", deno_web::op_domain_to_ascii, ); - ops::errors::init(&mut web_worker); - ops::io::init(&mut web_worker); - ops::websocket::init(&mut web_worker); + ops::errors::init(js_runtime); + ops::io::init(js_runtime); + ops::websocket::init(js_runtime); if has_deno_namespace { - ops::fs_events::init(&mut web_worker); - ops::fs::init(&mut web_worker); - ops::net::init(&mut web_worker); - ops::os::init(&mut web_worker); - ops::permissions::init(&mut web_worker); - ops::plugin::init(&mut web_worker); - ops::process::init(&mut web_worker); - ops::random::init(&mut web_worker, global_state.flags.seed); - ops::runtime_compiler::init(&mut web_worker); - ops::signal::init(&mut web_worker); - ops::tls::init(&mut web_worker); - ops::tty::init(&mut web_worker); + ops::fs_events::init(js_runtime); + ops::fs::init(js_runtime); + ops::net::init(js_runtime); + ops::os::init(js_runtime); + ops::permissions::init(js_runtime); + ops::plugin::init(js_runtime); + ops::process::init(js_runtime); + ops::random::init(js_runtime, global_state.flags.seed); + ops::runtime_compiler::init(js_runtime); + ops::signal::init(js_runtime); + ops::tls::init(js_runtime); + ops::tty::init(js_runtime); } } @@ -504,38 +499,27 @@ impl WebWorker { pub fn thread_safe_handle(&self) -> WebWorkerHandle { self.handle.clone() } -} -impl Deref for WebWorker { - type Target = Worker; - fn deref(&self) -> &Self::Target { - &self.worker + pub async fn run_event_loop(&mut self) -> Result<(), AnyError> { + poll_fn(|cx| self.poll_event_loop(cx)).await } -} - -impl DerefMut for WebWorker { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.worker - } -} -impl Future for WebWorker { - type Output = Result<(), AnyError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let inner = self.get_mut(); - let worker = &mut inner.worker; + pub fn poll_event_loop( + &mut self, + cx: &mut Context, + ) -> Poll> { + let worker = &mut self.worker; - let terminated = inner.handle.terminated.load(Ordering::Relaxed); + let terminated = self.handle.terminated.load(Ordering::Relaxed); if terminated { return Poll::Ready(Ok(())); } - if !inner.event_loop_idle { - match worker.poll_unpin(cx) { + if !self.event_loop_idle { + match worker.poll_event_loop(cx) { Poll::Ready(r) => { - let terminated = inner.handle.terminated.load(Ordering::Relaxed); + let terminated = self.handle.terminated.load(Ordering::Relaxed); if terminated { return Poll::Ready(Ok(())); } @@ -546,13 +530,13 @@ impl Future for WebWorker { .try_send(WorkerEvent::Error(e)) .expect("Failed to post message to host"); } - inner.event_loop_idle = true; + self.event_loop_idle = true; } Poll::Pending => {} } } - if let Poll::Ready(r) = inner.terminate_rx.poll_next_unpin(cx) { + if let Poll::Ready(r) = self.terminate_rx.poll_next_unpin(cx) { // terminate_rx should never be closed assert!(r.is_some()); return Poll::Ready(Ok(())); @@ -569,7 +553,7 @@ impl Future for WebWorker { if let Err(e) = worker.execute(&script) { // If execution was terminated during message callback then // just ignore it - if inner.handle.terminated.load(Ordering::Relaxed) { + if self.handle.terminated.load(Ordering::Relaxed) { return Poll::Ready(Ok(())); } @@ -581,7 +565,7 @@ impl Future for WebWorker { } // Let event loop be polled again - inner.event_loop_idle = false; + self.event_loop_idle = false; worker.waker.wake(); } None => unreachable!(), @@ -592,6 +576,19 @@ impl Future for WebWorker { } } +impl Deref for WebWorker { + type Target = Worker; + fn deref(&self) -> &Self::Target { + &self.worker + } +} + +impl DerefMut for WebWorker { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.worker + } +} + #[cfg(test)] mod tests { use super::*; @@ -628,7 +625,7 @@ mod tests { if let Err(err) = result { eprintln!("execute_mod err {:?}", err); } - if let Err(e) = (&mut *worker).await { + if let Err(e) = worker.run_event_loop().await { panic!("Future got unexpected error: {:?}", e); } } @@ -646,7 +643,7 @@ mod tests { if let Err(err) = result { eprintln!("execute_mod err {:?}", err); } - if let Err(e) = (&mut *worker).await { + if let Err(e) = worker.run_event_loop().await { panic!("Future got unexpected error: {:?}", e); } } @@ -665,7 +662,7 @@ mod tests { if let Err(err) = result { eprintln!("execute_mod err {:?}", err); } - if let Err(e) = (&mut *worker).await { + if let Err(e) = worker.run_event_loop().await { panic!("Future got unexpected error: {:?}", e); } } @@ -733,7 +730,7 @@ mod tests { worker.execute(source).unwrap(); let handle = worker.thread_safe_handle(); handle_sender.send(handle).unwrap(); - let r = tokio_util::run_basic(worker); + let r = tokio_util::run_basic(worker.run_event_loop()); assert!(r.is_ok()) }); @@ -780,7 +777,7 @@ mod tests { worker.execute("onmessage = () => { close(); }").unwrap(); let handle = worker.thread_safe_handle(); handle_sender.send(handle).unwrap(); - let r = tokio_util::run_basic(worker); + let r = tokio_util::run_basic(worker.run_event_loop()); assert!(r.is_ok()) });