Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize layer storage format for mem size and perf #1877

Merged
merged 14 commits into from
Sep 22, 2020
Merged
719 changes: 338 additions & 381 deletions synapse/lib/layer.py

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions synapse/lib/lmdbslab.py
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ async def memlockfini():

self.dbnames = {None: (None, False)} # prepopulate the default DB for speed

self.onfini(self._onCoFini)
self.onfini(self._onSlabFini)

self.commitstats = collections.deque(maxlen=1000) # stores Tuple[time, replayloglen, commit time delta]

Expand Down Expand Up @@ -800,10 +800,12 @@ async def sync(self):
self._handle_mapfull()
# There's no need to re-try self.forcecommit as _growMapSize does it

async def _onCoFini(self):
assert s_glob.iAmLoop()

async def fini(self):
await self.fire('commit')
return await s_base.Base.fini(self)

async def _onSlabFini(self):
assert s_glob.iAmLoop()

while True:
try:
Expand Down
49 changes: 25 additions & 24 deletions synapse/lib/snap.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ async def _joinStorNode(self, buid, cache):
return node

ndef = None

tags = {}
props = {}
nodedata = {}
Expand All @@ -276,44 +275,45 @@ async def _joinStorNode(self, buid, cache):
if sode is None:
sode = await layr.getStorNode(buid)

info = sode[1]

storndef = info.get('ndef')
if storndef is not None:
ndef = storndef
form = sode.get('form')
valt = sode.get('valu')
if valt is not None:
ndef = (form, valt[0])
bylayer['ndef'] = layr

storprops = info.get('props')
storprops = sode.get('props')
if storprops is not None:
props.update(storprops)
bylayer['props'].update({p: layr for p in storprops.keys()})
for prop, (valu, stype) in storprops.items():
props[prop] = valu
bylayer['props'][prop] = layr

stortags = info.get('tags')
stortags = sode.get('tags')
if stortags is not None:
tags.update(stortags)
bylayer['tags'].update({p: layr for p in stortags.keys()})

stortagprops = info.get('tagprops')
stortagprops = sode.get('tagprops')
if stortagprops is not None:
tagprops.update(stortagprops)
bylayer['tagprops'].update({p: layr for p in stortagprops.keys()})
for tagprop, (valu, stype) in stortagprops.items():
tagprops[tagprop] = valu
bylayer['tagprops'][tagprop] = layr

stordata = info.get('nodedata')
stordata = sode.get('nodedata')
if stordata is not None:
nodedata.update(stordata)

if ndef is None:
return None

