Skip to content

Commit

Permalink
Merge branch 'timestamp-insertion'
Browse files Browse the repository at this point in the history
  • Loading branch information
kriszyp committed Oct 23, 2023
2 parents 3096c93 + aa4b8ca commit e4266b6
Show file tree
Hide file tree
Showing 17 changed files with 826 additions and 93 deletions.
22 changes: 16 additions & 6 deletions caching.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,18 @@ export const CachingStore = (Store, env) => {
// don't cache binary data, since it will be decoded on get
this.cache.delete(id);
return result;
}
// sync operation, immediately add to cache, otherwise keep it pinned in memory until it is committed
let entry = this.cache.setValue(id, value, !result || result.isSync ? 0 : -1);
}
let entry;
if (result?.isSync) {
// sync operation, immediately add to cache
if (result.result) // if it succeeds
entry = this.cache.setValue(id, value, 0);
else {
this.cache.delete(id);
return result;
} // sync failure
// otherwise keep it pinned in memory until it is committed
} else entry = this.cache.setValue(id, value, -1);
if (childTxnChanges)
childTxnChanges.add(id);
if (version !== undefined)
Expand All @@ -136,19 +145,20 @@ export const CachingStore = (Store, env) => {
return result;
}
putSync(id, value, version, ifVersion) {
let result = super.putSync(id, value, version, ifVersion);
if (id !== 'object') {
// sync operation, immediately add to cache, otherwise keep it pinned in memory until it is committed
if (value && typeof value === 'object') {
if (value && typeof value === 'object' || !result) {
let entry = this.cache.setValue(id, value);
if (childTxnChanges)
childTxnChanges.add(id);
if (version !== undefined) {
entry.version = typeof version === 'object' ? version.version : version;
}
} else // it is possible that a value used to exist here
} else // it is possible that a value used to exist here
this.cache.delete(id);
}
return super.putSync(id, value, version, ifVersion);
return result;
}
remove(id, ifVersion) {
this.cache.delete(id);
Expand Down
21 changes: 15 additions & 6 deletions dependencies/lmdb/libraries/liblmdb/lmdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -1040,12 +1040,20 @@ int mdb_env_set_userctx(MDB_env *env, void *ctx);
*/
void *mdb_env_get_userctx(MDB_env *env);

/** @brief A callback function for most LMDB assert() failures,
* called before printing the message and aborting.
*
* @param[in] env An environment handle returned by #mdb_env_create().
* @param[in] msg The assertion message, not including newline.
*/
/** @brief Get the metrics information associated with the #MDB_env.
*
* @param[in] env An environment handle returned by #mdb_env_create()
* @return The pointer set by #mdb_env_set_userctx().
*/
MDB_metrics *mdb_env_get_metrics(MDB_env *env);

/** @brief A callback function for most LMDB assert() failures,
* called before printing the message and aborting.
*
* @param[in] env An environment handle returned by #mdb_env_create().
* @param[in] msg The assertion message, not including newline.
*/

typedef void MDB_assert_func(MDB_env *env, const char *msg);

/** Set or reset the assert() callback of the environment.
Expand Down Expand Up @@ -1447,6 +1455,7 @@ int mdb_set_relctx(MDB_txn *txn, MDB_dbi dbi, void *ctx);
* </ul>
*/
int mdb_get_with_txn(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_val *data, mdb_size_t *txn_id);
int mdb_direct_write(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, unsigned int offset, MDB_val *data);

int mdb_get(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_val *data);

Expand Down
64 changes: 52 additions & 12 deletions dependencies/lmdb/libraries/liblmdb/mdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -1691,6 +1691,7 @@ struct MDB_env {
MDB_val me_enckey; /**< key for env encryption */
#endif
void *me_userctx; /**< User-settable context */
MDB_metrics me_metrics; /**< Metrics tracking */
MDB_assert_func *me_assert_func; /**< Callback for assertion failures */
void *me_callback; /**< General callback */
int64_t boot_id;
Expand Down Expand Up @@ -3120,7 +3121,7 @@ mdb_env_sync0(MDB_env *env, int force, pgno_t numpgs)
}
}
if (env->me_flags & MDB_TRACK_METRICS) {
((MDB_metrics*) env->me_userctx)->time_sync += get_time64() - start;
env->me_metrics.time_sync += get_time64() - start;
}
return rc;
}
Expand Down Expand Up @@ -3390,7 +3391,7 @@ mdb_txn_renew0(MDB_txn *txn)
} else {
/* Not yet touching txn == env->me_txn0, it may be active */
if (env->me_flags & MDB_TRACK_METRICS) {
((MDB_metrics*) env->me_userctx)->clock_txn = get_time64();
env->me_metrics.clock_txn = get_time64();
}
if (ti) {
if (LOCK_MUTEX(rc, env, env->me_wmutex))
Expand All @@ -3403,8 +3404,8 @@ mdb_txn_renew0(MDB_txn *txn)
}
if (env->me_flags & MDB_TRACK_METRICS) {
uint64_t now = get_time64();
((MDB_metrics*) env->me_userctx)->time_start_txns += now - ((MDB_metrics*) env->me_userctx)->clock_txn;
((MDB_metrics*) env->me_userctx)->clock_txn = now;
env->me_metrics.time_start_txns += now - env->me_metrics.clock_txn;
env->me_metrics.clock_txn = now;
}
txn->mt_txnid++;
#if MDB_DEBUG
Expand Down Expand Up @@ -4347,9 +4348,9 @@ mdb_page_flush(MDB_txn *txn, int keep)
*/
CACHEFLUSH(env->me_map, txn->mt_next_pgno * env->me_psize, DCACHE);
if (env->me_flags & MDB_TRACK_METRICS) {
((MDB_metrics*) env->me_userctx)->writes += write_i;
((MDB_metrics*) env->me_userctx)->page_flushes++;
((MDB_metrics*) env->me_userctx)->pages_written += pagecount - keep;
env->me_metrics.writes += write_i;
env->me_metrics.page_flushes++;
env->me_metrics.pages_written += pagecount - keep;
}

#ifdef _WIN32
Expand Down Expand Up @@ -4393,7 +4394,7 @@ mdb_page_flush(MDB_txn *txn, int keep)
txn->mt_dirty_room += i - j;
dl[0].mid = j;
if (env->me_flags & MDB_TRACK_METRICS) {
((MDB_metrics*) env->me_userctx)->time_page_flushes += get_time64() - start;
env->me_metrics.time_page_flushes += get_time64() - start;
}
return MDB_SUCCESS;
}
Expand Down Expand Up @@ -4663,8 +4664,8 @@ mdb_txn_commit(MDB_txn *txn)
done:
//<lmdb-js>
if (env->me_flags & MDB_TRACK_METRICS) {
((MDB_metrics*) env->me_userctx)->time_during_txns += get_time64() - ((MDB_metrics*) env->me_userctx)->clock_txn;
((MDB_metrics*) env->me_userctx)->txns++;
env->me_metrics.time_during_txns += get_time64() - env->me_metrics.clock_txn;
env->me_metrics.txns++;
}
if ((txn->mt_flags & MDB_NOSYNC) && (env->me_flags & MDB_OVERLAPPINGSYNC)) {
MDB_txn sync_txn;
Expand Down Expand Up @@ -7922,6 +7923,38 @@ mdb_get_with_txn(MDB_txn *txn, MDB_dbi dbi,
MDB_CURSOR_UNREF(&mc, 1);
return rc;
}
int
mdb_direct_write(MDB_txn *txn, MDB_dbi dbi,
MDB_val *key, unsigned int offset, MDB_val *data)
{
if (txn->mt_env->me_flags & MDB_REMAP_CHUNKS) return -1;
MDB_val existing_data;
int rc = mdb_get_with_txn(txn, dbi, key, &existing_data, NULL);
if (rc == 0) {
if (data->mv_size > existing_data.mv_size) {
last_error = malloc(100);
sprintf(last_error, "Attempt to direct write beyond the size of the value");
return EINVAL;
}
MDB_env* env = txn->mt_env;
mdb_size_t file_offset = (char*)existing_data.mv_data - env->me_map + offset;
// if we discover that a direct write can only be safely atomically applied to a memory map if it fits into
// single word, verify that here on some OSes, we can apply logic here:
//if (file_offset >> 3 != (file_offset + data->mv_size - 1) >> 3)
// return -1;
#ifdef _WIN32
DWORD written;
OVERLAPPED ov;
memset(&ov, 0, sizeof(ov));
ov.Offset = file_offset;
rc = WriteFile(env->me_fd, data->mv_data, data->mv_size, &written, &ov);
#else
int written = pwrite(env->me_fd, data->mv_data, data->mv_size, file_offset);
#endif;
if (written < 0) rc = written;
}
return rc;
}

/** Find a sibling for a page.
* Replaces the page at the top of the cursor's stack with the
Expand Down Expand Up @@ -10707,7 +10740,7 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi,
}
MDB_env* env = txn->mt_env;
if (env->me_flags & MDB_TRACK_METRICS) {
((MDB_metrics*) env->me_userctx)->deletes++;
env->me_metrics.deletes++;
}
return mdb_del0(txn, dbi, key, data, 0);
}
Expand Down Expand Up @@ -11202,7 +11235,7 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
return (txn->mt_flags & MDB_TXN_RDONLY) ? EACCES : MDB_BAD_TXN;
MDB_env* env = txn->mt_env;
if (env->me_flags & MDB_TRACK_METRICS) {
((MDB_metrics*) env->me_userctx)->puts++;
env->me_metrics.puts++;
}

mdb_cursor_init(&mc, txn, dbi, &mx);
Expand Down Expand Up @@ -11803,12 +11836,19 @@ mdb_env_set_userctx(MDB_env *env, void *ctx)
return MDB_SUCCESS;
}


void * ESECT
mdb_env_get_userctx(MDB_env *env)
{
return env ? env->me_userctx : NULL;
}

MDB_metrics * ESECT
mdb_env_get_metrics(MDB_env *env)
{
return env ? &env->me_metrics : NULL;
}

int ESECT
mdb_env_set_assert(MDB_env *env, MDB_assert_func *func)
{
Expand Down
3 changes: 3 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@ import { levelup } from './level.js';
export { clearKeptObjects } from './native.js';
import { nativeAddon } from './native.js';
export let { noop } = nativeAddon;
export const TIMESTAMP_PLACEHOLDER = new Uint8Array([1,1,1,1,0,0,0,0]);
export const DIRECT_WRITE_PLACEHOLDER = new Uint8Array([1,1,1,2,0,0,0,0]);
export { open, openAsClass, getLastVersion, allDbs, getLastTxnId } from './open.js';
import { toBufferKey as keyValueToBuffer, compareKeys as compareKey, fromBufferKey as bufferToKeyValue } from 'ordered-binary';
import { open, openAsClass, getLastVersion } from './open.js';
export const TransactionFlags = {
ABORTABLE: 1,
SYNCHRONOUS_COMMIT: 2,

NO_SYNC_FLUSH: 0x10000,
};
export default {
Expand Down
5 changes: 4 additions & 1 deletion native.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { dirname, join, default as pathModule } from 'path';
import { fileURLToPath } from 'url';
import loadNAPI from 'node-gyp-build-optional-packages';
export let Env, Txn, Dbi, Compression, Cursor, getAddress, getBufferAddress, createBufferForAddress, clearKeptObjects, globalBuffer, setGlobalBuffer, arch, fs, os, onExit, tmpdir, lmdbError, path, EventEmitter, orderedBinary, MsgpackrEncoder, WeakLRUCache, setEnvMap, getEnvMap, getByBinary, detachBuffer, startRead, setReadCallback, write, position, iterate, prefetch, resetTxn, getCurrentValue, getCurrentShared, getStringByBinary, getSharedByBinary, getSharedBuffer, compress;
export let Env, Txn, Dbi, Compression, Cursor, getAddress, getBufferAddress, createBufferForAddress, clearKeptObjects, globalBuffer, setGlobalBuffer, arch, fs, os, onExit, tmpdir, lmdbError, path, EventEmitter, orderedBinary, MsgpackrEncoder, WeakLRUCache, setEnvMap, getEnvMap, getByBinary, detachBuffer, startRead, setReadCallback, write, position, iterate, prefetch, resetTxn, getCurrentValue, getCurrentShared, getStringByBinary, getSharedByBinary, getSharedBuffer, compress, directWrite, attemptLock, unlock;
path = pathModule;
let dirName = dirname(fileURLToPath(import.meta.url)).replace(/dist$/, '');
export let nativeAddon = loadNAPI(dirName);
Expand Down Expand Up @@ -61,6 +61,9 @@ export function setNativeFunctions(externals) {
iterate = externals.iterate;
position = externals.position;
resetTxn = externals.resetTxn;
directWrite = externals.directWrite;
attemptLock = externals.attemptLock;
unlock = externals.unlock;
getCurrentValue = externals.getCurrentValue;
getCurrentShared = externals.getCurrentShared;
getStringByBinary = externals.getStringByBinary;
Expand Down
18 changes: 9 additions & 9 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "lmdb",
"author": "Kris Zyp",
"version": "2.8.5",
"version": "2.9.0-beta.1",
"description": "Simple, efficient, scalable, high-performance LMDB interface",
"license": "MIT",
"repository": {
Expand Down Expand Up @@ -62,13 +62,13 @@
"build-js": "rollup -c",
"prepare": "rollup -c",
"before-publish": "rollup -c && prebuildify-ci download && node util/set-optional-deps.cjs && npm run test",
"prebuild-libc-musl": "ENABLE_V8_FUNCTIONS=false prebuildify-platform-packages --tag-libc --napi --platform-packages --target 16.18.0",
"prebuild-libc": "prebuildify-platform-packages --tag-libc --target 20.0.0 || true && prebuildify-platform-packages --tag-libc --target 18.15.0 && prebuildify-platform-packages --platform-packages --tag-libc --target 16.18.0 && ENABLE_V8_FUNCTIONS=false prebuildify-platform-packages --napi --platform-packages --tag-libc --target 16.18.0",
"prebuild-macos": "prebuildify-platform-packages --target 20.0.0 && prebuildify-platform-packages --target 18.15.0 && prebuildify-platform-packages --platform-packages --target 16.18.0 && ENABLE_V8_FUNCTIONS=false prebuildify-platform-packages --napi --platform-packages --target 16.18.0",
"prebuild-win32": "prebuildify-platform-packages --target 20.0.0 && prebuildify-platform-packages --target 18.15.0 && prebuildify-platform-packages --target 16.18.0 && set ENABLE_V8_FUNCTIONS=false&& prebuildify-platform-packages --napi --platform-packages --target 16.18.0",
"prebuild-libc-arm7": "ENABLE_V8_FUNCTIONS=false prebuildify-platform-packages --napi --platform-packages --tag-libc --target 16.18.0",
"prebuildify": "prebuildify-platform-packages --napi --target 18.15.0",
"full-publish": "cd prebuilds/win32-x64 && npm publish --access public && cd ../darwin-x64 && npm publish --access public && cd ../darwin-arm64 && npm publish --access public && cd ../linux-x64 && npm publish --access public && cd ../linux-arm64 && npm publish --access public && cd ../linux-arm && npm publish --access public && cd ../.. && npm publish",
"prebuild-libc-musl": "ENABLE_V8_FUNCTIONS=false prebuildify-platform-packages --tag-libc --napi --platform-packages --target 18.17.1",
"prebuild-libc": "prebuildify-platform-packages --tag-libc --target 20.0.0 || true && prebuildify-platform-packages --platform-packages --tag-libc --target 18.17.1 && ENABLE_V8_FUNCTIONS=false prebuildify-platform-packages --napi --platform-packages --tag-libc --target 18.17.1",
"prebuild-macos": "prebuildify-platform-packages --target 20.0.0 && prebuildify-platform-packages --platform-packages --target 18.17.1 && ENABLE_V8_FUNCTIONS=false prebuildify-platform-packages --napi --platform-packages --target 18.17.1",
"prebuild-win32": "prebuildify-platform-packages --target 20.0.0 && prebuildify-platform-packages --target 18.17.1 && set ENABLE_V8_FUNCTIONS=false&& prebuildify-platform-packages --napi --platform-packages --target 18.17.1",
"prebuild-libc-arm7": "ENABLE_V8_FUNCTIONS=false prebuildify-platform-packages --napi --platform-packages --tag-libc --target 18.17.1",
"prebuildify": "prebuildify-platform-packages --napi --target 18.17.1",
"full-publish": "cd prebuilds/win32-x64 && npm publish --tag dev --access public && cd ../darwin-x64 && npm publish --tag dev --access public && cd ../darwin-arm64 && npm publish --tag dev --access public && cd ../linux-x64 && npm publish --tag dev --access public && cd ../linux-arm64 && npm publish --tag dev --access public && cd ../linux-arm && npm publish --tag dev --access public && cd ../.. && npm publish --tag dev",
"recompile": "node-gyp clean && node-gyp configure && node-gyp build",
"test": "mocha test/**.test.js --expose-gc --recursive",
"deno-test": "deno run --allow-ffi --allow-write --allow-read --allow-env --allow-net --unstable test/deno.ts",
Expand All @@ -78,7 +78,7 @@
},
"gypfile": true,
"dependencies": {
"msgpackr": "^1.9.5",
"msgpackr": "^1.9.9",
"node-addon-api": "^6.1.0",
"node-gyp-build-optional-packages": "5.1.1",
"ordered-binary": "^1.4.1",
Expand Down
52 changes: 51 additions & 1 deletion read.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,27 @@
import { RangeIterable } from './util/RangeIterable.js';
import { getAddress, Cursor, Txn, orderedBinary, lmdbError, getByBinary, setGlobalBuffer, prefetch, iterate, position as doPosition, resetTxn, getCurrentValue, getCurrentShared, getStringByBinary, globalBuffer, getSharedBuffer, startRead, setReadCallback } from './native.js';
import {
getAddress,
Cursor,
Txn,
orderedBinary,
lmdbError,
getByBinary,
setGlobalBuffer,
prefetch,
iterate,
position as doPosition,
resetTxn,
getCurrentValue,
getCurrentShared,
getStringByBinary,
globalBuffer,
getSharedBuffer,
startRead,
setReadCallback,
directWrite,
attemptLock,
unlock
} from './native.js';
import { saveKey } from './keys.js';
const IF_EXISTS = 3.542694326329068e-103;
const ITERATOR_DONE = { done: true, value: undefined };
Expand Down Expand Up @@ -259,6 +281,34 @@ export function addReadMethods(LMDBStore, {
};
}
},

directWrite(id, options) {
let rc;
let txn = env.writeTxn || (options && options.transaction) || (readTxnRenewed ? readTxn : renewReadTxn(this));
let keySize = this.writeKey(id, keyBytes, 0);
let dataOffset = (((keySize >> 3) + 1) << 3);
keyBytes.set(options.bytes, dataOffset)
rc = directWrite(this.dbAddress, keySize, options.offset, options.bytes.length, txn.address || 0);
if (rc < 0) lmdbError(rc);
},

attemptLock(id, version, callback) {
keyBytes.dataView.setUint32(0, this.db.dbi);
keyBytes.dataView.setFloat64(4, version);
let keySize = this.writeKey(id, keyBytes, 12);
return attemptLock(env.address, keySize, callback);
},

unlock(id, version, onlyCheck) {
keyBytes.dataView.setUint32(0, this.db.dbi);
keyBytes.dataView.setFloat64(4, version);
let keySize = this.writeKey(id, keyBytes, 12);
return unlock(env.address, keySize, onlyCheck);
},
hasLock(id, version) {
return this.unlock(id, version, true);
},

resetReadTxn() {
resetReadTxn();
},
Expand Down
Loading

0 comments on commit e4266b6

Please sign in to comment.