diff --git a/.github/workflows/auto-format.yml b/.github/workflows/auto-format.yml
index a998557..a93041d 100644
--- a/.github/workflows/auto-format.yml
+++ b/.github/workflows/auto-format.yml
@@ -34,7 +34,7 @@ jobs:
         run: poetry run ruff format
 
       - name: Check files using the ruff linter
-        run: poetry run ruff --fix --unsafe-fixes --preview --exit-zero .
+        run: poetry run ruff check --fix --unsafe-fixes --preview --exit-zero .
 
       - name: Commit changes
         uses: EndBug/add-and-commit@v9
diff --git a/epicbox/sandboxes.py b/epicbox/sandboxes.py
index 2289672..0cbff60 100644
--- a/epicbox/sandboxes.py
+++ b/epicbox/sandboxes.py
@@ -98,7 +98,7 @@ def create(
             raise ValueError(msg)
 
     if not isinstance(workdir, WorkingDirectory | None):
-        msg = (
+        msg = (  # type: ignore[unreachable]
             "Invalid 'workdir', "
             "it should be created using 'working_directory' context manager"
         )
diff --git a/epicbox/utils.py b/epicbox/utils.py
index 49de7bc..f34ba62 100644
--- a/epicbox/utils.py
+++ b/epicbox/utils.py
@@ -7,6 +7,7 @@
 import socket
 import struct
 import time
+from contextlib import contextmanager
 from typing import Any, TYPE_CHECKING
 
 import dateutil.parser
@@ -22,6 +23,8 @@
 from epicbox import config, exceptions
 
 if TYPE_CHECKING:
+    from collections.abc import Iterator
+
     from docker.models.containers import Container
 
 logger = structlog.get_logger()
@@ -157,6 +160,55 @@ def _socket_write(sock: socket.SocketIO, data: bytes) -> int:
         raise
 
 
+def process_sock(
+    sock: socket.SocketIO, stdin: bytes | None, log: structlog.BoundLogger
+) -> tuple[bytes, int]:
+    """Process the socket IO.
+
+    Read data from the socket if it is ready for reading.
+    Write data to the socket if it is ready for writing.
+    :returns: A tuple containing the received data and the number of bytes written.
+    """
+    ready_to_read, ready_to_write, _ = select.select([sock], [sock], [], 1)
+    received_data: bytes = b""
+    bytes_written = 0
+    if ready_to_read:
+        data = _socket_read(sock)
+        if data is None:
+            msg = "Received EOF from the container"
+            raise EOFError(msg)
+        received_data = data
+
+    if ready_to_write and stdin:
+        bytes_written = _socket_write(sock, stdin)
+        if bytes_written >= len(stdin):
+            log.debug(
+                "All input data has been sent. "
+                "Shut down the write half of the socket.",
+            )
+            sock._sock.shutdown(socket.SHUT_WR)  # type: ignore[attr-defined]
+
+    if not ready_to_read and (not ready_to_write or not stdin):
+        # Save CPU time by sleeping when there is no IO activity.
+        time.sleep(0.05)
+
+    return received_data, bytes_written
+
+
+@contextmanager
+def attach_socket(
+    docker_client: DockerClient,
+    container: Container,
+    params: dict[str, Any],
+) -> Iterator[socket.SocketIO]:
+    """Attach to the container and close the socket on exit, even on errors."""
+    sock = docker_client.api.attach_socket(container.id, params=params)
+    try:
+        yield sock
+    finally:
+        sock.close()
+
+
 def docker_communicate(
     container: Container,
     stdin: bytes | None = None,
@@ -196,44 +248,33 @@ def docker_communicate(
         "stream": 1,
         "logs": 0,
     }
-    sock = docker_client.api.attach_socket(container.id, params=params)
-    sock._sock.setblocking(False)  # Make socket non-blocking
-    log.info(
-        "Attached to the container",
-        params=params,
-        fd=sock.fileno(),
-        timeout=timeout,
-    )
-    if not stdin:
-        log.debug("There is no input data. Shut down the write half of the socket.")
-        sock._sock.shutdown(socket.SHUT_WR)
-    if start_container:
-        container.start()
-        log.info("Container started")
-
-    stream_data = b""
-    start_time = time.monotonic()
-    while timeout is None or time.monotonic() - start_time < timeout:
-        read_ready, write_ready, _ = select.select([sock], [sock], [], 1)
-        is_io_active = bool(read_ready or (write_ready and stdin))
-
-        if read_ready:
+
+    with attach_socket(docker_client, container, params) as sock:
+        sock._sock.setblocking(False)  # type: ignore[attr-defined]  # Make socket non-blocking
+        log.info(
+            "Attached to the container",
+            params=params,
+            fd=sock.fileno(),
+            timeout=timeout,
+        )
+        if not stdin:
+            log.debug("There is no input data. Shut down the write half of the socket.")
+            sock._sock.shutdown(socket.SHUT_WR)  # type: ignore[attr-defined]
+        if start_container:
+            container.start()
+            log.info("Container started")
+
+        stream_data = b""
+        start_time = time.monotonic()
+        while timeout is None or time.monotonic() - start_time < timeout:
             try:
-                data = _socket_read(sock)
+                received_data, bytes_written = process_sock(sock, stdin, log)
             except ConnectionResetError:
                 log.warning(
                     "Connection reset caught on reading the container "
                     "output stream. Break communication",
                 )
                 break
-            if data is None:
-                log.debug("Container output reached EOF. Closing the socket")
-                break
-            stream_data += data
-
-        if write_ready and stdin:
-            try:
-                written = _socket_write(sock, stdin)
             except BrokenPipeError:
                 # Broken pipe may happen when a container terminates quickly
                 # (e.g. OOM Killer) and docker manages to close the socket
@@ -242,22 +283,18 @@ def docker_communicate(
                     "Broken pipe caught on writing to stdin. Break communication",
                 )
                 break
-            stdin = stdin[written:]
-            if not stdin:
-                log.debug(
-                    "All input data has been sent. Shut down the write "
-                    "half of the socket.",
-                )
-                sock._sock.shutdown(socket.SHUT_WR)
-
-        if not is_io_active:
-            # Save CPU time
-            time.sleep(0.05)
-    else:
-        sock.close()
-        msg = "Container didn't terminate after timeout seconds"
-        raise TimeoutError(msg)
-    sock.close()
+            except EOFError:
+                log.debug("Container output reached EOF. Closing the socket")
+                break
+
+            if received_data:
+                stream_data += received_data
+            if stdin and bytes_written > 0:
+                stdin = stdin[bytes_written:]
+        else:
+            msg = "Container didn't terminate after timeout seconds"
+            raise TimeoutError(msg)
+
     return demultiplex_docker_stream(stream_data)
 
 