Skip to content

Commit

Permalink
Add on thread start callback (#629)
Browse files Browse the repository at this point in the history
fixes: #476
  • Loading branch information
ofriedma authored and udi-speedb committed Dec 5, 2023
1 parent fd98fdd commit 4de6540
Show file tree
Hide file tree
Showing 23 changed files with 198 additions and 48 deletions.
5 changes: 5 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
### Enhancements
* db_bench: add estimate-table-readers-mem benchmark which prints these stats.

* A new option on_thread_start_callback has been added. It allows to set thread affinity or perform other optimizations (e.g. NUMA pinning) to speedb background threads.
An example file on_thread_start_callback_example.cc has been provided to demonstrate how to use this feature.



### Bug Fixes
* unit tests: fix GlobalWriteControllerTest.GlobalAndWBMSetupDelay by waiting for the memtable memory release.
* spdb memtable: use_seek_parallel_threshold option parameter mishandled (#570)
Expand Down
3 changes: 2 additions & 1 deletion build_tools/check-sources.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ git grep -n 'using namespace' -- ':!build_tools' ':!docs' \
':!third-party/folly/folly/lang/Align.h' \
':!third-party/gtest-1.8.1/fused-src/gtest/gtest.h' \
':!examples/speedb_with_ttl_example.cc' \
':!examples/enable_speedb_features_example.cc'
':!examples/enable_speedb_features_example.cc' \
':!examples/on_thread_start_callback_example.cc'
if [ "$?" != "1" ]; then
echo '^^^^ Do not use "using namespace"'
BAD=1
Expand Down
36 changes: 36 additions & 0 deletions db/db_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2195,6 +2195,42 @@ TEST_P(DBMultiGetTestWithParam, MultiGetBatchedValueSizeMultiLevelMerge) {
}
}

TEST_F(DBBasicTest, DBSetThreadAffinity) {
Options options = GetDefaultOptions();
std::string dbname = test::PerThreadDBPath("db_close_test");
ASSERT_OK(DestroyDB(dbname, options));

DB* db = nullptr;
TestEnv* env = new TestEnv(env_);
std::unique_ptr<TestEnv> local_env_guard(env);
options.create_if_missing = true;
options.env = env;
auto f = [](std::thread::native_handle_type thr) {
#if defined(OS_WIN)
#include "winbase.h"
SetThreadAffinityMask(thr, 0);
#else
#include "pthread.h"
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(0, &cpuset);
pthread_setaffinity_np(thr, sizeof(cpu_set_t), &cpuset);
#endif
};
options.on_thread_start_callback =
std::make_shared<std::function<void(std::thread::native_handle_type)>>(f);
Status s = DB::Open(options, dbname, &db);
ASSERT_OK(s);
ASSERT_TRUE(db != nullptr);

s = db->Close();
ASSERT_EQ(env->GetCloseCount(), 1);
ASSERT_TRUE(s.IsIOError());

delete db;
ASSERT_EQ(env->GetCloseCount(), 1);
}

INSTANTIATE_TEST_CASE_P(DBMultiGetTestWithParam, DBMultiGetTestWithParam,
testing::Combine(testing::Bool(), testing::Bool()));

Expand Down
1 change: 1 addition & 0 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1943,6 +1943,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
const bool seq_per_batch, const bool batch_per_txn) {
port::Thread::on_thread_start_callback = db_options.on_thread_start_callback;
Status s = ValidateOptionsByTable(db_options, column_families);
if (!s.ok()) {
return s;
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_spdb_impl_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class SpdbWriteImpl {
std::condition_variable flush_thread_cv_;
port::Mutex add_buffer_mutex_;
port::RWMutexWr flush_rwlock_;
std::thread flush_thread_;
port::Thread flush_thread_;
port::RWMutexWr wal_buffers_rwlock_;
port::Mutex wal_write_mutex_;
port::Mutex wb_list_mutex_;
Expand Down
39 changes: 12 additions & 27 deletions env/env_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ class PosixEnv : public CompositeEnv {
struct JoinThreadsOnExit {
explicit JoinThreadsOnExit(PosixEnv& _deflt) : deflt(_deflt) {}
~JoinThreadsOnExit() {
for (const auto tid : deflt.threads_to_join_) {
pthread_join(tid, nullptr);
for (auto& tid : deflt.threads_to_join_) {
pthread_join(tid.native_handle(), nullptr);
}
for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
deflt.thread_pools_[pool_id].JoinAllThreads();
Expand Down Expand Up @@ -403,12 +403,12 @@ class PosixEnv : public CompositeEnv {
// members in te default instance
std::vector<ThreadPoolImpl> thread_pools_storage_;
pthread_mutex_t mu_storage_;
std::vector<pthread_t> threads_to_join_storage_;
std::vector<port::Thread> threads_to_join_storage_;
bool allow_non_owner_access_storage_;

std::vector<ThreadPoolImpl>& thread_pools_;
pthread_mutex_t& mu_;
std::vector<pthread_t>& threads_to_join_;
std::vector<port::Thread>& threads_to_join_;
// If true, allow non owner read access for db files. Otherwise, non-owner
// has no access to db files.
bool& allow_non_owner_access_;
Expand Down Expand Up @@ -457,33 +457,18 @@ int PosixEnv::ReleaseThreads(int threads_to_released, Priority pri) {
return thread_pools_[pri].ReleaseThreads(threads_to_released);
}

struct StartThreadState {
void (*user_function)(void*);
void* arg;
};

static void* StartThreadWrapper(void* arg) {
StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);
state->user_function(state->arg);
delete state;
return nullptr;
}

void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
pthread_t t;
StartThreadState* state = new StartThreadState;
state->user_function = function;
state->arg = arg;
ThreadPoolImpl::PthreadCall(
"start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state));
ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_));
threads_to_join_.push_back(t);
ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_));
auto thr = port::Thread(function, arg);
pthread_mutex_lock(&mu_);
threads_to_join_.push_back(std::move(thr));
pthread_mutex_unlock(&mu_);
}

void PosixEnv::WaitForJoin() {
for (const auto tid : threads_to_join_) {
pthread_join(tid, nullptr);
for (auto& thr : threads_to_join_) {
if (thr.joinable()) {
thr.join();
}
}
threads_to_join_.clear();
}
Expand Down
1 change: 1 addition & 0 deletions examples/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ transaction_example
rocksdb_backup_restore_example
speedb_is_awesome_example
speedb_with_ttl_example
on_thread_start_callback_example
5 changes: 5 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,8 @@ add_executable(speedb_is_awesome_example
speedb_is_awesome_example.cc)
target_link_libraries(speedb_is_awesome_example
${ROCKSDB_LIB})

add_executable(on_thread_start_callback_example
on_thread_start_callback_example.cc)
target_link_libraries(on_thread_start_callback_example
${ROCKSDB_LIB})
7 changes: 5 additions & 2 deletions examples/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ CFLAGS += -Wstrict-prototypes

all: simple_example column_families_example compaction_filter_example compact_files_example c_simple_example optimistic_transaction_example \
transaction_example options_file_example rocksdb_backup_restore_example speedb_is_awesome_example speedb_with_ttl_example \
enable_speedb_features_example
enable_speedb_features_example on_thread_start_callback_example

simple_example: static_lib simple_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)
Expand Down Expand Up @@ -61,13 +61,16 @@ enable_speedb_features_example: static_lib enable_speedb_features_example.cc
speedb_with_ttl_example: static_lib speedb_with_ttl_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)

on_thread_start_callback_example: static_lib on_thread_start_callback_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)

rocksdb_backup_restore_example: static_lib rocksdb_backup_restore_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)

clean:
rm -rf ./simple_example ./column_families_example ./compact_files_example ./compaction_filter_example ./c_simple_example c_simple_example.o \
./optimistic_transaction_example ./transaction_example ./options_file_example ./multi_processes_example ./rocksdb_backup_restore_example \
./speedb_is_awesome_example ./speedb_with_ttl_example ./enable_speedb_features_example
./speedb_is_awesome_example ./speedb_with_ttl_example ./enable_speedb_features_example ./on_thread_start_callback_example

static_lib:
LIBNAME="$(LIBNAME)" $(MAKE) -C .. static_lib
72 changes: 72 additions & 0 deletions examples/on_thread_start_callback_example.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (C) 2023 Speedb Ltd. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <functional>
#include <iostream>
#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

using namespace ROCKSDB_NAMESPACE;

#if defined(OS_WIN)
std::string kDBPath = "C:\\Windows\\TEMP\\speedb_thr_affinity";
#else
std::string kDBPath = "/tmp/speedb_thr_affinity";
#endif

int main() {
// Open the storage
DB* db = nullptr;
Options options;
// create the DB if it's not already present
options.create_if_missing = true;
auto f = [](std::thread::native_handle_type thr) {
// callback to pin all Speedb threads to the first core.
#if defined(OS_WIN)
#include "winbase.h"
SetThreadAffinityMask(thr, 0);
#else
#include "pthread.h"
std::cout << "thread spawned, thread_id: " << thr << std::endl;
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(0, &cpuset);
pthread_setaffinity_np(thr, sizeof(cpu_set_t), &cpuset);
#endif
};
options.on_thread_start_callback =
std::make_shared<std::function<void(std::thread::native_handle_type)>>(f);
Status s = DB::Open(options, kDBPath, &db);
assert(s.ok());

// append new entry
std::string key = "key_1";
std::string put_value = "Speedb is awesome!";
s = db->Put(WriteOptions(), key, put_value);
assert(s.ok());

// retrieve entry
std::string get_value;
s = db->Get(ReadOptions(), key, &get_value);
assert(s.ok());
assert(get_value == put_value);
std::cout << get_value << std::endl;

// close DB
s = db->Close();
assert(s.ok());
return 0;
}
1 change: 1 addition & 0 deletions include/rocksdb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "rocksdb/listener.h"
#include "rocksdb/metadata.h"
#include "rocksdb/options.h"
#include "rocksdb/port_defs.h"
#include "rocksdb/snapshot.h"
#include "rocksdb/sst_file_writer.h"
#include "rocksdb/thread_status.h"
Expand Down
1 change: 1 addition & 0 deletions include/rocksdb/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <limits>
#include <memory>
#include <string>
#include <thread>
#include <vector>

#include "rocksdb/customizable.h"
Expand Down
5 changes: 3 additions & 2 deletions include/rocksdb/memtablerep.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include <unordered_set>

#include "rocksdb/customizable.h"
#include "rocksdb/port_defs.h"
#include "rocksdb/slice.h"

namespace ROCKSDB_NAMESPACE {
Expand Down Expand Up @@ -319,7 +320,7 @@ class MemTableRepFactory : public Customizable {

void Init() {
switch_memtable_thread_ =
std::thread(&MemTableRepFactory::PrepareSwitchMemTable, this);
port::Thread(&MemTableRepFactory::PrepareSwitchMemTable, this);
// need to verify the thread was executed
{
std::unique_lock<std::mutex> lck(switch_memtable_thread_mutex_);
Expand Down Expand Up @@ -420,7 +421,7 @@ class MemTableRepFactory : public Customizable {
bool enable_switch_memtable_ = false;

private:
std::thread switch_memtable_thread_;
port::Thread switch_memtable_thread_;
std::mutex switch_memtable_thread_mutex_;
std::condition_variable switch_memtable_thread_cv_;
std::atomic<bool> terminate_switch_memtable_ = false;
Expand Down
2 changes: 2 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -1511,6 +1511,8 @@ struct DBOptions {
// Defaults to check once per hour. Set to 0 to disable the task.
unsigned int refresh_options_sec = 60 * 60;
std::string refresh_options_file;
std::shared_ptr<std::function<void(std::thread::native_handle_type)>>
on_thread_start_callback = nullptr;
};

// Options to control the behavior of a database (passed to DB::Open)
Expand Down
38 changes: 37 additions & 1 deletion include/rocksdb/port_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@

#pragma once

#include "rocksdb/rocksdb_namespace.h"
#include <functional>
#include <memory>
#include <thread>

#include "rocksdb/rocksdb_namespace.h"
namespace ROCKSDB_NAMESPACE {

namespace port {
Expand All @@ -22,5 +25,38 @@ enum class CpuPriority {
kNormal = 2,
kHigh = 3,
};
namespace port {
class ThreadWithCb {
public:
static std::shared_ptr<std::function<void(std::thread::native_handle_type)>>
on_thread_start_callback;
template <typename Function, typename... Args>
ThreadWithCb(Function&& func, Args&&... args) {
thread_ =
std::thread(std::forward<Function>(func), std::forward<Args>(args)...);
if (on_thread_start_callback) {
on_thread_start_callback->operator()(native_handle());
}
}

ThreadWithCb() {}
bool joinable() { return thread_.joinable(); }

void join() { thread_.join(); }

void detach() { thread_.detach(); }
std::thread::id get_id() { return thread_.get_id(); }
std::thread& operator=(std::thread&& __t) {
thread_ = std::move(__t);
return thread_;
}
std::thread::native_handle_type native_handle() {
return thread_.native_handle();
}

private:
std::thread thread_;
};
using Thread = ThreadWithCb;
} // namespace port
} // namespace ROCKSDB_NAMESPACE
3 changes: 2 additions & 1 deletion include/rocksdb/write_buffer_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <vector>

#include "rocksdb/cache.h"
#include "rocksdb/port_defs.h"

namespace ROCKSDB_NAMESPACE {
struct Options;
Expand Down Expand Up @@ -454,7 +455,7 @@ class WriteBufferManager final {
// reason to wakeup. See the thread's code for more details
bool new_flushes_wakeup_ = false;

std::thread flushes_thread_;
port::Thread flushes_thread_;
bool terminate_flushes_thread_ = false;
};

Expand Down
Loading

0 comments on commit 4de6540

Please sign in to comment.