From e322db0d1639fe896503ab697c72a18f5554defc Mon Sep 17 00:00:00 2001 From: visi Date: Fri, 18 Sep 2020 09:49:39 -0400 Subject: [PATCH 1/3] added ability to disconnect and delay dmon clients --- synapse/daemon.py | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/synapse/daemon.py b/synapse/daemon.py index f327a0c098..0179df913b 100644 --- a/synapse/daemon.py +++ b/synapse/daemon.py @@ -243,6 +243,18 @@ async def __anit__(self, certdir=None): self.onfini(self._onDmonFini) + # by default we are ready... ( backward compat ) + self.dmonready = asyncio.Event() + self.setReady(True) + + async def setReady(self, ready): + if not ready: + self.dmonready.clear() + for link in list(self.links): + await link.fini() + else: + self.dmonready.set() + async def listen(self, url, **opts): ''' Bind and listen on the given host/port with possible SSL. @@ -334,8 +346,19 @@ async def _onDmonFini(self): if isinstance(share, s_base.Base): await share.fini() + async def _waitForReady(self, timeout=30): + try: + await asyncio.wait_for(self.dmonready.wait(), timeout=timeout) + return True + except TimeoutError: + return False + async def _onLinkInit(self, link): + if not self._waitForReady(): + logger.warning(f'onLinkInit is not ready: {repr(link)}') + return await link.fini() + self.links.add(link) async def fini(): self.links.discard(link) @@ -474,7 +497,6 @@ def _getTaskFiniMesg(self, task, valu): async def _onTaskV2Init(self, link, mesg): # t2:init is used by the pool sockets on the client - name = mesg[1].get('name') sidn = mesg[1].get('sess') todo = mesg[1].get('todo') From 76fe33e6aaa93e5f6136d7c85b9b0b27152ab07a Mon Sep 17 00:00:00 2001 From: visi Date: Fri, 18 Sep 2020 10:09:36 -0400 Subject: [PATCH 2/3] unit test and fixes --- synapse/daemon.py | 4 ++-- synapse/tests/test_daemon.py | 19 ++++++++++++++++++- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/synapse/daemon.py b/synapse/daemon.py index 0179df913b..806d7f220a 100644 --- a/synapse/daemon.py +++ b/synapse/daemon.py @@ -245,7 +245,7 @@ async def __anit__(self, certdir=None): # by default we are ready... ( backward compat ) self.dmonready = asyncio.Event() - self.setReady(True) + await self.setReady(True) async def setReady(self, ready): if not ready: @@ -355,7 +355,7 @@ async def _waitForReady(self, timeout=30): async def _onLinkInit(self, link): - if not self._waitForReady(): + if not await self._waitForReady(): logger.warning(f'onLinkInit is not ready: {repr(link)}') return await link.fini() diff --git a/synapse/tests/test_daemon.py b/synapse/tests/test_daemon.py index 6e6dd876d2..75eb2323b3 100644 --- a/synapse/tests/test_daemon.py +++ b/synapse/tests/test_daemon.py @@ -1,9 +1,13 @@ import synapse.common as s_common import synapse.daemon as s_daemon - +import synapse.telepath as s_telepath import synapse.tests.utils as s_t_utils +class Foo: + def woot(self): + return 10 + class DaemonTest(s_t_utils.SynTest): async def test_unixsock_longpath(self): @@ -27,3 +31,16 @@ async def test_unixsock_longpath(self): await dmon.listen(listpath) self.true(await stream.wait(1)) + + async def test_dmon_ready(self): + + async with await s_daemon.Daemon.anit() as dmon: + + host, port = await dmon.listen('tcp://127.0.0.1:0') + dmon.share('foo', Foo()) + + async with await s_telepath.openurl(f'tcp://127.0.0.1:{port}/foo') as foo: + self.eq(10, await foo.woot()) + await dmon.setReady(False) + await foo.waitfini(timeout=2) + self.true(foo.isfini) From 9f22f6e9f83077d26a3b9b98221e2204582e29d7 Mon Sep 17 00:00:00 2001 From: visi Date: Tue, 22 Sep 2020 09:51:02 -0400 Subject: [PATCH 3/3] removed delay on reconnect close and added test case --- synapse/daemon.py | 18 ++++-------------- synapse/tests/test_daemon.py | 5 +++++ 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/synapse/daemon.py b/synapse/daemon.py index 806d7f220a..27ca3f9660 100644 --- a/synapse/daemon.py +++ b/synapse/daemon.py @@ -244,16 +244,13 @@ async def __anit__(self, certdir=None): self.onfini(self._onDmonFini) # by default we are ready... ( backward compat ) - self.dmonready = asyncio.Event() - await self.setReady(True) + self.dmonready = True async def setReady(self, ready): - if not ready: - self.dmonready.clear() + self.dmonready = ready + if not self.dmonready: for link in list(self.links): await link.fini() - else: - self.dmonready.set() async def listen(self, url, **opts): ''' @@ -346,16 +343,9 @@ async def _onDmonFini(self): if isinstance(share, s_base.Base): await share.fini() - async def _waitForReady(self, timeout=30): - try: - await asyncio.wait_for(self.dmonready.wait(), timeout=timeout) - return True - except TimeoutError: - return False - async def _onLinkInit(self, link): - if not await self._waitForReady(): + if not self.dmonready: logger.warning(f'onLinkInit is not ready: {repr(link)}') return await link.fini() diff --git a/synapse/tests/test_daemon.py b/synapse/tests/test_daemon.py index 75eb2323b3..a7c46d7b08 100644 --- a/synapse/tests/test_daemon.py +++ b/synapse/tests/test_daemon.py @@ -1,3 +1,4 @@ +import synapse.exc as s_exc import synapse.common as s_common import synapse.daemon as s_daemon import synapse.telepath as s_telepath @@ -44,3 +45,7 @@ async def test_dmon_ready(self): await dmon.setReady(False) await foo.waitfini(timeout=2) self.true(foo.isfini) + + with self.raises(s_exc.LinkShutDown): + async with await s_telepath.openurl(f'tcp://127.0.0.1:{port}/foo') as foo: + pass