Skip to content

Commit

Permalink
Added capability for "Direct Piping" - that is, transferring data bet…
Browse files Browse the repository at this point in the history
…ween

2 processes directly using OS pipes and connecting them with dup2, instead
of buffering the output of process 1 into python memory before sending to
process 2.

This is in response to issue amoffat#119 amoffat#119

The effect can be enabled by setting call arg _piping="direct" on the inner process.
I had issues with getting 'Input/Output error reading stdin' from dd, until
I set _tty_out=False. In the code I have therefore set _tty_out to False when
_piping == "direct". This has caused 3 of the unit tests to fail.

test2.py on my PC shows 13s vs 0.5s performance difference
  • Loading branch information
mcclymont committed Oct 9, 2013
1 parent 7279f1d commit ee68864
Showing 1 changed file with 24 additions and 12 deletions.
36 changes: 24 additions & 12 deletions sh.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ def _handle_exit_code(self, code):

if code not in self.call_args["ok_code"] and \
(code > 0 or -code in SIGNALS_THAT_SHOULD_THROW_EXCEPTION):
#print self.process.stderr
raise get_rc_exc(code)(
self.ran,
self.process.stdout,
Expand Down Expand Up @@ -481,6 +482,7 @@ class Command(object):

"env": None,
"piped": None,
"direct": None,
"iter": None,
"iter_noblock": None,
"ok_code": 0,
Expand Down Expand Up @@ -734,7 +736,10 @@ def __call__(self, *args, **kwargs):
# in the background, then this command should run in the
# background as well
if first_arg.call_args["bg"]: call_args["bg"] = True
stdin = first_arg.process._pipe_queue
if first_arg.call_args["piped"] == "direct":
stdin = first_arg.process
else:
stdin = first_arg.process._pipe_queue

else:
args.insert(0, first_arg)
Expand Down Expand Up @@ -788,6 +793,7 @@ def __init__(self, cmd, stdin, stdout, stderr, call_args,
persist=True, pipe=STDOUT):

self.call_args = call_args
if self.call_args["piped"] == "direct": self.call_args["tty_out"] = False

self._single_tty = self.call_args["tty_in"] and self.call_args["tty_out"]

Expand All @@ -806,7 +812,10 @@ def __init__(self, cmd, stdin, stdout, stderr, call_args,

# do not consolidate stdin and stdout
else:
if self.call_args["tty_in"]:
if isinstance(stdin, OProc):
self._slave_stdin_fd = stdin._stdout_fd
self._stdin_fd = None
elif self.call_args["tty_in"]:
self._slave_stdin_fd, self._stdin_fd = pty.openpty()
else:
self._slave_stdin_fd, self._stdin_fd = os.pipe()
Expand Down Expand Up @@ -857,7 +866,9 @@ def __init__(self, cmd, stdin, stdout, stderr, call_args,
tty.setraw(self._stdout_fd)


os.close(self._stdin_fd)
if self._stdin_fd:
os.close(self._stdin_fd)

if not self._single_tty:
os.close(self._stdout_fd)
if stderr is not STDOUT: os.close(self._stderr_fd)
Expand Down Expand Up @@ -942,9 +953,8 @@ def __init__(self, cmd, stdin, stdout, stderr, call_args,

# this represents the connection from a Queue object (or whatever
# we're using to feed STDIN) to the process's STDIN fd
self._stdin_stream = StreamWriter("stdin", self, self._stdin_fd,
self.stdin, self.call_args["in_bufsize"])

self._stdin_stream = None if isinstance(self.stdin, OProc) else \
StreamWriter("stdin", self, self._stdin_fd, self.stdin, self.call_args["in_bufsize"])

stdout_pipe = None
if pipe is STDOUT and not self.call_args["no_pipe"]:
Expand All @@ -956,10 +966,11 @@ def __init__(self, cmd, stdin, stdout, stderr, call_args,
# that we use to aggregate all the output
save_stdout = not self.call_args["no_out"] and \
(self.call_args["tee"] in (True, "out") or stdout is None)
self._stdout_stream = StreamReader("stdout", self, self._stdout_fd, stdout,
self._stdout, self.call_args["out_bufsize"], stdout_pipe,
save_data=save_stdout)


self._stdout_stream = None if self.call_args["piped"] == "direct" else \
StreamReader("stdout", self, self._stdout_fd, stdout,
self._stdout, self.call_args["out_bufsize"], stdout_pipe,
save_data=save_stdout)

if stderr is STDOUT or self._single_tty: self._stderr_stream = None
else:
Expand All @@ -974,7 +985,8 @@ def __init__(self, cmd, stdin, stdout, stderr, call_args,
save_data=save_stderr)

# start the main io threads
self._input_thread = self._start_thread(self.input_thread, self._stdin_stream)
# stdin thread is not needed if we are connecting from another process's stdout pipe
self._input_thread = None if not self._stdin_stream else self._start_thread(self.input_thread, self._stdin_stream)
self._output_thread = self._start_thread(self.output_thread, self._stdout_stream, self._stderr_stream)


Expand Down Expand Up @@ -1154,7 +1166,7 @@ def wait(self):
else:
self.log.debug("exit code already set (%d), no need to wait", self.exit_code)

self._input_thread.join()
if self._input_thread: self._input_thread.join()
self._output_thread.join()

OProc._procs_to_cleanup.discard(self)
Expand Down

0 comments on commit ee68864

Please sign in to comment.