Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor nexus to remove leadership #1785

Merged
merged 40 commits into from
Jul 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
79b45cf
refactor nexus to remove leadership
invisig0th Jun 30, 2020
b2b82da
regression repo tests passing
invisig0th Jun 30, 2020
6284295
Merge branch 'master' into visi-nexs-active
invisig0th Jun 30, 2020
9569924
Merge branch 'master' into visi-nexs-active
invisig0th Jun 30, 2020
c6ec3e3
fixes for cluster
invisig0th Jul 1, 2020
92771d3
Merge branch 'visi-nexs-active' of https://github.com/vertexproject/s…
invisig0th Jul 1, 2020
396951b
wip
invisig0th Jul 1, 2020
bdf8ecc
added client side certs, some more nexs tweaks, layer APIs to help cl…
invisig0th Jul 6, 2020
b3b4dce
Merge branch 'master' into visi-nexs-active
invisig0th Jul 6, 2020
69ba5ef
fix for migration tests
invisig0th Jul 7, 2020
fdbc777
Merge branch 'master' into visi-nexs-active
invisig0th Jul 7, 2020
2d44b7a
fixes for tests
invisig0th Jul 8, 2020
913389d
Merge branch 'visi-nexs-active' of https://github.com/vertexproject/s…
invisig0th Jul 8, 2020
cc5a654
tweaks from code review
invisig0th Jul 8, 2020
61d93f9
Merge branch 'master' into visi-nexs-active
invisig0th Jul 9, 2020
c466dea
fixes from review input
invisig0th Jul 9, 2020
e100567
Merge branch 'master' into visi-nexs-active
invisig0th Jul 9, 2020
ccf4109
Merge branch 'master' into visi-nexs-active
invisig0th Jul 9, 2020
b3403a9
remove cell cert dir until we can really implement it properly
invisig0th Jul 9, 2020
59cabbb
added support for multiple paths in certdir
invisig0th Jul 10, 2020
d0b5752
reenabled unit test for now
invisig0th Jul 10, 2020
bf6f20e
Merge branch 'master' into visi-nexs-active
invisig0th Jul 10, 2020
5946433
remove extra white space
invisig0th Jul 10, 2020
df073f2
gendir rather than reqdir
invisig0th Jul 10, 2020
675104d
fix for non-sequential node edit log
invisig0th Jul 10, 2020
ddb2d40
bugfix for incremented splice offsets
invisig0th Jul 13, 2020
fcd58ef
update schema for cron
invisig0th Jul 13, 2020
2a57714
all nexs handlers must return msgpack compatible data
invisig0th Jul 13, 2020
2b8f6b6
Merge branch 'master' into visi-nexs-active
invisig0th Jul 13, 2020
67fdd63
telepath fixes for unit tests
invisig0th Jul 14, 2020
69f9538
Merge branch 'visi-nexs-active' of https://github.com/vertexproject/s…
invisig0th Jul 14, 2020
d0d47ea
unit test tweaks
invisig0th Jul 14, 2020
faedf0e
Merge branch 'master' into visi-nexs-active
invisig0th Jul 14, 2020
932f4cf
more test coverage
invisig0th Jul 15, 2020
08daad0
Merge branch 'visi-nexs-active' of https://github.com/vertexproject/s…
invisig0th Jul 15, 2020
6177565
bugfix and unit test coverage
invisig0th Jul 15, 2020
ce07e46
fixes from code review
invisig0th Jul 15, 2020
4e399e1
Merge branch 'master' into visi-nexs-active
invisig0th Jul 15, 2020
10b38e6
Merge branch 'master' into visi-nexs-active
invisig0th Jul 15, 2020
793de8c
Merge branch 'master' into visi-nexs-active
invisig0th Jul 15, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 33 additions & 43 deletions synapse/cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ async def watch(self, wdef):
async for mesg in self.cell.watch(wdef):
yield mesg

async def syncLayerNodeEdits(self, offs, layriden=None):
async def syncLayerNodeEdits(self, offs, layriden=None, wait=True):
'''
Yield (indx, mesg) nodeedit sets for the given layer beginning at offset.

Expand All @@ -493,9 +493,12 @@ async def syncLayerNodeEdits(self, offs, layriden=None):
consumer falls behind the max window size of 10,000 nodeedit messages.
'''
layr = self.cell.getLayer(layriden)
if layr is None:
raise s_exc.NoSuchLayer(iden=layriden)

self.user.confirm(('sync',), gateiden=layr.iden)

async for item in self.cell.syncLayerNodeEdits(layr.iden, offs):
async for item in self.cell.syncLayerNodeEdits(layr.iden, offs, wait=wait):
yield item

@s_cell.adminapi()
Expand Down Expand Up @@ -761,9 +764,8 @@ class Cortex(s_cell.Cell): # type: ignore
layrctor = s_layer.Layer.anit
spawncorector = 'synapse.lib.spawn.SpawnCore'

