Skip to content

Commit

Permalink
Enhance HLS: support http callback on_play/stop, support statistic (#…
Browse files Browse the repository at this point in the history
…2578)

* Enhance HLS: support http callback on_play/stop, support statistic

* make code readable

* make code readable

* rename secret
  • Loading branch information
Haibo Chen authored Sep 23, 2021
1 parent 40f8460 commit f901831
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 5 deletions.
9 changes: 7 additions & 2 deletions trunk/src/app/srs_app_http_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -840,8 +840,13 @@ srs_error_t SrsGoApiClients::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessa
return srs_api_response_code(w, r, ERROR_RTMP_CLIENT_NOT_FOUND);
}

client->conn->expire();
srs_warn("kickoff client id=%s ok", client_id.c_str());
if (client->conn) {
client->conn->expire();
srs_warn("kickoff client id=%s ok", client_id.c_str());
} else {
srs_error("kickoff client id=%s error", client_id.c_str());
return srs_api_response_code(w, r, SRS_CONSTS_HTTP_BadRequest);
}
} else {
return srs_go_http_error(w, SRS_CONSTS_HTTP_MethodNotAllowed);
}
Expand Down
181 changes: 180 additions & 1 deletion trunk/src/app/srs_app_http_static.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,27 @@ using namespace std;
#include <srs_app_pithy_print.hpp>
#include <srs_app_source.hpp>
#include <srs_app_server.hpp>
#include <srs_service_utility.hpp>
#include <srs_app_http_hooks.hpp>
#include <srs_app_statistic.hpp>

#define SRS_CONTEXT_IN_HLS "hls_ctx"

SrsVodStream::SrsVodStream(string root_dir) : SrsHttpFileServer(root_dir)
{
_srs_hybrid->timer5s()->subscribe(this);
}

SrsVodStream::~SrsVodStream()
{
_srs_hybrid->timer5s()->unsubscribe(this);
std::map<std::string, SrsM3u8CtxInfo>::iterator it;
for (it = map_ctx_info_.begin(); it != map_ctx_info_.end(); ++it) {
srs_freep(it->second.req);
}
map_ctx_info_.clear();
}


srs_error_t SrsVodStream::serve_flv_stream(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, string fullpath, int offset)
{
srs_error_t err = srs_success;
Expand Down Expand Up @@ -171,6 +183,173 @@ srs_error_t SrsVodStream::serve_mp4_stream(ISrsHttpResponseWriter* w, ISrsHttpMe
}

return err;
}

srs_error_t SrsVodStream::serve_m3u8_ctx(ISrsHttpResponseWriter * w, ISrsHttpMessage * r, std::string fullpath)
{
srs_error_t err = srs_success;

SrsHttpMessage* hr = dynamic_cast<SrsHttpMessage*>(r);
srs_assert(hr);

SrsRequest* req = hr->to_request(hr->host())->as_http();
SrsAutoFree(SrsRequest, req);

string ctx = hr->query_get(SRS_CONTEXT_IN_HLS);
if (!ctx.empty() && ctx_is_exist(ctx)) {
alive(ctx, NULL);
return SrsHttpFileServer::serve_m3u8_ctx(w, r, fullpath);
}

if ((err = http_hooks_on_play(req)) != srs_success) {
return srs_error_wrap(err, "HLS: http_hooks_on_play");
}

if (ctx.empty()) {
// make sure unique
do {
ctx = srs_random_str(8); // the same as cid
} while (ctx_is_exist(ctx));
}

std::stringstream ss;
ss << "#EXTM3U" << SRS_CONSTS_LF;
ss << "#EXT-X-STREAM-INF:BANDWIDTH=1,AVERAGE-BANDWIDTH=1" << SRS_CONSTS_LF;
ss << hr->path() << "?" << SRS_CONTEXT_IN_HLS << "=" << ctx;
if (!hr->query().empty() && hr->query_get(SRS_CONTEXT_IN_HLS).empty())
{
ss << "&" << hr->query();
}

std::string res = ss.str();
int length = res.length();

w->header()->set_content_length(length);
w->header()->set_content_type("application/vnd.apple.mpegurl");
w->write_header(SRS_CONSTS_HTTP_OK);

if ((err = w->write((char*)res.c_str(), length)) != srs_success) {
return srs_error_wrap(err, "write bytes=%d", length);
}

if ((err = w->final_request()) != srs_success) {
return srs_error_wrap(err, "final request");
}

alive(ctx, req->copy());

// update the statistic when source disconveried.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(ctx, req, NULL, SrsRtmpConnPlay)) != srs_success) {
return srs_error_wrap(err, "stat on client");
}

