From 6e2de90d54095827014b75f15c2b18a2d2cb04bb Mon Sep 17 00:00:00 2001 From: winlin Date: Mon, 5 Apr 2021 21:48:25 +0800 Subject: [PATCH] Threads-Hybrid: Support communicate between threads by chan and slot 1. Hybrid thread is responder, API thread is initiator. 2. Responder read message from initiator-slot, write message to responder-slot. 3. Initiator write message to initiator-slot, read message from responder-slot. 4. Responder start a coroutine to consume requests and response it. --- trunk/src/app/srs_app_hybrid.cpp | 57 +++++++++++ trunk/src/app/srs_app_hybrid.hpp | 6 +- trunk/src/app/srs_app_rtc_conn.hpp | 1 - trunk/src/app/srs_app_rtc_server.hpp | 2 +- trunk/src/app/srs_app_server.cpp | 58 +++++++++-- trunk/src/app/srs_app_server.hpp | 18 ++++ trunk/src/app/srs_app_threads.cpp | 140 ++++++++++++++++++++++++++ trunk/src/app/srs_app_threads.hpp | 85 ++++++++++++++++ trunk/src/protocol/srs_service_st.hpp | 1 + 9 files changed, 356 insertions(+), 12 deletions(-) diff --git a/trunk/src/app/srs_app_hybrid.cpp b/trunk/src/app/srs_app_hybrid.cpp index 659cab107c..028d112179 100644 --- a/trunk/src/app/srs_app_hybrid.cpp +++ b/trunk/src/app/srs_app_hybrid.cpp @@ -29,6 +29,7 @@ #include #include #include +#include using namespace std; @@ -192,6 +193,19 @@ srs_error_t SrsHybridServer::initialize() } } + // Create slots for other threads to communicate with us. + SrsThreadEntry* self = _srs_thread_pool->self(); + + self->slot_ = new SrsThreadPipeSlot(1); + + if ((err = self->slot_->initialize()) != srs_success) { + return srs_error_wrap(err, "init slot"); + } + + if ((err = self->slot_->open_responder(this)) != srs_success) { + return srs_error_wrap(err, "init slot"); + } + return err; } @@ -387,5 +401,48 @@ srs_error_t SrsHybridServer::on_timer(srs_utime_t interval, srs_utime_t tick) return err; } +srs_error_t SrsHybridServer::on_thread_message(SrsThreadMessage* msg, SrsThreadPipeChannel* channel) +{ + srs_error_t err = srs_success; + + RtcServerAdapter* adapter = NULL; + if (true) { + vector servers = _srs_hybrid->servers; + for (vector::iterator it = servers.begin(); it != servers.end(); ++it) { + RtcServerAdapter* server = dynamic_cast(*it); + if (server) { + adapter = server; + break; + } + } + } + + // TODO: FIXME: Response error? + if (!adapter) { + return err; + } + + if (msg->id == (uint64_t)SrsThreadMessageIDRtcCreateSession) { + SrsThreadMessageRtcCreateSession* s = (SrsThreadMessageRtcCreateSession*)msg->ptr; + err = adapter->rtc->create_session(s->req, s->remote_sdp, s->local_sdp, s->mock_eip, + s->publish, s->dtls, s->srtp, &s->session); + + // TODO: FIXME: Response error? + if (err != srs_success) { + return srs_error_wrap(err, "create session"); + } + + // TODO: FIXME: Response timeout if error? + // TODO: FIXME: Response a different message? With trace ID? + // We're responder, write response to responder. + srs_error_t r0 = channel->responder()->write(msg, sizeof(SrsThreadMessage), NULL); + if (r0 != srs_success) { + srs_freep(r0); // Ignore any error. + } + } + + return err; +} + SrsHybridServer* _srs_hybrid = new SrsHybridServer(); diff --git a/trunk/src/app/srs_app_hybrid.hpp b/trunk/src/app/srs_app_hybrid.hpp index 292284d3d7..bb396209af 100644 --- a/trunk/src/app/srs_app_hybrid.hpp +++ b/trunk/src/app/srs_app_hybrid.hpp @@ -29,6 +29,7 @@ #include #include +#include class SrsServer; class SrsServerAdapter; @@ -49,9 +50,8 @@ class ISrsHybridServer }; // The hybrid server manager. -class SrsHybridServer : public ISrsFastTimer +class SrsHybridServer : public ISrsFastTimer, public ISrsThreadResponder { - friend class SrsApiServer; private: std::vector servers; SrsFastTimer* timer_; @@ -70,6 +70,8 @@ class SrsHybridServer : public ISrsFastTimer // interface ISrsFastTimer private: srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick); +private: + srs_error_t on_thread_message(SrsThreadMessage* msg, SrsThreadPipeChannel* channel); }; extern SrsHybridServer* _srs_hybrid; diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 87373fe858..c9381f7b89 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -29,7 +29,6 @@ #include #include #include -#include #include #include #include diff --git a/trunk/src/app/srs_app_rtc_server.hpp b/trunk/src/app/srs_app_rtc_server.hpp index 16ea2bd890..59b53120d1 100644 --- a/trunk/src/app/srs_app_rtc_server.hpp +++ b/trunk/src/app/srs_app_rtc_server.hpp @@ -132,7 +132,7 @@ class SrsRtcServer : public ISrsUdpMuxHandler, public ISrsFastTimer, public ISrs // The RTC server adapter. class RtcServerAdapter : public ISrsHybridServer { - friend class SrsApiServer; + friend class SrsHybridServer; private: SrsRtcServer* rtc; public: diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 0e85c0024f..22214f79dd 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -1993,18 +1993,60 @@ srs_error_t SrsApiServer::create_session( ) { srs_error_t err = srs_success; - vector servers = _srs_hybrid->servers; - for (vector::iterator it = servers.begin(); it != servers.end(); ++it) { - RtcServerAdapter* adapter = dynamic_cast(*it); - if (!adapter) { - continue; + // Allocate slot to communicate with hybrid thread. + SrsThreadEntry* self = _srs_thread_pool->self(); + SrsThreadEntry* hybrid = _srs_thread_pool->hybrid(); + srs_assert(self && hybrid); + + SrsThreadPipeChannel* channel = NULL; + if (true) { + map::iterator it = self->channels_.find(hybrid->trd); + if (it == self->channels_.end()) { + self->channels_[hybrid->trd] = channel = hybrid->slot_->allocate(); + } else { + channel = it->second; } + } + srs_assert(channel); - // TODO: FIXME: Should notify thread by thread-slot. - return adapter->rtc->create_session(req, remote_sdp, local_sdp, mock_eip, - publish, dtls, srtp, psession); + // We're initiator, write to initiator, read from responder. + if ((err = channel->initiator()->open_write()) != srs_success) { + return srs_error_wrap(err, "open write"); + } + if ((err = channel->responder()->open_read()) != srs_success) { + return srs_error_wrap(err, "open read"); } + SrsThreadMessageRtcCreateSession s; + s.req = req; + s.remote_sdp = remote_sdp; + s.local_sdp = local_sdp; + s.mock_eip = mock_eip; + s.publish = publish; + s.dtls = dtls; + s.srtp = srtp; + s.session = NULL; + + SrsThreadMessage m; + m.id = (uint64_t)SrsThreadMessageIDRtcCreateSession; + m.ptr = (uint64_t)&s; + + // We're initiator, write to initiator, read from responder. + // TODO: FIXME: Write important logs, and error response, and timeout? + if ((err = channel->initiator()->write(&m, sizeof(m), NULL)) != srs_success) { + return srs_error_wrap(err, "write"); + } + + // TODO: FIXME: Write important logs, and error response, and timeout? + if ((err = channel->responder()->read(&m, sizeof(m), NULL)) != srs_success) { + return srs_error_wrap(err, "read"); + } + + // Covert to output params. + local_sdp = s.local_sdp; + // TODO: FIMXE: Should never return it, for it's not thread-safe. + *psession = s.session; + return err; } diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index caf7a0500e..cc24454976 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -41,6 +41,7 @@ #include #include #include +#include class SrsServer; class SrsHttpServeMux; @@ -449,5 +450,22 @@ class SrsApiServer : public ISrsTcpMuxHandler, public ISrsResourceManager, publi srs_error_t do_start(); }; +// The RTC create session information. +struct SrsThreadMessageRtcCreateSession +{ + // Input. + SrsRequest* req; + SrsSdp remote_sdp; + std::string mock_eip; + bool publish; + bool dtls; + bool srtp; + + // Output. + SrsSdp local_sdp; + // TODO: FIXME: It's not thread-safe. + SrsRtcConnection* session; +}; + #endif diff --git a/trunk/src/app/srs_app_threads.cpp b/trunk/src/app/srs_app_threads.cpp index 3c8c299f9f..4bdff1305e 100644 --- a/trunk/src/app/srs_app_threads.cpp +++ b/trunk/src/app/srs_app_threads.cpp @@ -103,6 +103,10 @@ srs_error_t SrsPipe::initialize() { srs_error_t err = srs_success; + if (pipes_[0] > 0) { + return err; + } + if (pipe(pipes_) < 0) { return srs_error_new(ERROR_SYSTEM_CREATE_PIPE, "create pipe"); } @@ -134,6 +138,10 @@ srs_error_t SrsThreadPipe::initialize(int fd) { srs_error_t err = srs_success; + if (stfd_) { + return err; + } + if ((stfd_ = srs_netfd_open(fd)) == NULL) { return srs_error_new(ERROR_PIPE_OPEN, "open pipe"); } @@ -220,6 +228,134 @@ srs_error_t SrsThreadPipePair::write(void* buf, size_t size, ssize_t* nwrite) return wpipe_->write(buf, size, nwrite); } +SrsThreadPipeChannel::SrsThreadPipeChannel() +{ + initiator_ = new SrsThreadPipePair(); + responder_ = new SrsThreadPipePair(); + + trd_ = new SrsFastCoroutine("chan", this); + handler_ = NULL; +} + +SrsThreadPipeChannel::~SrsThreadPipeChannel() +{ + srs_freep(trd_); + srs_freep(initiator_); + srs_freep(responder_); +} + +SrsThreadPipePair* SrsThreadPipeChannel::initiator() +{ + return initiator_; +} + +SrsThreadPipePair* SrsThreadPipeChannel::responder() +{ + return responder_; +} + +srs_error_t SrsThreadPipeChannel::start(ISrsThreadResponder* h) +{ + handler_ = h; + return trd_->start(); +} + +srs_error_t SrsThreadPipeChannel::cycle() +{ + srs_error_t err = srs_success; + + while (true) { + if ((err = trd_->pull()) != srs_success) { + return srs_error_wrap(err, "pull"); + } + + // Here we're responder, read from initiator. + SrsThreadMessage m; + if ((err = initiator_->read(&m, sizeof(m), NULL)) != srs_success) { + return srs_error_wrap(err, "read"); + } + + // Consume the message, the responder can write response to responder. + if (handler_ && (err = handler_->on_thread_message(&m, this)) != srs_success) { + return srs_error_wrap(err, "consume"); + } + } + + return err; +} + +SrsThreadPipeSlot::SrsThreadPipeSlot(int slots) +{ + nn_channels_ = slots; + channels_ = new SrsThreadPipeChannel[slots]; + + index_ = 0; + lock_ = new SrsThreadMutex(); +} + +SrsThreadPipeSlot::~SrsThreadPipeSlot() +{ + srs_freepa(channels_); + srs_freep(lock_); +} + +srs_error_t SrsThreadPipeSlot::initialize() +{ + srs_error_t err = srs_success; + + for (int i = 0; i < nn_channels_; i++) { + SrsThreadPipeChannel* channel = &channels_[i]; + + // Here we're responder, but it's ok to initialize the initiator. + if ((err = channel->initiator()->initialize()) != srs_success) { + return srs_error_wrap(err, "init %d initiator", i); + } + if ((err = channel->responder()->initialize()) != srs_success) { + return srs_error_wrap(err, "init %d responder", i); + } + } + + return err; +} + +srs_error_t SrsThreadPipeSlot::open_responder(ISrsThreadResponder* h) +{ + srs_error_t err = srs_success; + + for (int i = 0; i < nn_channels_; i++) { + SrsThreadPipeChannel* channel = &channels_[i]; + + // We're responder, read from initiator, write to responder. + if ((err = channel->initiator()->open_read()) != srs_success) { + return srs_error_wrap(err, "open read"); + } + if ((err = channel->responder()->open_write()) != srs_success) { + return srs_error_wrap(err, "open write"); + } + + // OK, we start the cycle coroutine for responder. + if ((err = channel->start(h)) != srs_success) { + return srs_error_wrap(err, "start %d consume coroutine", i); + } + } + + return err; +} + +SrsThreadPipeChannel* SrsThreadPipeSlot::allocate() +{ + SrsThreadLocker(lock_); + return index_ < nn_channels_? &channels_[index_++] : NULL; +} + +ISrsThreadResponder::ISrsThreadResponder() +{ +} + +ISrsThreadResponder::~ISrsThreadResponder() +{ +} + SrsThreadMutex::SrsThreadMutex() { // https://man7.org/linux/man-pages/man3/pthread_mutexattr_init.3.html @@ -276,6 +412,7 @@ SrsThreadEntry::SrsThreadEntry() cpuset_ok = false; stat = new SrsProcSelfStat(); + slot_ = NULL; received_packets_ = new SrsThreadQueue(); cooked_packets_ = new SrsThreadQueue(); @@ -286,6 +423,9 @@ SrsThreadEntry::~SrsThreadEntry() srs_freep(stat); srs_freep(err); + // TODO: FIXME: Before free slot, we MUST close pipes in threads that open them. + srs_freep(slot_); + srs_freep(received_packets_); srs_freep(cooked_packets_); diff --git a/trunk/src/app/srs_app_threads.hpp b/trunk/src/app/srs_app_threads.hpp index 8ac0243217..8478873e0c 100644 --- a/trunk/src/app/srs_app_threads.hpp +++ b/trunk/src/app/srs_app_threads.hpp @@ -36,6 +36,7 @@ #include #include +#include #include class SrsThreadPool; @@ -43,6 +44,9 @@ class SrsAsyncSRTPTask; class SrsAsyncSRTPPacket; class SrsSecurityTransport; class SrsProcSelfStat; +class SrsThreadMutex; +class ISrsThreadResponder; +struct SrsThreadMessage; // The pipe wraps the os pipes(fds). class SrsPipe @@ -112,6 +116,82 @@ class SrsThreadPipePair srs_error_t write(void* buf, size_t size, ssize_t* nwrite); }; +// A thread pipe channel, bidirectional data channel, between two threads. +class SrsThreadPipeChannel : public ISrsCoroutineHandler +{ +private: + // ThreadA write initiator, read by ThreadB. + SrsThreadPipePair* initiator_; + // ThreadB write responder, read by ThreadA. + SrsThreadPipePair* responder_; +private: + // Coroutine for responder. + SrsFastCoroutine* trd_; + // The callback handler of responder. + ISrsThreadResponder* handler_; +public: + SrsThreadPipeChannel(); + virtual ~SrsThreadPipeChannel(); +public: + SrsThreadPipePair* initiator(); + SrsThreadPipePair* responder(); +public: + // For responder, start a coroutine to read messages from initiator. + srs_error_t start(ISrsThreadResponder* h); +private: + srs_error_t cycle(); +}; + +// A slot contains a fixed number of channels to communicate with threads. +class SrsThreadPipeSlot +{ +private: + SrsThreadPipeChannel* channels_; + int nn_channels_; +private: + // Current allocated index of slot for channels. + int index_; + SrsThreadMutex* lock_; +public: + SrsThreadPipeSlot(int slots); + virtual ~SrsThreadPipeSlot(); +public: + srs_error_t initialize(); + // Should only call by responder. + srs_error_t open_responder(ISrsThreadResponder* h); +public: + // Allocate channel for initiator. + SrsThreadPipeChannel* allocate(); +}; + +// The handler for responder, which got message from initiator. +class ISrsThreadResponder +{ +public: + ISrsThreadResponder(); + virtual ~ISrsThreadResponder(); +public: + // Got a thread message msg from channel. + virtual srs_error_t on_thread_message(SrsThreadMessage* msg, SrsThreadPipeChannel* channel) = 0; +}; + +// The ID for messages between threads. +enum SrsThreadMessageID +{ + // For SrsThreadMessageRtcCreateSession + SrsThreadMessageIDRtcCreateSession = 0x00000001, +}; + +// The message to marshal/unmarshal between threads. +struct SrsThreadMessage +{ + // Convert with SrsThreadMessageID. + uint64_t id; + // Convert with struct pointers. + uint64_t ptr; + // TODO: FIXME: Add a trace ID? +}; + // The thread mutex wrapper, without error. class SrsThreadMutex { @@ -127,6 +207,7 @@ class SrsThreadMutex }; // The thread mutex locker. +// TODO: FIXME: Rename _SRS to _srs #define SrsThreadLocker(instance) \ impl__SrsThreadLocker _SRS_free_##instance(instance) @@ -216,6 +297,10 @@ class SrsThreadEntry bool cpuset_ok; public: SrsProcSelfStat* stat; + // The slot for other threads to communicate with this thread. + SrsThreadPipeSlot* slot_; + // The channels to communicate with other threads. + std::map channels_; public: // The received UDP packets. SrsThreadQueue* received_packets_; diff --git a/trunk/src/protocol/srs_service_st.hpp b/trunk/src/protocol/srs_service_st.hpp index ffeb5c32fd..f5de3cd0c7 100644 --- a/trunk/src/protocol/srs_service_st.hpp +++ b/trunk/src/protocol/srs_service_st.hpp @@ -107,6 +107,7 @@ extern ssize_t srs_write(srs_netfd_t stfd, const void *buf, size_t nbyte, srs_ut extern bool srs_is_never_timeout(srs_utime_t tm); // The mutex locker. +// TODO: FIXME: Rename _SRS to _srs #define SrsLocker(instance) \ impl__SrsLocker _SRS_free_##instance(&instance)