Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow telepath dmon ready status to disconnect clients #1881

Merged
merged 7 commits into from
Sep 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion synapse/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,15 @@ async def __anit__(self, certdir=None):

self.onfini(self._onDmonFini)

# by default we are ready... ( backward compat )
self.dmonready = True

async def setReady(self, ready):
self.dmonready = ready
if not self.dmonready:
for link in list(self.links):
await link.fini()

async def listen(self, url, **opts):
'''
Bind and listen on the given host/port with possible SSL.
Expand Down Expand Up @@ -336,6 +345,10 @@ async def _onDmonFini(self):

async def _onLinkInit(self, link):

if not self.dmonready:
logger.warning(f'onLinkInit is not ready: {repr(link)}')
return await link.fini()

self.links.add(link)
async def fini():
self.links.discard(link)
Expand Down Expand Up @@ -474,7 +487,6 @@ def _getTaskFiniMesg(self, task, valu):
async def _onTaskV2Init(self, link, mesg):

# t2:init is used by the pool sockets on the client

name = mesg[1].get('name')
sidn = mesg[1].get('sess')
todo = mesg[1].get('todo')
Expand Down
24 changes: 23 additions & 1 deletion synapse/tests/test_daemon.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import synapse.exc as s_exc
import synapse.common as s_common
import synapse.daemon as s_daemon

import synapse.telepath as s_telepath

import synapse.tests.utils as s_t_utils

class Foo:
def woot(self):
return 10

class DaemonTest(s_t_utils.SynTest):

async def test_unixsock_longpath(self):
Expand All @@ -27,3 +32,20 @@ async def test_unixsock_longpath(self):
await dmon.listen(listpath)

self.true(await stream.wait(1))

async def test_dmon_ready(self):

async with await s_daemon.Daemon.anit() as dmon:

host, port = await dmon.listen('tcp://127.0.0.1:0')
dmon.share('foo', Foo())

async with await s_telepath.openurl(f'tcp://127.0.0.1:{port}/foo') as foo:
self.eq(10, await foo.woot())
await dmon.setReady(False)
await foo.waitfini(timeout=2)
self.true(foo.isfini)

with self.raises(s_exc.LinkShutDown):
async with await s_telepath.openurl(f'tcp://127.0.0.1:{port}/foo') as foo:
pass