Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API: Support statistic and on_play/stop for HLS stream #2578

Merged
merged 4 commits into from
Sep 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions trunk/src/app/srs_app_http_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -836,8 +836,13 @@ srs_error_t SrsGoApiClients::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessa
return srs_api_response_code(w, r, ERROR_RTMP_CLIENT_NOT_FOUND);
}

client->conn->expire();
srs_warn("kickoff client id=%s ok", client_id.c_str());
if (client->conn) {
client->conn->expire();
srs_warn("kickoff client id=%s ok", client_id.c_str());
} else {
srs_error("kickoff client id=%s error", client_id.c_str());
return srs_api_response_code(w, r, SRS_CONSTS_HTTP_BadRequest);
}
} else {
return srs_go_http_error(w, SRS_CONSTS_HTTP_MethodNotAllowed);
}
Expand Down
181 changes: 180 additions & 1 deletion trunk/src/app/srs_app_http_static.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,27 @@ using namespace std;
#include <srs_app_pithy_print.hpp>
#include <srs_app_source.hpp>
#include <srs_app_server.hpp>
#include <srs_service_utility.hpp>
#include <srs_app_http_hooks.hpp>
#include <srs_app_statistic.hpp>

#define SRS_CONTEXT_IN_HLS "hls_ctx"

SrsVodStream::SrsVodStream(string root_dir) : SrsHttpFileServer(root_dir)
{
_srs_hybrid->timer5s()->subscribe(this);
}

SrsVodStream::~SrsVodStream()
{
_srs_hybrid->timer5s()->unsubscribe(this);
std::map<std::string, SrsM3u8CtxInfo>::iterator it;
for (it = map_ctx_info_.begin(); it != map_ctx_info_.end(); ++it) {
srs_freep(it->second.req);
}
map_ctx_info_.clear();
}

srs_error_t SrsVodStream::serve_flv_stream(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, string fullpath, int offset)
{
srs_error_t err = srs_success;
Expand Down Expand Up @@ -171,6 +183,173 @@ srs_error_t SrsVodStream::serve_mp4_stream(ISrsHttpResponseWriter* w, ISrsHttpMe
}

return err;
}

srs_error_t SrsVodStream::serve_m3u8_ctx(ISrsHttpResponseWriter * w, ISrsHttpMessage * r, std::string fullpath)
{
srs_error_t err = srs_success;

SrsHttpMessage* hr = dynamic_cast<SrsHttpMessage*>(r);
srs_assert(hr);

SrsRequest* req = hr->to_request(hr->host())->as_http();
SrsAutoFree(SrsRequest, req);

string ctx = hr->query_get(SRS_CONTEXT_IN_HLS);
if (!ctx.empty() && ctx_is_exist(ctx)) {
alive(ctx, NULL);
return SrsHttpFileServer::serve_m3u8_ctx(w, r, fullpath);
}

if ((err = http_hooks_on_play(req)) != srs_success) {
return srs_error_wrap(err, "HLS: http_hooks_on_play");
}

if (ctx.empty()) {
// make sure unique
duiniuluantanqin marked this conversation as resolved.
Show resolved Hide resolved
do {
ctx = srs_random_str(8); // the same as cid
} while (ctx_is_exist(ctx));
}

std::stringstream ss;
ss << "#EXTM3U" << SRS_CONSTS_LF;
ss << "#EXT-X-STREAM-INF:BANDWIDTH=1,AVERAGE-BANDWIDTH=1" << SRS_CONSTS_LF;
ss << hr->path() << "?" << SRS_CONTEXT_IN_HLS << "=" << ctx;
if (!hr->query().empty() && hr->query_get(SRS_CONTEXT_IN_HLS).empty())
{
ss << "&" << hr->query();
}

xiaozhihong marked this conversation as resolved.
Show resolved Hide resolved
std::string res = ss.str();
int length = res.length();

w->header()->set_content_length(length);
w->header()->set_content_type("application/vnd.apple.mpegurl");
w->write_header(SRS_CONSTS_HTTP_OK);

if ((err = w->write((char*)res.c_str(), length)) != srs_success) {
return srs_error_wrap(err, "write bytes=%d", length);
}

if ((err = w->final_request()) != srs_success) {
return srs_error_wrap(err, "final request");
}

alive(ctx, req->copy());

// update the statistic when source disconveried.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(ctx, req, NULL, SrsRtmpConnPlay)) != srs_success) {
return srs_error_wrap(err, "stat on client");
}

return err;
}

bool SrsVodStream::ctx_is_exist(std::string ctx)
{
return (map_ctx_info_.find(ctx) != map_ctx_info_.end());
}

void SrsVodStream::alive(std::string ctx, SrsRequest* req)
{
std::map<std::string, SrsM3u8CtxInfo>::iterator it;
if ((it = map_ctx_info_.find(ctx)) != map_ctx_info_.end()) {
it->second.request_time = srs_get_system_time();
} else {
SrsM3u8CtxInfo info;
info.req = req;
info.request_time = srs_get_system_time();
map_ctx_info_.insert(make_pair(ctx, info));
}
}

srs_error_t SrsVodStream::http_hooks_on_play(SrsRequest* req)
{
srs_error_t err = srs_success;

if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
return err;
}

// the http hooks will cause context switch,
// so we must copy all hooks for the on_connect may freed.
// @see https://github.com/ossrs/srs/issues/475
vector<string> hooks;

if (true) {
SrsConfDirective* conf = _srs_config->get_vhost_on_play(req->vhost);

if (!conf) {
return err;
}

hooks = conf->args;
}

for (int i = 0; i < (int)hooks.size(); i++) {
std::string url = hooks.at(i);
if ((err = SrsHttpHooks::on_play(url, req)) != srs_success) {
return srs_error_wrap(err, "http on_play %s", url.c_str());
}
}

return err;
}

void SrsVodStream::http_hooks_on_stop(SrsRequest* req)
{
if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
return;
}

// the http hooks will cause context switch,
// so we must copy all hooks for the on_connect may freed.
// @see https://github.com/ossrs/srs/issues/475
vector<string> hooks;

if (true) {
SrsConfDirective* conf = _srs_config->get_vhost_on_stop(req->vhost);

if (!conf) {
srs_info("ignore the empty http callback: on_stop");
return;
}

hooks = conf->args;
}

for (int i = 0; i < (int)hooks.size(); i++) {
std::string url = hooks.at(i);
SrsHttpHooks::on_stop(url, req);
}

return;
}

srs_error_t SrsVodStream::on_timer(srs_utime_t interval)
{
srs_error_t err = srs_success;

duiniuluantanqin marked this conversation as resolved.
Show resolved Hide resolved
std::map<std::string, SrsM3u8CtxInfo>::iterator it;
for (it = map_ctx_info_.begin(); it != map_ctx_info_.end(); ++it) {
string ctx = it->first;
SrsRequest* req = it->second.req;
srs_utime_t hls_window = _srs_config->get_hls_window(req->vhost);
if (it->second.request_time + (2 * hls_window) < srs_get_system_time()) {
http_hooks_on_stop(req);
srs_freep(req);

SrsStatistic* stat = SrsStatistic::instance();
stat->on_disconnect(ctx);
map_ctx_info_.erase(it);

break;
}
}

return err;
}

SrsHttpStaticServer::SrsHttpStaticServer(SrsServer* svr)
Expand Down
20 changes: 19 additions & 1 deletion trunk/src/app/srs_app_http_static.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,36 @@

#include <srs_app_http_conn.hpp>

struct SrsM3u8CtxInfo
{
srs_utime_t request_time;
SrsRequest* req;
};

