Skip to content

Commit

Permalink
Threads-SEND/RECV: Bind handler to listener to support multiple ports.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Apr 26, 2021
1 parent 615da26 commit 1235b33
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 17 deletions.
10 changes: 6 additions & 4 deletions trunk/src/app/srs_app_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ SrsUdpMuxSocket::SrsUdpMuxSocket(srs_netfd_t fd)
nread = 0;

lfd = fd;
handler_ = NULL;

fromlen = 0;
peer_port = 0;
Expand Down Expand Up @@ -524,6 +525,8 @@ SrsUdpMuxSocket* SrsUdpMuxSocket::copy_sendonly()
sendonly->fast_id_ = fast_id_;
sendonly->address_changed_ = address_changed_;

sendonly->handler_ = handler_;

return sendonly;
}

Expand All @@ -547,6 +550,8 @@ SrsUdpMuxSocket* SrsUdpMuxSocket::copy()
cp->fast_id_ = fast_id_;
cp->address_changed_ = address_changed_;

cp->handler_ = handler_;

return cp;
}

Expand Down Expand Up @@ -659,13 +664,10 @@ srs_error_t SrsUdpMuxListener::cycle()

// Sleep infinite if use async_recv.
if (_srs_config->get_threads_async_recv()) {
SrsThreadUdpListener* listener = new SrsThreadUdpListener(lfd);

SrsThreadUdpListener* listener = new SrsThreadUdpListener(lfd, handler);
_srs_async_recv->add_listener(listener);
_srs_async_recv->set_handler(handler);

srs_usleep(SRS_UTIME_NO_TIMEOUT);

return trd->pull();
}

Expand Down
7 changes: 7 additions & 0 deletions trunk/src/app/srs_app_listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,13 @@ class SrsUdpMuxSocket
public:
SrsUdpMuxSocket(srs_netfd_t fd);
virtual ~SrsUdpMuxSocket();
private:
ISrsUdpMuxHandler* handler_;
public:
// SrsUdpMuxSocket::set_handler
void set_handler(ISrsUdpMuxHandler* h) { handler_ = h; }
// SrsUdpMuxSocket::handler
ISrsUdpMuxHandler* handler() { return handler_; }
public:
int recvfrom(srs_utime_t timeout);
int raw_recvfrom();
Expand Down
12 changes: 4 additions & 8 deletions trunk/src/app/srs_app_threads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1122,9 +1122,10 @@ srs_error_t SrsAsyncSRTPManager::consume(int* nn_consumed)

SrsAsyncSRTPManager* _srs_async_srtp = new SrsAsyncSRTPManager();

SrsThreadUdpListener::SrsThreadUdpListener(srs_netfd_t fd)
SrsThreadUdpListener::SrsThreadUdpListener(srs_netfd_t fd, ISrsUdpMuxHandler* handler)
{
skt_ = new SrsUdpMuxSocket(fd);
skt_->set_handler(handler);
}

SrsThreadUdpListener::~SrsThreadUdpListener()
Expand All @@ -1135,7 +1136,6 @@ SrsAsyncRecvManager::SrsAsyncRecvManager()
{
lock_ = new SrsThreadMutex();
received_packets_ = new SrsThreadQueue<SrsUdpMuxSocket>();
handler_ = NULL;
max_recv_queue_ = 0;
}

Expand All @@ -1152,11 +1152,6 @@ SrsAsyncRecvManager::~SrsAsyncRecvManager()
}
}

void SrsAsyncRecvManager::set_handler(ISrsUdpMuxHandler* v)
{
handler_ = v;
}

void SrsAsyncRecvManager::add_listener(SrsThreadUdpListener* listener)
{
SrsThreadLocker(lock_);
Expand Down Expand Up @@ -1246,7 +1241,8 @@ srs_error_t SrsAsyncRecvManager::consume(int* nn_consumed)
for (int i = 0; i < (int)flying_received_packets.size(); i++) {
SrsUdpMuxSocket* pkt = flying_received_packets.at(i);

if (handler_ && (err = handler_->on_udp_packet(pkt)) != srs_success) {
ISrsUdpMuxHandler* handler = pkt->handler();
if (handler && (err = handler->on_udp_packet(pkt)) != srs_success) {
srs_error_reset(err); // Ignore any error.
}

Expand Down
6 changes: 1 addition & 5 deletions trunk/src/app/srs_app_threads.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,15 +394,13 @@ class SrsThreadUdpListener
public:
SrsUdpMuxSocket* skt_;
public:
SrsThreadUdpListener(srs_netfd_t fd);
SrsThreadUdpListener(srs_netfd_t fd, ISrsUdpMuxHandler* handler);
virtual ~SrsThreadUdpListener();
};

// The async RECV manager, to recv UDP packets.
class SrsAsyncRecvManager
{
private:
ISrsUdpMuxHandler* handler_;
private:
// The received UDP packets.
SrsThreadQueue<SrsUdpMuxSocket>* received_packets_;
Expand All @@ -416,8 +414,6 @@ class SrsAsyncRecvManager
SrsAsyncRecvManager();
virtual ~SrsAsyncRecvManager();
public:
// Set the handler to process the received UDP packet.
void set_handler(ISrsUdpMuxHandler* v);
// Set the max queue size.
// SrsAsyncRecvManager::set_max_recv_queue()
void set_max_recv_queue(int v) { max_recv_queue_ =v; }
Expand Down

0 comments on commit 1235b33

Please sign in to comment.