diff --git a/tests/test_base.py b/tests/test_base.py index a8eb3b4d..55f24ec8 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -873,6 +873,23 @@ def test_loop_call_later_handle_when_after_fired(self): self.loop.run_until_complete(fut) self.assertEqual(handle.when(), when) + def test_get_ready_queue(self): + l1 = len(self.loop.get_ready_queue()) + self.loop.call_soon(lambda: None) + l2 = len(self.loop.get_ready_queue()) + self.assertEqual(l1, l2 - 1) + + def test_ready_handle(self): + def cb(a, b): + return None + args = 1, 2 + self.loop.call_soon(cb, *args) + handle = list(self.loop.get_ready_queue())[-1] + self.assertIsInstance(handle, uvloop.loop.Handle) + cb2, args2 = handle.get_callback() + self.assertIs(cb, cb2) + self.assertEqual(args, args2) + class TestBaseAIO(_TestBase, AIOTestCase): pass diff --git a/uvloop/cbhandles.pyx b/uvloop/cbhandles.pyx index bc67b73d..acaf082f 100644 --- a/uvloop/cbhandles.pyx +++ b/uvloop/cbhandles.pyx @@ -161,6 +161,16 @@ cdef class Handle: def cancelled(self): return self._cancelled + def get_callback(self): + """ + Allow access to a python callback and its args. + Return a (callback, args) tuple or None if it is an + internal callback. + """ + if self.cb_type == 1: + return self.arg1, self.arg2 + return None + @cython.no_gc_clear @cython.freelist(DEFAULT_FREELIST_SIZE) diff --git a/uvloop/loop.pyi b/uvloop/loop.pyi index 05d7714b..9d083606 100644 --- a/uvloop/loop.pyi +++ b/uvloop/loop.pyi @@ -7,6 +7,7 @@ from typing import ( Any, Awaitable, Callable, + Deque, Dict, Generator, List, @@ -292,3 +293,4 @@ class Loop: *, fallback: bool = ... ) -> int: ... + def get_ready_queue(self) -> Deque[asyncio.Handle]: ... diff --git a/uvloop/loop.pyx b/uvloop/loop.pyx index f6cb4fca..b3929c73 100644 --- a/uvloop/loop.pyx +++ b/uvloop/loop.pyx @@ -3220,6 +3220,9 @@ cdef class Loop: except Exception as ex: self.call_soon_threadsafe(future.set_exception, ex) + # Allow external access to the ready queue for advanced scheduling and introspection + def get_ready_queue(self): + return self._ready # Expose pointer for integration with other C-extensions def libuv_get_loop_t_ptr(loop):