return err;
}

bool SrsVodStream::ctx_is_exist(std::string ctx)
{
return (map_ctx_info_.find(ctx) != map_ctx_info_.end());
}

void SrsVodStream::alive(std::string ctx, SrsRequest* req)
{
std::map<std::string, SrsM3u8CtxInfo>::iterator it;
if ((it = map_ctx_info_.find(ctx)) != map_ctx_info_.end()) {
it->second.request_time = srs_get_system_time();
} else {
SrsM3u8CtxInfo info;
info.req = req;
info.request_time = srs_get_system_time();
map_ctx_info_.insert(make_pair(ctx, info));
}
}

srs_error_t SrsVodStream::http_hooks_on_play(SrsRequest* req)
{
srs_error_t err = srs_success;

if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
return err;
}

// the http hooks will cause context switch,
// so we must copy all hooks for the on_connect may freed.
// @see https://github.com/ossrs/srs/issues/475
vector<string> hooks;

if (true) {
SrsConfDirective* conf = _srs_config->get_vhost_on_play(req->vhost);

if (!conf) {
return err;
}

hooks = conf->args;
}

for (int i = 0; i < (int)hooks.size(); i++) {
std::string url = hooks.at(i);
if ((err = SrsHttpHooks::on_play(url, req)) != srs_success) {
return srs_error_wrap(err, "http on_play %s", url.c_str());
}
}

return err;
}

void SrsVodStream::http_hooks_on_stop(SrsRequest* req)
{
if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
return;
}

// the http hooks will cause context switch,
// so we must copy all hooks for the on_connect may freed.
// @see https://github.com/ossrs/srs/issues/475
vector<string> hooks;

if (true) {
SrsConfDirective* conf = _srs_config->get_vhost_on_stop(req->vhost);

if (!conf) {
srs_info("ignore the empty http callback: on_stop");
return;
}

hooks = conf->args;
}

for (int i = 0; i < (int)hooks.size(); i++) {
std::string url = hooks.at(i);
SrsHttpHooks::on_stop(url, req);
}

return;
}

srs_error_t SrsVodStream::on_timer(srs_utime_t interval)
{
srs_error_t err = srs_success;

std::map<std::string, SrsM3u8CtxInfo>::iterator it;
for (it = map_ctx_info_.begin(); it != map_ctx_info_.end(); ++it) {
string ctx = it->first;
SrsRequest* req = it->second.req;
srs_utime_t hls_window = _srs_config->get_hls_window(req->vhost);
if (it->second.request_time + (2 * hls_window) < srs_get_system_time()) {
http_hooks_on_stop(req);
srs_freep(req);

SrsStatistic* stat = SrsStatistic::instance();
stat->on_disconnect(ctx);
map_ctx_info_.erase(it);

break;
}
}

return err;
}

SrsHttpStaticServer::SrsHttpStaticServer(SrsServer* svr)
Expand Down
20 changes: 19 additions & 1 deletion trunk/src/app/srs_app_http_static.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,36 @@

#include <srs_app_http_conn.hpp>

struct SrsM3u8CtxInfo
{
srs_utime_t request_time;
SrsRequest* req;
};

// The flv vod stream supports flv?start=offset-bytes.
// For example, http://server/file.flv?start=10240
// server will write flv header and sequence header,
// then seek(10240) and response flv tag data.
class SrsVodStream : public SrsHttpFileServer
class SrsVodStream : public SrsHttpFileServer, public ISrsFastTimer
{
private:
// The period of validity of the ctx
std::map<std::string, SrsM3u8CtxInfo> map_ctx_info_;
public:
SrsVodStream(std::string root_dir);
virtual ~SrsVodStream();
protected:
virtual srs_error_t serve_flv_stream(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath, int offset);
virtual srs_error_t serve_mp4_stream(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath, int start, int end);
virtual srs_error_t serve_m3u8_ctx(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath);
private:
virtual bool ctx_is_exist(std::string ctx);
virtual void alive(std::string ctx, SrsRequest* req);
virtual srs_error_t http_hooks_on_play(SrsRequest* req);
virtual void http_hooks_on_stop(SrsRequest* req);
// interface ISrsFastTimer
private:
srs_error_t on_timer(srs_utime_t interval);
};

// The http static server instance,
Expand Down
16 changes: 15 additions & 1 deletion trunk/src/protocol/srs_http_stack.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ srs_error_t SrsHttpFileServer::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMes

string upath = r->path();
string fullpath = srs_http_fs_fullpath(dir, entry->pattern, upath);

// stat current dir, if exists, return error.
if (!_srs_path_exists(fullpath)) {
srs_warn("http miss file=%s, pattern=%s, upath=%s",
Expand All @@ -380,6 +380,8 @@ srs_error_t SrsHttpFileServer::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMes
return serve_flv_file(w, r, fullpath);
} else if (srs_string_ends_with(fullpath, ".mp4")) {
return serve_mp4_file(w, r, fullpath);
} else if (srs_string_ends_with(upath, ".m3u8")) {
return serve_m3u8_file(w, r, fullpath);
}

// serve common static file.
Expand Down Expand Up @@ -522,6 +524,11 @@ srs_error_t SrsHttpFileServer::serve_mp4_file(ISrsHttpResponseWriter* w, ISrsHtt
}

return serve_mp4_stream(w, r, fullpath, start, end);
}

srs_error_t SrsHttpFileServer::serve_m3u8_file(ISrsHttpResponseWriter * w, ISrsHttpMessage * r, std::string fullpath)
{
return serve_m3u8_ctx(w, r, fullpath);
}

srs_error_t SrsHttpFileServer::serve_flv_stream(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, string fullpath, int offset)
Expand All @@ -536,6 +543,13 @@ srs_error_t SrsHttpFileServer::serve_mp4_stream(ISrsHttpResponseWriter* w, ISrsH
// @remark For common http file server, we don't support stream request, please use SrsVodStream instead.
// TODO: FIXME: Support range in header https://developer.mozilla.org/zh-CN/docs/Web/HTTP/Range_requests
return serve_file(w, r, fullpath);
}

srs_error_t SrsHttpFileServer::serve_m3u8_ctx(ISrsHttpResponseWriter * w, ISrsHttpMessage * r, std::string fullpath)
{
// @remark For common http file server, we don't support stream request, please use SrsVodStream instead.
// TODO: FIXME: Support range in header https://developer.mozilla.org/zh-CN/docs/Web/HTTP/Range_requests
return serve_file(w, r, fullpath);
}

