Skip to content

Commit

Permalink
Cursor.getmulti() initital implementation (jnwatson#201)
Browse files Browse the repository at this point in the history
  • Loading branch information
willsthompson committed Sep 12, 2020
1 parent 62a36e8 commit 06c7387
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 0 deletions.
48 changes: 48 additions & 0 deletions lmdb/cffi.py
Original file line number Diff line number Diff line change
Expand Up @@ -2060,6 +2060,54 @@ def get(self, key, default=None):
return self.value()
return default

def getmulti(self, keys, dupdata=False, dupfixed_bytes=None):
"""Returns an iterable of `(key, value)` 2-tuples containing results
for each key in the iterable `keys`.
`keys`:
Iterable to read keys from.
`dupdata`:
If ``True`` and database was opened with `dupsort=True`, read
all duplicate values for each matching key.
`dupfixed_bytes`:
If database was opened with `dupsort=True` and `dupfixed=True`,
accepts the size of each value, in bytes, and applies an
optimization reducing the number of database lookups.
"""
if dupfixed_bytes and dupfixed_bytes < 0:
raise _error("dupfixed_bytes must be a positive integer.")
elif dupfixed_bytes and not dupfixed_bytes:
raise _error("dupdata is required for dupfixed_bytes.")

if dupfixed_bytes:
get_op = _lib.MDB_GET_MULTIPLE
next_op = _lib.MDB_NEXT_MULTIPLE
else:
get_op = _lib.MDB_GET_CURRENT
next_op = _lib.MDB_NEXT_DUP

for key in keys:
if self.set_key(key):
while self._valid:
self._cursor_get(get_op)
preload(self._val)
k = self._to_py(self._key)
v = self._to_py(self._val)

if dupfixed_bytes:
gen = (
(k, v[i:i+dupfixed_bytes])
for i in range(0, len(v), dupfixed_bytes))
for k, v in gen:
yield k, v
else:
yield k, v

if dupdata:
self._cursor_get(next_op)

def set_range(self, key):
"""Seek to the first key greater than or equal to `key`, returning
``True`` on success, or ``False`` to indicate key was past end of
Expand Down
153 changes: 153 additions & 0 deletions lmdb/cpython.c
Original file line number Diff line number Diff line change
Expand Up @@ -2094,6 +2094,158 @@ cursor_first_dup(CursorObject *self)
static PyObject *
cursor_value(CursorObject *self);

/**
* Cursor.getmulti() -> Iterable of (key, value)
*/
static PyObject *
cursor_get_multi(CursorObject *self, PyObject *args, PyObject *kwds)
{
struct cursor_get {
PyObject *keys;
int dupdata;
int dupfixed_bytes;
} arg = {Py_None, 0, 0};

int as_buffer;
PyObject *iter;
PyObject *item;
PyObject *tup;
PyObject *key;
PyObject *val;
PyObject *ret = PyList_New(0);
MDB_cursor_op get_op;
MDB_cursor_op next_op;
int i;
bool done;

static const struct argspec argspec[] = {
{"keys", ARG_OBJ, OFFSET(cursor_get, keys)},
{"dupdata", ARG_BOOL, OFFSET(cursor_get, dupdata)},
{"dupfixed_bytes", ARG_INT, OFFSET(cursor_get, dupfixed_bytes)} // ARG_SIZE?
};

static PyObject *cache = NULL;
if(parse_args(self->valid, SPECSIZE(), argspec, &cache, args, kwds, &arg)) {
return NULL;
}

if(arg.dupfixed_bytes < 0) {
return type_error("dupfixed_bytes must be a positive integer.");
}else if (arg.dupfixed_bytes > 0 && !arg.dupdata) {
return type_error("dupdata is required for dupfixed_bytes.");
}

if(! ((iter = PyObject_GetIter(arg.keys)))) {
return NULL;
}

/* Choose ops for dupfixed vs standard */
if(arg.dupfixed_bytes) {
get_op = MDB_GET_MULTIPLE;
next_op = MDB_NEXT_MULTIPLE;
} else {
get_op = MDB_GET_CURRENT;
next_op = MDB_NEXT_DUP;
}

as_buffer = self->trans->flags & TRANS_BUFFERS;

while((item = PyIter_Next(iter))) {
MDB_val mkey;

// validate item?

if(val_from_buffer(&mkey, item)) {
Py_DECREF(item);
Py_DECREF(iter);
return NULL;
} /* val_from_buffer sets exception */

self->key = mkey;
if(_cursor_get_c(self, MDB_SET_KEY)) { // MDB_SET?
Py_DECREF(item);
Py_DECREF(iter);
return NULL;
}

done = false;
while (!done) {
//TODO valid cursor check?

if(! self->positioned) {
done = true;
}
// TODO check for mutation and refresh key?
else if(_cursor_get_c(self, get_op)) {
Py_DECREF(item);
Py_DECREF(iter);
return NULL;
} else {
key = obj_from_val(&self->key, as_buffer);
PRELOAD_UNLOCKED(0, self->val.mv_data, self->val.mv_size);

if(!arg.dupfixed_bytes) {
/* Not dupfixed, MDB_GET_CURRENT returns single item */
val = obj_from_val(&self->val, as_buffer);
tup = PyTuple_New(2);

if (tup && key && val) {
PyTuple_SET_ITEM(tup, 0, key);
PyTuple_SET_ITEM(tup, 1, val);
PyList_Append(ret, tup);
Py_DECREF(tup);
} else {
Py_DECREF(key);
Py_DECREF(val);
Py_DECREF(tup);
}
} else {
/* dupfixed, MDB_GET_MULTIPLE returns batch, iterate values */
int len = (int) self->val.mv_size/arg.dupfixed_bytes; // size_t?
for(i=0; i<len; i++){
// TODO Handle as_buffer?
val = PyBytes_FromStringAndSize(
(char *) self->val.mv_data+(i*arg.dupfixed_bytes),
(Py_ssize_t) arg.dupfixed_bytes);
tup = PyTuple_New(2);

if (tup && key && val) {
Py_INCREF(key); // Hold key in loop
PyTuple_SET_ITEM(tup, 0, key);
PyTuple_SET_ITEM(tup, 1, val);
PyList_Append(ret, tup);
Py_DECREF(tup);
} else {
Py_DECREF(val);
Py_DECREF(tup);
}
}
Py_DECREF(key); // Release key
}

if(arg.dupdata){
if(_cursor_get_c(self, next_op)) {
Py_DECREF(item);
Py_DECREF(iter);
return NULL;
}
}
else {
done = true;
}
}
}
Py_DECREF(item);
}

Py_DECREF(iter);
if(PyErr_Occurred()) {
return NULL;
}

return ret;
}

/**
* Cursor.get() -> result
*/
Expand Down Expand Up @@ -2829,6 +2981,7 @@ static struct PyMethodDef cursor_methods[] = {
{"first", (PyCFunction)cursor_first, METH_NOARGS},
{"first_dup", (PyCFunction)cursor_first_dup, METH_NOARGS},
{"get", (PyCFunction)cursor_get, METH_VARARGS|METH_KEYWORDS},
{"getmulti", (PyCFunction)cursor_get_multi, METH_VARARGS|METH_KEYWORDS},
{"item", (PyCFunction)cursor_item, METH_NOARGS},
{"iternext", (PyCFunction)cursor_iternext, METH_VARARGS|METH_KEYWORDS},
{"iternext_dup", (PyCFunction)cursor_iternext_dup, METH_VARARGS|METH_KEYWORDS},
Expand Down

0 comments on commit 06c7387

Please sign in to comment.