Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove experimental spawn feature #2068

Merged
merged 2 commits into from
Feb 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions synapse/cmds/cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ class StormCmd(s_cli.Cmd):
--show <names>: Limit storm events (server-side) to the comma-separated list.
--file <path>: Run the storm query specified in the given file path.
--optsfile <path>: Run the query with the given options from a JSON/YAML file.
--spawn: (EXPERIMENTAL!) Run the query within a spawned sub-process runtime (read-only).

Examples:
storm inet:ipv4=1.2.3.4
Expand All @@ -254,7 +253,6 @@ class StormCmd(s_cli.Cmd):
('--raw', {}),
('--debug', {}),
('--path', {}),
('--spawn', {'type': 'flag'}),
('--save-nodes', {'type': 'valu'}),
('query', {'type': 'glob'}),
)
Expand Down Expand Up @@ -429,9 +427,6 @@ async def runCmdOpts(self, opts):
if editformat != 'nodeedits':
stormopts['editformat'] = editformat

if opts.get('spawn'):
stormopts['spawn'] = True

nodesfd = None
if opts.get('save-nodes'):
nodesfd = s_common.genfile(opts.get('save-nodes'))
Expand Down
127 changes: 0 additions & 127 deletions synapse/cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,55 +495,9 @@ async def storm(self, text, opts=None):
'''
opts = self._reqValidStormOpts(opts)

if opts.get('spawn'):
await self._execSpawnStorm(text, opts)
return

async for mesg in self.cell.storm(text, opts=opts):
yield mesg

async def _execSpawnStorm(self, text, opts):

view = self.cell._viewFromOpts(opts)

link = s_scope.get('link')

opts.pop('spawn', None)
info = {
'link': link.getSpawnInfo(),
'view': view.iden,
'user': self.user.iden,
'storm': {
'opts': opts,
'query': text,
}
}

await self.cell.boss.promote('storm:spawn', user=self.user, info={'query': text})
proc = None
mesg = 'Spawn complete'

try:

async with self.cell.spawnpool.get() as proc:
if await proc.xact(info):
await link.fini()

except (asyncio.CancelledError, Exception) as e:

if not isinstance(e, asyncio.CancelledError):
logger.exception('Error during spawned Storm execution.')

if not self.cell.isfini:
if proc:
await proc.fini()

mesg = repr(e)
raise

finally:
raise s_exc.DmonSpawn(mesg=mesg)

async def reqValidStorm(self, text, opts=None):
'''
Parse a Storm query to validate it.
Expand Down Expand Up @@ -994,11 +948,6 @@ class Cortex(s_cell.Cell): # type: ignore
'description': 'A list of module classes to load.',
'type': 'array'
},
'spawn:poolsize': {
'default': 8,
'description': 'The max number of spare processes to keep around in the storm spawn pool.',
'type': 'integer'
},
'storm:log': {
'default': False,
'description': 'Log storm queries via system logger.',
Expand All @@ -1022,7 +971,6 @@ class Cortex(s_cell.Cell): # type: ignore

viewctor = s_view.View.anit
layrctor = s_layer.Layer.anit
spawncorector = 'synapse.lib.spawn.SpawnCore'

# phase 2 - service storage
async def initServiceStorage(self):
Expand All @@ -1043,8 +991,6 @@ async def initServiceStorage(self):
self.feedfuncs = {}
self.stormcmds = {}

self.spawnpool = None

self.maxnodes = self.conf.get('max:nodes')
self.nodecount = 0

Expand Down Expand Up @@ -1149,11 +1095,6 @@ async def initServiceStorage(self):
# Finalize coremodule loading & give svchive a shot to load
await self._initPureStormCmds()

import synapse.lib.spawn as s_spawn # get around circular dependency
self.spawnpool = await s_spawn.SpawnPool.anit(self)
self.onfini(self.spawnpool)
self.on('user:mod', self._onEvtBumpSpawnPool)

self.dynitems.update({
'cron': self.agenda,
'cortex': self,
Expand Down Expand Up @@ -1195,13 +1136,6 @@ async def initServicePassive(self):
await self.agenda.stop()
await self.stormdmons.stop()

async def _onEvtBumpSpawnPool(self, evnt):
await self.bumpSpawnPool()

async def bumpSpawnPool(self):
if self.spawnpool is not None:
await self.spawnpool.bump()

@s_nexus.Pusher.onPushAuto('model:depr:lock')
async def setDeprLock(self, name, locked):

Expand Down Expand Up @@ -1439,38 +1373,6 @@ async def listTagModel(self):
'''
return list(self.taghive.items())

async def getSpawnInfo(self):

if self.spawncorector is None:
mesg = 'spawn storm option not supported on this cortex'
raise s_exc.FeatureNotSupported(mesg=mesg)

ret = {
'iden': self.iden,
'dirn': self.dirn,
'conf': {
'storm:log': self.conf.get('storm:log', False),
'storm:log:level': self.conf.get('storm:log:level', logging.INFO),
'trigger:enable': self.conf.get('trigger:enable', True),
},
'loglevel': logger.getEffectiveLevel(),
'views': [v.getSpawnInfo() for v in self.views.values()],
'layers': [await lyr.getSpawnInfo() for lyr in self.layers.values()],
'storm': {
'cmds': {
'cdefs': list(self.storm_cmd_cdefs.items()),
'ctors': list(self.storm_cmd_ctors.items()),
},
'libs': tuple(self.libroot),
'mods': await self.getStormMods(),
'pkgs': await self.getStormPkgs(),
'svcs': [svc.sdef for svc in self.getStormSvcs()],
},
'model': await self.getModelDefs(),
'spawncorector': self.spawncorector,
}
return ret

async def _finiStor(self):
await asyncio.gather(*[view.fini() for view in self.views.values()])
await asyncio.gather(*[layr.fini() for layr in self.layers.values()])
Expand Down Expand Up @@ -1662,13 +1564,10 @@ def getStorNode(form):
self.stormcmds[name] = ctor
self.storm_cmd_cdefs[name] = cdef

await self.bumpSpawnPool()

await self.fire('core:cmd:change', cmd=name, act='add')

async def _popStormCmd(self, name):
self.stormcmds.pop(name, None)
await self.bumpSpawnPool()

await self.fire('core:cmd:change', cmd=name, act='del')

Expand Down Expand Up @@ -1696,7 +1595,6 @@ async def _delStormCmd(self, name):

await self.cmdhive.pop(name)
self.stormcmds.pop(name, None)
await self.bumpSpawnPool()

await self.fire('core:cmd:change', cmd=name, act='del')

Expand Down Expand Up @@ -1839,8 +1737,6 @@ async def _onload():
logger.warning(f'onload failed for package: {name}')
self.schedCoro(_onload())

await self.bumpSpawnPool()

async def _dropStormPkg(self, pkgdef):
'''
Reverse the process of loadStormPkg()
Expand All @@ -1853,8 +1749,6 @@ async def _dropStormPkg(self, pkgdef):
name = cdef.get('name')
await self._popStormCmd(name)

await self.bumpSpawnPool()

def getStormSvc(self, name):

ssvc = self.svcsbyiden.get(name)
Expand Down Expand Up @@ -1897,7 +1791,6 @@ async def _addStormSvc(self, sdef):

ssvc = await self._setStormSvc(sdef)
await self.svchive.set(iden, sdef)
await self.bumpSpawnPool()

return ssvc.sdef

Expand Down Expand Up @@ -1939,8 +1832,6 @@ async def _delStormSvc(self, iden):
self.svcsbysvcname.pop(ssvc.svcname, None)
await ssvc.fini()

await self.bumpSpawnPool()

async def _delStormSvcPkgs(self, iden):
'''
Delete storm packages associated with a service.
Expand Down Expand Up @@ -2119,8 +2010,6 @@ async def _loadExtModel(self):
except Exception as e:
logger.warning(f'ext tag prop ({prop}) error: {e}')

await self.bumpSpawnPool()

@contextlib.asynccontextmanager
async def watcher(self, wdef):

Expand Down Expand Up @@ -2188,7 +2077,6 @@ async def _addForm(self, formname, basetype, typeopts, typeinfo):

await self.extforms.set(formname, (formname, basetype, typeopts, typeinfo))
await self.fire('core:extmodel:change', form=formname, act='add', type='form')
await self.bumpSpawnPool()

@s_nexus.Pusher.onPushAuto('model:form:del')
async def delForm(self, formname):
Expand All @@ -2207,7 +2095,6 @@ async def delForm(self, formname):

await self.extforms.pop(formname, None)
await self.fire('core:extmodel:change', form=formname, act='del', type='form')
await self.bumpSpawnPool()

@s_nexus.Pusher.onPushAuto('model:prop:add')
async def addFormProp(self, form, prop, tdef, info):
Expand All @@ -2223,7 +2110,6 @@ async def addFormProp(self, form, prop, tdef, info):

await self.extprops.set(f'{form}:{prop}', (form, prop, tdef, info))
await self.fire('core:extmodel:change', form=form, prop=prop, act='add', type='formprop')
await self.bumpSpawnPool()

async def delFormProp(self, form, prop):
full = f'{form}:{prop}'
Expand Down Expand Up @@ -2255,7 +2141,6 @@ async def _delFormProp(self, form, prop):
await self.extprops.pop(full, None)
await self.fire('core:extmodel:change',
form=form, prop=prop, act='del', type='formprop')
await self.bumpSpawnPool()

async def delUnivProp(self, prop):
udef = self.extunivs.get(prop)
Expand Down Expand Up @@ -2283,7 +2168,6 @@ async def _delUnivProp(self, prop):
self.model.delUnivProp(prop)
await self.extunivs.pop(prop, None)
await self.fire('core:extmodel:change', name=prop, act='del', type='univ')
await self.bumpSpawnPool()

async def addTagProp(self, name, tdef, info):
if self.exttagprops.get(name) is not None:
Expand All @@ -2300,7 +2184,6 @@ async def _addTagProp(self, name, tdef, info):

await self.exttagprops.set(name, (name, tdef, info))
await self.fire('core:tagprop:change', name=name, act='add')
await self.bumpSpawnPool()

async def delTagProp(self, name):
pdef = self.exttagprops.get(name)
Expand All @@ -2325,7 +2208,6 @@ async def _delTagProp(self, name):

await self.exttagprops.pop(name, None)
await self.fire('core:tagprop:change', name=name, act='del')
await self.bumpSpawnPool()

async def addNodeTag(self, user, iden, tag, valu=(None, None)):
'''
Expand Down Expand Up @@ -3056,8 +2938,6 @@ async def _addView(self, vdef):
view = await self._loadView(node)
view.init2()

await self.bumpSpawnPool()

return await view.pack()

async def delView(self, iden):
Expand Down Expand Up @@ -3091,8 +2971,6 @@ async def _delView(self, iden):

await self.auth.delAuthGate(iden)

await self.bumpSpawnPool()

async def delLayer(self, iden):
layr = self.layers.get(iden, None)
if layr is None:
Expand Down Expand Up @@ -3127,8 +3005,6 @@ async def _delLayer(self, iden, nexsitem):

layr.deloffs = nexsitem[0]

await self.bumpSpawnPool()

async def setViewLayers(self, layers, iden=None):
'''
Args:
Expand All @@ -3140,7 +3016,6 @@ async def setViewLayers(self, layers, iden=None):
raise s_exc.NoSuchView(iden=iden)

await view.setLayers(layers)
await self.bumpSpawnPool()

def getLayer(self, iden=None):
'''
Expand Down Expand Up @@ -3298,8 +3173,6 @@ def ondel():
for pdef in layrinfo.get('pulls', {}).values():
await self.runLayrPull(layr, pdef)

await self.bumpSpawnPool()

await self.fire('core:layr:add', iden=layr.iden)

return layr
Expand Down
6 changes: 0 additions & 6 deletions synapse/lib/layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1379,12 +1379,6 @@ async def _initLayerStorage(self):
if self.layrvers < 3:
await self._layrV2toV3()

async def getSpawnInfo(self):
info = await self.pack()
info['dirn'] = self.dirn
info['ctor'] = self.ctorname
return info

async def getLayerSize(self):
'''
Get the total storage size for the layer.
Expand Down
20 changes: 0 additions & 20 deletions synapse/lib/link.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,6 @@ async def linksock():
link0 = await Link.anit(reader, writer, info={'unix': True})
return link0, sock1

async def fromspawn(spawninfo):
sock = spawninfo.get('sock')
info = spawninfo.get('info', {})
info['spawn'] = True
reader, writer = await asyncio.open_connection(sock=sock)
return await Link.anit(reader, writer, info=info)

class Link(s_base.Base):
'''
A Link() is created to wrap a socket reader/writer.
Expand Down Expand Up @@ -154,19 +147,6 @@ def getTlsPeerCn(self):
if name == 'commonName':
return valu

def getSpawnInfo(self):
info = {}

# selectively add info for pickle...
if self.info.get('unix'):
info['unix'] = True

return {
'info': info,
# a bit dirty, but there's no other way...
'sock': self.reader._transport._sock,
}

def getAddrInfo(self):
'''
Get a summary of address information related to the link.
Expand Down
Loading