srs_error_t SrsHttpFileServer::copy(ISrsHttpResponseWriter* w, SrsFileReader* fs, ISrsHttpMessage* r, int size)
Expand Down
11 changes: 11 additions & 0 deletions trunk/src/protocol/srs_http_stack.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ class SrsHttpFileServer : public ISrsHttpHandler
virtual srs_error_t serve_file(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath);
virtual srs_error_t serve_flv_file(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath);
virtual srs_error_t serve_mp4_file(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath);
virtual srs_error_t serve_m3u8_file(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath);
protected:
// When access flv file with x.flv?start=xxx
virtual srs_error_t serve_flv_stream(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath, int offset);
Expand All @@ -293,6 +294,16 @@ class SrsHttpFileServer : public ISrsHttpHandler
// @param end the end offset in bytes. -1 to end of file.
// @remark response data in [start, end].
virtual srs_error_t serve_mp4_stream(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath, int start, int end);
// For HLS protocol.
// When the request url, like as "http://127.0.0.1:8080/live/livestream.m3u8",
// returns the response like as "http://127.0.0.1:8080/live/livestream.m3u8?hls_ctx=12345678" .
// SRS use "hls_ctx" to keep track of subsequent requests that is short-connection.
// Remark 1:
// Fill the parameter "hls_ctx" by yourself in the first request is allowed, SRS will use it.
// And MUST make sure it is unique.
// Remark 2:
// If use two same "hls_ctx" in different requests, SRS cannot detect so that they will be treated as one.
virtual srs_error_t serve_m3u8_ctx(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath);
protected:
// Copy the fs to response writer in size bytes.
virtual srs_error_t copy(ISrsHttpResponseWriter* w, SrsFileReader* fs, ISrsHttpMessage* r, int size);
Expand Down
41 changes: 41 additions & 0 deletions trunk/src/utest/srs_utest_http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ string mock_http_response2(int status, string content)
return ss.str();
}

bool is_string_contain(string substr, string str)
{
return (string::npos != str.find(substr));
}

class MockFileReaderFactory : public ISrsFileReaderFactory
{
public:
Expand Down Expand Up @@ -1183,6 +1188,42 @@ VOID TEST(ProtocolHTTPTest, VodStreamHandlers)
HELPER_ASSERT_SUCCESS(h.serve_http(&w, &r));
__MOCK_HTTP_EXPECT_STREQ(200, "Hello, world!", w);
}

// should return "hls_ctx"
if (true) {
SrsHttpMuxEntry e;
e.pattern = "/";

SrsVodStream h("/tmp");
h.set_fs_factory(new MockFileReaderFactory("Hello, world!"));
h.set_path_check(_mock_srs_path_always_exists);
h.entry = &e;

MockResponseWriter w;
SrsHttpMessage r(NULL, NULL);
HELPER_ASSERT_SUCCESS(r.set_url("/index.m3u8", false));

HELPER_ASSERT_SUCCESS(h.serve_http(&w, &r));
__MOCK_HTTP_EXPECT_STRCT(200, "index.m3u8?hls_ctx=", w);
}

// should return "hls_ctx"
if (true) {
SrsHttpMuxEntry e;
e.pattern = "/";

SrsVodStream h("/tmp");
h.set_fs_factory(new MockFileReaderFactory("Hello, world!"));
h.set_path_check(_mock_srs_path_always_exists);
h.entry = &e;

MockResponseWriter w;
SrsHttpMessage r(NULL, NULL);
HELPER_ASSERT_SUCCESS(r.set_url("/index.m3u8?hls_ctx=123456", false));

HELPER_ASSERT_SUCCESS(h.serve_http(&w, &r));
__MOCK_HTTP_EXPECT_STRCT(200, "index.m3u8?hls_ctx=123456", w);
}
}

VOID TEST(ProtocolHTTPTest, BasicHandlers)
Expand Down
4 changes: 4 additions & 0 deletions trunk/src/utest/srs_utest_http.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,16 @@ class MockResponseWriter : public ISrsHttpResponseWriter, public ISrsHttpHeaderF

string mock_http_response(int status, string content);
string mock_http_response2(int status, string content);
bool is_string_contain(string substr, string str);

#define __MOCK_HTTP_EXPECT_STREQ(status, text, w) \
EXPECT_STREQ(mock_http_response(status, text).c_str(), HELPER_BUFFER2STR(&w.io.out_buffer).c_str())

#define __MOCK_HTTP_EXPECT_STREQ2(status, text, w) \
EXPECT_STREQ(mock_http_response2(status, text).c_str(), HELPER_BUFFER2STR(&w.io.out_buffer).c_str())

#define __MOCK_HTTP_EXPECT_STRCT(status, text, w) \
EXPECT_PRED2(is_string_contain, text, HELPER_BUFFER2STR(&w.io.out_buffer).c_str())

#endif

0 comments on commit f901831

Please sign in to comment.