From 13ac65c950c881d08874b1eef225764e8363b84b Mon Sep 17 00:00:00 2001 From: visi Date: Wed, 13 Nov 2024 10:28:26 -0500 Subject: [PATCH] Optimize a Slab while it's running --- synapse/lib/fifofile.py | 70 +++++++++++ synapse/lib/lmdbslab.py | 183 +++++++++++++++++++++++++++-- synapse/lib/msgpack.py | 5 + synapse/tests/test_lib_lmdbslab.py | 70 +++++++++++ 4 files changed, 316 insertions(+), 12 deletions(-) create mode 100644 synapse/lib/fifofile.py diff --git a/synapse/lib/fifofile.py b/synapse/lib/fifofile.py new file mode 100644 index 0000000000..fb44e4c39c --- /dev/null +++ b/synapse/lib/fifofile.py @@ -0,0 +1,70 @@ +import io +import asyncio + +import synapse.common as s_common + +import synapse.lib.base as s_base +import synapse.lib.const as s_const + +class FifoFile(s_base.Base): + ''' + Use a file as an async FIFO. + ''' + async def __anit__(self): + + await s_base.Base.__anit__(self) + + self.size = 0 + self.offset = 0 + + self.count = 0 + self.yielded = 0 + + self.lock = asyncio.Lock() + self.readable = asyncio.Event() + self.unpacker = s_msgpack.unpacker() + + self.tempdir = self.enter_context(s_common.getTempDir()) + self.fifopath = s_common.genpath(self.tempdir, 'fifo.mpk') + self.fifofile = await s_coro.executor(io.open, self.fifopath, 'wb') + self.fifofd = self.fifofile.fileno() + + async def fini(): + self.readable.set() + + self.onfini(fini) + + async def put(self, item): + byts = s_msgpack.en(item) + async with self.lock: + self.size += await s_coro.executor(os.pwrite, self.fifofd, self.size) + self.count += 1 + self.readable.set() + + async def get(self): + + while True: + + if self.isfini: + raise s_exc.IsFini() + + async with self.lock: + + while True: + + try: + item = self.unpacker.unpack() + self.count -= 1 + return item + + except msgpack.exceptions.OutOfData: + + if self.count == 0: + # TODO if we catch up, truncate the file? + # await s_coro.executor(self.fifofile.truncate) + break + + byts = s_coro.executor(self.fifofile.read, s_const.mebibyte) + self.unpacker.feed(byts) + + await self.readable.wait() diff --git a/synapse/lib/lmdbslab.py b/synapse/lib/lmdbslab.py index f50569c7e4..874759c036 100644 --- a/synapse/lib/lmdbslab.py +++ b/synapse/lib/lmdbslab.py @@ -2,6 +2,7 @@ import shutil import asyncio import threading +import contextlib import collections import logging @@ -19,6 +20,7 @@ import synapse.lib.const as s_const import synapse.lib.nexus as s_nexus import synapse.lib.msgpack as s_msgpack +import synapse.lib.fifofile as s_fifofile import synapse.lib.thishost as s_thishost import synapse.lib.thisplat as s_thisplat import synapse.lib.slabseqn as s_slabseqn @@ -762,6 +764,101 @@ def _mapsizeround(size): return _roundup(size, MAX_DOUBLE_SIZE) +class Optimizer(s_base.Base): + + async def __anit__(self, slab, dirn=None): + + await s_base.Base.__anit__(self) + + self.slab = slab + self.slab.onfini(self) + + self.size = 0 + self.count = 0 + self.synced = 0 + + self.fifo = await s_fifofile.FifoFile.anit() + + self.optslab = None + self.optpath = f'{self.slab.path}_optimizing' + + async def fini(): + + await self.fifo.fini() + if self.optslab is not None: + await self.optslab.fini() + shutil.rmtree(self.optpath, ignore_errors=True) + + self.onfini(fini) + + def writeahead(self, xactops): + + packops = [] + for (func, args, kwargs) in xactops: + indx = self.slab.xactlooks.get(func) + assert indx is not None + packops.append((indx, args, kwargs)) + + try: + await self.fifo.put(packops) + except Exception as e: + logger.exception('Optimize Failed On') + # TODO: how do we kill the task? + # await self.fini() + + async def _backup(self): + # lockstep a new process which opens at the current + # transaction and fires a thread to run the copy + # (confirmed LMDB copy releases the GIL) + + s_common.gendir(self.optpath) + + # TODO: figure out how to run this in a thread... + with self.slab.lenv.begin(write=False) as txn: + self.slab.lenv.copy(self.optpath, compact=True, txn=txn) + + self.optslab = await Slab.anit(self.optpath) + + async def _catchup(self): + + while True: + + if self.fifo.count == 0 and not self.slab.dirty: + return + + indx, args, kwargs = await self.fifo.get() + self.slab.xactfuncs[indx](*args, **kwargs) + await asyncio.sleep(0) + + async def _switchup(self): + + await self.optslab.fini() + + [scan.bump() for scan in self.slab.scans] + + # how confident are we feeling? + # os.rename(self.slab.path, f'{self.slab.path}_old') + shutil.rmtree(self.slab.path, ignore_errors=True) + os.rename(self.optpath, self.slab.path) + + self.slab.reopen() + + async def optimize(self): + + logger.warning(f'Beginning Optimization: {self.slab.path}') + try: + await self.slab.sync() + + await self._backup() + await self._catchup() + await self._switchup() + + await self.fini() + + except Exception as e: + logger.exception('...optimization failed! Resuming normal operation.') + await self.fini() + class Slab(s_base.Base): ''' A "monolithic" LMDB instance for use in a asyncio loop thread. @@ -790,7 +887,7 @@ def getSlabsInDir(clas, dirn): if toppath == slab.path or slab.path.startswith(toppath + os.sep)] @classmethod - async def initSyncLoop(clas, inst): + async def initSyncLoop(clas): if clas.synctask is not None: return @@ -825,6 +922,20 @@ async def syncLoopOnce(clas): await slab.sync() await asyncio.sleep(0) + @contextlib.asynccontextmanager + async def optimizer(self): + + if self._optimizer is not None: + raise s_exc.SynErr(mesg='FIXME DUP') + + self._optimizer = await Optimizer.anit(self) + + yield self._optimizer + + await self._optimizer.fini() + + self._optimizer = None + @classmethod async def getSlabStats(clas): retn = [] @@ -858,6 +969,7 @@ async def __anit__(self, path, **kwargs): self.path = path self.optspath = s_common.switchext(path, ext='.opts.yaml') + self._optimizer = None # Make sure we don't have this lmdb DB open already. (This can lead to seg faults) if path in self.allslabs: @@ -883,8 +995,10 @@ async def __anit__(self, path, **kwargs): self.max_xactops_len = opts.pop('max_replay_log', 10000) self.recovering = False - opts.setdefault('max_dbs', 128) - opts.setdefault('writemap', True) + self.xactfuncs = [self.put, self.pop, self.delete, self.replace, self._putmulti] + self.xactlooks = {func: indx for (indx, func) in enumerate(self.xactfuncs)} + + self.setDefOpts(opts) self.maxsize = opts.pop('maxsize', None) self.growsize = opts.pop('growsize', self.DEFAULT_GROWSIZE) @@ -955,7 +1069,11 @@ async def memlockfini(): self.commitstats = collections.deque(maxlen=1000) # stores Tuple[time, replayloglen, commit time delta] if not self.readonly: - await Slab.initSyncLoop(self) + await Slab.initSyncLoop() + + def setDefOpts(self, opts): + opts.setdefault('max_dbs', 128) + opts.setdefault('writemap', True) def __repr__(self): return 'Slab: %r' % (self.path,) @@ -1087,6 +1205,9 @@ def _finiCoXact(self): self.xact.commit() + if self._optimizer: + self._optimizer.writeahead(self.xactops) + self.xactops.clear() del self.xact @@ -1236,7 +1357,7 @@ def _memorylockloop(self): self.locking_memory = False logger.debug('memory locking thread ended') - def initdb(self, name, dupsort=False, integerkey=False, dupfixed=False): + def initdb(self, name, dupsort=False): if name in self.dbnames: return name @@ -1246,11 +1367,10 @@ def initdb(self, name, dupsort=False, integerkey=False, dupfixed=False): if self.readonly: # In a readonly environment, we can't make our own write transaction, but we # can have the lmdb module create one for us by not specifying the transaction - db = self.lenv.open_db(name.encode('utf8'), create=False, dupsort=dupsort, integerkey=integerkey, - dupfixed=dupfixed) + db = self.lenv.open_db(name.encode('utf8'), create=False, dupsort=dupsort) else: - db = self.lenv.open_db(name.encode('utf8'), txn=self.xact, dupsort=dupsort, integerkey=integerkey, - dupfixed=dupfixed) + db = self.lenv.open_db(name.encode('utf8'), txn=self.xact, dupsort=dupsort) + self.dirty = True self.forcecommit() @@ -1582,6 +1702,37 @@ def scanByFullBack(self, db=None): yield from scan.iternext() + def reopen(self): + + [scan.bump() for scan in self.scans] + + if not self.readonly: + self._finiCoXact() + + self.lenv.close() + del self.lenv + + opts = s_common.yamlload(self.optspath) + self.setDefOpts(opts) + + self.lenv = lmdb.open(self.path, **opts) + + # FIXME: any other thing we need to repopulate? + self.mapsize = self.lenv.info()['map_size'] + + if not self.readonly: + self._initCoXact() + + dbnames = list(self.dbnames.items()) + + for (dbname, (_, dupsort)) in dbnames: + + if dbname is None: + continue + + self.dbnames.pop(dbname, None) + self.initdb(dbname, dupsort=dupsort) + def _initCoXact(self): try: self.xact = self.lenv.begin(write=not self.readonly) @@ -1594,6 +1745,9 @@ def _initCoXact(self): self.dirty = False def _logXactOper(self, func, *args, **kwargs): + + assert self.xactlooks.get(func) is not None + self.xactops.append((func, args, kwargs)) if len(self.xactops) == self.max_xactops_len: @@ -1819,16 +1973,21 @@ class Scan: db (str): name of open database on the slab ''' def __init__(self, slab, db): + self.db = db self.slab = slab - self.db, self.dupsort = slab.dbnames[db] + _, self.dupsort = slab.dbnames[db] self.atitem = None self.bumped = False self.curs = None + def getScanDb(self): + db, dupsort = self.slab.dbnames.get(self.db) + return db + def __enter__(self): self.slab._acqXactForReading() - self.curs = self.slab.xact.cursor(db=self.db) + self.curs = self.slab.xact.cursor(db=self.getScanDb()) self.slab.scans.add(self) return self @@ -1886,7 +2045,7 @@ def iternext(self): self.bumped = False - self.curs = self.slab.xact.cursor(db=self.db) + self.curs = self.slab.xact.cursor(db=self.getScanDb()) if not self.resume(): raise StopIteration diff --git a/synapse/lib/msgpack.py b/synapse/lib/msgpack.py index ecc50efed8..0b35678449 100644 --- a/synapse/lib/msgpack.py +++ b/synapse/lib/msgpack.py @@ -263,3 +263,8 @@ def getvars(varz): continue items.append(item) return dict(items) + +def unpacker(**kwargs): + opts = dict(unpacker_kwargs) + opts.update(kwargs) + return msgpack.Unpacker(**opts) diff --git a/synapse/tests/test_lib_lmdbslab.py b/synapse/tests/test_lib_lmdbslab.py index 71af0b4d16..ab533b4470 100644 --- a/synapse/tests/test_lib_lmdbslab.py +++ b/synapse/tests/test_lib_lmdbslab.py @@ -33,6 +33,76 @@ def __init__(self, *args, **kwargs): self._nowtime = 1000 s_t_utils.SynTest.__init__(self, *args, **kwargs) + async def test_lmdbslab_optimize(self): + + # A simple pass to test the Slab.optimize() function + with self.getTestDir() as dirn: + + path = os.path.join(dirn, 'test.lmdb') + async with await s_lmdbslab.Slab.anit(path) as slab: + + testdb = slab.initdb('woot') + dupsdb = slab.initdb('dups', dupsort=True) + + slab.put(b'foo', b'bar', db=testdb) + slab.put(b'baz', b'faz', db=testdb) + + slab.put(b'hehe', b'haha', db=dupsdb) + slab.put(b'hehe', b'hoho', db=dupsdb) + + # hold open scanners across the optimize + testscan = slab.scanByFull(db=testdb) + dupsscan = slab.scanByFull(db=dupsdb) + + self.eq((b'baz', b'faz'), next(testscan)) + self.eq((b'hehe', b'haha'), next(dupsscan)) + + async with slab.optimizer() as optimizer: + await optimizer.optimize() + + self.eq((b'foo', b'bar'), next(testscan)) + self.eq((b'hehe', b'hoho'), next(dupsscan)) + + # Another pass where we break the optimize() down into + # parts so we can test incremental commits and such... + with self.getTestDir() as dirn: + + path = os.path.join(dirn, 'test.lmdb') + async with await s_lmdbslab.Slab.anit(path) as slab: + + testdb = slab.initdb('test') + dupsdb = slab.initdb('dups', dupsort=True) + + slab.put(b'foo', b'bar', db=testdb) + slab.put(b'baz', b'faz', db=testdb) + + slab.put(b'hehe', b'haha', db=dupsdb) + slab.put(b'hehe', b'hoho', db=dupsdb) + + # hold open scanners across the optimize + testscan = slab.scanByFull(db=testdb) + dupsscan = slab.scanByFull(db=dupsdb) + + self.eq((b'baz', b'faz'), next(testscan)) + self.eq((b'hehe', b'haha'), next(dupsscan)) + + async with slab.optimizer() as optimizer: + + await optimizer._backup() + + # force a write to the WAL + slab.put(b'zap', b'zop', db=testdb) + await slab.sync() + + # and force a dirty slab + slab.put(b'zip', b'zop', db=testdb) + + await optimizer._catchup() + await optimizer._switchup() + + self.eq((b'foo', b'bar'), next(testscan)) + self.eq((b'hehe', b'hoho'), next(dupsscan)) + async def test_lmdbslab_scankeys(self): with self.getTestDir() as dirn: