diff --git a/synapse/cortex.py b/synapse/cortex.py index f1f1233862..e5a7045f3a 100644 --- a/synapse/cortex.py +++ b/synapse/cortex.py @@ -484,7 +484,7 @@ async def watch(self, wdef): async for mesg in self.cell.watch(wdef): yield mesg - async def syncLayerNodeEdits(self, offs, layriden=None): + async def syncLayerNodeEdits(self, offs, layriden=None, wait=True): ''' Yield (indx, mesg) nodeedit sets for the given layer beginning at offset. @@ -493,9 +493,12 @@ async def syncLayerNodeEdits(self, offs, layriden=None): consumer falls behind the max window size of 10,000 nodeedit messages. ''' layr = self.cell.getLayer(layriden) + if layr is None: + raise s_exc.NoSuchLayer(iden=layriden) + self.user.confirm(('sync',), gateiden=layr.iden) - async for item in self.cell.syncLayerNodeEdits(layr.iden, offs): + async for item in self.cell.syncLayerNodeEdits(layr.iden, offs, wait=wait): yield item @s_cell.adminapi() @@ -761,9 +764,8 @@ class Cortex(s_cell.Cell): # type: ignore layrctor = s_layer.Layer.anit spawncorector = 'synapse.lib.spawn.SpawnCore' - async def __anit__(self, dirn, conf=None): - - await s_cell.Cell.__anit__(self, dirn, conf=conf) + # phase 2 - service storage + async def initServiceStorage(self): # NOTE: we may not make *any* nexus actions in this method @@ -774,10 +776,6 @@ async def __anit__(self, dirn, conf=None): s_version.reqVersion(corevers, reqver, exc=s_exc.BadStorageVersion, mesg='cortex version in storage is incompatible with running software') - # share ourself via the cell dmon as "cortex" - # for potential default remote use - self.dmon.share('cortex', self) - self.views = {} self.layers = {} self.modules = {} @@ -786,7 +784,6 @@ async def __anit__(self, dirn, conf=None): self.stormcmds = {} self.spawnpool = None - self.mirror = self.conf.get('mirror') self.storm_cmd_ctors = {} self.storm_cmd_cdefs = {} @@ -863,6 +860,7 @@ async def __anit__(self, dirn, conf=None): cmdhive = await self.hive.open(('cortex', 'storm', 'cmds')) pkghive = await self.hive.open(('cortex', 'storm', 'packages')) svchive = await self.hive.open(('cortex', 'storm', 'services')) + self.cmdhive = await cmdhive.dict() self.pkghive = await pkghive.dict() self.svchive = await svchive.dict() @@ -882,38 +880,24 @@ async def __anit__(self, dirn, conf=None): 'axon': self.axon }) - self.nexsroot.onPreLeader(self.preLeaderHook) - await self.auth.addAuthGate('cortex', 'cortex') - async def _initNexsRoot(self): - ''' - Just like cell _initNexsRoot except doesn't call nexsroot.setLeader - ''' - nexsroot = await s_nexus.NexsRoot.anit(self.dirn, donexslog=self.donexslog) - self.onfini(nexsroot.fini) - nexsroot.onfini(self) - return nexsroot + async def initServiceRuntime(self): - async def preLeaderHook(self, leader): - ''' - These run after the leader is set, but before the leader/follower callbacks are run and the follower loop runs - ''' + # do any post-nexus initialization here... await self._initCoreMods() await self._initStormSvcs() - async def startAsLeader(self): - ''' - Run things that only a leader Cortex runs. - ''' + # share ourself via the cell dmon as "cortex" + # for potential default remote use + self.dmon.share('cortex', self) + + async def initServiceActive(self): if self.conf.get('cron:enable'): await self.agenda.start() await self.stormdmons.start() - async def stopAsLeader(self): - ''' - Stop things that only a leader Cortex runs. - ''' + async def initServicePassive(self): await self.agenda.stop() await self.stormdmons.stop() @@ -985,8 +969,9 @@ async def coreQueueGets(self, name, offs=0, cull=True, wait=None, size=None): async def coreQueuePuts(self, name, items): await self._push('queue:puts', name, items) - @s_nexus.Pusher.onPush('queue:puts', passoff=True) - async def _coreQueuePuts(self, name, items, nexsoff): + @s_nexus.Pusher.onPush('queue:puts', passitem=True) + async def _coreQueuePuts(self, name, items, nexsitem): + nexsoff, nexsmesg = nexsitem await self.multiqueue.puts(name, items, reqid=nexsoff) @s_nexus.Pusher.onPushAuto('queue:cull') @@ -997,6 +982,11 @@ async def coreQueueSize(self, name): return self.multiqueue.size(name) async def getSpawnInfo(self): + + if self.spawncorector is None: + mesg = 'spawn storm option not supported on this cortex' + raise s_exc.FeatureNotSupported(mesg=mesg) + ret = { 'iden': self.iden, 'dirn': self.dirn, @@ -1424,11 +1414,11 @@ async def _delStormSvc(self, iden): Delete a registered storm service from the cortex. ''' sdef = self.svchive.get(iden) - if sdef is None: + if sdef is None: # pragma: no cover return try: - if await self.isLeader(): + if self.isactive: await self.runStormSvcEvent(iden, 'del') except asyncio.CancelledError: # pragma: no cover raise @@ -1822,7 +1812,7 @@ async def _onCoreFini(self): if self.axon: await self.axon.fini() - async def syncLayerNodeEdits(self, iden, offs): + async def syncLayerNodeEdits(self, iden, offs, wait=True): ''' Yield (offs, mesg) tuples for nodeedits in a layer. ''' @@ -1830,7 +1820,7 @@ async def syncLayerNodeEdits(self, iden, offs): if layr is None: raise s_exc.NoSuchLayer(iden=iden) - async for item in layr.syncNodeEdits(offs): + async for item in layr.syncNodeEdits(offs, wait=wait): yield item async def spliceHistory(self, user): @@ -2379,7 +2369,7 @@ def getLayer(self, iden=None): def listLayers(self): return self.layers.values() - async def getLayerDef(self, iden): + async def getLayerDef(self, iden=None): layr = self.getLayer(iden) if layr is not None: return layr.pack() @@ -2503,14 +2493,14 @@ async def _ctorLayr(self, layrinfo): ''' iden = layrinfo.get('iden') path = s_common.gendir(self.dirn, 'layers', iden) + # In case that we're a mirror follower and we have a downstream layer, disable upstream sync - return await s_layer.Layer.anit(layrinfo, path, nexsroot=self.nexsroot, allow_upstream=not self.mirror) + # FIXME allow_upstream needs to be separated out + mirror = self.conf.get('mirror') + return await s_layer.Layer.anit(layrinfo, path, nexsroot=self.nexsroot, allow_upstream=not mirror) async def _initCoreLayers(self): - node = await self.hive.open(('cortex', 'layers')) - - # TODO eventually hold this and watch for changes for _, node in node: layrinfo = await node.dict() await self._initLayr(layrinfo) diff --git a/synapse/daemon.py b/synapse/daemon.py index a6953bbe46..f327a0c098 100644 --- a/synapse/daemon.py +++ b/synapse/daemon.py @@ -228,7 +228,7 @@ async def __anit__(self, certdir=None): self.cells = {} # all cells are shared. not all shared are cells. self.shared = {} # objects provided by daemon self.listenservers = [] # the sockets we're listening on - self.connectedlinks = [] # the links we're currently connected on + self.links = set() self.sessions = {} @@ -272,11 +272,16 @@ async def listen(self, url, **opts): sslctx = None if scheme == 'ssl': + + caname = None hostname = None + query = info.get('query') if query is not None: hostname = query.get('hostname', host) - sslctx = self.certdir.getServerSSLContext(hostname=hostname) + caname = query.get('ca') + + sslctx = self.certdir.getServerSSLContext(hostname=hostname, caname=caname) server = await s_link.listen(host, port, self._onLinkInit, ssl=sslctx) @@ -321,7 +326,7 @@ async def _onDmonFini(self): if finis: await asyncio.wait(finis) - finis = [link.fini() for link in self.connectedlinks] + finis = [link.fini() for link in self.links] if finis: await asyncio.wait(finis) @@ -331,6 +336,12 @@ async def _onDmonFini(self): async def _onLinkInit(self, link): + self.links.add(link) + async def fini(): + self.links.discard(link) + + link.onfini(fini) + async def rxloop(): while not link.isfini: @@ -343,7 +354,7 @@ async def rxloop(): coro = self._onLinkMesg(link, mesg) link.schedCoro(coro) - self.schedCoro(rxloop()) + link.schedCoro(rxloop()) async def _onLinkMesg(self, link, mesg): diff --git a/synapse/exc.py b/synapse/exc.py index 9899fdf76e..0fd9560691 100644 --- a/synapse/exc.py +++ b/synapse/exc.py @@ -146,6 +146,7 @@ class LinkShutDown(LinkErr): pass class NoCertKey(SynErr): ''' Raised when a Cert object requires a RSA Private Key to perform an operation and the key is not present. ''' pass +class NoSuchCert(SynErr): pass class ModAlreadyLoaded(SynErr): pass class MustBeJsonSafe(SynErr): pass @@ -209,6 +210,7 @@ class SchemaViolation(SynErr): pass class SlabAlreadyOpen(SynErr): pass class SpawnExit(SynErr): pass +class FeatureNotSupported(SynErr): pass class ReadOnlyLayer(SynErr): pass class ReadOnlyProp(SynErr): pass diff --git a/synapse/lib/agenda.py b/synapse/lib/agenda.py index 9cb10709ee..9e583c2759 100644 --- a/synapse/lib/agenda.py +++ b/synapse/lib/agenda.py @@ -58,11 +58,11 @@ 'req': { 'type': 'object', 'properties': { - 'minute': {'type': 'number'}, - 'hour': {'type': 'number'}, - 'dayofmonth': {'type': 'number'}, - 'month': {'type': 'number'}, - 'year': {'type': 'number'}, + 'minute': {'oneOf': [{'type': 'number'}, {'type': 'array', 'items': {'type': 'number'}}]}, + 'hour': {'oneOf': [{'type': 'number'}, {'type': 'array', 'items': {'type': 'number'}}]}, + 'dayofmonth': {'oneOf': [{'type': 'number'}, {'type': 'array', 'items': {'type': 'number'}}]}, + 'month': {'oneOf': [{'type': 'number'}, {'type': 'array', 'items': {'type': 'number'}}]}, + 'year': {'oneOf': [{'type': 'number'}, {'type': 'array', 'items': {'type': 'number'}}]}, } } } diff --git a/synapse/lib/cell.py b/synapse/lib/cell.py index 4c2c4c6183..824a0e3436 100644 --- a/synapse/lib/cell.py +++ b/synapse/lib/cell.py @@ -137,6 +137,14 @@ def getCellType(self): def getCellIden(self): return self.cell.getCellIden() + @adminapi() + def getNexsIndx(self): + return self.cell.getNexsIndx() + + @adminapi() + async def promote(self): + return await self.cell.promote() + def getCellUser(self): return self.user.pack() @@ -445,6 +453,14 @@ async def getDiagInfo(self): class Cell(s_nexus.Pusher, s_telepath.Aware): ''' A Cell() implements a synapse micro-service. + + A Cell has 5 phases of startup: + 1. Universal cell data structures + 2. Service specific storage/data (pre-nexs) + 3. Nexus subsystem initialization + 4. Service specific startup (with nexus) + 5. Networking and mirror services + ''' cellapi = CellApi @@ -467,25 +483,34 @@ class Cell(s_nexus.Pusher, s_telepath.Aware): 'description': 'Record all changes to the cell. Required for mirroring (on both sides).', 'type': 'boolean' }, + 'dmon:listen': { + 'description': 'A config-driven way to specify the telepath bind URL.', + 'type': ['string', 'null'], + }, + 'https:port': { + 'description': 'A config-driven way to specify the HTTPS port.', + 'type': ['integer', 'null'], + }, } - async def __anit__(self, dirn, conf=None, readonly=False, *args, **kwargs): + async def __anit__(self, dirn, conf=None, readonly=False): + # phase 1 if conf is None: conf = {} s_telepath.Aware.__init__(self) - self.dirn = s_common.gendir(dirn) self.auth = None self.sessions = {} + self.isactive = False self.inaugural = False self.conf = self._initCellConf(conf) # each cell has a guid - path = s_common.genpath(dirn, 'cell.guid') + path = s_common.genpath(self.dirn, 'cell.guid') # generate a guid file if needed if not os.path.isfile(path): @@ -505,12 +530,22 @@ async def __anit__(self, dirn, conf=None, readonly=False, *args, **kwargs): self.donexslog = self.conf.get('nexslog:en') + if self.conf.get('mirror') and not self.conf.get('nexslog:en'): + mesg = 'Mirror mode requires nexslog:en=True' + raise s_exc.BadConfValu(mesg=mesg) + + # construct our nexsroot instance ( but do not start it ) await s_nexus.Pusher.__anit__(self, self.iden) - await self._initCellSlab(readonly=readonly) + root = await self._ctorNexsRoot() + + # mutually assured destruction with our nexs root + self.onfini(root.fini) + root.onfini(self.fini) - self.setNexsRoot(await self._initNexsRoot()) - self.nexsroot.onStateChange(self.onLeaderChange) + self.setNexsRoot(root) + + await self._initCellSlab(readonly=readonly) self.hive = await self._initCellHive() @@ -536,55 +571,103 @@ async def __anit__(self, dirn, conf=None, readonly=False, *args, **kwargs): if not user.tryPasswd(auth_passwd): await user.setPasswd(auth_passwd, nexs=False) - await self._initCellDmon() - self.boss = await s_boss.Boss.anit() self.onfini(self.boss) - await self._initCellHttp() + self.dynitems = { + 'auth': self.auth, + 'cell': self + } + # initialize web app and callback data structures self._health_funcs = [] self.addHealthFunc(self._cellHealth) - async def fini(): - [await s.fini() for s in self.sessions.values()] + # initialize network daemons (but do not listen yet) + # to allow registration of callbacks and shared objects + # within phase 2/4. + await self._initCellHttp() + await self._initCellDmon() - self.onfini(fini) + # phase 2 - service storage + await self.initServiceStorage() + # phase 3 - nexus subsystem + await self.initNexusSubsystem() + # phase 4 - service logic + await self.initServiceRuntime() + # phase 5 - service networking + await self.initServiceNetwork() - self.dynitems = { - 'auth': self.auth, - 'cell': self - } + async def initServiceStorage(self): + pass - async def postAnit(self): + async def initNexusSubsystem(self): mirror = self.conf.get('mirror') - await self.nexsroot.setLeader(mirror, self.iden) + await self.nexsroot.startup(mirror, celliden=self.iden) + await self.setCellActive(mirror is None) + + async def initServiceNetwork(self): + + # start a unix local socket daemon listener + sockpath = os.path.join(self.dirn, 'sock') + sockurl = f'unix://{sockpath}' - async def _initNexsRoot(self): + try: + await self.dmon.listen(sockurl) + except asyncio.CancelledError: # pragma: no cover + raise + except OSError as e: + logger.error(f'Failed to listen on unix socket at: [{sockpath}][{e}]') + logger.error('LOCAL UNIX SOCKET WILL BE UNAVAILABLE') + except Exception: # pragma: no cover + logging.exception(f'Unknown dmon listen error.') + raise + + turl = self.conf.get('dmon:listen') + if turl is not None: + await self.dmon.listen(turl) + logger.info(f'dmon listening: {turl}') + + port = self.conf.get('https:port') + if port is not None: + await self.addHttpsPort(port) + logger.info(f'https listening: {port}') + + async def initServiceRuntime(self): + pass + + async def _ctorNexsRoot(self): ''' Initialize a NexsRoot to use for the cell. ''' - nexsroot = await s_nexus.NexsRoot.anit(self.dirn, donexslog=self.donexslog) - self.onfini(nexsroot.fini) - nexsroot.onfini(self) - await nexsroot.setLeader(None, '') - return nexsroot + return await s_nexus.NexsRoot.anit(self.dirn, donexslog=self.donexslog) + + async def getNexsIndx(self): + return await self.nexsroot.index() - async def onLeaderChange(self, leader): + async def promote(self): ''' - Args: - leader(bool): If True, self is now the leader, else is now a follower + Transform this cell from a passive follower to + an active cell that writes changes locally. ''' - self.isleader = leader - if leader: - await self.startAsLeader() + if self.conf.get('mirror') is None: + mesg = 'promote() called on non-mirror' + raise s_exc.BadConfValu(mesg=mesg) + + await self.nexsroot.promote() + await self.setCellActive(True) + + async def setCellActive(self, active): + self.isactive = active + if self.isactive: + await self.initServiceActive() else: - await self.stopAsLeader() + await self.initServicePassive() - async def startAsLeader(self): + async def initServiceActive(self): # pragma: no cover pass - async def stopAsLeader(self): + async def initServicePassive(self): # pragma: no cover pass async def getNexusChanges(self, offs): @@ -832,6 +915,7 @@ async def _initCellHttp(self): self.sessstor = s_lmdbslab.GuidStor(self.slab, 'http:sess') async def fini(): + [await s.fini() for s in self.sessions.values()] for http in self.httpds: http.stop() @@ -876,24 +960,11 @@ def addHttpApi(self, path, ctor, info): )) async def _initCellDmon(self): - # start a unix local socket daemon listener - sockpath = os.path.join(self.dirn, 'sock') - sockurl = f'unix://{sockpath}' + cdir = s_common.gendir(self.dirn, 'certs') - self.dmon = await s_daemon.Daemon.anit() + self.dmon = await s_daemon.Daemon.anit(certdir=(cdir, s_certdir.defdir)) self.dmon.share('*', self) - try: - await self.dmon.listen(sockurl) - except asyncio.CancelledError: # pragma: no cover - raise - except OSError as e: - logger.error(f'Failed to listen on unix socket at: [{sockpath}][{e}]') - logger.error('LOCAL UNIX SOCKET WILL BE UNAVAILABLE') - except Exception: # pragma: no cover - logging.exception(f'Unknown dmon listen error.') - raise - self.onfini(self.dmon.fini) async def _initCellHive(self): @@ -1105,16 +1176,34 @@ async def initFromArgv(cls, argv, outp=None): try: - await cell.addHttpsPort(opts.https) - await cell.dmon.listen(opts.telepath) + if 'dmon:listen' not in cell.conf: + await cell.dmon.listen(opts.telepath) + if outp is not None: + outp.printf(f'...{cell.getCellType()} API (telepath): %s' % (opts.telepath,)) + else: + + if outp is not None: + lisn = cell.conf.get('dmon:listen') + if lisn is None: + lisn = cell.getLocalUrl() + + outp.printf(f'...{cell.getCellType()} API (telepath): %s' % (lisn,)) + + if 'https:port' not in cell.conf: + await cell.addHttpsPort(opts.https) + if outp is not None: + outp.printf(f'...{cell.getCellType()} API (https): %s' % (opts.https,)) + else: + if outp is not None: + port = cell.conf.get('https:port') + if port is None: + outp.printf(f'...{cell.getCellType()} API (https): disabled') + else: + outp.printf(f'...{cell.getCellType()} API (https): %s' % (port,)) if opts.name is not None: cell.dmon.share(opts.name, cell) - - if outp is not None: - outp.printf(f'...{cell.getCellType()} API (telepath): %s' % (opts.telepath,)) - outp.printf(f'...{cell.getCellType()} API (https): %s' % (opts.https,)) - if opts.name is not None: + if outp is not None: outp.printf(f'...{cell.getCellType()} API (telepath name): %s' % (opts.name,)) except Exception: diff --git a/synapse/lib/certdir.py b/synapse/lib/certdir.py index 3be890a4f4..3bc78caf44 100644 --- a/synapse/lib/certdir.py +++ b/synapse/lib/certdir.py @@ -48,12 +48,21 @@ def __init__(self, path=None): if path is None: path = defdir - s_common.gendir(path, 'cas') - s_common.gendir(path, 'hosts') - s_common.gendir(path, 'users') + self.certdirs = [] - self.certdir = s_common.reqdir(path) - self.path = path + # for backward compatibility, do some type inspection + if isinstance(path, str): + self.certdirs.append(s_common.gendir(path)) + elif isinstance(path, (tuple, list)): + [self.certdirs.append(s_common.gendir(p)) for p in path] + else: + mesg = 'Certdir path must be a path string or a list/tuple of path strings.' + raise s_exc.SynErr(mesg=mesg) + + for cdir in self.certdirs: + s_common.gendir(cdir, 'cas') + s_common.gendir(cdir, 'hosts') + s_common.gendir(cdir, 'users') def genCaCert(self, name, signas=None, outp=None, save=True): ''' @@ -314,14 +323,16 @@ def getCaCerts(self): ''' retn = [] - path = s_common.genpath(self.certdir, 'cas') + for cdir in self.certdirs: + + path = s_common.genpath(cdir, 'cas') - for name in os.listdir(path): - if not name.endswith('.crt'): - continue + for name in os.listdir(path): + if not name.endswith('.crt'): + continue - full = s_common.genpath(self.certdir, 'cas', name) - retn.append(self._loadCertPath(full)) + full = s_common.genpath(cdir, 'cas', name) + retn.append(self._loadCertPath(full)) return retn @@ -340,10 +351,10 @@ def getCaCertPath(self, name): Returns: str: The path if exists. ''' - path = s_common.genpath(self.certdir, 'cas', '%s.crt' % name) - if not os.path.isfile(path): - return None - return path + for cdir in self.certdirs: + path = s_common.genpath(cdir, 'cas', '%s.crt' % name) + if os.path.isfile(path): + return path def getCaKey(self, name): ''' @@ -377,10 +388,10 @@ def getCaKeyPath(self, name): Returns: str: The path if exists. ''' - path = s_common.genpath(self.certdir, 'cas', '%s.key' % name) - if not os.path.isfile(path): - return None - return path + for cdir in self.certdirs: + path = s_common.genpath(cdir, 'cas', '%s.key' % name) + if os.path.isfile(path): + return path def getClientCert(self, name): ''' @@ -417,10 +428,10 @@ def getClientCertPath(self, name): Returns: str: The path if exists. ''' - path = s_common.genpath(self.certdir, 'users', '%s.p12' % name) - if not os.path.isfile(path): - return None - return path + for cdir in self.certdirs: + path = s_common.genpath(cdir, 'users', '%s.p12' % name) + if os.path.isfile(path): + return path def getHostCaPath(self, name): ''' @@ -475,10 +486,10 @@ def getHostCertPath(self, name): Returns: str: The path if exists. ''' - path = s_common.genpath(self.certdir, 'hosts', '%s.crt' % name) - if not os.path.isfile(path): - return None - return path + for cdir in self.certdirs: + path = s_common.genpath(cdir, 'hosts', '%s.crt' % name) + if os.path.isfile(path): + return path def getHostKey(self, name): ''' @@ -512,10 +523,10 @@ def getHostKeyPath(self, name): Returns: str: The path if exists. ''' - path = s_common.genpath(self.certdir, 'hosts', '%s.key' % name) - if not os.path.isfile(path): - return None - return path + for cdir in self.certdirs: + path = s_common.genpath(cdir, 'hosts', '%s.key' % name) + if os.path.isfile(path): + return path def getUserCaPath(self, name): ''' @@ -570,10 +581,10 @@ def getUserCertPath(self, name): Returns: str: The path if exists. ''' - path = s_common.genpath(self.certdir, 'users', '%s.crt' % name) - if not os.path.isfile(path): - return None - return path + for cdir in self.certdirs: + path = s_common.genpath(cdir, 'users', '%s.crt' % name) + if os.path.isfile(path): + return path def getUserForHost(self, user, host): ''' @@ -629,10 +640,10 @@ def getUserKeyPath(self, name): Returns: str: The path if exists. ''' - path = s_common.genpath(self.certdir, 'users', '%s.key' % name) - if not os.path.isfile(path): - return None - return path + for cdir in self.certdirs: + path = s_common.genpath(cdir, 'users', '%s.key' % name) + if os.path.isfile(path): + return path def importFile(self, path, mode, outp=None): ''' @@ -664,7 +675,7 @@ def importFile(self, path, mode, outp=None): mesg = 'importFile only supports .crt, .key, .p12 extensions' raise s_exc.BadFileExt(mesg=mesg, ext=ext) - newpath = s_common.genpath(self.certdir, mode, fname) + newpath = s_common.genpath(self.certdirs[0], mode, fname) if os.path.isfile(newpath): raise s_exc.FileExists('File already exists') @@ -687,8 +698,7 @@ def isCaCert(self, name): Returns: bool: True if the certificate is present, False otherwise. ''' - crtpath = self._getPathJoin('cas', '%s.crt' % name) - return os.path.isfile(crtpath) + return self.getCaCertPath(name) is not None def isClientCert(self, name): ''' @@ -723,8 +733,7 @@ def isHostCert(self, name): Returns: bool: True if the certificate is present, False otherwise. ''' - crtpath = self._getPathJoin('hosts', '%s.crt' % name) - return os.path.isfile(crtpath) + return self.getHostCertPath(name) is not None def isUserCert(self, name): ''' @@ -741,8 +750,7 @@ def isUserCert(self, name): Returns: bool: True if the certificate is present, False otherwise. ''' - crtpath = self._getPathJoin('users', '%s.crt' % name) - return os.path.isfile(crtpath) + return self.getUserCertPath(name) is not None def signCertAs(self, cert, signas): ''' @@ -831,27 +839,49 @@ def signUserCsr(self, xcsr, signas, outp=None): return self.genUserCert(name, csr=pkey, signas=signas, outp=outp) def _loadCasIntoSSLContext(self, ctx): - path = s_common.genpath(self.certdir, 'cas') - for name in os.listdir(path): - if name.endswith('.crt'): - ctx.load_verify_locations(os.path.join(path, name)) + for cdir in self.certdirs: + path = s_common.genpath(cdir, 'cas') + for name in os.listdir(path): + if name.endswith('.crt'): + ctx.load_verify_locations(os.path.join(path, name)) - def getClientSSLContext(self): + def getClientSSLContext(self, certname=None): ''' Returns an ssl.SSLContext appropriate for initiating a TLS session + + Args: + certname: If specified, use the user certificate with the matching + name to authenticate to the remote service. ''' sslctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH) self._loadCasIntoSSLContext(sslctx) + if certname is not None: + certfile = self.getUserCertPath(certname) + if certfile is None: + mesg = f'Missing TLS certificate file for user: {certname}' + raise s_exc.NoSuchCert(mesg=mesg) + + keyfile = self.getUserKeyPath(certname) + if keyfile is None: + mesg = f'Missing TLS key file for user: {certname}' + raise s_exc.NoCertKey(mesg=mesg) + + sslctx.load_cert_chain(certfile, keyfile) + return sslctx - def getServerSSLContext(self, hostname=None): + def getServerSSLContext(self, hostname=None, caname=None): ''' Returns an ssl.SSLContext appropriate to listen on a socket Args: + hostname: if None, the value from socket.gethostname is used to find the key in the servers directory. - This name should match the not-suffixed part of two files ending in .key and .crt in the hosts subdirectory + This name should match the not-suffixed part of two files ending in .key and .crt in the hosts + subdirectory + + caname: If not None, the given name is used to locate a CA certificate used to validate client SSL certs. ''' sslctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) @@ -870,6 +900,11 @@ def getServerSSLContext(self, hostname=None): sslctx.load_cert_chain(certfile, keyfile) + if caname is not None: + cafile = self.getCaCertPath(caname) + sslctx.verify_mode = ssl.VerifyMode.CERT_REQUIRED + sslctx.load_verify_locations(cafile=cafile) + return sslctx def saveCertPem(self, cert, path): @@ -935,13 +970,8 @@ def _genPkeyCsr(self, name, mode, outp=None): outp.printf('csr saved: %s' % (csrpath,)) def _getCaPath(self, cert): - subj = cert.get_issuer() - capath = self._getPathJoin('cas', '%s.crt' % subj.CN) - if not os.path.isfile(capath): - return None - - return capath + return self.getCaCertPath(subj.CN) def _getPathBytes(self, path): if path is None: @@ -949,7 +979,7 @@ def _getPathBytes(self, path): return s_common.getbytes(path) def _getPathJoin(self, *paths): - return s_common.genpath(self.certdir, *paths) + return s_common.genpath(self.certdirs[0], *paths) def _loadCertPath(self, path): byts = self._getPathBytes(path) diff --git a/synapse/lib/config.py b/synapse/lib/config.py index 22ec2e5719..25bc79d69e 100644 --- a/synapse/lib/config.py +++ b/synapse/lib/config.py @@ -161,14 +161,20 @@ def getArgParseArgs(self): if conf.get('hideconf'): continue + typename = conf.get('type') + # only allow single-typed values to have command line arguments + if not isinstance(typename, str): + continue + atyp = jsonschematype2argparse.get(conf.get('type')) if atyp is None: continue - akwargs = {'help': conf.get('description', ''), - 'action': 'store', - 'type': atyp, - } + akwargs = { + 'help': conf.get('description', ''), + 'action': 'store', + 'type': atyp, + } if atyp is bool: diff --git a/synapse/lib/hive.py b/synapse/lib/hive.py index a72d583974..3144c38c53 100644 --- a/synapse/lib/hive.py +++ b/synapse/lib/hive.py @@ -173,7 +173,7 @@ async def getHiveAuth(self): path = tuple(self.conf.get('auth:path').split('/')) node = await self.open(path) - self.auth = await s_hiveauth.Auth.anit(node) + self.auth = await s_hiveauth.Auth.anit(node, nexsroot=self.nexsroot) self.onfini(self.auth.fini) return self.auth diff --git a/synapse/lib/hiveauth.py b/synapse/lib/hiveauth.py index 1c2fd2637f..d59c25fc7b 100644 --- a/synapse/lib/hiveauth.py +++ b/synapse/lib/hiveauth.py @@ -112,13 +112,15 @@ async def __anit__(self, node, nexsroot=None): if self.allrole is None: # initialize the role of which all users are a member guid = s_common.guid() - self.allrole = await self._addRole(guid, 'all') + await self._addRole(guid, 'all') + self.allrole = self.role(guid) # initialize an admin user named root self.rootuser = await self.getUserByName('root') if self.rootuser is None: guid = s_common.guid() - self.rootuser = await self._addUser(guid, 'root') + await self._addUser(guid, 'root') + self.rootuser = self.user(guid) await self.rootuser.setAdmin(True, logged=False) await self.rootuser.setLocked(False, logged=False) @@ -336,7 +338,9 @@ async def addUser(self, name, passwd=None, email=None): raise s_exc.DupUserName(name=name) iden = s_common.guid() - user = await self._push('user:add', iden, name) + await self._push('user:add', iden, name) + + user = self.user(iden) if passwd is not None: await user.setPasswd(passwd) @@ -359,14 +363,16 @@ async def _addUser(self, iden, name): node = await self.node.open(('users', iden)) await node.set(name) - return await self._addUserNode(node) + await self._addUserNode(node) async def addRole(self, name): if self.rolesbyname.get(name) is not None: raise s_exc.DupRoleName(name=name) iden = s_common.guid() - return await self._push('role:add', iden, name) + await self._push('role:add', iden, name) + + return self.role(iden) @s_nexus.Pusher.onPush('role:add') async def _addRole(self, iden, name): @@ -378,7 +384,7 @@ async def _addRole(self, iden, name): node = await self.node.open(('roles', iden)) await node.set(name) - return await self._addRoleNode(node) + await self._addRoleNode(node) async def delUser(self, iden): diff --git a/synapse/lib/layer.py b/synapse/lib/layer.py index 85ab2c70b5..df62b63e3f 100644 --- a/synapse/lib/layer.py +++ b/synapse/lib/layer.py @@ -141,14 +141,14 @@ async def storNodeEditsNoLift(self, nodeedits, meta=None): await self.layr.storNodeEditsNoLift(nodeedits, meta) - async def syncNodeEdits(self, offs): + async def syncNodeEdits(self, offs, wait=True): ''' Yield (offs, nodeedits) tuples from the nodeedit log starting from the given offset. Once caught up with storage, yield them in realtime. ''' await self._reqUserAllowed(self.liftperm) - async for item in self.layr.syncNodeEdits(offs): + async for item in self.layr.syncNodeEdits(offs, wait=wait): yield item async def splices(self, offs=None, size=None): @@ -1073,9 +1073,7 @@ async def _initLayerStorage(self): self.nodedata = self.dataslab.initdb('nodedata') self.dataname = self.dataslab.initdb('dataname', dupsort=True) - self.nodeeditlog = None - if self.logedits: - self.nodeeditlog = self.nodeeditctor(self.nodeeditslab, 'nodeedits') + self.nodeeditlog = self.nodeeditctor(self.nodeeditslab, 'nodeedits') def getSpawnInfo(self): info = self.pack() @@ -1154,6 +1152,11 @@ async def getNodeTag(self, buid, tag): return None return s_msgpack.un(byts) + async def getNodeForm(self, buid): + byts = self.layrslab.get(buid + b'\x09', db=self.bybuid) + if byts is not None: + return byts.decode() + async def getStorNode(self, buid): ''' Return a potentially incomplete pode. @@ -1324,8 +1327,8 @@ async def storNodeEdits(self, nodeedits, meta): return retn - @s_nexus.Pusher.onPush('edits') - async def _storNodeEdits(self, nodeedits, meta): + @s_nexus.Pusher.onPush('edits', passitem=True) + async def _storNodeEdits(self, nodeedits, meta, nexsitem): ''' Execute a series of node edit operations, returning the updated nodes. @@ -1336,15 +1339,16 @@ async def _storNodeEdits(self, nodeedits, meta): List[Tuple[buid, form, edits]] Same list, but with only the edits actually applied (plus the old value) ''' results = [] + nexsindx = nexsitem[0] for edits in [await self._storNodeEdit(e) for e in nodeedits]: results.extend(edits) if self.logedits: + changes = [r for r in results if r[2]] if changes: - - offs = self.nodeeditlog.add((changes, meta)) + offs = self.nodeeditlog.add((changes, meta), indx=nexsindx) [(await wind.put((offs, changes))) for wind in tuple(self.windows)] await asyncio.sleep(0) @@ -2199,7 +2203,7 @@ async def splices(self, offs=None, size=None): if size is not None: count = 0 - for offset, (nodeedits, meta) in self.nodeeditlog.iter(offs[0]): + async for offset, nodeedits, meta in self.iterNodeEditLog(offs[0]): async for splice in self.makeSplices(offset, nodeedits, meta): if splice[0] < offs: @@ -2211,7 +2215,7 @@ async def splices(self, offs=None, size=None): yield splice count = count + 1 else: - for offset, (nodeedits, meta) in self.nodeeditlog.iter(offs[0]): + async for offset, nodeedits, meta in self.iterNodeEditLog(offs[0]): async for splice in self.makeSplices(offset, nodeedits, meta): if splice[0] < offs: @@ -2225,12 +2229,12 @@ async def splicesBack(self, offs=None, size=None): return if offs is None: - offs = (self.nodeeditlog.index(), 0, 0) + offs = (await self.getNodeEditOffset(), 0, 0) if size is not None: count = 0 - for offset, (nodeedits, meta) in self.nodeeditlog.iterBack(offs[0]): + async for offset, nodeedits, meta in self.iterNodeEditLogBack(offs[0]): async for splice in self.makeSplices(offset, nodeedits, meta, reverse=True): if splice[0] > offs: @@ -2242,7 +2246,7 @@ async def splicesBack(self, offs=None, size=None): yield splice count += 1 else: - for offset, (nodeedits, meta) in self.nodeeditlog.iterBack(offs[0]): + async for offset, nodeedits, meta in self.iterNodeEditLogBack(offs[0]): async for splice in self.makeSplices(offset, nodeedits, meta, reverse=True): if splice[0] > offs: @@ -2250,6 +2254,20 @@ async def splicesBack(self, offs=None, size=None): yield splice + async def iterNodeEditLog(self, offs=0): + ''' + Iterate the node edit log and yield (offs, edits, meta) tuples. + ''' + for offs, (edits, meta) in self.nodeeditlog.iter(offs): + yield (offs, edits, meta) + + async def iterNodeEditLogBack(self, offs=0): + ''' + Iterate the node edit log and yield (offs, edits, meta) tuples in reverse. + ''' + for offs, (edits, meta) in self.nodeeditlog.iterBack(offs): + yield (offs, edits, meta) + async def syncNodeEdits(self, offs, wait=True): ''' Yield (offs, nodeedits) tuples from the nodeedit log starting from the given offset. @@ -2416,3 +2434,20 @@ async def delete(self): ''' await self.fini() shutil.rmtree(self.dirn, ignore_errors=True) + +def getFlatEdits(nodeedits): + + editsbynode = collections.defaultdict(list) + + # flatten out conditional node edits + def addedits(buid, form, edits): + nkey = (buid, form) + for edittype, editinfo, condedits in edits: + editsbynode[nkey].append((edittype, editinfo, ())) + for condedit in condedits: + addedits(*condedit) + + for buid, form, edits in nodeedits: + addedits(buid, form, edits) + + return [(k[0], k[1], v) for (k, v) in editsbynode.items()] diff --git a/synapse/lib/lmdbslab.py b/synapse/lib/lmdbslab.py index 3a420e5316..985c62afe1 100644 --- a/synapse/lib/lmdbslab.py +++ b/synapse/lib/lmdbslab.py @@ -23,6 +23,7 @@ import synapse.lib.msgpack as s_msgpack import synapse.lib.thishost as s_thishost import synapse.lib.thisplat as s_thisplat +import synapse.lib.slabseqn as s_slabseqn COPY_CHUNKSIZE = 512 PROGRESS_PERIOD = COPY_CHUNKSIZE * 1024 @@ -239,6 +240,7 @@ def set(self, name: str, valu): byts = name.encode() self.cache[byts] = valu self.dirty.add(byts) + return valu def sync(self): tups = [(p, self.EncFunc(self.cache[p])) for p in self.dirty] @@ -733,6 +735,9 @@ async def getHotCount(self, name): self.onfini(item) return item + def getSeqn(self, name): + return s_slabseqn.SlabSeqn(self, name) + def getNameAbrv(self, name): return SlabAbrv(self, name) @@ -751,6 +756,8 @@ def statinfo(self): } def _acqXactForReading(self): + if self.isfini: # pragma: no cover + raise s_exc.IsFini() if not self.readonly: return self.xact if not self.txnrefcount: @@ -1030,6 +1037,20 @@ def last(self, db=None): finally: self._relXactForReading() + def lastkey(self, db=None): + ''' + Return the last key or None from the given db. + ''' + self._acqXactForReading() + realdb, _ = self.dbnames[db] + try: + with self.xact.cursor(db=realdb) as curs: + if not curs.last(): + return None + return curs.key() + finally: + self._relXactForReading() + def hasdup(self, lkey, lval, db=None): realdb, dupsort = self.dbnames[db] with self.xact.cursor(db=realdb) as curs: diff --git a/synapse/lib/nexus.py b/synapse/lib/nexus.py index 7a09c7a178..78de454e2b 100644 --- a/synapse/lib/nexus.py +++ b/synapse/lib/nexus.py @@ -19,7 +19,6 @@ NexusLogEntryT = Tuple[str, str, List[Any], Dict[str, Any], Dict] # (nexsiden, event, args, kwargs, meta) - class RegMethType(type): ''' Metaclass that collects all methods in class with _regme prop into a class member called _regclstupls @@ -75,55 +74,42 @@ def update(self) -> bool: class NexsRoot(s_base.Base): async def __anit__(self, dirn: str, donexslog: bool = True): # type: ignore + await s_base.Base.__anit__(self) import synapse.lib.lmdbslab as s_lmdbslab # avoid import cycle import synapse.lib.slabseqn as s_slabseqn # avoid import cycle self.dirn = dirn - self._nexskids: Dict[str, 'Pusher'] = {} - - self._mirrors: List[ChangeDist] = [] + self.client = None + self.started = False + self.celliden = None self.donexslog = donexslog - self._preleader_run = False # Whether preleader funcs have been called once - self._preleader_funcs: List[Callable] = [] # Callbacks for after leadership set, but before loop run - self._state_lock = asyncio.Lock() - self._state_funcs: List[Callable] = [] # Callbacks for leadership changes - - # Mirror-related - self._ldrurl: Union[str, None, s_common.NoValu] = s_common.novalu # Initialized so that setLeader will run once - self._ldr: Optional[s_telepath.Proxy] = None # only set by looptask - self._looptask: Optional[asyncio.Task] = None - self._ldrready = asyncio.Event() + self._mirrors: List[ChangeDist] = [] + self._nexskids: Dict[str, 'Pusher'] = {} # Used to match pending follower write requests with the responses arriving on the log self._futures: Dict[str, asyncio.Future] = {} - if self.donexslog: - path = s_common.genpath(self.dirn, 'slabs', 'nexus.lmdb') - self._nexusslab = await s_lmdbslab.Slab.anit(path, map_async=False) - self._nexuslog = s_slabseqn.SlabSeqn(self._nexusslab, 'nexuslog') + path = s_common.genpath(self.dirn, 'slabs', 'nexus.lmdb') - async def fini(): - if self._looptask: - self._looptask.cancel() - try: - await self._looptask - except Exception: - pass - - for futu in self._futures.values(): - futu.cancel() + self.nexsslab = await s_lmdbslab.Slab.anit(path, map_async=False) - if self._ldr: - self._ldrready.clear() - await self._ldr.fini() + self.nexslog = self.nexsslab.getSeqn('nexuslog') + self.nexshot = await self.nexsslab.getHotCount('nexs:indx') + + # just in case were previously configured differently + logindx = self.nexslog.index() + hotindx = self.nexshot.get('nexs:indx') + self.nexshot.set('nexs:indx', max(logindx, hotindx)) + + async def fini(): - if self.donexslog: - await self._nexusslab.fini() + for futu in self._futures.values(): # pragma: no cover + futu.cancel() - [(await dist.fini()) for dist in self._mirrors] + await self.nexsslab.fini() self.onfini(fini) @@ -152,10 +138,10 @@ async def recover(self) -> None: The log can only have recorded 1 entry ahead of what is applied. All log actions are idempotent, so replaying the last action that (might have) already happened is harmless. ''' - if not self.donexslog: + if not self.donexslog: # pragma: no cover return - indxitem: Optional[Tuple[int, NexusLogEntryT]] = self._nexuslog.last() + indxitem = self.nexslog.last() if indxitem is None: # We have a brand new log return @@ -175,53 +161,68 @@ async def issue(self, nexsiden: str, event: str, args: List[Any], kwargs: Dict[s If I'm not a follower, mutate, otherwise, ask the leader to make the change and wait for the follower loop to hand me the result through a future. ''' - assert self._ldrurl is not s_common.novalu, 'Attempt to issue before leader known' + assert self.started, 'Attempt to issue before nexsroot is started' - if not self._ldrurl: - return await self.eat(nexsiden, event, args, kwargs, meta) + # pick up a reference to avoid race when we eventually can promote + client = self.client - live = await s_coro.event_wait(self._ldrready, FOLLOWER_WRITE_WAIT_S) - if not live: - raise s_exc.LinkErr(mesg='Mirror cannot reach leader for write request') + if client is None: + return await self.eat(nexsiden, event, args, kwargs, meta) - assert self._ldr is not None + try: + await client.waitready(timeout=FOLLOWER_WRITE_WAIT_S) + except asyncio.TimeoutError: + mesg = 'Mirror cannot reach leader for write request' + raise s_exc.LinkErr(mesg=mesg) from None with self._getResponseFuture() as (iden, futu): + if meta is None: meta = {} + meta['resp'] = iden - await self._ldr.issue(nexsiden, event, args, kwargs, meta) + await self.client.issue(nexsiden, event, args, kwargs, meta) return await asyncio.wait_for(futu, timeout=FOLLOWER_WRITE_WAIT_S) - async def eat(self, nexsiden: str, event: str, args: List[Any], kwargs: Dict[str, Any], - meta: Optional[Dict] = None) -> Any: + async def eat(self, nexsiden, event, args, kwargs, meta): ''' Actually mutate for the given nexsiden instance. ''' if meta is None: meta = {} - item: NexusLogEntryT = (nexsiden, event, args, kwargs, meta) + return await self._eat((nexsiden, event, args, kwargs, meta)) + async def index(self): if self.donexslog: - indx = self._nexuslog.add(item) + return self.nexslog.index() else: - indx = None + return self.nexshot.get('nexs:indx') - [dist.update() for dist in tuple(self._mirrors)] + async def _eat(self, item, indx=None): - retn = await self._apply(indx, item) + if self.donexslog: + saveindx = self.nexslog.add(item, indx=indx) + [dist.update() for dist in tuple(self._mirrors)] + + else: + saveindx = self.nexshot.get('nexs:indx') + if indx is not None and indx > saveindx: # pragma: no cover + saveindx = self.nexshot.set('nexs:indx', indx) + + self.nexshot.inc('nexs:indx') - return retn + return await self._apply(saveindx, item) - async def _apply(self, indx: Optional[int], item: NexusLogEntryT): - nexsiden, event, args, kwargs, _ = item + async def _apply(self, indx, mesg): + + nexsiden, event, args, kwargs, _ = mesg nexus = self._nexskids[nexsiden] - func, passoff = nexus._nexshands[event] - if passoff: - return await func(nexus, *args, nexsoff=indx, **kwargs) + func, passitem = nexus._nexshands[event] + if passitem: + return await func(nexus, *args, nexsitem=(indx, mesg), **kwargs) return await func(nexus, *args, **kwargs) @@ -237,8 +238,8 @@ async def iter(self, offs: int) -> AsyncIterator[Any]: maxoffs = offs - for item in self._nexuslog.iter(offs): - if self.isfini: + for item in self.nexslog.iter(offs): + if self.isfini: # pragma: no cover raise s_exc.IsFini() maxoffs = item[0] + 1 yield item @@ -252,7 +253,7 @@ async def iter(self, offs: int) -> AsyncIterator[Any]: @contextlib.asynccontextmanager async def getChangeDist(self, offs: int) -> AsyncIterator[ChangeDist]: - async with await ChangeDist.anit(self._nexuslog, offs) as dist: + async with await ChangeDist.anit(self.nexslog, offs) as dist: async def fini(): self._mirrors.remove(dist) @@ -263,145 +264,84 @@ async def fini(): yield dist - async def isLeader(self): - return self._ldrurl is None + async def startup(self, mirurl, celliden=None): - async def setLeader(self, url: Optional[str], iden: str) -> None: - ''' - Args: - url: if None, sets this nexsroot as leader, otherwise the telepath URL of the leader (must be a Cell) - iden: iden of the leader. Should be the same as my containing cell's iden - ''' - if url is not None and not self.donexslog: - raise s_exc.BadConfValu(mesg='Mirroring incompatible without nexslog:en') - - former = self._ldrurl - - if former == url: - return - - self._ldrurl = url - - oldlooptask = self._looptask + self.celliden = celliden - # If the looptask is already running, stop it - if self._looptask is not None: - self._looptask.cancel() - self._looptask = None - self._ldrready.clear() - if self._ldr is not None: - await self._ldr.fini() - self._ldr = None + if self.client is not None: + await self.client.fini() - await self._dopreleader() - await self._dostatechange() + self.client = None + if mirurl is not None: + self.client = await s_telepath.Client.anit(mirurl, onlink=self._onTeleLink) + self.onfini(self.client) - if not oldlooptask: - # Before we start mirroring anything, replay the last event because we don't know if it got committed - await self.recover() + self.started = True - if self._ldrurl is None: - return + async def promote(self): - self._looptask = self.schedCoro(self._followerLoop(iden)) + client = self.client + if client is None: + mesg = 'promote() called on non-mirror nexsroot' + raise s_exc.BadConfValu(mesg=mesg) - def onStateChange(self, func): - ''' - Add a state change callback. Callbacks take a single argument, - ``leader``, which is a boolean representing the leader status - at the time the callbacks are executed. - ''' - self._state_funcs.append(func) + await self.startup(None) - def onPreLeader(self, func): - ''' - Add a method that will be called exactly once inside setLeader, after the leadership is set, but before the - state change hooks are run and the follower loop is started. + async def _onTeleLink(self, proxy): + self.client.schedCoro(self.runMirrorLoop(proxy)) - To work, this must be called before the first time that setLeader is called. - ''' - self._preleader_funcs.append(func) - - async def _dopreleader(self): - if self._preleader_run: - return + async def runMirrorLoop(self, proxy): - self._preleader_run = True - amleader = await self.isLeader() - for func in self._preleader_funcs: - await s_coro.ornot(func, leader=amleader) - - async def _dostatechange(self): - amleader = await self.isLeader() - async with self._state_lock: - for func in self._state_funcs: - await s_coro.ornot(func, leader=amleader) + if self.celliden is not None: + if self.celliden != await proxy.getCellIden(): + logger.error('remote cell has different iden! Aborting mirror sync') + await proxy.fini() + await self.fini() + return - async def _followerLoop(self, iden) -> None: - while not self.isfini: + while not proxy.isfini: try: - if self._ldr is not None: - await self._ldr.fini() - - proxy = await s_telepath.openurl(self._ldrurl) - self._ldr = proxy - self._ldrready.set() - - # if we really are a mirror/follower, we have the same iden. - if iden != await proxy.getCellIden(): - logger.error('remote cell has different iden! Aborting mirror sync') - await proxy.fini() # Address a test race. - await self.fini() - return - - logger.info(f'mirror loop ready ({self._ldrurl})') - - while not proxy.isfini: - offs = self._nexuslog.index() + offs = self.nexslog.index() + genr = proxy.getNexusChanges(offs) + async for item in genr: - genr = proxy.getNexusChanges(offs) - async for item in genr: + if proxy.isfini: # pragma: no cover + break - if proxy.isfini: - break + offs, args = item + if offs != self.nexslog.index(): # pragma: no cover + logger.error('mirror desync') + await self.fini() + return - offs, args = item - if offs != self._nexuslog.index(): # pragma: nocover - logger.error('mirror desync') - await self.fini() - return + meta = args[-1] + respiden = meta.get('resp') + respfutu = self._futures.get(respiden) - meta = args[-1] - respiden = meta.get('resp') - respfutu = self._futures.get(respiden) + try: + retn = await self.eat(*args) - try: - retn = await self.eat(*args) + except asyncio.CancelledError: # pragma: no cover + raise - except asyncio.CancelledError: - raise + except Exception as e: + if respfutu is not None: + assert not respfutu.done() + respfutu.set_exception(e) + else: # pragma: no cover + logger.exception(e) - except Exception as e: - if respfutu is not None: - assert not respfutu.done() - respfutu.set_exception(e) - else: - logger.exception(e) - - else: - if respfutu is not None: - respfutu.set_result(retn) + else: + if respfutu is not None: + respfutu.set_result(retn) except asyncio.CancelledError: # pragma: no cover - return - - except Exception: - logger.exception('error in initCoreMirror loop') + raise - self._ldrready.clear() - await self.waitfini(1) + except Exception: # pragma: no cover + logger.exception('error in mirror loop') class Pusher(s_base.Base, metaclass=RegMethType): ''' @@ -420,8 +360,8 @@ async def __anit__(self, iden: str, nexsroot: NexsRoot = None): # type: ignore if nexsroot is not None: self.setNexsRoot(nexsroot) - for event, func, passoff in self._regclstupls: # type: ignore - self._nexshands[event] = func, passoff + for event, func, passitem in self._regclstupls: # type: ignore + self._nexshands[event] = func, passitem def setNexsRoot(self, nexsroot): @@ -435,40 +375,35 @@ def onfini(): self.nexsroot = nexsroot - async def isLeader(self): - if self.nexsroot is None: - return True - return await self.nexsroot.isLeader() - @classmethod - def onPush(cls, event: str, passoff=False) -> Callable: + def onPush(cls, event: str, passitem=False) -> Callable: ''' Decorator that registers a method to be a handler for a named event Args: event: string that distinguishes one handler from another. Must be unique per Pusher subclass - passoff: whether to pass the log offset as the parameter "nexsoff" into the handler + passitem: whether to pass the (offs, mesg) tuple to the handler as "nexsitem" ''' def decorator(func): - func._regme = (event, func, passoff) + func._regme = (event, func, passitem) return func return decorator @classmethod - def onPushAuto(cls, event: str, passoff=False) -> Callable: + def onPushAuto(cls, event: str, passitem=False) -> Callable: ''' Decorator that does the same as onPush, except automatically creates the top half method Args: event: string that distinguishes one handler from another. Must be unique per Pusher subclass - passoff: whether to pass the log offset as the parameter "nexsoff" into the handler + passitem: whether to pass the (offs, mesg) tuple to the handler as "nexsitem" ''' async def pushfunc(self, *args, **kwargs): return await self._push(event, *args, **kwargs) def decorator(func): - pushfunc._regme = (event, func, passoff) + pushfunc._regme = (event, func, passitem) setattr(cls, '_hndl' + func.__name__, func) functools.update_wrapper(pushfunc, func) return pushfunc @@ -482,10 +417,4 @@ async def _push(self, event: str, *args: Any, **kwargs: Any) -> Any: Note: This method is considered 'protected', in that it should not be called from something other than self. ''' - nexsiden = self.nexsiden - - if self.nexsroot is not None: # Distribute through the change root - return await self.nexsroot.issue(nexsiden, event, args, kwargs, None) - - # There's no change distribution, so directly execute - return await self._nexshands[event][0](self, *args, **kwargs) + return await self.nexsroot.issue(self.nexsiden, event, args, kwargs, None) diff --git a/synapse/lib/slabseqn.py b/synapse/lib/slabseqn.py index b2ad5f93e8..ade3a372e7 100644 --- a/synapse/lib/slabseqn.py +++ b/synapse/lib/slabseqn.py @@ -5,7 +5,6 @@ import synapse.lib.coro as s_coro import synapse.lib.msgpack as s_msgpack -import synapse.lib.lmdbslab as s_lmdbslab class SlabSeqn: ''' @@ -15,7 +14,7 @@ class SlabSeqn: lenv (lmdb.Environment): The LMDB Environment. name (str): The name of the sequence. ''' - def __init__(self, slab: s_lmdbslab.Slab, name: str) -> None: + def __init__(self, slab, name: str) -> None: self.slab = slab self.db = self.slab.initdb(name) @@ -29,10 +28,17 @@ def _wake_waiters(self): _, _, evnt = heapq.heappop(self.offsevents) evnt.set() - def add(self, item): + def add(self, item, indx=None): ''' Add a single item to the sequence. ''' + if indx is not None: + self.slab.put(s_common.int64en(indx), s_msgpack.en(item), db=self.db) + if indx >= self.indx: + self.indx = indx + 1 + self._wake_waiters() + return indx + indx = self.indx self.slab.put(s_common.int64en(indx), s_msgpack.en(item), db=self.db) @@ -106,12 +112,11 @@ def nextindx(self): Returns: int: The next insert offset. ''' - indx = 0 - with s_lmdbslab.Scan(self.slab, self.db) as curs: - last_key = curs.last_key() - if last_key is not None: - indx = s_common.int64un(last_key) + 1 - return indx + byts = self.slab.lastkey(db=self.db) + if byts is None: + return 0 + + return s_common.int64un(byts) + 1 def iter(self, offs): ''' diff --git a/synapse/lib/spawn.py b/synapse/lib/spawn.py index 125635365e..8360b56b3d 100644 --- a/synapse/lib/spawn.py +++ b/synapse/lib/spawn.py @@ -318,7 +318,7 @@ async def __anit__(self, spawninfo): self.views = {} self.layers = {} self.nexsroot = None - self.isleader = False + self.isactive = False self.spawninfo = spawninfo self.conf = spawninfo.get('conf') diff --git a/synapse/lib/storm.py b/synapse/lib/storm.py index adfcc82255..f43a9ecbde 100644 --- a/synapse/lib/storm.py +++ b/synapse/lib/storm.py @@ -888,7 +888,7 @@ async def stop(self): if not self.enabled: return await self._stopAllDmons() - await asyncio.sleep(0) + self.enabled = False # TODO write enable/disable APIS. # 1. Set dmon.status to 'disabled' diff --git a/synapse/lib/stormsvc.py b/synapse/lib/stormsvc.py index 9caf1826bc..56809b5624 100644 --- a/synapse/lib/stormsvc.py +++ b/synapse/lib/stormsvc.py @@ -159,7 +159,7 @@ async def _runSvcInit(self): logger.exception(f'setStormSvcEvents failed for service {self.name} ({self.iden})') try: - if self.core.isleader: + if self.core.isactive: await self.core._runStormSvcAdd(self.iden) except asyncio.CancelledError: # pragma: no cover diff --git a/synapse/telepath.py b/synapse/telepath.py index 656d505131..67bf87f56b 100644 --- a/synapse/telepath.py +++ b/synapse/telepath.py @@ -679,11 +679,16 @@ async def _teleLinkLoop(self): logger.warning(f'telepath client ({url}): {e}') await self.waitfini(timeout=self._t_conf.get('retrysleep', 0.2)) + async def proxy(self, timeout=10): + await self.waitready(timeout=timeout) + return self._t_proxy + async def _initTeleLink(self, url): if self._t_proxy is not None: await self._t_proxy.fini() self._t_proxy = await openurl(url, **self._t_opts) + self._t_methinfo = self._t_proxy.methinfo async def fini(): await self._fireLinkLoop() @@ -715,12 +720,12 @@ async def task(self, todo, name=None): logger.warning(f'telepath task redirected: ({url})') await self._t_proxy.fini() - async def waitready(self): - await asyncio.wait_for(self._t_ready.wait(), self._t_conf.get('timeout', 10)) + async def waitready(self, timeout=10): + await asyncio.wait_for(self._t_ready.wait(), self._t_conf.get('timeout', timeout)) def __getattr__(self, name): - info = self._t_proxy.methinfo.get(name) + info = self._t_methinfo.get(name) if info is not None and info.get('genr'): meth = GenrMethod(self, name) setattr(self, name, meth) @@ -954,7 +959,10 @@ async def openurl(url, **opts): elif scheme == 'unix': # unix:///path/to/sock:share - path, name = info.get('path').split(':') + name = '*' + path = info.get('path') + if ':' in path: + path, name = path.split(':') link = await s_link.unixconnect(path) else: @@ -964,9 +972,21 @@ async def openurl(url, **opts): sslctx = None if scheme == 'ssl': - certpath = info.get('certdir') - certdir = s_certdir.CertDir(certpath) - sslctx = certdir.getClientSSLContext() + + certname = None + certpath = None + + certdir = opts.get('certdir') + + query = info.get('query') + if query is not None: + certpath = query.get('certdir') + certname = query.get('certname') + + if certdir is None: + certdir = s_certdir.CertDir(certpath) + + sslctx = certdir.getClientSSLContext(certname=certname) link = await s_link.connect(host, port, ssl=sslctx) diff --git a/synapse/tests/test_cortex.py b/synapse/tests/test_cortex.py index 58aaf80386..2089c2f885 100644 --- a/synapse/tests/test_cortex.py +++ b/synapse/tests/test_cortex.py @@ -146,11 +146,13 @@ async def test_cortex_edges(self): # we should now be able to edge walk *and* refs out nodes = await core.nodes('media:news --> *') + self.len(2, nodes) self.eq(nodes[0].ndef[0], 'inet:url') self.eq(nodes[1].ndef[0], 'inet:ipv4') # we should now be able to edge walk *and* refs in nodes = await core.nodes('inet:ipv4=1.2.3.4 <-- *') + #self.len(2, nodes) self.eq(nodes[0].ndef[0], 'inet:dns:a') self.eq(nodes[1].ndef[0], 'media:news') @@ -502,11 +504,11 @@ async def test_cortex_of_the_future(self): async def test_cortex_noderefs(self): - async with self.getTestReadWriteCores() as (core, wcore): + async with self.getTestCore() as core: sorc = s_common.guid() - async with await wcore.snap() as snap: + async with await core.snap() as snap: node = await snap.addNode('inet:dns:a', ('woot.com', '1.2.3.4')) @@ -586,12 +588,12 @@ async def test_cortex_iter_props(self): self.eq((0x01020304, 0x05050505), tuple(sorted([row[1] for row in rows]))) async def test_cortex_lift_regex(self): - async with self.getTestReadWriteCores() as (core, wcore): + + async with self.getTestCore() as core: + core.model.addUnivProp('favcolor', ('str', {}), {}) - if wcore != core: - wcore.model.addUnivProp('favcolor', ('str', {}), {}) - async with await wcore.snap() as snap: + async with await core.snap() as snap: await snap.addNode('test:str', 'hezipha', props={'.favcolor': 'red'}) await snap.addNode('test:comp', (20, 'lulzlulz')) @@ -3006,13 +3008,17 @@ async def test_feed_syn_nodeedits(self): nodelist0.extend(await core0.nodes('[ test:str=foo ]')) nodelist0.extend(await core0.nodes('[ inet:ipv4=1.2.3.4 .seen=(2012,2014) +#foo.bar=(2012, 2014) ]')) - count = 0 + with self.raises(s_exc.NoSuchLayer): + async for _, nodeedits in prox0.syncLayerNodeEdits(0, layriden='asdf', wait=False): + pass + + with self.raises(s_exc.NoSuchLayer): + async for _, nodeedits in core0.syncLayerNodeEdits('asdf', 0, wait=False): + pass + editlist = [] - async for _, nodeedits in prox0.syncLayerNodeEdits(0): + async for _, nodeedits in prox0.syncLayerNodeEdits(0, wait=False): editlist.append(nodeedits) - count += 1 - if count == 6: - break async with self.getTestCoreAndProxy() as (core1, prox1): @@ -3720,7 +3726,7 @@ async def test_cortex_mirror(self): async with self.getTestCore(dirn=path00) as core00: - self.false(core00.mirror) + self.false(core00.conf.get('mirror')) await core00.nodes('[ inet:ipv4=1.2.3.4 ]') await core00.nodes('$lib.queue.add(hehe)') @@ -3729,8 +3735,7 @@ async def test_cortex_mirror(self): url = core00.getLocalUrl() - core01conf = {'nexslog:en': False, - 'mirror': url} + core01conf = {'nexslog:en': False, 'mirror': url} with self.raises(s_exc.BadConfValu): async with await s_cortex.Cortex.anit(dirn=path01, conf=core01conf) as core01: self.fail('Should never get here.') @@ -3805,8 +3810,8 @@ async def test_cortex_mirror(self): # remove the mirrorness from the Cortex and ensure that we can # write to the Cortex. This will move the core01 ahead of # core00 & core01 can become the leader. - await core01.nexsroot.setLeader(None, None) - core01.mirror = None + await core01.promote() + self.len(1, await core01.nodes('[inet:ipv4=9.9.9.8]')) new_url = core01.getLocalUrl() new_conf = {'mirror': new_url} @@ -3877,34 +3882,23 @@ async def test_norms(self): async def test_view_setlayers(self): - with self.getTestDir() as dirn: - path00 = s_common.gendir(dirn, 'core00') - path01 = s_common.gendir(dirn, 'core01') + async with self.getTestCore() as core: - async with self.getTestCore(dirn=path00) as core00: - self.len(1, await core00.nodes('[ test:str=core00 ]')) + self.len(1, await core.nodes('[ test:str=deflayr ]')) - iden00 = core00.getLayer().iden + newlayr = (await core.addLayer()).get('iden') + deflayr = (await core.getLayerDef()).get('iden') - async with self.getTestCore(dirn=path01) as core01: + vdef = {'layers': (deflayr,)} + view = (await core.addView(vdef)).get('iden') - self.len(1, await core01.nodes('[ test:str=core01 ]')) - iden00b = (await core01.addLayer()).get('iden') - iden01 = core01.getLayer().iden - # Set the default view for core01 to have a read layer with - # the new iden - await core01.setViewLayers((iden01, iden00b)) + opts = {'view': view} + self.len(1, await core.nodes('test:str=deflayr', opts=opts)) - # Blow away the old layer at the destination and replace it - # with our layer from core00 - src = s_common.gendir(path00, 'layers', iden00) - dst = s_common.gendir(path01, 'layers', iden00b) - shutil.rmtree(dst) - shutil.copytree(src, dst) + await core.setViewLayers((newlayr, deflayr), iden=view) - # Ensure data from both layers is present in the cortex - async with self.getTestCore(dirn=path01) as core01: - self.len(2, await core01.nodes('test:str*in=(core00, core01) | uniq')) + self.len(1, await core.nodes('[ test:str=newlayr ]', opts=opts)) + self.len(0, await core.nodes('test:str=newlayr')) async def test_cortex_lockmemory(self): ''' @@ -3961,6 +3955,13 @@ async def test_cortex_storm_lib_dmon(self): with self.raises(s_exc.NoSuchIden): await core.nodes('$lib.dmon.del(newp)') + async def test_cortex_spawn_notsupported(self): + + async with self.getTestCore() as core: + core.spawncorector = None + with self.raises(s_exc.FeatureNotSupported): + await core.getSpawnInfo() + async def test_cortex_storm_dmon_ps(self): with self.getTestDir() as dirn: diff --git a/synapse/tests/test_lib_cell.py b/synapse/tests/test_lib_cell.py index e0441022bc..58b024b4e7 100644 --- a/synapse/tests/test_lib_cell.py +++ b/synapse/tests/test_lib_cell.py @@ -391,8 +391,18 @@ async def test_cell_dyncall(self): todo = s_common.todo('stream', doraise=True) await self.agenraises(s_exc.BadTime, await prox.dyncall('self', todo)) + async def test_cell_promote(self): + + with self.getTestDir() as dirn: + async with await s_cell.Cell.anit(dirn) as cell: + async with cell.getLocalProxy() as proxy: + with self.raises(s_exc.BadConfValu): + await proxy.promote() + async def test_cell_nexuschanges(self): + with self.getTestDir() as dirn: + dir0 = s_common.genpath(dirn, 'cell00') dir1 = s_common.genpath(dirn, 'cell01') @@ -407,13 +417,18 @@ async def coro(prox, offs): break return yielded, retn - conf = {'nexslog:en': True} + conf = { + 'nexslog:en': True, + 'dmon:listen': 'tcp://127.0.0.1:0/', + 'https:port': 0, + } async with await s_cell.Cell.anit(dir0, conf=conf) as cell00, \ cell00.getLocalProxy() as prox00: self.true(cell00.nexsroot.donexslog) await prox00.addUser('test') + self.true(await prox00.getNexsIndx() > 0) # We should have a set of auth:auth changes to find task = cell00.schedCoro(coro(prox00, 0)) @@ -496,3 +511,31 @@ async def test_cell_hiveapi(self): async with core.getLocalProxy() as proxy: self.eq((), await proxy.getHiveKeys(('lulz',))) self.eq((('bar', 10), ('baz', 30)), await proxy.getHiveKeys(('foo',))) + + async def test_cell_confprint(self): + + with self.withSetLoggingMock(): + + with self.getTestDir() as dirn: + + conf = { + 'dmon:listen': 'tcp://127.0.0.1:0', + 'https:port': 0, + } + s_common.yamlsave(conf, dirn, 'cell.yaml') + + outp = self.getTestOutp() + async with await s_cell.Cell.initFromArgv([dirn], outp=outp) as cell: + outp.expect('...cell API (telepath): tcp://127.0.0.1:0') + outp.expect('...cell API (https): 0') + + conf = { + 'dmon:listen': 'tcp://127.0.0.1:0', + 'https:port': None, + } + s_common.yamlsave(conf, dirn, 'cell.yaml') + + outp = self.getTestOutp() + async with await s_cell.Cell.initFromArgv([dirn], outp=outp) as cell: + outp.expect('...cell API (telepath): tcp://127.0.0.1:0') + outp.expect('...cell API (https): disabled') diff --git a/synapse/tests/test_lib_certdir.py b/synapse/tests/test_lib_certdir.py index a14323cd68..475bbd9b9d 100644 --- a/synapse/tests/test_lib_certdir.py +++ b/synapse/tests/test_lib_certdir.py @@ -408,6 +408,10 @@ def test_certdir_users_csr(self): key = cdir.getUserKey(username) self.basic_assertions(cdir, cert, key, cacert=cacert) + def test_certdir_invalidpath(self): + with self.raises(s_exc.SynErr): + s_certdir.CertDir(path=1) + def test_certdir_importfile(self): with self.getCertDir() as cdir: # type: s_certdir.CertDir with self.getTestDir() as testpath: @@ -432,7 +436,7 @@ def test_certdir_importfile(self): ) for ftype, fname in tests: srcpath = s_common.genpath(testpath, fname) - dstpath = s_common.genpath(cdir.path, ftype, fname) + dstpath = s_common.genpath(cdir.certdirs[0], ftype, fname) with s_common.genfile(srcpath) as fd: fd.write(b'arbitrary data') @@ -481,3 +485,16 @@ def test_certdir_valUserCert(self): self.nn(cdir.valUserCert(byts, cacerts=(newpca,))) self.raises(crypto.X509StoreContextError, cdir.valUserCert, byts, cacerts=(syntestca,)) self.raises(crypto.X509StoreContextError, cdir.valUserCert, byts, cacerts=()) + + def test_certdir_sslctx(self): + + with self.getCertDir() as cdir: + + with self.raises(s_exc.NoSuchCert): + cdir.getClientSSLContext(certname='newp') + + with s_common.genfile(cdir.certdirs[0], 'users', 'newp.crt') as fd: + fd.write(b'asdf') + + with self.raises(s_exc.NoCertKey): + cdir.getClientSSLContext(certname='newp') diff --git a/synapse/tests/test_lib_hiveauth.py b/synapse/tests/test_lib_hiveauth.py index 9cba5edf54..7a954c070a 100644 --- a/synapse/tests/test_lib_hiveauth.py +++ b/synapse/tests/test_lib_hiveauth.py @@ -6,140 +6,148 @@ from synapse.tests.utils import alist import synapse.lib.hive as s_hive +import synapse.lib.nexus as s_nexus import synapse.lib.hiveauth as s_hiveauth class AuthTest(s_test.SynTest): async def test_hive_auth(self): - async with self.getTestTeleHive() as hive: + with self.getTestDir() as testdirn: - node = await hive.open(('hive', 'auth')) + async with self.getTestTeleHive() as hive: - async with await s_hiveauth.Auth.anit(node) as auth: + nexsroot = await s_nexus.NexsRoot.anit(testdirn) + await nexsroot.startup(None) - user = await auth.addUser('visi@vertex.link') - role = await auth.addRole('ninjas') + node = await hive.open(('hive', 'auth')) - self.eq(user, auth.user(user.iden)) - self.eq(user, await auth.getUserByName('visi@vertex.link')) + async with await s_hiveauth.Auth.anit(node, nexsroot=nexsroot) as auth: - self.eq(role, auth.role(role.iden)) - self.eq(role, await auth.getRoleByName('ninjas')) + auth.onfini(nexsroot.fini) - with self.raises(s_exc.DupUserName): - await auth.addUser('visi@vertex.link') + user = await auth.addUser('visi@vertex.link') + role = await auth.addRole('ninjas') - with self.raises(s_exc.DupRoleName): - await auth.addRole('ninjas') + self.eq(user, auth.user(user.iden)) + self.eq(user, await auth.getUserByName('visi@vertex.link')) - self.nn(user) + self.eq(role, auth.role(role.iden)) + self.eq(role, await auth.getRoleByName('ninjas')) - self.false(user.info.get('admin')) - self.len(0, user.info.get('rules')) - self.len(1, user.info.get('roles')) + with self.raises(s_exc.DupUserName): + await auth.addUser('visi@vertex.link') - await user.setAdmin(True) - self.true(user.info.get('admin')) + with self.raises(s_exc.DupRoleName): + await auth.addRole('ninjas') - self.true(user.allowed(('foo', 'bar'))) + self.nn(user) - await user.addRule((True, ('foo',))) + self.false(user.info.get('admin')) + self.len(0, user.info.get('rules')) + self.len(1, user.info.get('roles')) - self.true(user.allowed(('foo', 'bar'))) + await user.setAdmin(True) + self.true(user.info.get('admin')) - self.len(1, user.permcache) + self.true(user.allowed(('foo', 'bar'))) - await user.delRule((True, ('foo',))) + await user.addRule((True, ('foo',))) - self.len(0, user.permcache) + self.true(user.allowed(('foo', 'bar'))) - await user.addRule((True, ('foo',))) + self.len(1, user.permcache) - await user.grant(role.iden) + await user.delRule((True, ('foo',))) - self.len(0, user.permcache) + self.len(0, user.permcache) - self.true(user.allowed(('baz', 'faz'))) + await user.addRule((True, ('foo',))) - self.len(1, user.permcache) + await user.grant(role.iden) - await role.addRule((True, ('baz', 'faz'))) + self.len(0, user.permcache) - self.len(0, user.permcache) + self.true(user.allowed(('baz', 'faz'))) - self.true(user.allowed(('baz', 'faz'))) + self.len(1, user.permcache) - self.len(1, user.permcache) + await role.addRule((True, ('baz', 'faz'))) - await user.setLocked(True) + self.len(0, user.permcache) - self.false(user.allowed(('baz', 'faz'))) + self.true(user.allowed(('baz', 'faz'))) - await user.setAdmin(False) - await user.setLocked(False) + self.len(1, user.permcache) - self.true(user.allowed(('baz', 'faz'))) - self.true(user.allowed(('foo', 'bar'))) + await user.setLocked(True) - # Add a DENY to the beginning of the rule list - await role.addRule((False, ('baz', 'faz')), indx=0) - self.false(user.allowed(('baz', 'faz'))) + self.false(user.allowed(('baz', 'faz'))) - # Delete the DENY - await role.delRule((False, ('baz', 'faz'))) + await user.setAdmin(False) + await user.setLocked(False) - # After deleting, former ALLOW rule applies - self.true(user.allowed(('baz', 'faz'))) + self.true(user.allowed(('baz', 'faz'))) + self.true(user.allowed(('foo', 'bar'))) - # non-existent rule returns default - self.none(user.allowed(('boo', 'foo'))) - self.eq('yolo', user.allowed(('boo', 'foo'), default='yolo')) + # Add a DENY to the beginning of the rule list + await role.addRule((False, ('baz', 'faz')), indx=0) + self.false(user.allowed(('baz', 'faz'))) - await self.asyncraises(s_exc.NoSuchRole, user.revoke('newp')) + # Delete the DENY + await role.delRule((False, ('baz', 'faz'))) - await user.revoke(role.iden) - self.none(user.allowed(('baz', 'faz'))) + # After deleting, former ALLOW rule applies + self.true(user.allowed(('baz', 'faz'))) - await user.grant(role.iden) - self.true(user.allowed(('baz', 'faz'))) + # non-existent rule returns default + self.none(user.allowed(('boo', 'foo'))) + self.eq('yolo', user.allowed(('boo', 'foo'), default='yolo')) - await self.asyncraises(s_exc.NoSuchRole, auth.delRole('accountants')) + await self.asyncraises(s_exc.NoSuchRole, user.revoke('newp')) - await auth.delRole(role.iden) - self.false(user.allowed(('baz', 'faz'))) + await user.revoke(role.iden) + self.none(user.allowed(('baz', 'faz'))) - await self.asyncraises(s_exc.NoSuchUser, auth.delUser('fred@accountancy.com')) + await user.grant(role.iden) + self.true(user.allowed(('baz', 'faz'))) - await auth.delUser(user.iden) - self.false(user.allowed(('baz', 'faz'))) + await self.asyncraises(s_exc.NoSuchRole, auth.delRole('accountants')) - role = await auth.addRole('lolusers') - role2 = await auth.addRole('lolusers2') + await auth.delRole(role.iden) + self.false(user.allowed(('baz', 'faz'))) - self.none(await role.setName('lolusers')) + await self.asyncraises(s_exc.NoSuchUser, auth.delUser('fred@accountancy.com')) - with self.raises(s_exc.DupRoleName): - await role2.setName('lolusers') + await auth.delUser(user.iden) + self.false(user.allowed(('baz', 'faz'))) - await role.setName('roflusers') + role = await auth.addRole('lolusers') + role2 = await auth.addRole('lolusers2') - self.nn(await auth.getRoleByName('roflusers')) - self.none(await auth.getRoleByName('lolusers')) + self.none(await role.setName('lolusers')) - user = await auth.addUser('user1') - user2 = await auth.addUser('user') + with self.raises(s_exc.DupRoleName): + await role2.setName('lolusers') - # No problem if the user sets her own name to herself - self.none(await user.setName('user1')) + await role.setName('roflusers') - with self.raises(s_exc.DupUserName): - await user2.setName('user1') + self.nn(await auth.getRoleByName('roflusers')) + self.none(await auth.getRoleByName('lolusers')) - await user.setName('user2') + user = await auth.addUser('user1') + user2 = await auth.addUser('user') - self.nn(await auth.getUserByName('user2')) - self.none(await auth.getUserByName('user1')) + # No problem if the user sets her own name to herself + self.none(await user.setName('user1')) + + with self.raises(s_exc.DupUserName): + await user2.setName('user1') + + await user.setName('user2') + + self.nn(await auth.getUserByName('user2')) + self.none(await auth.getUserByName('user1')) async def test_hive_tele_auth(self): diff --git a/synapse/tests/test_lib_layer.py b/synapse/tests/test_lib_layer.py index 6ba2baeb7d..aef76bad78 100644 --- a/synapse/tests/test_lib_layer.py +++ b/synapse/tests/test_lib_layer.py @@ -503,7 +503,7 @@ async def test_layer_splices(self): # Get all but the first splice await self.agenlen(25, layr.splices((0, 0, 1))) - await self.agenlen(4, layr.splicesBack((1, 0, 0))) + await self.agenlen(4, layr.splicesBack((2, 0, 0))) # Make sure we still get two splices when # offset is not at the beginning of a nodeedit @@ -692,12 +692,10 @@ async def test_layer_nodeedits(self): count = 0 editlist = [] - layr = core0.getLayer(None) - necount = layr.nodeeditlog.index() - async for _, nodeedits in prox0.syncLayerNodeEdits(0): + layr = core0.getLayer() + async for offs, nodeedits in prox0.syncLayerNodeEdits(0): editlist.append(nodeedits) - count += 1 - if count == necount: + if offs == layr.nodeeditlog.index() - 1: break async with self.getTestCore() as core1: @@ -756,33 +754,27 @@ async def test_layer_form_by_buid(self): # add node - buid:form exists nodes = await core.nodes('[ inet:ipv4=1.2.3.4 :loc=us ]') buid0 = nodes[0].buid - fenc = layr00.layrslab.get(buid0 + b'\x09', db=layr00.bybuid) - self.eq('inet:ipv4', fenc.decode()) + self.eq('inet:ipv4', await layr00.getNodeForm(buid0)) # add edge and nodedata nodes = await core.nodes('[ inet:ipv4=2.3.4.5 ]') buid1 = nodes[0].buid - fenc = layr00.layrslab.get(buid1 + b'\x09', db=layr00.bybuid) - self.eq('inet:ipv4', fenc.decode()) + self.eq('inet:ipv4', await layr00.getNodeForm(buid1)) await core.nodes('inet:ipv4=1.2.3.4 [ +(refs)> {inet:ipv4=2.3.4.5} ] $node.data.set(spam, ham)') - fenc = layr00.layrslab.get(buid0 + b'\x09', db=layr00.bybuid) - self.eq('inet:ipv4', fenc.decode()) + self.eq('inet:ipv4', await layr00.getNodeForm(buid0)) # remove edge, map still exists await core.nodes('inet:ipv4=1.2.3.4 [ -(refs)> {inet:ipv4=2.3.4.5} ]') - fenc = layr00.layrslab.get(buid0 + b'\x09', db=layr00.bybuid) - self.eq('inet:ipv4', fenc.decode()) + self.eq('inet:ipv4', await layr00.getNodeForm(buid0)) # remove nodedata, map still exists await core.nodes('inet:ipv4=1.2.3.4 $node.data.pop(spam)') - fenc = layr00.layrslab.get(buid0 + b'\x09', db=layr00.bybuid) - self.eq('inet:ipv4', fenc.decode()) + self.eq('inet:ipv4', await layr00.getNodeForm(buid0)) # delete node - buid:form removed await core.nodes('inet:ipv4=1.2.3.4 | delnode') - fenc = layr00.layrslab.get(buid0 + b'\x09', db=layr00.bybuid) - self.none(fenc) + self.none(await layr00.getNodeForm(buid0)) await core.nodes('[ inet:ipv4=5.6.7.8 ]') @@ -794,41 +786,34 @@ async def test_layer_form_by_buid(self): await alist(view01.eval('[ inet:ipv4=6.7.8.9 ]')) # buid:form for a node in child doesn't exist - fenc = layr01.layrslab.get(buid1 + b'\x09', db=layr01.bybuid) - self.none(fenc) + self.none(await layr01.getNodeForm(buid1)) # add prop, buid:form map exists nodes = await alist(view01.eval('inet:ipv4=2.3.4.5 [ :loc=ru ]')) self.len(1, nodes) - fenc = layr01.layrslab.get(buid1 + b'\x09', db=layr01.bybuid) - self.eq('inet:ipv4', fenc.decode()) + self.eq('inet:ipv4', await layr01.getNodeForm(buid1)) # add nodedata and edge await alist(view01.eval('inet:ipv4=2.3.4.5 [ +(refs)> {inet:ipv4=6.7.8.9} ] $node.data.set(faz, baz)')) # remove prop, map still exists due to nodedata await alist(view01.eval('inet:ipv4=2.3.4.5 [ -:loc ]')) - fenc = layr01.layrslab.get(buid1 + b'\x09', db=layr01.bybuid) - self.eq('inet:ipv4', fenc.decode()) + self.eq('inet:ipv4', await layr01.getNodeForm(buid1)) # remove nodedata, map still exists due to edge await alist(view01.eval('inet:ipv4=2.3.4.5 $node.data.pop(faz)')) - fenc = layr01.layrslab.get(buid1 + b'\x09', db=layr01.bybuid) - self.eq('inet:ipv4', fenc.decode()) + self.eq('inet:ipv4', await layr01.getNodeForm(buid1)) # remove edge, map is deleted await alist(view01.eval('inet:ipv4=2.3.4.5 [ -(refs)> {inet:ipv4=6.7.8.9} ]')) - fenc = layr01.layrslab.get(buid1 + b'\x09', db=layr01.bybuid) - self.none(fenc) + self.none(await layr01.getNodeForm(buid1)) # edges between two nodes in parent await alist(view01.eval('inet:ipv4=2.3.4.5 [ +(refs)> {inet:ipv4=5.6.7.8} ]')) - fenc = layr01.layrslab.get(buid1 + b'\x09', db=layr01.bybuid) - self.eq('inet:ipv4', fenc.decode()) + self.eq('inet:ipv4', await layr01.getNodeForm(buid1)) await alist(view01.eval('inet:ipv4=2.3.4.5 [ -(refs)> {inet:ipv4=5.6.7.8} ]')) - fenc = layr01.layrslab.get(buid1 + b'\x09', db=layr01.bybuid) - self.none(fenc) + self.none(await layr01.getNodeForm(buid1)) async def test_layer(self): @@ -872,3 +857,15 @@ async def test_layer_del_then_lift(self): await core.nodes('.created | delnode --force') nodes = await core.nodes('.created') self.len(0, nodes) + + async def test_layer_flat_edits(self): + nodeedits = ( + (b'asdf', 'test:junk', ( + (s_layer.EDIT_NODE_ADD, (10, s_layer.STOR_TYPE_U64), ( + (b'qwer', 'test:junk', ( + (s_layer.EDIT_NODE_ADD, (11, s_layer.STOR_TYPE_U64), ()), + )), + )), + )), + ) + self.len(2, s_layer.getFlatEdits(nodeedits)) diff --git a/synapse/tests/test_lib_nexus.py b/synapse/tests/test_lib_nexus.py index f16c499e6e..fba220fe61 100644 --- a/synapse/tests/test_lib_nexus.py +++ b/synapse/tests/test_lib_nexus.py @@ -25,8 +25,9 @@ async def _push(self, event, *args, **kwargs): async def doathing2(self, eventdict): return await self._push('thing:doathing2', eventdict, 'foo') - @s_nexus.Pusher.onPush('thing:doathing2', passoff=True) - async def _doathing2handler(self, eventdict, anotherparm, nexsoff=None): + @s_nexus.Pusher.onPush('thing:doathing2', passitem=True) + async def _doathing2handler(self, eventdict, anotherparm, nexsitem=None): + nexsoff, nexsmesg = nexsitem eventdict['gotindex'] = nexsoff return anotherparm @@ -50,57 +51,74 @@ async def doathing(self, eventdict): return await self._push('thing:doathing', eventdict, 'bar') class NexusTest(s_t_utils.SynTest): + async def test_nexus(self): + with self.getTestDir() as dirn: - async with await SampleNexus.anit(1) as nexus1, await s_nexus.NexsRoot.anit(dirn) as nexsroot: - await nexsroot.setLeader(None, None) - eventdict = {'specialpush': 0} - self.eq('foo', await nexus1.doathing(eventdict)) - self.eq(1, eventdict.get('happened')) + async with await s_nexus.NexsRoot.anit(dirn) as nexsroot: + await nexsroot.startup(None) - self.eq('foo', await nexus1.doathingauto(eventdict, 'foo')) - self.eq(1, eventdict.get('autohappened')) + async with await SampleNexus.anit(1, nexsroot=nexsroot) as nexus1: - self.eq('foo', await nexus1.doathingauto2(eventdict, 'foo')) - self.eq(1, eventdict.get('autohappened2')) + eventdict = {'specialpush': 0} + self.eq('foo', await nexus1.doathing(eventdict)) + self.eq(1, eventdict.get('happened')) - self.eq('doc', nexus1.doathingauto2.__doc__) + self.eq('foo', await nexus1.doathingauto(eventdict, 'foo')) + self.eq(1, eventdict.get('autohappened')) - async with await SampleNexus2.anit(2, nexsroot=nexsroot) as testkid: + self.eq('foo', await nexus1.doathingauto2(eventdict, 'foo')) + self.eq(1, eventdict.get('autohappened2')) - eventdict = {'specialpush': 0} - # Tricky inheriting handler funcs themselves - self.eq('foo', await nexus1.doathing(eventdict)) - self.eq('bar', await testkid.doathing(eventdict)) - self.eq(2, eventdict.get('happened')) + self.eq('doc', nexus1.doathingauto2.__doc__) - # Check offset passing - self.eq('foo', await testkid.doathing2(eventdict)) - self.eq(1, eventdict.get('gotindex')) + self.eq(3, await nexsroot.index()) + + async with await SampleNexus2.anit(2, nexsroot=nexsroot) as testkid: + + eventdict = {'specialpush': 0} + # Tricky inheriting handler funcs themselves + self.eq('foo', await nexus1.doathing(eventdict)) + self.eq('bar', await testkid.doathing(eventdict)) + self.eq(2, eventdict.get('happened')) - # Check raising an exception - await self.asyncraises(s_exc.SynErr, testkid.doathingauto3(eventdict)) + # Check offset passing + self.eq('foo', await testkid.doathing2(eventdict)) + self.eq(5, eventdict.get('gotindex')) - with self.getLoggerStream('synapse.lib.nexus') as stream: - await nexsroot.recover() + # Check raising an exception + await self.asyncraises(s_exc.SynErr, testkid.doathingauto3(eventdict)) - stream.seek(0) - self.isin('while replaying log', stream.read()) + with self.getLoggerStream('synapse.lib.nexus') as stream: + await nexsroot.recover() + + stream.seek(0) + self.isin('while replaying log', stream.read()) async def test_nexus_no_logging(self): ''' Pushers/NexsRoot works with donexslog=False ''' with self.getTestDir() as dirn: - async with await SampleNexus.anit(1) as nexus1, \ - await s_nexus.NexsRoot.anit(dirn, donexslog=False) as nexsroot: - await nexsroot.setLeader(None, None) - - eventdict = {'specialpush': 0} - self.eq('foo', await nexus1.doathing(eventdict)) - self.eq(1, eventdict.get('happened')) - async with await SampleNexus2.anit(2, nexsroot=nexsroot) as testkid: + + async with await s_nexus.NexsRoot.anit(dirn, donexslog=False) as nexsroot: + + async with await SampleNexus.anit(1, nexsroot=nexsroot) as nexus1: + + await nexsroot.startup(None) + eventdict = {'specialpush': 0} - self.eq('bar', await testkid.doathing(eventdict)) - self.eq(2, eventdict.get('happened')) + self.eq('foo', await nexus1.doathing(eventdict)) + self.eq('foo', await nexus1.doathing2(eventdict)) + self.eq(1, eventdict.get('happened')) + self.eq(1, eventdict.get('gotindex')) + + self.eq(2, await nexsroot.index()) + + async with await SampleNexus2.anit(2, nexsroot=nexsroot) as nexus2: + eventdict = {'specialpush': 0} + self.eq('bar', await nexus2.doathing(eventdict)) + self.eq('foo', await nexus2.doathing2(eventdict)) + self.eq(2, eventdict.get('happened')) + self.eq(3, eventdict.get('gotindex')) diff --git a/synapse/tests/test_lib_stormsvc.py b/synapse/tests/test_lib_stormsvc.py index e53c577ffd..fa9dc11ea9 100644 --- a/synapse/tests/test_lib_stormsvc.py +++ b/synapse/tests/test_lib_stormsvc.py @@ -353,8 +353,9 @@ async def test_storm_svcs_bads(self): await core.nodes('$lib.service.wait(fake)') core.svcsbyname['fake'].proxy._t_conf['timeout'] = 0.1 + proxy = core.svcsbyname['fake'].proxy._t_proxy - await core.svcsbyname['fake'].proxy._t_proxy.waitfini(6) + self.true(await proxy.waitfini(6)) with self.raises(s_exc.StormRuntimeError): await core.nodes('[ inet:ipv4=6.6.6.6 ] | ohhai') @@ -733,8 +734,6 @@ async def test_storm_svc_mirror(self): async with self.getTestCore(dirn=path00) as core00: - self.false(core00.mirror) - url = core00.getLocalUrl() conf = {'mirror': url} @@ -742,8 +741,6 @@ async def test_storm_svc_mirror(self): await core01.sync() - self.true(core01.mirror) - # Add a storm service await core01.nodes(f'service.add real {lurl}') await core01.nodes('$lib.service.wait(real)') diff --git a/synapse/tests/test_lib_stormtypes.py b/synapse/tests/test_lib_stormtypes.py index 59b286598b..86d29a1c37 100644 --- a/synapse/tests/test_lib_stormtypes.py +++ b/synapse/tests/test_lib_stormtypes.py @@ -763,10 +763,8 @@ async def test_storm_csv(self): mesgs = await core.stormlist(q, {'show': ('err', 'csv:row')}) csv_rows = [m for m in mesgs if m[0] == 'csv:row'] self.len(2, csv_rows) - self.eq(csv_rows[0], - ('csv:row', {'row': [978307200000, None], 'table': None})) - self.eq(csv_rows[1], - ('csv:row', {'row': [32535216000000, None], 'table': None})) + self.eq(csv_rows[0], ('csv:row', {'row': [978307200000, None], 'table': None})) + self.eq(csv_rows[1], ('csv:row', {'row': [32535216000000, None], 'table': None})) # Sad path case... q = '''$data=() $genr=$lib.feed.genr(syn.node, $data) @@ -1704,12 +1702,7 @@ async def test_storm_lib_layer(self): self.stormIsInPrint(mainlayr, mesgs) # Create a new layer - q = f'$lib.print($lib.layer.add().iden)' - mesgs = await core.stormlist(q) - for mesg in mesgs: - if mesg[0] == 'print': - newlayr = mesg[1]['mesg'] - + newlayr = await core.callStorm('return($lib.layer.add().iden)') self.isin(newlayr, core.layers) # Ensure new layer is set to current model revision @@ -1810,7 +1803,6 @@ async def test_storm_lib_layer(self): layr = core.getLayer(locklayr) self.true(layr.lockmemory) - self.eq(layr.layrslab.growsize, 5000) async with self.getTestCore() as core2: @@ -1831,8 +1823,8 @@ async def test_storm_lib_layer(self): layr = core.getLayer(uplayr) - evnt = await layr.waitUpstreamOffs(layriden, offs) - await asyncio.wait_for(evnt.wait(), timeout=2.0) + #evnt = await layr.waitUpstreamOffs(layriden, offs) + #await asyncio.wait_for(evnt.wait(), timeout=2.0) async def test_storm_lib_view(self): @@ -1844,29 +1836,14 @@ async def test_storm_lib_view(self): await core.nodes('[test:int=12 +#tag.test +#tag.proptest:risk=20]') # Get the main view - q = '$lib.print($lib.view.get().pack().iden)' - mesgs = await core.stormlist(q) - mainiden = None - for mesg in mesgs: - if mesg[0] == 'print': - mainiden = mesg[1]['mesg'] - - self.isin(mainiden, core.views) - - q = f'$lib.print($lib.view.get({mainiden}).pack().iden)' - mesgs = await core.stormlist(q) - self.stormIsInPrint(mainiden, mesgs) + mainiden = await core.callStorm('return($lib.view.get().iden)') # Fork the main view q = f''' - $forkview=$lib.view.get({mainiden}).fork() - $forkvalu=$forkview.pack() - $lib.print("{{iden}},{{layr}}", iden=$forkvalu.iden, layr=$forkvalu.layers.index(0).iden) + $view=$lib.view.get().fork() + return(($view.iden, $view.layers.index(0).iden)) ''' - mesgs = await core.stormlist(q) - for mesg in mesgs: - if mesg[0] == 'print': - forkiden, forklayr = mesg[1]['mesg'].split(',') + forkiden, forklayr = await core.callStorm(q) self.isin(forkiden, core.views) self.isin(forklayr, core.layers) @@ -1878,30 +1855,20 @@ async def test_storm_lib_view(self): ldef = await core.addLayer() newlayer = core.getLayer(ldef.get('iden')) - q = f''' - $newview=$lib.view.add(({newlayer.iden},)) - $lib.print($newview.pack().iden) - ''' - newiden = None - mesgs = await core.stormlist(q) - for mesg in mesgs: - if mesg[0] == 'print': - newiden = mesg[1]['mesg'] + newiden = await core.callStorm(f'return($lib.view.add(({newlayer.iden},)).iden)') + self.nn(newiden) self.isin(newiden, core.views) # List the views in the cortex q = ''' + $views = $lib.list() for $view in $lib.view.list() { - $lib.print($view.pack().iden) + $views.append($view.iden) } + return($views) ''' - idens = [] - mesgs = await core.stormlist(q) - for mesg in mesgs: - if mesg[0] == 'print': - idens.append(mesg[1]['mesg']) - + idens = await core.callStorm(q) self.sorteq(idens, core.views.keys()) # Delete the added view @@ -1913,21 +1880,19 @@ async def test_storm_lib_view(self): # Fork the forked view q = f''' $forkview=$lib.view.get({forkiden}).fork() - $lib.print($forkview.pack().iden) + return($forkview.pack().iden) ''' - mesgs = await core.stormlist(q) - for mesg in mesgs: - if mesg[0] == 'print': - childiden = mesg[1]['mesg'] + childiden = await core.callStorm(q) + self.nn(childiden) # Can't merge the first forked view if it has children q = f'$lib.view.get({forkiden}).merge()' - await self.asyncraises(s_exc.CantMergeView, core.nodes(q)) + await self.asyncraises(s_exc.CantMergeView, core.callStorm(q)) # Can't merge the child forked view if the parent is read only core.views[childiden].parent.layers[0].readonly = True q = f'$lib.view.get({childiden}).merge()' - await self.asyncraises(s_exc.ReadOnlyLayer, core.nodes(q)) + await self.asyncraises(s_exc.ReadOnlyLayer, core.callStorm(q)) core.views[childiden].parent.layers[0].readonly = False await core.nodes(q) @@ -1945,19 +1910,13 @@ async def test_storm_lib_view(self): # Sad paths await self.asyncraises(s_exc.NoSuchView, core.nodes('$lib.view.del(foo)')) await self.asyncraises(s_exc.NoSuchView, core.nodes('$lib.view.get(foo)')) - await self.asyncraises(s_exc.CantMergeView, core.nodes(f'$lib.view.get({mainiden}).merge()')) + await self.asyncraises(s_exc.CantMergeView, core.nodes(f'$lib.view.get().merge()')) await self.asyncraises(s_exc.NoSuchLayer, core.nodes(f'view.add --layers {s_common.guid()}')) - - q = f'$lib.view.del({mainiden})' - mesgs = await core.stormlist(q) - errs = [m[1] for m in mesgs if m[0] == 'err'] - self.len(1, errs) - self.eq(errs[0][0], 'SynErr') + await self.asyncraises(s_exc.SynErr, core.nodes('$lib.view.del($lib.view.get().iden)')) # Check helper commands # Get the main view - q = 'view.get' - mesgs = await core.stormlist(q) + mesgs = await core.stormlist('view.get') self.stormIsInPrint(mainiden, mesgs) with self.raises(s_exc.BadOptValu): diff --git a/synapse/tests/test_lib_view.py b/synapse/tests/test_lib_view.py index fc894f6812..0cde15a8ac 100644 --- a/synapse/tests/test_lib_view.py +++ b/synapse/tests/test_lib_view.py @@ -6,7 +6,9 @@ from synapse.tests.utils import alist class ViewTest(s_t_utils.SynTest): + async def test_view_fork_merge(self): + async with self.getTestCore() as core: await core.nodes('[ test:int=8 +#faz ]') await core.nodes('[ test:int=9 test:int=10 ]') diff --git a/synapse/tests/test_servers_cortex.py b/synapse/tests/test_servers_cortex.py index ff8767a4ae..b7781028b2 100644 --- a/synapse/tests/test_servers_cortex.py +++ b/synapse/tests/test_servers_cortex.py @@ -115,14 +115,14 @@ async def test_server_mirror_restart(self): self.len(1, await core01.nodes('inet:asn=1')) # get the nexus index - nexusind = core01.nexsroot._nexuslog.index() + nexusind = core01.nexsroot.nexslog.index() await core00.nodes('[ inet:asn=2 ]') async with await s_cortex.Cortex.initFromArgv(argv, outp=outp) as core01: # check that startup does not create any events - self.eq(nexusind, core01.nexsroot._nexuslog.index()) + self.eq(nexusind, core01.nexsroot.nexslog.index()) await core01.sync() diff --git a/synapse/tests/test_telepath.py b/synapse/tests/test_telepath.py index eab8703ca9..263662f21f 100644 --- a/synapse/tests/test_telepath.py +++ b/synapse/tests/test_telepath.py @@ -360,6 +360,23 @@ async def test_telepath_tls_bad_cert(self): await self.asyncraises(ssl.SSLCertVerificationError, s_telepath.openurl(f'ssl://{hostname}/foo', port=addr[1])) + async def test_telepath_ssl_client_cert(self): + + foo = Foo() + async with self.getTestDmon() as dmon: + + dmon.certdir.genCaCert('userca') + dmon.certdir.genUserCert('visi', signas='userca') + + addr, port = await dmon.listen('ssl://127.0.0.1:0/?ca=userca&hostname=localhost') + dmon.share('foo', foo) + + with self.raises(s_exc.LinkShutDown): + await s_telepath.openurl(f'ssl://localhost/foo', port=port, certdir=dmon.certdir) + + proxy = await s_telepath.openurl(f'ssl://localhost/foo?certname=visi', port=port, certdir=dmon.certdir) + self.eq(20, await proxy.bar(15, 5)) + async def test_telepath_tls(self): self.thisHostMustNot(platform='darwin') @@ -711,6 +728,9 @@ async def dostuff(self, x): async with await s_telepath.Client.anit(url0) as targ: await targ.waitready() + proxy = await targ.proxy() + self.eq(proxy._getSynVers(), s_version.version) + # Client implements some base helpers the proxy does self.eq(targ._getSynVers(), s_version.version) self.eq(targ._getClasses(), diff --git a/synapse/tests/test_tools_migrate200.py b/synapse/tests/test_tools_migrate200.py index c2378798cc..92f010e84e 100644 --- a/synapse/tests/test_tools_migrate200.py +++ b/synapse/tests/test_tools_migrate200.py @@ -520,7 +520,7 @@ async def test_migr_nexus(self): # startup 2.x core async with await s_cortex.Cortex.anit(dest, conf={'nexslog:en': True}) as core: # check that nexus root has offsets from migration - self.gt(core.nexsroot._nexuslog.index(), 1) + self.gt(core.nexsroot.nexslog.index(), 1) # check core data await self._checkSplices(core, tdata) @@ -546,7 +546,7 @@ async def test_migr_nexusoff(self): # startup 2.x core async with await s_cortex.Cortex.anit(dest, conf=None) as core: # check that nexus root has *no* offsets from migration - self.eq(core.nexsroot._nexuslog.index(), 0) + self.eq(core.nexsroot.nexslog.index(), 0) # check core data await self._checkSplices(core, tdata) @@ -573,7 +573,7 @@ async def test_migr_editor(self): # startup 2.x core async with await s_cortex.Cortex.anit(dest, conf=None) as core: # check that nexus root has *no* offsets from migration - self.eq(core.nexsroot._nexuslog.index(), 0) + self.eq(core.nexsroot.nexslog.index(), 0) # check core data await self._checkSplices(core, tdata) diff --git a/synapse/tests/utils.py b/synapse/tests/utils.py index ef2180c125..106a2525d7 100644 --- a/synapse/tests/utils.py +++ b/synapse/tests/utils.py @@ -671,11 +671,11 @@ async def _doubleapply(self, indx, item): nexsiden, event, args, kwargs, _ = item nexus = self._nexskids[nexsiden] - func, passoff = nexus._nexshands[event] + func, passitem = nexus._nexshands[event] - if passoff: - retn = await func(nexus, *args, nexsoff=indx, **kwargs) - await func(nexus, *args, nexsoff=indx, **kwargs) + if passitem: + retn = await func(nexus, *args, nexsitem=(indx, item), **kwargs) + await func(nexus, *args, nexsitem=(indx, item), **kwargs) return retn retn = await func(nexus, *args, **kwargs) @@ -1739,8 +1739,11 @@ async def getTestHiveFromDirn(self, dirn): async with await s_slab.Slab.anit(dirn, map_size=map_size) as slab: - async with await s_hive.SlabHive.anit(slab) as hive: + nexsroot = await s_nexus.NexsRoot.anit(dirn) + await nexsroot.startup(None) + async with await s_hive.SlabHive.anit(slab, nexsroot=nexsroot) as hive: + hive.onfini(nexsroot.fini) yield hive @contextlib.asynccontextmanager diff --git a/synapse/tools/migrate_200.py b/synapse/tools/migrate_200.py index efbf954d43..3c433353ad 100644 --- a/synapse/tools/migrate_200.py +++ b/synapse/tools/migrate_200.py @@ -112,7 +112,7 @@ async def disableMigrationMode(self): # pragma: no cover async def enableMigrationMode(self): # pragma: no cover pass - async def _initCellDmon(self): + async def initServiceNetwork(self): pass async def _srcPullLyrSplices(self, lyriden): @@ -829,13 +829,12 @@ async def _initStors(self, migr=True, nexus=True, cell=True): self.migrdb = self.migrslab.initdb('migr') self.onfini(self.migrslab.fini) - # optionally create migration nexus - if nexus and self.addmode == 'nexus': + if self.nexusroot is None: path = os.path.join(self.dest) - if self.nexusroot is None: - self.nexusroot = await s_nexus.NexsRoot.anit(path) - await self.nexusroot.setLeader(None, '') + donexslog = self.addmode == 'nexus' + self.nexusroot = await s_nexus.NexsRoot.anit(path, donexslog=donexslog) self.onfini(self.nexusroot.fini) + await self.nexusroot.startup(None) # open cell if cell: