From ee8af2ab5e8b211b45941846dc4b3257c1959a1a Mon Sep 17 00:00:00 2001 From: Nic Watson Date: Tue, 23 Jun 2020 17:11:36 -0400 Subject: [PATCH] Fix sequencing bug between leadership setting and service initialization (#1768) * Fix sequencing bug between leadership setting and service initialization * Add explicit check to verify that nexus pushes don't happen too early * Add postAnit callback for Base users to override. * Use postAnit instead of postNexsAnit * Add a onPreLeader hook to nexus to provide initialization after leadership but before followerloop. Co-authored-by: visi Co-authored-by: epiphyte --- synapse/cortex.py | 112 ++++++++++++------------- synapse/exc.py | 1 + synapse/lib/base.py | 13 +++ synapse/lib/cell.py | 57 +++++++++---- synapse/lib/hiveauth.py | 16 +++- synapse/lib/layer.py | 1 + synapse/lib/msgpack.py | 11 ++- synapse/lib/nexus.py | 47 +++++++++-- synapse/tests/test_cortex.py | 3 + synapse/tests/test_lib_base.py | 11 +++ synapse/tests/test_lib_cell.py | 4 +- synapse/tests/test_lib_lmdbslab.py | 3 +- synapse/tests/test_lib_msgpack.py | 11 ++- synapse/tests/test_lib_nexus.py | 8 +- synapse/tests/test_lib_slabseqn.py | 4 +- synapse/tests/test_lib_stormsvc.py | 34 +++++--- synapse/tests/test_lib_stormtypes.py | 4 +- synapse/tests/test_tools_migrate200.py | 13 ++- synapse/tools/migrate_200.py | 1 + 19 files changed, 234 insertions(+), 120 deletions(-) diff --git a/synapse/cortex.py b/synapse/cortex.py index 0a38e0979b..131aae14b2 100644 --- a/synapse/cortex.py +++ b/synapse/cortex.py @@ -701,10 +701,6 @@ class Cortex(s_cell.Cell): # type: ignore 'description': 'A telepath URL for a remote axon.', 'type': 'string' }, - 'mirror': { - 'description': 'Run a mirror of the cortex at the given telepath URL. We must be a backup!', - 'type': 'string' - }, 'cron:enable': { 'default': True, 'description': 'Enable cron jobs running.', @@ -769,6 +765,8 @@ async def __anit__(self, dirn, conf=None): await s_cell.Cell.__anit__(self, dirn, conf=conf) + # NOTE: we may not make *any* nexus actions in this method + if self.inaugural: await self.cellinfo.set('cortex:version', s_version.version) @@ -787,7 +785,6 @@ async def __anit__(self, dirn, conf=None): self.feedfuncs = {} self.stormcmds = {} - self.isleader = None self.spawnpool = None self.mirror = self.conf.get('mirror') @@ -800,7 +797,6 @@ async def __anit__(self, dirn, conf=None): self.svcsbyiden = {} self.svcsbyname = {} - self.stormservices = None # type: s_hive.HiveDict self._runtLiftFuncs = {} self._runtPropSetFuncs = {} @@ -866,11 +862,12 @@ async def __anit__(self, dirn, conf=None): cmdhive = await self.hive.open(('cortex', 'storm', 'cmds')) pkghive = await self.hive.open(('cortex', 'storm', 'packages')) + svchive = await self.hive.open(('cortex', 'storm', 'services')) self.cmdhive = await cmdhive.dict() self.pkghive = await pkghive.dict() + self.svchive = await svchive.dict() - # Finalize coremodule loading & give stormservices a shot to load - await self._initCoreMods() + # Finalize coremodule loading & give svchive a shot to load await self._initPureStormCmds() import synapse.lib.spawn as s_spawn # get around circular dependency @@ -885,36 +882,37 @@ async def __anit__(self, dirn, conf=None): 'axon': self.axon }) - await self.auth.addAuthGate('cortex', 'cortex') - - await self.postNexsAnit() + self.nexsroot.onPreLeader(self.preLeaderHook) - if self.mirror is not None: - await self._initCoreMirror(self.mirror) + await self.auth.addAuthGate('cortex', 'cortex') - # Fire the leadership hook once at boot - await self.onLeaderChange(self.nexsroot.amLeader()) + async def _initNexsRoot(self): + ''' + Just like cell _initNexsRoot except doesn't call nexsroot.setLeader + ''' + nexsroot = await s_nexus.NexsRoot.anit(self.dirn, donexslog=self.donexslog) + self.onfini(nexsroot.fini) + nexsroot.onfini(self) + return nexsroot - async def onLeaderChange(self, leader): - self.isleader = leader - # One shot to initialize storm services - if self.stormservices is None: - await self._initStormSvcs() - if leader: - return await self.startCortexLeader() - return await self.stopCortexLeader() + async def preLeaderHook(self, leader): + ''' + These run after the leader is set, but before the leader/follower callbacks are run and the follower loop runs + ''' + await self._initCoreMods() + await self._initStormSvcs() - async def startCortexLeader(self): + async def startAsLeader(self): ''' - Indempotent actions that are done when a Cortex is a leader. + Run things that only a leader Cortex runs. ''' if self.conf.get('cron:enable'): await self.agenda.start() await self.stormdmons.start() - async def stopCortexLeader(self): + async def stopAsLeader(self): ''' - Indempotent actions that are done when a Cortex is not a leader. + Stop things that only a leader Cortex runs. ''' await self.agenda.stop() await self.stormdmons.stop() @@ -1087,11 +1085,7 @@ async def _initStormDmons(self): async def _initStormSvcs(self): - node = await self.hive.open(('cortex', 'storm', 'services')) - - self.stormservices = await node.dict() - - for iden, sdef in self.stormservices.items(): + for iden, sdef in self.svchive.items(): try: await self._setStormSvc(sdef) @@ -1411,13 +1405,13 @@ async def _addStormSvc(self, sdef): return ssvc.sdef ssvc = await self._setStormSvc(sdef) - await self.stormservices.set(iden, sdef) + await self.svchive.set(iden, sdef) await self.bumpSpawnPool() return ssvc.sdef async def delStormSvc(self, iden): - sdef = self.stormservices.get(iden) + sdef = self.svchive.get(iden) if sdef is None: mesg = f'No storm service with iden: {iden}' raise s_exc.NoSuchStormSvc(mesg=mesg, iden=iden) @@ -1429,19 +1423,19 @@ async def _delStormSvc(self, iden): ''' Delete a registered storm service from the cortex. ''' - sdef = self.stormservices.get(iden) + sdef = self.svchive.get(iden) if sdef is None: return try: - if self.isleader: + if await self.isLeader(): await self.runStormSvcEvent(iden, 'del') except asyncio.CancelledError: # pragma: no cover raise except Exception as e: logger.exception(f'service.del hook for service {iden} failed with error: {e}') - sdef = await self.stormservices.pop(iden) + sdef = await self.svchive.pop(iden) await self._delStormSvcPkgs(iden) @@ -1501,17 +1495,17 @@ async def setStormSvcEvents(self, iden, edef): Returns: dict: An updated storm service definition dictionary. ''' - sdef = self.stormservices.get(iden) + sdef = self.svchive.get(iden) if sdef is None: mesg = f'No storm service with iden: {iden}' raise s_exc.NoSuchStormSvc(mesg=mesg) sdef['evts'] = edef - await self.stormservices.set(iden, sdef) + await self.svchive.set(iden, sdef) return sdef async def _runStormSvcAdd(self, iden): - sdef = self.stormservices.get(iden) + sdef = self.svchive.get(iden) if sdef is None: mesg = f'No storm service with iden: {iden}' raise s_exc.NoSuchStormSvc(mesg=mesg) @@ -1528,10 +1522,10 @@ async def _runStormSvcAdd(self, iden): return sdef['added'] = True - await self.stormservices.set(iden, sdef) + await self.svchive.set(iden, sdef) async def runStormSvcEvent(self, iden, name): - sdef = self.stormservices.get(iden) + sdef = self.svchive.get(iden) if sdef is None: mesg = f'No storm service with iden: {iden}' raise s_exc.NoSuchStormSvc(mesg=mesg) @@ -1856,15 +1850,6 @@ async def spliceHistory(self, user): if user.iden == mesg[1]['user'] or user.isAdmin(): yield mesg - async def _initCoreMirror(self, url): - ''' - Initialize this cortex as a down-stream/follower mirror from a telepath url. - - Note: - This cortex *must* be initialized from a backup of the target cortex! - ''' - await self.nexsroot.setLeader(url, self.iden) - async def _initCoreHive(self): stormvarsnode = await self.hive.open(('cortex', 'storm', 'vars')) self.stormvars = await stormvarsnode.dict() @@ -1936,7 +1921,7 @@ async def _initPureStormCmds(self): oldcmds = [] for name, cdef in self.cmdhive.items(): cmdiden = cdef.get('cmdconf', {}).get('svciden') - if cmdiden and self.stormservices.get(cmdiden) is None: + if cmdiden and self.svchive.get(cmdiden) is None: oldcmds.append(name) else: await self._trySetStormCmd(name, cdef) @@ -2232,18 +2217,19 @@ async def _initCoreViews(self): # if we have no views, we are initializing. Add a default main view and layer. if not self.views: - ldef = await self.addLayer() + assert self.inaugural, 'Cortex initialization failed: there are no views.' + ldef = await self.addLayer(nexs=False) layriden = ldef.get('iden') vdef = { 'layers': (layriden,), 'worldreadable': True, } - vdef = await self.addView(vdef) + vdef = await self.addView(vdef, nexs=False) iden = vdef.get('iden') await self.cellinfo.set('defaultview', iden) self.view = self.getView(iden) - async def addView(self, vdef): + async def addView(self, vdef, nexs=True): vdef['iden'] = s_common.guid() vdef.setdefault('parent', None) @@ -2252,7 +2238,10 @@ async def addView(self, vdef): s_view.reqValidVdef(vdef) - return await self._push('view:add', vdef) + if nexs: + return await self._push('view:add', vdef) + else: + return await self._addView(vdef) @s_nexus.Pusher.onPush('view:add') async def _addView(self, vdef): @@ -2439,9 +2428,13 @@ def getViewDef(self, iden): def getViewDefs(self): return [v.pack() for v in self.views.values()] - async def addLayer(self, ldef=None): + async def addLayer(self, ldef=None, nexs=True): ''' Add a Layer to the cortex. + + Args: + ldef (Optional[Dict]): layer configuration + nexs (bool): whether to record a nexus transaction (internal use only) ''' ldef = ldef or {} @@ -2453,7 +2446,10 @@ async def addLayer(self, ldef=None): s_layer.reqValidLdef(ldef) - return await self._push('layer:add', ldef) + if nexs: + return await self._push('layer:add', ldef) + else: + return await self._addLayer(ldef) @s_nexus.Pusher.onPush('layer:add') async def _addLayer(self, ldef): diff --git a/synapse/exc.py b/synapse/exc.py index a91038ae83..9899fdf76e 100644 --- a/synapse/exc.py +++ b/synapse/exc.py @@ -149,6 +149,7 @@ class NoCertKey(SynErr): class ModAlreadyLoaded(SynErr): pass class MustBeJsonSafe(SynErr): pass +class NotMsgpackSafe(SynErr): pass class NoSuchAbrv(SynErr): pass class NoSuchAct(SynErr): pass diff --git a/synapse/lib/base.py b/synapse/lib/base.py index 8b7669f4c6..23b44c7d97 100644 --- a/synapse/lib/base.py +++ b/synapse/lib/base.py @@ -102,6 +102,13 @@ async def anit(cls, *args, **kwargs): raise + try: + await self.postAnit() + except Exception: + logger.exception('Error during postAnit callback.') + await self.fini() + raise + return self async def __anit__(self): @@ -140,6 +147,12 @@ async def __anit__(self): self._fini_atexit = False self._active_tasks = set() # the free running tasks associated with me + async def postAnit(self): + ''' + Method called after self.__anit__() has completed, but before anit() returns the object to the caller. + ''' + pass + async def enter_context(self, item): ''' Modeled on Python's contextlib.ExitStack.enter_context. Enters a new context manager and adds its __exit__() diff --git a/synapse/lib/cell.py b/synapse/lib/cell.py index 8016ba5167..61921a33cf 100644 --- a/synapse/lib/cell.py +++ b/synapse/lib/cell.py @@ -67,7 +67,6 @@ def wrapped(*args, **kwargs): return decrfunc - class CellApi(s_base.Base): async def __anit__(self, cell, link, user): @@ -477,6 +476,14 @@ class Cell(s_nexus.Pusher, s_telepath.Aware): confdefs = {} # type: ignore # This should be a JSONSchema properties list for an object. confbase = { + 'cell:guid': { + 'description': 'An optional hard-coded GUID to store as the permanent GUID for the cell.', + 'type': 'string', + }, + 'mirror': { + 'description': 'A telepath URL for our upstream mirror (we must be a backup!).', + 'type': 'string', + }, 'auth:passwd': { 'description': 'Set to (local only) to bootstrap the root user password.', 'type': 'string' @@ -490,6 +497,9 @@ class Cell(s_nexus.Pusher, s_telepath.Aware): async def __anit__(self, dirn, conf=None, readonly=False, *args, **kwargs): + if conf is None: + conf = {} + s_telepath.Aware.__init__(self) self.dirn = s_common.gendir(dirn) @@ -498,24 +508,27 @@ async def __anit__(self, dirn, conf=None, readonly=False, *args, **kwargs): self.sessions = {} self.inaugural = False + self.conf = self._initCellConf(conf) + # each cell has a guid path = s_common.genpath(dirn, 'cell.guid') # generate a guid file if needed if not os.path.isfile(path): + self.inaugural = True + + guid = conf.get('cell:guid') + if guid is None: + guid = s_common.guid() + with open(path, 'w') as fd: - fd.write(s_common.guid()) + fd.write(guid) # read our guid file with open(path, 'r') as fd: self.iden = fd.read().strip() - if conf is None: - conf = {} - - self.conf = self._initCellConf(conf) - self.donexslog = self.conf.get('nexslog:en') await s_nexus.Pusher.__anit__(self, self.iden) @@ -547,7 +560,7 @@ async def __anit__(self, dirn, conf=None, readonly=False, *args, **kwargs): user = await self.auth.getUserByName('root') if not user.tryPasswd(auth_passwd): - await user.setPasswd(auth_passwd) + await user.setPasswd(auth_passwd, nexs=False) await self._initCellDmon() @@ -569,25 +582,35 @@ async def fini(): 'cell': self } - async def postNexsAnit(self): - ''' - This must be called near the end of subclass initialization, after all the subsystems that allow nexus log - entries to be executed, but before any new changes can be initiated. - ''' - await self.nexsroot.recover() + async def postAnit(self): + mirror = self.conf.get('mirror') + await self.nexsroot.setLeader(mirror, self.iden) async def _initNexsRoot(self): + ''' + Initialize a NexsRoot to use for the cell. + ''' nexsroot = await s_nexus.NexsRoot.anit(self.dirn, donexslog=self.donexslog) self.onfini(nexsroot.fini) nexsroot.onfini(self) + await nexsroot.setLeader(None, '') return nexsroot async def onLeaderChange(self, leader): ''' - Cell implementers may override this method to be notified when - nexusroot leadership changes. The leader arg will be a bool provided - if the Cell is a leader or not. + Args: + leader(bool): If True, self is now the leader, else is now a follower ''' + self.isleader = leader + if leader: + await self.startAsLeader() + else: + await self.stopAsLeader() + + async def startAsLeader(self): + pass + + async def stopAsLeader(self): pass async def getNexusChanges(self, offs): diff --git a/synapse/lib/hiveauth.py b/synapse/lib/hiveauth.py index 848a3de2d2..16f8c290dd 100644 --- a/synapse/lib/hiveauth.py +++ b/synapse/lib/hiveauth.py @@ -76,6 +76,9 @@ async def __anit__(self, node, nexsroot=None): self.rolesbyname = {} self.authgates = {} + self.allrole = None + self.rootuser = None + roles = await self.node.open(('roles',)) for _, node in roles: await self._addRoleNode(node) @@ -94,12 +97,14 @@ async def __anit__(self, node, nexsroot=None): self.allrole = await self.getRoleByName('all') if self.allrole is None: # initialize the role of which all users are a member - self.allrole = await self.addRole('all') + guid = s_common.guid() + self.allrole = await self._addRole(guid, 'all') # initialize an admin user named root self.rootuser = await self.getUserByName('root') if self.rootuser is None: - self.rootuser = await self.addUser('root') + guid = s_common.guid() + self.rootuser = await self._addUser(guid, 'root') await self.rootuser.setAdmin(True, logged=False) await self.rootuser.setLocked(False, logged=False) @@ -854,10 +859,13 @@ def tryPasswd(self, passwd): return False - async def setPasswd(self, passwd): + async def setPasswd(self, passwd, nexs=True): # Prevent empty string or non-string values if not passwd or not isinstance(passwd, str): raise s_exc.BadArg(mesg='Password must be a string') salt = s_common.guid() hashed = s_common.guid((salt, passwd)) - await self.auth.setUserInfo(self.iden, 'passwd', (salt, hashed)) + if nexs: + await self.auth.setUserInfo(self.iden, 'passwd', (salt, hashed)) + else: + await self.auth._hndlsetUserInfo(self.iden, 'passwd', (salt, hashed)) diff --git a/synapse/lib/layer.py b/synapse/lib/layer.py index 4ce5de947c..85ab2c70b5 100644 --- a/synapse/lib/layer.py +++ b/synapse/lib/layer.py @@ -1015,6 +1015,7 @@ async def __anit__(self, layrinfo, dirn, nexsroot=None, allow_upstream=True): def pack(self): return self.layrinfo.pack() + @s_nexus.Pusher.onPushAuto('layer:truncate') async def truncate(self): self.buidcache.clear() diff --git a/synapse/lib/msgpack.py b/synapse/lib/msgpack.py index 0b72152577..636fe03614 100644 --- a/synapse/lib/msgpack.py +++ b/synapse/lib/msgpack.py @@ -3,6 +3,8 @@ import msgpack import msgpack.fallback as m_fallback +import synapse.exc as s_exc + logger = logging.getLogger(__name__) # Single Packer object which is reused for performance @@ -38,9 +40,14 @@ def en(item): return msgpack.packb(item, use_bin_type=True, unicode_errors='surrogatepass') try: return pakr.pack(item) - except Exception: + except TypeError as e: + pakr.reset() + mesg = f'{e.args[0]}: {repr(item)[:20]}' + raise s_exc.NotMsgpackSafe(mesg=mesg) from e + except Exception as e: pakr.reset() - raise + mesg = f'Cannot serialize: {repr(e)}: {repr(item)[:20]}' + raise s_exc.NotMsgpackSafe(mesg=mesg) from e def un(byts): ''' diff --git a/synapse/lib/nexus.py b/synapse/lib/nexus.py index 7201241259..7a09c7a178 100644 --- a/synapse/lib/nexus.py +++ b/synapse/lib/nexus.py @@ -3,7 +3,7 @@ import functools import contextlib -from typing import List, Dict, Any, Callable, Tuple, Optional, AsyncIterator +from typing import List, Dict, Any, Callable, Tuple, Optional, AsyncIterator, Union import synapse.exc as s_exc import synapse.common as s_common @@ -86,11 +86,13 @@ async def __anit__(self, dirn: str, donexslog: bool = True): # type: ignore self._mirrors: List[ChangeDist] = [] self.donexslog = donexslog + self._preleader_run = False # Whether preleader funcs have been called once + self._preleader_funcs: List[Callable] = [] # Callbacks for after leadership set, but before loop run self._state_lock = asyncio.Lock() - self._state_funcs: List[Callable] = [] # External Callbacks for state changes + self._state_funcs: List[Callable] = [] # Callbacks for leadership changes - # These are used when this cell is a mirror. - self._ldrurl: Optional[str] = None + # Mirror-related + self._ldrurl: Union[str, None, s_common.NoValu] = s_common.novalu # Initialized so that setLeader will run once self._ldr: Optional[s_telepath.Proxy] = None # only set by looptask self._looptask: Optional[asyncio.Task] = None self._ldrready = asyncio.Event() @@ -173,6 +175,8 @@ async def issue(self, nexsiden: str, event: str, args: List[Any], kwargs: Dict[s If I'm not a follower, mutate, otherwise, ask the leader to make the change and wait for the follower loop to hand me the result through a future. ''' + assert self._ldrurl is not s_common.novalu, 'Attempt to issue before leader known' + if not self._ldrurl: return await self.eat(nexsiden, event, args, kwargs, meta) @@ -259,7 +263,7 @@ async def fini(): yield dist - def amLeader(self): + async def isLeader(self): return self._ldrurl is None async def setLeader(self, url: Optional[str], iden: str) -> None: @@ -278,6 +282,9 @@ async def setLeader(self, url: Optional[str], iden: str) -> None: self._ldrurl = url + oldlooptask = self._looptask + + # If the looptask is already running, stop it if self._looptask is not None: self._looptask.cancel() self._looptask = None @@ -286,8 +293,13 @@ async def setLeader(self, url: Optional[str], iden: str) -> None: await self._ldr.fini() self._ldr = None + await self._dopreleader() await self._dostatechange() + if not oldlooptask: + # Before we start mirroring anything, replay the last event because we don't know if it got committed + await self.recover() + if self._ldrurl is None: return @@ -301,8 +313,26 @@ def onStateChange(self, func): ''' self._state_funcs.append(func) + def onPreLeader(self, func): + ''' + Add a method that will be called exactly once inside setLeader, after the leadership is set, but before the + state change hooks are run and the follower loop is started. + + To work, this must be called before the first time that setLeader is called. + ''' + self._preleader_funcs.append(func) + + async def _dopreleader(self): + if self._preleader_run: + return + + self._preleader_run = True + amleader = await self.isLeader() + for func in self._preleader_funcs: + await s_coro.ornot(func, leader=amleader) + async def _dostatechange(self): - amleader = self.amLeader() + amleader = await self.isLeader() async with self._state_lock: for func in self._state_funcs: await s_coro.ornot(func, leader=amleader) @@ -405,6 +435,11 @@ def onfini(): self.nexsroot = nexsroot + async def isLeader(self): + if self.nexsroot is None: + return True + return await self.nexsroot.isLeader() + @classmethod def onPush(cls, event: str, passoff=False) -> Callable: ''' diff --git a/synapse/tests/test_cortex.py b/synapse/tests/test_cortex.py index f758fd3640..58aaf80386 100644 --- a/synapse/tests/test_cortex.py +++ b/synapse/tests/test_cortex.py @@ -762,6 +762,9 @@ async def test_base_types2(self): async with self.getTestReadWriteCores() as (core, wcore): + # Make sure new nodes get different creation times than nodes created in the test CoreModule + await asyncio.sleep(0.001) + # Test some default values async with await wcore.snap() as snap: diff --git a/synapse/tests/test_lib_base.py b/synapse/tests/test_lib_base.py index 4c77d144d0..e596f05b50 100644 --- a/synapse/tests/test_lib_base.py +++ b/synapse/tests/test_lib_base.py @@ -37,6 +37,12 @@ async def __anit__(self, foo): await s_base.Base.__anit__(self) self.foo = foo self.bar = self.foo + 10 + self.posted = False + + async def postAnit(self): + self.posted = True + if self.foo == -1: + raise s_exc.BadArg(mesg='boom') class BaseTest(s_t_utils.SynTest): @@ -58,6 +64,11 @@ async def test_base_anit(self): afoo = await Hehe.anit(20) self.eq(afoo.foo, 20) self.eq(afoo.bar, 30) + self.true(afoo.posted) + + with self.raises(s_exc.BadArg) as cm: + await Hehe.anit(-1) + self.eq(cm.exception.get('mesg'), 'boom') async def test_coro_fini(self): diff --git a/synapse/tests/test_lib_cell.py b/synapse/tests/test_lib_cell.py index 1d0f27627a..e0441022bc 100644 --- a/synapse/tests/test_lib_cell.py +++ b/synapse/tests/test_lib_cell.py @@ -33,7 +33,6 @@ async def stream(self, doraise=False): if doraise: raise s_exc.BadTime(mesg='call again later') - class CellTest(s_t_utils.SynTest): async def test_cell_auth(self): @@ -405,7 +404,6 @@ async def coro(prox, offs): nexsiden, act, args, kwargs, meta = data if nexsiden == 'auth:auth' and act == 'user:add': retn.append(args) - if len(retn) >= 2: break return yielded, retn @@ -422,7 +420,7 @@ async def coro(prox, offs): yielded, data = await asyncio.wait_for(task, 6) self.true(yielded) usernames = [args[1] for args in data] - self.eq(usernames, ['root', 'test']) + self.eq(usernames, ['test']) # Disable change logging for this cell. conf = {'nexslog:en': False} diff --git a/synapse/tests/test_lib_lmdbslab.py b/synapse/tests/test_lib_lmdbslab.py index ac66d5b834..4a04be1f85 100644 --- a/synapse/tests/test_lib_lmdbslab.py +++ b/synapse/tests/test_lib_lmdbslab.py @@ -722,7 +722,7 @@ async def test_slab_guid_stor(self): self.true(info0.pop('woot', s_common.novalu) is s_common.novalu) # Sad path case - self.raises(TypeError, info0.set, 'newp', {1, 2, 3}) + self.raises(s_exc.NotMsgpackSafe, info0.set, 'newp', {1, 2, 3}) async with await s_lmdbslab.Slab.anit(path) as slab: guidstor = s_lmdbslab.GuidStor(slab, 'guids') @@ -1013,6 +1013,7 @@ async def getswait(): # Adding items past the current end of queue should wake waiters data = [] + async def getswait(): async for item in mque.gets('woot', 0, wait=True): diff --git a/synapse/tests/test_lib_msgpack.py b/synapse/tests/test_lib_msgpack.py index 5a1d84ab91..13a774e5b9 100644 --- a/synapse/tests/test_lib_msgpack.py +++ b/synapse/tests/test_lib_msgpack.py @@ -1,5 +1,6 @@ import msgpack +import synapse.exc as s_exc import synapse.common as s_common import synapse.lib.const as s_const @@ -173,9 +174,13 @@ def test_msgpack_large_data(self): self.eq(objs[0], (135266320, struct)) def test_msgpack_bad_types(self): - self.raises(TypeError, s_msgpack.en, {1, 2}) - self.raises(TypeError, s_msgpack.en, Exception()) - self.raises(TypeError, s_msgpack.en, s_msgpack.en) + self.raises(s_exc.NotMsgpackSafe, s_msgpack.en, {1, 2}) + self.raises(s_exc.NotMsgpackSafe, s_msgpack.en, Exception()) + self.raises(s_exc.NotMsgpackSafe, s_msgpack.en, s_msgpack.en) + # too long + with self.raises(s_exc.NotMsgpackSafe) as cm: + s_msgpack.en({'longlong': 45234928034723904723906}) + self.isin('OverflowError', cm.exception.get('mesg')) def test_msgpack_surrogates(self): bads = '\u01cb\ufffd\ud842\ufffd\u0012' diff --git a/synapse/tests/test_lib_nexus.py b/synapse/tests/test_lib_nexus.py index 5155b4338e..f16c499e6e 100644 --- a/synapse/tests/test_lib_nexus.py +++ b/synapse/tests/test_lib_nexus.py @@ -49,13 +49,12 @@ class SampleNexus2(SampleNexus): async def doathing(self, eventdict): return await self._push('thing:doathing', eventdict, 'bar') - async def _thing2handler(self): - return self - class NexusTest(s_t_utils.SynTest): async def test_nexus(self): with self.getTestDir() as dirn: async with await SampleNexus.anit(1) as nexus1, await s_nexus.NexsRoot.anit(dirn) as nexsroot: + await nexsroot.setLeader(None, None) + eventdict = {'specialpush': 0} self.eq('foo', await nexus1.doathing(eventdict)) self.eq(1, eventdict.get('happened')) @@ -69,6 +68,7 @@ async def test_nexus(self): self.eq('doc', nexus1.doathingauto2.__doc__) async with await SampleNexus2.anit(2, nexsroot=nexsroot) as testkid: + eventdict = {'specialpush': 0} # Tricky inheriting handler funcs themselves self.eq('foo', await nexus1.doathing(eventdict)) @@ -95,6 +95,8 @@ async def test_nexus_no_logging(self): with self.getTestDir() as dirn: async with await SampleNexus.anit(1) as nexus1, \ await s_nexus.NexsRoot.anit(dirn, donexslog=False) as nexsroot: + await nexsroot.setLeader(None, None) + eventdict = {'specialpush': 0} self.eq('foo', await nexus1.doathing(eventdict)) self.eq(1, eventdict.get('happened')) diff --git a/synapse/tests/test_lib_slabseqn.py b/synapse/tests/test_lib_slabseqn.py index 63393da1c7..570953c6a0 100644 --- a/synapse/tests/test_lib_slabseqn.py +++ b/synapse/tests/test_lib_slabseqn.py @@ -1,6 +1,8 @@ import os import asyncio +import synapse.exc as s_exc + import synapse.lib.coro as s_coro import synapse.lib.lmdbslab as s_lmdbslab import synapse.lib.slabseqn as s_slabseqn @@ -24,7 +26,7 @@ async def test_slab_seqn(self): retn = tuple(seqn.iter(0)) self.eq(retn, ((0, 'foo'), (1, 10), (2, 20))) - self.raises(TypeError, seqn.save, ({'set'},)) + self.raises(s_exc.NotMsgpackSafe, seqn.save, ({'set'},)) retn = tuple(seqn.iter(0)) self.eq(retn, ((0, 'foo'), (1, 10), (2, 20))) diff --git a/synapse/tests/test_lib_stormsvc.py b/synapse/tests/test_lib_stormsvc.py index 9ba3a3c1bd..e53c577ffd 100644 --- a/synapse/tests/test_lib_stormsvc.py +++ b/synapse/tests/test_lib_stormsvc.py @@ -291,6 +291,7 @@ class StormvarService(s_cell.CellApi, s_stormsvc.StormSvc): ) class StormvarServiceCell(s_cell.Cell): + cellapi = StormvarService @contextlib.contextmanager @@ -655,19 +656,26 @@ async def test_storm_svc_restarts(self): pkg = await core.getStormPkg('old') self.eq(pkg.get('version'), (0, 1, 0)) - async with await s_cortex.Cortex.anit(dirn) as core: - self.nn(core.getStormCmd('newcmd')) - self.nn(core.getStormCmd('new.baz')) - self.nn(core.getStormCmd('old.bar')) - self.nn(core.getStormCmd('runtecho')) - self.none(core.getStormCmd('oldcmd')) - self.none(core.getStormCmd('old.baz')) - self.isin('old', core.stormpkgs) - self.isin('new', core.stormpkgs) - self.isin('echo', core.stormmods) - self.isin('old.bar', core.stormmods) - self.isin('new.baz', core.stormmods) - self.notin('old.baz', core.stormmods) + # This test verifies that storm commands loaded from a previously connected service are still available, + # even if the service is not available now + with self.getLoggerStream('synapse.lib.nexus') as stream: + async with await s_cortex.Cortex.anit(dirn) as core: + self.nn(core.getStormCmd('newcmd')) + self.nn(core.getStormCmd('new.baz')) + self.nn(core.getStormCmd('old.bar')) + self.nn(core.getStormCmd('runtecho')) + self.none(core.getStormCmd('oldcmd')) + self.none(core.getStormCmd('old.baz')) + self.isin('old', core.stormpkgs) + self.isin('new', core.stormpkgs) + self.isin('echo', core.stormmods) + self.isin('old.bar', core.stormmods) + self.isin('new.baz', core.stormmods) + self.notin('old.baz', core.stormmods) + + stream.seek(0) + mesgs = stream.read() + self.notin('Exception while replaying', mesgs) async def test_storm_vars(self): diff --git a/synapse/tests/test_lib_stormtypes.py b/synapse/tests/test_lib_stormtypes.py index 8358b921f9..78442f9cda 100644 --- a/synapse/tests/test_lib_stormtypes.py +++ b/synapse/tests/test_lib_stormtypes.py @@ -914,10 +914,10 @@ async def test_persistent_vars(self): # Storing a valu into the hive that can't be msgpacked fails q = '[test:str=test] $lib.user.vars.set(mynode, $node)' mesgs = await s_test.alist(prox.storm(q)) - err = "can not serialize 'Node' object" + err = "can not serialize 'Node'" errs = [m for m in mesgs if m[0] == 'err'] self.len(1, errs) - self.eq(errs[0][1][1].get('mesg'), err) + self.isin(err, errs[0][1][1].get('mesg')) # Sad path - names must be strings. q = '$lib.globals.set((my, nested, valu), haha)' diff --git a/synapse/tests/test_tools_migrate200.py b/synapse/tests/test_tools_migrate200.py index 048068de4b..7d69db2727 100644 --- a/synapse/tests/test_tools_migrate200.py +++ b/synapse/tests/test_tools_migrate200.py @@ -16,7 +16,6 @@ import synapse.tests.utils as s_t_utils import synapse.lib.cell as s_cell -import synapse.lib.coro as s_coro import synapse.lib.msgpack as s_msgpack import synapse.lib.version as s_version import synapse.lib.lmdbslab as s_lmdbslab @@ -267,7 +266,7 @@ async def _checkSplices(self, core0, tdata): lyr01 = core1.getLayer() nes0 = [nodeedits for offs, nodeedits in lyr00.nodeeditlog.iter(0)] - sodes0 = [await lyr01.storNodeEdits(nes[0], None) for nes in nes0] + [await lyr01.storNodeEdits(nes[0], None) for nes in nes0] self.lt(len(nes0), tdata['splices'][lyr00.iden]['nextoffs']) # secondary layer @@ -276,7 +275,7 @@ async def _checkSplices(self, core0, tdata): lyr11 = core1.getLayer(lyr11def['iden']) nes1 = [nodeedits for offs, nodeedits in lyr10.nodeeditlog.iter(0)] - sodes1 = [await lyr11.storNodeEdits(nes[0], None) for nes in nes1] + [await lyr11.storNodeEdits(nes[0], None) for nes in nes1] self.lt(len(nes1), tdata['splices'][lyr10.iden]['nextoffs']) await self._checkCore(core1, tdata, nodesonly=True) @@ -1061,11 +1060,11 @@ async def test_migr_splices(self): lyr0 = core.layers[locallyrs[0]] nes0 = [nodeedits for offs, nodeedits in lyr0.nodeeditlog.iter(0)] - sodes0 = [await lyr0.storNodeEdits(nes[0], None) for nes in nes0] + [await lyr0.storNodeEdits(nes[0], None) for nes in nes0] lyr1 = core.layers[locallyrs[1]] nes1 = [nodeedits for offs, nodeedits in lyr1.nodeeditlog.iter(0)] - sodes1 = [await lyr1.storNodeEdits(nes[0], None) for nes in nes1] + [await lyr1.storNodeEdits(nes[0], None) for nes in nes1] await self._checkCore(core, tdata, nodesonly=True) @@ -1235,7 +1234,7 @@ async def test_migr_partiallayer(self): async with await s_lmdbslab.Slab.anit(lyrdirn) as slab: bybuid = slab.initdb('bybuid') - for lkey, lval in slab.scanByFull(db=bybuid): + for lkey, _ in slab.scanByFull(db=bybuid): prop = lkey[32:].decode('utf8') if prop[0] == '*': slab.delete(lkey, db=bybuid) @@ -1282,7 +1281,7 @@ async def test_migr_partiallayer_errors(self): async with await s_lmdbslab.Slab.anit(lyrdirn) as slab: bybuid = slab.initdb('bybuid') - for lkey, lval in slab.scanByFull(db=bybuid): + for lkey, _ in slab.scanByFull(db=bybuid): prop = lkey[32:].decode('utf8') if prop[0] == '*': slab.delete(lkey, db=bybuid) diff --git a/synapse/tools/migrate_200.py b/synapse/tools/migrate_200.py index 2e93c5600d..32c84e422b 100644 --- a/synapse/tools/migrate_200.py +++ b/synapse/tools/migrate_200.py @@ -832,6 +832,7 @@ async def _initStors(self, migr=True, nexus=True, cell=True): path = os.path.join(self.dest) if self.nexusroot is None: self.nexusroot = await s_nexus.NexsRoot.anit(path) + await self.nexusroot.setLeader(None, '') self.onfini(self.nexusroot.fini) # open cell