Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor nexus to remove leadership #1785

Merged
merged 40 commits into from
Jul 16, 2020
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
79b45cf
refactor nexus to remove leadership
invisig0th Jun 30, 2020
b2b82da
regression repo tests passing
invisig0th Jun 30, 2020
6284295
Merge branch 'master' into visi-nexs-active
invisig0th Jun 30, 2020
9569924
Merge branch 'master' into visi-nexs-active
invisig0th Jun 30, 2020
c6ec3e3
fixes for cluster
invisig0th Jul 1, 2020
92771d3
Merge branch 'visi-nexs-active' of https://github.com/vertexproject/s…
invisig0th Jul 1, 2020
396951b
wip
invisig0th Jul 1, 2020
bdf8ecc
added client side certs, some more nexs tweaks, layer APIs to help cl…
invisig0th Jul 6, 2020
b3b4dce
Merge branch 'master' into visi-nexs-active
invisig0th Jul 6, 2020
69ba5ef
fix for migration tests
invisig0th Jul 7, 2020
fdbc777
Merge branch 'master' into visi-nexs-active
invisig0th Jul 7, 2020
2d44b7a
fixes for tests
invisig0th Jul 8, 2020
913389d
Merge branch 'visi-nexs-active' of https://github.com/vertexproject/s…
invisig0th Jul 8, 2020
cc5a654
tweaks from code review
invisig0th Jul 8, 2020
61d93f9
Merge branch 'master' into visi-nexs-active
invisig0th Jul 9, 2020
c466dea
fixes from review input
invisig0th Jul 9, 2020
e100567
Merge branch 'master' into visi-nexs-active
invisig0th Jul 9, 2020
ccf4109
Merge branch 'master' into visi-nexs-active
invisig0th Jul 9, 2020
b3403a9
remove cell cert dir until we can really implement it properly
invisig0th Jul 9, 2020
59cabbb
added support for multiple paths in certdir
invisig0th Jul 10, 2020
d0b5752
reenabled unit test for now
invisig0th Jul 10, 2020
bf6f20e
Merge branch 'master' into visi-nexs-active
invisig0th Jul 10, 2020
5946433
remove extra white space
invisig0th Jul 10, 2020
df073f2
gendir rather than reqdir
invisig0th Jul 10, 2020
675104d
fix for non-sequential node edit log
invisig0th Jul 10, 2020
ddb2d40
bugfix for incremented splice offsets
invisig0th Jul 13, 2020
fcd58ef
update schema for cron
invisig0th Jul 13, 2020
2a57714
all nexs handlers must return msgpack compatible data
invisig0th Jul 13, 2020
2b8f6b6
Merge branch 'master' into visi-nexs-active
invisig0th Jul 13, 2020
67fdd63
telepath fixes for unit tests
invisig0th Jul 14, 2020
69f9538
Merge branch 'visi-nexs-active' of https://github.com/vertexproject/s…
invisig0th Jul 14, 2020
d0d47ea
unit test tweaks
invisig0th Jul 14, 2020
faedf0e
Merge branch 'master' into visi-nexs-active
invisig0th Jul 14, 2020
932f4cf
more test coverage
invisig0th Jul 15, 2020
08daad0
Merge branch 'visi-nexs-active' of https://github.com/vertexproject/s…
invisig0th Jul 15, 2020
6177565
bugfix and unit test coverage
invisig0th Jul 15, 2020
ce07e46
fixes from code review
invisig0th Jul 15, 2020
4e399e1
Merge branch 'master' into visi-nexs-active
invisig0th Jul 15, 2020
10b38e6
Merge branch 'master' into visi-nexs-active
invisig0th Jul 15, 2020
793de8c
Merge branch 'master' into visi-nexs-active
invisig0th Jul 15, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion synapse/cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,9 @@ async def syncLayerNodeEdits(self, offs, layriden=None, wait=True):
consumer falls behind the max window size of 10,000 nodeedit messages.
'''
layr = self.cell.getLayer(layriden)
if layr is None:
raise s_exc.NoSuchLayer(iden=layriden)

self.user.confirm(('sync',), gateiden=layr.iden)

async for item in self.cell.syncLayerNodeEdits(layr.iden, offs, wait=wait):
Expand Down Expand Up @@ -979,6 +982,11 @@ async def coreQueueSize(self, name):
return self.multiqueue.size(name)

async def getSpawnInfo(self):

if self.spawncorector is None:
mesg = 'spawn storm option not supported on this cortex'
raise s_exc.FeatureNotSupported(mesg=mesg)

ret = {
'iden': self.iden,
'dirn': self.dirn,
Expand Down Expand Up @@ -1406,7 +1414,7 @@ async def _delStormSvc(self, iden):
Delete a registered storm service from the cortex.
'''
sdef = self.svchive.get(iden)
if sdef is None:
if sdef is None: # pragma: no cover
return

