From babe8e67876ac734e1020c3949fda778335033d1 Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 6 Apr 2021 08:22:48 +0800 Subject: [PATCH] Threads-Hybrid: Start multiple hybrid threads --- trunk/src/app/srs_app_hybrid.cpp | 4 +++- trunk/src/app/srs_app_hybrid.hpp | 8 +++++++- trunk/src/app/srs_app_rtc_dtls.cpp | 3 --- trunk/src/app/srs_app_server.cpp | 3 ++- trunk/src/app/srs_app_threads.cpp | 8 ++++++++ trunk/src/main/srs_main_server.cpp | 17 +++++++++++++---- 6 files changed, 33 insertions(+), 10 deletions(-) diff --git a/trunk/src/app/srs_app_hybrid.cpp b/trunk/src/app/srs_app_hybrid.cpp index 028d112179..d3e347a3ad 100644 --- a/trunk/src/app/srs_app_hybrid.cpp +++ b/trunk/src/app/srs_app_hybrid.cpp @@ -146,6 +146,8 @@ SrsHybridServer::SrsHybridServer() timer_ = NULL; clock_monitor_ = new SrsClockWallMonitor(); + + stream_index_ = -1; } SrsHybridServer::~SrsHybridServer() @@ -444,5 +446,5 @@ srs_error_t SrsHybridServer::on_thread_message(SrsThreadMessage* msg, SrsThreadP return err; } -SrsHybridServer* _srs_hybrid = new SrsHybridServer(); + __thread SrsHybridServer* _srs_hybrid = NULL; diff --git a/trunk/src/app/srs_app_hybrid.hpp b/trunk/src/app/srs_app_hybrid.hpp index bb396209af..63fa083e30 100644 --- a/trunk/src/app/srs_app_hybrid.hpp +++ b/trunk/src/app/srs_app_hybrid.hpp @@ -56,11 +56,17 @@ class SrsHybridServer : public ISrsFastTimer, public ISrsThreadResponder std::vector servers; SrsFastTimer* timer_; SrsClockWallMonitor* clock_monitor_; +private: + // The config index for hybrid/stream server. + int stream_index_; public: SrsHybridServer(); virtual ~SrsHybridServer(); public: virtual void register_server(ISrsHybridServer* svr); +public: + int stream_index() { return stream_index_; } // SrsHybridServer::stream_index() + void set_stream_index(int v) { stream_index_ = v; } // SrsHybridServer::set_stream_index() public: virtual srs_error_t initialize(); virtual srs_error_t run(); @@ -74,6 +80,6 @@ class SrsHybridServer : public ISrsFastTimer, public ISrsThreadResponder srs_error_t on_thread_message(SrsThreadMessage* msg, SrsThreadPipeChannel* channel); }; -extern SrsHybridServer* _srs_hybrid; +extern __thread SrsHybridServer* _srs_hybrid; #endif diff --git a/trunk/src/app/srs_app_rtc_dtls.cpp b/trunk/src/app/srs_app_rtc_dtls.cpp index e45698242b..74241647d0 100644 --- a/trunk/src/app/srs_app_rtc_dtls.cpp +++ b/trunk/src/app/srs_app_rtc_dtls.cpp @@ -246,9 +246,6 @@ srs_error_t SrsDtlsCertificate::initialize() // OPENSSL_init_ssl(); #endif - // Initialize SRTP first. - srs_assert(srtp_init() == 0); - // Whether use ECDSA certificate. ecdsa_mode = _srs_config->get_rtc_server_ecdsa(); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 22214f79dd..3f035b29ab 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -945,7 +945,8 @@ srs_error_t SrsServer::register_signal() srs_error_t SrsServer::ingest() { srs_error_t err = srs_success; - + + // TODO: FIXME: Should move from hybrid to api threads. if ((err = ingester->start()) != srs_success) { return srs_error_wrap(err, "ingest start"); } diff --git a/trunk/src/app/srs_app_threads.cpp b/trunk/src/app/srs_app_threads.cpp index 4bdff1305e..24a02feb9e 100644 --- a/trunk/src/app/srs_app_threads.cpp +++ b/trunk/src/app/srs_app_threads.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include @@ -505,6 +506,9 @@ srs_error_t SrsThreadPool::setup() return srs_error_wrap(err, "init st"); } + // Create the hybrid RTMP/HTTP/RTC server. + _srs_hybrid = new SrsHybridServer(); + return err; } @@ -512,6 +516,10 @@ srs_error_t SrsThreadPool::initialize() { srs_error_t err = srs_success; + // Initialize global shared SRTP once. + srs_assert(srtp_init() == 0); + + // Initialize the master primordial thread. SrsThreadEntry* entry = (SrsThreadEntry*)entry_; #ifndef SRS_OSX // Load CPU affinity from config. diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp index 7b27144a80..b6b4fbdd12 100644 --- a/trunk/src/main/srs_main_server.cpp +++ b/trunk/src/main/srs_main_server.cpp @@ -530,12 +530,17 @@ srs_error_t run_in_thread_pool() return srs_error_wrap(err, "start api server thread"); } - // Start the hybrid service worker thread, for RTMP and RTC server, etc. - if ((err = _srs_thread_pool->execute("hybrid", run_hybrid_server, NULL)) != srs_success) { - return srs_error_wrap(err, "start hybrid server thread"); + // Start a number of hybrid service threads. + int hybrids = _srs_config->get_threads_hybrids(); + for (int stream_index = 0; stream_index < hybrids; stream_index++) { + // TODO: FIXME: Change the thread name for debugging? + // Start the hybrid service worker thread, for RTMP and RTC server, etc. + if ((err = _srs_thread_pool->execute("hybrid", run_hybrid_server, (void*)(uint64_t)stream_index)) != srs_success) { + return srs_error_wrap(err, "start hybrid server %d thread", stream_index); + } } - srs_trace("Pool: Start threads srtp=%d, recv=%d, send=%d", srtps, recvs, sends); + srs_trace("Pool: Start threads hybrids=%d, srtp=%d, recv=%d, send=%d", hybrids, srtps, recvs, sends); return _srs_thread_pool->run(); } @@ -545,6 +550,10 @@ srs_error_t run_hybrid_server(void* arg) { srs_error_t err = srs_success; + // The config index for hybrid/stream server. + int stream_index = (int)(uint64_t)arg; + _srs_hybrid->set_stream_index(stream_index); + // Create servers and register them. _srs_hybrid->register_server(new SrsServerAdapter());