Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add layer cloning #1819

Merged
merged 19 commits into from
Jul 31, 2020
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions synapse/cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,13 @@ async def enableMigrationMode(self):
async def disableMigrationMode(self):
await self.cell._disableMigrationMode()

@s_cell.adminapi()
async def cloneLayer(self, iden):
ldef = {
Cisphyx marked this conversation as resolved.
Show resolved Hide resolved
'creator': self.user.iden,
}
return await self.cell.cloneLayer(iden, ldef)

class Cortex(s_cell.Cell): # type: ignore
'''
A Cortex implements the synapse hypergraph.
Expand Down Expand Up @@ -2498,6 +2505,61 @@ async def _initCoreLayers(self):
layrinfo = await node.dict()
await self._initLayr(layrinfo)

async def cloneLayer(self, iden, ldef=None):
'''
Make a copy of a Layer in the cortex.

Args:
iden (str): Layer iden to clone
ldef (Optional[Dict]): Layer configuration overrides

Note:
This should only be called with a reasonably static Cortex
due to possible races.
'''
layr = self.layers.get(iden, None)
if layr is None:
raise s_exc.NoSuchLayer(iden=iden)

ldef = ldef or {}
ldef['iden'] = s_common.guid()
ldef.setdefault('creator', self.auth.rootuser.iden)

return await self._push('layer:clone', iden, ldef)

@s_nexus.Pusher.onPush('layer:clone')
async def _cloneLayer(self, iden, ldef):

layr = self.layers.get(iden)
if layr is None:
return

newiden = ldef.get('iden')
if newiden in self.layers:
return

newpath = s_common.gendir(self.dirn, 'layers', newiden)
await layr.clone(newpath)

node = await self.hive.open(('cortex', 'layers', iden))
copynode = await self.hive.open(('cortex', 'layers', newiden))

layrinfo = await node.dict()
copyinfo = await copynode.dict()
for name, valu in layrinfo.items():
await copyinfo.set(name, valu)
Cisphyx marked this conversation as resolved.
Show resolved Hide resolved

for name, valu in ldef.items():
await copyinfo.set(name, valu)

copylayr = await self._initLayr(copyinfo)

creator = copyinfo.get('creator')
user = await self.auth.reqUser(creator)
Cisphyx marked this conversation as resolved.
Show resolved Hide resolved
await user.setAdmin(True, gateiden=newiden, logged=False)

return copylayr.pack()

def addStormCmd(self, ctor):
'''
Add a synapse.lib.storm.Cmd class to the cortex.
Expand Down
32 changes: 32 additions & 0 deletions synapse/lib/layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1086,6 +1086,38 @@ async def truncate(self):

await self._initLayerStorage()

async def clone(self, newdirn):

for root, dnames, fnames in os.walk(self.dirn, topdown=True):

relpath = os.path.relpath(root, start=self.dirn)

for name in list(dnames):

relname = os.path.join(relpath, name)

srcpath = s_common.genpath(root, name)
dstpath = s_common.genpath(newdirn, relname)

if srcpath in s_lmdbslab._AllSlabs:
slab = s_lmdbslab._AllSlabs.get(srcpath)
await slab.copyslab(dstpath)

dnames.remove(name)
continue

s_common.gendir(dstpath)

for name in fnames:

srcpath = s_common.genpath(root, name)
# skip unix sockets etc...
if not os.path.isfile(srcpath):
continue

dstpath = s_common.genpath(newdirn, relpath, name)
shutil.copy(srcpath, dstpath)

async def _initLayerStorage(self):