try:
Expand Down
12 changes: 9 additions & 3 deletions synapse/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ async def __anit__(self, certdir=None):
self.cells = {} # all cells are shared. not all shared are cells.
self.shared = {} # objects provided by daemon
self.listenservers = [] # the sockets we're listening on
self.connectedlinks = [] # the links we're currently connected on
self.links = set()

self.sessions = {}

Expand Down Expand Up @@ -326,7 +326,7 @@ async def _onDmonFini(self):
if finis:
await asyncio.wait(finis)

finis = [link.fini() for link in self.connectedlinks]
finis = [link.fini() for link in self.links]
vEpiphyte marked this conversation as resolved.
Show resolved Hide resolved
if finis:
await asyncio.wait(finis)

Expand All @@ -336,6 +336,12 @@ async def _onDmonFini(self):

async def _onLinkInit(self, link):

self.links.add(link)
async def fini():
self.links.discard(link)

link.onfini(fini)

async def rxloop():

while not link.isfini:
Expand All @@ -348,7 +354,7 @@ async def rxloop():
coro = self._onLinkMesg(link, mesg)
link.schedCoro(coro)

self.schedCoro(rxloop())
link.schedCoro(rxloop())

async def _onLinkMesg(self, link, mesg):

Expand Down
2 changes: 2 additions & 0 deletions synapse/exc.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ class LinkShutDown(LinkErr): pass
class NoCertKey(SynErr):
''' Raised when a Cert object requires a RSA Private Key to perform an operation and the key is not present. '''
pass
class NoSuchCert(SynErr): pass

class ModAlreadyLoaded(SynErr): pass
class MustBeJsonSafe(SynErr): pass
Expand Down Expand Up @@ -209,6 +210,7 @@ class SchemaViolation(SynErr): pass

class SlabAlreadyOpen(SynErr): pass
class SpawnExit(SynErr): pass
class FeatureNotSupported(SynErr): pass

class ReadOnlyLayer(SynErr): pass
class ReadOnlyProp(SynErr): pass
Expand Down
10 changes: 5 additions & 5 deletions synapse/lib/agenda.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@
'req': {
'type': 'object',
'properties': {
'minute': {'type': 'number'},
'hour': {'type': 'number'},
'dayofmonth': {'type': 'number'},
'month': {'type': 'number'},
'year': {'type': 'number'},
'minute': {'oneOf': [{'type': 'number'}, {'type': 'array', 'items': {'type': 'number'}}]},
'hour': {'oneOf': [{'type': 'number'}, {'type': 'array', 'items': {'type': 'number'}}]},
'dayofmonth': {'oneOf': [{'type': 'number'}, {'type': 'array', 'items': {'type': 'number'}}]},
'month': {'oneOf': [{'type': 'number'}, {'type': 'array', 'items': {'type': 'number'}}]},
'year': {'oneOf': [{'type': 'number'}, {'type': 'array', 'items': {'type': 'number'}}]},
}
}
}
Expand Down
15 changes: 10 additions & 5 deletions synapse/lib/cell.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ def getCellIden(self):
def getNexsIndx(self):
return self.cell.getNexsIndx()

@adminapi()
async def promote(self):
return await self.cell.promote()

def getCellUser(self):
return self.user.pack()

Expand Down Expand Up @@ -660,10 +664,10 @@ async def setCellActive(self, active):
else:
await self.initServicePassive()

async def initServiceActive(self):
async def initServiceActive(self): # pragma: no cover
pass

async def initServicePassive(self):
async def initServicePassive(self): # pragma: no cover
pass

async def getNexusChanges(self, offs):
Expand Down Expand Up @@ -956,8 +960,9 @@ def addHttpApi(self, path, ctor, info):
))

async def _initCellDmon(self):
certdir = s_common.gendir(self.dirn, 'certs')
self.dmon = await s_daemon.Daemon.anit(certdir=certdir)
cdir = s_common.gendir(self.dirn, 'certs')

