Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closing of connection from server don't caught for uvloop #1038

Open
dimastbk opened this issue Aug 2, 2024 · 0 comments · May be fixed by #1044
Open

Closing of connection from server don't caught for uvloop #1038

dimastbk opened this issue Aug 2, 2024 · 0 comments · May be fixed by #1044

Comments

@dimastbk
Copy link
Contributor

dimastbk commented Aug 2, 2024

Describe the bug
We are using aiokafka and uvloop. When connection closed from server and aiokafka do attempt to send something to it, RuntimeError is raised (but only OSError is caught).

Traceback
Traceback (most recent call last):

  File \"/opt/pysetup/.venv/lib/python3.11/site-packages/aiokafka/consumer/group_coordinator.py\", line 554, in _coordination_routine
    await self.__coordination_routine()

  File \"/opt/pysetup/.venv/lib/python3.11/site-packages/aiokafka/consumer/group_coordinator.py\", line 612, in __coordination_routine
    wait_timeout = await self._maybe_do_autocommit(assignment)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File \"/opt/pysetup/.venv/lib/python3.11/site-packages/aiokafka/consumer/group_coordinator.py\", line 907, in _maybe_do_autocommit
    await self._do_commit_offsets(

  File \"/opt/pysetup/.venv/lib/python3.11/site-packages/aiokafka/consumer/group_coordinator.py\", line 995, in _do_commit_offsets
    response = await self._send_req(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File \"/opt/pysetup/.venv/lib/python3.11/site-packages/aiokafka/consumer/group_coordinator.py\", line 284, in _send_req
    resp = await self._client.send(
           ^^^^^^^^^^^^^^^^^^^^^^^^

  File \"/opt/pysetup/.venv/lib/python3.11/site-packages/aiokafka/client.py\", line 502, in send
    future = self._conns[(node_id, group)].send(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File \"/opt/pysetup/.venv/lib/python3.11/site-packages/aiokafka/conn.py\", line 439, in send
    self._writer.write(size + message)

  File \"/usr/local/lib/python3.11/asyncio/streams.py\", line 346, in write
    self._transport.write(data)

  File \"uvloop/handles/stream.pyx\", line 674, in uvloop.loop.UVStream.write

  File \"uvloop/handles/handle.pyx\", line 159, in uvloop.loop.UVHandle._ensure_alive

RuntimeError: unable to perform operation on <TCPTransport closed=True reading=False 0x55e23b4f8540>; the handler is closed


During handling of the above exception, another exception occurred:


Traceback (most recent call last):

  File \"/usr/src/consumers/kafka/consumer.py\", line 68, in run
    async for message in self.consumer:

  File \"/opt/pysetup/.venv/lib/python3.11/site-packages/opentelemetry/instrumentation/aiokafka/utils.py\", line 207, in _traced_next
    record = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File \"/opt/pysetup/.venv/lib/python3.11/site-packages/aiokafka/consumer/consumer.py\", line 1263, in __anext__
    return (await self.getone())
            ^^^^^^^^^^^^^^^^^^^

  File \"/opt/pysetup/.venv/lib/python3.11/site-packages/aiokafka/consumer/consumer.py\", line 1146, in getone
    self._coordinator.check_errors()

  File \"/opt/pysetup/.venv/lib/python3.11/site-packages/aiokafka/consumer/group_coordinator.py\", line 299, in check_errors
    self._coordination_task.result()

aiokafka.errors.KafkaError: KafkaError: Unexpected error during coordination RuntimeError('unable to perform operation on <TCPTransport closed=True reading=False 0x55e23b4f8540>; the handler is closed

Expected behaviour
Aiokafka catches this error and recreate connection.

Environment (please complete the following information):

  • aiokafka version (python -c "import aiokafka; print(aiokafka.__version__)"): 0.10.0
  • uvloop: 0.19.0
  • Kafka Broker version (kafka-topics.sh --version): 3.5

Reproducible example

class TestClosedSocket:
    @pytest.fixture(
        params=(
            asyncio.DefaultEventLoopPolicy(),
            uvloop.EventLoopPolicy(),
        ),
    )
    def event_loop(
        self, request: pytest.FixtureRequest
    ) -> Iterable[asyncio.AbstractEventLoop]:
        loop: asyncio.AbstractEventLoop = request.param.new_event_loop()
        yield loop
        loop.close()

    @pytest.fixture()
    def server(self, unused_tcp_port: int) -> Iterable[Tuple[str, int, socket.socket]]:
        host = "localhost"
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.bind((host, unused_tcp_port))
        sock.listen(8)
        sock.setblocking(False)

        yield host, unused_tcp_port, sock

        sock.close()

    @pytest_asyncio.fixture()
    async def conn(
        self, server: Tuple[str, int, socket.socket]
    ) -> AsyncIterable[AIOKafkaConnection]:
        host, port, _ = server

        conn = AIOKafkaConnection(host=host, port=port, request_timeout_ms=1000)
        conn._create_reader_task = mock.Mock()

        yield conn

        fut = conn.close()
        if fut:
            await fut

    @pytest.mark.asyncio
    async def test_send_to_closed_socket(
        self, server: Tuple[str, int, socket.socket], conn: AIOKafkaConnection
    ) -> None:
        _, _, sock = server

        request = MetadataRequest([])

        with pytest.raises(KafkaConnectionError):
            await conn.send(request)

        await conn.connect()

        sock.close()
        await asyncio.sleep(0.1)

        with pytest.raises(KafkaConnectionError):
            await conn.send(request)
@dimastbk dimastbk linked a pull request Aug 19, 2024 that will close this issue
4 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant