From 9e554ca06095b573583b78e6d8b49c30fa3bb09a Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 16 Mar 2021 19:26:44 +0800 Subject: [PATCH] Threads: Support cpu affinity for threads. 1. Config cpu_affinity in threads. 2. Default to not set the cpu affinity. 3. Support set by cpu range 0-63. --- trunk/conf/full.conf | 17 ++++++++ trunk/src/app/srs_app_config.cpp | 41 ++++++++++++++++++++ trunk/src/app/srs_app_config.hpp | 1 + trunk/src/app/srs_app_threads.cpp | 64 ++++++++++++++++++++++++++++--- trunk/src/app/srs_app_threads.hpp | 9 +++++ trunk/src/app/srs_app_utility.cpp | 2 +- 6 files changed, 128 insertions(+), 6 deletions(-) diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 12321914a5..8988a0804b 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -127,6 +127,23 @@ threads { # Whether enable the ASYNC RECV, recv udp packets in dedicate threads. # Default: off async_recv off; + # CPU set for affinity, for example: + # 0 means CPU0 + # 0-3 means CPU0, CPU1, CPU2 + # 1-63 means all CPUs except CPU0 + # Default: 0-63 + cpu_affinity { + # For master thread manager. + master 0-63; + # For hybrid server or services. + hybrid 0-63; + # For log writing thread. + log 0-63; + # For SRTP encrypt/decrypt thread. + srtp 0-63; + # For UDP recv thread. + recv 0-63; + } } ############################################################################################# diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index bfb767a49c..6b11767ad7 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -4161,6 +4161,47 @@ bool SrsConfig::get_threads_async_recv() return SRS_CONF_PERFER_FALSE(conf->arg0()); } +bool SrsConfig::get_threads_cpu_affinity(std::string label, int* start, int* end) +{ + static int DEFAULT_START = 0; + static int DEFAULT_END = 63; + + *start = DEFAULT_START; + *end = DEFAULT_END; + + SrsConfDirective* conf = root->get("threads"); + if (!conf) { + return false; + } + + conf = conf->get("cpu_affinity"); + if (!conf) { + return false; + } + + conf = conf->get(label); + if (!conf) { + return false; + } + + string v = conf->arg0(); + size_t pos = v.find("-"); + if (pos == string::npos) { + *start = *end = ::atoi(v.c_str()); + return true; + } + + string sv = v.substr(0, pos); + string ev = v.substr(pos + 1); + if (!sv.empty()) { + *start = ::atoi(sv.c_str()); + } + if (!ev.empty()) { + *end = ::atoi(ev.c_str()); + } + return true; +} + vector SrsConfig::get_stream_casters() { srs_assert(root); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 50ebc4731c..4758485bb6 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -480,6 +480,7 @@ class SrsConfig virtual srs_utime_t get_threads_interval(); virtual bool get_threads_async_srtp(); virtual bool get_threads_async_recv(); + virtual bool get_threads_cpu_affinity(std::string label, int* start, int* end); // stream_caster section public: // Get all stream_caster in config file. diff --git a/trunk/src/app/srs_app_threads.cpp b/trunk/src/app/srs_app_threads.cpp index ae01164a70..0c576e13f9 100644 --- a/trunk/src/app/srs_app_threads.cpp +++ b/trunk/src/app/srs_app_threads.cpp @@ -40,6 +40,21 @@ SrsPps* _srs_thread_sync_100us = new SrsPps(); SrsPps* _srs_thread_sync_1000us = new SrsPps(); SrsPps* _srs_thread_sync_plus = new SrsPps(); +uint64_t srs_covert_cpuset(cpu_set_t v) +{ +#ifdef SRS_OSX + return v; +#else + uint64_t iv = 0; + for (int i = 0; i <= 63; i++) { + if (CPU_ISSET(i, &v)) { + iv |= uint64_t(1) << i; + } + } + return iv; +#endif +} + SrsThreadMutex::SrsThreadMutex() { // https://man7.org/linux/man-pages/man3/pthread_mutexattr_init.3.html @@ -88,6 +103,11 @@ SrsThreadEntry::SrsThreadEntry() num = 0; err = srs_success; + + // Set affinity mask to include CPUs 0 to 7 + CPU_ZERO(&cpuset); + CPU_ZERO(&cpuset2); + cpuset_ok = false; } SrsThreadEntry::~SrsThreadEntry() @@ -132,10 +152,29 @@ srs_error_t SrsThreadPool::initialize() return srs_error_wrap(err, "initialize st failed"); } + SrsThreadEntry* entry = (SrsThreadEntry*)entry_; +#ifndef SRS_OSX + // Load CPU affinity from config. + int cpu_start = 0, cpu_end = 0; + entry->cpuset_ok = _srs_config->get_threads_cpu_affinity("master", &cpu_start, &cpu_end); + for (int i = cpu_start; entry->cpuset_ok && i <= cpu_end; i++) { + CPU_SET(i, &entry->cpuset); + } +#endif + + int r0 = 0, r1 = 0; +#ifndef SRS_OSX + if (entry->cpuset_ok) { + r0 = pthread_setaffinity_np(pthread_self(), sizeof(entry->cpuset), &entry->cpuset); + } + r1 = pthread_getaffinity_np(pthread_self(), sizeof(entry->cpuset2), &entry->cpuset2); +#endif + interval_ = _srs_config->get_threads_interval(); bool async_srtp = _srs_config->get_threads_async_srtp(); - srs_trace("Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d", - entry_->num, entry_->label.c_str(), entry_->name.c_str(), srsu2msi(interval_), async_srtp); + srs_trace("Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 "/%d-0x%" PRIx64, + entry->num, entry->label.c_str(), entry->name.c_str(), srsu2msi(interval_), async_srtp, + entry->cpuset_ok, r0, srs_covert_cpuset(entry->cpuset), r1, srs_covert_cpuset(entry->cpuset2)); return err; } @@ -167,6 +206,15 @@ srs_error_t SrsThreadPool::execute(string label, srs_error_t (*start)(void* arg) snprintf(buf, sizeof(buf), "srs-%s-%d", entry->label.c_str(), entry->num); entry->name = buf; +#ifndef SRS_OSX + // Load CPU affinity from config. + int cpu_start = 0, cpu_end = 0; + entry->cpuset_ok = _srs_config->get_threads_cpu_affinity(label, &cpu_start, &cpu_end); + for (int i = cpu_start; entry->cpuset_ok && i <= cpu_end; i++) { + CPU_SET(i, &entry->cpuset); + } +#endif + // https://man7.org/linux/man-pages/man3/pthread_create.3.html pthread_t trd; int r0 = pthread_create(&trd, NULL, SrsThreadPool::start, entry); @@ -232,13 +280,19 @@ void* SrsThreadPool::start(void* arg) SrsThreadEntry* entry = (SrsThreadEntry*)arg; + int r0 = 0, r1 = 0; #ifndef SRS_OSX // https://man7.org/linux/man-pages/man3/pthread_setname_np.3.html - pthread_setname_np(entry->trd, entry->name.c_str()); + pthread_setname_np(pthread_self(), entry->name.c_str()); + if (entry->cpuset_ok) { + r0 = pthread_setaffinity_np(pthread_self(), sizeof(entry->cpuset), &entry->cpuset); + } + r1 = pthread_getaffinity_np(pthread_self(), sizeof(entry->cpuset2), &entry->cpuset2); #endif - srs_trace("Thread #%d: run with label=%s, name=%s", entry->num, - entry->label.c_str(), entry->name.c_str()); + srs_trace("Thread #%d: run with label=%s, name=%s, cpuset=%d/%d-0x%" PRIx64 "/%d-0x%" PRIx64, entry->num, + entry->label.c_str(), entry->name.c_str(), entry->cpuset_ok, r0, srs_covert_cpuset(entry->cpuset), + r1, srs_covert_cpuset(entry->cpuset2)); if ((err = entry->start(entry->arg)) != srs_success) { entry->err = err; diff --git a/trunk/src/app/srs_app_threads.hpp b/trunk/src/app/srs_app_threads.hpp index 2e68aa497c..cd7d401b43 100644 --- a/trunk/src/app/srs_app_threads.hpp +++ b/trunk/src/app/srs_app_threads.hpp @@ -74,6 +74,11 @@ class impl__SrsThreadLocker } }; +#ifdef SRS_OSX + typedef uint64_t cpu_set_t; + #define CPU_ZERO(p) *p = 0 +#endif + // The information for a thread. class SrsThreadEntry { @@ -89,6 +94,10 @@ class SrsThreadEntry pthread_t trd; // The exit error of thread. srs_error_t err; + // @see https://man7.org/linux/man-pages/man3/pthread_setaffinity_np.3.html + cpu_set_t cpuset; // Config value. + cpu_set_t cpuset2; // Actual value. + bool cpuset_ok; public: SrsThreadEntry(); virtual ~SrsThreadEntry(); diff --git a/trunk/src/app/srs_app_utility.cpp b/trunk/src/app/srs_app_utility.cpp index d1f781d37f..1961ab54ad 100644 --- a/trunk/src/app/srs_app_utility.cpp +++ b/trunk/src/app/srs_app_utility.cpp @@ -877,9 +877,9 @@ static SrsSnmpUdpStat _srs_snmp_udp_stat; void srs_update_udp_snmp_statistic() { +#ifndef SRS_OSX SrsSnmpUdpStat& r = _srs_snmp_udp_stat; -#ifndef SRS_OSX FILE* f = fopen("/proc/net/snmp", "r"); if (f == NULL) { return;