Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Forward: support config full rtmp url forward to other server #2799

Merged
merged 6 commits into from
Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion trunk/conf/forward.master.conf
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ srs_log_tank console;
vhost __defaultVhost__ {
forward {
enabled on;
destination 127.0.0.1:19350;
#destination 127.0.0.1:19350;
backend http://127.0.0.1:8085/api/v1/forward;
}
}
168 changes: 168 additions & 0 deletions trunk/research/api-server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,173 @@ def POST(self):
def OPTIONS(self, *args, **kwargs):
enable_crossdomain()

'''
handle the forward requests: dynamic forward url.
'''
class RESTForward(object):
exposed = True

def __init__(self):
self.__forwards = []

self.__forwards.append({
"vhost":"ossrs.net",
Copy link
Member

@winlinvip winlinvip Feb 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change to slave,

TRANS_BY_GPT3

"app":"live",
"stream":"livestream",
"url":"push.ossrs.com",
})

self.__forwards.append({
"app":"live",
"stream":"livestream",
"url":"push.ossrs.com",
})

self.__forwards.append({
"app":"live",
"stream":"livestream",
"url":"rtmp://push.ossrs.com/test/teststream?auth_token=123456",
})

def GET(self):
enable_crossdomain()

forwards = {}
return json.dumps(forwards)

'''
for SRS hook: on_forward
on_forward:
when srs reap a dvr file, call the hook,
the request in the POST data string is a object encode by json:
{
"action": "on_forward",
"server_id": "server_test",
"client_id": 1985,
"ip": "192.168.1.10",
"vhost": "video.test.com",
"app": "live",
"tcUrl": "rtmp://video.test.com/live?key=d2fa801d08e3f90ed1e1670e6e52651a",
"stream": "livestream",
"param":"?token=xxx&salt=yyy"
}
if valid, the hook must return HTTP code 200(Stauts OK) and response
an int value specifies the error code(0 corresponding to success):
0
'''
def POST(self):
enable_crossdomain()

# return the error code in str
code = Error.success

req = cherrypy.request.body.read()
trace("post to forwards, req=%s"%(req))
try:
json_req = json.loads(req)
except Exception, ex:
code = Error.system_parse_json
trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code))
return json.dumps({"code": int(code), "data": None})

action = json_req["action"]
if action == "on_forward":
return self.__on_forward(json_req)
else:
trace("invalid request action: %s"%(json_req["action"]))
code = Error.request_invalid_action

return json.dumps({"code": int(code), "data": None})

def OPTIONS(self, *args, **kwargs):
enable_crossdomain()

def __on_forward(self, req):
code = Error.success

trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, tcUrl=%s, stream=%s, param=%s"%(
req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["tcUrl"], req["stream"], req["param"]
))

# dynamic create forward config
forwards = []
destinations = []

# handle param: ?forward=xxxxx&auth_token=xxxxx
# 1.delete ?
req_param = req["param"].replace('?', '', 1)

# 2.delete 'forward=xxxxx'
new_req_param = ""
params = req_param.split("&")
for param in params:
result = param.split("=")
if result[0].find("forward") != -1:
destinations.append({
"url": result[1],
})
elif len(new_req_param) > 0:
new_req_param = new_req_param + "&" + param
else:
new_req_param = param

# secne: dynamic config
for forward in self.__forwards:
# vhost exist
if hasattr(forward, "vhost"):
if len(forward["vhost"]) > 0 and req["vhost"] != forward["vhost"]:
continue
# app exist
if hasattr(forward, "app"):
if len(forward["app"]) > 0 and req["app"] != forward["app"]:
continue
# app exist
if hasattr(forward, "stream"):
if len(forward["stream"]) > 0 and req["stream"] != forward["stream"]:
continue
# no url
if forward["url"] is None:
continue

# url maybe spell full rtmp address
url = forward["url"]
if url.find("rtmp://") == -1:
# format: xxx:xxx
# maybe you should use destination config
url = "rtmp://%s/%s"%(url, req['app'])
if len(req['vhost']) > 0 and req['vhost'] != "__defaultVhost__" and url.find(req['vhost']) == -1:
url = url + "?vhost=" + req['vhost']
url = url + "/" + req['stream']
if len(new_req_param) > 0:
url = url + "?" + new_req_param

