diff --git a/requirements_doc.txt b/requirements_doc.txt index 8516167fa3..fb86a928d1 100644 --- a/requirements_doc.txt +++ b/requirements_doc.txt @@ -1,4 +1,5 @@ -r requirements_dev.txt +nbconvert==5.6.1 sphinx>=1.8.2,<2.0.0 jupyter>=1.0.0,<2.0.0 hide-code>=0.5.2,<0.5.3 diff --git a/synapse/cortex.py b/synapse/cortex.py index 9db4543ebf..c3701d2487 100644 --- a/synapse/cortex.py +++ b/synapse/cortex.py @@ -1568,6 +1568,8 @@ async def _runStormSvcAdd(self, iden): await self.svchive.set(iden, sdef) async def runStormSvcEvent(self, iden, name): + assert name in ('add', 'del') + sdef = self.svchive.get(iden) if sdef is None: mesg = f'No storm service with iden: {iden}' @@ -1576,7 +1578,13 @@ async def runStormSvcEvent(self, iden, name): evnt = sdef.get('evts', {}).get(name, {}).get('storm') if evnt is None: return - await s_common.aspin(self.storm(evnt, opts={'vars': {'cmdconf': {'svciden': iden}}})) + + opts = {'vars': {'cmdconf': {'svciden': iden}}} + coro = s_common.aspin(self.storm(evnt, opts=opts)) + if name == 'add': + await coro + else: + self.schedCoro(coro) async def _setStormSvc(self, sdef): diff --git a/synapse/lib/stormtypes.py b/synapse/lib/stormtypes.py index 20c3122453..cd85247dba 100644 --- a/synapse/lib/stormtypes.py +++ b/synapse/lib/stormtypes.py @@ -1444,13 +1444,13 @@ async def _methQueueGets(self, offs=0, wait=True, cull=False, size=None): if size is not None: size = await toint(size) - todo = s_common.todo('coreQueueGets', self.name, offs, wait=wait, size=size) + todo = s_common.todo('coreQueueGets', self.name, offs, cull=cull, wait=wait, size=size) gatekeys = self._getGateKeys('get') async for item in self.runt.dyniter('cortex', todo, gatekeys=gatekeys): yield item - async def _methQueuePuts(self, items, wait=False): + async def _methQueuePuts(self, items): todo = s_common.todo('coreQueuePuts', self.name, items) gatekeys = self._getGateKeys('put') return await self.runt.dyncall('cortex', todo, gatekeys=gatekeys) diff --git a/synapse/tests/test_lib_stormsvc.py b/synapse/tests/test_lib_stormsvc.py index e4f5e5b31b..9a74a3b150 100644 --- a/synapse/tests/test_lib_stormsvc.py +++ b/synapse/tests/test_lib_stormsvc.py @@ -159,7 +159,7 @@ class RealService(s_stormsvc.StormSvc): 'storm': '$lib.queue.add(vertex)', }, 'del': { - 'storm': '$lib.queue.del(vertex)', + 'storm': '$que=$lib.queue.get(vertex) $que.put(done)', }, } @@ -611,9 +611,10 @@ async def test_storm_svcs(self): # make sure stormcmd got deleted self.none(core.getStormCmd('ohhai')) - # ensure fini ran - queue = core.multiqueue.list() - self.len(0, queue) + # ensure del event ran + q = 'for ($o, $m) in $lib.queue.get(vertex).gets(wait=10) {return (($o, $m))}' + retn = await core.callStorm(q) + self.eq(retn, (0, 'done')) # specifically call teardown for svc in core.getStormSvcs(): @@ -833,11 +834,11 @@ async def test_storm_svc_mirror(self): # Make sure it got removed from both self.none(core00.getStormCmd('ohhai')) - queue = core00.multiqueue.list() - self.len(0, queue) - self.notin('foo.bar', core00.stormmods) + q = 'for ($o, $m) in $lib.queue.get(vertex).gets(wait=10) {return (($o, $m))}' + retn = await core00.callStorm(q) + self.eq(retn, (0, 'done')) self.none(core01.getStormCmd('ohhai')) - queue = core01.multiqueue.list() - self.len(0, queue) - self.notin('foo.bar', core01.stormmods) + q = 'for ($o, $m) in $lib.queue.get(vertex).gets(wait=10) {return (($o, $m))}' + retn = await core01.callStorm(q) + self.eq(retn, (0, 'done'))