// The flv vod stream supports flv?start=offset-bytes.
// For example, http://server/file.flv?start=10240
// server will write flv header and sequence header,
// then seek(10240) and response flv tag data.
class SrsVodStream : public SrsHttpFileServer
class SrsVodStream : public SrsHttpFileServer, public ISrsFastTimer
{
private:
// The period of validity of the ctx
std::map<std::string, SrsM3u8CtxInfo> map_ctx_info_;
public:
SrsVodStream(std::string root_dir);
virtual ~SrsVodStream();
protected:
virtual srs_error_t serve_flv_stream(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath, int offset);
virtual srs_error_t serve_mp4_stream(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath, int start, int end);
virtual srs_error_t serve_m3u8_ctx(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath);
private:
virtual bool ctx_is_exist(std::string ctx);
virtual void alive(std::string ctx, SrsRequest* req);
virtual srs_error_t http_hooks_on_play(SrsRequest* req);
virtual void http_hooks_on_stop(SrsRequest* req);
// interface ISrsFastTimer
private:
srs_error_t on_timer(srs_utime_t interval);
};

// The http static server instance,
Expand Down
16 changes: 15 additions & 1 deletion trunk/src/protocol/srs_http_stack.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ srs_error_t SrsHttpFileServer::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMes

string upath = r->path();
string fullpath = srs_http_fs_fullpath(dir, entry->pattern, upath);

// stat current dir, if exists, return error.
if (!_srs_path_exists(fullpath)) {
srs_warn("http miss file=%s, pattern=%s, upath=%s",
Expand All @@ -380,6 +380,8 @@ srs_error_t SrsHttpFileServer::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMes
return serve_flv_file(w, r, fullpath);
} else if (srs_string_ends_with(fullpath, ".mp4")) {
return serve_mp4_file(w, r, fullpath);
} else if (srs_string_ends_with(upath, ".m3u8")) {
return serve_m3u8_file(w, r, fullpath);
}

// serve common static file.
Expand Down Expand Up @@ -522,6 +524,11 @@ srs_error_t SrsHttpFileServer::serve_mp4_file(ISrsHttpResponseWriter* w, ISrsHtt
}

return serve_mp4_stream(w, r, fullpath, start, end);
}

srs_error_t SrsHttpFileServer::serve_m3u8_file(ISrsHttpResponseWriter * w, ISrsHttpMessage * r, std::string fullpath)
{
return serve_m3u8_ctx(w, r, fullpath);
}

srs_error_t SrsHttpFileServer::serve_flv_stream(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, string fullpath, int offset)
Expand All @@ -536,6 +543,13 @@ srs_error_t SrsHttpFileServer::serve_mp4_stream(ISrsHttpResponseWriter* w, ISrsH
// @remark For common http file server, we don't support stream request, please use SrsVodStream instead.
// TODO: FIXME: Support range in header https://developer.mozilla.org/zh-CN/docs/Web/HTTP/Range_requests
return serve_file(w, r, fullpath);
}

srs_error_t SrsHttpFileServer::serve_m3u8_ctx(ISrsHttpResponseWriter * w, ISrsHttpMessage * r, std::string fullpath)
{
// @remark For common http file server, we don't support stream request, please use SrsVodStream instead.
// TODO: FIXME: Support range in header https://developer.mozilla.org/zh-CN/docs/Web/HTTP/Range_requests
return serve_file(w, r, fullpath);
}

