Skip to content

Commit

Permalink
Add support for associate user callbacks to be notified cross-thread …
Browse files Browse the repository at this point in the history
…and associated with user buffers
  • Loading branch information
kriszyp committed May 17, 2024
1 parent fca18a9 commit 90d3e36
Show file tree
Hide file tree
Showing 7 changed files with 557 additions and 270 deletions.
2 changes: 2 additions & 0 deletions native.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export let Env,
compress,
directWrite,
getUserSharedBuffer,
notifyUserCallbacks,
attemptLock,
unlock;
path = pathModule;
Expand Down Expand Up @@ -105,6 +106,7 @@ export function setNativeFunctions(externals) {
resetTxn = externals.resetTxn;
directWrite = externals.directWrite;
getUserSharedBuffer = externals.getUserSharedBuffer;
notifyUserCallbacks = externals.notifyUserCallbacks;
attemptLock = externals.attemptLock;
unlock = externals.unlock;
getCurrentValue = externals.getCurrentValue;
Expand Down
20 changes: 18 additions & 2 deletions read.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
setReadCallback,
directWrite,
getUserSharedBuffer,
notifyUserCallbacks,
attemptLock,
unlock,
} from './native.js';
Expand Down Expand Up @@ -391,10 +392,25 @@ export function addReadMethods(
if (rc < 0) lmdbError(rc);
},

