Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/342 logging multithreading #344

Merged
merged 2 commits into from
Sep 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Change Log

## [0.0.40](https://github.com/ceph-dovecot/dovecot-ceph-plugin/tree/0.0.39) (2022-09-22)
## [0.0.41](https://github.com/ceph-dovecot/dovecot-ceph-plugin/tree/0.0.41) (2022-09-27)
- #342 multithreading bugfix and additional logging

## [0.0.40](https://github.com/ceph-dovecot/dovecot-ceph-plugin/tree/0.0.40) (2022-09-22)
- #342 multithreading object search for doveadm force-resync (feature toggle)
new config params:
# search method default = 0 | 1 multithreading
Expand Down
2 changes: 1 addition & 1 deletion configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
AC_PREREQ([2.59])


AC_INIT([dovecot-ceph-plugin], [0.0.40], [https://github.com/ceph-dovecot/dovecot-ceph-plugin/issues/new], ,[https://github.com/ceph-dovecot/dovecot-ceph-plugin])
AC_INIT([dovecot-ceph-plugin], [0.0.41], [https://github.com/ceph-dovecot/dovecot-ceph-plugin/issues/new], ,[https://github.com/ceph-dovecot/dovecot-ceph-plugin])


AC_CONFIG_AUX_DIR([.])
Expand Down
2 changes: 1 addition & 1 deletion rpm/dovecot-ceph-plugin.spec
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Name: dovecot-ceph-plugin
Summary: Dovecot Ceph RADOS plugins


Version: 0.0.40
Version: 0.0.41

Release: 0%{?dist}
URL: https://github.com/ceph-dovecot/dovecot-ceph-plugin
Expand Down
41 changes: 25 additions & 16 deletions src/librmb/rados-storage-impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,45 +198,54 @@ librados::NObjectIterator RadosStorageImpl::find_mails(const RadosMetadata *attr
**/
std::set<std::string> RadosStorageImpl::find_mails_async(const RadosMetadata *attr,
std::string &pool_name,
int num_threads){
int num_threads,
void (*ptr)(std::string&)){

std::set<std::string> oid_list;
std::mutex oid_list_mutex;

// Define a Lambda Expression
auto f = [](const std::vector<std::string> &list, std::mutex &oid_mutex, std::set<std::string> &oids, librados::IoCtx *io_ctx) {
auto f = [](const std::vector<std::string> &list, std::mutex &oid_mutex, std::set<std::string> &oids, librados::IoCtx *io_ctx,
void (*ptr)(std::string&), std::string osd) {

std::lock_guard<std::mutex> guard(oid_mutex);
for (auto const &pg: list) {

uint64_t ppool;
uint32_t pseed;
int r = sscanf(pg.c_str(), "%llu.%x", (long long unsigned *)&ppool, &pseed);

librados::NObjectIterator iter= io_ctx->nobjects_begin(pseed);

while (iter != librados::NObjectIterator::__EndObjectIterator) {
std::string oid = iter->get_oid();
oids.insert(oid);
{
std::lock_guard<std::mutex> guard(oid_mutex);
oids.insert(oid);
}
iter++;
}
}
}
std::string t = "osd "+ osd +" pg done " + pg;
(*ptr)(t);
}
std::string t = "done with osd "+ osd ;
(*ptr)(t);
};

//std::string pool_mame = "mail_storage";
std::map<std::string, std::vector<std::string>> osd_pg_map = cluster->list_pgs_osd_for_pool(pool_name);
std::vector<std::thread> threads;

for (const auto& x : osd_pg_map)
{
if(threads.size() == num_threads){
for (auto const &thread: threads) {
thread.join();
}
threads.clear();
for (const auto& x : osd_pg_map){
if(threads.size() == num_threads){
threads[0].join();
threads.erase(threads.begin());
}
threads.push_back(std::thread(f, std::ref(x.second),std::ref(oid_list_mutex),std::ref(oid_list), &get_io_ctx()));
threads.push_back(std::thread(f, std::ref(x.second),std::ref(oid_list_mutex),std::ref(oid_list), &get_io_ctx(), ptr, x.first));
std::string create_msg = "creating thread for osd: "+ x.first;
(*ptr)(create_msg);
}
for (auto const &thread: threads) {

for (auto const &thread: threads) {
thread.join();
}
return oid_list;
Expand Down
2 changes: 1 addition & 1 deletion src/librmb/rados-storage-impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class RadosStorageImpl : public RadosStorage {
librados::ObjectWriteOperation *op) override;
librados::NObjectIterator find_mails(const RadosMetadata *attr) override;

std::set<std::string> find_mails_async(const RadosMetadata *attr, std::string &pool_name, int num_threads) override;
std::set<std::string> find_mails_async(const RadosMetadata *attr, std::string &pool_name, int num_threads, void (*ptr)(std::string&)) override;

int open_connection(const std::string &poolname) override;
int open_connection(const std::string &poolname, const std::string &clustername,
Expand Down
5 changes: 4 additions & 1 deletion src/librmb/rados-storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,10 @@ class RadosStorage {
virtual librados::NObjectIterator find_mails(const RadosMetadata *attr) = 0;


virtual std::set<std::string> find_mails_async(const RadosMetadata *attr, std::string &pool_name, int num_threads) = 0;
virtual std::set<std::string> find_mails_async(const RadosMetadata *attr,
std::string &pool_name,
int num_threads,
void (*ptr)(std::string&)) = 0;


/*! open the rados connections with default cluster and username
Expand Down
8 changes: 7 additions & 1 deletion src/storage-rbox/rbox-sync-rebuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,9 @@ int find_inbox_mailbox_guid(struct mail_namespace *ns, std::string *mailbox_guid
return ret;
}

void cb(std::string &pg){
i_debug("processing: %s",pg.c_str());
}

int repair_namespace(struct mail_namespace *ns, bool force, struct rbox_storage *r_storage, std::map<std::string, std::list<librmb::RadosMail>> &rados_mails) {
FUNC_START();
Expand Down Expand Up @@ -447,6 +450,7 @@ int repair_namespace(struct mail_namespace *ns, bool force, struct rbox_storage
mail_index_lock_sync(box->index, "LOCKED_FOR_REPAIR");

if(rados_mails.size() == 0) {

if (rbox_open_rados_connection(box, false) < 0) {
i_error("rbox_sync_index_rebuild_objects: cannot open rados connection");
FUNC_END();
Expand All @@ -456,10 +460,12 @@ int repair_namespace(struct mail_namespace *ns, bool force, struct rbox_storage

std::set<std::string> mail_list;
std::string pool_name = r_storage->s->get_pool_name();

if( r_storage->config->get_object_search_method() == 1) {
mail_list = r_storage->s->find_mails_async(nullptr,
pool_name,
r_storage->config->get_object_search_threads());
r_storage->config->get_object_search_threads(),
&cb);
i_info("multithreading done");
}else{
i_info("Ceph connection established using namespace: %s",r_storage->s->get_namespace().c_str());
Expand Down
2 changes: 1 addition & 1 deletion src/tests/mocks/mock_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class RadosStorageMock : public RadosStorage {
MOCK_METHOD1(find_mails, librados::NObjectIterator(const RadosMetadata *attr));
MOCK_METHOD1(open_connection, int(const std::string &poolname));

MOCK_METHOD3(find_mails_async, std::set<std::string>(const RadosMetadata *attr, std::string &pool_name,int num_threads));
MOCK_METHOD4(find_mails_async, std::set<std::string>(const RadosMetadata *attr, std::string &pool_name,int num_threads, void (*ptr)(std::string&)));

MOCK_METHOD3(open_connection,
int(const std::string &poolname, const std::string &clustername, const std::string &rados_username));
Expand Down