
Make async_logger::flush() synchronous and wait for the flush to comp… #3049

Merged · 1 commit · Mar 23, 2024
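In short: before this change, async_logger::flush() only enqueued a flush request and returned immediately; with it, flush() blocks until the thread-pool worker has actually flushed the sinks, reporting failures (such as a dropped flush message) through the logger's error handler. A minimal caller-side sketch of the new behavior, assuming the stock async factory described in the spdlog README:

#include "spdlog/async.h"
#include "spdlog/sinks/basic_file_sink.h"

int main() {
    // Async logger backed by spdlog's default thread pool.
    auto logger = spdlog::basic_logger_mt<spdlog::async_factory>("file_logger", "log.txt");
    logger->info("must not be lost");
    // After this PR, flush() returns only once the worker thread has
    // flushed the sinks; previously it merely queued the request.
    logger->flush();
}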
12 changes: 7 additions & 5 deletions include/spdlog/async_logger-inl.h
@@ -43,13 +43,15 @@ SPDLOG_LOGGER_CATCH(msg.source)
 }

 // send flush request to the thread pool
 SPDLOG_INLINE void spdlog::async_logger::flush_() {
     SPDLOG_TRY {
-        if (auto pool_ptr = thread_pool_.lock()) {
-            pool_ptr->post_flush(shared_from_this(), overflow_policy_);
-        }
-        else {
+        auto pool_ptr = thread_pool_.lock();
+        if (!pool_ptr) {
             throw_spdlog_ex("async flush: thread pool doesn't exist anymore");
         }
+
+        std::future<void> future = pool_ptr->post_flush(shared_from_this(), overflow_policy_);
+        // Wait for the flush operation to complete.
+        // This might throw an exception if the flush message gets dropped because of overflow.
+        future.get();
     }
     SPDLOG_LOGGER_CATCH(source_loc())
 }
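The future.get() call above is where the synchronization happens: it blocks until the worker thread fulfills the promise, and it rethrows if the promise is abandoned. A standalone illustration (plain standard C++, not spdlog code) of why a dropped flush message surfaces as an exception: destroying an unfulfilled std::promise stores a broken_promise error in the shared state.

#include <future>
#include <iostream>

int main() {
    std::future<void> fut;
    {
        std::promise<void> p;
        fut = p.get_future();
    }  // p destroyed without set_value(), like a discarded flush message
    try {
        fut.get();
    } catch (const std::future_error &e) {
        std::cout << e.what() << '\n';  // typically "broken promise"
    }
}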
11 changes: 8 additions & 3 deletions include/spdlog/details/thread_pool-inl.h
@@ -62,9 +62,13 @@ void SPDLOG_INLINE thread_pool::post_log(async_logger_ptr &&worker_ptr,
     post_async_msg_(std::move(async_m), overflow_policy);
 }

-void SPDLOG_INLINE thread_pool::post_flush(async_logger_ptr &&worker_ptr,
-                                           async_overflow_policy overflow_policy) {
-    post_async_msg_(async_msg(std::move(worker_ptr), async_msg_type::flush), overflow_policy);
+std::future<void> SPDLOG_INLINE thread_pool::post_flush(async_logger_ptr &&worker_ptr,
+                                                        async_overflow_policy overflow_policy) {
+    std::promise<void> promise;
+    std::future<void> future = promise.get_future();
+    post_async_msg_(async_msg(std::move(worker_ptr), async_msg_type::flush, std::move(promise)),
+                    overflow_policy);
+    return future;
 }

 size_t SPDLOG_INLINE thread_pool::overrun_counter() { return q_.overrun_counter(); }

@@ -108,6 +112,7 @@ bool SPDLOG_INLINE thread_pool::process_next_msg_() {
     }
     case async_msg_type::flush: {
         incoming_async_msg.worker_ptr->backend_flush_();
+        incoming_async_msg.flush_promise.set_value();
         return true;
     }
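The pattern here is a one-shot completion handoff: post_flush() creates a promise, keeps its future for the caller, and moves the promise into the queued message; whichever worker thread dequeues the flush message fulfills it. A generic sketch of the same handoff (illustrative names, not spdlog API):

#include <future>
#include <thread>
#include <utility>

struct flush_task {
    std::promise<void> done;  // owned by the queued task
};

int main() {
    flush_task task;
    std::future<void> fut = task.done.get_future();  // kept by the caller

    std::thread worker([t = std::move(task)]() mutable {
        // ... perform the actual flush work here ...
        t.done.set_value();  // signal completion to the waiting caller
    });

    fut.wait();  // caller blocks until the worker signals
    worker.join();
}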
19 changes: 16 additions & 3 deletions include/spdlog/details/thread_pool.h
@@ -9,6 +9,7 @@

 #include <chrono>
 #include <functional>
+#include <future>
 #include <memory>
 #include <thread>
 #include <vector>

@@ -27,6 +28,7 @@ enum class async_msg_type { log, flush, terminate };
 struct async_msg : log_msg_buffer {
     async_msg_type msg_type{async_msg_type::log};
     async_logger_ptr worker_ptr;
+    std::promise<void> flush_promise;

     async_msg() = default;
     ~async_msg() = default;

@@ -56,12 +58,22 @@ struct async_msg : log_msg_buffer {
     async_msg(async_logger_ptr &&worker, async_msg_type the_type, const details::log_msg &m)
         : log_msg_buffer{m},
           msg_type{the_type},
-          worker_ptr{std::move(worker)} {}
+          worker_ptr{std::move(worker)},
+          flush_promise{} {}

     async_msg(async_logger_ptr &&worker, async_msg_type the_type)
         : log_msg_buffer{},
           msg_type{the_type},
-          worker_ptr{std::move(worker)} {}
+          worker_ptr{std::move(worker)},
+          flush_promise{} {}
+
+    async_msg(async_logger_ptr &&worker,
+              async_msg_type the_type,
+              std::promise<void> &&promise)
+        : log_msg_buffer{},
+          msg_type{the_type},
+          worker_ptr{std::move(worker)},
+          flush_promise{std::move(promise)} {}

     explicit async_msg(async_msg_type the_type)
         : async_msg{nullptr, the_type} {}

@@ -88,7 +100,8 @@ class SPDLOG_API thread_pool {
     void post_log(async_logger_ptr &&worker_ptr,
                   const details::log_msg &msg,
                   async_overflow_policy overflow_policy);
-    void post_flush(async_logger_ptr &&worker_ptr, async_overflow_policy overflow_policy);
+    std::future<void> post_flush(async_logger_ptr &&worker_ptr,
+                                 async_overflow_policy overflow_policy);
     size_t overrun_counter();
     void reset_overrun_counter();
     size_t discard_counter();
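One detail worth noting about the new flush_promise member and the rvalue-reference constructor: std::promise is move-only, so async_msg stays movable but not copyable, which fits the move-based operations the thread pool's queue uses. In isolation (plain standard C++, independent of spdlog):

#include <future>
#include <utility>

int main() {
    std::promise<void> a;
    // std::promise<void> b = a;          // ill-formed: promise is not copyable
    std::promise<void> b = std::move(a);  // OK: promise is movable
}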
43 changes: 43 additions & 0 deletions tests/test_async.cpp
@@ -93,6 +93,49 @@ TEST_CASE("flush", "[async]") {
     REQUIRE(test_sink->flush_counter() == 1);
 }

+TEST_CASE("multithread flush", "[async]") {
+    auto test_sink = std::make_shared<spdlog::sinks::test_sink_mt>();
+    size_t queue_size = 2;
+    size_t messages = 10;
+    size_t n_threads = 10;
+    size_t flush_count = 1024;
+    std::mutex mtx;
+    std::vector<std::string> errmsgs;
+    {
+        auto tp = std::make_shared<spdlog::details::thread_pool>(queue_size, 1);
+        auto logger = std::make_shared<spdlog::async_logger>(
+            "as", test_sink, tp, spdlog::async_overflow_policy::discard_new);
+
+        logger->set_error_handler([&](const std::string &) {
+            std::unique_lock<std::mutex> lock(mtx);
+            errmsgs.push_back("Broken promise");
+        });
+
+        for (size_t i = 0; i < messages; i++) {
+            logger->info("Hello message #{}", i);
+        }
+
+        std::vector<std::thread> threads;
+        for (size_t i = 0; i < n_threads; i++) {
+            threads.emplace_back([logger, flush_count] {
+                for (size_t j = 0; j < flush_count; j++) {
+                    // flush() does not throw even if it fails;
+                    // instead, the error handler is invoked.
+                    logger->flush();
+                }
+            });
+        }
+
+        for (auto &t : threads) {
+            t.join();
+        }
+    }
+    REQUIRE(test_sink->flush_counter() >= 1);
+    REQUIRE(test_sink->flush_counter() + errmsgs.size() == n_threads * flush_count);
+    REQUIRE(errmsgs.size() >= 1);
+    REQUIRE(errmsgs[0] == "Broken promise");
+}
+
 TEST_CASE("async periodic flush", "[async]") {
     auto logger = spdlog::create_async<spdlog::sinks::test_sink_mt>("as");
     auto test_sink = std::static_pointer_cast<spdlog::sinks::test_sink_mt>(logger->sinks()[0]);
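The accounting in the REQUIREs above is the point of the test: with a queue of size 2 and the discard_new policy, every one of the n_threads * flush_count flush() calls either completes (incrementing the sink's flush counter) or is dropped, in which case the broken promise is reported through the error handler, so the two counts must sum to the total. A caller-side sketch of handling that error path in application code (assumed setup, not part of the PR):

#include <cstdio>
#include "spdlog/async.h"
#include "spdlog/sinks/stdout_color_sinks.h"

int main() {
    auto logger = spdlog::stdout_color_mt<spdlog::async_factory>("console");
    // Invoked if the flush message is dropped (e.g. queue overflow),
    // since flush() reports errors via the handler instead of throwing.
    logger->set_error_handler([](const std::string &msg) {
        std::fprintf(stderr, "logger error: %s\n", msg.c_str());
    });
    logger->info("hello");
    logger->flush();  // blocks until the worker has flushed the sinks
}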