self.dmon = await s_daemon.Daemon.anit(certdir=(cdir, s_certdir.defdir))
self.dmon.share('*', self)

self.onfini(self.dmon.fini)
Expand Down Expand Up @@ -1194,7 +1199,7 @@ async def initFromArgv(cls, argv, outp=None):
if port is None:
outp.printf(f'...{cell.getCellType()} API (https): disabled')
else:
outp.printf(f'...{cell.getCellType()} API (https): %s' % (opts.https,))
outp.printf(f'...{cell.getCellType()} API (https): %s' % (port,))

if opts.name is not None:
cell.dmon.share(opts.name, cell)
Expand Down
120 changes: 62 additions & 58 deletions synapse/lib/certdir.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,21 @@ def __init__(self, path=None):
if path is None:
path = defdir

s_common.gendir(path, 'cas')
s_common.gendir(path, 'hosts')
s_common.gendir(path, 'users')
self.certdirs = []

self.certdir = s_common.reqdir(path)
self.path = path
# for backward compatibility, do some type inspection
if isinstance(path, str):
self.certdirs.append(s_common.gendir(path))
elif isinstance(path, (tuple, list)):
[self.certdirs.append(s_common.gendir(p)) for p in path]
else:
mesg = 'Certdir path must be a path string or a list/tuple of path strings.'
raise s_exc.SynErr(mesg=mesg)

for cdir in self.certdirs:
s_common.gendir(cdir, 'cas')
s_common.gendir(cdir, 'hosts')
s_common.gendir(cdir, 'users')

def genCaCert(self, name, signas=None, outp=None, save=True):
'''
Expand Down Expand Up @@ -314,14 +323,16 @@ def getCaCerts(self):
'''
retn = []

path = s_common.genpath(self.certdir, 'cas')
for cdir in self.certdirs:

for name in os.listdir(path):
if not name.endswith('.crt'):
continue
path = s_common.genpath(cdir, 'cas')

full = s_common.genpath(self.certdir, 'cas', name)
retn.append(self._loadCertPath(full))
for name in os.listdir(path):
if not name.endswith('.crt'):
continue

full = s_common.genpath(cdir, 'cas', name)
retn.append(self._loadCertPath(full))

return retn

Expand All @@ -340,10 +351,10 @@ def getCaCertPath(self, name):
Returns:
str: The path if exists.
'''
path = s_common.genpath(self.certdir, 'cas', '%s.crt' % name)
if not os.path.isfile(path):
return None
return path
for cdir in self.certdirs:
path = s_common.genpath(cdir, 'cas', '%s.crt' % name)
if os.path.isfile(path):
return path

def getCaKey(self, name):
'''
Expand Down Expand Up @@ -377,10 +388,10 @@ def getCaKeyPath(self, name):
Returns:
str: The path if exists.
'''
path = s_common.genpath(self.certdir, 'cas', '%s.key' % name)
if not os.path.isfile(path):
return None
return path
for cdir in self.certdirs:
path = s_common.genpath(cdir, 'cas', '%s.key' % name)
if os.path.isfile(path):
return path

def getClientCert(self, name):
'''
Expand Down Expand Up @@ -417,10 +428,10 @@ def getClientCertPath(self, name):
Returns:
str: The path if exists.
'''
path = s_common.genpath(self.certdir, 'users', '%s.p12' % name)
if not os.path.isfile(path):
return None
return path
for cdir in self.certdirs:
path = s_common.genpath(cdir, 'users', '%s.p12' % name)
if os.path.isfile(path):
return path

def getHostCaPath(self, name):
'''
Expand Down Expand Up @@ -475,10 +486,10 @@ def getHostCertPath(self, name):
Returns:
str: The path if exists.
'''
path = s_common.genpath(self.certdir, 'hosts', '%s.crt' % name)
if not os.path.isfile(path):
return None
return path
for cdir in self.certdirs:
path = s_common.genpath(cdir, 'hosts', '%s.crt' % name)
if os.path.isfile(path):
return path

def getHostKey(self, name):
'''
Expand Down Expand Up @@ -512,10 +523,10 @@ def getHostKeyPath(self, name):
Returns:
str: The path if exists.
'''
path = s_common.genpath(self.certdir, 'hosts', '%s.key' % name)
if not os.path.isfile(path):
return None
return path
for cdir in self.certdirs:
path = s_common.genpath(cdir, 'hosts', '%s.key' % name)
if os.path.isfile(path):
return path

def getUserCaPath(self, name):
'''
Expand Down Expand Up @@ -570,10 +581,10 @@ def getUserCertPath(self, name):
Returns:
str: The path if exists.
'''
path = s_common.genpath(self.certdir, 'users', '%s.crt' % name)
if not os.path.isfile(path):
return None
return path
for cdir in self.certdirs:
path = s_common.genpath(cdir, 'users', '%s.crt' % name)
if os.path.isfile(path):
return path

def getUserForHost(self, user, host):
'''
Expand Down Expand Up @@ -629,10 +640,10 @@ def getUserKeyPath(self, name):
Returns:
str: The path if exists.
'''
path = s_common.genpath(self.certdir, 'users', '%s.key' % name)
if not os.path.isfile(path):
return None
return path
for cdir in self.certdirs:
path = s_common.genpath(cdir, 'users', '%s.key' % name)
if os.path.isfile(path):
return path

def importFile(self, path, mode, outp=None):
'''
Expand Down Expand Up @@ -664,7 +675,7 @@ def importFile(self, path, mode, outp=None):
mesg = 'importFile only supports .crt, .key, .p12 extensions'
raise s_exc.BadFileExt(mesg=mesg, ext=ext)