# append
forwards.append({
"url": url,
})

# secne: parse client params, like:
# format1: rtmp://srs-server/live/stream?forward=aliyuncdn.com:1936&token=xxxxxx
# format2: rtmp://srs-server/live/stream?forward=rtmp://cdn.com/myapp/mystream?XXXXXX
for destination in destinations:
url = destination["url"]
if url.find("rtmp://") == -1:
# format: xxx:xxx
# maybe you should use destination config
url = "rtmp://%s/%s"%(url, req['app'])
if len(req['vhost']) > 0 and req['vhost'] != "__defaultVhost__" and url.find(req['vhost']) == -1:
url = url + "?vhost=" + req['vhost']
url = url + "/" + req['stream']
if len(new_req_param) > 0:
url = url + "?" + new_req_param

# append
forwards.append({
"url": url,
})

return json.dumps({"code": int(code), "data": {"forwards": forwards}})

# HTTP RESTful path.
class Root(object):
exposed = True
Expand Down Expand Up @@ -846,6 +1013,7 @@ def __init__(self):
self.chats = RESTChats()
self.servers = RESTServers()
self.snapshots = RESTSnapshots()
self.forward = RESTForward()
def GET(self):
enable_crossdomain();
return json.dumps({"code":Error.success, "urls":{
Expand Down
17 changes: 16 additions & 1 deletion trunk/src/app/srs_app_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2795,7 +2795,7 @@ srs_error_t SrsConfig::check_normal_config()
} else if (n == "forward") {
for (int j = 0; j < (int)conf->directives.size(); j++) {
string m = conf->at(j)->name;
if (m != "enabled" && m != "destination") {
if (m != "enabled" && m != "destination" && m != "backend") {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.forward.%s of %s", m.c_str(), vhost->arg0().c_str());
}
}
Expand Down Expand Up @@ -4605,6 +4605,21 @@ SrsConfDirective* SrsConfig::get_forwards(string vhost)
return conf->get("destination");
}

SrsConfDirective* SrsConfig::get_forward_backend(string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return NULL;
}

conf = conf->get("forward");
if (!conf) {
return NULL;
}

return conf->get("backend");
}

SrsConfDirective* SrsConfig::get_vhost_http_hooks(string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
Expand Down
2 changes: 2 additions & 0 deletions trunk/src/app/srs_app_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,8 @@ class SrsConfig
virtual bool get_forward_enabled(SrsConfDirective* vhost);
// Get the forward directive of vhost.
virtual SrsConfDirective* get_forwards(std::string vhost);
// Get the forward directive of backend.
virtual SrsConfDirective* get_forward_backend(std::string vhost);

public:
// Whether the srt sevice enabled
Expand Down
4 changes: 3 additions & 1 deletion trunk/src/app/srs_app_forward.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ SrsForwarder::~SrsForwarder()

srs_freep(sh_video);
srs_freep(sh_audio);

srs_freep(req);
}

srs_error_t SrsForwarder::initialize(SrsRequest* r, string ep)
Expand All @@ -60,7 +62,7 @@ srs_error_t SrsForwarder::initialize(SrsRequest* r, string ep)

// it's ok to use the request object,
// SrsLiveSource already copy it and never delete it.
req = r;
req = r->copy();

// the ep(endpoint) to forward to
ep_forward = ep;
Expand Down
94 changes: 94 additions & 0 deletions trunk/src/app/srs_app_http_hooks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,100 @@ srs_error_t SrsHttpHooks::discover_co_workers(string url, string& host, int& por
return err;
}

// Request:
Copy link
Member

@winlinvip winlinvip Feb 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put it in full.conf

TRANS_BY_GPT3

// POST /api/v1/forward
// {
// "action": "on_forward",
// "server_id": "vid-k21d7y2",
// "client_id": "9o7g1330",
// "ip": "127.0.0.1",
// "vhost": "__defaultVhost__",
// "app": "live",
// "tcUrl": "rtmp://127.0.0.1:1935/live",
// "stream": "livestream",
// "param": "?forward=rtmp://ossrs.net/live/livestream"
// }
// Response:
// {
// "code": 0,
// "data": {
// "forwards":[{
// "url": "rtmp://ossrs.net:1935/live/livestream?auth_token=xxx"
Copy link
Member

@winlinvip winlinvip Feb 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

data: {
  urls: ["rtmp://xxx", "rtmp://xxx"]
}

If you want to expand:

data {
   forwards: xxx
}

TRANS_BY_GPT3

// },{
// "url": "rtmp://aliyuncdn.com:1935/live/livestream?auth_token=xxx"
// }]
// }
// }
srs_error_t SrsHttpHooks::on_forward_backend(string url, SrsRequest* req, std::vector<std::string>& rtmp_urls)
{
srs_error_t err = srs_success;

SrsContextId cid = _srs_context->get_id();

SrsStatistic* stat = SrsStatistic::instance();

SrsJsonObject* obj = SrsJsonAny::object();
SrsAutoFree(SrsJsonObject, obj);

obj->set("action", SrsJsonAny::str("on_forward"));
obj->set("server_id", SrsJsonAny::str(stat->server_id().c_str()));
obj->set("client_id", SrsJsonAny::str(cid.c_str()));
obj->set("ip", SrsJsonAny::str(req->ip.c_str()));
obj->set("vhost", SrsJsonAny::str(req->vhost.c_str()));
obj->set("app", SrsJsonAny::str(req->app.c_str()));
obj->set("tcUrl", SrsJsonAny::str(req->tcUrl.c_str()));
obj->set("stream", SrsJsonAny::str(req->stream.c_str()));
obj->set("param", SrsJsonAny::str(req->param.c_str()));

std::string data = obj->dumps();
std::string res;
int status_code;

SrsHttpClient http;
if ((err = do_post(&http, url, data, status_code, res)) != srs_success) {
return srs_error_wrap(err, "http: on_forward_backend failed, client_id=%s, url=%s, request=%s, response=%s, code=%d",
cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code);
}

// parse string res to json.
SrsJsonAny* info = SrsJsonAny::loads(res);
if (!info) {
return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "load json from %s", res.c_str());
}
SrsAutoFree(SrsJsonAny, info);

// response error code in string.
if (!info->is_object()) {
return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "response %s", res.c_str());
}

SrsJsonAny* prop = NULL;
// response standard object, format in json: {}
SrsJsonObject* res_info = info->to_object();
if ((prop = res_info->ensure_property_object("data")) == NULL) {
return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "parse data %s", res.c_str());
}

