Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sequencing bug between leadership setting and service initialization #1768

Merged
merged 21 commits into from
Jun 23, 2020
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
e748974
Fix sequencing bug between leadership setting and service initialization
jnwatson Jun 12, 2020
55c670a
Unconditionally have cortex call nexsroot.setLeader
jnwatson Jun 15, 2020
f33017f
some initial work refactoring leadship status
invisig0th Jun 15, 2020
65aa48c
Add explicit check to verify that nexus pushes don't happen too early
jnwatson Jun 17, 2020
a3909bb
Merge remote-tracking branch 'origin/visi-leaders' into leader-servic…
jnwatson Jun 17, 2020
f24aa6e
WIP
jnwatson Jun 18, 2020
2b491fe
merged from master
invisig0th Jun 18, 2020
b105b72
Remove breakpoint, change deferpost, unstabilize all, root guid
jnwatson Jun 18, 2020
4bf3a31
Merge branch 'leader-service-bug' of https://github.com/vertexproject…
jnwatson Jun 18, 2020
5135522
Bug fixes
jnwatson Jun 18, 2020
9905475
Fix migration bugs
jnwatson Jun 18, 2020
6b11ea4
Merge branch 'master' into leader-service-bug
jnwatson Jun 18, 2020
77ab011
Remove debugging stuff, invalid comments
jnwatson Jun 19, 2020
39e3558
Add postAnit callback for Base users to override.
vEpiphyte Jun 19, 2020
c4ddfdf
Use postAnit instead of postNexsAnit
jnwatson Jun 22, 2020
8d91dc7
Fix from epiphyte review
jnwatson Jun 22, 2020
20de83c
Merge branch 'master' into leader-service-bug
jnwatson Jun 22, 2020
80122df
Whitespace
vEpiphyte Jun 22, 2020
a9f3971
Revert exception wording test
jnwatson Jun 22, 2020
2c3e4a0
Merge branch 'master' into leader-service-bug
jnwatson Jun 23, 2020
7ad50e9
Add an overfloweerror to the msgpack test
vEpiphyte Jun 23, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 63 additions & 58 deletions synapse/cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -701,10 +701,6 @@ class Cortex(s_cell.Cell): # type: ignore
'description': 'A telepath URL for a remote axon.',
'type': 'string'
},
'mirror': {
'description': 'Run a mirror of the cortex at the given telepath URL. We must be a backup!',
'type': 'string'
},
'cron:enable': {
'default': True,
'description': 'Enable cron jobs running.',
Expand Down Expand Up @@ -769,6 +765,8 @@ async def __anit__(self, dirn, conf=None):

await s_cell.Cell.__anit__(self, dirn, conf=conf)

# NOTE: we may not make *any* nexus actions in this method

if self.inaugural:
await self.cellinfo.set('cortex:version', s_version.version)

Expand All @@ -787,7 +785,6 @@ async def __anit__(self, dirn, conf=None):
self.feedfuncs = {}
self.stormcmds = {}

self.isleader = None
self.spawnpool = None
self.mirror = self.conf.get('mirror')

Expand All @@ -800,7 +797,6 @@ async def __anit__(self, dirn, conf=None):

self.svcsbyiden = {}
self.svcsbyname = {}
self.stormservices = None # type: s_hive.HiveDict

self._runtLiftFuncs = {}
self._runtPropSetFuncs = {}
Expand Down Expand Up @@ -866,11 +862,12 @@ async def __anit__(self, dirn, conf=None):

cmdhive = await self.hive.open(('cortex', 'storm', 'cmds'))
pkghive = await self.hive.open(('cortex', 'storm', 'packages'))
svchive = await self.hive.open(('cortex', 'storm', 'services'))
self.cmdhive = await cmdhive.dict()
self.pkghive = await pkghive.dict()
self.svchive = await svchive.dict()

# Finalize coremodule loading & give stormservices a shot to load
await self._initCoreMods()
# Finalize coremodule loading & give svchive a shot to load
await self._initPureStormCmds()

import synapse.lib.spawn as s_spawn # get around circular dependency
Expand All @@ -885,36 +882,37 @@ async def __anit__(self, dirn, conf=None):
'axon': self.axon
})

await self.auth.addAuthGate('cortex', 'cortex')

await self.postNexsAnit()
self.nexsroot.onPreLeader(self.preLeaderHook)

if self.mirror is not None:
await self._initCoreMirror(self.mirror)
await self.auth.addAuthGate('cortex', 'cortex')

