Skip to content

Commit

Permalink
Fix sequencing bug between leadership setting and service initialization (#1768)
Browse files Browse the repository at this point in the history

* Fix sequencing bug between leadership setting and service initialization

* Add explicit check to verify that nexus pushes don't happen too early

* Add postAnit callback for Base users to override.

* Use postAnit instead of postNexsAnit

* Add an onPreLeader hook to nexus to provide initialization after
leadership is set but before the follower loop runs.

Co-authored-by: visi <[email protected]>
Co-authored-by: epiphyte <[email protected]>
  • Loading branch information
3 people authored Jun 23, 2020
1 parent 125c523 commit ee8af2a
Show file tree
Hide file tree
Showing 19 changed files with 234 additions and 120 deletions.
112 changes: 54 additions & 58 deletions synapse/cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -701,10 +701,6 @@ class Cortex(s_cell.Cell): # type: ignore
'description': 'A telepath URL for a remote axon.',
'type': 'string'
},
'mirror': {
'description': 'Run a mirror of the cortex at the given telepath URL. We must be a backup!',
'type': 'string'
},
'cron:enable': {
'default': True,
'description': 'Enable cron jobs running.',
Expand Down Expand Up @@ -769,6 +765,8 @@ async def __anit__(self, dirn, conf=None):

await s_cell.Cell.__anit__(self, dirn, conf=conf)

# NOTE: we may not make *any* nexus actions in this method

if self.inaugural:
await self.cellinfo.set('cortex:version', s_version.version)

Expand All @@ -787,7 +785,6 @@ async def __anit__(self, dirn, conf=None):
self.feedfuncs = {}
self.stormcmds = {}

self.isleader = None
self.spawnpool = None
self.mirror = self.conf.get('mirror')

Expand All @@ -800,7 +797,6 @@ async def __anit__(self, dirn, conf=None):

self.svcsbyiden = {}
self.svcsbyname = {}
self.stormservices = None # type: s_hive.HiveDict

self._runtLiftFuncs = {}
self._runtPropSetFuncs = {}
Expand Down Expand Up @@ -866,11 +862,12 @@ async def __anit__(self, dirn, conf=None):

cmdhive = await self.hive.open(('cortex', 'storm', 'cmds'))
pkghive = await self.hive.open(('cortex', 'storm', 'packages'))
svchive = await self.hive.open(('cortex', 'storm', 'services'))
self.cmdhive = await cmdhive.dict()
self.pkghive = await pkghive.dict()
self.svchive = await svchive.dict()

# Finalize coremodule loading & give stormservices a shot to load
await self._initCoreMods()
# Finalize coremodule loading & give svchive a shot to load
await self._initPureStormCmds()

import synapse.lib.spawn as s_spawn # get around circular dependency
Expand All @@ -885,36 +882,37 @@ async def __anit__(self, dirn, conf=None):
'axon': self.axon
})

await self.auth.addAuthGate('cortex', 'cortex')

await self.postNexsAnit()
self.nexsroot.onPreLeader(self.preLeaderHook)

if self.mirror is not None:
await self._initCoreMirror(self.mirror)
await self.auth.addAuthGate('cortex', 'cortex')

# Fire the leadership hook once at boot
await self.onLeaderChange(self.nexsroot.amLeader())
async def _initNexsRoot(self):
'''
Just like cell _initNexsRoot except doesn't call nexsroot.setLeader
'''
nexsroot = await s_nexus.NexsRoot.anit(self.dirn, donexslog=self.donexslog)
self.onfini(nexsroot.fini)
nexsroot.onfini(self)
return nexsroot

async def onLeaderChange(self, leader):
self.isleader = leader
# One shot to initialize storm services
if self.stormservices is None:
await self._initStormSvcs()
if leader:
return await self.startCortexLeader()
return await self.stopCortexLeader()
async def preLeaderHook(self, leader):
'''
These run after the leader is set, but before the leader/follower callbacks are run and the follower loop runs
'''
await self._initCoreMods()
await self._initStormSvcs()