SrsJsonObject* p = prop->to_object();
if ((prop = p->ensure_property_array("forwards")) == NULL) {
return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "parse forwards %s", res.c_str());
}

SrsJsonArray* forwards = prop->to_array();
for (int i = 0; i < forwards->count(); i++) {
prop = forwards->at(i);
SrsJsonObject* forward = prop->to_object();
if ((prop = forward->ensure_property_string("url")) != NULL) {
rtmp_urls.push_back(prop->to_str());
}
}

srs_trace("http: on_forward_backend ok, client_id=%s, url=%s, request=%s, response=%s",
cid.c_str(), url.c_str(), data.c_str(), res.c_str());

return err;
}

srs_error_t SrsHttpHooks::do_post(SrsHttpClient* hc, std::string url, std::string req, int& code, string& res)
{
srs_error_t err = srs_success;
Expand Down
5 changes: 5 additions & 0 deletions trunk/src/app/srs_app_http_hooks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <srs_core.hpp>

#include <string>
#include <vector>

class SrsHttpUri;
class SrsStSocket;
Expand Down Expand Up @@ -79,6 +80,10 @@ class SrsHttpHooks
static srs_error_t on_hls_notify(SrsContextId cid, std::string url, SrsRequest* req, std::string ts_url, int nb_notify);
// Discover co-workers for origin cluster.
static srs_error_t discover_co_workers(std::string url, std::string& host, int& port);
// The on_forward_backend hook, when publish stream start to forward
// @param url the api server url, to valid the client.
// ignore if empty.
static srs_error_t on_forward_backend(std::string url, SrsRequest* req, std::vector<std::string>& rtmp_urls);
private:
static srs_error_t do_post(SrsHttpClient* hc, std::string url, std::string req, int& code, std::string& res);
};
Expand Down
Loading