# Fire the leadership hook once at boot
await self.onLeaderChange(self.nexsroot.amLeader())
async def _initNexsRoot(self):
'''
Just like cell _initNexsRoot except doesn't call nexsroot.setLeader
'''
nexsroot = await s_nexus.NexsRoot.anit(self.dirn, donexslog=self.donexslog)
self.onfini(nexsroot.fini)
nexsroot.onfini(self)
return nexsroot

async def onLeaderChange(self, leader):
self.isleader = leader
# One shot to initialize storm services
if self.stormservices is None:
await self._initStormSvcs()
if leader:
return await self.startCortexLeader()
return await self.stopCortexLeader()
async def preLeaderHook(self, leader):
'''
These run after the leader is set, but before the leader/follower callbacks are run and the follower loop runs
'''
await self._initCoreMods()
await self._initStormSvcs()

async def startCortexLeader(self):
async def startAsLeader(self):
'''
Indempotent actions that are done when a Cortex is a leader.
Run things that only a leader Cortex runs.
'''
if self.conf.get('cron:enable'):
await self.agenda.start()
await self.stormdmons.start()

async def stopCortexLeader(self):
async def stopAsLeader(self):
'''
Indempotent actions that are done when a Cortex is not a leader.
Stop things that only a leader Cortex runs.
'''
await self.agenda.stop()
await self.stormdmons.stop()
Expand Down Expand Up @@ -1087,11 +1085,7 @@ async def _initStormDmons(self):

async def _initStormSvcs(self):

node = await self.hive.open(('cortex', 'storm', 'services'))

self.stormservices = await node.dict()

for iden, sdef in self.stormservices.items():
for iden, sdef in self.svchive.items():

try:
await self._setStormSvc(sdef)
Expand All @@ -1102,6 +1096,15 @@ async def _initStormSvcs(self):
except Exception as e:
logger.warning(f'initStormService ({iden}) failed: {e}')

async def _finiStormSvcs(self):
try:
jnwatson marked this conversation as resolved.
Show resolved Hide resolved
for ssvc in self.svcsbyiden.values():
await ssvc.fini()

finally:
self.svcsbyiden.clear()
self.svcsbyname.clear()

async def _initCoreQueues(self):
path = os.path.join(self.dirn, 'slabs', 'queues.lmdb')

Expand Down Expand Up @@ -1411,13 +1414,13 @@ async def _addStormSvc(self, sdef):
return ssvc.sdef

ssvc = await self._setStormSvc(sdef)
await self.stormservices.set(iden, sdef)
await self.svchive.set(iden, sdef)
await self.bumpSpawnPool()

return ssvc.sdef

async def delStormSvc(self, iden):
sdef = self.stormservices.get(iden)
sdef = self.svchive.get(iden)
if sdef is None:
mesg = f'No storm service with iden: {iden}'
raise s_exc.NoSuchStormSvc(mesg=mesg, iden=iden)
Expand All @@ -1429,19 +1432,19 @@ async def _delStormSvc(self, iden):
'''
Delete a registered storm service from the cortex.
'''
sdef = self.stormservices.get(iden)
sdef = self.svchive.get(iden)
if sdef is None:
return

try:
if self.isleader:
if await self.isLeader():
await self.runStormSvcEvent(iden, 'del')
except asyncio.CancelledError: # pragma: no cover
raise
except Exception as e:
logger.exception(f'service.del hook for service {iden} failed with error: {e}')

sdef = await self.stormservices.pop(iden)
sdef = await self.svchive.pop(iden)

await self._delStormSvcPkgs(iden)

Expand Down Expand Up @@ -1501,17 +1504,17 @@ async def setStormSvcEvents(self, iden, edef):
Returns:
dict: An updated storm service definition dictionary.
'''
sdef = self.stormservices.get(iden)
sdef = self.svchive.get(iden)
if sdef is None:
mesg = f'No storm service with iden: {iden}'
raise s_exc.NoSuchStormSvc(mesg=mesg)

sdef['evts'] = edef
await self.stormservices.set(iden, sdef)
await self.svchive.set(iden, sdef)
return sdef

async def _runStormSvcAdd(self, iden):
sdef = self.stormservices.get(iden)
sdef = self.svchive.get(iden)
if sdef is None:
mesg = f'No storm service with iden: {iden}'
raise s_exc.NoSuchStormSvc(mesg=mesg)
Expand All @@ -1528,10 +1531,10 @@ async def _runStormSvcAdd(self, iden):
return

sdef['added'] = True
await self.stormservices.set(iden, sdef)
await self.svchive.set(iden, sdef)

async def runStormSvcEvent(self, iden, name):
sdef = self.stormservices.get(iden)
sdef = self.svchive.get(iden)
if sdef is None:
mesg = f'No storm service with iden: {iden}'
raise s_exc.NoSuchStormSvc(mesg=mesg)
Expand Down Expand Up @@ -1856,15 +1859,6 @@ async def spliceHistory(self, user):
if user.iden == mesg[1]['user'] or user.isAdmin():
yield mesg

async def _initCoreMirror(self, url):
'''
Initialize this cortex as a down-stream/follower mirror from a telepath url.

