Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow benchmark script to run on remote cortices #1829

Merged
merged 8 commits into from
Aug 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
277 changes: 164 additions & 113 deletions scripts/benchmark_cortex.py

Large diffs are not rendered by default.

14 changes: 12 additions & 2 deletions synapse/lib/layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1078,7 +1078,9 @@ def pack(self):

@s_nexus.Pusher.onPushAuto('layer:truncate')
async def truncate(self):

'''
Nuke all the contents in the layer, leaving an empty layer
'''
self.buidcache.clear()

await self.layrslab.trash()
Expand All @@ -1088,7 +1090,9 @@ async def truncate(self):
await self._initLayerStorage()

async def clone(self, newdirn):

'''
Copy the contents of this layer to a new layer
'''
for root, dnames, fnames in os.walk(self.dirn, topdown=True):

relpath = os.path.relpath(root, start=self.dirn)
Expand Down Expand Up @@ -1119,6 +1123,12 @@ async def clone(self, newdirn):
dstpath = s_common.genpath(newdirn, relpath, name)
shutil.copy(srcpath, dstpath)

async def waitForHot(self):
'''
Wait for the layer's slab to be prefaulted and locked into memory if lockmemory is true; otherwise return immediately.
'''
await self.layrslab.lockdoneevent.wait()

async def _initLayerStorage(self):

