Skip to content

Commit

Permalink
Optimize a Slab while it's running
Browse files Browse the repository at this point in the history
  • Loading branch information
invisig0th committed Nov 13, 2024
1 parent 4e9811d commit 13ac65c
Show file tree
Hide file tree
Showing 4 changed files with 316 additions and 12 deletions.
70 changes: 70 additions & 0 deletions synapse/lib/fifofile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import io
import asyncio

import synapse.common as s_common

import synapse.lib.base as s_base
import synapse.lib.const as s_const

class FifoFile(s_base.Base):
'''
Use a file as an async FIFO.
'''
async def __anit__(self):

await s_base.Base.__anit__(self)

self.size = 0
self.offset = 0

self.count = 0
self.yielded = 0

self.lock = asyncio.Lock()
self.readable = asyncio.Event()
self.unpacker = s_msgpack.unpacker()

self.tempdir = self.enter_context(s_common.getTempDir())
self.fifopath = s_common.genpath(self.tempdir, 'fifo.mpk')
self.fifofile = await s_coro.executor(io.open, self.fifopath, 'wb')
self.fifofd = self.fifofile.fileno()

async def fini():
self.readable.set()

self.onfini(fini)

async def put(self, item):
byts = s_msgpack.en(item)
async with self.lock:
self.size += await s_coro.executor(os.pwrite, self.fifofd, self.size)
self.count += 1
self.readable.set()

async def get(self):

while True:

if self.isfini:
raise s_exc.IsFini()

async with self.lock:

while True:

try:
item = self.unpacker.unpack()
self.count -= 1
return item

except msgpack.exceptions.OutOfData:

if self.count == 0:
# TODO if we catch up, truncate the file?
# await s_coro.executor(self.fifofile.truncate)
break

byts = s_coro.executor(self.fifofile.read, s_const.mebibyte)
self.unpacker.feed(byts)

await self.readable.wait()
183 changes: 171 additions & 12 deletions synapse/lib/lmdbslab.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import shutil
import asyncio
import threading
import contextlib
import collections

import logging
Expand All @@ -19,6 +20,7 @@
import synapse.lib.const as s_const
import synapse.lib.nexus as s_nexus
import synapse.lib.msgpack as s_msgpack
import synapse.lib.fifofile as s_fifofile
import synapse.lib.thishost as s_thishost
import synapse.lib.thisplat as s_thisplat
import synapse.lib.slabseqn as s_slabseqn
Expand Down Expand Up @@ -762,6 +764,101 @@ def _mapsizeround(size):

return _roundup(size, MAX_DOUBLE_SIZE)

class Optimizer(s_base.Base):

async def __anit__(self, slab, dirn=None):

await s_base.Base.__anit__(self)

self.slab = slab
self.slab.onfini(self)

self.size = 0
self.count = 0
self.synced = 0

self.fifo = await s_fifofile.FifoFile.anit()

self.optslab = None
self.optpath = f'{self.slab.path}_optimizing'

async def fini():

await self.fifo.fini()
if self.optslab is not None:
await self.optslab.fini()
shutil.rmtree(self.optpath, ignore_errors=True)

self.onfini(fini)

def writeahead(self, xactops):

packops = []
for (func, args, kwargs) in xactops:
indx = self.slab.xactlooks.get(func)
assert indx is not None
packops.append((indx, args, kwargs))

try:
await self.fifo.put(packops)
except Exception as e:
logger.exception('Optimize Failed On')
# TODO: how do we kill the task?
# await self.fini()

async def _backup(self):
# lockstep a new process which opens at the current
# transaction and fires a thread to run the copy
# (confirmed LMDB copy releases the GIL)

s_common.gendir(self.optpath)

# TODO: figure out how to run this in a thread...
with self.slab.lenv.begin(write=False) as txn:
self.slab.lenv.copy(self.optpath, compact=True, txn=txn)

self.optslab = await Slab.anit(self.optpath)

async def _catchup(self):

while True:

if self.fifo.count == 0 and not self.slab.dirty:
return

indx, args, kwargs = await self.fifo.get()
self.slab.xactfuncs[indx](*args, **kwargs)
await asyncio.sleep(0)

async def _switchup(self):

await self.optslab.fini()

[scan.bump() for scan in self.slab.scans]

# how confident are we feeling?
# os.rename(self.slab.path, f'{self.slab.path}_old')
shutil.rmtree(self.slab.path, ignore_errors=True)
os.rename(self.optpath, self.slab.path)

self.slab.reopen()

async def optimize(self):

logger.warning(f'Beginning Optimization: {self.slab.path}')
try:
await self.slab.sync()

await self._backup()
await self._catchup()
await self._switchup()

await self.fini()

except Exception as e:
logger.exception('...optimization failed! Resuming normal operation.')
await self.fini()

