Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add layer cloning #1819

Merged
merged 19 commits into from
Jul 31, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions synapse/cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,10 @@ async def enableMigrationMode(self):
async def disableMigrationMode(self):
await self.cell._disableMigrationMode()

@s_cell.adminapi()
async def cloneLayer(self, iden):
return await self.cell.cloneLayer(iden)

class Cortex(s_cell.Cell): # type: ignore
'''
A Cortex implements the synapse hypergraph.
Expand Down Expand Up @@ -2495,6 +2499,52 @@ async def _initCoreLayers(self):
layrinfo = await node.dict()
await self._initLayr(layrinfo)

async def cloneLayer(self, iden):
'''
Make a copy of a Layer in the cortex.

Args:
iden (str): layer iden
'''
layr = self.layers.get(iden, None)
if layr is None:
raise s_exc.NoSuchLayer(iden=iden)

newiden = s_common.guid()

return await self._push('layer:clone', iden, newiden)

@s_nexus.Pusher.onPush('layer:clone')
async def _cloneLayer(self, iden, newiden):

if newiden in self.layers:
return

layr = self.layers.get(iden)
if layr is None:
return

newpath = s_common.gendir(self.dirn, 'layers', newiden)
await layr.clone(newpath)

node = await self.hive.open(('cortex', 'layers', iden))
copynode = await self.hive.open(('cortex', 'layers', newiden))

layrinfo = await node.dict()
copyinfo = await copynode.dict()
for name, valu in layrinfo.items():
await copyinfo.set(name, valu)
Cisphyx marked this conversation as resolved.
Show resolved Hide resolved

await copyinfo.set('iden', newiden)

copylayr = await self._initLayr(copyinfo)

creator = copyinfo.get('creator')
user = await self.auth.reqUser(creator)
Cisphyx marked this conversation as resolved.
Show resolved Hide resolved
await user.setAdmin(True, gateiden=newiden, logged=False)

return copylayr.pack()

def addStormCmd(self, ctor):
'''
Add a synapse.lib.storm.Cmd class to the cortex.
Expand Down
11 changes: 11 additions & 0 deletions synapse/lib/layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,17 @@ async def truncate(self):

await self._initLayerStorage()

async def clone(self, newdirn):

path = s_common.genpath(newdirn, 'layer_v2.lmdb')
await self.layrslab.copyslab(path)

nodedatapath = s_common.genpath(newdirn, 'nodedata.lmdb')
await self.dataslab.copyslab(nodedatapath)

editspath = s_common.genpath(newdirn, 'nodeedits.lmdb')
await self.nodeeditslab.copyslab(editspath)
Cisphyx marked this conversation as resolved.
Show resolved Hide resolved

async def _initLayerStorage(self):

slabopts = {
Expand Down
20 changes: 20 additions & 0 deletions synapse/lib/lmdbslab.py
Original file line number Diff line number Diff line change
Expand Up @@ -1350,6 +1350,26 @@ def copydb(self, sourcedbname, destslab, destdbname=None, progresscb=None):

return rowcount

async def copyslab(self, dstpath, compact=True):

dstpath = pathlib.Path(dstpath)
if dstpath.exists():
raise s_exc.DataAlreadyExists()

dstoptspath = dstpath.with_suffix('.opts.yaml')
s_common.gendir(dstpath)

await self.sync()

self.lenv.copy(str(dstpath), compact=compact)

try:
shutil.copy(self.optspath, dstoptspath)
except FileNotFoundError: # pragma: no cover
pass

return True

def pop(self, lkey, db=None):
return self._xact_action(self.pop, lmdb.Transaction.pop, lkey, db=db)

Expand Down
23 changes: 23 additions & 0 deletions synapse/tests/test_lib_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -869,3 +869,26 @@ async def test_layer_flat_edits(self):
)),
)
self.len(2, s_layer.getFlatEdits(nodeedits))

async def test_layer_clone(self):

async with self.getTestCore() as core:

layr = core.getLayer()
self.isin(f'Layer (Layer): {layr.iden}', str(layr))

nodes = await core.nodes('[test:str=foo .seen=(2015, 2016)]')
buid = nodes[0].buid

self.eq('foo', await layr.getNodeValu(buid))
self.eq((1420070400000, 1451606400000), await layr.getNodeValu(buid, '.seen'))

copylayrinfo = await core.cloneLayer(layr.iden)
self.len(2, core.layers)

copylayr = core.getLayer(copylayrinfo.get('iden'))
self.isin(f'Layer (Layer): {copylayr.iden}', str(copylayr))
self.ne(layr.iden, copylayr.iden)

self.eq('foo', await copylayr.getNodeValu(buid))
self.eq((1420070400000, 1451606400000), await copylayr.getNodeValu(buid, '.seen'))
19 changes: 19 additions & 0 deletions synapse/tests/test_lib_lmdbslab.py
Original file line number Diff line number Diff line change
Expand Up @@ -1157,6 +1157,25 @@ async def test_lmdbslab_doubleopen(self):
# Can't re-open while already open
await self.asyncraises(s_exc.SlabAlreadyOpen, s_lmdbslab.Slab.anit(path))

async def test_lmdbslab_copyslab(self):

with self.getTestDir() as dirn:

path = os.path.join(dirn, 'test.lmdb')
copypath = os.path.join(dirn, 'copy.lmdb')

async with await s_lmdbslab.Slab.anit(path) as slab:
foo = slab.initdb('foo')
slab.put(b'\x00\x01', b'hehe', db=foo)

await slab.copyslab(copypath)

async with await s_lmdbslab.Slab.anit(copypath) as slabcopy:
foo = slabcopy.initdb('foo')
self.eq(b'hehe', slabcopy.get(b'\x00\x01', db=foo))
Cisphyx marked this conversation as resolved.
Show resolved Hide resolved

await self.asyncraises(s_exc.DataAlreadyExists, slab.copyslab(copypath))

class LmdbSlabMemLockTest(s_t_utils.SynTest):

async def test_lmdbslabmemlock(self):
Expand Down