Skip to content

Commit

Permalink
Threads-Hybrid: Start multiple hybrid threads
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Apr 26, 2021
1 parent 3367956 commit babe8e6
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 10 deletions.
4 changes: 3 additions & 1 deletion trunk/src/app/srs_app_hybrid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ SrsHybridServer::SrsHybridServer()
timer_ = NULL;

clock_monitor_ = new SrsClockWallMonitor();

stream_index_ = -1;
}

SrsHybridServer::~SrsHybridServer()
Expand Down Expand Up @@ -444,5 +446,5 @@ srs_error_t SrsHybridServer::on_thread_message(SrsThreadMessage* msg, SrsThreadP
return err;
}

SrsHybridServer* _srs_hybrid = new SrsHybridServer();
__thread SrsHybridServer* _srs_hybrid = NULL;

8 changes: 7 additions & 1 deletion trunk/src/app/srs_app_hybrid.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,17 @@ class SrsHybridServer : public ISrsFastTimer, public ISrsThreadResponder
std::vector<ISrsHybridServer*> servers;
SrsFastTimer* timer_;
SrsClockWallMonitor* clock_monitor_;
private:
// The config index for hybrid/stream server.
int stream_index_;
public:
SrsHybridServer();
virtual ~SrsHybridServer();
public:
virtual void register_server(ISrsHybridServer* svr);
public:
int stream_index() { return stream_index_; } // SrsHybridServer::stream_index()
void set_stream_index(int v) { stream_index_ = v; } // SrsHybridServer::set_stream_index()
public:
virtual srs_error_t initialize();
virtual srs_error_t run();
Expand All @@ -74,6 +80,6 @@ class SrsHybridServer : public ISrsFastTimer, public ISrsThreadResponder
srs_error_t on_thread_message(SrsThreadMessage* msg, SrsThreadPipeChannel* channel);
};

extern SrsHybridServer* _srs_hybrid;
extern __thread SrsHybridServer* _srs_hybrid;

#endif
3 changes: 0 additions & 3 deletions trunk/src/app/srs_app_rtc_dtls.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,9 +246,6 @@ srs_error_t SrsDtlsCertificate::initialize()
// OPENSSL_init_ssl();
#endif

// Initialize SRTP first.
srs_assert(srtp_init() == 0);

// Whether use ECDSA certificate.
ecdsa_mode = _srs_config->get_rtc_server_ecdsa();

Expand Down
3 changes: 2 additions & 1 deletion trunk/src/app/srs_app_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,8 @@ srs_error_t SrsServer::register_signal()
srs_error_t SrsServer::ingest()
{
srs_error_t err = srs_success;


// TODO: FIXME: Should move from hybrid to api threads.
if ((err = ingester->start()) != srs_success) {
return srs_error_wrap(err, "ingest start");
}
Expand Down
8 changes: 8 additions & 0 deletions trunk/src/app/srs_app_threads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <srs_core_autofree.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_app_utility.hpp>
#include <srs_app_hybrid.hpp>

#include <unistd.h>

Expand Down Expand Up @@ -505,13 +506,20 @@ srs_error_t SrsThreadPool::setup()
return srs_error_wrap(err, "init st");
}

// Create the hybrid RTMP/HTTP/RTC server.
_srs_hybrid = new SrsHybridServer();

return err;
}

srs_error_t SrsThreadPool::initialize()
{
srs_error_t err = srs_success;

// Initialize global shared SRTP once.
srs_assert(srtp_init() == 0);

// Initialize the master primordial thread.
SrsThreadEntry* entry = (SrsThreadEntry*)entry_;
#ifndef SRS_OSX
// Load CPU affinity from config.
Expand Down
17 changes: 13 additions & 4 deletions trunk/src/main/srs_main_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -530,12 +530,17 @@ srs_error_t run_in_thread_pool()
return srs_error_wrap(err, "start api server thread");
}

// Start the hybrid service worker thread, for RTMP and RTC server, etc.
if ((err = _srs_thread_pool->execute("hybrid", run_hybrid_server, NULL)) != srs_success) {
return srs_error_wrap(err, "start hybrid server thread");
// Start a number of hybrid service threads.
int hybrids = _srs_config->get_threads_hybrids();
for (int stream_index = 0; stream_index < hybrids; stream_index++) {
// TODO: FIXME: Change the thread name for debugging?
// Start the hybrid service worker thread, for RTMP and RTC server, etc.
if ((err = _srs_thread_pool->execute("hybrid", run_hybrid_server, (void*)(uint64_t)stream_index)) != srs_success) {
return srs_error_wrap(err, "start hybrid server %d thread", stream_index);
}
}

srs_trace("Pool: Start threads srtp=%d, recv=%d, send=%d", srtps, recvs, sends);
srs_trace("Pool: Start threads hybrids=%d, srtp=%d, recv=%d, send=%d", hybrids, srtps, recvs, sends);

return _srs_thread_pool->run();
}
Expand All @@ -545,6 +550,10 @@ srs_error_t run_hybrid_server(void* arg)
{
srs_error_t err = srs_success;

// The config index for hybrid/stream server.
int stream_index = (int)(uint64_t)arg;
_srs_hybrid->set_stream_index(stream_index);

// Create servers and register them.
_srs_hybrid->register_server(new SrsServerAdapter());

Expand Down

0 comments on commit babe8e6

Please sign in to comment.