Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: per-worker resource table #3306

Merged
merged 6 commits into from
Nov 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion cli/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ pub mod permissions;
mod progress;
mod repl;
pub mod resolve_addr;
pub mod resources;
mod shell;
mod signal;
pub mod source_maps;
Expand All @@ -57,6 +56,7 @@ pub mod worker;
use crate::deno_error::js_check;
use crate::deno_error::print_err_and_exit;
use crate::global_state::ThreadSafeGlobalState;
use crate::ops::io::get_stdio;
use crate::progress::Progress;
use crate::state::ThreadSafeState;
use crate::worker::Worker;
Expand Down Expand Up @@ -128,6 +128,15 @@ fn create_worker_and_state(
.map_err(deno_error::print_err_and_exit)
.unwrap();

let state_ = state.clone();
{
let mut resource_table = state_.lock_resource_table();
let (stdin, stdout, stderr) = get_stdio();
resource_table.add("stdin", Box::new(stdin));
resource_table.add("stdout", Box::new(stdout));
resource_table.add("stderr", Box::new(stderr));
}

let worker = Worker::new(
"main".to_string(),
startup_data::deno_isolate_init(),
Expand Down
8 changes: 4 additions & 4 deletions cli/ops/dispatch_minimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use deno::PinnedBuf;
use futures::Future;

pub type MinimalOp = dyn Future<Item = i32, Error = ErrBox> + Send;
pub type Dispatcher = fn(i32, Option<PinnedBuf>) -> Box<MinimalOp>;

#[derive(Copy, Clone, Debug, PartialEq)]
// This corresponds to RecordMinimal on the TS side.
Expand Down Expand Up @@ -112,9 +111,10 @@ fn test_parse_min_record() {
assert_eq!(parse_min_record(&buf), None);
}

pub fn minimal_op(
d: Dispatcher,
) -> impl Fn(&[u8], Option<PinnedBuf>) -> CoreOp {
pub fn minimal_op<D>(d: D) -> impl Fn(&[u8], Option<PinnedBuf>) -> CoreOp
where
D: Fn(i32, Option<PinnedBuf>) -> Box<MinimalOp>,
{
move |control: &[u8], zero_copy: Option<PinnedBuf>| {
let mut record = match parse_min_record(control) {
Some(r) => r,
Expand Down
9 changes: 6 additions & 3 deletions cli/ops/fetch.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::io::StreamResource;
use crate::http_body::HttpBody;
use crate::http_util::get_client;
use crate::ops::json_op;
use crate::resources;
use crate::state::ThreadSafeState;
use deno::*;
use http::header::HeaderName;
Expand Down Expand Up @@ -54,15 +55,17 @@ pub fn op_fetch(
request = request.header(name, v);
}
debug!("Before fetch {}", url);
let state_ = state.clone();
let future = request.send().map_err(ErrBox::from).and_then(move |res| {
let status = res.status();
let mut res_headers = Vec::new();
for (key, val) in res.headers().iter() {
res_headers.push((key.to_string(), val.to_str().unwrap().to_owned()));
}

let body = res.into_body();
let rid = resources::add_reqwest_body(body);
let body = HttpBody::from(res.into_body());
let mut table = state_.lock_resource_table();
let rid = table.add("httpBody", Box::new(StreamResource::HttpBody(body)));

let json_res = json!({
"bodyRid": rid,
Expand Down
28 changes: 16 additions & 12 deletions cli/ops/files.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::io::StreamResource;
use crate::deno_error::bad_resource;
use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind;
use crate::fs as deno_fs;
use crate::ops::json_op;
use crate::resources;
use crate::resources::CliResource;
use crate::state::ThreadSafeState;
use deno::*;
use futures::Future;
Expand Down Expand Up @@ -38,7 +37,7 @@ fn op_open(
let args: OpenArgs = serde_json::from_value(args)?;
let (filename, filename_) = deno_fs::resolve_from_cwd(&args.filename)?;
let mode = args.mode.as_ref();

let state_ = state.clone();
let mut open_options = tokio::fs::OpenOptions::new();

match mode {
Expand Down Expand Up @@ -91,7 +90,8 @@ fn op_open(
let is_sync = args.promise_id.is_none();
let op = open_options.open(filename).map_err(ErrBox::from).and_then(
move |fs_file| {
let rid = resources::add_fs_file(fs_file);
let mut table = state_.lock_resource_table();
let rid = table.add("fsFile", Box::new(StreamResource::FsFile(fs_file)));
futures::future::ok(json!(rid))
},
);
Expand All @@ -110,35 +110,35 @@ struct CloseArgs {
}

fn op_close(
_state: &ThreadSafeState,
state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: CloseArgs = serde_json::from_value(args)?;

let mut table = resources::lock_resource_table();
let mut table = state.lock_resource_table();
table.close(args.rid as u32).ok_or_else(bad_resource)?;
Ok(JsonOp::Sync(json!({})))
}

#[derive(Debug)]
pub struct SeekFuture {
seek_from: SeekFrom,
rid: ResourceId,
state: ThreadSafeState,
}

impl Future for SeekFuture {
type Item = u64;
type Error = ErrBox;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut table = resources::lock_resource_table();
let mut table = self.state.lock_resource_table();
let resource = table
.get_mut::<CliResource>(self.rid)
.get_mut::<StreamResource>(self.rid)
.ok_or_else(bad_resource)?;

let tokio_file = match resource {
CliResource::FsFile(ref mut file) => file,
StreamResource::FsFile(ref mut file) => file,
_ => return Err(bad_resource()),
};

Expand All @@ -156,7 +156,7 @@ struct SeekArgs {
}

fn op_seek(
_state: &ThreadSafeState,
state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
Expand All @@ -177,7 +177,11 @@ fn op_seek(
}
};

let fut = SeekFuture { seek_from, rid };
let fut = SeekFuture {
state: state.clone(),
seek_from,
rid,
};

let op = fut.and_then(move |_| futures::future::ok(json!({})));
if args.promise_id.is_none() {
Expand Down
Loading