From 0295094818320ab36c123e3426a7ca1beea6492e Mon Sep 17 00:00:00 2001 From: cisphyx Date: Mon, 1 Feb 2021 12:13:44 -0500 Subject: [PATCH] remove spawn --- synapse/cmds/cortex.py | 5 - synapse/cortex.py | 127 ------ synapse/lib/layer.py | 6 - synapse/lib/link.py | 20 - synapse/lib/spawn.py | 534 ------------------------ synapse/lib/storm.py | 1 - synapse/lib/view.py | 7 +- synapse/tests/test_cmds_cortex.py | 6 - synapse/tests/test_cortex.py | 7 - synapse/tests/test_lib_link.py | 15 - synapse/tests/test_lib_spawn.py | 670 ------------------------------ 11 files changed, 1 insertion(+), 1397 deletions(-) delete mode 100644 synapse/lib/spawn.py delete mode 100644 synapse/tests/test_lib_spawn.py diff --git a/synapse/cmds/cortex.py b/synapse/cmds/cortex.py index 621db6a8f6..3679adda36 100644 --- a/synapse/cmds/cortex.py +++ b/synapse/cmds/cortex.py @@ -231,7 +231,6 @@ class StormCmd(s_cli.Cmd): --show : Limit storm events (server-side) to the comma-separated list. --file : Run the storm query specified in the given file path. --optsfile : Run the query with the given options from a JSON/YAML file. - --spawn: (EXPERIMENTAL!) Run the query within a spawned sub-process runtime (read-only). Examples: storm inet:ipv4=1.2.3.4 @@ -254,7 +253,6 @@ class StormCmd(s_cli.Cmd): ('--raw', {}), ('--debug', {}), ('--path', {}), - ('--spawn', {'type': 'flag'}), ('--save-nodes', {'type': 'valu'}), ('query', {'type': 'glob'}), ) @@ -429,9 +427,6 @@ async def runCmdOpts(self, opts): if editformat != 'nodeedits': stormopts['editformat'] = editformat - if opts.get('spawn'): - stormopts['spawn'] = True - nodesfd = None if opts.get('save-nodes'): nodesfd = s_common.genfile(opts.get('save-nodes')) diff --git a/synapse/cortex.py b/synapse/cortex.py index 190bd6b9fc..18ab62c84e 100644 --- a/synapse/cortex.py +++ b/synapse/cortex.py @@ -495,55 +495,9 @@ async def storm(self, text, opts=None): ''' opts = self._reqValidStormOpts(opts) - if opts.get('spawn'): - await self._execSpawnStorm(text, opts) - return - async for mesg in self.cell.storm(text, opts=opts): yield mesg - async def _execSpawnStorm(self, text, opts): - - view = self.cell._viewFromOpts(opts) - - link = s_scope.get('link') - - opts.pop('spawn', None) - info = { - 'link': link.getSpawnInfo(), - 'view': view.iden, - 'user': self.user.iden, - 'storm': { - 'opts': opts, - 'query': text, - } - } - - await self.cell.boss.promote('storm:spawn', user=self.user, info={'query': text}) - proc = None - mesg = 'Spawn complete' - - try: - - async with self.cell.spawnpool.get() as proc: - if await proc.xact(info): - await link.fini() - - except (asyncio.CancelledError, Exception) as e: - - if not isinstance(e, asyncio.CancelledError): - logger.exception('Error during spawned Storm execution.') - - if not self.cell.isfini: - if proc: - await proc.fini() - - mesg = repr(e) - raise - - finally: - raise s_exc.DmonSpawn(mesg=mesg) - async def reqValidStorm(self, text, opts=None): ''' Parse a Storm query to validate it. @@ -994,11 +948,6 @@ class Cortex(s_cell.Cell): # type: ignore 'description': 'A list of module classes to load.', 'type': 'array' }, - 'spawn:poolsize': { - 'default': 8, - 'description': 'The max number of spare processes to keep around in the storm spawn pool.', - 'type': 'integer' - }, 'storm:log': { 'default': False, 'description': 'Log storm queries via system logger.', @@ -1022,7 +971,6 @@ class Cortex(s_cell.Cell): # type: ignore viewctor = s_view.View.anit layrctor = s_layer.Layer.anit - spawncorector = 'synapse.lib.spawn.SpawnCore' # phase 2 - service storage async def initServiceStorage(self): @@ -1043,8 +991,6 @@ async def initServiceStorage(self): self.feedfuncs = {} self.stormcmds = {} - self.spawnpool = None - self.maxnodes = self.conf.get('max:nodes') self.nodecount = 0 @@ -1149,11 +1095,6 @@ async def initServiceStorage(self): # Finalize coremodule loading & give svchive a shot to load await self._initPureStormCmds() - import synapse.lib.spawn as s_spawn # get around circular dependency - self.spawnpool = await s_spawn.SpawnPool.anit(self) - self.onfini(self.spawnpool) - self.on('user:mod', self._onEvtBumpSpawnPool) - self.dynitems.update({ 'cron': self.agenda, 'cortex': self, @@ -1195,13 +1136,6 @@ async def initServicePassive(self): await self.agenda.stop() await self.stormdmons.stop() - async def _onEvtBumpSpawnPool(self, evnt): - await self.bumpSpawnPool() - - async def bumpSpawnPool(self): - if self.spawnpool is not None: - await self.spawnpool.bump() - @s_nexus.Pusher.onPushAuto('model:depr:lock') async def setDeprLock(self, name, locked): @@ -1439,38 +1373,6 @@ async def listTagModel(self): ''' return list(self.taghive.items()) - async def getSpawnInfo(self): - - if self.spawncorector is None: - mesg = 'spawn storm option not supported on this cortex' - raise s_exc.FeatureNotSupported(mesg=mesg) - - ret = { - 'iden': self.iden, - 'dirn': self.dirn, - 'conf': { - 'storm:log': self.conf.get('storm:log', False), - 'storm:log:level': self.conf.get('storm:log:level', logging.INFO), - 'trigger:enable': self.conf.get('trigger:enable', True), - }, - 'loglevel': logger.getEffectiveLevel(), - 'views': [v.getSpawnInfo() for v in self.views.values()], - 'layers': [await lyr.getSpawnInfo() for lyr in self.layers.values()], - 'storm': { - 'cmds': { - 'cdefs': list(self.storm_cmd_cdefs.items()), - 'ctors': list(self.storm_cmd_ctors.items()), - }, - 'libs': tuple(self.libroot), - 'mods': await self.getStormMods(), - 'pkgs': await self.getStormPkgs(), - 'svcs': [svc.sdef for svc in self.getStormSvcs()], - }, - 'model': await self.getModelDefs(), - 'spawncorector': self.spawncorector, - } - return ret - async def _finiStor(self): await asyncio.gather(*[view.fini() for view in self.views.values()]) await asyncio.gather(*[layr.fini() for layr in self.layers.values()]) @@ -1662,13 +1564,10 @@ def getStorNode(form): self.stormcmds[name] = ctor self.storm_cmd_cdefs[name] = cdef - await self.bumpSpawnPool() - await self.fire('core:cmd:change', cmd=name, act='add') async def _popStormCmd(self, name): self.stormcmds.pop(name, None) - await self.bumpSpawnPool() await self.fire('core:cmd:change', cmd=name, act='del') @@ -1696,7 +1595,6 @@ async def _delStormCmd(self, name): await self.cmdhive.pop(name) self.stormcmds.pop(name, None) - await self.bumpSpawnPool() await self.fire('core:cmd:change', cmd=name, act='del') @@ -1839,8 +1737,6 @@ async def _onload(): logger.warning(f'onload failed for package: {name}') self.schedCoro(_onload()) - await self.bumpSpawnPool() - async def _dropStormPkg(self, pkgdef): ''' Reverse the process of loadStormPkg() @@ -1853,8 +1749,6 @@ async def _dropStormPkg(self, pkgdef): name = cdef.get('name') await self._popStormCmd(name) - await self.bumpSpawnPool() - def getStormSvc(self, name): ssvc = self.svcsbyiden.get(name) @@ -1897,7 +1791,6 @@ async def _addStormSvc(self, sdef): ssvc = await self._setStormSvc(sdef) await self.svchive.set(iden, sdef) - await self.bumpSpawnPool() return ssvc.sdef @@ -1939,8 +1832,6 @@ async def _delStormSvc(self, iden): self.svcsbysvcname.pop(ssvc.svcname, None) await ssvc.fini() - await self.bumpSpawnPool() - async def _delStormSvcPkgs(self, iden): ''' Delete storm packages associated with a service. @@ -2119,8 +2010,6 @@ async def _loadExtModel(self): except Exception as e: logger.warning(f'ext tag prop ({prop}) error: {e}') - await self.bumpSpawnPool() - @contextlib.asynccontextmanager async def watcher(self, wdef): @@ -2188,7 +2077,6 @@ async def _addForm(self, formname, basetype, typeopts, typeinfo): await self.extforms.set(formname, (formname, basetype, typeopts, typeinfo)) await self.fire('core:extmodel:change', form=formname, act='add', type='form') - await self.bumpSpawnPool() @s_nexus.Pusher.onPushAuto('model:form:del') async def delForm(self, formname): @@ -2207,7 +2095,6 @@ async def delForm(self, formname): await self.extforms.pop(formname, None) await self.fire('core:extmodel:change', form=formname, act='del', type='form') - await self.bumpSpawnPool() @s_nexus.Pusher.onPushAuto('model:prop:add') async def addFormProp(self, form, prop, tdef, info): @@ -2223,7 +2110,6 @@ async def addFormProp(self, form, prop, tdef, info): await self.extprops.set(f'{form}:{prop}', (form, prop, tdef, info)) await self.fire('core:extmodel:change', form=form, prop=prop, act='add', type='formprop') - await self.bumpSpawnPool() async def delFormProp(self, form, prop): full = f'{form}:{prop}' @@ -2255,7 +2141,6 @@ async def _delFormProp(self, form, prop): await self.extprops.pop(full, None) await self.fire('core:extmodel:change', form=form, prop=prop, act='del', type='formprop') - await self.bumpSpawnPool() async def delUnivProp(self, prop): udef = self.extunivs.get(prop) @@ -2283,7 +2168,6 @@ async def _delUnivProp(self, prop): self.model.delUnivProp(prop) await self.extunivs.pop(prop, None) await self.fire('core:extmodel:change', name=prop, act='del', type='univ') - await self.bumpSpawnPool() async def addTagProp(self, name, tdef, info): if self.exttagprops.get(name) is not None: @@ -2300,7 +2184,6 @@ async def _addTagProp(self, name, tdef, info): await self.exttagprops.set(name, (name, tdef, info)) await self.fire('core:tagprop:change', name=name, act='add') - await self.bumpSpawnPool() async def delTagProp(self, name): pdef = self.exttagprops.get(name) @@ -2325,7 +2208,6 @@ async def _delTagProp(self, name): await self.exttagprops.pop(name, None) await self.fire('core:tagprop:change', name=name, act='del') - await self.bumpSpawnPool() async def addNodeTag(self, user, iden, tag, valu=(None, None)): ''' @@ -3055,8 +2937,6 @@ async def _addView(self, vdef): view = await self._loadView(node) view.init2() - await self.bumpSpawnPool() - return await view.pack() async def delView(self, iden): @@ -3090,8 +2970,6 @@ async def _delView(self, iden): await self.auth.delAuthGate(iden) - await self.bumpSpawnPool() - async def delLayer(self, iden): layr = self.layers.get(iden, None) if layr is None: @@ -3126,8 +3004,6 @@ async def _delLayer(self, iden, nexsitem): layr.deloffs = nexsitem[0] - await self.bumpSpawnPool() - async def setViewLayers(self, layers, iden=None): ''' Args: @@ -3139,7 +3015,6 @@ async def setViewLayers(self, layers, iden=None): raise s_exc.NoSuchView(iden=iden) await view.setLayers(layers) - await self.bumpSpawnPool() def getLayer(self, iden=None): ''' @@ -3297,8 +3172,6 @@ def ondel(): for pdef in layrinfo.get('pulls', {}).values(): await self.runLayrPull(layr, pdef) - await self.bumpSpawnPool() - await self.fire('core:layr:add', iden=layr.iden) return layr diff --git a/synapse/lib/layer.py b/synapse/lib/layer.py index b816b1ab09..1daeaabd11 100644 --- a/synapse/lib/layer.py +++ b/synapse/lib/layer.py @@ -1379,12 +1379,6 @@ async def _initLayerStorage(self): if self.layrvers < 3: await self._layrV2toV3() - async def getSpawnInfo(self): - info = await self.pack() - info['dirn'] = self.dirn - info['ctor'] = self.ctorname - return info - async def getLayerSize(self): ''' Get the total storage size for the layer. diff --git a/synapse/lib/link.py b/synapse/lib/link.py index 6c66844b7b..8957886be2 100644 --- a/synapse/lib/link.py +++ b/synapse/lib/link.py @@ -79,13 +79,6 @@ async def linksock(): link0 = await Link.anit(reader, writer, info={'unix': True}) return link0, sock1 -async def fromspawn(spawninfo): - sock = spawninfo.get('sock') - info = spawninfo.get('info', {}) - info['spawn'] = True - reader, writer = await asyncio.open_connection(sock=sock) - return await Link.anit(reader, writer, info=info) - class Link(s_base.Base): ''' A Link() is created to wrap a socket reader/writer. @@ -154,19 +147,6 @@ def getTlsPeerCn(self): if name == 'commonName': return valu - def getSpawnInfo(self): - info = {} - - # selectively add info for pickle... - if self.info.get('unix'): - info['unix'] = True - - return { - 'info': info, - # a bit dirty, but there's no other way... - 'sock': self.reader._transport._sock, - } - def getAddrInfo(self): ''' Get a summary of address information related to the link. diff --git a/synapse/lib/spawn.py b/synapse/lib/spawn.py deleted file mode 100644 index 19848e7e01..0000000000 --- a/synapse/lib/spawn.py +++ /dev/null @@ -1,534 +0,0 @@ -''' -Spawn is mechanism so that a cortex can execute different queries in separate processes -''' - -import os -import asyncio -import logging -import threading -import contextlib -import collections -import multiprocessing -import concurrent.futures - -import synapse.exc as s_exc -import synapse.glob as s_glob -import synapse.common as s_common -import synapse.cortex as s_cortex -import synapse.daemon as s_daemon -import synapse.telepath as s_telepath -import synapse.datamodel as s_datamodel - -import synapse.lib.base as s_base -import synapse.lib.boss as s_boss -import synapse.lib.coro as s_coro -import synapse.lib.hive as s_hive -import synapse.lib.link as s_link -import synapse.lib.view as s_view -import synapse.lib.dyndeps as s_dyndeps -import synapse.lib.hiveauth as s_hiveauth - -logger = logging.getLogger(__name__) - -async def storm(core, item): - ''' - Storm implementation for SpawnCore use. - ''' - useriden = item.get('user') - viewiden = item.get('view') - - storminfo = item.get('storm') - - opts = storminfo.get('opts') - text = storminfo.get('query') - - if opts is None: - opts = {} - - user = core.auth.user(useriden) - if user is None: - raise s_exc.NoSuchUser(iden=useriden) - - view = core.views.get(viewiden) - if view is None: - raise s_exc.NoSuchView(iden=viewiden) - - opts['user'] = useriden - async for mesg in view.storm(text, opts=opts): - yield mesg - -async def _innerloop(core, todo, done): - ''' - Inner loop for the multiprocessing target code. - - Args: - spawninfo (dict): Spawninfo dictionary. - todo (multiprocessing.Queue): RX Queue - done (multiprocessing.Queue): TX Queue - - Returns: - None: Returns None. - ''' - item = await s_coro.executor(todo.get) - if item is None: - return - - link = await s_link.fromspawn(item.get('link')) - - await s_daemon.t2call(link, storm, (core, item,), {}) - - wasfini = link.isfini - - await link.fini() - - await s_coro.executor(done.put, wasfini) - - return True - -async def _workloop(spawninfo, todo, done): - ''' - Workloop executed by the multiprocessing target. - - Args: - spawninfo (dict): Spawninfo dictionary. - todo (multiprocessing.Queue): RX Queue - done (multiprocessing.Queue): TX Queue - - Returns: - None: Returns None. - ''' - s_glob.iAmLoop() - - ctorname = spawninfo.get('spawncorector') - ctor = s_dyndeps.tryDynLocal(ctorname) - - async with await ctor.anit(spawninfo) as core: - - while not core.isfini: - - if not await _innerloop(core, todo, done): - break - -def corework(spawninfo, todo, done): - ''' - Multiprocessing target for hosting a SpawnCore launched by a SpawnProc. - ''' - - # This logging call is okay to run since we're executing in - # our own process space and no logging has been configured. - s_common.setlogging(logger, spawninfo.get('loglevel')) - - asyncio.run(_workloop(spawninfo, todo, done)) - -class SpawnProc(s_base.Base): - ''' - ''' - async def __anit__(self, core): - - await s_base.Base.__anit__(self) - - self.core = core - self.iden = s_common.guid() - self.proc = None - - self.ready = asyncio.Event() - self.mpctx = multiprocessing.get_context('spawn') - - name = f'SpawnProc#{self.iden[:8]}' - self.threadpool = concurrent.futures.ThreadPoolExecutor(max_workers=2, thread_name_prefix=name) - - self.todo = self.mpctx.Queue() - self.done = self.mpctx.Queue() - self.proc = None # type: multiprocessing.Process - self.procstat = None - self.obsolete = False - - spawninfo = await core.getSpawnInfo() - self.finievent = threading.Event() - - @s_common.firethread - def procwaiter(): - ''' - Wait for child process to exit - ''' - self.procstat = self.proc.join() - self.proc.close() - if not self.isfini: - self.schedCoroSafe(self.fini()) - - @s_common.firethread - def finiwaiter(): - ''' - Wait for the SpawnProc to complete on another thread (so we can block) - ''' - self.finievent.wait() - self.todo.put(None) - self.todo.close() - self.done.put(None) - self.done.close() - self.todo.join_thread() - self.done.join_thread() - if self.procstat is None: - try: - self.proc.terminate() - except ValueError: - pass - self.threadpool.shutdown() - - # avoid blocking the ioloop during process construction - def getproc(): - self.proc = self.mpctx.Process(target=corework, args=(spawninfo, self.todo, self.done)) - self.proc.start() - - await self.executor(getproc) - finiwaiter() - procwaiter() - - async def fini(): - self.obsolete = True - self.finievent.set() - - self.onfini(fini) - - def __repr__(self): # pragma: no cover - info = [self.__class__.__module__ + '.' + self.__class__.__name__] - info.append(f'at {hex(id(self))}') - info.append(f'isfini={self.isfini}') - info.append(f'iden={self.iden}') - info.append(f'obsolete={self.obsolete}') - if self.proc and not self.proc._closed: - info.append(f'proc={self.proc.pid}') - else: - info.append('proc=None') - return '<{}>'.format(' '.join(info)) - - async def retire(self): - logger.debug(f'Proc {self} marked obsolete') - self.obsolete = True - - async def xact(self, mesg): - - def doit(): - self.todo.put(mesg) - try: - return self.done.get() - except (TypeError, OSError) as e: - logger.warning('Queue torn out from underneath me. (%s)', e) - assert self.isfini - return True - - return await self.executor(doit) - - def executor(self, func, *args, **kwargs): - def real(): - return func(*args, **kwargs) - - return asyncio.get_running_loop().run_in_executor(self.threadpool, real) - -class SpawnPool(s_base.Base): - - async def __anit__(self, core): - - await s_base.Base.__anit__(self) - - self.core = core - - self.poolsize = await core.getConfOpt('spawn:poolsize') - - self.spawns = {} - self.spawnq = collections.deque() - - async def fini(): - await self.kill() - - self.onfini(fini) - - async def bump(self): - if not self.spawns: - return - [await s.retire() for s in list(self.spawns.values())] - [await s.fini() for s in self.spawnq] - self.spawnq.clear() - - async def kill(self): - if not self.spawns: - return - self.spawnq.clear() - [await s.fini() for s in list(self.spawns.values())] - - @contextlib.asynccontextmanager - async def get(self): - ''' - Get a SpawnProc instance; either from the pool or a new process. - - Returns: - SpawnProc: Yields a SpawnProc. This is placed back into the pool if no exceptions occur. - ''' - - if self.isfini: # pragma: no cover - raise s_exc.IsFini() - - proc = None - - if self.spawnq: - proc = self.spawnq.popleft() - - if proc is None: - proc = await self._new() - - yield proc - - await self._put(proc) - - async def _put(self, proc): - - if not proc.obsolete and len(self.spawnq) < self.poolsize: - self.spawnq.append(proc) - return - - await proc.fini() - - async def _new(self): - - proc = await SpawnProc.anit(self.core) - - logger.debug(f'Made new SpawnProc {proc}') - - self.spawns[proc.iden] = proc - - async def fini(): - self.spawns.pop(proc.iden, None) - - proc.onfini(fini) - - return proc - -class SpawnCore(s_base.Base): - ''' - A SpawnCore instance is the substitute for a Cortex in non-cortex processes - ''' - - async def __anit__(self, spawninfo): - - await s_base.Base.__anit__(self) - - self.pid = os.getpid() - self.views = {} - self.layers = {} - self.nexsroot = None - self.isactive = False - self.spawninfo = spawninfo - - self.conf = spawninfo.get('conf') - self.iden = spawninfo.get('iden') - self.dirn = spawninfo.get('dirn') - - self.trigson = self.conf.get('trigger:enable') - - self.svcsbyiden = {} - self.svcsbyname = {} - self.svcsbysvcname = {} - - self.stormcmds = {} - self.storm_cmd_ctors = {} - self.storm_cmd_cdefs = {} - self.stormmods = spawninfo['storm']['mods'] - self.pkginfo = spawninfo['storm']['pkgs'] - self.svcinfo = spawninfo['storm']['svcs'] - - self.model = s_datamodel.Model() - self.model.addDataModels(spawninfo.get('model')) - - self.stormpkgs = {} # name: pkgdef - await self._initStormCmds() - - for sdef in self.svcinfo: - await self._addStormSvc(sdef) - - for pkgdef in self.pkginfo: - await self._tryLoadStormPkg(pkgdef) - - for name, ctor in spawninfo['storm']['cmds']['ctors']: - self.stormcmds[name] = ctor - - for name, cdef in spawninfo['storm']['cmds']['cdefs']: - self.storm_cmd_cdefs[name] = cdef - - self.libroot = spawninfo.get('storm').get('libs') - - self.boss = await s_boss.Boss.anit() - self.onfini(self.boss.fini) - - self.prox = await s_telepath.openurl(f'cell://{self.dirn}') - self.onfini(self.prox.fini) - - self.hive = await s_hive.openurl(f'cell://{self.dirn}', name='*/hive') - self.onfini(self.hive) - - node = await self.hive.open(('auth',)) - self.auth = await s_hiveauth.Auth.anit(node) - self.onfini(self.auth.fini) - for layrinfo in self.spawninfo.get('layers'): - await self._initLayr(layrinfo) - - for viewinfo in self.spawninfo.get('views'): - - iden = viewinfo.get('iden') - path = ('cortex', 'views', iden) - - node = await self.hive.open(path) - view = await s_view.View.anit(self, node) - - self.onfini(view) - - self.views[iden] = view - - for view in self.views.values(): - view.init2() - - # TODO maybe we should subclass telepath client - self.addStormDmon = self.prox.addStormDmon - self.delStormDmon = self.prox.delStormDmon - self.getStormDmon = self.prox.getStormDmon - self.getStormDmons = self.prox.getStormDmons - - self.bumpStormDmon = self.prox.bumpStormDmon - self.enableStormDmon = self.prox.enableStormDmon - self.disableStormDmon = self.prox.disableStormDmon - - # Cell specific apis - self.setHiveKey = self.prox.setHiveKey - self.getHiveKey = self.prox.getHiveKey - self.popHiveKey = self.prox.popHiveKey - self.getHiveKeys = self.prox.getHiveKeys - self.getUserDef = self.prox.getUserDef - - async def _initLayr(self, layrinfo): - iden = layrinfo.get('iden') - ctorname = layrinfo.get('ctor') - - ctor = s_dyndeps.tryDynLocal(ctorname) - - layrinfo['readonly'] = True - - layr = await self._ctorLayr(ctor, layrinfo) - - self.onfini(layr) - - self.layers[iden] = layr - - async def _ctorLayr(self, ctor, layrinfo): - iden = layrinfo.get('iden') - layrdirn = s_common.genpath(self.dirn, 'layers', iden) - layr = await ctor.anit(layrinfo, layrdirn) - return layr - - async def dyncall(self, iden, todo, gatekeys=()): - return await self.prox.dyncall(iden, todo, gatekeys=gatekeys) - - async def dyniter(self, iden, todo, gatekeys=()): - async for item in self.prox.dyniter(iden, todo, gatekeys=gatekeys): - yield item - - def _logStormQuery(self, text, user): - ''' - Log a storm query. - ''' - if self.conf.get('storm:log'): - lvl = self.conf.get('storm:log:level') - logger.log(lvl, 'Executing spawn storm query {%s} as [%s] from [%s]', text, user.name, self.pid) - - async def addStormPkg(self, pkgdef): - ''' - Do it for the proxy, then myself - ''' - todo = s_common.todo('addStormPkg', pkgdef) - await self.dyncall('cortex', todo) - - await self.loadStormPkg(pkgdef) - - async def delStormPkg(self, name): - ''' - Do it for the proxy, then myself - ''' - todo = s_common.todo('delStormPkg', name) - await self.dyncall('cortex', todo) - - pkgdef = self.stormpkgs.get(name) - if pkgdef is None: - return - - await self._dropStormPkg(pkgdef) - - async def bumpSpawnPool(self): - pass - - async def getStormPkgs(self): - return list(self.stormpkgs.values()) - - async def _addStormSvc(self, sdef): - - iden = sdef.get('iden') - ssvc = self.svcsbyiden.get(iden) - if ssvc is not None: - return ssvc.sdef - - ssvc = await self._setStormSvc(sdef) - - return ssvc.sdef - - async def _delStormSvcPkgs(self, iden): - ''' - For now don't actually run this in the spawn case. This only needs to be - done in the master Cortex, not in spawns. Deleting a storm service package - from a spawn should not be making persistent changes. - ''' - pass - - async def _hndladdStormPkg(self, pdef): - ''' - For now don't actually run this in the spawn case. This only needs to be - done in the master Cortex, not in spawns. Adding a storm service package - from a spawn should not be making persistent changes. - ''' - # Note - this represents the bottom half of addStormPkg which is made - # via the @s_nexus.Pusher.onPushAuto('pkg:add') decorator. - pass - - async def setStormSvcEvents(self, iden, edef): - svc = self.svcsbyiden.get(iden) - if svc is None: - mesg = f'No storm service with iden: {iden}' - raise s_exc.NoSuchStormSvc(mesg=mesg) - - sdef = svc.sdef - - sdef['evts'] = edef - return sdef - - # A little selective inheritance - # TODO: restructure cortex to avoid this hackery - _setStormSvc = s_cortex.Cortex._setStormSvc - _confirmStormPkg = s_cortex.Cortex._confirmStormPkg - _dropStormPkg = s_cortex.Cortex._dropStormPkg - _reqStormCmd = s_cortex.Cortex._reqStormCmd - _setStormCmd = s_cortex.Cortex._setStormCmd - _tryLoadStormPkg = s_cortex.Cortex._tryLoadStormPkg - _trySetStormCmd = s_cortex.Cortex._trySetStormCmd - addStormCmd = s_cortex.Cortex.addStormCmd - getDataModel = s_cortex.Cortex.getDataModel - getStormCmd = s_cortex.Cortex.getStormCmd - getStormCmds = s_cortex.Cortex.getStormCmds - getStormLib = s_cortex.Cortex.getStormLib - getStormMods = s_cortex.Cortex.getStormMods - getStormPkg = s_cortex.Cortex.getStormPkg - getStormQuery = s_cortex.Cortex.getStormQuery - getStormSvc = s_cortex.Cortex.getStormSvc - loadStormPkg = s_cortex.Cortex.loadStormPkg - - _initStormCmds = s_cortex.Cortex._initStormCmds - _initStormOpts = s_cortex.Cortex._initStormOpts - - _viewFromOpts = s_cortex.Cortex._viewFromOpts - _userFromOpts = s_cortex.Cortex._userFromOpts - getView = s_cortex.Cortex.getView diff --git a/synapse/lib/storm.py b/synapse/lib/storm.py index 629e5dbd4e..e401da51ed 100644 --- a/synapse/lib/storm.py +++ b/synapse/lib/storm.py @@ -260,7 +260,6 @@ def reqValidPkgdef(pkgdef): 'stormopts': { 'type': 'object', 'properties': { - 'spawn': {'type': 'boolean'}, 'repr': {'type': 'boolean'}, 'path': {'type': 'string'}, 'show': {'type': 'array', 'items': {'type': 'string'}} diff --git a/synapse/lib/view.py b/synapse/lib/view.py index a07eb787f6..d5c8e5e6c1 100644 --- a/synapse/lib/view.py +++ b/synapse/lib/view.py @@ -110,7 +110,7 @@ async def __anit__(self, core, node): 'tag:prop:set': self._tagPropSetConfirm, } - # isolate some initialization to easily override for SpawnView. + # isolate some initialization to easily override. await self._initViewLayers() async def getStorNodes(self, buid): @@ -720,11 +720,6 @@ async def delete(self): await self.fini() await self.node.pop() - def getSpawnInfo(self): - return { - 'iden': self.iden, - } - async def addNodeEdits(self, edits, meta): ''' A telepath compatible way to apply node edits to a view. diff --git a/synapse/tests/test_cmds_cortex.py b/synapse/tests/test_cmds_cortex.py index ee5acaf68f..874fe63507 100644 --- a/synapse/tests/test_cmds_cortex.py +++ b/synapse/tests/test_cmds_cortex.py @@ -237,12 +237,6 @@ async def runLongStorm(): await realcore.nodes('[ inet:ipv4=1.2.3.4 +#visi.woot ]') await s_lmdbslab.Slab.syncLoopOnce() - # The storm --spawn option works - outp = self.getTestOutp() - cmdr = await s_cmdr.getItemCmdr(core, outp=outp) - await cmdr.runCmdLine('storm --spawn inet:ipv4=1.2.3.4') - outp.expect('#visi.woot') - async def test_log(self): def check_locs_cleanup(cobj): diff --git a/synapse/tests/test_cortex.py b/synapse/tests/test_cortex.py index dbd91b7430..84229d32e4 100644 --- a/synapse/tests/test_cortex.py +++ b/synapse/tests/test_cortex.py @@ -4237,13 +4237,6 @@ async def test_cortex_storm_lib_dmon(self): with self.raises(s_exc.NoSuchIden): await core.nodes('$lib.dmon.del(newp)') - async def test_cortex_spawn_notsupported(self): - - async with self.getTestCore() as core: - core.spawncorector = None - with self.raises(s_exc.FeatureNotSupported): - await core.getSpawnInfo() - async def test_cortex_storm_dmon_view(self): with self.getTestDir() as dirn: diff --git a/synapse/tests/test_lib_link.py b/synapse/tests/test_lib_link.py index 92b005b082..89ad8b531f 100644 --- a/synapse/tests/test_lib_link.py +++ b/synapse/tests/test_lib_link.py @@ -152,18 +152,3 @@ def writer(sock): await link1.fini() sock1.close() - - async def test_link_fromspawn(self): - - link0, sock0 = await s_link.linksock() - - info = link0.getSpawnInfo() - link1 = await s_link.fromspawn(info) - - await link1.send(b'V') - self.eq(sock0.recv(1), b'V') - - sock0.close() - - await link0.fini() - await link1.fini() diff --git a/synapse/tests/test_lib_spawn.py b/synapse/tests/test_lib_spawn.py deleted file mode 100644 index 6635523bea..0000000000 --- a/synapse/tests/test_lib_spawn.py +++ /dev/null @@ -1,670 +0,0 @@ -import os -import signal -import asyncio -import logging -import multiprocessing - -import synapse.exc as s_exc -import synapse.glob as s_glob -import synapse.cortex as s_cortex - -import synapse.lib.coro as s_coro -import synapse.lib.link as s_link -import synapse.lib.spawn as s_spawn -import synapse.lib.msgpack as s_msgpack -import synapse.lib.lmdbslab as s_lmdbslab - -import synapse.tests.utils as s_test -import synapse.tests.test_lib_stormsvc as s_test_svc - -logger = logging.getLogger(__name__) - -def make_core(dirn, conf, queries, queue, event): - ''' - Multiprocessing target for making a Cortex for local use of a SpawnCore instance. - ''' - - async def workloop(): - s_glob.iAmLoop() - async with await s_cortex.Cortex.anit(dirn=dirn, conf=conf) as core: - await core.addTagProp('added', ('time', {}), {}) - for q in queries: - await core.nodes(q) - - await core.view.layers[0].layrslab.sync() - - spawninfo = await core.getSpawnInfo() - queue.put(spawninfo) - # Don't block the ioloop.. - await s_coro.executor(event.wait) - - asyncio.run(workloop()) - -class CoreSpawnTest(s_test.SynTest): - - async def test_spawncore(self): - # This test makes a real Cortex in a remote process, and then - # gets the spawninfo from that real Cortex in order to make a - # local SpawnCore. This avoids the problem of being unable to - # open lmdb environments multiple times by the same process - # and allows direct testing of the SpawnCore object. - - mpctx = multiprocessing.get_context('spawn') - queue = mpctx.Queue() - event = mpctx.Event() - - conf = { - 'storm:log': True, - 'storm:log:level': logging.INFO, - 'modules': [('synapse.tests.utils.TestModule', {})], - } - queries = [ - '[test:str="Cortex from the aether!"]', - ] - with self.getTestDir() as dirn: - args = (dirn, conf, queries, queue, event) - proc = mpctx.Process(target=make_core, args=args) - try: - proc.start() - spawninfo = queue.get(timeout=30) - - async with await s_spawn.SpawnCore.anit(spawninfo) as core: - root = await core.auth.getUserByName('root') - q = '''test:str - $lib.print($lib.str.format("{n}", n=$node.repr())) - | limit 1''' - item = { - 'user': root.iden, - 'view': list(core.views.keys())[0], - 'storm': { - 'query': q, - 'opts': None, - } - } - - # Test the storm implementation used by spawncore - msgs = await s_test.alist(s_spawn.storm(core, item)) - podes = [m[1] for m in msgs if m[0] == 'node'] - e = 'Cortex from the aether!' - self.len(1, podes) - self.eq(podes[0][0], ('test:str', e)) - self.stormIsInPrint(e, msgs) - - # Direct test of the _innerloop code. - todo = mpctx.Queue() - done = mpctx.Queue() - - # Test poison - this would cause the corework to exit - todo.put(None) - self.none(await s_spawn._innerloop(core, todo, done)) - - # Test a real item with a link associated with it. This ends - # up getting a bunch of telepath message directly. - todo_item = item.copy() - link0, sock0 = await s_link.linksock() - todo_item['link'] = link0.getSpawnInfo() - todo.put(todo_item) - self.true(await s_spawn._innerloop(core, todo, done)) - resp = done.get(timeout=12) - self.false(resp) - buf0 = sock0.recv(1024 * 16) - unpk = s_msgpack.Unpk() - msgs = [msg for (offset, msg) in unpk.feed(buf0)] - self.eq({'t2:genr', 't2:yield'}, - {m[0] for m in msgs}) - - await link0.fini() # We're done with the link now - todo.close() - done.close() - - # Test the workloop directly - this again just gets telepath - # messages back. This does use poison to kill the workloop. - todo = mpctx.Queue() - done = mpctx.Queue() - - task = asyncio.create_task(s_spawn._workloop(spawninfo, todo, done)) - await asyncio.sleep(0.01) - link1, sock1 = await s_link.linksock() - todo_item = item.copy() - todo_item['link'] = link1.getSpawnInfo() - todo.put(todo_item) - # Don't block the IO loop! - resp = await s_coro.executor(done.get, timeout=12) - self.false(resp) - buf0 = sock1.recv(1024 * 16) - unpk = s_msgpack.Unpk() - msgs = [msg for (offset, msg) in unpk.feed(buf0)] - self.eq({'t2:genr', 't2:yield'}, - {m[0] for m in msgs}) - await link1.fini() # We're done with the link now - # Poison the queue - this should close the task - todo.put(None) - self.none(await asyncio.wait_for(task, timeout=12)) - - todo.close() - done.close() - - finally: - - queue.close() - event.set() - proc.join(12) - - async def test_cortex_spawn_telepath(self): - conf = { - 'storm:log': True, - 'storm:log:level': logging.INFO, - } - - async with self.getTestCore(conf=conf) as core: - pkgdef = { - 'name': 'spawn', - 'version': (0, 0, 1), - 'synapse_minversion': (2, 8, 0), - 'commands': ( - { - 'name': 'passthrough', - 'desc': 'passthrough input nodes and print their ndef', - 'storm': '$lib.print($node.ndef())', - }, - ), - } - - await self.runCoreNodes(core, '[ inet:dns:a=(vertex.link, 1.2.3.4) ] -> inet:ipv4 [ :asn=0 ]') - - async with core.getLocalProxy() as prox: - - opts = {'spawn': True} - - # check that regular node lifting / pivoting works - msgs = await prox.storm('inet:fqdn=vertex.link -> inet:dns:a -> inet:ipv4', opts=opts).list() - podes = [m[1] for m in msgs if m[0] == 'node'] - self.len(1, podes) - self.eq(podes[0][0], ('inet:ipv4', 0x01020304)) - - # test that runt node lifting works - msgs = await prox.storm('syn:prop=inet:dns:a:fqdn :form -> syn:form', opts=opts).list() - podes = [m[1] for m in msgs if m[0] == 'node'] - self.len(1, podes) - self.eq(podes[0][0], ('syn:form', 'inet:dns:a')) - - # make sure node creation fails cleanly - msgs = await prox.storm('[ inet:email=visi@vertex.link ]', opts=opts).list() - errs = [m[1] for m in msgs if m[0] == 'err'] - self.eq(errs[0][0], 'IsReadOnly') - - # make sure storm commands are loaded - msgs = await prox.storm('inet:ipv4=1.2.3.4 | limit 1', opts=opts).list() - podes = [m[1] for m in msgs if m[0] == 'node'] - self.len(1, podes) - self.eq(podes[0][0], ('inet:ipv4', 0x01020304)) - - # make sure graph rules work - msgs = await prox.storm('inet:dns:a', opts={'spawn': True, 'graph': True}).list() - podes = [m[1] for m in msgs if m[0] == 'node'] - ndefs = list(sorted(p[0] for p in podes)) - - self.eq(ndefs, ( - ('inet:asn', 0), - ('inet:dns:a', ('vertex.link', 16909060)), - ('inet:fqdn', 'link'), - ('inet:fqdn', 'vertex.link'), - ('inet:ipv4', 16909060), - )) - - # Test a python cmd that came in via a ctor - msgs = await prox.storm('inet:ipv4=1.2.3.4 | testcmd', opts=opts).list() - self.stormIsInPrint("testcmd: ('inet:ipv4', 16909060)", msgs) - podes = [m[1] for m in msgs if m[0] == 'node'] - self.len(1, podes) - - # Test a macro execution - msgs = await prox.storm('macro.set macrotest ${ inet:ipv4 }', opts=opts).list() - self.stormIsInPrint('Set macro: macrotest', msgs) - msgs = await prox.storm('macro.list', opts=opts).list() - self.stormIsInPrint('macrotest', msgs) - self.stormIsInPrint('owner: root', msgs) - msgs = await prox.storm('macro.exec macrotest', opts=opts).list() - podes = [m[1] for m in msgs if m[0] == 'node'] - self.len(1, podes) - self.eq(podes[0][0], ('inet:ipv4', 0x01020304)) - msgs = await prox.storm('macro.del macrotest', opts=opts).list() - self.stormIsInPrint('Removed macro: macrotest', msgs) - msgs = await prox.storm('macro.exec macrotest', opts=opts).list() - self.stormIsInErr('Macro name not found: macrotest', msgs) - - # Test a simple stormlib command - msgs = await prox.storm('$lib.print("hello")', opts=opts).list() - self.stormIsInPrint("hello", msgs) - - # test a complex stormlib command using lib deferences - marsopts = {'spawn': True, 'vars': {'world': 'mars'}} - q = '$lib.print($lib.str.format("hello {world}", world=$world))' - msgs = await prox.storm(q, opts=marsopts).list() - self.stormIsInPrint("hello mars", msgs) - - # Model deference off of the snap via stormtypes - q = '''$valu=$lib.time.format('200103040516', '%Y %m %d') - $lib.print($valu) - ''' - msgs = await prox.storm(q, opts=opts).list() - self.stormIsInPrint('2001 03 04', msgs) - - # Test sleeps / fires from a spawnproc - q = '''$tick=$lib.time.now() - $lib.time.sleep(0.1) - $tock=$lib.time.now() - $lib.fire(took, tick=$tick, tock=$tock) - ''' - msgs = await prox.storm(q, opts=opts).list() - fires = [m[1] for m in msgs if m[0] == 'storm:fire'] - self.len(1, fires) - fire_data = fires[0].get('data') - self.ne(fire_data.get('tick'), fire_data.get('tock')) - - # Add a stormpkg - this should fini the spawnpool spawnprocs - procs = [p for p in core.spawnpool.spawns.values()] - self.isin(len(procs), (1, 2, 3)) - - await core.addStormPkg(pkgdef) - - for proc in procs: - self.true(await proc.waitfini(6)) - - self.len(0, core.spawnpool.spawnq) - self.len(0, core.spawnpool.spawns) - - # Test a pure storm commands - msgs = await prox.storm('inet:fqdn=vertex.link | passthrough', opts=opts).list() - self.stormIsInPrint("('inet:fqdn', 'vertex.link')", msgs) - - # No guarantee that we've gotten the proc back into - # the pool so we cannot check the size of spawnq - self.len(1, core.spawnpool.spawns) - - # Test launching a bunch of spawn queries at the same time - donecount = 0 - - await self.runCoreNodes(core, '[test:int=1]') - # force a commit - await s_lmdbslab.Slab.syncLoopOnce() - - async def taskfunc(i): - nonlocal donecount - msgs = await prox.storm('test:int=1 | sleep 3', opts=opts).list() - if len(msgs) == 3: - donecount += 1 - - n = 4 - tasks = [taskfunc(i) for i in range(n)] - try: - await asyncio.wait_for(asyncio.gather(*tasks), timeout=80) - except asyncio.TimeoutError: - self.fail('Timeout awaiting for spawn tasks to finish.') - - self.eq(donecount, n) - - # test a remote boss kill of the client side task - logger.info('telepath ps/kill test.') - evnt = asyncio.Event() - msgs = {'msgs': []} - - tf2opts = {'spawn': True, 'vars': {'hehe': 'haha'}} - - async def taskfunc2(): - async for mesg in prox.storm('test:int=1 | sleep 15', opts=tf2opts): - msgs['msgs'].append(mesg) - if mesg[0] == 'node': - evnt.set() - return True - - victimproc = core.spawnpool.spawnq[0] # type: s_spawn.SpawnProc - fut = core.schedCoro(taskfunc2()) - self.true(await asyncio.wait_for(evnt.wait(), timeout=6)) - tasks = await prox.ps() - new_idens = [task.get('iden') for task in tasks] - self.len(1, new_idens) - await prox.kill(new_idens[0]) - - # Ensure the task cancellation tore down the spawnproc - self.true(await victimproc.waitfini(6)) - - await self.asyncraises(s_exc.LinkShutDown, fut) - # We did not get a fini message since the proc was killed - self.eq({m[0] for m in msgs.get('msgs')}, {'init', 'node'}) - - # test kill -9 ing a spawn proc - logger.info('sigkill test') - assert len(core.spawnpool.spawnq) - victimproc = core.spawnpool.spawnq[0] # type: s_spawn.SpawnProc - victimpid = victimproc.proc.pid - sig = signal.SIGKILL - - retn = [] - - async def taskfunc3(): - async for item in prox.storm('test:int=1 | sleep 15', opts=opts): - retn.append(item) - - fut = core.schedCoro(taskfunc3()) - await asyncio.sleep(1) - os.kill(victimpid, sig) - self.true(await victimproc.waitfini(6)) - await self.asyncraises(s_exc.LinkShutDown, fut) - # We did not get a fini messages since the proc was killed - self.eq({m[0] for m in retn}, {'init', 'node'}) - - async def test_queues(self): - conf = { - 'storm:log': True, - 'storm:log:level': logging.INFO, - } - - # Largely mimics test_storm_lib_queue - async with self.getTestCore(conf=conf) as core: - opts = {'spawn': True} - - async with core.getLocalProxy() as prox: - - msgs = await prox.storm('queue.add visi', opts=opts).list() - self.stormIsInPrint('queue added: visi', msgs) - - with self.raises(s_exc.DupName): - await core.nodes('queue.add visi') - - msgs = await prox.storm('queue.list', opts=opts).list() - self.stormIsInPrint('Storm queue list:', msgs) - self.stormIsInPrint('visi', msgs) - - # Make a node and put it into the queue - q = '$q = $lib.queue.get(visi) [ inet:ipv4=1.2.3.4 ] $q.put( $node.repr() )' - nodes = await core.nodes(q) - self.len(1, nodes) - - await s_lmdbslab.Slab.syncLoopOnce() - - q = '$q = $lib.queue.get(visi) ($offs, $ipv4) = $q.get(0) inet:ipv4=$ipv4' - msgs = await prox.storm(q, opts=opts).list() - - podes = [m[1] for m in msgs if m[0] == 'node'] - self.len(1, podes) - self.eq(podes[0][0], ('inet:ipv4', 0x01020304)) - - # test iter use case - q = '$q = $lib.queue.add(blah) [ inet:ipv4=1.2.3.4 inet:ipv4=5.5.5.5 ] $q.put( $node.repr() )' - nodes = await core.nodes(q) - self.len(2, nodes) - - await s_lmdbslab.Slab.syncLoopOnce() - - # Put a value into the queue that doesn't exist in the cortex so the lift can nop - q = '$q = $lib.queue.get(blah) $q.put("8.8.8.8")' - msgs = await prox.storm(q, opts=opts).list() - - msgs = await prox.storm(''' - $q = $lib.queue.get(blah) - for ($offs, $ipv4) in $q.gets(0, cull=$lib.false, wait=$lib.false) { - inet:ipv4=$ipv4 - } - ''', opts=opts).list() - podes = [m[1] for m in msgs if m[0] == 'node'] - self.len(2, podes) - - msgs = await prox.storm(''' - $q = $lib.queue.get(blah) - for ($offs, $ipv4) in $q.gets(wait=$lib.false) { - inet:ipv4=$ipv4 - $q.cull($offs) - } - ''', opts=opts).list() - podes = [m[1] for m in msgs if m[0] == 'node'] - self.len(2, podes) - - q = '''$q = $lib.queue.get(blah) - for ($offs, $ipv4) in $q.gets(wait=0) { - inet:ipv4=$ipv4 - }''' - msgs = await prox.storm(q, opts=opts).list() - podes = [m[1] for m in msgs if m[0] == 'node'] - self.len(0, podes) - - msgs = await prox.storm('queue.del visi', opts=opts).list() - self.stormIsInPrint('queue removed: visi', msgs) - - with self.raises(s_exc.NoSuchName): - await core.nodes('queue.del visi') - - msgs = await prox.storm('$lib.queue.get(newp).get()', opts=opts).list() - # err = msgs[-2] - errs = [m[1] for m in msgs if m[0] == 'err'] - self.len(1, errs) - self.eq(errs[0][0], 'NoSuchName') - - # Attempting to use a queue to make nodes in spawn town fails. - await core.nodes(''' - $doit = $lib.queue.add(doit) - $doit.puts((foo,bar)) - ''') - q = 'for ($offs, $name) in $lib.queue.get(doit).gets(size=2) { [test:str=$name] }' - msgs = await prox.storm(q, opts=opts).list() - errs = [m[1] for m in msgs if m[0] == 'err'] - self.len(1, errs) - self.eq(errs[0][0], 'IsReadOnly') - - # test other users who have access to this queue can do things to it - async with core.getLocalProxy() as root: - # add users - await root.addUser('synapse') - await root.addUser('wootuser') - - synu = await core.auth.getUserByName('synapse') - woot = await core.auth.getUserByName('wootuser') - - async with core.getLocalProxy(user='synapse') as prox: - msgs = await prox.storm('queue.add synq', opts=opts).list() - errs = [m[1] for m in msgs if m[0] == 'err'] - self.len(1, errs) - self.eq(errs[0][0], 'AuthDeny') - - rule = (True, ('queue', 'add')) - await synu.addRule(rule) - msgs = await prox.storm('queue.add synq', opts=opts).list() - self.stormIsInPrint('queue added: synq', msgs) - - rule = (True, ('queue', 'put')) - await synu.addRule(rule, gateiden='queue:synq') - - q = '$q = $lib.queue.get(synq) $q.puts((bar, baz))' - msgs = await prox.storm(q, opts=opts).list() - - # Ensure that the data was put into the queue by the spawnproc - q = '$q = $lib.queue.get(synq) $lib.print($q.get(wait=$lib.false, cull=$lib.false))' - msgs = await core.stormlist(q) - self.stormIsInPrint("(0, 'bar')", msgs) - - async with core.getLocalProxy(user='wootuser') as prox: - # now let's see our other user fail to add things - msgs = await prox.storm('$lib.queue.get(synq).get()', opts=opts).list() - errs = [m[1] for m in msgs if m[0] == 'err'] - self.len(1, errs) - self.eq(errs[0][0], 'AuthDeny') - - rule = (True, ('queue', 'get')) - await woot.addRule(rule, gateiden='queue:synq') - - q = '$lib.print($lib.queue.get(synq).get(wait=$lib.false))' - msgs = await prox.storm(q, opts=opts).list() - self.stormIsInPrint("(0, 'bar')", msgs) - - msgs = await prox.storm('$lib.queue.del(synq)', opts=opts).list() - errs = [m[1] for m in msgs if m[0] == 'err'] - self.len(1, errs) - self.eq(errs[0][0], 'AuthDeny') - - rule = (True, ('queue', 'del')) - await woot.addRule(rule, gateiden='queue:synq') - - msgs = await prox.storm('$lib.queue.del(synq)', opts=opts).list() - with self.raises(s_exc.NoSuchName): - await core.nodes('$lib.queue.get(synq)') - - async def test_stormpkg(self): - otherpkg = { - 'name': 'foosball', - 'version': (0, 0, 1), - 'synapse_minversion': (2, 8, 0), - } - - stormpkg = { - 'name': 'stormpkg', - 'version': (1, 2, 3), - 'synapse_minversion': (2, 8, 0), - } - conf = { - 'storm:log': True, - 'storm:log:level': logging.INFO, - } - async with self.getTestDmon() as dmon: - dmon.share('real', s_test_svc.RealService()) - host, port = dmon.addr - - lurl = f'tcp://127.0.0.1:{port}/real' - async with self.getTestCore(conf=conf) as core: - - await core.nodes(f'service.add real {lurl}') - await core.nodes('$lib.service.wait(real)') - msgs = await core.stormlist('help') - self.stormIsInPrint('foobar', msgs) - - async with core.getLocalProxy() as prox: - opts = {'spawn': True} - - # Ensure the spawncore loaded the service - coro = prox.storm('$lib.service.wait(real)', opts).list() - msgs = await asyncio.wait_for(coro, 30) - - msgs = await prox.storm('help', opts=opts).list() - self.stormIsInPrint('foobar', msgs) - - msgs = await prox.storm('pkg.del asdf', opts=opts).list() - self.stormIsInPrint('No package names match "asdf". Aborting.', msgs) - - await core.addStormPkg(otherpkg) - msgs = await prox.storm('pkg.list', opts=opts).list() - self.stormIsInPrint('foosball', msgs) - - msgs = await prox.storm(f'pkg.del foosball', opts=opts).list() - self.stormIsInPrint('Removing package: foosball', msgs) - - # Direct add via stormtypes - msgs = await prox.storm('$lib.pkg.add($pkg)', - opts={'vars': {'pkg': stormpkg}, 'spawn': True}).list() - msgs = await prox.storm('pkg.list', opts=opts).list() - self.stormIsInPrint('stormpkg', msgs) - - async def test_spawn_node_data(self): - - # Largely mimics test_storm_node_data - async with self.getTestCore() as core: - opts = {'spawn': True} - - async with core.getLocalProxy() as prox: - - await core.nodes('[test:int=10]') - msgs = await prox.storm('test:int=10 $node.data.set(foo, hehe)', opts=opts).list() - errs = [m[1] for m in msgs if m[0] == 'err'] - self.eq(errs[0][0], 'IsReadOnly') - - await core.nodes('test:int=10 $node.data.set(foo, hehe)') - - msgs = await prox.storm('test:int $foo=$node.data.get(foo) $lib.print($foo)', opts=opts).list() - self.stormIsInPrint('hehe', msgs) - - q = 'test:int for $item in $node.data.list() { $lib.print($item) }' - msgs = await prox.storm(q, opts=opts).list() - self.stormIsInPrint("('foo', 'hehe')", msgs) - - await core.nodes('test:int=10 $node.data.set(woot, woot)') - q = 'test:int=10 $node.data.pop(woot)' - - msgs = await prox.storm(q, opts=opts).list() - errs = [m[1] for m in msgs if m[0] == 'err'] - self.eq(errs[0][0], 'IsReadOnly') - - async def test_model_extensions(self): - async with self.getTestCoreAndProxy() as (core, prox): - await core.nodes('[ inet:dns:a=(vertex.link, 1.2.3.4) ]') - opts = {'spawn': True} - # Adding model extensions must work - await core.addFormProp('inet:ipv4', '_woot', ('int', {}), {}) - await core.nodes('[inet:ipv4=1.2.3.4 :_woot=10]') - await s_lmdbslab.Slab.syncLoopOnce() - msgs = await prox.storm('inet:ipv4=1.2.3.4', opts=opts).list() - self.len(3, msgs) - self.eq(msgs[1][1][1]['props'].get('_woot'), 10) - - msgs = await prox.storm('inet:ipv4:_woot=10', opts=opts).list() - self.len(3, msgs) - self.eq(msgs[1][1][1]['props'].get('_woot'), 10) - - # tag props must work - await prox.addTagProp('added', ('time', {}), {}) - await prox.storm('inet:ipv4=1.2.3.4 [ +#foo.bar:added="2049" ]').list() - msgs = await prox.storm('inet:ipv4#foo.bar:added', opts=opts).list() - self.len(3, msgs) - - async def test_spawn_dmon_cmds(self): - ''' - Copied from test-cortex_storm_lib_dmon_cmds - ''' - async with self.getTestCoreAndProxy() as (core, prox): - opts = {'spawn': True} - await prox.storm(''' - $q = $lib.queue.add(visi) - $lib.queue.add(boom) - - $lib.dmon.add(${ - $lib.print('Starting wootdmon') - $lib.queue.get(visi).put(blah) - for ($offs, $item) in $lib.queue.get(boom).gets(wait=1) { - [ inet:ipv4=$item ] - } - }, name=wootdmon) - - for ($offs, $item) in $q.gets(size=1) { $q.cull($offs) } - ''', opts=opts).list() - - await asyncio.sleep(0) - - # dmon is now fully running - msgs = await prox.storm('dmon.list', opts=opts).list() - self.stormIsInPrint('(wootdmon ): running', msgs) - - dmon = list(core.stormdmons.dmons.values())[0] - - # make the dmon blow up - q = '''$lib.queue.get(boom).put(hehe) - $q = $lib.queue.get(visi) - for ($offs, $item) in $q.gets(size=1) { $q.cull($offs) } - ''' - _ = await prox.storm(q, opts=opts).list() - - self.true(await s_coro.event_wait(dmon.err_evnt, 6)) - - msgs = await prox.storm('dmon.list').list() - self.stormIsInPrint('(wootdmon ): error', msgs) - - async def test_spawn_forked_view(self): - async with self.getTestCoreAndProxy() as (core, prox): - await core.nodes('[ test:str=1234 ]') - mainview = await core.callStorm('$iden=$lib.view.get().pack().iden ' - 'return ( $iden )') - forkview = await core.callStorm(f'$fork=$lib.view.get({mainview}).fork() ' - f'return ( $fork.pack().iden )') - await core.nodes('[ test:str=beep ]', {'view': forkview}) - - opts = {'spawn': True, 'view': forkview} - msgs = await prox.storm('test:str $lib.print($node.value()) | spin', opts).list() - - self.stormIsInPrint('1234', msgs) - self.stormIsInPrint('beep', msgs)