slabopts = {
Expand Down
91 changes: 56 additions & 35 deletions synapse/lib/lmdbslab.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import shutil
import asyncio
import pathlib
import functools
import threading
import collections

Expand Down Expand Up @@ -706,6 +705,8 @@ async def memlockfini():
await self.memlocktask
self.memlocktask = s_coro.executor(self._memorylockloop)
self.onfini(memlockfini)
else:
self.lockdoneevent.set()

self.dbnames = {None: (None, False)} # prepopulate the default DB for speed

Expand Down Expand Up @@ -1096,11 +1097,6 @@ def stat(self, db=None):
finally:
self._relXactForReading()

# non-scan ("atomic") interface.
# def getByDup(self, lkey, db=None):
# def getByPref(self, lkey, db=None):
# def getByRange(self, lkey, db=None):

def scanKeys(self, db=None):

with ScanKeys(self, db) as scan:
Expand All @@ -1116,12 +1112,12 @@ async def countByPref(self, byts, db=None):
'''
count = 0
size = len(byts)
with Scan(self, db) as scan:
with ScanKeys(self, db) as scan:

if not scan.set_range(byts):
return 0

for lkey, lval in scan.iternext():
for lkey in scan.iternext():

if lkey[:size] != byts:
return count
Expand Down Expand Up @@ -1245,9 +1241,6 @@ def scanByFullBack(self, db=None):

yield from scan.iternext()

# def keysByRange():
# def valsByRange():

def _initCoXact(self):
try:
self.xact = self.lenv.begin(write=not self.readonly)
Expand Down Expand Up @@ -1472,16 +1465,17 @@ def first(self):
if not self.curs.first():
return False

self.genr = self.iterfunc(self.curs)
self.genr = self.iterfunc()
self.atitem = next(self.genr)

return True

def set_key(self, lkey):

if not self.curs.set_key(lkey):
return False

self.genr = self.iterfunc(self.curs)
self.genr = self.iterfunc()
self.atitem = next(self.genr)
return True

Expand All @@ -1490,7 +1484,7 @@ def set_range(self, lkey):
if not self.curs.set_range(lkey):
return False

self.genr = self.iterfunc(self.curs)
self.genr = self.iterfunc()
self.atitem = next(self.genr)

return True
Expand All @@ -1512,11 +1506,11 @@ def iternext(self):

self.curs = self.slab.xact.cursor(db=self.db)

if not self.resume(self.atitem):
if not self.resume():
raise StopIteration

self.genr = self.iterfunc(self.curs)
if self.item() == self.atitem:
self.genr = self.iterfunc()
if self.isatitem():
next(self.genr)

self.atitem = next(self.genr)
Expand All @@ -1529,13 +1523,11 @@ def bump(self):
self.curs.close()
self.bumped = True

def item(self):
return self.curs.item()
def iterfunc(self):
return self.curs.iternext()

def iterfunc(self, curs):
return curs.iternext()

def resume(self, item):
def resume(self):
item = self.atitem

if not self.dupsort:
return self.curs.set_range(item[0])
Expand All @@ -1553,32 +1545,60 @@ def resume(self, item):

return True

class ScanKeys(Scan):
def isatitem(self):
'''
Returns whether the cursor is currently positioned at the item stored in atitem
'''
return self.atitem == self.curs.item()

def item(self):
return self.curs.key()
class ScanKeys(Scan):
'''
An iterator over the keys of the database. If the database is dupsort, a key with multiple values will be yielded
once for each value.
'''
def iterfunc(self):
if self.dupsort:
return Scan.iterfunc(self)

def iterfunc(self, curs):
return self.curs.iternext(keys=True, values=False)

def resume(self, item):
return self.curs.set_range(item)
def resume(self):
if self.dupsort:
return Scan.resume(self)

return self.curs.set_range(self.atitem)

def isatitem(self):
'''
Returns whether the cursor is currently positioned at the entry stored in atitem (compared by key only when the database is not dupsort)
'''
if self.dupsort:
return Scan.isatitem(self)

return self.atitem == self.curs.key()

def iternext(self):
if self.dupsort:
yield from (item[0] for item in Scan.iternext(self))
return

yield from Scan.iternext(self)

class ScanBack(Scan):
'''
A state-object used by Slab. Not to be instantiated directly.

Scans backwards.
'''
def iterfunc(self, curs):
return curs.iterprev()
def iterfunc(self):
return self.curs.iterprev()

def first(self):

if not self.curs.last():
return False

self.genr = self.iterfunc(self.curs)
self.genr = self.iterfunc()
self.atitem = next(self.genr)
return True

Expand All @@ -1590,7 +1610,7 @@ def set_key(self, lkey):
if self.dupsort:
self.curs.last_dup()

self.genr = self.iterfunc(self.curs)
self.genr = self.iterfunc()
self.atitem = next(self.genr)
return True

Expand All @@ -1608,12 +1628,13 @@ def set_range(self, lkey):
if self.dupsort:
self.curs.last_dup()

self.genr = self.iterfunc(self.curs)
self.genr = self.iterfunc()
self.atitem = next(self.genr)

return True

def resume(self, item):
def resume(self):
item = self.atitem

if not self.dupsort:

Expand Down
15 changes: 13 additions & 2 deletions synapse/tests/test_lib_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,6 @@ async def test_layer_nodeedits(self):

nodelist0 = [node.pack() for node in nodelist0]

count = 0
editlist = []

layr = core0.getLayer()
Expand Down Expand Up @@ -836,6 +835,18 @@ async def test_layer(self):
self.false(await layr.hasTagProp('score'))
nodes = await core.nodes('[test:str=bar +#test:score=100]')

async def test_layer_waitForHot(self):

async with self.getTestCore() as core:
layr = core.getLayer()

await asyncio.wait_for(layr.waitForHot(), timeout=1.0)

conf = {'layers:lockmemory': True}
async with self.getTestCore(conf=conf) as core:
layr = core.getLayer()
await asyncio.wait_for(layr.waitForHot(), timeout=1.0)

async def test_layer_no_extra_logging(self):

async with self.getTestCore() as core:
Expand Down Expand Up @@ -884,7 +895,7 @@ async def test_layer_clone(self):
self.eq('foo', await layr.getNodeValu(buid))
self.eq((1420070400000, 1451606400000), await layr.getNodeValu(buid, '.seen'))

adir = s_common.gendir(layr.dirn, 'adir')
s_common.gendir(layr.dirn, 'adir')

copylayrinfo = await core.cloneLayer(layr.iden)
self.len(2, core.layers)
Expand Down
6 changes: 3 additions & 3 deletions synapse/tests/test_lib_lmdbslab.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async def test_lmdbslab_scankeys(self):
slab.put(b'hoho', b'haha', db=testdb)

testgenr = slab.scanKeys(db=testdb)
dupsgenr = slab.scanKeys(db=testdb)
dupsgenr = slab.scanKeys(db=dupsdb)

testlist = [next(testgenr)]
dupslist = [next(dupsgenr)]
Expand All @@ -65,7 +65,7 @@ async def test_lmdbslab_scankeys(self):
dupslist.extend(dupsgenr)

self.eq(testlist, (b'hehe', b'hoho'))
self.eq(dupslist, (b'hehe', b'hoho'))
self.eq(dupslist, (b'hehe', b'hehe', b'hoho'))

# now lets delete the key we're on
testgenr = slab.scanKeys(db=testdb)
Expand Down Expand Up @@ -558,7 +558,7 @@ async def test_lmdbslab_count_empty(self):
with self.getTestDir() as dirn:
path = os.path.join(dirn, 'test.lmdb')
async with await s_lmdbslab.Slab.anit(path, map_size=100000, growsize=10000) as slab:
await slab.countByPref(b'asdf')
self.eq(0, await slab.countByPref(b'asdf'))

async def test_lmdbslab_grow(self):

Expand Down
2 changes: 1 addition & 1 deletion synapse/tests/test_lib_spawn.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ async def taskfunc(i):
n = 4
tasks = [taskfunc(i) for i in range(n)]
try:
await asyncio.wait_for(asyncio.gather(*tasks), timeout=40)
await asyncio.wait_for(asyncio.gather(*tasks), timeout=80)
except asyncio.TimeoutError:
self.fail('Timeout awaiting for spawn tasks to finish.')

Expand Down
2 changes: 1 addition & 1 deletion synapse/tests/test_lib_stormtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3122,7 +3122,7 @@ async def test_stormtypes_layer_edits(self):

await core.nodes('[inet:ipv4=1.2.3.4]')

#TODO: should we asciify the buid here so it is json compatible?
# TODO: should we asciify the buid here so it is json compatible?
q = '''$list = $lib.list()
for ($offs, $edit) in $lib.layer.get().edits(wait=$lib.false) {
$list.append($edit)
Expand Down