async def __anit__(self, dirn, conf=None):

await s_cell.Cell.__anit__(self, dirn, conf=conf)
# phase 2 - service storage
async def initServiceStorage(self):

# NOTE: we may not make *any* nexus actions in this method

Expand All @@ -774,10 +776,6 @@ async def __anit__(self, dirn, conf=None):
s_version.reqVersion(corevers, reqver, exc=s_exc.BadStorageVersion,
mesg='cortex version in storage is incompatible with running software')

# share ourself via the cell dmon as "cortex"
# for potential default remote use
self.dmon.share('cortex', self)

self.views = {}
self.layers = {}
self.modules = {}
Expand All @@ -786,7 +784,6 @@ async def __anit__(self, dirn, conf=None):
self.stormcmds = {}

self.spawnpool = None
self.mirror = self.conf.get('mirror')

self.storm_cmd_ctors = {}
self.storm_cmd_cdefs = {}
Expand Down Expand Up @@ -863,6 +860,7 @@ async def __anit__(self, dirn, conf=None):
cmdhive = await self.hive.open(('cortex', 'storm', 'cmds'))
pkghive = await self.hive.open(('cortex', 'storm', 'packages'))
svchive = await self.hive.open(('cortex', 'storm', 'services'))

self.cmdhive = await cmdhive.dict()
self.pkghive = await pkghive.dict()
self.svchive = await svchive.dict()
Expand All @@ -882,38 +880,24 @@ async def __anit__(self, dirn, conf=None):
'axon': self.axon
})

self.nexsroot.onPreLeader(self.preLeaderHook)

await self.auth.addAuthGate('cortex', 'cortex')

async def _initNexsRoot(self):
'''
Just like cell _initNexsRoot except doesn't call nexsroot.setLeader
'''
nexsroot = await s_nexus.NexsRoot.anit(self.dirn, donexslog=self.donexslog)
self.onfini(nexsroot.fini)
nexsroot.onfini(self)
return nexsroot
async def initServiceRuntime(self):

async def preLeaderHook(self, leader):
'''
These run after the leader is set, but before the leader/follower callbacks are run and the follower loop runs
'''
# do any post-nexus initialization here...
await self._initCoreMods()
await self._initStormSvcs()

async def startAsLeader(self):
'''
Run things that only a leader Cortex runs.
'''
# share ourself via the cell dmon as "cortex"
# for potential default remote use
self.dmon.share('cortex', self)

async def initServiceActive(self):
if self.conf.get('cron:enable'):
await self.agenda.start()
await self.stormdmons.start()

async def stopAsLeader(self):
'''
Stop things that only a leader Cortex runs.
'''
async def initServicePassive(self):
await self.agenda.stop()
await self.stormdmons.stop()

Expand Down Expand Up @@ -985,8 +969,9 @@ async def coreQueueGets(self, name, offs=0, cull=True, wait=None, size=None):
async def coreQueuePuts(self, name, items):
await self._push('queue:puts', name, items)

@s_nexus.Pusher.onPush('queue:puts', passoff=True)
async def _coreQueuePuts(self, name, items, nexsoff):
@s_nexus.Pusher.onPush('queue:puts', passitem=True)
async def _coreQueuePuts(self, name, items, nexsitem):
nexsoff, nexsmesg = nexsitem
await self.multiqueue.puts(name, items, reqid=nexsoff)

@s_nexus.Pusher.onPushAuto('queue:cull')
Expand All @@ -997,6 +982,11 @@ async def coreQueueSize(self, name):
return self.multiqueue.size(name)

async def getSpawnInfo(self):

if self.spawncorector is None:
mesg = 'spawn storm option not supported on this cortex'
raise s_exc.FeatureNotSupported(mesg=mesg)

