Skip to content

Commit

Permalink
Threads-RECV: Drop received packet if exceed max queue size.
Browse files Browse the repository at this point in the history
1. Print the number of recv/srtp queue packets.
2. Drop packet if exceed max recv queue size.
  • Loading branch information
winlinvip committed Apr 26, 2021
1 parent 9e554ca commit 6fca411
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 2 deletions.
3 changes: 3 additions & 0 deletions trunk/conf/full.conf
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ threads {
# Whether enable the ASYNC RECV, recv udp packets in dedicate threads.
# Default: off
async_recv off;
# If exceed the max size of recv queue, drop the received packet.
# Default: 5000
max_recv_queue 5000;
# CPU set for affinity, for example:
# 0 means CPU0
# 0-3 means CPU0, CPU1, CPU2
Expand Down
17 changes: 17 additions & 0 deletions trunk/src/app/srs_app_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4202,6 +4202,23 @@ bool SrsConfig::get_threads_cpu_affinity(std::string label, int* start, int* end
return true;
}

int SrsConfig::get_threads_max_recv_queue()
{
static int DEFAULT = 5000;

SrsConfDirective* conf = root->get("threads");
if (!conf) {
return DEFAULT;
}

conf = conf->get("max_recv_queue");
if (!conf) {
return DEFAULT;
}

return ::atoi(conf->arg0().c_str());
}

vector<SrsConfDirective*> SrsConfig::get_stream_casters()
{
srs_assert(root);
Expand Down
1 change: 1 addition & 0 deletions trunk/src/app/srs_app_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ class SrsConfig
virtual bool get_threads_async_srtp();
virtual bool get_threads_async_recv();
virtual bool get_threads_cpu_affinity(std::string label, int* start, int* end);
virtual int get_threads_max_recv_queue();
// stream_caster section
public:
// Get all stream_caster in config file.
Expand Down
46 changes: 44 additions & 2 deletions trunk/src/app/srs_app_threads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ srs_error_t SrsThreadPool::initialize()
r1 = pthread_getaffinity_np(pthread_self(), sizeof(entry->cpuset2), &entry->cpuset2);
#endif

if ((err = _srs_async_recv->initialize()) != srs_success) {
return srs_error_wrap(err, "init async recv");
}

interval_ = _srs_config->get_threads_interval();
bool async_srtp = _srs_config->get_threads_async_srtp();
srs_trace("Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 "/%d-0x%" PRIx64,
Expand Down Expand Up @@ -254,6 +258,12 @@ srs_error_t SrsThreadPool::run()
static char buf[128];
string async_logs = _srs_async_log->description();

string queue_desc;
if (true) {
snprintf(buf, sizeof(buf), ", queue=%d,%d,%d", _srs_async_recv->size(), _srs_async_srtp->size(), _srs_async_srtp->cooked_size());
queue_desc = buf;
}

string sync_desc;
_srs_thread_sync_10us->update(); _srs_thread_sync_100us->update();
_srs_thread_sync_1000us->update(); _srs_thread_sync_plus->update();
Expand All @@ -262,8 +272,8 @@ srs_error_t SrsThreadPool::run()
sync_desc = buf;
}

srs_trace("Thread: %s cycle threads=%d%s%s", entry_->name.c_str(), (int)threads_.size(),
async_logs.c_str(), sync_desc.c_str());
srs_trace("Thread: %s cycle threads=%d%s%s%s", entry_->name.c_str(), (int)threads_.size(),
async_logs.c_str(), sync_desc.c_str(), queue_desc.c_str());
}

return err;
Expand Down Expand Up @@ -722,6 +732,15 @@ void SrsAsyncSRTPManager::add_packet(SrsAsyncSRTPPacket* pkt)
packets_->push_back(pkt);
}

int SrsAsyncSRTPManager::size()
{
return packets_->size();
}
int SrsAsyncSRTPManager::cooked_size()
{
return cooked_packets_->size();
}

srs_error_t SrsAsyncSRTPManager::start(void* arg)
{
SrsAsyncSRTPManager* srtp = (SrsAsyncSRTPManager*)arg;
Expand Down Expand Up @@ -807,6 +826,7 @@ SrsAsyncRecvManager::SrsAsyncRecvManager()
lock_ = new SrsThreadMutex();
packets_ = new SrsThreadQueue<SrsUdpMuxSocket>();
handler_ = NULL;
max_recv_queue_ = 0;
}

// TODO: FIXME: We should stop the thread first, then free the manager.
Expand All @@ -822,6 +842,16 @@ SrsAsyncRecvManager::~SrsAsyncRecvManager()
}
}

srs_error_t SrsAsyncRecvManager::initialize()
{
srs_error_t err = srs_success;

max_recv_queue_ = _srs_config->get_threads_max_recv_queue();
srs_trace("AsyncRecv: Set max_queue=%d", max_recv_queue_);

return err;
}

void SrsAsyncRecvManager::set_handler(ISrsUdpMuxHandler* v)
{
handler_ = v;
Expand All @@ -833,6 +863,11 @@ void SrsAsyncRecvManager::add_listener(SrsThreadUdpListener* listener)
listeners_.push_back(listener);
}

int SrsAsyncRecvManager::size()
{
return packets_->size();
}

srs_error_t SrsAsyncRecvManager::start(void* arg)
{
SrsAsyncRecvManager* recv = (SrsAsyncRecvManager*)arg;
Expand All @@ -859,6 +894,13 @@ srs_error_t SrsAsyncRecvManager::do_start()

// TODO: FIXME: Use st_recvfrom to recv if thread-safe ST is ok.
int nread = listener->skt_->raw_recvfrom();

// Drop packet if exceed max recv queue size.
if ((int)packets_->size() >= max_recv_queue_) {
continue;
}

// If got packet, copy to the queue.
if (nread > 0) {
got_packet = true;
packets_->push_back(listener->skt_->copy());
Expand Down
5 changes: 5 additions & 0 deletions trunk/src/app/srs_app_threads.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,8 @@ class SrsAsyncSRTPManager
void register_task(SrsAsyncSRTPTask* task);
void remove_task(SrsAsyncSRTPTask* task);
void add_packet(SrsAsyncSRTPPacket* pkt);
int size();
int cooked_size();
static srs_error_t start(void* arg);
private:
srs_error_t do_start();
Expand Down Expand Up @@ -364,6 +366,7 @@ class SrsAsyncRecvManager
private:
ISrsUdpMuxHandler* handler_;
private:
int max_recv_queue_;
SrsThreadQueue<SrsUdpMuxSocket>* packets_;
private:
std::vector<SrsThreadUdpListener*> listeners_;
Expand All @@ -372,8 +375,10 @@ class SrsAsyncRecvManager
SrsAsyncRecvManager();
virtual ~SrsAsyncRecvManager();
public:
srs_error_t initialize();
void set_handler(ISrsUdpMuxHandler* v);
void add_listener(SrsThreadUdpListener* listener);
int size();
static srs_error_t start(void* arg);
private:
srs_error_t do_start();
Expand Down

0 comments on commit 6fca411

Please sign in to comment.