srs_error_t SrsHttpFileServer::copy(ISrsHttpResponseWriter* w, SrsFileReader* fs, ISrsHttpMessage* r, int size)
Expand Down
11 changes: 11 additions & 0 deletions trunk/src/protocol/srs_http_stack.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ class SrsHttpFileServer : public ISrsHttpHandler
virtual srs_error_t serve_file(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath);
virtual srs_error_t serve_flv_file(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath);
virtual srs_error_t serve_mp4_file(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath);
virtual srs_error_t serve_m3u8_file(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath);
protected:
// When access flv file with x.flv?start=xxx
virtual srs_error_t serve_flv_stream(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath, int offset);
Expand All @@ -293,6 +294,16 @@ class SrsHttpFileServer : public ISrsHttpHandler
// @param end the end offset in bytes. -1 to end of file.
// @remark response data in [start, end].
virtual srs_error_t serve_mp4_stream(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath, int start, int end);
// For HLS protocol.
// When the request url, like as "http://127.0.0.1:8080/live/livestream.m3u8",
// returns the response like as "http://127.0.0.1:8080/live/livestream.m3u8?hls_ctx=12345678" .
// SRS use "hls_ctx" to keep track of subsequent requests that is short-connection.
// Remark 1:
// Fill the parameter "hls_ctx" by yourself in the first request is allowed, SRS will use it.
// And MUST make sure it is unique.
// Remark 2:
// If use two same "hls_ctx" in different requests, SRS cannot detect so that they will be treated as one.
virtual srs_error_t serve_m3u8_ctx(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath);
protected:
// Copy the fs to response writer in size bytes.
virtual srs_error_t copy(ISrsHttpResponseWriter* w, SrsFileReader* fs, ISrsHttpMessage* r, int size);
Expand Down
41 changes: 41 additions & 0 deletions trunk/src/utest/srs_utest_http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ string mock_http_response2(int status, string content)
return ss.str();
}

bool is_string_contain(string substr, string str)
{
return (string::npos != str.find(substr));
}

class MockFileReaderFactory : public ISrsFileReaderFactory
{
public:
Expand Down Expand Up @@ -1183,6 +1188,42 @@ VOID TEST(ProtocolHTTPTest, VodStreamHandlers)
HELPER_ASSERT_SUCCESS(h.serve_http(&w, &r));
__MOCK_HTTP_EXPECT_STREQ(200, "Hello, world!", w);
}

// should return "hls_ctx"
if (true) {
SrsHttpMuxEntry e;
e.pattern = "/";

SrsVodStream h("/tmp");
h.set_fs_factory(new MockFileReaderFactory("Hello, world!"));
h.set_path_check(_mock_srs_path_always_exists);
h.entry = &e;

MockResponseWriter w;
SrsHttpMessage r(NULL, NULL);
HELPER_ASSERT_SUCCESS(r.set_url("/index.m3u8", false));

HELPER_ASSERT_SUCCESS(h.serve_http(&w, &r));
__MOCK_HTTP_EXPECT_STRCT(200, "index.m3u8?hls_ctx=", w);
}

// should return "hls_ctx"
if (true) {
SrsHttpMuxEntry e;
e.pattern = "/";

SrsVodStream h("/tmp");
h.set_fs_factory(new MockFileReaderFactory("Hello, world!"));
h.set_path_check(_mock_srs_path_always_exists);
h.entry = &e;

MockResponseWriter w;
SrsHttpMessage r(NULL, NULL);
HELPER_ASSERT_SUCCESS(r.set_url("/index.m3u8?hls_ctx=123456", false));

HELPER_ASSERT_SUCCESS(h.serve_http(&w, &r));
__MOCK_HTTP_EXPECT_STRCT(200, "index.m3u8?hls_ctx=123456", w);
}
}

VOID TEST(ProtocolHTTPTest, BasicHandlers)
Expand Down
4 changes: 4 additions & 0 deletions trunk/src/utest/srs_utest_http.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,16 @@ class MockResponseWriter : public ISrsHttpResponseWriter, public ISrsHttpHeaderF

string mock_http_response(int status, string content);
string mock_http_response2(int status, string content);
bool is_string_contain(string substr, string str);

#define __MOCK_HTTP_EXPECT_STREQ(status, text, w) \
EXPECT_STREQ(mock_http_response(status, text).c_str(), HELPER_BUFFER2STR(&w.io.out_buffer).c_str())

#define __MOCK_HTTP_EXPECT_STREQ2(status, text, w) \
EXPECT_STREQ(mock_http_response2(status, text).c_str(), HELPER_BUFFER2STR(&w.io.out_buffer).c_str())

#define __MOCK_HTTP_EXPECT_STRCT(status, text, w) \
EXPECT_PRED2(is_string_contain, text, HELPER_BUFFER2STR(&w.io.out_buffer).c_str())

#endif