Skip to content

Commit

Permalink
refactor: per-worker resource table (denoland#3306)
Browse files Browse the repository at this point in the history
- removes global `RESOURCE_TABLE` - resource tables are now created per `Worker`
  in `State`
- renames `CliResource` to `StreamResource` and moves all logic related
  to it to `cli/ops/io.rs`
- removes `cli/resources.rs`
- adds `state` argument to `op_read` and `op_write` and consequently adds
  `stateful_minimal_op` to `State`
- IMPORTANT NOTE: workers don't have access to process stdio - this is
  caused by fact that dropping worker would close stdout for process
  (because it's constructed from raw handle, which closes underlying file
  descriptor on drop)
  • Loading branch information
bartlomieju committed Dec 28, 2019
1 parent f75ac22 commit 9d22234
Show file tree
Hide file tree
Showing 14 changed files with 358 additions and 336 deletions.
11 changes: 10 additions & 1 deletion cli/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ pub mod permissions;
mod progress;
mod repl;
pub mod resolve_addr;
pub mod resources;
mod shell;
mod signal;
pub mod source_maps;
Expand All @@ -57,6 +56,7 @@ pub mod worker;
use crate::deno_error::js_check;
use crate::deno_error::print_err_and_exit;
use crate::global_state::ThreadSafeGlobalState;
use crate::ops::io::get_stdio;
use crate::progress::Progress;
use crate::state::ThreadSafeState;
use crate::worker::Worker;
Expand Down Expand Up @@ -128,6 +128,15 @@ fn create_worker_and_state(
.map_err(deno_error::print_err_and_exit)
.unwrap();

let state_ = state.clone();
{
let mut resource_table = state_.lock_resource_table();
let (stdin, stdout, stderr) = get_stdio();
resource_table.add("stdin", Box::new(stdin));
resource_table.add("stdout", Box::new(stdout));
resource_table.add("stderr", Box::new(stderr));
}

let worker = Worker::new(
"main".to_string(),
startup_data::deno_isolate_init(),
Expand Down
8 changes: 4 additions & 4 deletions cli/ops/dispatch_minimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use deno::PinnedBuf;
use futures::Future;

pub type MinimalOp = dyn Future<Item = i32, Error = ErrBox> + Send;
pub type Dispatcher = fn(i32, Option<PinnedBuf>) -> Box<MinimalOp>;

#[derive(Copy, Clone, Debug, PartialEq)]
// This corresponds to RecordMinimal on the TS side.
Expand Down Expand Up @@ -112,9 +111,10 @@ fn test_parse_min_record() {
assert_eq!(parse_min_record(&buf), None);
}

pub fn minimal_op(
d: Dispatcher,
) -> impl Fn(&[u8], Option<PinnedBuf>) -> CoreOp {
pub fn minimal_op<D>(d: D) -> impl Fn(&[u8], Option<PinnedBuf>) -> CoreOp
where
D: Fn(i32, Option<PinnedBuf>) -> Box<MinimalOp>,
{
move |control: &[u8], zero_copy: Option<PinnedBuf>| {
let mut record = match parse_min_record(control) {
Some(r) => r,
Expand Down
9 changes: 6 additions & 3 deletions cli/ops/fetch.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::io::StreamResource;
use crate::http_body::HttpBody;
use crate::http_util::get_client;
use crate::ops::json_op;
use crate::resources;
use crate::state::ThreadSafeState;
use deno::*;
use http::header::HeaderName;
Expand Down Expand Up @@ -54,15 +55,17 @@ pub fn op_fetch(
request = request.header(name, v);
}
debug!("Before fetch {}", url);
let state_ = state.clone();
let future = request.send().map_err(ErrBox::from).and_then(move |res| {
let status = res.status();
let mut res_headers = Vec::new();
for (key, val) in res.headers().iter() {
res_headers.push((key.to_string(), val.to_str().unwrap().to_owned()));
}

let body = res.into_body();
let rid = resources::add_reqwest_body(body);
let body = HttpBody::from(res.into_body());
let mut table = state_.lock_resource_table();
let rid = table.add("httpBody", Box::new(StreamResource::HttpBody(body)));

let json_res = json!({
"bodyRid": rid,
Expand Down
28 changes: 16 additions & 12 deletions cli/ops/files.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::io::StreamResource;
use crate::deno_error::bad_resource;
use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind;
use crate::fs as deno_fs;
use crate::ops::json_op;
use crate::resources;
use crate::resources::CliResource;
use crate::state::ThreadSafeState;
use deno::*;
use futures::Future;
Expand Down Expand Up @@ -38,7 +37,7 @@ fn op_open(
let args: OpenArgs = serde_json::from_value(args)?;
let (filename, filename_) = deno_fs::resolve_from_cwd(&args.filename)?;
let mode = args.mode.as_ref();

let state_ = state.clone();
let mut open_options = tokio::fs::OpenOptions::new();

match mode {
Expand Down Expand Up @@ -91,7 +90,8 @@ fn op_open(
let is_sync = args.promise_id.is_none();
let op = open_options.open(filename).map_err(ErrBox::from).and_then(
move |fs_file| {
let rid = resources::add_fs_file(fs_file);
let mut table = state_.lock_resource_table();
let rid = table.add("fsFile", Box::new(StreamResource::FsFile(fs_file)));
futures::future::ok(json!(rid))
},
);
Expand All @@ -110,35 +110,35 @@ struct CloseArgs {
}

fn op_close(
_state: &ThreadSafeState,
state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: CloseArgs = serde_json::from_value(args)?;

let mut table = resources::lock_resource_table();
let mut table = state.lock_resource_table();
table.close(args.rid as u32).ok_or_else(bad_resource)?;
Ok(JsonOp::Sync(json!({})))
}

#[derive(Debug)]
pub struct SeekFuture {
seek_from: SeekFrom,
rid: ResourceId,
state: ThreadSafeState,
}

impl Future for SeekFuture {
type Item = u64;
type Error = ErrBox;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut table = resources::lock_resource_table();
let mut table = self.state.lock_resource_table();
let resource = table
.get_mut::<CliResource>(self.rid)
.get_mut::<StreamResource>(self.rid)
.ok_or_else(bad_resource)?;

let tokio_file = match resource {
CliResource::FsFile(ref mut file) => file,
StreamResource::FsFile(ref mut file) => file,
_ => return Err(bad_resource()),
};

Expand All @@ -156,7 +156,7 @@ struct SeekArgs {
}

fn op_seek(
_state: &ThreadSafeState,
state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
Expand All @@ -177,7 +177,11 @@ fn op_seek(
}
};

let fut = SeekFuture { seek_from, rid };
let fut = SeekFuture {
state: state.clone(),
seek_from,
rid,
};

let op = fut.and_then(move |_| futures::future::ok(json!({})));
if args.promise_id.is_none() {
Expand Down
Loading

0 comments on commit 9d22234

Please sign in to comment.