From 37aee44d0800347f75d2a2b58cf31a55082e197f Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 14 Mar 2021 08:55:17 +0800 Subject: [PATCH] Threads-Log: Support dual queue cache for async logs. 1. Create dual queue, the coroutine queue and thread queue. 2. The coroutine queue cache logs does not require lock. 3. When need to flush, flush the logs from coroutine-queue to thread-queue. 4. Finally, flush thread-queue to disk. --- trunk/conf/full.conf | 6 ++-- trunk/src/app/srs_app_config.hpp | 2 +- trunk/src/app/srs_app_log.cpp | 1 + trunk/src/app/srs_app_threads.cpp | 58 ++++++++++++++++++++++++------ trunk/src/app/srs_app_threads.hpp | 51 +++++++++++++++++++++++++- trunk/src/main/srs_main_server.cpp | 12 +++++++ 6 files changed, 115 insertions(+), 15 deletions(-) diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 7290d08703..419c5bb802 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -41,9 +41,11 @@ srs_log_level trace; # when srs_log_tank is file, specifies the log file. # default: ./objs/srs.log srs_log_file ./objs/srs.log; -# The interval in ms, to flush async log. +# The interval in ms, to flush async log. Generally, we flush from +# coroutine-queue to thread-queue, then from thread-queue to disk. +# So the delay of logs might be 2*srs_log_flush_interval. # Default: 1300 -srs_log_async_interval 1300; +srs_log_flush_interval 1300; # the max connections. # if exceed the max connections, server will drop the new connection. # default: 1000 diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 36572a2742..741d0c556b 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -892,7 +892,7 @@ class SrsConfig virtual std::string get_log_level(); // Get the log file path. virtual std::string get_log_file(); - // Get the interval in ms to flush asyn log. + // Get the interval in ms to flush async log. virtual srs_utime_t srs_log_flush_interval(); // Whether ffmpeg log enabled virtual bool get_ff_log_enabled(); diff --git a/trunk/src/app/srs_app_log.cpp b/trunk/src/app/srs_app_log.cpp index af57b706ba..620833a270 100644 --- a/trunk/src/app/srs_app_log.cpp +++ b/trunk/src/app/srs_app_log.cpp @@ -61,6 +61,7 @@ SrsFileLog::~SrsFileLog() srs_freepa(log_data); } +// @remark Note that we should never write logs, because log is not ready not. srs_error_t SrsFileLog::initialize() { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_threads.cpp b/trunk/src/app/srs_app_threads.cpp index 789bf5c8b4..fa278b24e1 100644 --- a/trunk/src/app/srs_app_threads.cpp +++ b/trunk/src/app/srs_app_threads.cpp @@ -110,6 +110,7 @@ SrsThreadPool::~SrsThreadPool() srs_freep(lock_); } +// @remark Note that we should never write logs, because log is not ready not. srs_error_t SrsThreadPool::initialize() { srs_error_t err = srs_success; @@ -119,12 +120,7 @@ srs_error_t SrsThreadPool::initialize() return srs_error_wrap(err, "initialize st failed"); } - if ((err = _srs_async_log->initialize()) != srs_success) { - return srs_error_wrap(err, "init async log"); - } - interval_ = _srs_config->get_threads_interval(); - srs_trace("Thread #%d(%s): init interval=%dms", entry_->num, entry_->label.c_str(), srsu2msi(interval_)); return err; } @@ -169,12 +165,15 @@ srs_error_t SrsThreadPool::run() { srs_error_t err = srs_success; + // Write the init log here. + srs_trace("Thread #%d(%s): init interval=%dms", entry_->num, entry_->label.c_str(), srsu2msi(interval_)); + while (true) { + sleep(interval_ / SRS_UTIME_SECONDS); + string async_logs = _srs_async_log->description(); srs_trace("Thread #%d(%s): cycle threads=%d%s", entry_->num, entry_->label.c_str(), (int)threads_.size(), async_logs.c_str()); - - sleep(interval_ / SRS_UTIME_SECONDS); } return err; @@ -202,11 +201,14 @@ void* SrsThreadPool::start(void* arg) SrsThreadPool* _srs_thread_pool = new SrsThreadPool(); -SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p) +SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p, srs_utime_t interval) { filename_ = p; writer_ = new SrsFileWriter(); queue_ = new SrsThreadQueue(); + co_queue_ = new SrsCoroutineQueue(); + interval_ = interval; + last_flush_time_ = srs_get_system_time(); } // TODO: FIXME: Before free the writer, we must remove it from the manager. @@ -215,6 +217,7 @@ SrsAsyncFileWriter::~SrsAsyncFileWriter() // TODO: FIXME: Should we flush dirty logs? srs_freep(writer_); srs_freep(queue_); + srs_freep(co_queue_); } srs_error_t SrsAsyncFileWriter::open() @@ -246,7 +249,16 @@ srs_error_t SrsAsyncFileWriter::write(void* buf, size_t count, ssize_t* pnwrite) SrsSharedPtrMessage* msg = new SrsSharedPtrMessage(); msg->wrap(cp, count); - queue_->push_back(msg); + co_queue_->push_back(msg); + + // Whether flush to thread-queue. + if (srs_get_system_time() - last_flush_time_ >= interval_) { + last_flush_time_ = srs_get_system_time(); + + vector flying; + co_queue_->swap(flying); + queue_->push_back(flying); + } if (pnwrite) { *pnwrite = count; @@ -303,6 +315,7 @@ srs_error_t SrsAsyncFileWriter::flush() SrsAsyncLogManager::SrsAsyncLogManager() { interval_ = 0; + reopen_ = false; lock_ = new SrsThreadMutex(); } @@ -318,6 +331,7 @@ SrsAsyncLogManager::~SrsAsyncLogManager() } } +// @remark Note that we should never write logs, because log is not ready not. srs_error_t SrsAsyncLogManager::initialize() { srs_error_t err = srs_success; @@ -327,10 +341,20 @@ srs_error_t SrsAsyncLogManager::initialize() return srs_error_new(ERROR_SYSTEM_LOGFILE, "invalid interval=%dms", srsu2msi(interval_)); } + return err; +} + +// @remark Now, log is ready, and we can print logs. +srs_error_t SrsAsyncLogManager::run() +{ + srs_error_t err = srs_success; + if ((err = _srs_thread_pool->execute("log", SrsAsyncLogManager::start, this)) != srs_success) { return srs_error_wrap(err, "run async log"); } + srs_trace("AsyncLogs: Init flush_interval=%dms", srsu2msi(interval_)); + return err; } @@ -338,7 +362,7 @@ srs_error_t SrsAsyncLogManager::create_writer(std::string filename, SrsAsyncFile { srs_error_t err = srs_success; - SrsAsyncFileWriter* writer = new SrsAsyncFileWriter(filename); + SrsAsyncFileWriter* writer = new SrsAsyncFileWriter(filename, interval_); writers_.push_back(writer); if ((err = writer->open()) != srs_success) { @@ -369,8 +393,20 @@ std::string SrsAsyncLogManager::description() max_logs = srs_max(max_logs, nn); } + int nn_co_logs = 0; + int max_co_logs = 0; + for (int i = 0; i < (int)writers_.size(); i++) { + SrsAsyncFileWriter* writer = writers_.at(i); + + int nn = (int)writer->co_queue_->size(); + nn_co_logs += nn; + max_co_logs = srs_max(max_co_logs, nn); + } + static char buf[128]; - snprintf(buf, sizeof(buf), ", files=%d, queue=%d/%d", (int)writers_.size(), nn_logs, max_logs); + snprintf(buf, sizeof(buf), ", files=%d, queue=%d/%d, coq=%d/%d", + (int)writers_.size(), nn_logs, max_logs, nn_co_logs, max_co_logs); + return buf; } diff --git a/trunk/src/app/srs_app_threads.hpp b/trunk/src/app/srs_app_threads.hpp index ff0c757692..624d94e95e 100644 --- a/trunk/src/app/srs_app_threads.hpp +++ b/trunk/src/app/srs_app_threads.hpp @@ -116,6 +116,37 @@ class SrsThreadPool // The global thread pool. extern SrsThreadPool* _srs_thread_pool; +// We use coroutine queue to collect messages from different coroutines, +// then swap to the SrsThreadQueue and process by another thread. +template +class SrsCoroutineQueue +{ +private: + std::vector dirty_; +public: + SrsCoroutineQueue() { + } + virtual ~SrsCoroutineQueue() { + for (int i = 0; i < (int)dirty_.size(); i++) { + T* msg = dirty_.at(i); + srs_freep(msg); + } + } +public: + // SrsCoroutineQueue::push_back + void push_back(T* msg) { + dirty_.push_back(msg); + } + // SrsCoroutineQueue::swap + void swap(std::vector& flying) { + dirty_.swap(flying); + } + // SrsCoroutineQueue::size + size_t size() { + return dirty_.size(); + } +}; + // Thread-safe queue. template class SrsThreadQueue @@ -142,6 +173,11 @@ class SrsThreadQueue SrsThreadLocker(lock_); dirty_.push_back(msg); } + // SrsThreadQueue::push_back + void push_back(std::vector& flying) { + SrsThreadLocker(lock_); + dirty_.insert(dirty_.end(), flying.begin(), flying.end()); + } // SrsThreadQueue::swap void swap(std::vector& flying) { SrsThreadLocker(lock_); @@ -161,9 +197,18 @@ class SrsAsyncFileWriter : public ISrsWriter private: std::string filename_; SrsFileWriter* writer_; +private: + // The thread-queue, to flush to disk by dedicated thread. SrsThreadQueue* queue_; private: - SrsAsyncFileWriter(std::string p); + // The interval to flush from coroutine-queue to thread-queue. + srs_utime_t interval_; + // Last flush coroutine-queue time, to calculate the timeout. + srs_utime_t last_flush_time_; + // The coroutine-queue, to avoid requires lock for each log. + SrsCoroutineQueue* co_queue_; +private: + SrsAsyncFileWriter(std::string p, srs_utime_t interval); virtual ~SrsAsyncFileWriter(); public: // Open file writer, in truncate mode. @@ -188,6 +233,8 @@ class SrsAsyncLogManager private: // The async flush interval. srs_utime_t interval_; + // The number of logs to flush from coroutine-queue to thread-queue. + int flush_co_queue_; private: // The async reopen event. bool reopen_; @@ -200,6 +247,8 @@ class SrsAsyncLogManager public: // Initialize the async log manager. srs_error_t initialize(); + // Run the async log manager thread. + srs_error_t run(); // Create a managed writer, user should never free it. srs_error_t create_writer(std::string filename, SrsAsyncFileWriter** ppwriter); // Reopen all log files, asynchronously. diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp index 2cbb846f0b..5a88584360 100644 --- a/trunk/src/main/srs_main_server.cpp +++ b/trunk/src/main/srs_main_server.cpp @@ -131,6 +131,11 @@ srs_error_t do_main(int argc, char** argv) if ((err = _srs_config->initialize_cwd()) != srs_success) { return srs_error_wrap(err, "config cwd"); } + + // We must initialize the async log manager before log init. + if ((err = _srs_async_log->initialize()) != srs_success) { + return srs_error_wrap(err, "init async log"); + } // config parsed, initialize log. if ((err = _srs_log->initialize()) != srs_success) { @@ -469,6 +474,13 @@ srs_error_t run_in_thread_pool() return srs_error_wrap(err, "init thread pool"); } + // After all init(log, async log manager, thread pool), now we can start to + // run the log manager thread. + if ((err = _srs_async_log->run()) != srs_success) { + return srs_error_wrap(err, "run async log"); + } + + // Start the service worker thread, for RTMP and RTC server, etc. if ((err = _srs_thread_pool->execute("hybrid", run_hybrid_server, NULL)) != srs_success) { return srs_error_wrap(err, "run hybrid server"); }