newpath = s_common.genpath(self.certdir, mode, fname)
newpath = s_common.genpath(self.certdirs[0], mode, fname)
if os.path.isfile(newpath):
raise s_exc.FileExists('File already exists')

Expand All @@ -687,8 +698,7 @@ def isCaCert(self, name):
Returns:
bool: True if the certificate is present, False otherwise.
'''
crtpath = self._getPathJoin('cas', '%s.crt' % name)
return os.path.isfile(crtpath)
return self.getCaCertPath(name) is not None

def isClientCert(self, name):
'''
Expand Down Expand Up @@ -723,8 +733,7 @@ def isHostCert(self, name):
Returns:
bool: True if the certificate is present, False otherwise.
'''
crtpath = self._getPathJoin('hosts', '%s.crt' % name)
return os.path.isfile(crtpath)
return self.getHostCertPath(name) is not None

def isUserCert(self, name):
'''
Expand All @@ -741,8 +750,7 @@ def isUserCert(self, name):
Returns:
bool: True if the certificate is present, False otherwise.
'''
crtpath = self._getPathJoin('users', '%s.crt' % name)
return os.path.isfile(crtpath)
return self.getUserCertPath(name) is not None

def signCertAs(self, cert, signas):
'''
Expand Down Expand Up @@ -831,10 +839,11 @@ def signUserCsr(self, xcsr, signas, outp=None):
return self.genUserCert(name, csr=pkey, signas=signas, outp=outp)

def _loadCasIntoSSLContext(self, ctx):
path = s_common.genpath(self.certdir, 'cas')
for name in os.listdir(path):
if name.endswith('.crt'):
ctx.load_verify_locations(os.path.join(path, name))
for cdir in self.certdirs:
path = s_common.genpath(cdir, 'cas')
for name in os.listdir(path):
if name.endswith('.crt'):
ctx.load_verify_locations(os.path.join(path, name))

def getClientSSLContext(self, certname=None):
invisig0th marked this conversation as resolved.
Show resolved Hide resolved
'''
Expand All @@ -851,7 +860,7 @@ def getClientSSLContext(self, certname=None):
certfile = self.getUserCertPath(certname)
if certfile is None:
mesg = f'Missing TLS certificate file for user: {certname}'
raise s_exc.NoCertKey(mesg=mesg)
raise s_exc.NoSuchCert(mesg=mesg)

keyfile = self.getUserKeyPath(certname)
if keyfile is None:
Expand Down Expand Up @@ -961,21 +970,16 @@ def _genPkeyCsr(self, name, mode, outp=None):
outp.printf('csr saved: %s' % (csrpath,))

def _getCaPath(self, cert):

subj = cert.get_issuer()
capath = self._getPathJoin('cas', '%s.crt' % subj.CN)
if not os.path.isfile(capath):
return None

return capath
return self.getCaCertPath(subj.CN)

def _getPathBytes(self, path):
if path is None:
return None
return s_common.getbytes(path)

def _getPathJoin(self, *paths):
return s_common.genpath(self.certdir, *paths)
return s_common.genpath(self.certdirs[0], *paths)

def _loadCertPath(self, path):
byts = self._getPathBytes(path)
Expand Down
Loading