async def startCortexLeader(self):
async def startAsLeader(self):
'''
Indempotent actions that are done when a Cortex is a leader.
Run things that only a leader Cortex runs.
'''
if self.conf.get('cron:enable'):
await self.agenda.start()
await self.stormdmons.start()

async def stopCortexLeader(self):
async def stopAsLeader(self):
'''
Indempotent actions that are done when a Cortex is not a leader.
Stop things that only a leader Cortex runs.
'''
await self.agenda.stop()
await self.stormdmons.stop()
Expand Down Expand Up @@ -1087,11 +1085,7 @@ async def _initStormDmons(self):

async def _initStormSvcs(self):

node = await self.hive.open(('cortex', 'storm', 'services'))

self.stormservices = await node.dict()

for iden, sdef in self.stormservices.items():
for iden, sdef in self.svchive.items():

try:
await self._setStormSvc(sdef)
Expand Down Expand Up @@ -1411,13 +1405,13 @@ async def _addStormSvc(self, sdef):
return ssvc.sdef

ssvc = await self._setStormSvc(sdef)
await self.stormservices.set(iden, sdef)
await self.svchive.set(iden, sdef)
await self.bumpSpawnPool()

return ssvc.sdef

async def delStormSvc(self, iden):
sdef = self.stormservices.get(iden)
sdef = self.svchive.get(iden)
if sdef is None:
mesg = f'No storm service with iden: {iden}'
raise s_exc.NoSuchStormSvc(mesg=mesg, iden=iden)
Expand All @@ -1429,19 +1423,19 @@ async def _delStormSvc(self, iden):
'''
Delete a registered storm service from the cortex.
'''
sdef = self.stormservices.get(iden)
sdef = self.svchive.get(iden)
if sdef is None:
return

try:
if self.isleader:
if await self.isLeader():
await self.runStormSvcEvent(iden, 'del')
except asyncio.CancelledError: # pragma: no cover
raise
except Exception as e:
logger.exception(f'service.del hook for service {iden} failed with error: {e}')

sdef = await self.stormservices.pop(iden)
sdef = await self.svchive.pop(iden)

await self._delStormSvcPkgs(iden)

