Skip to content

Commit

Permalink
Protect against previous iterations that duplicate the same entry
Browse files Browse the repository at this point in the history
  • Loading branch information
kriszyp committed Jun 7, 2024
1 parent c88ab30 commit 2f0dc8f
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 40 deletions.
116 changes: 78 additions & 38 deletions dependencies/lmdb/libraries/liblmdb/mdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -2660,6 +2660,52 @@ mdb_page_dirty(MDB_txn *txn, MDB_page *mp)

const static int MAX_SCAN_SEGMENT = 50;


static void test_reversal(MDB_cursor *mc) {
MDB_txn *txn = mc->mc_txn;
MDB_env *env = txn->mt_env;
MDB_cursor m2;
mdb_cursor_init(&m2, txn, FREE_DBI, NULL);
MDB_val key, data;
MDB_page *np, *leaf;
txnid_t id, last_id = 0;
int rc;
rc = mdb_cursor_get(&m2, &key, NULL, MDB_LAST);
if (rc) {
if (rc == MDB_NOTFOUND) return;
fprintf(stderr, "Error getting last value %i\n", rc);
}
int tried_last = 0;
do {
rc = mdb_cursor_get(&m2, &key, NULL, MDB_PREV);
if (rc == MDB_NOTFOUND) {
//fprintf(stderr, "done \n");
return;
}
id = *((txnid_t *) key.mv_data);
np = m2.mc_pg[m2.mc_top];
leaf = NODEPTR(np, m2.mc_ki[m2.mc_top]);
if ((rc = mdb_node_read(&m2, leaf, &data)) != MDB_SUCCESS) {
fprintf(stderr, "Read node failure: %i\n", id);
}
if (last_id && id >= last_id) {
if (id == last_id) {
fprintf(stderr, "Duplicate error: %i\n", id);
if (!tried_last) {
tried_last = 1;
rc = mdb_cursor_get(&m2, &key, NULL, MDB_SET_RANGE);
if (rc) fprintf(stderr, "lookup error: %i\n", id);
id = *((txnid_t *) key.mv_data);
fprintf(stderr, "after Duplicate error: %i\n", id);
}
} else
fprintf(stderr, "Out of order error: %i > %i\n", id, last_id);
}
//fprintf(stderr, "key %i ", id);
last_id = id;
} while(!rc);
if (rc) fprintf(stderr, "Error getting prev value %i\n", rc);
}
/** Allocate page numbers and memory for writing. Maintain me_freelist_start,
* me_pghead and mt_next_pgno. Set #MDB_TXN_ERROR on failure.
*
Expand Down Expand Up @@ -2883,20 +2929,6 @@ mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp)
key.mv_size = sizeof(last);
rc = mdb_cursor_get(&m2, &key, NULL, op);
op = MDB_NEXT; // now iterate forwards through the txns of free list
if (rc) {
if (rc == MDB_NOTFOUND) { // if not found, go to the beginning of the range and look for older txns
if (env->me_freelist_state & MDB_FREELIST_DELETING) {
//fprintf(stderr, "Skipping unsafe backwards iteration 1\n");
break; // but can't iterate backwards in deletion mode
}
mdb_cursor_last(&m2, &key, NULL);
env->me_freelist_end = oldest;
op = MDB_PREV;
rc = mdb_cursor_get(&m2, &key, NULL, op);
}
if (rc && rc != MDB_NOTFOUND) goto fail;
}
mdb_cassert(&m2, key.mv_size > 0);
last = *(txnid_t *) key.mv_data;
} else {
// no more transactions to read going forward through newest, we are now going
Expand All @@ -2912,10 +2944,10 @@ mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp)
} else {
// we are now iterating through the free list entries
// first, we need to check if we are in free-list deletion mode, in which case it is not safe to iterate backwards and we have to bail out
if (op == MDB_PREV && (env->me_freelist_state & MDB_FREELIST_DELETING)) {
/*if (op == MDB_PREV && (env->me_freelist_state & MDB_FREELIST_DELETING)) {
//fprintf(stderr, "Skipping unsafe backwards iteration 1\n");
break;
}
}*/
// now iterate
rc = mdb_cursor_get(&m2, &key, NULL, op);
if (rc && rc != MDB_NOTFOUND)
Expand All @@ -2928,13 +2960,13 @@ mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp)
if (op == MDB_NEXT) {
// iterating forward from the freelist range to find newer transactions
if (last >= oldest || rc == MDB_NOTFOUND) {
if (!rc) env->me_freelist_end = oldest;
env->me_freelist_end = oldest;
// no more newer transactions, go to the beginning of the range and look for older txns
if (env->me_freelist_state & MDB_FREELIST_DELETING) break;
//if (env->me_freelist_state & MDB_FREELIST_DELETING) break;
op = MDB_SET_RANGE;
if (env->me_freelist_start <= 1) break; // should be no zero entry, break out
last = env->me_freelist_start - 1;
key.mv_data = &last; // start at the end of the freelist and read newer transactions free pages
key.mv_data = &last; // start at the start of the freelist and read older transactions free pages
key.mv_size = sizeof(last);
mdb_cursor_init(&m2, txn, FREE_DBI, NULL);
rc = mdb_cursor_get(&m2, &key, NULL, op);
Expand All @@ -2946,8 +2978,7 @@ mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp)
if (rc == MDB_NOTFOUND) break;
else goto fail;
}
}
goto fail;
} else goto fail;
}
mdb_cassert(&m2, key.mv_size > 0);
last = *(txnid_t*)key.mv_data;
Expand All @@ -2956,21 +2987,6 @@ mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp)
rc = MDB_NOTFOUND;
goto fail;
}
if (last >= env->me_freelist_start) {
// go to previous entry, through next iteration
rc = mdb_cursor_get(&m2, &key, NULL, op);
if (rc) {
if (rc == MDB_NOTFOUND) {
// reached the very start, mark it as 1
env->me_freelist_start = 1;
break;
}
goto fail;
} else
mdb_cassert(&m2, key.mv_size > 0);
last = *(txnid_t*)key.mv_data;
}
env->me_freelist_start = last;
} else
env->me_freelist_end = last + 1;
} else {
Expand All @@ -2981,6 +2997,24 @@ mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp)
}
env->me_freelist_start = last;
}
if (op == MDB_PREV) {
while (last >= env->me_freelist_start) {
// go to previous entry, through prev iteration
rc = mdb_cursor_get(&m2, &key, NULL, MDB_PREV);
if (rc) {
if (rc == MDB_NOTFOUND) {
// reached the very start, mark it as 1
env->me_freelist_start = 1;
break;
}
goto fail;
} else
mdb_cassert(&m2, key.mv_size > 0);
last = *(txnid_t *) key.mv_data;
}
if (rc == MDB_NOTFOUND) break;
env->me_freelist_start = last;
}
if (!last) {
fprintf(stderr, "Invalid null txn id\n");
rc = MDB_NOTFOUND;
Expand Down Expand Up @@ -4288,7 +4322,7 @@ mdb_freelist_save(MDB_txn *txn)
total_room = head_room = 0;
mdb_tassert(txn, head_id >= env->me_freelist_start);
//fprintf(stderr, "Deleting free list record %u\n", head_id);
env->me_freelist_state = MDB_FREELIST_DELETING; // signal that we are deleting from the freelist, which means we can't iterate backwards
//env->me_freelist_state = MDB_FREELIST_DELETING; // signal that we are deleting from the freelist, which means we can't iterate backwards
rc = mdb_cursor_del(&mc, 0);
env->me_freelist_state = 0;
if (rc) {
Expand Down Expand Up @@ -9949,6 +9983,10 @@ mdb_node_add(MDB_cursor *mc, indx_t indx,
node_size = EVEN(node_size + sizeof(MDB_ovpage));
if ((ssize_t)node_size > room)
goto full;
if (mc->mc_dbi == 0) {
fprintf(stderr,"added new key %i ", *((txnid_t *) key->mv_data));
test_reversal(mc);
}
if ((rc = mdb_page_new(mc, P_OVERFLOW, ovpages, &ofp)))
return rc;
DPRINTF(("allocated overflow page %"Yu, ofp->mp_pgno));
Expand Down Expand Up @@ -10509,7 +10547,6 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst, int fromleft)
if ((rc = mdb_page_touch(csrc)) ||
(rc = mdb_page_touch(cdst)))
return rc;

if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
key.mv_size = csrc->mc_db->md_pad;
key.mv_data = LEAF2KEY(csrc->mc_pg[csrc->mc_top], csrc->mc_ki[csrc->mc_top], key.mv_size);
Expand Down Expand Up @@ -10581,13 +10618,16 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst, int fromleft)

/* Add the node to the destination page.
*/
test_reversal(cdst);
rc = mdb_node_add(cdst, cdst->mc_ki[cdst->mc_top], &key, &data, srcpg, flags);
test_reversal(cdst);
if (rc != MDB_SUCCESS)
return rc;

/* Delete the node from the source page.
*/
mdb_node_del(csrc, key.mv_size);
test_reversal(cdst);

{
/* Adjust other cursors pointing to mp */
Expand Down
11 changes: 9 additions & 2 deletions test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -414,11 +414,18 @@ describe('lmdb-js', function () {
let promise;
let additive = 'this is more text';
for (let i = 0; i < 7; i++) additive += additive;
let read_txn = db.useReadTransaction();
for (let i = 0; i < 5000; i++) {
if (Math.random() < 0.3) {
read_txn.done();
read_txn = db.useReadTransaction();
}
let text = 'this is a test';
while (random() < 0.95) text += additive;
promise = db.put(i % 10, text);
if (i % 16 == 0) {
if (random() < 0.4) promise = db.remove(i % 40);
else promise = db.put(i % 40, text);

if (i % 2 == 0) {
await promise;
}
}
Expand Down

0 comments on commit 2f0dc8f

Please sign in to comment.