fullnode = (buid, {
pode = (buid, {
'ndef': ndef,
'tags': tags,
'props': props,
'nodedata': nodedata,
'tagprops': tagprops,
})

node = s_node.Node(self, fullnode, bylayer=bylayer)
node = s_node.Node(self, pode, bylayer=bylayer)
self.livenodes[buid] = node
self.buidcache.append(node)

Expand All @@ -323,9 +323,9 @@ async def _joinStorNode(self, buid, cache):

async def _joinStorGenr(self, layr, genr):
cache = {}
async for sode in genr:
async for buid, sode in genr:
cache[layr.iden] = sode
node = await self._joinStorNode(sode[0], cache)
node = await self._joinStorNode(buid, cache)
if node is not None:
yield node

Expand Down Expand Up @@ -368,6 +368,7 @@ async def nodesByProp(self, full):
for layr in self.layers:
genr = layr.liftByProp(None, prop.name)
async for node in self._joinStorGenr(layr, genr):
# TODO should these type of filters yield?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no todos in mainline code

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

err fixmes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a question about how many results one could have that whiff in the continue block or not. that's really the question about yielding here.

if node.bylayer['props'].get(prop.name) != layr:
continue
yield node
Expand Down Expand Up @@ -623,7 +624,7 @@ async def applyNodeEdits(self, edits):
meta = await self.getSnapMeta()

todo = s_common.todo('storNodeEdits', edits, meta)
sodes = await self.core.dyncall(self.wlyr.iden, todo)
results = await self.core.dyncall(self.wlyr.iden, todo)

wlyr = self.wlyr
nodes = []
Expand All @@ -634,22 +635,20 @@ async def applyNodeEdits(self, edits):
# and collect up all the callbacks to fire at once at the end. It is
# critical to fire all callbacks after applying all Node() changes.

for sode in sodes:
for buid, sode, postedits in results:

cache = {wlyr.iden: sode}

node = await self._joinStorNode(sode[0], cache)
node = await self._joinStorNode(buid, cache)

if node is None:
# We got part of a node but no ndef
continue

nodes.append(node)

postedits = sode[1].get('edits', ())

if postedits:
actualedits.append((sode[0], sode[1]['form'], postedits))
actualedits.append((buid, node.form.name, postedits))

for edit in postedits:

Expand Down Expand Up @@ -793,6 +792,7 @@ async def addNode(self, name, valu, props=None):
raise

nodes = await self.applyNodeEdits(adds)
assert len(nodes) >= 1

# Adds is top-down, so the first node is what we want
return nodes[0]
Expand Down Expand Up @@ -933,6 +933,7 @@ async def getRuntNodes(self, full, valu=None, cmpr=None):
todo = s_common.todo('runRuntLift', full, valu, cmpr)
async for sode in self.core.dyniter('cortex', todo):

buid = s_common.buid()
vEpiphyte marked this conversation as resolved.
Show resolved Hide resolved
node = s_node.Node(self, sode)
node.isrunt = True

Expand Down
36 changes: 16 additions & 20 deletions synapse/tests/test_lib_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,35 +661,17 @@ async def test_layer_stortype_merge(self):
nodes = await core.nodes('inet:ipv4=1.2.3.4 [ +#foo.bar=2015 ]')
self.eq((1325376000000, 1420070400001), nodes[0].getTag('foo.bar'))

nodeedits = [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC this tested applying node edits and showing that the results of applying edits without changes resulted in a empty set of changes. I guess this would be OBE with the sode format change.

(buid, None, (
(s_layer.EDIT_PROP_SET, ('loc', 'us', None, s_layer.STOR_TYPE_LOC), ()),
)),
]

nodeedits_out = await layr.storNodeEdits(nodeedits, {})
self.notin('loc', nodeedits_out[0][1]['props'])

nodeedits = [
(buid, None, (
(s_layer.EDIT_TAG_SET, ('faz.baz', (None, None), (None, None)), ()),
)),
]

nodeedits_out = await layr.storNodeEdits(nodeedits, {})
self.notin('faz.baz', nodeedits_out[0][1]['tags'])

async def test_layer_nodeedits_created(self):

async with self.getTestCore() as core:

nodes = await core.nodes('[ test:int=1 :loc=us ]')
created00 = nodes[0].get('.created')
self.nn(created00)

layr = core.getLayer()

editlist00 = [nes async for nes in layr.iterLayerNodeEdits()]

await core.nodes('test:int=1 | delnode')
self.len(0, await core.nodes('test:int'))

Expand All @@ -710,7 +692,7 @@ async def test_layer_nodeedits_created(self):

# If meta is not specified .created gets populated to now
await asyncio.sleep(0.01)
await layr.storNodeEdits(nexslist00, None)
await layr.storNodeEdits(nexslist00, {})

nodes = await core.nodes('test:int')
self.len(1, nodes)
Expand Down Expand Up @@ -994,3 +976,17 @@ async def test_layer_clone(self):

readlayr = core.getLayer(readlayrinfo.get('iden'))
self.true(readlayr.readonly)

async def test_layer_v3(self):

async with self.getRegrCore('2.0-layerv2tov3') as core:

nodes = await core.nodes('inet:ipv4=1.2.3.4')
self.len(1, nodes)
self.eq(nodes[0].ndef, ('inet:ipv4', 0x01020304))
self.eq(nodes[0].get('asn'), 33)
self.true(nodes[0].getTag('foo.bar'), (None, None))
self.true(nodes[0].getTagProp('foo.bar', 'confidence'), 22)

for layr in core.layers.values():
self.eq(layr.layrvers, 3)
3 changes: 2 additions & 1 deletion synapse/tests/test_lib_spawn.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ async def workloop():
await core.addTagProp('added', ('time', {}), {})
for q in queries:
await core.nodes(q)
core.view.layers[0].layrslab.forcecommit()

await core.view.layers[0].layrslab.sync()

spawninfo = await core.getSpawnInfo()
queue.put(spawninfo)
Expand Down
Loading