Skip to content

Commit

Permalink
Bugfix/342 logging multithreading (#344)
Browse files Browse the repository at this point in the history
* #342 fix threading and additional logs

* version
  • Loading branch information
jrse committed Sep 27, 2022
1 parent 0be962b commit 7c0aed6
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 23 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Change Log

## [0.0.40](https://github.com/ceph-dovecot/dovecot-ceph-plugin/tree/0.0.39) (2022-09-22)
## [0.0.41](https://github.com/ceph-dovecot/dovecot-ceph-plugin/tree/0.0.41) (2022-09-27)
- #342 multithreading bugfix and additional logging

## [0.0.40](https://github.com/ceph-dovecot/dovecot-ceph-plugin/tree/0.0.40) (2022-09-22)
- #342 multithreading object search for doveadm force-resync (feature toggle)
new config params:
# search method: 0 = default (single-threaded) | 1 = multithreading
Expand Down
2 changes: 1 addition & 1 deletion configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
AC_PREREQ([2.59])


AC_INIT([dovecot-ceph-plugin], [0.0.40], [https://github.com/ceph-dovecot/dovecot-ceph-plugin/issues/new], ,[https://github.com/ceph-dovecot/dovecot-ceph-plugin])
AC_INIT([dovecot-ceph-plugin], [0.0.41], [https://github.com/ceph-dovecot/dovecot-ceph-plugin/issues/new], ,[https://github.com/ceph-dovecot/dovecot-ceph-plugin])


AC_CONFIG_AUX_DIR([.])
Expand Down
2 changes: 1 addition & 1 deletion rpm/dovecot-ceph-plugin.spec
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Name: dovecot-ceph-plugin
Summary: Dovecot Ceph RADOS plugins


Version: 0.0.40
Version: 0.0.41

Release: 0%{?dist}
URL: https://github.com/ceph-dovecot/dovecot-ceph-plugin
Expand Down
41 changes: 25 additions & 16 deletions src/librmb/rados-storage-impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,45 +198,54 @@ librados::NObjectIterator RadosStorageImpl::find_mails(const RadosMetadata *attr
**/
std::set<std::string> RadosStorageImpl::find_mails_async(const RadosMetadata *attr,
std::string &pool_name,
int num_threads){
int num_threads,
void (*ptr)(std::string&)){

std::set<std::string> oid_list;
std::mutex oid_list_mutex;

// Define a Lambda Expression
auto f = [](const std::vector<std::string> &list, std::mutex &oid_mutex, std::set<std::string> &oids, librados::IoCtx *io_ctx) {
auto f = [](const std::vector<std::string> &list, std::mutex &oid_mutex, std::set<std::string> &oids, librados::IoCtx *io_ctx,
void (*ptr)(std::string&), std::string osd) {

std::lock_guard<std::mutex> guard(oid_mutex);
for (auto const &pg: list) {

uint64_t ppool;
uint32_t pseed;
int r = sscanf(pg.c_str(), "%llu.%x", (long long unsigned *)&ppool, &pseed);

librados::NObjectIterator iter= io_ctx->nobjects_begin(pseed);

while (iter != librados::NObjectIterator::__EndObjectIterator) {
std::string oid = iter->get_oid();
oids.insert(oid);
{
std::lock_guard<std::mutex> guard(oid_mutex);
oids.insert(oid);
}
iter++;
}
}
}
std::string t = "osd "+ osd +" pg done " + pg;
(*ptr)(t);
}
std::string t = "done with osd "+ osd ;
(*ptr)(t);
};

//std::string pool_name = "mail_storage";
std::map<std::string, std::vector<std::string>> osd_pg_map = cluster->list_pgs_osd_for_pool(pool_name);
std::vector<std::thread> threads;

for (const auto& x : osd_pg_map)
{
if(threads.size() == num_threads){
for (auto const &thread: threads) {
thread.join();
}
threads.clear();
for (const auto& x : osd_pg_map){
if(threads.size() == num_threads){
threads[0].join();
threads.erase(threads.begin());
}
threads.push_back(std::thread(f, std::ref(x.second),std::ref(oid_list_mutex),std::ref(oid_list), &get_io_ctx()));
threads.push_back(std::thread(f, std::ref(x.second),std::ref(oid_list_mutex),std::ref(oid_list), &get_io_ctx(), ptr, x.first));
std::string create_msg = "creating thread for osd: "+ x.first;
(*ptr)(create_msg);
}
for (auto const &thread: threads) {

for (auto const &thread: threads) {
thread.join();
}
return oid_list;
Expand Down
2 changes: 1 addition & 1 deletion src/librmb/rados-storage-impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class RadosStorageImpl : public RadosStorage {
librados::ObjectWriteOperation *op) override;
librados::NObjectIterator find_mails(const RadosMetadata *attr) override;

std::set<std::string> find_mails_async(const RadosMetadata *attr, std::string &pool_name, int num_threads) override;
std::set<std::string> find_mails_async(const RadosMetadata *attr, std::string &pool_name, int num_threads, void (*ptr)(std::string&)) override;

int open_connection(const std::string &poolname) override;
int open_connection(const std::string &poolname, const std::string &clustername,
Expand Down
5 changes: 4 additions & 1 deletion src/librmb/rados-storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,10 @@ class RadosStorage {
virtual librados::NObjectIterator find_mails(const RadosMetadata *attr) = 0;


virtual std::set<std::string> find_mails_async(const RadosMetadata *attr, std::string &pool_name, int num_threads) = 0;
virtual std::set<std::string> find_mails_async(const RadosMetadata *attr,
std::string &pool_name,
int num_threads,
void (*ptr)(std::string&)) = 0;


/*! open the rados connections with default cluster and username
Expand Down
8 changes: 7 additions & 1 deletion src/storage-rbox/rbox-sync-rebuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,9 @@ int find_inbox_mailbox_guid(struct mail_namespace *ns, std::string *mailbox_guid
return ret;
}

// Progress callback handed to find_mails_async(); each status message
// produced by the worker threads is forwarded to dovecot's debug log.
void cb(std::string &msg) {
  i_debug("processing: %s", msg.c_str());
}

int repair_namespace(struct mail_namespace *ns, bool force, struct rbox_storage *r_storage, std::map<std::string, std::list<librmb::RadosMail>> &rados_mails) {
FUNC_START();
Expand Down Expand Up @@ -447,6 +450,7 @@ int repair_namespace(struct mail_namespace *ns, bool force, struct rbox_storage
mail_index_lock_sync(box->index, "LOCKED_FOR_REPAIR");

if(rados_mails.size() == 0) {

if (rbox_open_rados_connection(box, false) < 0) {
i_error("rbox_sync_index_rebuild_objects: cannot open rados connection");
FUNC_END();
Expand All @@ -456,10 +460,12 @@ int repair_namespace(struct mail_namespace *ns, bool force, struct rbox_storage

std::set<std::string> mail_list;
std::string pool_name = r_storage->s->get_pool_name();

if( r_storage->config->get_object_search_method() == 1) {
mail_list = r_storage->s->find_mails_async(nullptr,
pool_name,
r_storage->config->get_object_search_threads());
r_storage->config->get_object_search_threads(),
&cb);
i_info("multithreading done");
}else{
i_info("Ceph connection established using namespace: %s",r_storage->s->get_namespace().c_str());
Expand Down
2 changes: 1 addition & 1 deletion src/tests/mocks/mock_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class RadosStorageMock : public RadosStorage {
MOCK_METHOD1(find_mails, librados::NObjectIterator(const RadosMetadata *attr));
MOCK_METHOD1(open_connection, int(const std::string &poolname));

MOCK_METHOD3(find_mails_async, std::set<std::string>(const RadosMetadata *attr, std::string &pool_name,int num_threads));
MOCK_METHOD4(find_mails_async, std::set<std::string>(const RadosMetadata *attr, std::string &pool_name,int num_threads, void (*ptr)(std::string&)));

MOCK_METHOD3(open_connection,
int(const std::string &poolname, const std::string &clustername, const std::string &rados_username));
Expand Down

0 comments on commit 7c0aed6

Please sign in to comment.