Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement env var based node and file limits on cortex and axon #1950

Merged
merged 10 commits into from
Nov 16, 2020
33 changes: 31 additions & 2 deletions synapse/axon.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ async def put(self):
await self._save()
return


class AxonHttpHasV1(s_httpapi.Handler):

async def get(self, sha256):
Expand All @@ -80,7 +79,6 @@ async def get(self, sha256):
resp = await self.cell.has(s_common.uhex(sha256))
return self.sendRestRetn(resp)


class AxonHttpDownloadV1(s_httpapi.Handler):

async def get(self, sha256):
Expand Down Expand Up @@ -221,6 +219,21 @@ class Axon(s_cell.Cell):

cellapi = AxonApi

confdefs = {
'max:bytes': {
'description': 'The maximum number of bytes that can be stored in the Axon.',
'type': 'integer',
'minimum': 1,
'hidecmdl': True,
},
'max:count': {
'description': 'The maximum number of files that can be stored in the Axon.',
'type': 'integer',
'minimum': 1,
'hidecmdl': True,
}
}

async def __anit__(self, dirn, conf=None): # type: ignore

await s_cell.Cell.__anit__(self, dirn, conf=conf)
Expand All @@ -242,13 +255,28 @@ async def __anit__(self, dirn, conf=None): # type: ignore
self.axonmetrics.setdefault('size:bytes', 0)
self.axonmetrics.setdefault('file:count', 0)

self.maxbytes = self.conf.get('max:bytes')
self.maxcount = self.conf.get('max:count')

self.addHealthFunc(self._axonHealth)

# modularize blob storage
await self._initBlobStor()

self._initAxonHttpApi()

def _reqBelowLimit(self):

if (self.maxbytes is not None and
self.maxbytes <= self.axonmetrics.get('size:bytes')):
mesg = f'Axon is at size:bytes limit: {self.maxbytes}'
raise s_exc.HitLimit(mesg=mesg)

if (self.maxcount is not None and
self.maxcount <= self.axonmetrics.get('file:count')):
mesg = f'Axon is at file:count limit: {self.maxcount}'
raise s_exc.HitLimit(mesg=mesg)

async def _axonHealth(self, health):
health.update('axon', 'nominal', '', data=await self.metrics())

Expand Down Expand Up @@ -310,6 +338,7 @@ async def metrics(self):

async def save(self, sha256, genr):