class Slab(s_base.Base):
'''
A "monolithic" LMDB instance for use in a asyncio loop thread.
Expand Down Expand Up @@ -790,7 +887,7 @@ def getSlabsInDir(clas, dirn):
if toppath == slab.path or slab.path.startswith(toppath + os.sep)]

@classmethod
async def initSyncLoop(clas, inst):
async def initSyncLoop(clas):

if clas.synctask is not None:
return
Expand Down Expand Up @@ -825,6 +922,20 @@ async def syncLoopOnce(clas):
await slab.sync()
await asyncio.sleep(0)

@contextlib.asynccontextmanager
async def optimizer(self):

if self._optimizer is not None:
raise s_exc.SynErr(mesg='FIXME DUP')

self._optimizer = await Optimizer.anit(self)

yield self._optimizer

await self._optimizer.fini()

self._optimizer = None

@classmethod
async def getSlabStats(clas):
retn = []
Expand Down Expand Up @@ -858,6 +969,7 @@ async def __anit__(self, path, **kwargs):

self.path = path
self.optspath = s_common.switchext(path, ext='.opts.yaml')
self._optimizer = None

# Make sure we don't have this lmdb DB open already. (This can lead to seg faults)
if path in self.allslabs:
Expand All @@ -883,8 +995,10 @@ async def __anit__(self, path, **kwargs):
self.max_xactops_len = opts.pop('max_replay_log', 10000)
self.recovering = False

opts.setdefault('max_dbs', 128)
opts.setdefault('writemap', True)
self.xactfuncs = [self.put, self.pop, self.delete, self.replace, self._putmulti]
self.xactlooks = {func: indx for (indx, func) in enumerate(self.xactfuncs)}

self.setDefOpts(opts)

self.maxsize = opts.pop('maxsize', None)
self.growsize = opts.pop('growsize', self.DEFAULT_GROWSIZE)
Expand Down Expand Up @@ -955,7 +1069,11 @@ async def memlockfini():
self.commitstats = collections.deque(maxlen=1000) # stores Tuple[time, replayloglen, commit time delta]

if not self.readonly:
await Slab.initSyncLoop(self)
await Slab.initSyncLoop()

def setDefOpts(self, opts):
opts.setdefault('max_dbs', 128)
opts.setdefault('writemap', True)

def __repr__(self):
return 'Slab: %r' % (self.path,)
Expand Down Expand Up @@ -1087,6 +1205,9 @@ def _finiCoXact(self):

self.xact.commit()

if self._optimizer:
self._optimizer.writeahead(self.xactops)

self.xactops.clear()

del self.xact
Expand Down Expand Up @@ -1236,7 +1357,7 @@ def _memorylockloop(self):
self.locking_memory = False
logger.debug('memory locking thread ended')

def initdb(self, name, dupsort=False, integerkey=False, dupfixed=False):
def initdb(self, name, dupsort=False):

if name in self.dbnames:
return name
Expand All @@ -1246,11 +1367,10 @@ def initdb(self, name, dupsort=False, integerkey=False, dupfixed=False):
if self.readonly:
# In a readonly environment, we can't make our own write transaction, but we
# can have the lmdb module create one for us by not specifying the transaction
db = self.lenv.open_db(name.encode('utf8'), create=False, dupsort=dupsort, integerkey=integerkey,
dupfixed=dupfixed)
db = self.lenv.open_db(name.encode('utf8'), create=False, dupsort=dupsort)
else:
db = self.lenv.open_db(name.encode('utf8'), txn=self.xact, dupsort=dupsort, integerkey=integerkey,
dupfixed=dupfixed)
db = self.lenv.open_db(name.encode('utf8'), txn=self.xact, dupsort=dupsort)

self.dirty = True
self.forcecommit()

Expand Down Expand Up @@ -1582,6 +1702,37 @@ def scanByFullBack(self, db=None):

yield from scan.iternext()

def reopen(self):

[scan.bump() for scan in self.scans]

if not self.readonly:
self._finiCoXact()

self.lenv.close()
del self.lenv

opts = s_common.yamlload(self.optspath)
self.setDefOpts(opts)

self.lenv = lmdb.open(self.path, **opts)

# FIXME: any other thing we need to repopulate?
self.mapsize = self.lenv.info()['map_size']

if not self.readonly:
self._initCoXact()

dbnames = list(self.dbnames.items())

for (dbname, (_, dupsort)) in dbnames:

if dbname is None:
continue

self.dbnames.pop(dbname, None)
self.initdb(dbname, dupsort=dupsort)

def _initCoXact(self):
try:
self.xact = self.lenv.begin(write=not self.readonly)
Expand All @@ -1594,6 +1745,9 @@ def _initCoXact(self):
self.dirty = False

def _logXactOper(self, func, *args, **kwargs):

assert self.xactlooks.get(func) is not None

self.xactops.append((func, args, kwargs))

if len(self.xactops) == self.max_xactops_len:
Expand Down Expand Up @@ -1819,16 +1973,21 @@ class Scan:
db (str): name of open database on the slab
'''
def __init__(self, slab, db):
self.db = db
self.slab = slab
self.db, self.dupsort = slab.dbnames[db]
_, self.dupsort = slab.dbnames[db]

self.atitem = None
self.bumped = False
self.curs = None

def getScanDb(self):
db, dupsort = self.slab.dbnames.get(self.db)
return db

def __enter__(self):
self.slab._acqXactForReading()
self.curs = self.slab.xact.cursor(db=self.db)
self.curs = self.slab.xact.cursor(db=self.getScanDb())
self.slab.scans.add(self)
return self

Expand Down Expand Up @@ -1886,7 +2045,7 @@ def iternext(self):

self.bumped = False

self.curs = self.slab.xact.cursor(db=self.db)
self.curs = self.slab.xact.cursor(db=self.getScanDb())

if not self.resume():
raise StopIteration
Expand Down
5 changes: 5 additions & 0 deletions synapse/lib/msgpack.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,3 +263,8 @@ def getvars(varz):
continue
items.append(item)
return dict(items)

def unpacker(**kwargs):
opts = dict(unpacker_kwargs)
opts.update(kwargs)
return msgpack.Unpacker(**opts)
Loading

0 comments on commit 13ac65c

Please sign in to comment.