Note:
This cortex *must* be initialized from a backup of the target cortex!
'''
await self.nexsroot.setLeader(url, self.iden)

async def _initCoreHive(self):
stormvarsnode = await self.hive.open(('cortex', 'storm', 'vars'))
self.stormvars = await stormvarsnode.dict()
Expand Down Expand Up @@ -1936,7 +1930,7 @@ async def _initPureStormCmds(self):
oldcmds = []
for name, cdef in self.cmdhive.items():
cmdiden = cdef.get('cmdconf', {}).get('svciden')
if cmdiden and self.stormservices.get(cmdiden) is None:
if cmdiden and self.svchive.get(cmdiden) is None:
oldcmds.append(name)
else:
await self._trySetStormCmd(name, cdef)
Expand Down Expand Up @@ -2232,18 +2226,19 @@ async def _initCoreViews(self):

# if we have no views, we are initializing. Add a default main view and layer.
if not self.views:
ldef = await self.addLayer()
assert self.inaugural, 'Cortex initialization failed: there are no views.'
ldef = await self.addLayer(nexs=False)
layriden = ldef.get('iden')
vdef = {
'layers': (layriden,),
'worldreadable': True,
}
vdef = await self.addView(vdef)
vdef = await self.addView(vdef, nexs=False)
iden = vdef.get('iden')
await self.cellinfo.set('defaultview', iden)
self.view = self.getView(iden)

async def addView(self, vdef):
async def addView(self, vdef, nexs=True):

vdef['iden'] = s_common.guid()
vdef.setdefault('parent', None)
Expand All @@ -2252,7 +2247,10 @@ async def addView(self, vdef):

s_view.reqValidVdef(vdef)

return await self._push('view:add', vdef)
if nexs:
return await self._push('view:add', vdef)
else:
return await self._addView(vdef)

@s_nexus.Pusher.onPush('view:add')
async def _addView(self, vdef):
Expand Down Expand Up @@ -2439,9 +2437,13 @@ def getViewDef(self, iden):
def getViewDefs(self):
return [v.pack() for v in self.views.values()]

async def addLayer(self, ldef=None):
async def addLayer(self, ldef=None, nexs=True):
'''
Add a Layer to the cortex.

Args:
ldef (Optional[Dict]): layer configuration
nexs (bool): whether to record a nexus transaction (internal use only)
'''
ldef = ldef or {}

Expand All @@ -2453,7 +2455,10 @@ async def addLayer(self, ldef=None):

s_layer.reqValidLdef(ldef)

return await self._push('layer:add', ldef)
if nexs:
return await self._push('layer:add', ldef)
else:
return await self._addLayer(ldef)

@s_nexus.Pusher.onPush('layer:add')
async def _addLayer(self, ldef):
Expand Down
1 change: 1 addition & 0 deletions synapse/exc.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ class NoCertKey(SynErr):

class ModAlreadyLoaded(SynErr): pass
class MustBeJsonSafe(SynErr): pass
class NotMsgpackSafe(SynErr): pass

class NoSuchAbrv(SynErr): pass
class NoSuchAct(SynErr): pass
Expand Down
13 changes: 13 additions & 0 deletions synapse/lib/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ async def anit(cls, *args, **kwargs):

raise

try:
await self.postAnit()
except Exception:
logger.exception('Error during postAnit callback.')
await self.fini()
raise

return self

async def __anit__(self):
Expand Down Expand Up @@ -140,6 +147,12 @@ async def __anit__(self):
self._fini_atexit = False
self._active_tasks = set() # the free running tasks associated with me

async def postAnit(self):
'''
Method called after self.__anit__() has completed, but before anit() returns the object to the caller.
'''
pass

async def enter_context(self, item):
'''
Modeled on Python's contextlib.ExitStack.enter_context. Enters a new context manager and adds its __exit__()
Expand Down
Loading