Skip to content

Commit

Permalink
For #464, refine result of origin cluster api
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Feb 16, 2018
1 parent 92f2bcd commit 2f09ec4
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 25 deletions.
31 changes: 22 additions & 9 deletions trunk/src/app/srs_app_coworkers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ using namespace std;
#include <srs_app_config.hpp>
#include <srs_protocol_utility.hpp>
#include <srs_service_utility.hpp>
#include <srs_kernel_utility.hpp>

SrsCoWorkers* SrsCoWorkers::_instance = NULL;

Expand Down Expand Up @@ -64,20 +65,32 @@ SrsJsonAny* SrsCoWorkers::dumps(string vhost, string app, string stream)
return SrsJsonAny::null();
}

vector<string>& ips = srs_get_local_ips();
if (ips.empty()) {
vector<string> service_ports = _srs_config->get_listens();
if (service_ports.empty()) {
return SrsJsonAny::null();
}

SrsJsonArray* arr = SrsJsonAny::array();
for (int i = 0; i < (int)ips.size(); i++) {
arr->append(SrsJsonAny::object()
->set("ip", SrsJsonAny::str(ips.at(i).c_str()))
->set("vhost", SrsJsonAny::str(r->vhost.c_str()))
->set("self", SrsJsonAny::boolean(true)));
string service_ip = srs_get_public_internet_address();
string service_hostport = service_ports.at(0);

string service_host;
int service_port = SRS_CONSTS_RTMP_DEFAULT_PORT;
srs_parse_hostport(service_hostport, service_host, service_port);

string backend = _srs_config->get_http_api_listen();
if (backend.find(":") == string::npos) {
backend = service_ip + ":" + backend;
}

return arr;
// The routers to detect loop and identify path.
SrsJsonArray* routers = SrsJsonAny::array()->append(SrsJsonAny::str(backend.c_str()));

return SrsJsonAny::object()
->set("ip", SrsJsonAny::str(service_ip.c_str()))
->set("port", SrsJsonAny::integer(service_port))
->set("vhost", SrsJsonAny::str(r->vhost.c_str()))
->set("api", SrsJsonAny::str(backend.c_str()))
->set("routers", routers);
}

SrsRequest* SrsCoWorkers::find_stream_info(string vhost, string app, string stream)
Expand Down
34 changes: 18 additions & 16 deletions trunk/src/app/srs_app_rtmp_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -616,31 +616,40 @@ srs_error_t SrsRtmpConn::playing(SrsSource* source)
{
srs_error_t err = srs_success;

// create consumer of souce.
// Check page referer of player.
SrsRequest* req = info->req;
if (_srs_config->get_refer_enabled(req->vhost)) {
if ((err = refer->check(req->pageUrl, _srs_config->get_refer_play(req->vhost))) != srs_success) {
return srs_error_wrap(err, "rtmp: referer check");
}
}

// Set the socket options for transport.
set_sock_options();

// Create a consumer of source.
SrsConsumer* consumer = NULL;
if ((err = source->create_consumer(this, consumer)) != srs_success) {
return srs_error_wrap(err, "rtmp: create consumer");
}
SrsAutoFree(SrsConsumer, consumer);

// use isolate thread to recv,
// Use receiving thread to receive packets from peer.
// @see: https://github.com/ossrs/srs/issues/217
SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP);

// start isolate recv thread.
if ((err = trd.start()) != srs_success) {
return srs_error_wrap(err, "rtmp: start receive thread");
}

// delivery messages for clients playing stream.
// Deliver packets to peer.
wakable = consumer;
err = do_playing(source, consumer, &trd);
wakable = NULL;

// stop isolate recv thread
trd.stop();

// warn for the message is dropped.
// Drop all packets in receiving thread.
if (!trd.empty()) {
srs_warn("drop the received %d messages", trd.size());
}
Expand All @@ -652,14 +661,9 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr
{
srs_error_t err = srs_success;

srs_assert(consumer != NULL);

SrsRequest* req = info->req;
if (_srs_config->get_refer_enabled(req->vhost)) {
if ((err = refer->check(req->pageUrl, _srs_config->get_refer_play(req->vhost))) != srs_success) {
return srs_error_wrap(err, "rtmp: referer check");
}
}
srs_assert(req);
srs_assert(consumer);

// initialize other components
SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_play();
Expand All @@ -678,9 +682,6 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr
// initialize the send_min_interval
send_min_interval = _srs_config->get_send_min_interval(req->vhost);

// set the sock options.
set_sock_options();

srs_trace("start play smi=%.2f, mw_sleep=%d, mw_enabled=%d, realtime=%d, tcp_nodelay=%d",
send_min_interval, mw_sleep, mw_enabled, realtime, tcp_nodelay);

Expand Down Expand Up @@ -800,6 +801,7 @@ srs_error_t SrsRtmpConn::publishing(SrsSource* source)
return srs_error_wrap(err, "rtmp: callback on publish");
}

// TODO: FIXME: Should refine the state of publishing.
if ((err = acquire_publish(source)) == srs_success) {
// use isolate thread to recv,
// @see: https://github.com/ossrs/srs/issues/237
Expand Down
1 change: 1 addition & 0 deletions trunk/src/app/srs_app_rtmp_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class SrsRtmpConn : public virtual SrsConnection, public virtual ISrsReloadHandl
SrsBandwidth* bandwidth;
SrsSecurity* security;
// the wakable handler, maybe NULL.
// TODO: FIXME: Should refine the state for receiving thread.
ISrsWakable* wakable;
// elapse duration in ms
// for live play duration, for instance, rtmpdump to record.
Expand Down
6 changes: 6 additions & 0 deletions trunk/src/service/srs_service_utility.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,12 @@ string srs_get_public_internet_address()
return ip;
}

// Finally, use first whatever kind of address.
if (!ips.empty()) {
_public_internet_address = ips.at(0);
return _public_internet_address;
}

return "";
}

0 comments on commit 2f09ec4

Please sign in to comment.