From 931d19e120486d928b1ce15d3e8fba813aa9f8f9 Mon Sep 17 00:00:00 2001 From: chundonglinlin Date: Sat, 29 Jan 2022 00:02:45 +0800 Subject: [PATCH 1/6] Forward: add backend config and demo server for dynamic create forwarder to other server.(#1342) --- trunk/conf/forward.master.conf | 3 +- trunk/research/api-server/server.py | 168 +++++++++++++++++++++++++++ trunk/src/app/srs_app_config.cpp | 17 ++- trunk/src/app/srs_app_config.hpp | 2 + trunk/src/app/srs_app_forward.cpp | 4 +- trunk/src/app/srs_app_http_hooks.cpp | 94 +++++++++++++++ trunk/src/app/srs_app_http_hooks.hpp | 5 + trunk/src/app/srs_app_source.cpp | 51 +++++++- trunk/src/utest/srs_utest_config.cpp | 7 ++ 9 files changed, 346 insertions(+), 5 deletions(-) diff --git a/trunk/conf/forward.master.conf b/trunk/conf/forward.master.conf index 630a4c84ea..afdfcfa25b 100644 --- a/trunk/conf/forward.master.conf +++ b/trunk/conf/forward.master.conf @@ -10,6 +10,7 @@ srs_log_tank console; vhost __defaultVhost__ { forward { enabled on; - destination 127.0.0.1:19350; + #destination 127.0.0.1:19350; + backend http://127.0.0.1:8085/api/v1/forward; } } diff --git a/trunk/research/api-server/server.py b/trunk/research/api-server/server.py index e50af7c46e..e28dd7d9d6 100755 --- a/trunk/research/api-server/server.py +++ b/trunk/research/api-server/server.py @@ -805,6 +805,173 @@ def POST(self): def OPTIONS(self, *args, **kwargs): enable_crossdomain() +''' +handle the forward requests: dynamic forward url. +''' +class RESTForward(object): + exposed = True + + def __init__(self): + self.__forwards = [] + + self.__forwards.append({ + "vhost":"ossrs.net", + "app":"live", + "stream":"livestream", + "url":"push.ossrs.com", + }) + + self.__forwards.append({ + "app":"live", + "stream":"livestream", + "url":"push.ossrs.com", + }) + + self.__forwards.append({ + "app":"live", + "stream":"livestream", + "url":"rtmp://push.ossrs.com/test/teststream?auth_token=123456", + }) + + def GET(self): + enable_crossdomain() + + forwards = {} + return json.dumps(forwards) + + ''' + for SRS hook: on_forward + on_forward: + when srs reap a dvr file, call the hook, + the request in the POST data string is a object encode by json: + { + "action": "on_forward", + "server_id": "server_test", + "client_id": 1985, + "ip": "192.168.1.10", + "vhost": "video.test.com", + "app": "live", + "tcUrl": "rtmp://video.test.com/live?key=d2fa801d08e3f90ed1e1670e6e52651a", + "stream": "livestream", + "param":"?token=xxx&salt=yyy" + } + if valid, the hook must return HTTP code 200(Stauts OK) and response + an int value specifies the error code(0 corresponding to success): + 0 + ''' + def POST(self): + enable_crossdomain() + + # return the error code in str + code = Error.success + + req = cherrypy.request.body.read() + trace("post to forwards, req=%s"%(req)) + try: + json_req = json.loads(req) + except Exception, ex: + code = Error.system_parse_json + trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code)) + return json.dumps({"code": int(code), "data": None}) + + action = json_req["action"] + if action == "on_forward": + return self.__on_forward(json_req) + else: + trace("invalid request action: %s"%(json_req["action"])) + code = Error.request_invalid_action + + return json.dumps({"code": int(code), "data": None}) + + def OPTIONS(self, *args, **kwargs): + enable_crossdomain() + + def __on_forward(self, req): + code = Error.success + + trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, tcUrl=%s, stream=%s, param=%s"%( + req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["tcUrl"], req["stream"], req["param"] + )) + + # dynamic create forward config + forwards = [] + destinations = [] + + # handle param: ?forward=xxxxx&auth_token=xxxxx + # 1.delete ? + req_param = req["param"].replace('?', '', 1) + + # 2.delete 'forward=xxxxx' + new_req_param = "" + params = req_param.split("&") + for param in params: + result = param.split("=") + if result[0].find("forward") != -1: + destinations.append({ + "url": result[1], + }) + elif len(new_req_param) > 0: + new_req_param = new_req_param + "&" + param + else: + new_req_param = param + + # secne: dynamic config + for forward in self.__forwards: + # vhost exist + if hasattr(forward, "vhost"): + if len(forward["vhost"]) > 0 and req["vhost"] != forward["vhost"]: + continue + # app exist + if hasattr(forward, "app"): + if len(forward["app"]) > 0 and req["app"] != forward["app"]: + continue + # app exist + if hasattr(forward, "stream"): + if len(forward["stream"]) > 0 and req["stream"] != forward["stream"]: + continue + # no url + if forward["url"] is None: + continue + + # url maybe spell full rtmp address + url = forward["url"] + if url.find("rtmp://") == -1: + # format: xxx:xxx + # maybe you should use destination config + url = "rtmp://%s/%s"%(url, req['app']) + if len(req['vhost']) > 0 and req['vhost'] != "__defaultVhost__" and url.find(req['vhost']) == -1: + url = url + "?vhost=" + req['vhost'] + url = url + "/" + req['stream'] + if len(new_req_param) > 0: + url = url + "?" + new_req_param + + # append + forwards.append({ + "url": url, + }) + + # secne: parse client params, like: + # format1: rtmp://srs-server/live/stream?forward=aliyuncdn.com:1936&token=xxxxxx + # format2: rtmp://srs-server/live/stream?forward=rtmp://cdn.com/myapp/mystream?XXXXXX + for destination in destinations: + url = destination["url"] + if url.find("rtmp://") == -1: + # format: xxx:xxx + # maybe you should use destination config + url = "rtmp://%s/%s"%(url, req['app']) + if len(req['vhost']) > 0 and req['vhost'] != "__defaultVhost__" and url.find(req['vhost']) == -1: + url = url + "?vhost=" + req['vhost'] + url = url + "/" + req['stream'] + if len(new_req_param) > 0: + url = url + "?" + new_req_param + + # append + forwards.append({ + "url": url, + }) + + return json.dumps({"code": int(code), "data": {"forwards": forwards}}) + # HTTP RESTful path. class Root(object): exposed = True @@ -846,6 +1013,7 @@ def __init__(self): self.chats = RESTChats() self.servers = RESTServers() self.snapshots = RESTSnapshots() + self.forward = RESTForward() def GET(self): enable_crossdomain(); return json.dumps({"code":Error.success, "urls":{ diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 23d439a5cc..d22801503a 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -2795,7 +2795,7 @@ srs_error_t SrsConfig::check_normal_config() } else if (n == "forward") { for (int j = 0; j < (int)conf->directives.size(); j++) { string m = conf->at(j)->name; - if (m != "enabled" && m != "destination") { + if (m != "enabled" && m != "destination" && m != "backend") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.forward.%s of %s", m.c_str(), vhost->arg0().c_str()); } } @@ -4605,6 +4605,21 @@ SrsConfDirective* SrsConfig::get_forwards(string vhost) return conf->get("destination"); } +SrsConfDirective* SrsConfig::get_forward_backend(string vhost) +{ + SrsConfDirective* conf = get_vhost(vhost); + if (!conf) { + return NULL; + } + + conf = conf->get("forward"); + if (!conf) { + return NULL; + } + + return conf->get("backend"); +} + SrsConfDirective* SrsConfig::get_vhost_http_hooks(string vhost) { SrsConfDirective* conf = get_vhost(vhost); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index f48d71a6d1..19509d820c 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -607,6 +607,8 @@ class SrsConfig virtual bool get_forward_enabled(SrsConfDirective* vhost); // Get the forward directive of vhost. virtual SrsConfDirective* get_forwards(std::string vhost); + // Get the forward directive of backend. + virtual SrsConfDirective* get_forward_backend(std::string vhost); public: // Whether the srt sevice enabled diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index 93e663f00f..5464ff6bdb 100755 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -52,6 +52,8 @@ SrsForwarder::~SrsForwarder() srs_freep(sh_video); srs_freep(sh_audio); + + srs_freep(req); } srs_error_t SrsForwarder::initialize(SrsRequest* r, string ep) @@ -60,7 +62,7 @@ srs_error_t SrsForwarder::initialize(SrsRequest* r, string ep) // it's ok to use the request object, // SrsLiveSource already copy it and never delete it. - req = r; + req = r->copy(); // the ep(endpoint) to forward to ep_forward = ep; diff --git a/trunk/src/app/srs_app_http_hooks.cpp b/trunk/src/app/srs_app_http_hooks.cpp index 2541161687..cc896e76ce 100644 --- a/trunk/src/app/srs_app_http_hooks.cpp +++ b/trunk/src/app/srs_app_http_hooks.cpp @@ -482,6 +482,100 @@ srs_error_t SrsHttpHooks::discover_co_workers(string url, string& host, int& por return err; } +// Request: +// POST /api/v1/forward +// { +// "action": "on_forward", +// "server_id": "vid-k21d7y2", +// "client_id": "9o7g1330", +// "ip": "127.0.0.1", +// "vhost": "__defaultVhost__", +// "app": "live", +// "tcUrl": "rtmp://127.0.0.1:1935/live", +// "stream": "livestream", +// "param": "?forward=rtmp://ossrs.net/live/livestream" +// } +// Response: +// { +// "code": 0, +// "data": { +// "forwards":[{ +// "url": "rtmp://ossrs.net:1935/live/livestream?auth_token=xxx" +// },{ +// "url": "rtmp://aliyuncdn.com:1935/live/livestream?auth_token=xxx" +// }] +// } +// } +srs_error_t SrsHttpHooks::on_forward_backend(string url, SrsRequest* req, std::vector& rtmp_urls) +{ + srs_error_t err = srs_success; + + SrsContextId cid = _srs_context->get_id(); + + SrsStatistic* stat = SrsStatistic::instance(); + + SrsJsonObject* obj = SrsJsonAny::object(); + SrsAutoFree(SrsJsonObject, obj); + + obj->set("action", SrsJsonAny::str("on_forward")); + obj->set("server_id", SrsJsonAny::str(stat->server_id().c_str())); + obj->set("client_id", SrsJsonAny::str(cid.c_str())); + obj->set("ip", SrsJsonAny::str(req->ip.c_str())); + obj->set("vhost", SrsJsonAny::str(req->vhost.c_str())); + obj->set("app", SrsJsonAny::str(req->app.c_str())); + obj->set("tcUrl", SrsJsonAny::str(req->tcUrl.c_str())); + obj->set("stream", SrsJsonAny::str(req->stream.c_str())); + obj->set("param", SrsJsonAny::str(req->param.c_str())); + + std::string data = obj->dumps(); + std::string res; + int status_code; + + SrsHttpClient http; + if ((err = do_post(&http, url, data, status_code, res)) != srs_success) { + return srs_error_wrap(err, "http: on_forward_backend failed, client_id=%s, url=%s, request=%s, response=%s, code=%d", + cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code); + } + + // parse string res to json. + SrsJsonAny* info = SrsJsonAny::loads(res); + if (!info) { + return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "load json from %s", res.c_str()); + } + SrsAutoFree(SrsJsonAny, info); + + // response error code in string. + if (!info->is_object()) { + return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "response %s", res.c_str()); + } + + SrsJsonAny* prop = NULL; + // response standard object, format in json: {} + SrsJsonObject* res_info = info->to_object(); + if ((prop = res_info->ensure_property_object("data")) == NULL) { + return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "parse data %s", res.c_str()); + } + + SrsJsonObject* p = prop->to_object(); + if ((prop = p->ensure_property_array("forwards")) == NULL) { + return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "parse forwards %s", res.c_str()); + } + + SrsJsonArray* forwards = prop->to_array(); + for (int i = 0; i < forwards->count(); i++) { + prop = forwards->at(i); + SrsJsonObject* forward = prop->to_object(); + if ((prop = forward->ensure_property_string("url")) != NULL) { + rtmp_urls.push_back(prop->to_str()); + } + } + + srs_trace("http: on_forward_backend ok, client_id=%s, url=%s, request=%s, response=%s", + cid.c_str(), url.c_str(), data.c_str(), res.c_str()); + + return err; +} + srs_error_t SrsHttpHooks::do_post(SrsHttpClient* hc, std::string url, std::string req, int& code, string& res) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_http_hooks.hpp b/trunk/src/app/srs_app_http_hooks.hpp index 8e3e01663c..f0ad24d017 100644 --- a/trunk/src/app/srs_app_http_hooks.hpp +++ b/trunk/src/app/srs_app_http_hooks.hpp @@ -10,6 +10,7 @@ #include #include +#include class SrsHttpUri; class SrsStSocket; @@ -79,6 +80,10 @@ class SrsHttpHooks static srs_error_t on_hls_notify(SrsContextId cid, std::string url, SrsRequest* req, std::string ts_url, int nb_notify); // Discover co-workers for origin cluster. static srs_error_t discover_co_workers(std::string url, std::string& host, int& port); + // The on_forward_backend hook, when publish stream start to forward + // @param url the api server url, to valid the client. + // ignore if empty. + static srs_error_t on_forward_backend(std::string url, SrsRequest* req, std::vector& rtmp_urls); private: static srs_error_t do_post(SrsHttpClient* hc, std::string url, std::string req, int& code, std::string& res); }; diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 40d2083167..3d8e968380 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -34,6 +34,7 @@ using namespace std; #include #include #include +#include #define CONST_MAX_JITTER_MS 250 #define CONST_MAX_JITTER_MS_NEG -250 @@ -1473,7 +1474,54 @@ srs_error_t SrsOriginHub::create_forwarders() return err; } - SrsConfDirective* conf = _srs_config->get_forwards(req->vhost); + srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost); + + SrsConfDirective* conf = _srs_config->get_forward_backend(req->vhost); + if (conf) { + int count = conf->args.size(); + for (int i = 0; i < count; i++) { + std::string backend_url = conf->args.at(i); + + // create forward by backend + std::vector urls; + if ((err = SrsHttpHooks::on_forward_backend(backend_url, req, urls)) != srs_success) { + // ignore + srs_trace("get backend failed, %s", srs_error_desc(err).c_str()); + continue; + } + + std::vector::iterator it; + for (it = urls.begin(); it != urls.end(); ++it) { + std::string url = *it; + + // create forwarder by url + SrsRequest* freq = new SrsRequest(); + SrsAutoFree(SrsRequest, freq); + srs_parse_rtmp_url(url, freq->tcUrl, freq->stream); + srs_discovery_tc_url(freq->tcUrl, freq->schema, freq->host, freq->vhost, freq->app, freq->stream, freq->port, freq->param); + + SrsForwarder* forwarder = new SrsForwarder(this); + forwarders.push_back(forwarder); + + std::stringstream forward_server; + forward_server << freq->host << ":" << freq->port; + + // initialize the forwarder with request. + if ((err = forwarder->initialize(freq, forward_server.str())) != srs_success) { + return srs_error_wrap(err, "init forwarder"); + } + + forwarder->set_queue_size(queue_size); + + if ((err = forwarder->on_publish()) != srs_success) { + return srs_error_wrap(err, "start backend forwarder failed, vhost=%s, app=%s, stream=%s, forward-to=%s", + req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), forward_server.str().c_str()); + } + } + } + } + + conf = _srs_config->get_forwards(req->vhost); for (int i = 0; conf && i < (int)conf->args.size(); i++) { std::string forward_server = conf->args.at(i); @@ -1485,7 +1533,6 @@ srs_error_t SrsOriginHub::create_forwarders() return srs_error_wrap(err, "init forwarder"); } - srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost); forwarder->set_queue_size(queue_size); if ((err = forwarder->on_publish()) != srs_success) { diff --git a/trunk/src/utest/srs_utest_config.cpp b/trunk/src/utest/srs_utest_config.cpp index e5532d6786..0f9273d468 100644 --- a/trunk/src/utest/srs_utest_config.cpp +++ b/trunk/src/utest/srs_utest_config.cpp @@ -2914,6 +2914,13 @@ VOID TEST(ConfigMainTest, CheckVhostConfig2) EXPECT_EQ(5000000, conf.get_publish_normal_timeout("ossrs.net")); EXPECT_FALSE(conf.get_forward_enabled("ossrs.net")); EXPECT_TRUE(conf.get_forwards("ossrs.net") == NULL); + EXPECT_TRUE(conf.get_forward_backend("ossrs.net") == NULL); + } + + if (true) { + MockSrsConfig conf; + HELPER_ASSERT_SUCCESS(conf.parse(_MIN_OK_CONF "vhost ossrs.net{forward {backend xxx;}}")); + EXPECT_TRUE(conf.get_forward_backend("ossrs.net") != NULL); } if (true) { From 6458e372c42527bb0a0a1c5b36efc0d0034993f0 Mon Sep 17 00:00:00 2001 From: chundonglinlin Date: Sun, 13 Feb 2022 17:35:10 +0800 Subject: [PATCH 2/6] Forward: if call forward backend failed, then return directly. --- trunk/src/app/srs_app_source.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 3d8e968380..769298ae8a 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1485,9 +1485,7 @@ srs_error_t SrsOriginHub::create_forwarders() // create forward by backend std::vector urls; if ((err = SrsHttpHooks::on_forward_backend(backend_url, req, urls)) != srs_success) { - // ignore - srs_trace("get backend failed, %s", srs_error_desc(err).c_str()); - continue; + return srs_error_wrap(err, "get forward backend failed"); } std::vector::iterator it; From d1894c31a4c0dc2a5a1d0ed3d26401a4f8fdfaff Mon Sep 17 00:00:00 2001 From: chundonglinlin Date: Mon, 14 Feb 2022 13:53:22 +0800 Subject: [PATCH 3/6] Forward: add API description and change return value format. --- trunk/conf/forward.master.conf | 2 +- trunk/conf/full.conf | 28 +++++++++++++++ trunk/research/api-server/server.py | 53 +++++++++++++++------------- trunk/src/app/srs_app_http_hooks.cpp | 40 +++++---------------- trunk/src/app/srs_app_source.cpp | 23 +++++++----- 5 files changed, 80 insertions(+), 66 deletions(-) diff --git a/trunk/conf/forward.master.conf b/trunk/conf/forward.master.conf index afdfcfa25b..f1ccb24261 100644 --- a/trunk/conf/forward.master.conf +++ b/trunk/conf/forward.master.conf @@ -10,7 +10,7 @@ srs_log_tank console; vhost __defaultVhost__ { forward { enabled on; - #destination 127.0.0.1:19350; + destination 127.0.0.1:19350; backend http://127.0.0.1:8085/api/v1/forward; } } diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index c9652896e1..8670e4db7f 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -669,6 +669,34 @@ vhost same.vhost.forward.srs.com { # active-active for cdn to build high available fault tolerance system. # format: {ip}:{port} {ip_N}:{port_N} destination 127.0.0.1:1936 127.0.0.1:1937; + + # when client(encoder) publish to vhost/app/stream, then create forwarder, call the hook, + # the request in the POST data string is a object encode by json: + # { + # "action": "on_forward", + # "server_id": "vid-k21d7y2", + # "client_id": "9o7g1330", + # "ip": "127.0.0.1", + # "vhost": "video.test.com", + # "app": "live", + # "tcUrl": "rtmp://127.0.0.1:1935/live", + # "stream": "livestream", + # "param": "?forward=rtmp://ossrs.net/live/livestream" + # } + # if valid, the hook must return HTTP code 200(Status OK) and response + # an int value specifies the error code(0 corresponding to success): + # { + # "code": 0, + # "data": { + # "urls":[ + # "rtmp://ossrs.net:1935/live/livestream?auth_token=xxx", + # "rtmp://cdn.com:1935/live/livestream?auth_token=xxx" + # ] + # } + # } + # only support one api hook, format: + # backend http://xxx/api0 + backend http://127.0.0.1:8085/api/v1/forward; } } diff --git a/trunk/research/api-server/server.py b/trunk/research/api-server/server.py index e28dd7d9d6..57419e0470 100755 --- a/trunk/research/api-server/server.py +++ b/trunk/research/api-server/server.py @@ -814,6 +814,7 @@ class RESTForward(object): def __init__(self): self.__forwards = [] + # match vhost+app+stream to forward url self.__forwards.append({ "vhost":"ossrs.net", "app":"live", @@ -821,13 +822,17 @@ def __init__(self): "url":"push.ossrs.com", }) + # match app+stream to forward url self.__forwards.append({ + "vhost":"", "app":"live", "stream":"livestream", "url":"push.ossrs.com", }) + # match app+stream to forward url self.__forwards.append({ + "vhost":"", "app":"live", "stream":"livestream", "url":"rtmp://push.ossrs.com/test/teststream?auth_token=123456", @@ -907,9 +912,7 @@ def __on_forward(self, req): for param in params: result = param.split("=") if result[0].find("forward") != -1: - destinations.append({ - "url": result[1], - }) + destinations.append(result[1],) elif len(new_req_param) > 0: new_req_param = new_req_param + "&" + param else: @@ -917,20 +920,18 @@ def __on_forward(self, req): # secne: dynamic config for forward in self.__forwards: + # empty forward url + if forward["url"] is None: + continue + # vhost exist - if hasattr(forward, "vhost"): - if len(forward["vhost"]) > 0 and req["vhost"] != forward["vhost"]: - continue - # app exist - if hasattr(forward, "app"): - if len(forward["app"]) > 0 and req["app"] != forward["app"]: - continue + if len(forward['vhost']) > 0 and req['vhost'] != forward['vhost']: + continue # app exist - if hasattr(forward, "stream"): - if len(forward["stream"]) > 0 and req["stream"] != forward["stream"]: - continue - # no url - if forward["url"] is None: + if len(forward["app"]) > 0 and req["app"] != forward["app"]: + continue + # stream exist + if len(forward["stream"]) > 0 and req["stream"] != forward["stream"]: continue # url maybe spell full rtmp address @@ -946,15 +947,16 @@ def __on_forward(self, req): url = url + "?" + new_req_param # append - forwards.append({ - "url": url, - }) + forwards.append(url) + + #trace log + trace("add dynamic forward url: %s"%(url)) # secne: parse client params, like: - # format1: rtmp://srs-server/live/stream?forward=aliyuncdn.com:1936&token=xxxxxx + # format1: rtmp://srs-server/live/stream?forward=ossrs.com:1936&token=xxxxxx # format2: rtmp://srs-server/live/stream?forward=rtmp://cdn.com/myapp/mystream?XXXXXX - for destination in destinations: - url = destination["url"] + for dest in destinations: + url = dest if url.find("rtmp://") == -1: # format: xxx:xxx # maybe you should use destination config @@ -966,11 +968,12 @@ def __on_forward(self, req): url = url + "?" + new_req_param # append - forwards.append({ - "url": url, - }) + forwards.append(url) + + #trace log + trace("add client params forward url: %s, %s"%(url, req_param)) - return json.dumps({"code": int(code), "data": {"forwards": forwards}}) + return json.dumps({"code": int(code), "data": {"urls": forwards}}) # HTTP RESTful path. class Root(object): diff --git a/trunk/src/app/srs_app_http_hooks.cpp b/trunk/src/app/srs_app_http_hooks.cpp index cc896e76ce..f49b4a5ed1 100644 --- a/trunk/src/app/srs_app_http_hooks.cpp +++ b/trunk/src/app/srs_app_http_hooks.cpp @@ -482,30 +482,6 @@ srs_error_t SrsHttpHooks::discover_co_workers(string url, string& host, int& por return err; } -// Request: -// POST /api/v1/forward -// { -// "action": "on_forward", -// "server_id": "vid-k21d7y2", -// "client_id": "9o7g1330", -// "ip": "127.0.0.1", -// "vhost": "__defaultVhost__", -// "app": "live", -// "tcUrl": "rtmp://127.0.0.1:1935/live", -// "stream": "livestream", -// "param": "?forward=rtmp://ossrs.net/live/livestream" -// } -// Response: -// { -// "code": 0, -// "data": { -// "forwards":[{ -// "url": "rtmp://ossrs.net:1935/live/livestream?auth_token=xxx" -// },{ -// "url": "rtmp://aliyuncdn.com:1935/live/livestream?auth_token=xxx" -// }] -// } -// } srs_error_t SrsHttpHooks::on_forward_backend(string url, SrsRequest* req, std::vector& rtmp_urls) { srs_error_t err = srs_success; @@ -557,16 +533,16 @@ srs_error_t SrsHttpHooks::on_forward_backend(string url, SrsRequest* req, std::v } SrsJsonObject* p = prop->to_object(); - if ((prop = p->ensure_property_array("forwards")) == NULL) { - return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "parse forwards %s", res.c_str()); + if ((prop = p->ensure_property_array("urls")) == NULL) { + return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "parse urls %s", res.c_str()); } - SrsJsonArray* forwards = prop->to_array(); - for (int i = 0; i < forwards->count(); i++) { - prop = forwards->at(i); - SrsJsonObject* forward = prop->to_object(); - if ((prop = forward->ensure_property_string("url")) != NULL) { - rtmp_urls.push_back(prop->to_str()); + SrsJsonArray* urls = prop->to_array(); + for (int i = 0; i < urls->count(); i++) { + prop = urls->at(i); + string rtmp_url = prop->to_str(); + if (!rtmp_url.empty()) { + rtmp_urls.push_back(rtmp_url); } } diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 769298ae8a..446dad3ee7 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1476,28 +1476,31 @@ srs_error_t SrsOriginHub::create_forwarders() srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost); - SrsConfDirective* conf = _srs_config->get_forward_backend(req->vhost); - if (conf) { - int count = conf->args.size(); - for (int i = 0; i < count; i++) { - std::string backend_url = conf->args.at(i); + // For backend config + if (true) { + SrsConfDirective* conf = _srs_config->get_forward_backend(req->vhost); + if (conf) { + // only get first backend url + std::string backend_url = conf->arg0(); - // create forward by backend + // get urls on forward backend std::vector urls; if ((err = SrsHttpHooks::on_forward_backend(backend_url, req, urls)) != srs_success) { return srs_error_wrap(err, "get forward backend failed"); } + // create forwarders by urls std::vector::iterator it; for (it = urls.begin(); it != urls.end(); ++it) { std::string url = *it; - // create forwarder by url + // create temp SrsRequest by url SrsRequest* freq = new SrsRequest(); SrsAutoFree(SrsRequest, freq); srs_parse_rtmp_url(url, freq->tcUrl, freq->stream); srs_discovery_tc_url(freq->tcUrl, freq->schema, freq->host, freq->vhost, freq->app, freq->stream, freq->port, freq->param); + // create forwarder SrsForwarder* forwarder = new SrsForwarder(this); forwarders.push_back(forwarder); @@ -1516,10 +1519,14 @@ srs_error_t SrsOriginHub::create_forwarders() req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), forward_server.str().c_str()); } } + + // if backend is configured and normal, return directly + return err; } } - conf = _srs_config->get_forwards(req->vhost); + // For destanition config + SrsConfDirective* conf = _srs_config->get_forwards(req->vhost); for (int i = 0; conf && i < (int)conf->args.size(); i++) { std::string forward_server = conf->args.at(i); From 45c27bb5f88eeee7a264bcadb2ad0a021f712479 Mon Sep 17 00:00:00 2001 From: chundonglinlin Date: Tue, 15 Feb 2022 16:39:04 +0800 Subject: [PATCH 4/6] Forward: add backend conf file and wrapper function for backend service. --- trunk/conf/forward.backend.conf | 15 +++ trunk/conf/forward.master.conf | 1 - trunk/research/api-server/server.py | 106 ++---------------- trunk/src/app/srs_app_source.cpp | 168 +++++++++++++++------------- trunk/src/app/srs_app_source.hpp | 3 +- 5 files changed, 117 insertions(+), 176 deletions(-) create mode 100644 trunk/conf/forward.backend.conf diff --git a/trunk/conf/forward.backend.conf b/trunk/conf/forward.backend.conf new file mode 100644 index 0000000000..2997fdca77 --- /dev/null +++ b/trunk/conf/forward.backend.conf @@ -0,0 +1,15 @@ +# the config for srs to forward to slave service +# @see https://github.com/ossrs/srs/wiki/v5_CN_SampleForward +# @see full.conf for detail config. + +listen 1935; +max_connections 1000; +pid ./objs/srs.master.pid; +daemon off; +srs_log_tank console; +vhost __defaultVhost__ { + forward { + enabled on; + backend http://127.0.0.1:8085/api/v1/forward; + } +} diff --git a/trunk/conf/forward.master.conf b/trunk/conf/forward.master.conf index f1ccb24261..630a4c84ea 100644 --- a/trunk/conf/forward.master.conf +++ b/trunk/conf/forward.master.conf @@ -11,6 +11,5 @@ vhost __defaultVhost__ { forward { enabled on; destination 127.0.0.1:19350; - backend http://127.0.0.1:8085/api/v1/forward; } } diff --git a/trunk/research/api-server/server.py b/trunk/research/api-server/server.py index 57419e0470..1cfb73a021 100755 --- a/trunk/research/api-server/server.py +++ b/trunk/research/api-server/server.py @@ -814,30 +814,6 @@ class RESTForward(object): def __init__(self): self.__forwards = [] - # match vhost+app+stream to forward url - self.__forwards.append({ - "vhost":"ossrs.net", - "app":"live", - "stream":"livestream", - "url":"push.ossrs.com", - }) - - # match app+stream to forward url - self.__forwards.append({ - "vhost":"", - "app":"live", - "stream":"livestream", - "url":"push.ossrs.com", - }) - - # match app+stream to forward url - self.__forwards.append({ - "vhost":"", - "app":"live", - "stream":"livestream", - "url":"rtmp://push.ossrs.com/test/teststream?auth_token=123456", - }) - def GET(self): enable_crossdomain() @@ -898,80 +874,14 @@ def __on_forward(self, req): req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["tcUrl"], req["stream"], req["param"] )) - # dynamic create forward config - forwards = [] - destinations = [] - - # handle param: ?forward=xxxxx&auth_token=xxxxx - # 1.delete ? - req_param = req["param"].replace('?', '', 1) - - # 2.delete 'forward=xxxxx' - new_req_param = "" - params = req_param.split("&") - for param in params: - result = param.split("=") - if result[0].find("forward") != -1: - destinations.append(result[1],) - elif len(new_req_param) > 0: - new_req_param = new_req_param + "&" + param - else: - new_req_param = param - - # secne: dynamic config - for forward in self.__forwards: - # empty forward url - if forward["url"] is None: - continue - - # vhost exist - if len(forward['vhost']) > 0 and req['vhost'] != forward['vhost']: - continue - # app exist - if len(forward["app"]) > 0 and req["app"] != forward["app"]: - continue - # stream exist - if len(forward["stream"]) > 0 and req["stream"] != forward["stream"]: - continue - - # url maybe spell full rtmp address - url = forward["url"] - if url.find("rtmp://") == -1: - # format: xxx:xxx - # maybe you should use destination config - url = "rtmp://%s/%s"%(url, req['app']) - if len(req['vhost']) > 0 and req['vhost'] != "__defaultVhost__" and url.find(req['vhost']) == -1: - url = url + "?vhost=" + req['vhost'] - url = url + "/" + req['stream'] - if len(new_req_param) > 0: - url = url + "?" + new_req_param - - # append - forwards.append(url) - - #trace log - trace("add dynamic forward url: %s"%(url)) - - # secne: parse client params, like: - # format1: rtmp://srs-server/live/stream?forward=ossrs.com:1936&token=xxxxxx - # format2: rtmp://srs-server/live/stream?forward=rtmp://cdn.com/myapp/mystream?XXXXXX - for dest in destinations: - url = dest - if url.find("rtmp://") == -1: - # format: xxx:xxx - # maybe you should use destination config - url = "rtmp://%s/%s"%(url, req['app']) - if len(req['vhost']) > 0 and req['vhost'] != "__defaultVhost__" and url.find(req['vhost']) == -1: - url = url + "?vhost=" + req['vhost'] - url = url + "/" + req['stream'] - if len(new_req_param) > 0: - url = url + "?" + new_req_param - - # append - forwards.append(url) - - #trace log - trace("add client params forward url: %s, %s"%(url, req_param)) + ''' + backend service config description: + support multiple rtmp urls(custom addresses or third-party cdn service), + url's host is slave service. + For example: + ["rtmp://127.0.0.1:19350/test/teststream", "rtmp://127.0.0.1:19350/test/teststream?token=xxxx"] + ''' + forwards = ["rtmp://127.0.0.1:19350/test/teststream"] return json.dumps({"code": int(code), "data": {"urls": forwards}}) diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 446dad3ee7..9417a18959 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -808,7 +808,7 @@ SrsSharedPtrMessage* SrsMixQueue::pop() SrsOriginHub::SrsOriginHub() { source = NULL; - req = NULL; + req_ = NULL; is_active = false; hls = new SrsHls(); @@ -852,22 +852,22 @@ srs_error_t SrsOriginHub::initialize(SrsLiveSource* s, SrsRequest* r) { srs_error_t err = srs_success; - req = r; + req_ = r; source = s; if ((err = format->initialize()) != srs_success) { return srs_error_wrap(err, "format initialize"); } - if ((err = hls->initialize(this, req)) != srs_success) { + if ((err = hls->initialize(this, req_)) != srs_success) { return srs_error_wrap(err, "hls initialize"); } - if ((err = dash->initialize(this, req)) != srs_success) { + if ((err = dash->initialize(this, req_)) != srs_success) { return srs_error_wrap(err, "dash initialize"); } - if ((err = dvr->initialize(this, req)) != srs_success) { + if ((err = dvr->initialize(this, req_)) != srs_success) { return srs_error_wrap(err, "dvr initialize"); } @@ -953,7 +953,7 @@ srs_error_t SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio) // when got audio stream info. SrsStatistic* stat = SrsStatistic::instance(); - if ((err = stat->on_audio_info(req, SrsAudioCodecIdAAC, c->sound_rate, c->sound_type, c->aac_object)) != srs_success) { + if ((err = stat->on_audio_info(req_, SrsAudioCodecIdAAC, c->sound_rate, c->sound_type, c->aac_object)) != srs_success) { return srs_error_wrap(err, "stat audio"); } @@ -967,7 +967,7 @@ srs_error_t SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio) if ((err = hls->on_audio(msg, format)) != srs_success) { // apply the error strategy for hls. // @see https://github.com/ossrs/srs/issues/264 - std::string hls_error_strategy = _srs_config->get_hls_on_error(req->vhost); + std::string hls_error_strategy = _srs_config->get_hls_on_error(req_->vhost); if (srs_config_hls_is_on_error_ignore(hls_error_strategy)) { srs_warn("hls: ignore audio error %s", srs_error_desc(err).c_str()); hls->on_unpublish(); @@ -1026,7 +1026,7 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se // user can disable the sps parse to workaround when parse sps failed. // @see https://github.com/ossrs/srs/issues/474 if (is_sequence_header) { - format->avc_parse_sps = _srs_config->get_parse_sps(req->vhost); + format->avc_parse_sps = _srs_config->get_parse_sps(req_->vhost); } if ((err = format->on_video(msg)) != srs_success) { @@ -1047,7 +1047,7 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se // when got video stream info. SrsStatistic* stat = SrsStatistic::instance(); - if ((err = stat->on_video_info(req, SrsVideoCodecIdAVC, c->avc_profile, c->avc_level, c->width, c->height)) != srs_success) { + if ((err = stat->on_video_info(req_, SrsVideoCodecIdAVC, c->avc_profile, c->avc_level, c->width, c->height)) != srs_success) { return srs_error_wrap(err, "stat video"); } @@ -1067,7 +1067,7 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se // TODO: We should support more strategies. // apply the error strategy for hls. // @see https://github.com/ossrs/srs/issues/264 - std::string hls_error_strategy = _srs_config->get_hls_on_error(req->vhost); + std::string hls_error_strategy = _srs_config->get_hls_on_error(req_->vhost); if (srs_config_hls_is_on_error_ignore(hls_error_strategy)) { srs_warn("hls: ignore video error %s", srs_error_desc(err).c_str()); hls->on_unpublish(); @@ -1127,7 +1127,7 @@ srs_error_t SrsOriginHub::on_publish() } // TODO: FIXME: use initialize to set req. - if ((err = encoder->on_publish(req)) != srs_success) { + if ((err = encoder->on_publish(req_)) != srs_success) { return srs_error_wrap(err, "encoder publish"); } @@ -1140,7 +1140,7 @@ srs_error_t SrsOriginHub::on_publish() } // @see https://github.com/ossrs/srs/issues/1613#issuecomment-961657927 - if ((err = dvr->on_publish(req)) != srs_success) { + if ((err = dvr->on_publish(req_)) != srs_success) { return srs_error_wrap(err, "dvr publish"); } @@ -1152,7 +1152,7 @@ srs_error_t SrsOriginHub::on_publish() #endif // TODO: FIXME: use initialize to set req. - if ((err = ng_exec->on_publish(req)) != srs_success) { + if ((err = ng_exec->on_publish(req_)) != srs_success) { return srs_error_wrap(err, "exec publish"); } @@ -1237,7 +1237,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_forward(string vhost) { srs_error_t err = srs_success; - if (req->vhost != vhost) { + if (req_->vhost != vhost) { return err; } @@ -1264,7 +1264,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_dash(string vhost) { srs_error_t err = srs_success; - if (req->vhost != vhost) { + if (req_->vhost != vhost) { return err; } @@ -1306,7 +1306,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_hls(string vhost) { srs_error_t err = srs_success; - if (req->vhost != vhost) { + if (req_->vhost != vhost) { return err; } @@ -1356,7 +1356,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_hds(string vhost) { srs_error_t err = srs_success; - if (req->vhost != vhost) { + if (req_->vhost != vhost) { return err; } @@ -1383,7 +1383,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_dvr(string vhost) { srs_error_t err = srs_success; - if (req->vhost != vhost) { + if (req_->vhost != vhost) { return err; } @@ -1398,12 +1398,12 @@ srs_error_t SrsOriginHub::on_reload_vhost_dvr(string vhost) } // reinitialize the dvr, update plan. - if ((err = dvr->initialize(this, req)) != srs_success) { + if ((err = dvr->initialize(this, req_)) != srs_success) { return srs_error_wrap(err, "reload dvr"); } // start to publish by new plan. - if ((err = dvr->on_publish(req)) != srs_success) { + if ((err = dvr->on_publish(req_)) != srs_success) { return srs_error_wrap(err, "dvr publish failed"); } @@ -1420,7 +1420,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_transcode(string vhost) { srs_error_t err = srs_success; - if (req->vhost != vhost) { + if (req_->vhost != vhost) { return err; } @@ -1433,7 +1433,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_transcode(string vhost) return err; } - if ((err = encoder->on_publish(req)) != srs_success) { + if ((err = encoder->on_publish(req_)) != srs_success) { return srs_error_wrap(err, "start encoder failed"); } srs_trace("vhost %s transcode reload success", vhost.c_str()); @@ -1445,7 +1445,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_exec(string vhost) { srs_error_t err = srs_success; - if (req->vhost != vhost) { + if (req_->vhost != vhost) { return err; } @@ -1458,7 +1458,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_exec(string vhost) return err; } - if ((err = ng_exec->on_publish(req)) != srs_success) { + if ((err = ng_exec->on_publish(req_)) != srs_success) { return srs_error_wrap(err, "start exec failed"); } srs_trace("vhost %s exec reload success", vhost.c_str()); @@ -1470,63 +1470,19 @@ srs_error_t SrsOriginHub::create_forwarders() { srs_error_t err = srs_success; - if (!_srs_config->get_forward_enabled(req->vhost)) { + if (!_srs_config->get_forward_enabled(req_->vhost)) { return err; } - - srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost); // For backend config - if (true) { - SrsConfDirective* conf = _srs_config->get_forward_backend(req->vhost); - if (conf) { - // only get first backend url - std::string backend_url = conf->arg0(); - - // get urls on forward backend - std::vector urls; - if ((err = SrsHttpHooks::on_forward_backend(backend_url, req, urls)) != srs_success) { - return srs_error_wrap(err, "get forward backend failed"); - } - - // create forwarders by urls - std::vector::iterator it; - for (it = urls.begin(); it != urls.end(); ++it) { - std::string url = *it; - - // create temp SrsRequest by url - SrsRequest* freq = new SrsRequest(); - SrsAutoFree(SrsRequest, freq); - srs_parse_rtmp_url(url, freq->tcUrl, freq->stream); - srs_discovery_tc_url(freq->tcUrl, freq->schema, freq->host, freq->vhost, freq->app, freq->stream, freq->port, freq->param); - - // create forwarder - SrsForwarder* forwarder = new SrsForwarder(this); - forwarders.push_back(forwarder); - - std::stringstream forward_server; - forward_server << freq->host << ":" << freq->port; - - // initialize the forwarder with request. - if ((err = forwarder->initialize(freq, forward_server.str())) != srs_success) { - return srs_error_wrap(err, "init forwarder"); - } - - forwarder->set_queue_size(queue_size); - - if ((err = forwarder->on_publish()) != srs_success) { - return srs_error_wrap(err, "start backend forwarder failed, vhost=%s, app=%s, stream=%s, forward-to=%s", - req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), forward_server.str().c_str()); - } - } - - // if backend is configured and normal, return directly - return err; - } + // If you configure backend service, return backend status(default: false) + bool backend_status = false; + if (((err = create_backend_forwarders(backend_status)) != srs_success) || backend_status) { + return err; } // For destanition config - SrsConfDirective* conf = _srs_config->get_forwards(req->vhost); + SrsConfDirective* conf = _srs_config->get_forwards(req_->vhost); for (int i = 0; conf && i < (int)conf->args.size(); i++) { std::string forward_server = conf->args.at(i); @@ -1534,21 +1490,81 @@ srs_error_t SrsOriginHub::create_forwarders() forwarders.push_back(forwarder); // initialize the forwarder with request. - if ((err = forwarder->initialize(req, forward_server)) != srs_success) { + if ((err = forwarder->initialize(req_, forward_server)) != srs_success) { return srs_error_wrap(err, "init forwarder"); } + srs_utime_t queue_size = _srs_config->get_queue_length(req_->vhost); forwarder->set_queue_size(queue_size); if ((err = forwarder->on_publish()) != srs_success) { return srs_error_wrap(err, "start forwarder failed, vhost=%s, app=%s, stream=%s, forward-to=%s", - req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), forward_server.c_str()); + req_->vhost.c_str(), req_->app.c_str(), req_->stream.c_str(), forward_server.c_str()); } } return err; } +srs_error_t SrsOriginHub::create_backend_forwarders(bool& status) +{ + srs_error_t err = srs_success; + + // default not configure backend service + status = false; + + SrsConfDirective* conf = _srs_config->get_forward_backend(req_->vhost); + if (!conf || conf->arg0().empty()) { + return err; + } + + // configure backend service + status = true; + + // only get first backend url + std::string backend_url = conf->arg0(); + + // get urls on forward backend + std::vector urls; + if ((err = SrsHttpHooks::on_forward_backend(backend_url, req_, urls)) != srs_success) { + return srs_error_wrap(err, "get forward backend failed"); + } + + // create forwarders by urls + std::vector::iterator it; + for (it = urls.begin(); it != urls.end(); ++it) { + std::string url = *it; + + // create temp Request by url + SrsRequest* req = new SrsRequest(); + SrsAutoFree(SrsRequest, req); + srs_parse_rtmp_url(url, req->tcUrl, req->stream); + srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->stream, req->port, req->param); + + // create forwarder + SrsForwarder* forwarder = new SrsForwarder(this); + forwarders.push_back(forwarder); + + std::stringstream forward_server; + forward_server << req->host << ":" << req->port; + + // initialize the forwarder with request. + if ((err = forwarder->initialize(req, forward_server.str())) != srs_success) { + return srs_error_wrap(err, "init forwarder"); + } + + srs_utime_t queue_size = _srs_config->get_queue_length(req_->vhost); + forwarder->set_queue_size(queue_size); + + if ((err = forwarder->on_publish()) != srs_success) { + return srs_error_wrap(err, "start backend forwarder failed, vhost=%s, app=%s, stream=%s, forward-to=%s", + req_->vhost.c_str(), req_->app.c_str(), req_->stream.c_str(), forward_server.str().c_str()); + } + } + + return err; +} + void SrsOriginHub::destroy_forwarders() { std::vector::iterator it; diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 3043034125..677fa5c49d 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -310,7 +310,7 @@ class SrsOriginHub : public ISrsReloadHandler { private: SrsLiveSource* source; - SrsRequest* req; + SrsRequest* req_; bool is_active; private: // The format, codec information. @@ -375,6 +375,7 @@ class SrsOriginHub : public ISrsReloadHandler virtual srs_error_t on_reload_vhost_exec(std::string vhost); private: virtual srs_error_t create_forwarders(); + virtual srs_error_t create_backend_forwarders(bool& status); virtual void destroy_forwarders(); }; From 66af013a9ecc7dd27dd42c448308f05e2ef8f3c1 Mon Sep 17 00:00:00 2001 From: chundonglinlin Date: Tue, 15 Feb 2022 17:45:43 +0800 Subject: [PATCH 5/6] Forward: add backend comment in full.conf and update forward.backend.conf. --- trunk/conf/forward.backend.conf | 2 +- trunk/conf/full.conf | 12 +++++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/trunk/conf/forward.backend.conf b/trunk/conf/forward.backend.conf index 2997fdca77..b032d2cd68 100644 --- a/trunk/conf/forward.backend.conf +++ b/trunk/conf/forward.backend.conf @@ -4,7 +4,7 @@ listen 1935; max_connections 1000; -pid ./objs/srs.master.pid; +pid ./objs/srs.backend.pid; daemon off; srs_log_tank console; vhost __defaultVhost__ { diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 8670e4db7f..0bf35da53b 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -670,18 +670,18 @@ vhost same.vhost.forward.srs.com { # format: {ip}:{port} {ip_N}:{port_N} destination 127.0.0.1:1936 127.0.0.1:1937; - # when client(encoder) publish to vhost/app/stream, then create forwarder, call the hook, + # when client(encoder) publish to vhost/app/stream, call the hook in creating backend forwarder. # the request in the POST data string is a object encode by json: # { # "action": "on_forward", # "server_id": "vid-k21d7y2", # "client_id": "9o7g1330", # "ip": "127.0.0.1", - # "vhost": "video.test.com", + # "vhost": "__defaultVhost__", # "app": "live", # "tcUrl": "rtmp://127.0.0.1:1935/live", # "stream": "livestream", - # "param": "?forward=rtmp://ossrs.net/live/livestream" + # "param": "" # } # if valid, the hook must return HTTP code 200(Status OK) and response # an int value specifies the error code(0 corresponding to success): @@ -689,11 +689,13 @@ vhost same.vhost.forward.srs.com { # "code": 0, # "data": { # "urls":[ - # "rtmp://ossrs.net:1935/live/livestream?auth_token=xxx", - # "rtmp://cdn.com:1935/live/livestream?auth_token=xxx" + # "rtmp://127.0.0.1:19350/test/teststream" # ] # } # } + # PS: you can transform params to backend service, such as: + # { "param": "?forward=rtmp://127.0.0.1:19351/test/livestream" } + # then backend return forward's url in response. # only support one api hook, format: # backend http://xxx/api0 backend http://127.0.0.1:8085/api/v1/forward; From 77a14e1f4ea42475e318c81346a482b087e6da1d Mon Sep 17 00:00:00 2001 From: chundonglinlin Date: Wed, 16 Feb 2022 09:13:23 +0800 Subject: [PATCH 6/6] Forward: rename backend param and add comment tips. --- trunk/conf/full.conf | 1 + trunk/src/app/srs_app_source.cpp | 21 +++++++++++++-------- trunk/src/app/srs_app_source.hpp | 2 +- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 0bf35da53b..e015769238 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -696,6 +696,7 @@ vhost same.vhost.forward.srs.com { # PS: you can transform params to backend service, such as: # { "param": "?forward=rtmp://127.0.0.1:19351/test/livestream" } # then backend return forward's url in response. + # if backend return empty urls, destanition is still disabled. # only support one api hook, format: # backend http://xxx/api0 backend http://127.0.0.1:8085/api/v1/forward; diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 9417a18959..f715610330 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1475,9 +1475,14 @@ srs_error_t SrsOriginHub::create_forwarders() } // For backend config - // If you configure backend service, return backend status(default: false) - bool backend_status = false; - if (((err = create_backend_forwarders(backend_status)) != srs_success) || backend_status) { + // If backend is enabled and applied, ignore destination. + bool applied_backend_server = false; + if ((err = create_backend_forwarders(applied_backend_server)) != srs_success) { + return srs_error_wrap(err, "create backend applied=%d", applied_backend_server); + } + + // Already applied backend server, ignore destination. + if (applied_backend_server) { return err; } @@ -1506,12 +1511,12 @@ srs_error_t SrsOriginHub::create_forwarders() return err; } -srs_error_t SrsOriginHub::create_backend_forwarders(bool& status) +srs_error_t SrsOriginHub::create_backend_forwarders(bool& applied) { srs_error_t err = srs_success; // default not configure backend service - status = false; + applied = false; SrsConfDirective* conf = _srs_config->get_forward_backend(req_->vhost); if (!conf || conf->arg0().empty()) { @@ -1519,7 +1524,7 @@ srs_error_t SrsOriginHub::create_backend_forwarders(bool& status) } // configure backend service - status = true; + applied = true; // only get first backend url std::string backend_url = conf->arg0(); @@ -1527,7 +1532,7 @@ srs_error_t SrsOriginHub::create_backend_forwarders(bool& status) // get urls on forward backend std::vector urls; if ((err = SrsHttpHooks::on_forward_backend(backend_url, req_, urls)) != srs_success) { - return srs_error_wrap(err, "get forward backend failed"); + return srs_error_wrap(err, "get forward backend failed, backend=%s", backend_url.c_str()); } // create forwarders by urls @@ -1550,7 +1555,7 @@ srs_error_t SrsOriginHub::create_backend_forwarders(bool& status) // initialize the forwarder with request. if ((err = forwarder->initialize(req, forward_server.str())) != srs_success) { - return srs_error_wrap(err, "init forwarder"); + return srs_error_wrap(err, "init backend forwarder failed, forward-to=%s", forward_server.str().c_str()); } srs_utime_t queue_size = _srs_config->get_queue_length(req_->vhost); diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 677fa5c49d..065deba014 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -375,7 +375,7 @@ class SrsOriginHub : public ISrsReloadHandler virtual srs_error_t on_reload_vhost_exec(std::string vhost); private: virtual srs_error_t create_forwarders(); - virtual srs_error_t create_backend_forwarders(bool& status); + virtual srs_error_t create_backend_forwarders(bool& applied); virtual void destroy_forwarders(); };