Skip to content

Commit

Permalink
Clean up threadsafe functions when an env is closed
Browse files Browse the repository at this point in the history
  • Loading branch information
kriszyp committed May 22, 2024
1 parent 0b58a4b commit 163bf2d
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 7 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "lmdb",
"author": "Kris Zyp",
"version": "3.0.10",
"version": "3.0.11",
"description": "Simple, efficient, scalable, high-performance LMDB interface",
"license": "MIT",
"repository": {
Expand Down
29 changes: 24 additions & 5 deletions src/env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -678,14 +678,31 @@ void EnvWrap::closeEnv(bool hasLock) {
#ifdef MDB_OVERLAPPINGSYNC
// unlock any record locks held by this thread/EnvWrap
ExtendedEnv* extended_env = (ExtendedEnv*) mdb_env_get_userctx(env);
pthread_mutex_lock(&extended_env->userBuffersLock);
for (auto buffer_iter = extended_env->userSharedBuffers.begin(); buffer_iter != extended_env->userSharedBuffers.end();) {
for (auto callback_iter = buffer_iter->second.callbacks.begin(); callback_iter != buffer_iter->second.callbacks.end();) {
EnvWrap* context;
napi_get_threadsafe_function_context(*callback_iter, (void**) &context);
if (context == this) {
napi_release_threadsafe_function(*callback_iter, napi_tsfn_abort);
callback_iter = buffer_iter->second.callbacks.erase(callback_iter);
} else
callback_iter++;
}
if (buffer_iter->second.callbacks.size() == 0)
buffer_iter = extended_env->userSharedBuffers.erase(buffer_iter);
else
buffer_iter++;
}
pthread_mutex_unlock(&extended_env->userBuffersLock);
pthread_mutex_lock(&extended_env->locksModificationLock);
auto it = extended_env->lockCallbacks.begin();
while (it != extended_env->lockCallbacks.end())
{
if (it->second.ew == this) {
notifyCallbacks(it->second.callbacks, true);
it = extended_env->lockCallbacks.erase(it);
} else ++it;
} else ++it; // TODO: we may want to remove any thread safe functions that are no longer valid
}
pthread_mutex_unlock(&extended_env->locksModificationLock);
#endif
Expand Down Expand Up @@ -1083,14 +1100,16 @@ NAPI_FUNCTION(getUserSharedBuffer) {
napi_get_value_bool(env, as_bool, &has_callback);

// get a shared buffer with the key, starting value, and convert pointer to an array buffer
MDB_val buffer = extend_env->getUserSharedBuffer(key, default_buffer, args[3], has_callback, env);
MDB_val buffer = extend_env->getUserSharedBuffer(key, default_buffer, args[3], has_callback, env, ew);
if (buffer.mv_data == default_buffer.mv_data) return args[2];
napi_value return_value;
napi_create_external_arraybuffer(env, buffer.mv_data, buffer.mv_size, cleanupLMDB, buffer.mv_data, &return_value);
return return_value;
}

MDB_val ExtendedEnv::getUserSharedBuffer(std::string key, MDB_val default_buffer, napi_value func, bool has_callback, napi_env env) {
/*napi_finalize cleanup_callback = [](napi_env env, void* data, void* buffer_info) {
// Data belongs to LMDB, we shouldn't free it here
}*/
MDB_val ExtendedEnv::getUserSharedBuffer(std::string key, MDB_val default_buffer, napi_value func, bool has_callback, napi_env env, EnvWrap* ew) {
pthread_mutex_lock(&userBuffersLock);
auto resolution = userSharedBuffers.find(key);
if (resolution == userSharedBuffers.end()) {
Expand All @@ -1105,7 +1124,7 @@ MDB_val ExtendedEnv::getUserSharedBuffer(std::string key, MDB_val default_buffer
status = napi_create_object(env, &resource);
napi_value resource_name;
status = napi_create_string_latin1(env, "user-callback", NAPI_AUTO_LENGTH, &resource_name);
napi_create_threadsafe_function(env, func, resource, resource_name, 0, 1, nullptr, nullptr, nullptr, nullptr,
napi_create_threadsafe_function(env, func, resource, resource_name, 0, 1, nullptr, nullptr, ew, nullptr,
&callback);
napi_unref_threadsafe_function(env, callback);
resolution->second.callbacks.push_back(callback);
Expand Down
2 changes: 1 addition & 1 deletion src/lmdb-js.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ class ExtendedEnv {
pthread_mutex_t userBuffersLock;
uint64_t lastTime; // actually encoded as double
uint64_t previousTime; // actually encoded as double
MDB_val getUserSharedBuffer(std::string key, MDB_val default_buffer, napi_value func, bool has_callback, napi_env env);
MDB_val getUserSharedBuffer(std::string key, MDB_val default_buffer, napi_value func, bool has_callback, napi_env env, EnvWrap* ew);
bool notifyUserCallbacks(std::string key);
bool attemptLock(std::string key, napi_env env, napi_value func, bool has_callback, EnvWrap* ew);
bool unlock(std::string key, bool only_check);
Expand Down

0 comments on commit 163bf2d

Please sign in to comment.