slabopts = {
Expand Down
26 changes: 23 additions & 3 deletions synapse/lib/lmdbslab.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
int64max = s_common.int64en(0xffffffffffffffff)

# The paths of all open slabs, to prevent accidental opening of the same slab in two places
_AllSlabs = set() # type: ignore
_AllSlabs = {} # type: ignore

class Hist:
'''
Expand Down Expand Up @@ -679,7 +679,7 @@ async def __anit__(self, path, **kwargs):
self._saveOptsFile()

self.lenv = lmdb.open(str(path), **opts)
_AllSlabs.add(abspath)
_AllSlabs[abspath] = self

self.scans = set()

Expand Down Expand Up @@ -808,7 +808,7 @@ async def _onCoFini(self):
break

self.lenv.close()
_AllSlabs.discard(self.abspath)
_AllSlabs.pop(self.abspath, None)
del self.lenv

def _finiCoXact(self):
Expand Down Expand Up @@ -1380,6 +1380,26 @@ def copydb(self, sourcedbname, destslab, destdbname=None, progresscb=None):

return rowcount

async def copyslab(self, dstpath, compact=True):

dstpath = pathlib.Path(dstpath)
if dstpath.exists():
raise s_exc.DataAlreadyExists()

dstoptspath = dstpath.with_suffix('.opts.yaml')
s_common.gendir(dstpath)

await self.sync()

self.lenv.copy(str(dstpath), compact=compact)

try:
shutil.copy(self.optspath, dstoptspath)
except FileNotFoundError: # pragma: no cover
pass

return True

def pop(self, lkey, db=None):
return self._xact_action(self.pop, lmdb.Transaction.pop, lkey, db=db)

Expand Down
41 changes: 41 additions & 0 deletions synapse/tests/test_lib_layer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
import math
import asyncio
import contextlib
Expand Down Expand Up @@ -869,3 +870,43 @@ async def test_layer_flat_edits(self):
)),
)
self.len(2, s_layer.getFlatEdits(nodeedits))

async def test_layer_clone(self):

async with self.getTestCoreAndProxy() as (core, prox):

layr = core.getLayer()
self.isin(f'Layer (Layer): {layr.iden}', str(layr))

nodes = await core.nodes('[test:str=foo .seen=(2015, 2016)]')
buid = nodes[0].buid

self.eq('foo', await layr.getNodeValu(buid))
self.eq((1420070400000, 1451606400000), await layr.getNodeValu(buid, '.seen'))

adir = s_common.gendir(layr.dirn, 'adir')

copylayrinfo = await core.cloneLayer(layr.iden)
self.len(2, core.layers)

copylayr = core.getLayer(copylayrinfo.get('iden'))
self.isin(f'Layer (Layer): {copylayr.iden}', str(copylayr))
self.ne(layr.iden, copylayr.iden)

self.eq('foo', await copylayr.getNodeValu(buid))
self.eq((1420070400000, 1451606400000), await copylayr.getNodeValu(buid, '.seen'))

cdir = s_common.gendir(copylayr.dirn, 'adir')
self.true(os.path.exists(cdir))

await self.asyncraises(s_exc.NoSuchLayer, prox.cloneLayer('newp'))

self.false(layr.readonly)

# Test overriding layer config values
ldef = {'readonly': True}
readlayrinfo = await core.cloneLayer(layr.iden, ldef)
self.len(3, core.layers)

readlayr = core.getLayer(readlayrinfo.get('iden'))
self.true(readlayr.readonly)
21 changes: 21 additions & 0 deletions synapse/tests/test_lib_lmdbslab.py
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,27 @@ async def test_lmdbslab_doubleopen(self):
# Can't re-open while already open
await self.asyncraises(s_exc.SlabAlreadyOpen, s_lmdbslab.Slab.anit(path))

async def test_lmdbslab_copyslab(self):

with self.getTestDir() as dirn:

path = os.path.join(dirn, 'test.lmdb')
copypath = os.path.join(dirn, 'copy.lmdb')

async with await s_lmdbslab.Slab.anit(path) as slab:
foo = slab.initdb('foo')
slab.put(b'\x00\x01', b'hehe', db=foo)

await slab.copyslab(copypath)

self.true(pathlib.Path(copypath).with_suffix('.opts.yaml').exists())

async with await s_lmdbslab.Slab.anit(copypath) as slabcopy:
foo = slabcopy.initdb('foo')
self.eq(b'hehe', slabcopy.get(b'\x00\x01', db=foo))
Cisphyx marked this conversation as resolved.
Show resolved Hide resolved

await self.asyncraises(s_exc.DataAlreadyExists, slab.copyslab(copypath))

class LmdbSlabMemLockTest(s_t_utils.SynTest):

async def test_lmdbslabmemlock(self):
Expand Down