Expand Down Expand Up @@ -1501,17 +1495,17 @@ async def setStormSvcEvents(self, iden, edef):
Returns:
dict: An updated storm service definition dictionary.
'''
sdef = self.stormservices.get(iden)
sdef = self.svchive.get(iden)
if sdef is None:
mesg = f'No storm service with iden: {iden}'
raise s_exc.NoSuchStormSvc(mesg=mesg)

sdef['evts'] = edef
await self.stormservices.set(iden, sdef)
await self.svchive.set(iden, sdef)
return sdef

async def _runStormSvcAdd(self, iden):
sdef = self.stormservices.get(iden)
sdef = self.svchive.get(iden)
if sdef is None:
mesg = f'No storm service with iden: {iden}'
raise s_exc.NoSuchStormSvc(mesg=mesg)
Expand All @@ -1528,10 +1522,10 @@ async def _runStormSvcAdd(self, iden):
return

sdef['added'] = True
await self.stormservices.set(iden, sdef)
await self.svchive.set(iden, sdef)

async def runStormSvcEvent(self, iden, name):
sdef = self.stormservices.get(iden)
sdef = self.svchive.get(iden)
if sdef is None:
mesg = f'No storm service with iden: {iden}'
raise s_exc.NoSuchStormSvc(mesg=mesg)
Expand Down Expand Up @@ -1856,15 +1850,6 @@ async def spliceHistory(self, user):
if user.iden == mesg[1]['user'] or user.isAdmin():
yield mesg

async def _initCoreMirror(self, url):
'''
Initialize this cortex as a down-stream/follower mirror from a telepath url.
Note:
This cortex *must* be initialized from a backup of the target cortex!
'''
await self.nexsroot.setLeader(url, self.iden)

async def _initCoreHive(self):
stormvarsnode = await self.hive.open(('cortex', 'storm', 'vars'))
self.stormvars = await stormvarsnode.dict()
Expand Down Expand Up @@ -1936,7 +1921,7 @@ async def _initPureStormCmds(self):
oldcmds = []
for name, cdef in self.cmdhive.items():
cmdiden = cdef.get('cmdconf', {}).get('svciden')
if cmdiden and self.stormservices.get(cmdiden) is None:
if cmdiden and self.svchive.get(cmdiden) is None:
oldcmds.append(name)
else:
await self._trySetStormCmd(name, cdef)
Expand Down Expand Up @@ -2232,18 +2217,19 @@ async def _initCoreViews(self):

# if we have no views, we are initializing. Add a default main view and layer.
if not self.views:
ldef = await self.addLayer()
assert self.inaugural, 'Cortex initialization failed: there are no views.'
ldef = await self.addLayer(nexs=False)
layriden = ldef.get('iden')
vdef = {
'layers': (layriden,),
'worldreadable': True,
}
vdef = await self.addView(vdef)
vdef = await self.addView(vdef, nexs=False)
iden = vdef.get('iden')
await self.cellinfo.set('defaultview', iden)
self.view = self.getView(iden)

async def addView(self, vdef):
async def addView(self, vdef, nexs=True):

vdef['iden'] = s_common.guid()
vdef.setdefault('parent', None)
Expand All @@ -2252,7 +2238,10 @@ async def addView(self, vdef):

s_view.reqValidVdef(vdef)

return await self._push('view:add', vdef)
if nexs:
return await self._push('view:add', vdef)
else:
return await self._addView(vdef)

@s_nexus.Pusher.onPush('view:add')
async def _addView(self, vdef):
Expand Down Expand Up @@ -2439,9 +2428,13 @@ def getViewDef(self, iden):
def getViewDefs(self):
return [v.pack() for v in self.views.values()]

async def addLayer(self, ldef=None):
async def addLayer(self, ldef=None, nexs=True):
'''
Add a Layer to the cortex.
Args:
ldef (Optional[Dict]): layer configuration
nexs (bool): whether to record a nexus transaction (internal use only)
'''
ldef = ldef or {}

Expand All @@ -2453,7 +2446,10 @@ async def addLayer(self, ldef=None):

s_layer.reqValidLdef(ldef)

return await self._push('layer:add', ldef)
if nexs:
return await self._push('layer:add', ldef)
else:
return await self._addLayer(ldef)

@s_nexus.Pusher.onPush('layer:add')
async def _addLayer(self, ldef):
Expand Down
1 change: 1 addition & 0 deletions synapse/exc.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ class NoCertKey(SynErr):

class ModAlreadyLoaded(SynErr): pass
class MustBeJsonSafe(SynErr): pass
class NotMsgpackSafe(SynErr): pass

class NoSuchAbrv(SynErr): pass
class NoSuchAct(SynErr): pass
Expand Down
13 changes: 13 additions & 0 deletions synapse/lib/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ async def anit(cls, *args, **kwargs):

raise

try:
await self.postAnit()
except Exception:
logger.exception('Error during postAnit callback.')
await self.fini()
raise

return self

async def __anit__(self):
Expand Down Expand Up @@ -140,6 +147,12 @@ async def __anit__(self):
self._fini_atexit = False
self._active_tasks = set() # the free running tasks associated with me

async def postAnit(self):
'''
Method called after self.__anit__() has completed, but before anit() returns the object to the caller.
'''
pass

async def enter_context(self, item):
'''
Modeled on Python's contextlib.ExitStack.enter_context. Enters a new context manager and adds its __exit__()
Expand Down
Loading

0 comments on commit ee8af2a

Please sign in to comment.