Skip to content

Commit

Permalink
Threads: Support cpu affinity for threads.
Browse files Browse the repository at this point in the history
1. Config cpu_affinity in threads.
2. Default to not set the cpu affinity.
3. Support set by cpu range 0-63.
  • Loading branch information
winlinvip committed Apr 26, 2021
1 parent fa4c9f9 commit 9e554ca
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 6 deletions.
17 changes: 17 additions & 0 deletions trunk/conf/full.conf
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,23 @@ threads {
# Whether enable the ASYNC RECV, recv udp packets in dedicate threads.
# Default: off
async_recv off;
# CPU set for affinity, for example:
# 0 means CPU0
# 0-3 means CPU0, CPU1, CPU2
# 1-63 means all CPUs except CPU0
# Default: 0-63
cpu_affinity {
# For master thread manager.
master 0-63;
# For hybrid server or services.
hybrid 0-63;
# For log writing thread.
log 0-63;
# For SRTP encrypt/decrypt thread.
srtp 0-63;
# For UDP recv thread.
recv 0-63;
}
}

#############################################################################################
Expand Down
41 changes: 41 additions & 0 deletions trunk/src/app/srs_app_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4161,6 +4161,47 @@ bool SrsConfig::get_threads_async_recv()
return SRS_CONF_PERFER_FALSE(conf->arg0());
}

bool SrsConfig::get_threads_cpu_affinity(std::string label, int* start, int* end)
{
static int DEFAULT_START = 0;
static int DEFAULT_END = 63;

*start = DEFAULT_START;
*end = DEFAULT_END;

SrsConfDirective* conf = root->get("threads");
if (!conf) {
return false;
}

conf = conf->get("cpu_affinity");
if (!conf) {
return false;
}

conf = conf->get(label);
if (!conf) {
return false;
}

string v = conf->arg0();
size_t pos = v.find("-");
if (pos == string::npos) {
*start = *end = ::atoi(v.c_str());
return true;
}

string sv = v.substr(0, pos);
string ev = v.substr(pos + 1);
if (!sv.empty()) {
*start = ::atoi(sv.c_str());
}
if (!ev.empty()) {
*end = ::atoi(ev.c_str());
}
return true;
}

vector<SrsConfDirective*> SrsConfig::get_stream_casters()
{
srs_assert(root);
Expand Down
1 change: 1 addition & 0 deletions trunk/src/app/srs_app_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ class SrsConfig
virtual srs_utime_t get_threads_interval();
virtual bool get_threads_async_srtp();
virtual bool get_threads_async_recv();
virtual bool get_threads_cpu_affinity(std::string label, int* start, int* end);
// stream_caster section
public:
// Get all stream_caster in config file.
Expand Down
64 changes: 59 additions & 5 deletions trunk/src/app/srs_app_threads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,21 @@ SrsPps* _srs_thread_sync_100us = new SrsPps();
SrsPps* _srs_thread_sync_1000us = new SrsPps();
SrsPps* _srs_thread_sync_plus = new SrsPps();

uint64_t srs_covert_cpuset(cpu_set_t v)
{
#ifdef SRS_OSX
return v;
#else
uint64_t iv = 0;
for (int i = 0; i <= 63; i++) {
if (CPU_ISSET(i, &v)) {
iv |= uint64_t(1) << i;
}
}
return iv;
#endif
}

SrsThreadMutex::SrsThreadMutex()
{
// https://man7.org/linux/man-pages/man3/pthread_mutexattr_init.3.html
Expand Down Expand Up @@ -88,6 +103,11 @@ SrsThreadEntry::SrsThreadEntry()
num = 0;

err = srs_success;

// Set affinity mask to include CPUs 0 to 7
CPU_ZERO(&cpuset);
CPU_ZERO(&cpuset2);
cpuset_ok = false;
}

SrsThreadEntry::~SrsThreadEntry()
Expand Down Expand Up @@ -132,10 +152,29 @@ srs_error_t SrsThreadPool::initialize()
return srs_error_wrap(err, "initialize st failed");
}

SrsThreadEntry* entry = (SrsThreadEntry*)entry_;
#ifndef SRS_OSX
// Load CPU affinity from config.
int cpu_start = 0, cpu_end = 0;
entry->cpuset_ok = _srs_config->get_threads_cpu_affinity("master", &cpu_start, &cpu_end);
for (int i = cpu_start; entry->cpuset_ok && i <= cpu_end; i++) {
CPU_SET(i, &entry->cpuset);
}
#endif

int r0 = 0, r1 = 0;
#ifndef SRS_OSX
if (entry->cpuset_ok) {
r0 = pthread_setaffinity_np(pthread_self(), sizeof(entry->cpuset), &entry->cpuset);
}
r1 = pthread_getaffinity_np(pthread_self(), sizeof(entry->cpuset2), &entry->cpuset2);
#endif

interval_ = _srs_config->get_threads_interval();
bool async_srtp = _srs_config->get_threads_async_srtp();
srs_trace("Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d",
entry_->num, entry_->label.c_str(), entry_->name.c_str(), srsu2msi(interval_), async_srtp);
srs_trace("Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 "/%d-0x%" PRIx64,
entry->num, entry->label.c_str(), entry->name.c_str(), srsu2msi(interval_), async_srtp,
entry->cpuset_ok, r0, srs_covert_cpuset(entry->cpuset), r1, srs_covert_cpuset(entry->cpuset2));

return err;
}
Expand Down Expand Up @@ -167,6 +206,15 @@ srs_error_t SrsThreadPool::execute(string label, srs_error_t (*start)(void* arg)
snprintf(buf, sizeof(buf), "srs-%s-%d", entry->label.c_str(), entry->num);
entry->name = buf;

#ifndef SRS_OSX
// Load CPU affinity from config.
int cpu_start = 0, cpu_end = 0;
entry->cpuset_ok = _srs_config->get_threads_cpu_affinity(label, &cpu_start, &cpu_end);
for (int i = cpu_start; entry->cpuset_ok && i <= cpu_end; i++) {
CPU_SET(i, &entry->cpuset);
}
#endif

// https://man7.org/linux/man-pages/man3/pthread_create.3.html
pthread_t trd;
int r0 = pthread_create(&trd, NULL, SrsThreadPool::start, entry);
Expand Down Expand Up @@ -232,13 +280,19 @@ void* SrsThreadPool::start(void* arg)

SrsThreadEntry* entry = (SrsThreadEntry*)arg;

int r0 = 0, r1 = 0;
#ifndef SRS_OSX
// https://man7.org/linux/man-pages/man3/pthread_setname_np.3.html
pthread_setname_np(entry->trd, entry->name.c_str());
pthread_setname_np(pthread_self(), entry->name.c_str());
if (entry->cpuset_ok) {
r0 = pthread_setaffinity_np(pthread_self(), sizeof(entry->cpuset), &entry->cpuset);
}
r1 = pthread_getaffinity_np(pthread_self(), sizeof(entry->cpuset2), &entry->cpuset2);
#endif

srs_trace("Thread #%d: run with label=%s, name=%s", entry->num,
entry->label.c_str(), entry->name.c_str());
srs_trace("Thread #%d: run with label=%s, name=%s, cpuset=%d/%d-0x%" PRIx64 "/%d-0x%" PRIx64, entry->num,
entry->label.c_str(), entry->name.c_str(), entry->cpuset_ok, r0, srs_covert_cpuset(entry->cpuset),
r1, srs_covert_cpuset(entry->cpuset2));

if ((err = entry->start(entry->arg)) != srs_success) {
entry->err = err;
Expand Down
9 changes: 9 additions & 0 deletions trunk/src/app/srs_app_threads.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ class impl__SrsThreadLocker
}
};

#ifdef SRS_OSX
typedef uint64_t cpu_set_t;
#define CPU_ZERO(p) *p = 0
#endif

// The information for a thread.
class SrsThreadEntry
{
Expand All @@ -89,6 +94,10 @@ class SrsThreadEntry
pthread_t trd;
// The exit error of thread.
srs_error_t err;
// @see https://man7.org/linux/man-pages/man3/pthread_setaffinity_np.3.html
cpu_set_t cpuset; // Config value.
cpu_set_t cpuset2; // Actual value.
bool cpuset_ok;
public:
SrsThreadEntry();
virtual ~SrsThreadEntry();
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_utility.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -877,9 +877,9 @@ static SrsSnmpUdpStat _srs_snmp_udp_stat;

void srs_update_udp_snmp_statistic()
{
#ifndef SRS_OSX
SrsSnmpUdpStat& r = _srs_snmp_udp_stat;

#ifndef SRS_OSX
FILE* f = fopen("/proc/net/snmp", "r");
if (f == NULL) {
return;
Expand Down

0 comments on commit 9e554ca

Please sign in to comment.