Skip to content

Commit

Permalink
forward api: support rtmp forward to multi other platform
Browse files Browse the repository at this point in the history
  • Loading branch information
chundonglinlin committed Dec 7, 2021
1 parent 7c353b5 commit c4ba70a
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 1 deletion.
75 changes: 75 additions & 0 deletions trunk/src/app/srs_app_http_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -963,6 +963,81 @@ srs_error_t SrsGoApiClusters::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
return srs_api_response(w, r, obj->dumps());
}

SrsGoApiForwarder::SrsGoApiForwarder(SrsServer* svr)
{
server_ = svr;
}

SrsGoApiForwarder::~SrsGoApiForwarder()
{
}

srs_error_t SrsGoApiForwarder::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{
srs_error_t err = srs_success;

// parse param for req
// pattern=/api/v1/forward/[app]/[stream]?[vhost=]&remote=rtmp%3A%2F%2Fdomain%3Aport%2Fapp%2Fstream%3Fvhost%3Dtest.com%26params%3Dauth
// For forward stream to other server, the template must have the same schema with upath.
// The template is defined in config, the mout of http stream. The upath is specified by http request path.
// If template is "/[app]/[stream]?[vhost=]&remote=", the upath should be:
// matched for "/live/livestream?remote="
// matched for "/live/livestream?vhost=&remote="

std::string remote_url = srs_string_url_decode(r->query_get("remote"));
if(remote_url.empty()) {
return srs_api_response_code(w, r, ERROR_RTMP_FORWARD_PARAM_ERROR);
}

// check remote url valid and reconstruct
// 1. pattern=domain
// 2. pattern=domain:port
// 3. pattern=rtmp://domain:port/app/stream?vhost=test.com&params=auth

std::string vhost = r->query_get("vhost");
if (vhost.empty()) {
vhost = SRS_CONSTS_RTMP_DEFAULT_VHOST;
}

// pattern=/api/v1/forward/[app]/[stream]
std::string upath = r->path();
if (srs_string_count(upath, "/") != 5) {
return srs_api_response_code(w, r, ERROR_RTMP_FORWARD_PARAM_ERROR);
}

std::string app, stream;
size_t pos = string::npos;
if ((pos = upath.rfind("/")) != string::npos) {
stream = upath.substr(pos + 1);
upath = upath.substr(0, pos);
if ((pos = upath.rfind("/")) != string::npos) {
app = upath.substr(pos + 1);
} else {
return srs_api_response_code(w, r, ERROR_RTMP_FORWARD_PARAM_ERROR);
}
} else {
return srs_api_response_code(w, r, ERROR_RTMP_FORWARD_PARAM_ERROR);
}

std::string stream_url = srs_generate_stream_url(vhost, app, stream);

// find a source to serve.
SrsLiveSource* source = NULL;
if ((err = _srs_sources->fetch_source(stream_url, &source)) != srs_success) {
return srs_api_response_code(w, r, ERROR_RTMP_FORWARD_NOT_FOUND);
}
srs_assert(source != NULL);

// create forwarder to source
if ((err = source->create_forwarder(remote_url)) != srs_success) {
int code = srs_error_code(err);
srs_error_reset(err);
return srs_api_response_code(w, r, code);
}

return srs_api_response_code(w, r, ERROR_SUCCESS);
}

SrsGoApiError::SrsGoApiError()
{
}
Expand Down
11 changes: 11 additions & 0 deletions trunk/src/app/srs_app_http_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,17 @@ class SrsGoApiClusters : public ISrsHttpHandler
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
};

class SrsGoApiForwarder : public ISrsHttpHandler
{
private:
SrsServer* server_;
public:
SrsGoApiForwarder(SrsServer* svr);
virtual ~SrsGoApiForwarder();
public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
};

class SrsGoApiError : public ISrsHttpHandler
{
public:
Expand Down
6 changes: 5 additions & 1 deletion trunk/src/app/srs_app_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,11 @@ srs_error_t SrsServer::http_handle()
if ((err = http_api_mux->handle("/api/v1/clusters", new SrsGoApiClusters())) != srs_success) {
return srs_error_wrap(err, "handle clusters");
}


if ((err = http_api_mux->handle("/api/v1/forward/", new SrsGoApiForwarder(this))) != srs_success) {
return srs_error_wrap(err, "handle clusters");
}

// test the request info.
if ((err = http_api_mux->handle("/api/v1/tests/requests", new SrsGoApiRequests())) != srs_success) {
return srs_error_wrap(err, "handle tests requests");
Expand Down
55 changes: 55 additions & 0 deletions trunk/src/app/srs_app_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1508,6 +1508,38 @@ void SrsOriginHub::destroy_forwarders()
forwarders.clear();
}

srs_error_t SrsOriginHub::create_forwarder(std::string url)
{
srs_error_t err = srs_success;

// maybe ignore delete
SrsRequest* freq = new SrsRequest();
srs_parse_rtmp_url(url, freq->tcUrl, freq->stream);
srs_discovery_tc_url(freq->tcUrl, freq->schema, freq->host, freq->vhost, freq->app, freq->stream, freq->port, freq->param);

SrsForwarder* forwarder = new SrsForwarder(this);
forwarders.push_back(forwarder);

std::stringstream ss;
ss << freq->host << ":" << freq->port;
std::string forward_server = ss.str();

// initialize the forwarder with request.
if ((err = forwarder->initialize(freq, forward_server)) != srs_success) {
return srs_error_wrap(err, "init forwarder");
}

srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost);
forwarder->set_queue_size(queue_size);

if ((err = forwarder->on_publish()) != srs_success) {
return srs_error_wrap(err, "start forwarder failed, vhost=%s, app=%s, stream=%s, forward-to=%s",
req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), url.c_str());
}

