From 5a8c87f7ef36a0b690c6476562f39ba9259c1817 Mon Sep 17 00:00:00 2001 From: Hanaasagi Date: Sat, 22 Apr 2023 22:57:08 +0900 Subject: [PATCH 01/12] fix(repl): fix repl server reading data logic --- src/dummy.rs | 47 ++++++++++++++++++++++++++------------ src/scripts/repl_server.py | 17 ++++++++++---- 2 files changed, 44 insertions(+), 20 deletions(-) diff --git a/src/dummy.rs b/src/dummy.rs index 6f48fb82f..1317bd42c 100644 --- a/src/dummy.rs +++ b/src/dummy.rs @@ -200,24 +200,41 @@ impl DummyVM { fn read(&mut self) -> Result { let mut buf = [0; 1024]; - match self.stream.as_mut().unwrap().read(&mut buf) { - Result::Ok(n) => { - let s = std::str::from_utf8(&buf[..n]) - .expect("failed to parse the response, maybe the output is too long"); - match s { - "[Exception] SystemExit" => Err(EvalErrors::from(EvalError::system_exit())), - "[Initialize]" => { - self.compiler.initialize_generator(); - self.read() - } - _ => Ok(s.to_string()), + let mut size = 0; + while size < 2 { + match self.stream.as_mut().unwrap().read(&mut buf[size..]) { + Result::Ok(n) => size += n, + Result::Err(err) => { + self.finish(); + eprintln!("Read error: {err}"); + process::exit(1); } } - Result::Err(err) => { - self.finish(); - eprintln!("Read error: {err}"); - process::exit(1); + } + + let data_len = u16::from_be_bytes(buf[..2].try_into().unwrap()) as usize; + + while size < 2 + data_len { + match self.stream.as_mut().unwrap().read(&mut buf[size..]) { + Result::Ok(n) => size += n, + Result::Err(err) => { + self.finish(); + eprintln!("Read error: {err}"); + process::exit(1); + } } } + + let s = std::str::from_utf8(&buf[2..size]) + .expect("failed to parse the response, maybe the output is too long"); + + if s.starts_with("[Exception] SystemExit") { + Err(EvalErrors::from(EvalError::system_exit())) + } else if s.starts_with("[Initialize]") { + self.compiler.initialize_generator(); + Ok(s["[Initialize]".len()..].to_string()) + } else { + Ok(s.to_string()) + } } } diff --git a/src/scripts/repl_server.py b/src/scripts/repl_server.py index e7f0d7bc5..994ff0271 100644 --- a/src/scripts/repl_server.py +++ b/src/scripts/repl_server.py @@ -15,18 +15,24 @@ __already_loaded = False __ctx = {'__importlib': __importlib} +def __encode(s): + s_bytes = s.encode() + s_len = len(s_bytes) + return s_len.to_bytes(2, 'big') + s_bytes + while True: try: __order = __client_socket.recv(1024).decode() except ConnectionResetError: # when the server was crashed break if __order == 'quit' or __order == 'exit': # when the server was closed successfully - __client_socket.send('closed'.encode()) + __client_socket.send(__encode('closed')) break elif __order == 'load': __sys.stdout = __io.StringIO() __res = '' __exc = '' + __buf = [] try: if __already_loaded: # __MODULE__ will be replaced with module name @@ -35,7 +41,7 @@ __res = str(exec('import __MODULE__', __ctx)) __already_loaded = True except SystemExit: - __client_socket.send('[Exception] SystemExit'.encode()) + __buf.append('[Exception] SystemExit') continue except Exception as e: try: @@ -44,15 +50,16 @@ excs = __traceback.format_exception_only(e.__class__, e) __exc = ''.join(excs).rstrip() __traceback.clear_frames(e.__traceback__) - __client_socket.send('[Initialize]'.encode()) + __buf.append('[Initialize]') __out = __sys.stdout.getvalue()[:-1] # assert not(__exc and __res) if __exc or __res: __out += '\n' __res = __out + __exc + __res - __client_socket.send(__res.encode()) + __buf.append(__res) + __client_socket.send(__encode(''.join(__buf))) else: - __client_socket.send('unknown operation'.encode()) + __client_socket.send(__encode('unknown operation')) __client_socket.close() __server_socket.close() From 7d2339308d95b563da692ff4dfd1bf414dd391e1 Mon Sep 17 00:00:00 2001 From: Hanaasagi Date: Sat, 22 Apr 2023 23:07:27 +0900 Subject: [PATCH 02/12] fix(repl): fix reading server-side data --- src/dummy.rs | 43 ++++++++++++++++++++------------------ src/scripts/repl_server.py | 1 + 2 files changed, 24 insertions(+), 20 deletions(-) diff --git a/src/dummy.rs b/src/dummy.rs index 1317bd42c..dd2b75954 100644 --- a/src/dummy.rs +++ b/src/dummy.rs @@ -199,33 +199,36 @@ impl DummyVM { } fn read(&mut self) -> Result { - let mut buf = [0; 1024]; - let mut size = 0; - while size < 2 { - match self.stream.as_mut().unwrap().read(&mut buf[size..]) { - Result::Ok(n) => size += n, - Result::Err(err) => { - self.finish(); - eprintln!("Read error: {err}"); - process::exit(1); - } + // Server Data Format: + // ------------------- + // | size | data + // ------------------- + // | 2 bytes | n bytes + // ------------------- + + let mut size_buf = [0; 2]; + match self.stream.as_mut().unwrap().read_exact(&mut size_buf) { + Result::Ok(()) => {} + Result::Err(err) => { + self.finish(); + eprintln!("Read error: {err}"); + process::exit(1); } } - let data_len = u16::from_be_bytes(buf[..2].try_into().unwrap()) as usize; + let data_size = u16::from_be_bytes(size_buf) as usize; + let mut data_buf = vec![0; data_size]; - while size < 2 + data_len { - match self.stream.as_mut().unwrap().read(&mut buf[size..]) { - Result::Ok(n) => size += n, - Result::Err(err) => { - self.finish(); - eprintln!("Read error: {err}"); - process::exit(1); - } + match self.stream.as_mut().unwrap().read_exact(&mut data_buf) { + Result::Ok(()) => {} + Result::Err(err) => { + self.finish(); + eprintln!("Read error: {err}"); + process::exit(1); } } - let s = std::str::from_utf8(&buf[2..size]) + let s = std::str::from_utf8(&data_buf) .expect("failed to parse the response, maybe the output is too long"); if s.starts_with("[Exception] SystemExit") { diff --git a/src/scripts/repl_server.py b/src/scripts/repl_server.py index 994ff0271..9c10a478f 100644 --- a/src/scripts/repl_server.py +++ b/src/scripts/repl_server.py @@ -18,6 +18,7 @@ def __encode(s): s_bytes = s.encode() s_len = len(s_bytes) + # two bytes for size, and n bytes for data return s_len.to_bytes(2, 'big') + s_bytes while True: From d2d6f7341a57c259f3925ffc660da599972748af Mon Sep 17 00:00:00 2001 From: Hanaasagi Date: Sat, 22 Apr 2023 23:28:15 +0900 Subject: [PATCH 03/12] fix(lint): fix clippy warning --- src/dummy.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dummy.rs b/src/dummy.rs index dd2b75954..eaacb2cf0 100644 --- a/src/dummy.rs +++ b/src/dummy.rs @@ -233,9 +233,9 @@ impl DummyVM { if s.starts_with("[Exception] SystemExit") { Err(EvalErrors::from(EvalError::system_exit())) - } else if s.starts_with("[Initialize]") { + } else if let Some(remaing) = s.strip_prefix("[Initialize]") { self.compiler.initialize_generator(); - Ok(s["[Initialize]".len()..].to_string()) + Ok(remaing.to_string()) } else { Ok(s.to_string()) } From a2ee0f6c98ce632400be583cef46d420d2d90c7b Mon Sep 17 00:00:00 2001 From: Hanaasagi Date: Sun, 23 Apr 2023 15:44:38 +0900 Subject: [PATCH 04/12] feat(repl): impl client server protocol --- src/dummy.rs | 240 ++++++++++++++++++++++++++++--------- src/scripts/repl_server.py | 49 +++++--- 2 files changed, 217 insertions(+), 72 deletions(-) diff --git a/src/dummy.rs b/src/dummy.rs index eaacb2cf0..effec960e 100644 --- a/src/dummy.rs +++ b/src/dummy.rs @@ -19,6 +19,123 @@ use erg_compiler::Compiler; pub type EvalError = CompileError; pub type EvalErrors = CompileErrors; +/// The instructions for communication between the client and the server. +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +#[repr(u8)] +enum Inst { + /// Send from server to client. Informs the client to print data. + Print = 0x01, + /// Send from client to server. Informs the REPL server that the executable .pyc file has been written out and is ready for evaluation. + Load = 0x02, + /// Send from server to client. Represents an exception. + Exception = 0x03, + /// Send from server to client. Tells the code generator to initialize due to an error. + Initialize = 0x04, + /// Informs that the connection is to be / should be terminated. + Exit = 0x05, + /// Informs that it is not a supported instruction. + Unknown = 0x00, +} + +impl Inst { + fn has_data(&self) -> bool { + match self { + Self::Print => true, + Self::Load => false, + Self::Exception => true, + Self::Initialize => true, + Self::Exit => false, + Self::Unknown => false, + } + } +} + +impl Into for u8 { + fn into(self) -> Inst { + match self { + 0x01 => Inst::Print, + 0x02 => Inst::Load, + 0x03 => Inst::Exception, + 0x04 => Inst::Initialize, + 0x05 => Inst::Exit, + _ => Inst::Unknown, + } + } +} + +/// ------------------------------- +/// | ins | size | data +/// ------------------------------- +/// | 1 byte | 2 bytes | n bytes +/// ------------------------------- +#[derive(Debug, Clone)] +struct Message { + inst: Inst, + size: usize, + data: Option>, +} + +impl Message { + fn new(inst: Inst, data: Option>) -> Self { + let size = if let Some(d) = &data { d.len() } else { 0 }; + Self { inst, size, data } + } +} + +#[derive(Debug)] +struct MessageStream { + stream: T, + read_buf: Vec, + write_buf: Vec, +} + +impl MessageStream { + fn new(stream: T) -> Self { + Self { + stream, + read_buf: Vec::new(), + write_buf: Vec::new(), + } + } + + fn send_msg(&mut self, msg: &Message) -> Result<(), std::io::Error> { + self.write_buf.clear(); + + self.write_buf.extend((msg.inst as u8).to_be_bytes()); + self.write_buf.extend((msg.size).to_be_bytes()); + self.write_buf + .extend_from_slice(&msg.data.clone().unwrap_or_default()); + + self.stream.write_all(&self.write_buf)?; + + Ok(()) + } + + fn recv_msg(&mut self) -> Result { + // read instruction, 1 byte + let mut inst_buf = [0; 1]; + self.stream.read_exact(&mut inst_buf)?; + + let inst: Inst = u8::from_be_bytes(inst_buf).into(); + + if !inst.has_data() { + return Ok(Message::new(inst, None)); + } + + // read size, 2 bytes + let mut size_buf = [0; 2]; + self.stream.read_exact(&mut size_buf)?; + + let data_size = u16::from_be_bytes(size_buf) as usize; + + // read data + let mut data_buf = vec![0; data_size]; + self.stream.read_exact(&mut data_buf)?; + + Ok(Message::new(inst, Some(data_buf))) + } +} + fn find_available_port() -> u16 { let socket = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0); TcpListener::bind(socket) @@ -33,7 +150,7 @@ fn find_available_port() -> u16 { #[derive(Debug)] pub struct DummyVM { compiler: Compiler, - stream: Option, + stream: Option>, } impl Default for DummyVM { @@ -82,7 +199,7 @@ impl Runnable for DummyVM { stream .set_read_timeout(Some(Duration::from_secs(cfg.py_server_timeout))) .unwrap(); - break Some(stream); + break Some(MessageStream::new(stream)); } Err(_) => { if !cfg.quiet_repl { @@ -104,15 +221,16 @@ impl Runnable for DummyVM { fn finish(&mut self) { if let Some(stream) = &mut self.stream { - if let Err(err) = stream.write_all("exit".as_bytes()) { + // send exit to server + if let Err(err) = stream.send_msg(&Message::new(Inst::Exit, None)) { eprintln!("Write error: {err}"); process::exit(1); } - let mut buf = [0; 1024]; - match stream.read(&mut buf) { - Result::Ok(n) => { - let s = std::str::from_utf8(&buf[..n]).unwrap(); - if s.contains("closed") && !self.cfg().quiet_repl { + + // wait server exit + match stream.recv_msg() { + Result::Ok(msg) => { + if msg.inst == Inst::Exit && !self.cfg().quiet_repl { println!("The REPL server is closed."); } } @@ -121,6 +239,7 @@ impl Runnable for DummyVM { process::exit(1); } } + remove_file(self.cfg().dump_pyc_filename()).unwrap_or(()); } } @@ -158,18 +277,68 @@ impl Runnable for DummyVM { .map_err(|eart| eart.errors)?; let (last, warns) = (arti.object, arti.warns); let mut res = warns.to_string(); + + macro_rules! err_handle { + () => { + { + self.finish(); + process::exit(1); + + } + }; + ($hint:expr, $($args:expr),*) => { + { + self.finish(); + eprintln!($hint, $($args)*); + process::exit(1); + } + }; + } + // Tell the REPL server to execute the code - res += &match self.stream.as_mut().unwrap().write("load".as_bytes()) { - Result::Ok(_) => self.read()?, - Result::Err(err) => { - self.finish(); - eprintln!("Sending error: {err}"); - process::exit(1); + if let Err(err) = self + .stream + .as_mut() + .unwrap() + .send_msg(&Message::new(Inst::Load, None)) + { + err_handle!("Sending error: {}", err); + }; + + // receive data from server + let data = match self.stream.as_mut().unwrap().recv_msg() { + Result::Ok(msg) => { + let s = match msg.inst { + Inst::Exception => { + return Err(EvalErrors::from(EvalError::system_exit())); + } + Inst::Initialize => { + self.compiler.initialize_generator(); + String::from_utf8(msg.data.unwrap_or_default()) + } + Inst::Print => String::from_utf8(msg.data.unwrap_or_default()), + Inst::Exit => err_handle!("Recving inst {:?} from server", msg.inst), + // `load` can only be sent from the client to the server + Inst::Load | Inst::Unknown => { + err_handle!("Recving unexpected inst {:?} from server", msg.inst) + } + }; + + if s.is_err() { + err_handle!("Failed to parse server response data, error: {:?}", s.err()); + } else { + s.unwrap() + } } + Result::Err(err) => err_handle!("Recving error: {}", err), }; + + res.push_str(&data); + // If the result of an expression is None, it will not be displayed in the REPL. if res.ends_with("None") { res.truncate(res.len() - 5); } + if self.cfg().show_type { res.push_str(": "); res.push_str( @@ -197,47 +366,4 @@ impl DummyVM { pub fn eval(&mut self, src: String) -> Result { Runnable::eval(self, src) } - - fn read(&mut self) -> Result { - // Server Data Format: - // ------------------- - // | size | data - // ------------------- - // | 2 bytes | n bytes - // ------------------- - - let mut size_buf = [0; 2]; - match self.stream.as_mut().unwrap().read_exact(&mut size_buf) { - Result::Ok(()) => {} - Result::Err(err) => { - self.finish(); - eprintln!("Read error: {err}"); - process::exit(1); - } - } - - let data_size = u16::from_be_bytes(size_buf) as usize; - let mut data_buf = vec![0; data_size]; - - match self.stream.as_mut().unwrap().read_exact(&mut data_buf) { - Result::Ok(()) => {} - Result::Err(err) => { - self.finish(); - eprintln!("Read error: {err}"); - process::exit(1); - } - } - - let s = std::str::from_utf8(&data_buf) - .expect("failed to parse the response, maybe the output is too long"); - - if s.starts_with("[Exception] SystemExit") { - Err(EvalErrors::from(EvalError::system_exit())) - } else if let Some(remaing) = s.strip_prefix("[Initialize]") { - self.compiler.initialize_generator(); - Ok(remaing.to_string()) - } else { - Ok(s.to_string()) - } - } } diff --git a/src/scripts/repl_server.py b/src/scripts/repl_server.py index 9c10a478f..f7cf90e8d 100644 --- a/src/scripts/repl_server.py +++ b/src/scripts/repl_server.py @@ -15,24 +15,44 @@ __already_loaded = False __ctx = {'__importlib': __importlib} -def __encode(s): - s_bytes = s.encode() - s_len = len(s_bytes) - # two bytes for size, and n bytes for data - return s_len.to_bytes(2, 'big') + s_bytes + +class INST: + # Informs that it is not a supported instruction. + UNKNOWN = 0x00 + # Send from server to client. Informs the client to print data. + PRINT = 0x01 + # Send from client to server. Informs the REPL server that the executable .pyc file has been written out and is ready for evaluation. + LOAD = 0x02 + # Send from server to client. Represents an exception. + EXCEPTION = 0x03 + # Send from server to client. Tells the code generator to initialize due to an error. + INITIALIZE = 0x04 + # Informs that the connection is to be / should be terminated. + EXIT = 0x05 + +def __encode(instr, data=''): + data_bytes = data.encode() + data_len = len(data_bytes) + if data_len > 0: + # one byte for inst, two bytes for size(Optional), and n bytes for data(Optional) + return instr.to_bytes(1, 'big') + data_len.to_bytes(2, 'big') + data_bytes + return instr.to_bytes(1, 'big') + while True: try: - __order = __client_socket.recv(1024).decode() + __data = __client_socket.recv(1024) except ConnectionResetError: # when the server was crashed break - if __order == 'quit' or __order == 'exit': # when the server was closed successfully - __client_socket.send(__encode('closed')) + __inst = int.from_bytes(__data[:1], 'big') + if __inst == INST.EXIT: # when the server was closed successfully + __client_socket.send(__encode(INST.EXIT)) break - elif __order == 'load': + elif __inst == INST.LOAD: __sys.stdout = __io.StringIO() __res = '' __exc = '' + __resp_inst = INST.PRINT __buf = [] try: if __already_loaded: @@ -42,7 +62,7 @@ def __encode(s): __res = str(exec('import __MODULE__', __ctx)) __already_loaded = True except SystemExit: - __buf.append('[Exception] SystemExit') + __client_socket.send(__encode(INST.EXCEPTION, 'SystemExit')) continue except Exception as e: try: @@ -51,16 +71,15 @@ def __encode(s): excs = __traceback.format_exception_only(e.__class__, e) __exc = ''.join(excs).rstrip() __traceback.clear_frames(e.__traceback__) - __buf.append('[Initialize]') + __resp_inst = INST.INITIALIZE __out = __sys.stdout.getvalue()[:-1] - # assert not(__exc and __res) - if __exc or __res: + if __out and __exc or __res: __out += '\n' __res = __out + __exc + __res __buf.append(__res) - __client_socket.send(__encode(''.join(__buf))) + __client_socket.send(__encode(__resp_inst, ''.join(__buf))) else: - __client_socket.send(__encode('unknown operation')) + __client_socket.send(__encode(INST.UNKNOWN)) __client_socket.close() __server_socket.close() From 507d49f09b371031f50ebfca25fb687c88a35afd Mon Sep 17 00:00:00 2001 From: Hanaasagi Date: Sun, 23 Apr 2023 15:59:03 +0900 Subject: [PATCH 05/12] feat(repl): fix clippy warning and some bugs --- src/dummy.rs | 85 +++++++++++++++++++++----------------- src/scripts/repl_server.py | 8 +--- 2 files changed, 50 insertions(+), 43 deletions(-) diff --git a/src/dummy.rs b/src/dummy.rs index effec960e..bad7e8294 100644 --- a/src/dummy.rs +++ b/src/dummy.rs @@ -37,22 +37,9 @@ enum Inst { Unknown = 0x00, } -impl Inst { - fn has_data(&self) -> bool { - match self { - Self::Print => true, - Self::Load => false, - Self::Exception => true, - Self::Initialize => true, - Self::Exit => false, - Self::Unknown => false, - } - } -} - -impl Into for u8 { - fn into(self) -> Inst { - match self { +impl From for Inst { + fn from(v: u8) -> Inst { + match v { 0x01 => Inst::Print, 0x02 => Inst::Load, 0x03 => Inst::Exception, @@ -85,28 +72,20 @@ impl Message { #[derive(Debug)] struct MessageStream { stream: T, - read_buf: Vec, - write_buf: Vec, } impl MessageStream { fn new(stream: T) -> Self { - Self { - stream, - read_buf: Vec::new(), - write_buf: Vec::new(), - } + Self { stream } } fn send_msg(&mut self, msg: &Message) -> Result<(), std::io::Error> { - self.write_buf.clear(); + let mut write_buf = Vec::with_capacity(1024); + write_buf.extend((msg.inst as u8).to_be_bytes()); + write_buf.extend((msg.size).to_be_bytes()); + write_buf.extend_from_slice(&msg.data.clone().unwrap_or_default()); - self.write_buf.extend((msg.inst as u8).to_be_bytes()); - self.write_buf.extend((msg.size).to_be_bytes()); - self.write_buf - .extend_from_slice(&msg.data.clone().unwrap_or_default()); - - self.stream.write_all(&self.write_buf)?; + self.stream.write_all(&write_buf)?; Ok(()) } @@ -118,16 +97,16 @@ impl MessageStream { let inst: Inst = u8::from_be_bytes(inst_buf).into(); - if !inst.has_data() { - return Ok(Message::new(inst, None)); - } - // read size, 2 bytes let mut size_buf = [0; 2]; self.stream.read_exact(&mut size_buf)?; let data_size = u16::from_be_bytes(size_buf) as usize; + if data_size == 0 { + return Ok(Message::new(inst, None)); + } + // read data let mut data_buf = vec![0; data_size]; self.stream.read_exact(&mut data_buf)?; @@ -136,6 +115,35 @@ impl MessageStream { } } +#[test] +fn test_message() { + use std::collections::VecDeque; + + let inner = Box::>::default(); + let mut stream = MessageStream::new(inner); + + // test send_msg + stream.send_msg(&Message::new(Inst::Load, None)).unwrap(); + assert_eq!( + stream.stream.as_slices(), + (&[2, 0, 0, 0, 0, 0, 0, 0, 0][..], &[][..]) + ); + + // test recv_msg + // data field, 'A' => hex is 0x41 + stream.stream.push_front(0x41); + // size field + stream.stream.push_front(0x01); + stream.stream.push_front(0x00); + // inst field + stream.stream.push_front(0x01); + + let msg = stream.recv_msg().unwrap(); + assert_eq!(msg.inst, Inst::Print); + assert_eq!(msg.size, 1); + assert_eq!(std::str::from_utf8(&msg.data.unwrap()).unwrap(), "A"); +} + fn find_available_port() -> u16 { let socket = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0); TcpListener::bind(socket) @@ -310,6 +318,9 @@ impl Runnable for DummyVM { Result::Ok(msg) => { let s = match msg.inst { Inst::Exception => { + debug_assert!( + std::str::from_utf8(msg.data.as_ref().unwrap()) == Ok("SystemExit") + ); return Err(EvalErrors::from(EvalError::system_exit())); } Inst::Initialize => { @@ -324,10 +335,10 @@ impl Runnable for DummyVM { } }; - if s.is_err() { - err_handle!("Failed to parse server response data, error: {:?}", s.err()); + if let Ok(ss) = s { + ss } else { - s.unwrap() + err_handle!("Failed to parse server response data, error: {:?}", s.err()); } } Result::Err(err) => err_handle!("Recving error: {}", err), diff --git a/src/scripts/repl_server.py b/src/scripts/repl_server.py index f7cf90e8d..b74ece6ee 100644 --- a/src/scripts/repl_server.py +++ b/src/scripts/repl_server.py @@ -15,7 +15,6 @@ __already_loaded = False __ctx = {'__importlib': __importlib} - class INST: # Informs that it is not a supported instruction. UNKNOWN = 0x00 @@ -33,11 +32,8 @@ class INST: def __encode(instr, data=''): data_bytes = data.encode() data_len = len(data_bytes) - if data_len > 0: - # one byte for inst, two bytes for size(Optional), and n bytes for data(Optional) - return instr.to_bytes(1, 'big') + data_len.to_bytes(2, 'big') + data_bytes - return instr.to_bytes(1, 'big') - + # one byte for inst, two bytes for size, and n bytes for data(Optional) + return instr.to_bytes(1, 'big') + data_len.to_bytes(2, 'big') + data_bytes while True: try: From 5e3c7cbf0f2c7f975208542cb05e7bacbe0b1974 Mon Sep 17 00:00:00 2001 From: Shunsuke Shibayama Date: Tue, 25 Apr 2023 00:25:20 +0900 Subject: [PATCH 06/12] refactor: fix typo and improve `err_handle!` definition --- src/dummy.rs | 31 +++++++++++++------------------ 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/src/dummy.rs b/src/dummy.rs index bad7e8294..2b49f866b 100644 --- a/src/dummy.rs +++ b/src/dummy.rs @@ -287,20 +287,15 @@ impl Runnable for DummyVM { let mut res = warns.to_string(); macro_rules! err_handle { - () => { - { - self.finish(); - process::exit(1); - - } - }; - ($hint:expr, $($args:expr),*) => { - { - self.finish(); - eprintln!($hint, $($args)*); - process::exit(1); - } - }; + () => {{ + self.finish(); + process::exit(1); + }}; + ($hint:expr $(,$args:expr),* $(,)?) => {{ + self.finish(); + eprintln!($hint, $($args)*); + process::exit(1); + }}; } // Tell the REPL server to execute the code @@ -310,7 +305,7 @@ impl Runnable for DummyVM { .unwrap() .send_msg(&Message::new(Inst::Load, None)) { - err_handle!("Sending error: {}", err); + err_handle!("Sending error: {err}"); }; // receive data from server @@ -328,10 +323,10 @@ impl Runnable for DummyVM { String::from_utf8(msg.data.unwrap_or_default()) } Inst::Print => String::from_utf8(msg.data.unwrap_or_default()), - Inst::Exit => err_handle!("Recving inst {:?} from server", msg.inst), + Inst::Exit => err_handle!("Receiving inst {:?} from server", msg.inst), // `load` can only be sent from the client to the server Inst::Load | Inst::Unknown => { - err_handle!("Recving unexpected inst {:?} from server", msg.inst) + err_handle!("Receiving unexpected inst {:?} from server", msg.inst) } }; @@ -341,7 +336,7 @@ impl Runnable for DummyVM { err_handle!("Failed to parse server response data, error: {:?}", s.err()); } } - Result::Err(err) => err_handle!("Recving error: {}", err), + Result::Err(err) => err_handle!("Received an error: {err}"), }; res.push_str(&data); From 4f8146875cb99ae00b984ff4c1cff744f8c1cdc8 Mon Sep 17 00:00:00 2001 From: Shunsuke Shibayama Date: Tue, 25 Apr 2023 00:33:03 +0900 Subject: [PATCH 07/12] refactor: delete unnecessary double escapes --- src/scripts/repl_server.py | 83 +++++++++++++++++++------------------- 1 file changed, 41 insertions(+), 42 deletions(-) diff --git a/src/scripts/repl_server.py b/src/scripts/repl_server.py index b74ece6ee..b28fac1dd 100644 --- a/src/scripts/repl_server.py +++ b/src/scripts/repl_server.py @@ -1,19 +1,18 @@ -# Append __ to all variables to prevent name collisions in exec # All strings must be quoted by single quotes to prevent shell interpretation -import socket as __socket -import sys as __sys -import importlib as __importlib -import io as __io -import traceback as __traceback +import socket +import sys +import importlib +import io +import traceback -__server_socket = __socket.socket() +server_socket = socket.socket() # DummyVM will replace this __PORT__ with free port -__server_socket.bind(('127.0.0.1', __PORT__)) -__server_socket.listen(1) -(__client_socket, __client_address) = __server_socket.accept() +server_socket.bind(('127.0.0.1', __PORT__)) +server_socket.listen(1) +(client_socket, client_address) = server_socket.accept() -__already_loaded = False -__ctx = {'__importlib': __importlib} +already_loaded = False +ctx = {'importlib': importlib} class INST: # Informs that it is not a supported instruction. @@ -29,7 +28,7 @@ class INST: # Informs that the connection is to be / should be terminated. EXIT = 0x05 -def __encode(instr, data=''): +def encode(instr, data=''): data_bytes = data.encode() data_len = len(data_bytes) # one byte for inst, two bytes for size, and n bytes for data(Optional) @@ -37,45 +36,45 @@ def __encode(instr, data=''): while True: try: - __data = __client_socket.recv(1024) + data = client_socket.recv(1024) except ConnectionResetError: # when the server was crashed break - __inst = int.from_bytes(__data[:1], 'big') - if __inst == INST.EXIT: # when the server was closed successfully - __client_socket.send(__encode(INST.EXIT)) + inst = int.from_bytes(data[:1], 'big') + if inst == INST.EXIT: # when the server was closed successfully + client_socket.send(encode(INST.EXIT)) break - elif __inst == INST.LOAD: - __sys.stdout = __io.StringIO() - __res = '' - __exc = '' - __resp_inst = INST.PRINT - __buf = [] + elif inst == INST.LOAD: + sys.stdout = io.StringIO() + res = '' + exc = '' + resp_inst = INST.PRINT + buf = [] try: - if __already_loaded: + if already_loaded: # __MODULE__ will be replaced with module name - __res = str(exec('__importlib.reload(__MODULE__)', __ctx)) + res = str(exec('importlib.reload(__MODULE__)', ctx)) else: - __res = str(exec('import __MODULE__', __ctx)) - __already_loaded = True + res = str(exec('import __MODULE__', ctx)) + already_loaded = True except SystemExit: - __client_socket.send(__encode(INST.EXCEPTION, 'SystemExit')) + client_socket.send(encode(INST.EXCEPTION, 'SystemExit')) continue except Exception as e: try: - excs = __traceback.format_exception(e) + excs = traceback.format_exception(e) except: - excs = __traceback.format_exception_only(e.__class__, e) - __exc = ''.join(excs).rstrip() - __traceback.clear_frames(e.__traceback__) - __resp_inst = INST.INITIALIZE - __out = __sys.stdout.getvalue()[:-1] - if __out and __exc or __res: - __out += '\n' - __res = __out + __exc + __res - __buf.append(__res) - __client_socket.send(__encode(__resp_inst, ''.join(__buf))) + excs = traceback.format_exception_only(e.__class__, e) + exc = ''.join(excs).rstrip() + traceback.clear_frames(e.__traceback__) + resp_inst = INST.INITIALIZE + out = sys.stdout.getvalue()[:-1] + if out and exc or res: + out += '\n' + res = out + exc + res + buf.append(res) + client_socket.send(encode(resp_inst, ''.join(buf))) else: - __client_socket.send(__encode(INST.UNKNOWN)) + client_socket.send(encode(INST.UNKNOWN)) -__client_socket.close() -__server_socket.close() +client_socket.close() +server_socket.close() From a97316a11471d7d6273165722d5c27ecfd9e1229 Mon Sep 17 00:00:00 2001 From: Hanaasagi Date: Tue, 25 Apr 2023 02:22:35 +0900 Subject: [PATCH 08/12] feat: impl message stream in Python server --- src/scripts/repl_server.py | 68 ++++++++++++++++++++++++++------------ 1 file changed, 47 insertions(+), 21 deletions(-) diff --git a/src/scripts/repl_server.py b/src/scripts/repl_server.py index b28fac1dd..cd94a651e 100644 --- a/src/scripts/repl_server.py +++ b/src/scripts/repl_server.py @@ -5,15 +5,6 @@ import io import traceback -server_socket = socket.socket() -# DummyVM will replace this __PORT__ with free port -server_socket.bind(('127.0.0.1', __PORT__)) -server_socket.listen(1) -(client_socket, client_address) = server_socket.accept() - -already_loaded = False -ctx = {'importlib': importlib} - class INST: # Informs that it is not a supported instruction. UNKNOWN = 0x00 @@ -28,20 +19,55 @@ class INST: # Informs that the connection is to be / should be terminated. EXIT = 0x05 -def encode(instr, data=''): - data_bytes = data.encode() - data_len = len(data_bytes) - # one byte for inst, two bytes for size, and n bytes for data(Optional) - return instr.to_bytes(1, 'big') + data_len.to_bytes(2, 'big') + data_bytes +class MessageStream: + def __init__(self, socket): + self.socket = socket + self._read_buf = bytearray() + + def recv_msg(self): + self._read_buf.clear() + # requires at least 3 bytes as metadata + while len(self._read_buf) < 3: + self._read_buf.extend(self.socket.recv(1024)) + + inst = int.from_bytes(self._read_buf[:1], 'big') + data_len = int.from_bytes(self._read_buf[1:3], 'big') + + # until all data has been read + while len(self._read_buf) < 3 + data_len: + self._read_buf.extend(self.socket.recv(1024)) + + return (inst, self._read_buf[3:].decode()) + + + def send_msg(self, inst, data=''): + data_bytes = data.encode() + data_len = len(data_bytes) + # one byte for inst, two bytes for size, and n bytes for data(Optional) + raw_bytes = inst.to_bytes(1, 'big') + data_len.to_bytes(2, 'big') + data_bytes + + self.socket.send(raw_bytes) + + def close(): + self.socket.close() + +server_socket = socket.socket() +# DummyVM will replace this __PORT__ with free port +server_socket.bind(('127.0.0.1', __PORT__)) +server_socket.listen(1) +(client_socket, client_address) = server_socket.accept() + +already_loaded = False +ctx = {'importlib': importlib} +client_stream = MessageStream(client_socket) while True: try: - data = client_socket.recv(1024) + inst, _ = client_stream.recv_msg() except ConnectionResetError: # when the server was crashed break - inst = int.from_bytes(data[:1], 'big') if inst == INST.EXIT: # when the server was closed successfully - client_socket.send(encode(INST.EXIT)) + client_stream.send_msg(INST.EXIT) break elif inst == INST.LOAD: sys.stdout = io.StringIO() @@ -57,7 +83,7 @@ def encode(instr, data=''): res = str(exec('import __MODULE__', ctx)) already_loaded = True except SystemExit: - client_socket.send(encode(INST.EXCEPTION, 'SystemExit')) + client_stream.send_msg(INST.EXCEPTION, 'SystemExit') continue except Exception as e: try: @@ -72,9 +98,9 @@ def encode(instr, data=''): out += '\n' res = out + exc + res buf.append(res) - client_socket.send(encode(resp_inst, ''.join(buf))) + client_stream.send_msg(resp_inst, ''.join(buf)) else: - client_socket.send(encode(INST.UNKNOWN)) + client_stream.send_msg(INST.UNKNOWN) -client_socket.close() +client_stream.close() server_socket.close() From fcfe7d595b7b91116b6204536c27089dbac92a42 Mon Sep 17 00:00:00 2001 From: Hanaasagi Date: Tue, 25 Apr 2023 12:25:17 +0900 Subject: [PATCH 09/12] fix(repl): fix missing self --- src/scripts/repl_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/scripts/repl_server.py b/src/scripts/repl_server.py index cd94a651e..bcfd16933 100644 --- a/src/scripts/repl_server.py +++ b/src/scripts/repl_server.py @@ -48,7 +48,7 @@ def send_msg(self, inst, data=''): self.socket.send(raw_bytes) - def close(): + def close(self): self.socket.close() server_socket = socket.socket() From 2c5ba704f89b9042e3cde938083aa9f04196cf1d Mon Sep 17 00:00:00 2001 From: Hanaasagi Date: Tue, 25 Apr 2023 14:46:43 +0900 Subject: [PATCH 10/12] fix(repl): fix type, change `size` field from `usize` to `u16` --- src/dummy.rs | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/src/dummy.rs b/src/dummy.rs index 2b49f866b..e1d5cc18d 100644 --- a/src/dummy.rs +++ b/src/dummy.rs @@ -58,15 +58,29 @@ impl From for Inst { #[derive(Debug, Clone)] struct Message { inst: Inst, - size: usize, + size: u16, data: Option>, } impl Message { fn new(inst: Inst, data: Option>) -> Self { - let size = if let Some(d) = &data { d.len() } else { 0 }; + let size = if let Some(d) = &data { + if d.len() > usize::from(u16::MAX) { + eprintln!("Warning: length truncated to 65535"); + u16::MAX + } else { + d.len() as u16 + } + } else { + 0 + }; Self { inst, size, data } } + + #[allow(unused)] + fn len(&self) -> usize { + self.size as usize + } } #[derive(Debug)] @@ -122,11 +136,16 @@ fn test_message() { let inner = Box::>::default(); let mut stream = MessageStream::new(inner); - // test send_msg - stream.send_msg(&Message::new(Inst::Load, None)).unwrap(); + // test send_msg with data + stream + .send_msg(&Message::new( + Inst::Print, + Some("hello".chars().map(|c| c as u8).collect()), + )) + .unwrap(); assert_eq!( stream.stream.as_slices(), - (&[2, 0, 0, 0, 0, 0, 0, 0, 0][..], &[][..]) + (&[1, 0, 5, 104, 101, 108, 108, 111][..], &[][..]) ); // test recv_msg @@ -140,7 +159,7 @@ fn test_message() { let msg = stream.recv_msg().unwrap(); assert_eq!(msg.inst, Inst::Print); - assert_eq!(msg.size, 1); + assert_eq!(msg.len(), 1); assert_eq!(std::str::from_utf8(&msg.data.unwrap()).unwrap(), "A"); } From 03a30a450af09a27de8ab1b9e110d0b09d59bf3c Mon Sep 17 00:00:00 2001 From: Shunsuke Shibayama Date: Wed, 26 Apr 2023 23:59:23 +0900 Subject: [PATCH 11/12] Update repl_server.py --- src/scripts/repl_server.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/scripts/repl_server.py b/src/scripts/repl_server.py index bcfd16933..0759f343d 100644 --- a/src/scripts/repl_server.py +++ b/src/scripts/repl_server.py @@ -26,16 +26,10 @@ def __init__(self, socket): def recv_msg(self): self._read_buf.clear() - # requires at least 3 bytes as metadata - while len(self._read_buf) < 3: - self._read_buf.extend(self.socket.recv(1024)) - + self._read_buf.extend(self.socket.recv(3)) inst = int.from_bytes(self._read_buf[:1], 'big') data_len = int.from_bytes(self._read_buf[1:3], 'big') - - # until all data has been read - while len(self._read_buf) < 3 + data_len: - self._read_buf.extend(self.socket.recv(1024)) + self._read_buf.extend(self.socket.recv(data_len)) return (inst, self._read_buf[3:].decode()) From 5536858363e011bca44196f0a468769755c17299 Mon Sep 17 00:00:00 2001 From: Shunsuke Shibayama Date: Thu, 27 Apr 2023 00:25:53 +0900 Subject: [PATCH 12/12] test: add a mock test for the REPL server --- crates/erg_common/python_util.rs | 7 +++--- src/scripts/repl_server_test.py | 40 ++++++++++++++++++++++++++++++++ tests/repl.rs | 9 +++++++ 3 files changed, 52 insertions(+), 4 deletions(-) create mode 100644 src/scripts/repl_server_test.py diff --git a/crates/erg_common/python_util.rs b/crates/erg_common/python_util.rs index c218d3969..074173eca 100644 --- a/crates/erg_common/python_util.rs +++ b/crates/erg_common/python_util.rs @@ -738,15 +738,14 @@ pub fn _eval_pyc>(file: S, py_command: Option<&str>) -> String { String::from_utf8_lossy(&out.stdout).to_string() } -pub fn _exec_py(code: &str) -> Option { +pub fn exec_py(file: &str) -> Option { let mut child = if cfg!(windows) { Command::new(which_python()) - .arg("-c") - .arg(code) + .arg(file) .spawn() .expect("cannot execute python") } else { - let exec_command = format!("{} -c \"{}\"", which_python(), code); + let exec_command = format!("{} {file}", which_python()); Command::new("sh") .arg("-c") .arg(exec_command) diff --git a/src/scripts/repl_server_test.py b/src/scripts/repl_server_test.py new file mode 100644 index 000000000..da4bf467d --- /dev/null +++ b/src/scripts/repl_server_test.py @@ -0,0 +1,40 @@ +import itertools +import random +import string + +with open("./src/scripts/repl_server.py") as f: + code = f.readlines() + +code.insert(0, "__PORT__ = 9000\n") +code = itertools.takewhile(lambda l: not l.startswith("# DummyVM"), code) + +exec("".join(code)) + + +class MockSocket: + def __init__(self): + self.data = bytearray() + self.cursor = 0 + + def send(self, data): + self.data.extend(data) + + def recv(self, bufsize): + if self.cursor > len(self.data): + raise Exception(f"MockSocket: recv({bufsize}) out of range") + data = bytes(self.data[self.cursor : self.cursor + bufsize]) + self.cursor += bufsize + return data + +corr_data = "".join(random.choices(string.ascii_uppercase + string.digits, k=2048)) +s = MessageStream(MockSocket()) + +s.send_msg(INST.PRINT, corr_data) +inst, recv_data = s.recv_msg() +assert inst == INST.PRINT +assert recv_data == corr_data + +s.send_msg(INST.EXIT, "") +inst, recv_data = s.recv_msg() +assert inst == INST.EXIT +assert recv_data == "" diff --git a/tests/repl.rs b/tests/repl.rs index 4df5c898f..7378b16a9 100644 --- a/tests/repl.rs +++ b/tests/repl.rs @@ -1,6 +1,8 @@ mod common; + use common::expect_repl_failure; use common::expect_repl_success; +use erg_common::python_util::exec_py; #[test] #[ignore] @@ -193,3 +195,10 @@ fn exec_repl_invalid_def_after_the_at_sign() -> Result<(), ()> { 1, ) } + +#[test] +#[ignore] +fn exec_repl_server_mock_test() -> Result<(), ()> { + assert_eq!(exec_py("src/scripts/repl_server_test.py"), Some(0)); + Ok(()) +}