From ac06d35e0b1bdc3d6b48338f1f251bca9f8cd5d3 Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 14 Dec 2022 19:54:19 +0800 Subject: [PATCH] FLV: Drop packet if header flag is not matched. v5.0.109 1. Ignore audo or video packets if FLV header disable it. 2. Run: Add regression test config and run for IDEA. 3. Test: Refine regression test to allow no audio/video for FLV 4. Config: Whether drop packet if not match header. --- .run/regression-test.run.xml | 7 +++ trunk/3rdparty/srs-bench/srs/rtc_test.go | 20 ++----- trunk/conf/full.conf | 6 ++ trunk/conf/regression-test-for-clion.conf | 67 +++++++++++++++++++++++ trunk/doc/CHANGELOG.md | 1 + trunk/src/app/srs_app_config.cpp | 26 ++++++++- trunk/src/app/srs_app_config.hpp | 2 + trunk/src/app/srs_app_http_stream.cpp | 16 ++++-- trunk/src/app/srs_app_http_stream.hpp | 1 + trunk/src/core/srs_core_version5.hpp | 2 +- trunk/src/kernel/srs_kernel_flv.cpp | 65 +++++++++++++++------- trunk/src/kernel/srs_kernel_flv.hpp | 6 ++ trunk/src/utest/srs_utest_config.cpp | 14 ++++- 13 files changed, 191 insertions(+), 42 deletions(-) create mode 100644 .run/regression-test.run.xml create mode 100644 trunk/conf/regression-test-for-clion.conf diff --git a/.run/regression-test.run.xml b/.run/regression-test.run.xml new file mode 100644 index 0000000000..808b8d051a --- /dev/null +++ b/.run/regression-test.run.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/trunk/3rdparty/srs-bench/srs/rtc_test.go b/trunk/3rdparty/srs-bench/srs/rtc_test.go index ea1c39bedb..a387a68ed5 100644 --- a/trunk/3rdparty/srs-bench/srs/rtc_test.go +++ b/trunk/3rdparty/srs-bench/srs/rtc_test.go @@ -2046,18 +2046,12 @@ func TestRtcPublish_FlvPlay(t *testing.T) { } defer f.Close() - var version uint8 var hasVideo, hasAudio bool - if version, hasVideo, hasAudio, err = f.ReadHeader(); err != nil { + if _, hasVideo, hasAudio, err = f.ReadHeader(); err != nil { logger.Tf(ctx, "Flv demuxer read header failed, err=%v", err) return } - // Optional, user can check the header. - _ = version - _ = hasAudio - _ = hasVideo - var nnVideo, nnAudio int var prevVideoTimestamp, prevAudioTimestamp int64 @@ -2083,14 +2077,12 @@ func TestRtcPublish_FlvPlay(t *testing.T) { prevVideoTimestamp = (int64)(timestamp) } - if nnAudio >= 10 && nnVideo >= 10 { + audioPacketsOK, videoPacketsOK := !hasAudio || nnAudio >= 10, !hasVideo || nnVideo >= 10 + if audioPacketsOK && videoPacketsOK { avDiff := prevVideoTimestamp - prevAudioTimestamp - // Check timestamp gap between video and audio, make sure audio timestamp align to video timestamp. - if avDiff <= 50 && avDiff >= -50 { - logger.Tf(ctx, "Flv recv %v audio, %v video, timestamp gap=%v", nnAudio, nnVideo, avDiff) - cancel() - break - } + logger.Tf(ctx, "Flv recv %v/%v audio, %v/%v video, avDiff=%v", hasAudio, nnAudio, hasVideo, nnVideo, avDiff) + cancel() + break } _ = tag diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 9c15d8ff3e..96f411209a 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -1450,6 +1450,12 @@ vhost http.remux.srs.com { # Overwrite by env SRS_VHOST_HTTP_REMUX_FAST_CACHE for all vhosts. # default: 0 fast_cache 30; + # Whether drop packet if not match header. For example, there is has_audio and has video flag in FLV header, if + # this is set to on and has_audio is false, then SRS will drop audio packets when got audio packets. Generally + # it should work, but sometimes you might need SRS to keep packets even when FLV header is set to false. + # Overwrite by env SRS_VHOST_HTTP_REMUX_DROP_IF_NOT_MATCH for all vhosts. + # default: on + drop_if_not_match on; # the stream mount for rtmp to remux to live streaming. # typical mount to [vhost]/[app]/[stream].flv # the variables: diff --git a/trunk/conf/regression-test-for-clion.conf b/trunk/conf/regression-test-for-clion.conf new file mode 100644 index 0000000000..9a8712e856 --- /dev/null +++ b/trunk/conf/regression-test-for-clion.conf @@ -0,0 +1,67 @@ + +listen 1935; +max_connections 1000; +daemon off; +srs_log_tank console; + +stream_caster { + enabled on; + caster gb28181; + output rtmp://127.0.0.1/live/[stream]; + listen 9000; + sip { + enabled on; + listen 5060; + timeout 2.1; + reinvite 1.2; + } +} + +http_server { + enabled on; + listen 8080; + dir ./objs/nginx/html; +} + +http_api { + enabled on; + listen 1985; +} +stats { + network 0; +} +rtc_server { + enabled on; + listen 8000; + candidate $CANDIDATE; +} + +vhost __defaultVhost__ { + rtc { + enabled on; + rtmp_to_rtc on; + keep_bframe off; + rtc_to_rtmp on; + } + play { + atc on; + } + http_remux { + enabled on; + mount [vhost]/[app]/[stream].flv; + drop_if_not_match on; + } + ingest livestream { + enabled on; + input { + type file; + url ./doc/source.200kbps.768x320.flv; + } + ffmpeg ./objs/ffmpeg/bin/ffmpeg; + engine { + enabled off; + output rtmp://127.0.0.1:[port]/live/livestream; + } + } +} + diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index 5262169f7a..3b28054d31 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -8,6 +8,7 @@ The changelog for SRS. ## SRS 5.0 Changelog +* v5.0, 2022-12-13, For [#939](https://github.com/ossrs/srs/issues/939): FLV: Drop packet if header flag is not matched. v5.0.109 * v5.0, 2022-12-12, Merge [#3301](https://github.com/ossrs/srs/pull/3301): DASH: Fix dash crash bug when writing file. v5.0.108 * v5.0, 2022-12-09, Merge [#3296](https://github.com/ossrs/srs/pull/3296): SRT: Support SRT to RTMP to WebRTC. v5.0.107 * v5.0, 2022-12-08, Merge [#3295](https://github.com/ossrs/srs/pull/3295): API: Parse fragment of URI. v5.0.106 diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index b5532d3f01..461119b3c3 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -2600,7 +2600,7 @@ srs_error_t SrsConfig::check_normal_config() } else if (n == "http_remux") { for (int j = 0; j < (int)conf->directives.size(); j++) { string m = conf->at(j)->name; - if (m != "enabled" && m != "mount" && m != "fast_cache") { + if (m != "enabled" && m != "mount" && m != "fast_cache" && m != "drop_if_not_match") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.http_remux.%s of %s", m.c_str(), vhost->arg0().c_str()); } } @@ -8241,6 +8241,30 @@ srs_utime_t SrsConfig::get_vhost_http_remux_fast_cache(string vhost) return srs_utime_t(::atof(conf->arg0().c_str()) * SRS_UTIME_SECONDS); } +bool SrsConfig::get_vhost_http_remux_drop_if_not_match(string vhost) +{ + SRS_OVERWRITE_BY_ENV_BOOL2("srs.vhost.http_remux.drop_if_not_match"); // SRS_VHOST_HTTP_REMUX_DROP_IF_NOT_MATCH + + static bool DEFAULT = true; + + SrsConfDirective* conf = get_vhost(vhost); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("http_remux"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("drop_if_not_match"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return SRS_CONF_PERFER_TRUE(conf->arg0()); +} + string SrsConfig::get_vhost_http_remux_mount(string vhost) { SRS_OVERWRITE_BY_ENV_STRING("srs.vhost.http_remux.mount"); // SRS_VHOST_HTTP_REMUX_MOUNT diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index e4384b6bee..7597069cb5 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -1064,6 +1064,8 @@ class SrsConfig virtual bool get_vhost_http_remux_enabled(SrsConfDirective* vhost); // Get the fast cache duration for http audio live stream. virtual srs_utime_t get_vhost_http_remux_fast_cache(std::string vhost); + // Whether drop packet if not match header. + bool get_vhost_http_remux_drop_if_not_match(std::string vhost); // Get the http flv live stream mount point for vhost. // used to generate the flv stream mount path. virtual std::string get_vhost_http_remux_mount(std::string vhost); diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index e8c51873ba..613625fb80 100755 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -289,6 +289,11 @@ srs_error_t SrsFlvStreamEncoder::write_metadata(int64_t timestamp, char* data, i return enc->write_metadata(SrsFrameTypeScript, data, size); } +void SrsFlvStreamEncoder::set_drop_if_not_match(bool v) +{ + enc->set_drop_if_not_match(v); +} + bool SrsFlvStreamEncoder::has_cache() { // for flv stream, use gop cache of SrsLiveSource is ok. @@ -343,7 +348,7 @@ srs_error_t SrsFlvStreamEncoder::write_header(bool has_video, bool has_audio) return srs_error_wrap(err, "write header"); } - srs_trace("FLV: write header audio=%d, video=%d", has_audio, has_video); + srs_trace("FLV: write header audio=%d, video=%d, dinm=%d", has_audio, has_video, enc->drop_if_not_match()); } return err; @@ -563,12 +568,15 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess string enc_desc; ISrsBufferEncoder* enc = NULL; - + srs_assert(entry); + bool drop_if_not_match = _srs_config->get_vhost_http_remux_drop_if_not_match(req->vhost); + if (srs_string_ends_with(entry->pattern, ".flv")) { w->header()->set_content_type("video/x-flv"); enc_desc = "FLV"; enc = new SrsFlvStreamEncoder(); + ((SrsFlvStreamEncoder*)enc)->set_drop_if_not_match(drop_if_not_match); } else if (srs_string_ends_with(entry->pattern, ".aac")) { w->header()->set_content_type("audio/x-aac"); enc_desc = "AAC"; @@ -638,8 +646,8 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess } srs_utime_t mw_sleep = _srs_config->get_mw_sleep(req->vhost); - srs_trace("FLV %s, encoder=%s, mw_sleep=%dms, cache=%d, msgs=%d", entry->pattern.c_str(), enc_desc.c_str(), - srsu2msi(mw_sleep), enc->has_cache(), msgs.max); + srs_trace("FLV %s, encoder=%s, mw_sleep=%dms, cache=%d, msgs=%d, dinm=%d", entry->pattern.c_str(), enc_desc.c_str(), + srsu2msi(mw_sleep), enc->has_cache(), msgs.max, drop_if_not_match); // TODO: free and erase the disabled entry after all related connections is closed. // TODO: FXIME: Support timeout for player, quit infinite-loop. diff --git a/trunk/src/app/srs_app_http_stream.hpp b/trunk/src/app/srs_app_http_stream.hpp index 38e73b5635..e24004ad7f 100755 --- a/trunk/src/app/srs_app_http_stream.hpp +++ b/trunk/src/app/srs_app_http_stream.hpp @@ -77,6 +77,7 @@ class SrsFlvStreamEncoder : public ISrsBufferEncoder virtual srs_error_t write_video(int64_t timestamp, char* data, int size); virtual srs_error_t write_metadata(int64_t timestamp, char* data, int size); public: + void set_drop_if_not_match(bool v); virtual bool has_cache(); virtual srs_error_t dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter); public: diff --git a/trunk/src/core/srs_core_version5.hpp b/trunk/src/core/srs_core_version5.hpp index cfed8e8fc8..1b01763af8 100644 --- a/trunk/src/core/srs_core_version5.hpp +++ b/trunk/src/core/srs_core_version5.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 5 #define VERSION_MINOR 0 -#define VERSION_REVISION 108 +#define VERSION_REVISION 109 #endif diff --git a/trunk/src/kernel/srs_kernel_flv.cpp b/trunk/src/kernel/srs_kernel_flv.cpp index 0681c467d6..7d1377e1bc 100644 --- a/trunk/src/kernel/srs_kernel_flv.cpp +++ b/trunk/src/kernel/srs_kernel_flv.cpp @@ -357,7 +357,10 @@ SrsSharedPtrMessage* SrsSharedPtrMessage::copy2() SrsFlvTransmuxer::SrsFlvTransmuxer() { writer = NULL; - + + drop_if_not_match_ = true; + has_audio_ = true; + has_video_ = true; nb_tag_headers = 0; tag_headers = NULL; nb_iovss_cache = 0; @@ -380,10 +383,23 @@ srs_error_t SrsFlvTransmuxer::initialize(ISrsWriter* fw) return srs_success; } +void SrsFlvTransmuxer::set_drop_if_not_match(bool v) +{ + drop_if_not_match_ = v; +} + +bool SrsFlvTransmuxer::drop_if_not_match() +{ + return drop_if_not_match_; +} + srs_error_t SrsFlvTransmuxer::write_header(bool has_video, bool has_audio) { srs_error_t err = srs_success; + has_audio_ = has_audio; + has_video_ = has_video; + uint8_t av_flag = 0; av_flag += (has_audio? 4:0); av_flag += (has_video? 1:0); @@ -444,6 +460,8 @@ srs_error_t SrsFlvTransmuxer::write_metadata(char type, char* data, int size) srs_error_t SrsFlvTransmuxer::write_audio(int64_t timestamp, char* data, int size) { srs_error_t err = srs_success; + + if (drop_if_not_match_ && !has_audio_) return err; if (size > 0) { cache_audio(timestamp, data, size, tag_header); @@ -459,6 +477,8 @@ srs_error_t SrsFlvTransmuxer::write_audio(int64_t timestamp, char* data, int siz srs_error_t SrsFlvTransmuxer::write_video(int64_t timestamp, char* data, int size) { srs_error_t err = srs_success; + + if (drop_if_not_match_ && !has_video_) return err; if (size > 0) { cache_video(timestamp, data, size, tag_header); @@ -481,17 +501,19 @@ srs_error_t SrsFlvTransmuxer::write_tags(SrsSharedPtrMessage** msgs, int count) { srs_error_t err = srs_success; - // realloc the iovss. - int nb_iovss = 3 * count; + // Do realloc the iovss if required. iovec* iovss = iovss_cache; - if (nb_iovss_cache < nb_iovss) { - srs_freepa(iovss_cache); - - nb_iovss_cache = nb_iovss; - iovss = iovss_cache = new iovec[nb_iovss]; - } + do { + int nn_might_iovss = 3 * count; + if (nb_iovss_cache < nn_might_iovss) { + srs_freepa(iovss_cache); + + nb_iovss_cache = nn_might_iovss; + iovss = iovss_cache = new iovec[nn_might_iovss]; + } + } while (false); - // realloc the tag headers. + // Do realloc the tag headers if required. char* cache = tag_headers; if (nb_tag_headers < count) { srs_freepa(tag_headers); @@ -500,7 +522,7 @@ srs_error_t SrsFlvTransmuxer::write_tags(SrsSharedPtrMessage** msgs, int count) cache = tag_headers = new char[SRS_FLV_TAG_HEADER_SIZE * count]; } - // realloc the pts. + // Do realloc the pts if required. char* pts = ppts; if (nb_ppts < count) { srs_freepa(ppts); @@ -509,24 +531,26 @@ srs_error_t SrsFlvTransmuxer::write_tags(SrsSharedPtrMessage** msgs, int count) pts = ppts = new char[SRS_FLV_PREVIOUS_TAG_SIZE * count]; } - // the cache is ok, write each messages. - iovec* iovs = iovss; + // Now all caches are ok, start to write all messages. + iovec* iovs = iovss; int nn_real_iovss = 0; for (int i = 0; i < count; i++) { SrsSharedPtrMessage* msg = msgs[i]; - // cache all flv header. + // Cache FLV packet header. if (msg->is_audio()) { + if (drop_if_not_match_ && !has_audio_) continue; // Ignore audio packets if no audio stream. cache_audio(msg->timestamp, msg->payload, msg->size, cache); } else if (msg->is_video()) { + if (drop_if_not_match_ && !has_video_) continue; // Ignore video packets if no video stream. cache_video(msg->timestamp, msg->payload, msg->size, cache); } else { cache_metadata(SrsFrameTypeScript, msg->payload, msg->size, cache); } - // cache all pts. + // Cache FLV pts. cache_pts(SRS_FLV_TAG_HEADER_SIZE + msg->size, pts); - // all ioves. + // Set cache to iovec. iovs[0].iov_base = cache; iovs[0].iov_len = SRS_FLV_TAG_HEADER_SIZE; iovs[1].iov_base = msg->payload; @@ -534,13 +558,14 @@ srs_error_t SrsFlvTransmuxer::write_tags(SrsSharedPtrMessage** msgs, int count) iovs[2].iov_base = pts; iovs[2].iov_len = SRS_FLV_PREVIOUS_TAG_SIZE; - // move next. + // Move to next cache. cache += SRS_FLV_TAG_HEADER_SIZE; pts += SRS_FLV_PREVIOUS_TAG_SIZE; - iovs += 3; + iovs += 3; nn_real_iovss += 3; } - - if ((err = writer->writev(iovss, nb_iovss, NULL)) != srs_success) { + + // Send out all data carried by iovec. + if ((err = writer->writev(iovss, nn_real_iovss, NULL)) != srs_success) { return srs_error_wrap(err, "write flv tags failed"); } diff --git a/trunk/src/kernel/srs_kernel_flv.hpp b/trunk/src/kernel/srs_kernel_flv.hpp index 420b110cca..934c3f3402 100644 --- a/trunk/src/kernel/srs_kernel_flv.hpp +++ b/trunk/src/kernel/srs_kernel_flv.hpp @@ -336,6 +336,9 @@ class SrsSharedPtrMessage class SrsFlvTransmuxer { private: + bool has_audio_; + bool has_video_; + bool drop_if_not_match_; ISrsWriter* writer; private: char tag_header[SRS_FLV_TAG_HEADER_SIZE]; @@ -347,6 +350,9 @@ class SrsFlvTransmuxer // @remark user can initialize multiple times to encode multiple flv files. // @remark, user must free the @param fw, flv encoder never close/free it. virtual srs_error_t initialize(ISrsWriter* fw); + // Drop packet if not match FLV header. + void set_drop_if_not_match(bool v); + bool drop_if_not_match(); public: // Write flv header. // Write following: diff --git a/trunk/src/utest/srs_utest_config.cpp b/trunk/src/utest/srs_utest_config.cpp index 16b918d5a1..8afbc934d7 100644 --- a/trunk/src/utest/srs_utest_config.cpp +++ b/trunk/src/utest/srs_utest_config.cpp @@ -4666,9 +4666,9 @@ VOID TEST(ConfigEnvTest, CheckEnvValuesHttpRemux) { srs_error_t err; - if (true) { - MockSrsConfig conf; + MockSrsConfig conf; + if (true) { SrsSetEnvConfig(http_remux_enabled, "SRS_VHOST_HTTP_REMUX_ENABLED", "on"); EXPECT_TRUE(conf.get_vhost_http_remux_enabled("__defaultVhost__")); @@ -4678,6 +4678,16 @@ VOID TEST(ConfigEnvTest, CheckEnvValuesHttpRemux) SrsSetEnvConfig(http_remux_mount, "SRS_VHOST_HTTP_REMUX_MOUNT", "xxx"); EXPECT_STREQ("xxx", conf.get_vhost_http_remux_mount("__defaultVhost__").c_str()); } + + if (true) { + EXPECT_TRUE(conf.get_vhost_http_remux_drop_if_not_match("__defaultVhost__")); + + SrsSetEnvConfig(drop_if_not_match, "SRS_VHOST_HTTP_REMUX_DROP_IF_NOT_MATCH", "off"); + EXPECT_FALSE(conf.get_vhost_http_remux_drop_if_not_match("__defaultVhost__")); + + SrsSetEnvConfig(drop_if_not_match2, "SRS_VHOST_HTTP_REMUX_DROP_IF_NOT_MATCH", "on"); + EXPECT_TRUE(conf.get_vhost_http_remux_drop_if_not_match("__defaultVhost__")); + } } VOID TEST(ConfigEnvTest, CheckEnvValuesDash)