self._reqBelowLimit()
byts = self.axonslab.get(sha256, db=self.sizes)
if byts is not None:
return int.from_bytes(byts, 'big')
Expand Down
19 changes: 19 additions & 0 deletions synapse/cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,12 @@ class Cortex(s_cell.Cell): # type: ignore
'description': 'Enable provenance tracking for all writes.',
'type': 'boolean'
},
'max:nodes': {
'description': 'Maximum number of nodes which are allowed to be stored in a Cortex.',
'type': 'integer',
'minimum': 1,
'hidecmdl': True,
},
'modules': {
'default': [],
'description': 'A list of module classes to load.',
Expand Down Expand Up @@ -859,6 +865,9 @@ async def initServiceStorage(self):

self.spawnpool = None

self.maxnodes = self.conf.get('max:nodes')
self.nodecount = 0

self.storm_cmd_ctors = {}
self.storm_cmd_cdefs = {}

Expand Down Expand Up @@ -2684,6 +2693,16 @@ async def _initLayr(self, layrinfo):
self.layers[layr.iden] = layr
self.dynitems[layr.iden] = layr

if self.maxnodes:
counts = await layr.getFormCounts()
self.nodecount += sum(counts.values())
def onadd():
self.nodecount += 1
def ondel():
self.nodecount -= 1
layr.nodeAddHook = onadd
layr.nodeDelHook = ondel

await self.auth.addAuthGate(layr.iden, 'layer')

await self.bumpSpawnPool()
Expand Down
1 change: 1 addition & 0 deletions synapse/exc.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ class SlabAlreadyOpen(SynErr): pass
class SpawnExit(SynErr): pass
class FeatureNotSupported(SynErr): pass

class HitLimit(SynErr): pass
class ReadOnlyLayer(SynErr): pass
class ReadOnlyProp(SynErr): pass
class RecursionLimitHit(SynErr): pass
Expand Down
3 changes: 2 additions & 1 deletion synapse/lib/cell.py
Original file line number Diff line number Diff line change
Expand Up @@ -1319,7 +1319,8 @@ def getArgParser(cls, conf=None):
Notes:
Boot time configuration data is placed in the argument group called ``config``.
This adds default ``dirn``, ``--telepath``, ``--https`` and ``--name`` arguements to the argparser instance.
Configuration values which have the ``hideconf`` value set to True are not added to the argparser instance.
Configuration values which have the ``hideconf`` or ``hidecmdl`` value set to True are not added to the
argparser instance.

Returns:
argparse.ArgumentParser: A ArgumentParser for the Cell.
Expand Down
3 changes: 3 additions & 0 deletions synapse/lib/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ def getArgParseArgs(self):
if conf.get('hideconf'):
continue

if conf.get('hidecmdl'):
continue

typename = conf.get('type')
# only allow single-typed values to have command line arguments
if not isinstance(typename, str):
Expand Down
6 changes: 6 additions & 0 deletions synapse/lib/layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1003,6 +1003,10 @@ async def __anit__(self, layrinfo, dirn, nexsroot=None, allow_upstream=True):
self.growsize = self.layrinfo.get('growsize')
self.logedits = self.layrinfo.get('logedits')

# slim hooks to avoid async/fire
self.nodeAddHook = None
self.nodeDelHook = None

path = s_common.genpath(self.dirn, 'layer_v2.lmdb')

self.fresh = not os.path.exists(path)
Expand Down Expand Up @@ -1668,6 +1672,7 @@ def _editNodeAdd(self, buid, form, edit, sode, meta):
self.layrslab.put(abrv + indx, buid, db=self.byprop)

self.formcounts.inc(form)
if self.nodeAddHook is not None: self.nodeAddHook()

retn = [
(EDIT_NODE_ADD, (valu, stortype), ())
Expand Down Expand Up @@ -1709,6 +1714,7 @@ def _editNodeDel(self, buid, form, edit, sode, meta):
self.layrslab.delete(abrv + indx, buid, db=self.byprop)

self.formcounts.inc(form, valu=-1)
if self.nodeDelHook is not None: self.nodeDelHook()

self._wipeNodeData(buid)
# TODO edits to become async so we can sleep(0) on large deletes?
Expand Down
4 changes: 4 additions & 0 deletions synapse/lib/snap.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,10 @@ async def _getadds(f, p, formnorm, forminfo, doaddnode=True):
else:
yield (buid, f.name, edits)

if self.core.maxnodes is not None and self.core.maxnodes <= self.core.nodecount:
mesg = f'Cortex is at node:count limit: {self.core.maxnodes}'
raise s_exc.HitLimit(mesg=mesg)

if props is None:
props = {}

Expand Down
16 changes: 16 additions & 0 deletions synapse/tests/test_axon.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,3 +344,19 @@ async def test_axon_perms(self):
await user.addRule((True, ('axon', 'has',)))
await user.addRule((True, ('axon', 'upload',)))
await self.runAxonTestBase(prox)

async def test_axon_limits(self):

async with self.getTestAxon(conf={'max:count': 10}) as axon:
for i in range(10):
await axon.put(s_common.buid())

with self.raises(s_exc.HitLimit):
await axon.put(s_common.buid())

async with self.getTestAxon(conf={'max:bytes': 320}) as axon:
for i in range(10):
await axon.put(s_common.buid())

with self.raises(s_exc.HitLimit):
await axon.put(s_common.buid())
6 changes: 6 additions & 0 deletions synapse/tests/test_cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ async def test_cortex_must_upgrade(self):
async with await s_cortex.Cortex.anit(dirn) as core:
pass

async def test_cortex_limits(self):
async with self.getTestCore(conf={'max:nodes': 10}) as core:
self.len(1, await core.nodes('[ ou:org=* ]'))
with self.raises(s_exc.HitLimit):
await core.nodes('[ inet:ipv4=1.2.3.0/24 ]')

async def test_cortex_rawpivot(self):

async with self.getTestCore() as core:
Expand Down
44 changes: 42 additions & 2 deletions synapse/tests/test_lib_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,17 +288,57 @@ def test_hideconf(self):
for optname, optinfo in conf.getArgParseArgs():
pars.add_argument(optname, **optinfo)

# key:integer is not present in cmdline
hmsg = pars.format_help()
self.isin('--key-string', hmsg)
self.notin('--key-integer', hmsg)

s1 = yaml.safe_dump('We all float down here')
i1 = yaml.safe_dump(8675309)
# Load data from envars next - this shows precedence as well
# where data already set won't be set again via this method.
# Load data from envars next - this shows that we won't
# set the key:integer value
with self.setTstEnvars(KEY_STRING=s1,
KEY_INTEGER=i1,
):
conf.setConfFromEnvs()
self.eq(conf.get('key:string'), 'We all float down here')
self.none(conf.get('key:integer'))

# hidearg instead of hideconf
hide_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"additionalProperties": False,
"properties": {
'key:string': {
'description': 'Key String. I have a defval!',
'type': 'string',
'default': 'Default string!'
},
'key:integer': {
'description': 'Key Integer',
'type': 'integer',
'arg': True,
'hidecmdl': True,
},
}
}
conf = s_config.Config(hide_schema)
pars = argparse.ArgumentParser('synapse.tests.test_lib_config.test_hideconf')
for optname, optinfo in conf.getArgParseArgs():
pars.add_argument(optname, **optinfo)

# key:integer is not present in cmdline
hmsg = pars.format_help()
self.isin('--key-string', hmsg)
self.notin('--key-integer', hmsg)

s1 = yaml.safe_dump('We all float down here')
i1 = yaml.safe_dump(8675309)
# Load data from envars next - this shows we will
# set the key:integer value
with self.setTstEnvars(KEY_STRING=s1,
KEY_INTEGER=i1,
):
conf.setConfFromEnvs()
self.eq(conf.get('key:string'), 'We all float down here')
self.eq(conf.get('key:integer'), 8675309)
6 changes: 3 additions & 3 deletions synapse/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -791,21 +791,21 @@ def thisHostMustNot(self, **props): # pragma: no cover
raise unittest.SkipTest('skip thishost: %s==%r' % (k, v))

@contextlib.asynccontextmanager
async def getTestAxon(self, dirn=None):
async def getTestAxon(self, dirn=None, conf=None):
'''
Get a test Axon as an async context manager.

Returns:
s_axon.Axon: A Axon object.
'''
if dirn is not None:
async with await s_axon.Axon.anit(dirn) as axon:
async with await s_axon.Axon.anit(dirn, conf) as axon:
yield axon

return

with self.getTestDir() as dirn:
async with await s_axon.Axon.anit(dirn) as axon:
async with await s_axon.Axon.anit(dirn, conf) as axon:
yield axon

@contextlib.contextmanager
Expand Down