return err;
}

SrsMetaCache::SrsMetaCache()
{
meta = video = audio = NULL;
Expand Down Expand Up @@ -1740,6 +1772,25 @@ srs_error_t SrsLiveSourceManager::fetch_or_create(SrsRequest* r, ISrsLiveSourceH
return err;
}

srs_error_t SrsLiveSourceManager::fetch_source(string stream_url, SrsLiveSource** pps)
{
srs_error_t err = srs_success;

// Use lock to protect coroutine switch.
// @bug https://github.com/ossrs/srs/issues/1230
// TODO: FIXME: Use smaller lock.
SrsLocker(lock);

if (pool.find(stream_url) == pool.end()) {
err = srs_error_wrap(err, "fetch source %s", stream_url.c_str());
return err;
}

*pps = pool[stream_url];

return err;
}

SrsLiveSource* SrsLiveSourceManager::fetch(SrsRequest* r)
{
SrsLiveSource* source = NULL;
Expand Down Expand Up @@ -2674,3 +2725,7 @@ string SrsLiveSource::get_curr_origin()
return play_edge->get_curr_origin();
}

srs_error_t SrsLiveSource::create_forwarder(string url)
{
return hub->create_forwarder(url);
}
6 changes: 6 additions & 0 deletions trunk/src/app/srs_app_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,8 @@ class SrsOriginHub : public ISrsReloadHandler
private:
virtual srs_error_t create_forwarders();
virtual void destroy_forwarders();
public:
virtual srs_error_t create_forwarder(std::string url);
};

// Each stream have optional meta(sps/pps in sequence header and metadata).
Expand Down Expand Up @@ -448,6 +450,8 @@ class SrsLiveSourceManager : public ISrsHourGlass
// @param h the event handler for source.
// @param pps the matched source, if success never be NULL.
virtual srs_error_t fetch_or_create(SrsRequest* r, ISrsLiveSourceHandler* h, SrsLiveSource** pps);
// fetch source by url
virtual srs_error_t fetch_source(std::string stream_url, SrsLiveSource** pps);
private:
// Get the exists source, NULL when not exists.
// update the request and return the exists source.
Expand Down Expand Up @@ -599,6 +603,8 @@ class SrsLiveSource : public ISrsReloadHandler
virtual void on_edge_proxy_unpublish();
public:
virtual std::string get_curr_origin();
// create forwarder by url
virtual srs_error_t create_forwarder(std::string url);
};

#endif
38 changes: 38 additions & 0 deletions trunk/src/app/srs_app_utility.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1434,3 +1434,41 @@ string srs_getenv(string key)
return "";
}

unsigned char srs_hex_to_char(unsigned char x)
{
unsigned char y = '0';

if (x >= 'A' && x <= 'Z') {
y = x - 'A' + 10;
} else if (x >= 'a' && x <= 'z') {
y = x - 'a' + 10;
} else if (x >= '0' && x <= '9') {
y = x - '0';
}

return y;
}

std::string srs_string_url_decode(std::string url)
{
std::string ret = "";

size_t length = url.length();
for (size_t i = 0; i < length; i++) {
if (url[i] == '+') {
ret += ' ';
} else if (url[i] == '%') {
if (i + 2 > length) {
return ret;
}

unsigned char high = srs_hex_to_char((unsigned char)url[++i]);
unsigned char low = srs_hex_to_char((unsigned char)url[++i]);
ret += high * 16 + low;
} else {
ret += url[i];
}
}

return ret;
}
3 changes: 3 additions & 0 deletions trunk/src/app/srs_app_utility.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -687,5 +687,8 @@ extern std::string srs_string_dumps_hex(const char* str, int length, int limit,
// srs_getenv("EIP") === srs_getenv("$EIP")
extern std::string srs_getenv(std::string key);

// URL Decode
extern std::string srs_string_url_decode(std::string url);

#endif

2 changes: 2 additions & 0 deletions trunk/src/kernel/srs_kernel_error.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@
#define ERROR_RTMP_MESSAGE_CREATE 2053
#define ERROR_RTMP_PROXY_EXCEED 2054
#define ERROR_RTMP_CREATE_STREAM_DEPTH 2055
#define ERROR_RTMP_FORWARD_PARAM_ERROR 2056
#define ERROR_RTMP_FORWARD_NOT_FOUND 2057
//
// The system control message,
// It's not an error, but special control logic.
Expand Down

0 comments on commit c4ba70a

Please sign in to comment.