getUserSharedBuffer(id, defaultBuffer) {
getUserSharedBuffer(id, defaultBuffer, options) {
let keySize;
if (options?.envKey) keySize = this.writeKey(id, keyBytes, 0);
else {
keyBytes.dataView.setUint32(0, this.db.dbi);
keySize = this.writeKey(id, keyBytes, 4);
}
return getUserSharedBuffer(
env.address,
keySize,
defaultBuffer,
options?.callback,
);
},

notifyUserCallbacks(id) {
keyBytes.dataView.setUint32(0, this.db.dbi);
let keySize = this.writeKey(id, keyBytes, 4);
return getUserSharedBuffer(env.address, keySize, defaultBuffer);
return notifyUserCallbacks(env.address, keySize);
},

attemptLock(id, version, callback) {
Expand Down
117 changes: 86 additions & 31 deletions src/env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ int32_t EnvWrap::toSharedBuffer(MDB_env* env, uint32_t* keyBuffer, MDB_val data
return -30001;
}

void notifyCallbacks(std::vector<napi_threadsafe_function> callbacks);
void notifyCallbacks(std::vector<napi_threadsafe_function> callbacks, bool release);

void EnvWrap::closeEnv(bool hasLock) {
if (!env)
Expand All @@ -679,12 +679,12 @@ void EnvWrap::closeEnv(bool hasLock) {
// unlock any record locks held by this thread/EnvWrap
ExtendedEnv* extended_env = (ExtendedEnv*) mdb_env_get_userctx(env);
pthread_mutex_lock(&extended_env->locksModificationLock);
auto it = extended_env->lock_callbacks.begin();
while (it != extended_env->lock_callbacks.end())
auto it = extended_env->lockCallbacks.begin();
while (it != extended_env->lockCallbacks.end())
{
if (it->second.ew == this) {
notifyCallbacks(it->second.callbacks);
it = extended_env->lock_callbacks.erase(it);
notifyCallbacks(it->second.callbacks, true);
it = extended_env->lockCallbacks.erase(it);
} else ++it;
}
pthread_mutex_unlock(&extended_env->locksModificationLock);
Expand Down Expand Up @@ -1053,9 +1053,11 @@ int32_t writeFFI(double ewPointer, uint64_t instructionAddress) {
}
ExtendedEnv::ExtendedEnv() {
pthread_mutex_init(&locksModificationLock, nullptr);
pthread_mutex_init(&userBuffersLock, nullptr);
}
ExtendedEnv::~ExtendedEnv() {
pthread_mutex_destroy(&locksModificationLock);
pthread_mutex_destroy(&userBuffersLock);
}
uint64_t ExtendedEnv::getNextTime() {
uint64_t next_time_int = next_time_double();
Expand All @@ -1066,7 +1068,7 @@ uint64_t ExtendedEnv::getLastTime() {
return bswap_64(lastTime);
}
NAPI_FUNCTION(getUserSharedBuffer) {
ARGS(3)
ARGS(4)
GET_INT64_ARG(0)
EnvWrap* ew = (EnvWrap*) i64;
uint32_t size;
Expand All @@ -1075,35 +1077,82 @@ NAPI_FUNCTION(getUserSharedBuffer) {
napi_get_arraybuffer_info(env, args[2], &default_buffer.mv_data, &default_buffer.mv_size);
ExtendedEnv* extend_env = (ExtendedEnv*) mdb_env_get_userctx(ew->env);
std::string key(ew->keyBuffer, size);
// get an incrementer with the key, starting value, and convert pointer to an array buffer
MDB_val buffer = extend_env->getUserSharedBuffer(key, default_buffer, env);
napi_value as_bool;
napi_coerce_to_bool(env, args[3], &as_bool);
bool has_callback;
napi_get_value_bool(env, as_bool, &has_callback);

// get a shared buffer with the key, starting value, and convert pointer to an array buffer
MDB_val buffer = extend_env->getUserSharedBuffer(key, default_buffer, args[3], has_callback, env);
if (buffer.mv_data == default_buffer.mv_data) return args[2];
napi_value return_value;
napi_create_external_arraybuffer(env, buffer.mv_data, buffer.mv_size, cleanupLMDB, buffer.mv_data, &return_value);
return return_value;
}

MDB_val ExtendedEnv::getUserSharedBuffer(std::string key, MDB_val default_buffer, napi_env env) {
pthread_mutex_lock(&locksModificationLock);
MDB_val ExtendedEnv::getUserSharedBuffer(std::string key, MDB_val default_buffer, napi_value func, bool has_callback, napi_env env) {
pthread_mutex_lock(&userBuffersLock);
auto resolution = userSharedBuffers.find(key);
bool found;
MDB_val user_shared_buffer;
if (resolution == userSharedBuffers.end()) {
userSharedBuffers.emplace(key, user_shared_buffer = default_buffer);
} else {
user_shared_buffer = resolution->second;
user_buffer_t user_shared_buffer;
user_shared_buffer.buffer = default_buffer;
resolution = userSharedBuffers.emplace(key, user_shared_buffer).first;
}
if (has_callback) {
napi_threadsafe_function callback;
napi_value resource;
napi_status status;
status = napi_create_object(env, &resource);
napi_value resource_name;
status = napi_create_string_latin1(env, "user-callback", NAPI_AUTO_LENGTH, &resource_name);
napi_create_threadsafe_function(env, func, resource, resource_name, 0, 1, nullptr, nullptr, nullptr, nullptr,
&callback);
napi_unref_threadsafe_function(env, callback);
resolution->second.callbacks.push_back(callback);
}
MDB_val buffer = resolution->second.buffer;
pthread_mutex_unlock(&userBuffersLock);
return buffer;
}
/**
* Notify the user callbacks associated with a user buffer for a given key
* @param key
* @param env
* @return
*/
bool ExtendedEnv::notifyUserCallbacks(std::string key) {
pthread_mutex_lock(&userBuffersLock);
auto resolution = userSharedBuffers.find(key);
bool found = resolution != userSharedBuffers.end();
if (found) {
notifyCallbacks(resolution->second.callbacks, false);
}
pthread_mutex_unlock(&locksModificationLock);
return user_shared_buffer;
pthread_mutex_unlock(&userBuffersLock);
return found;
}

NAPI_FUNCTION(notifyUserCallbacks) {
ARGS(2)
GET_INT64_ARG(0)
EnvWrap* ew = (EnvWrap*) i64;
uint32_t size;
GET_UINT32_ARG(size, 1);
ExtendedEnv* extend_env = (ExtendedEnv*) mdb_env_get_userctx(ew->env);
std::string key(ew->keyBuffer, size);
bool found = extend_env->notifyUserCallbacks(key);
napi_value return_value;
napi_get_boolean(env, found, &return_value);
return return_value;
}

bool ExtendedEnv::attemptLock(std::string key, napi_env env, napi_value func, bool has_callback, EnvWrap* ew) {
pthread_mutex_lock(&locksModificationLock);
auto resolution = lock_callbacks.find(key);
auto resolution = lockCallbacks.find(key);
bool found;
if (resolution == lock_callbacks.end()) {
if (resolution == lockCallbacks.end()) {
callback_holder_t callbacks;
callbacks.ew = ew;
lock_callbacks.emplace(key, callbacks);
lockCallbacks.emplace(key, callbacks);
found = true;
} else {
if (has_callback) {
Expand Down Expand Up @@ -1133,31 +1182,36 @@ NAPI_FUNCTION(attemptLock) {
napi_coerce_to_bool(env, args[2], &as_bool);
bool has_callback;
napi_get_value_bool(env, as_bool, &has_callback);
ExtendedEnv* lock_callbacks = (ExtendedEnv*) mdb_env_get_userctx(ew->env);
ExtendedEnv* extended_env = (ExtendedEnv*) mdb_env_get_userctx(ew->env);
std::string key(ew->keyBuffer, size);
bool result = lock_callbacks->attemptLock(key, env, args[2], has_callback, ew);
bool result = extended_env->attemptLock(key, env, args[2], has_callback, ew);
napi_value return_value;
napi_get_boolean(env, result, &return_value);
return return_value;
}
bool ExtendedEnv::unlock(std::string key, bool only_check) {
pthread_mutex_lock(&locksModificationLock);
auto resolution = lock_callbacks.find(key);
if (resolution == lock_callbacks.end()) {
auto resolution = lockCallbacks.find(key);
if (resolution == lockCallbacks.end()) {
pthread_mutex_unlock(&locksModificationLock);
return false;
}
if (!only_check) {
notifyCallbacks(resolution->second.callbacks);
lock_callbacks.erase(resolution);
notifyCallbacks(resolution->second.callbacks, true);
lockCallbacks.erase(resolution);
}
pthread_mutex_unlock(&locksModificationLock);
return true;
}
void notifyCallbacks(std::vector<napi_threadsafe_function> callbacks) {
void notifyCallbacks(std::vector<napi_threadsafe_function> callbacks, bool release) {
for (auto callback = callbacks.begin(); callback != callbacks.end();) {
napi_call_threadsafe_function(*callback, nullptr, napi_tsfn_blocking);
napi_release_threadsafe_function(*callback, napi_tsfn_release);
napi_status status = napi_call_threadsafe_function(*callback, nullptr, napi_tsfn_blocking);
if (status == napi_closing) { // if the callback is closing, we may need to remove it from our list
if (!release) // if we are releasing, we don't need to remove it
callback = callbacks.erase(callback);
continue;
} else if (release)
napi_release_threadsafe_function(*callback, napi_tsfn_release);
callback++;
}
}
Expand All @@ -1169,9 +1223,9 @@ NAPI_FUNCTION(unlock) {
GET_UINT32_ARG(size, 1);
bool only_check = false;
napi_get_value_bool(env, args[2], &only_check);
ExtendedEnv* lock_callbacks = (ExtendedEnv*) mdb_env_get_userctx(ew->env);
ExtendedEnv* extended_env = (ExtendedEnv*) mdb_env_get_userctx(ew->env);
std::string key(ew->keyBuffer, size);
bool result = lock_callbacks->unlock(key, only_check);
bool result = extended_env->unlock(key, only_check);
napi_value return_value;
napi_get_boolean(env, result, &return_value);
return return_value;
Expand Down Expand Up @@ -1256,6 +1310,7 @@ void EnvWrap::setupExports(Napi::Env env, Object exports) {
EXPORT_NAPI_FUNCTION("setTestRef", setTestRef);
EXPORT_NAPI_FUNCTION("getTestRef", getTestRef);
EXPORT_NAPI_FUNCTION("getUserSharedBuffer", getUserSharedBuffer);
EXPORT_NAPI_FUNCTION("notifyUserCallbacks", notifyUserCallbacks);
EXPORT_NAPI_FUNCTION("attemptLock", attemptLock);
EXPORT_NAPI_FUNCTION("unlock", unlock);
EXPORT_FUNCTION_ADDRESS("writePtr", writeFFI);
Expand Down
12 changes: 9 additions & 3 deletions src/lmdb-js.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,18 +282,24 @@ typedef struct callback_holder_t {
EnvWrap* ew;
std::vector<napi_threadsafe_function> callbacks;
} callback_holder_t;
typedef struct user_buffer_t {
MDB_val buffer;
std::vector<napi_threadsafe_function> callbacks;
} user_buffer_t;
class ExtendedEnv {
public:
ExtendedEnv();
~ExtendedEnv();
static MDB_txn* prefetchTxns[20];
static pthread_mutex_t* prefetchTxnsLock;
std::unordered_map<std::string, callback_holder_t> lock_callbacks;
std::unordered_map<std::string, MDB_val> userSharedBuffers;
std::unordered_map<std::string, callback_holder_t> lockCallbacks;
std::unordered_map<std::string, user_buffer_t> userSharedBuffers;
pthread_mutex_t locksModificationLock;
pthread_mutex_t userBuffersLock;
uint64_t lastTime; // actually encoded as double
uint64_t previousTime; // actually encoded as double
MDB_val getUserSharedBuffer(std::string key, MDB_val default_buffer, napi_env env);
MDB_val getUserSharedBuffer(std::string key, MDB_val default_buffer, napi_value func, bool has_callback, napi_env env);
bool notifyUserCallbacks(std::string key);
bool attemptLock(std::string key, napi_env env, napi_value func, bool has_callback, EnvWrap* ew);
bool unlock(std::string key, bool only_check);
uint64_t getNextTime();
Expand Down
20 changes: 15 additions & 5 deletions src/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -498,17 +498,22 @@ void WriteWorker::Write() {
mdb_txn_set_callback(txn, txn_callback, this);
}
#endif
bool had_changes = false;
if (rc || resultCode) {
fprintf(stderr, "do_write error %u %u\n", rc, resultCode);
mdb_txn_abort(txn);
} else
} else {
rc = mdb_txn_commit(txn);
#ifdef MDB_OVERLAPPINGSYNC
#endif
#ifdef MDB_EMPTY_TXN
if (rc == MDB_EMPTY_TXN)
rc = 0;
if (rc == MDB_EMPTY_TXN)
rc = 0;
else {
had_changes = true;
}
#else
had_changes = true;
#endif
}
if (!(*instructions & TXN_DELIMITER))
fprintf(stderr, "end write %p, next start %p NOT still valid %p\n", start, instructions, *instructions);
txn_callback(this, 1);
Expand All @@ -520,6 +525,11 @@ void WriteWorker::Write() {
}
*(instructions - 1) = txnId;
std::atomic_fetch_or((std::atomic<uint32_t>*) instructions, (uint32_t) TXN_COMMITTED);
if (had_changes) {
ExtendedEnv* extended_env = (ExtendedEnv*) mdb_env_get_userctx(env);
std::string key("__committed__");
extended_env->notifyUserCallbacks(key);
}
}

void write_progress(napi_env env,
Expand Down
13 changes: 13 additions & 0 deletions test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,19 @@ describe('lmdb-js', function () {
should.equal(defaultIncrementer[0], 6n);
should.equal(secondDefaultIncrementer[0], 0n);
});
it('getUserSharedBuffer with callbacks', async function () {
let shared_number = new Float64Array(1);
let notified;
let shared_buffer;
await new Promise((resolve) => {
db.getUserSharedBuffer('with-callback', shared_number.buffer, {
callback() {
resolve();
},
});
db.notifyUserCallbacks('with-callback');
});
});
it('prefetch', async function () {
await new Promise((resolve) => db.prefetch(['key1', 'key2'], resolve));
let key = '';
Expand Down
Loading

0 comments on commit 90d3e36

Please sign in to comment.