ret = {
'iden': self.iden,
'dirn': self.dirn,
Expand Down Expand Up @@ -1424,11 +1414,11 @@ async def _delStormSvc(self, iden):
Delete a registered storm service from the cortex.
'''
sdef = self.svchive.get(iden)
if sdef is None:
if sdef is None: # pragma: no cover
return

try:
if await self.isLeader():
if self.isactive:
await self.runStormSvcEvent(iden, 'del')
except asyncio.CancelledError: # pragma: no cover
raise
Expand Down Expand Up @@ -1822,15 +1812,15 @@ async def _onCoreFini(self):
if self.axon:
await self.axon.fini()

async def syncLayerNodeEdits(self, iden, offs):
async def syncLayerNodeEdits(self, iden, offs, wait=True):
'''
Yield (offs, mesg) tuples for nodeedits in a layer.
'''
layr = self.getLayer(iden)
if layr is None:
raise s_exc.NoSuchLayer(iden=iden)

async for item in layr.syncNodeEdits(offs):
async for item in layr.syncNodeEdits(offs, wait=wait):
yield item

async def spliceHistory(self, user):
Expand Down Expand Up @@ -2379,7 +2369,7 @@ def getLayer(self, iden=None):
def listLayers(self):
return self.layers.values()

async def getLayerDef(self, iden):
async def getLayerDef(self, iden=None):
layr = self.getLayer(iden)
if layr is not None:
return layr.pack()
Expand Down Expand Up @@ -2503,14 +2493,14 @@ async def _ctorLayr(self, layrinfo):
'''
iden = layrinfo.get('iden')
path = s_common.gendir(self.dirn, 'layers', iden)

# In case that we're a mirror follower and we have a downstream layer, disable upstream sync
return await s_layer.Layer.anit(layrinfo, path, nexsroot=self.nexsroot, allow_upstream=not self.mirror)
# FIXME allow_upstream needs to be separated out
mirror = self.conf.get('mirror')
return await s_layer.Layer.anit(layrinfo, path, nexsroot=self.nexsroot, allow_upstream=not mirror)

async def _initCoreLayers(self):

node = await self.hive.open(('cortex', 'layers'))

# TODO eventually hold this and watch for changes
for _, node in node:
layrinfo = await node.dict()
await self._initLayr(layrinfo)
Expand Down
19 changes: 15 additions & 4 deletions synapse/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ async def __anit__(self, certdir=None):
self.cells = {} # all cells are shared. not all shared are cells.
self.shared = {} # objects provided by daemon
self.listenservers = [] # the sockets we're listening on
self.connectedlinks = [] # the links we're currently connected on
self.links = set()

self.sessions = {}

Expand Down Expand Up @@ -272,11 +272,16 @@ async def listen(self, url, **opts):

sslctx = None
if scheme == 'ssl':

caname = None
hostname = None

query = info.get('query')
if query is not None:
hostname = query.get('hostname', host)
sslctx = self.certdir.getServerSSLContext(hostname=hostname)
caname = query.get('ca')

sslctx = self.certdir.getServerSSLContext(hostname=hostname, caname=caname)

server = await s_link.listen(host, port, self._onLinkInit, ssl=sslctx)

Expand Down Expand Up @@ -321,7 +326,7 @@ async def _onDmonFini(self):
if finis:
await asyncio.wait(finis)

finis = [link.fini() for link in self.connectedlinks]
finis = [link.fini() for link in self.links]
vEpiphyte marked this conversation as resolved.
Show resolved Hide resolved
if finis:
await asyncio.wait(finis)

Expand All @@ -331,6 +336,12 @@ async def _onDmonFini(self):

async def _onLinkInit(self, link):

self.links.add(link)
async def fini():
self.links.discard(link)

link.onfini(fini)

async def rxloop():

while not link.isfini:
Expand All @@ -343,7 +354,7 @@ async def rxloop():
coro = self._onLinkMesg(link, mesg)
link.schedCoro(coro)

self.schedCoro(rxloop())
link.schedCoro(rxloop())

async def _onLinkMesg(self, link, mesg):

Expand Down
2 changes: 2 additions & 0 deletions synapse/exc.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ class LinkShutDown(LinkErr): pass
class NoCertKey(SynErr):
''' Raised when a Cert object requires a RSA Private Key to perform an operation and the key is not present. '''
pass
class NoSuchCert(SynErr): pass

class ModAlreadyLoaded(SynErr): pass
class MustBeJsonSafe(SynErr): pass
Expand Down Expand Up @@ -209,6 +210,7 @@ class SchemaViolation(SynErr): pass

class SlabAlreadyOpen(SynErr): pass
class SpawnExit(SynErr): pass
class FeatureNotSupported(SynErr): pass

class ReadOnlyLayer(SynErr): pass
class ReadOnlyProp(SynErr): pass
Expand Down
10 changes: 5 additions & 5 deletions synapse/lib/agenda.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@
'req': {
'type': 'object',
'properties': {
'minute': {'type': 'number'},
'hour': {'type': 'number'},
'dayofmonth': {'type': 'number'},
'month': {'type': 'number'},
'year': {'type': 'number'},
'minute': {'oneOf': [{'type': 'number'}, {'type': 'array', 'items': {'type': 'number'}}]},
'hour': {'oneOf': [{'type': 'number'}, {'type': 'array', 'items': {'type': 'number'}}]},
'dayofmonth': {'oneOf': [{'type': 'number'}, {'type': 'array', 'items': {'type': 'number'}}]},
'month': {'oneOf': [{'type': 'number'}, {'type': 'array', 'items': {'type': 'number'}}]},
'year': {'oneOf': [{'type': 'number'}, {'type': 'array', 'items': {'type': 'number'}}]},
}
}
}
Expand Down
Loading