Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FLV: Drop packet if header flag is not matched. v5.0.109 #3306

Merged
merged 2 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .run/regression-test.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="regression-test" type="CMakeRunConfiguration" factoryName="Application" PROGRAM_PARAMS="-c conf/regression-test-for-clion.conf" REDIRECT_INPUT="false" ELEVATE="false" USE_EXTERNAL_CONSOLE="false" WORKING_DIR="file://$CMakeCurrentBuildDir$/../../../" PASS_PARENT_ENVS_2="true" PROJECT_NAME="srs" TARGET_NAME="srs" CONFIG_NAME="Debug" RUN_TARGET_PROJECT_NAME="srs" RUN_TARGET_NAME="srs">
<method v="2">
<option name="com.jetbrains.cidr.execution.CidrBuildBeforeRunTaskProvider$BuildBeforeRunTask" enabled="true" />
</method>
</configuration>
</component>
20 changes: 6 additions & 14 deletions trunk/3rdparty/srs-bench/srs/rtc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2046,18 +2046,12 @@ func TestRtcPublish_FlvPlay(t *testing.T) {
}
defer f.Close()

var version uint8
var hasVideo, hasAudio bool
if version, hasVideo, hasAudio, err = f.ReadHeader(); err != nil {
if _, hasVideo, hasAudio, err = f.ReadHeader(); err != nil {
logger.Tf(ctx, "Flv demuxer read header failed, err=%v", err)
return
}

// Optional, user can check the header.
_ = version
_ = hasAudio
_ = hasVideo

var nnVideo, nnAudio int
var prevVideoTimestamp, prevAudioTimestamp int64

Expand All @@ -2083,14 +2077,12 @@ func TestRtcPublish_FlvPlay(t *testing.T) {
prevVideoTimestamp = (int64)(timestamp)
}

if nnAudio >= 10 && nnVideo >= 10 {
audioPacketsOK, videoPacketsOK := !hasAudio || nnAudio >= 10, !hasVideo || nnVideo >= 10
if audioPacketsOK && videoPacketsOK {
avDiff := prevVideoTimestamp - prevAudioTimestamp
// Check timestamp gap between video and audio, make sure audio timestamp align to video timestamp.
if avDiff <= 50 && avDiff >= -50 {
logger.Tf(ctx, "Flv recv %v audio, %v video, timestamp gap=%v", nnAudio, nnVideo, avDiff)
cancel()
break
}
logger.Tf(ctx, "Flv recv %v/%v audio, %v/%v video, avDiff=%v", hasAudio, nnAudio, hasVideo, nnVideo, avDiff)
cancel()
break
}

_ = tag
Expand Down
6 changes: 6 additions & 0 deletions trunk/conf/full.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1450,6 +1450,12 @@ vhost http.remux.srs.com {
# Overwrite by env SRS_VHOST_HTTP_REMUX_FAST_CACHE for all vhosts.
# default: 0
fast_cache 30;
# Whether drop packet if not match header. For example, there is has_audio and has video flag in FLV header, if
# this is set to on and has_audio is false, then SRS will drop audio packets when got audio packets. Generally
# it should work, but sometimes you might need SRS to keep packets even when FLV header is set to false.
# Overwrite by env SRS_VHOST_HTTP_REMUX_DROP_IF_NOT_MATCH for all vhosts.
# default: on
drop_if_not_match on;
# the stream mount for rtmp to remux to live streaming.
# typical mount to [vhost]/[app]/[stream].flv
# the variables:
Expand Down
67 changes: 67 additions & 0 deletions trunk/conf/regression-test-for-clion.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@

listen 1935;
max_connections 1000;
daemon off;
srs_log_tank console;

stream_caster {
enabled on;
caster gb28181;
output rtmp://127.0.0.1/live/[stream];
listen 9000;
sip {
enabled on;
listen 5060;
timeout 2.1;
reinvite 1.2;
}
}

http_server {
enabled on;
listen 8080;
dir ./objs/nginx/html;
}

http_api {
enabled on;
listen 1985;
}
stats {
network 0;
}
rtc_server {
enabled on;
listen 8000;
candidate $CANDIDATE;
}

vhost __defaultVhost__ {
rtc {
enabled on;
rtmp_to_rtc on;
keep_bframe off;
rtc_to_rtmp on;
}
play {
atc on;
}
http_remux {
enabled on;
mount [vhost]/[app]/[stream].flv;
drop_if_not_match on;
}
ingest livestream {
enabled on;
input {
type file;
url ./doc/source.200kbps.768x320.flv;
}
ffmpeg ./objs/ffmpeg/bin/ffmpeg;
engine {
enabled off;
output rtmp://127.0.0.1:[port]/live/livestream;
}
}
}

1 change: 1 addition & 0 deletions trunk/doc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The changelog for SRS.

## SRS 5.0 Changelog

* v5.0, 2022-12-13, For [#939](https://github.com/ossrs/srs/issues/939): FLV: Drop packet if header flag is not matched. v5.0.109
* v5.0, 2022-12-13, For [#939](https://github.com/ossrs/srs/issues/939): FLV: Reset has_audio or has_video if only sequence header.
* v5.0, 2022-12-12, Merge [#3301](https://github.com/ossrs/srs/pull/3301): DASH: Fix dash crash bug when writing file. v5.0.108
* v5.0, 2022-12-09, Merge [#3296](https://github.com/ossrs/srs/pull/3296): SRT: Support SRT to RTMP to WebRTC. v5.0.107
Expand Down
26 changes: 25 additions & 1 deletion trunk/src/app/srs_app_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2600,7 +2600,7 @@ srs_error_t SrsConfig::check_normal_config()
} else if (n == "http_remux") {
for (int j = 0; j < (int)conf->directives.size(); j++) {
string m = conf->at(j)->name;
if (m != "enabled" && m != "mount" && m != "fast_cache") {
if (m != "enabled" && m != "mount" && m != "fast_cache" && m != "drop_if_not_match") {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.http_remux.%s of %s", m.c_str(), vhost->arg0().c_str());
}
}
Expand Down Expand Up @@ -8241,6 +8241,30 @@ srs_utime_t SrsConfig::get_vhost_http_remux_fast_cache(string vhost)
return srs_utime_t(::atof(conf->arg0().c_str()) * SRS_UTIME_SECONDS);
}

bool SrsConfig::get_vhost_http_remux_drop_if_not_match(string vhost)
{
SRS_OVERWRITE_BY_ENV_BOOL2("srs.vhost.http_remux.drop_if_not_match"); // SRS_VHOST_HTTP_REMUX_DROP_IF_NOT_MATCH

static bool DEFAULT = true;

SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return DEFAULT;
}

conf = conf->get("http_remux");
if (!conf) {
return DEFAULT;
}

conf = conf->get("drop_if_not_match");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}

return SRS_CONF_PERFER_TRUE(conf->arg0());
}

string SrsConfig::get_vhost_http_remux_mount(string vhost)
{
SRS_OVERWRITE_BY_ENV_STRING("srs.vhost.http_remux.mount"); // SRS_VHOST_HTTP_REMUX_MOUNT
Expand Down
2 changes: 2 additions & 0 deletions trunk/src/app/srs_app_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,8 @@ class SrsConfig
virtual bool get_vhost_http_remux_enabled(SrsConfDirective* vhost);
// Get the fast cache duration for http audio live stream.
virtual srs_utime_t get_vhost_http_remux_fast_cache(std::string vhost);
// Whether drop packet if not match header.
bool get_vhost_http_remux_drop_if_not_match(std::string vhost);
// Get the http flv live stream mount point for vhost.
// used to generate the flv stream mount path.
virtual std::string get_vhost_http_remux_mount(std::string vhost);
Expand Down
16 changes: 12 additions & 4 deletions trunk/src/app/srs_app_http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,11 @@ srs_error_t SrsFlvStreamEncoder::write_metadata(int64_t timestamp, char* data, i
return enc->write_metadata(SrsFrameTypeScript, data, size);
}

void SrsFlvStreamEncoder::set_drop_if_not_match(bool v)
{
enc->set_drop_if_not_match(v);
}

bool SrsFlvStreamEncoder::has_cache()
{
// for flv stream, use gop cache of SrsLiveSource is ok.
Expand Down Expand Up @@ -356,7 +361,7 @@ srs_error_t SrsFlvStreamEncoder::write_header(bool has_video, bool has_audio)
return srs_error_wrap(err, "write header");
}

srs_trace("FLV: write header audio=%d, video=%d", has_audio, has_video);
srs_trace("FLV: write header audio=%d, video=%d, dinm=%d", has_audio, has_video, enc->drop_if_not_match());
}

return err;
Expand Down Expand Up @@ -576,12 +581,15 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess

string enc_desc;
ISrsBufferEncoder* enc = NULL;

srs_assert(entry);
bool drop_if_not_match = _srs_config->get_vhost_http_remux_drop_if_not_match(req->vhost);

if (srs_string_ends_with(entry->pattern, ".flv")) {
w->header()->set_content_type("video/x-flv");
enc_desc = "FLV";
enc = new SrsFlvStreamEncoder();
((SrsFlvStreamEncoder*)enc)->set_drop_if_not_match(drop_if_not_match);
} else if (srs_string_ends_with(entry->pattern, ".aac")) {
w->header()->set_content_type("audio/x-aac");
enc_desc = "AAC";
Expand Down Expand Up @@ -651,8 +659,8 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
}

srs_utime_t mw_sleep = _srs_config->get_mw_sleep(req->vhost);
srs_trace("FLV %s, encoder=%s, mw_sleep=%dms, cache=%d, msgs=%d", entry->pattern.c_str(), enc_desc.c_str(),
srsu2msi(mw_sleep), enc->has_cache(), msgs.max);
srs_trace("FLV %s, encoder=%s, mw_sleep=%dms, cache=%d, msgs=%d, dinm=%d", entry->pattern.c_str(), enc_desc.c_str(),
srsu2msi(mw_sleep), enc->has_cache(), msgs.max, drop_if_not_match);

// TODO: free and erase the disabled entry after all related connections is closed.
// TODO: FXIME: Support timeout for player, quit infinite-loop.
Expand Down
1 change: 1 addition & 0 deletions trunk/src/app/srs_app_http_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class SrsFlvStreamEncoder : public ISrsBufferEncoder
virtual srs_error_t write_video(int64_t timestamp, char* data, int size);
virtual srs_error_t write_metadata(int64_t timestamp, char* data, int size);
public:
void set_drop_if_not_match(bool v);
virtual bool has_cache();
virtual srs_error_t dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
public:
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/core/srs_core_version5.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@

#define VERSION_MAJOR 5
#define VERSION_MINOR 0
#define VERSION_REVISION 108
#define VERSION_REVISION 109

#endif
65 changes: 45 additions & 20 deletions trunk/src/kernel/srs_kernel_flv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,10 @@ SrsSharedPtrMessage* SrsSharedPtrMessage::copy2()
SrsFlvTransmuxer::SrsFlvTransmuxer()
{
writer = NULL;


drop_if_not_match_ = true;
has_audio_ = true;
has_video_ = true;
nb_tag_headers = 0;
tag_headers = NULL;
nb_iovss_cache = 0;
Expand All @@ -380,10 +383,23 @@ srs_error_t SrsFlvTransmuxer::initialize(ISrsWriter* fw)
return srs_success;
}

void SrsFlvTransmuxer::set_drop_if_not_match(bool v)
{
drop_if_not_match_ = v;
}

bool SrsFlvTransmuxer::drop_if_not_match()
{
return drop_if_not_match_;
}

srs_error_t SrsFlvTransmuxer::write_header(bool has_video, bool has_audio)
{
srs_error_t err = srs_success;

has_audio_ = has_audio;
has_video_ = has_video;

uint8_t av_flag = 0;
av_flag += (has_audio? 4:0);
av_flag += (has_video? 1:0);
Expand Down Expand Up @@ -444,6 +460,8 @@ srs_error_t SrsFlvTransmuxer::write_metadata(char type, char* data, int size)
srs_error_t SrsFlvTransmuxer::write_audio(int64_t timestamp, char* data, int size)
{
srs_error_t err = srs_success;

if (drop_if_not_match_ && !has_audio_) return err;

if (size > 0) {
cache_audio(timestamp, data, size, tag_header);
Expand All @@ -459,6 +477,8 @@ srs_error_t SrsFlvTransmuxer::write_audio(int64_t timestamp, char* data, int siz
srs_error_t SrsFlvTransmuxer::write_video(int64_t timestamp, char* data, int size)
{
srs_error_t err = srs_success;

if (drop_if_not_match_ && !has_video_) return err;

if (size > 0) {
cache_video(timestamp, data, size, tag_header);
Expand All @@ -481,17 +501,19 @@ srs_error_t SrsFlvTransmuxer::write_tags(SrsSharedPtrMessage** msgs, int count)
{
srs_error_t err = srs_success;

// realloc the iovss.
int nb_iovss = 3 * count;
// Do realloc the iovss if required.
iovec* iovss = iovss_cache;
if (nb_iovss_cache < nb_iovss) {
srs_freepa(iovss_cache);

nb_iovss_cache = nb_iovss;
iovss = iovss_cache = new iovec[nb_iovss];
}
do {
int nn_might_iovss = 3 * count;
if (nb_iovss_cache < nn_might_iovss) {
srs_freepa(iovss_cache);

nb_iovss_cache = nn_might_iovss;
iovss = iovss_cache = new iovec[nn_might_iovss];
}
} while (false);
xiaozhihong marked this conversation as resolved.
Show resolved Hide resolved

// realloc the tag headers.
// Do realloc the tag headers if required.
char* cache = tag_headers;
if (nb_tag_headers < count) {
srs_freepa(tag_headers);
Expand All @@ -500,7 +522,7 @@ srs_error_t SrsFlvTransmuxer::write_tags(SrsSharedPtrMessage** msgs, int count)
cache = tag_headers = new char[SRS_FLV_TAG_HEADER_SIZE * count];
}

// realloc the pts.
// Do realloc the pts if required.
char* pts = ppts;
if (nb_ppts < count) {
srs_freepa(ppts);
Expand All @@ -509,38 +531,41 @@ srs_error_t SrsFlvTransmuxer::write_tags(SrsSharedPtrMessage** msgs, int count)
pts = ppts = new char[SRS_FLV_PREVIOUS_TAG_SIZE * count];
}

// the cache is ok, write each messages.
iovec* iovs = iovss;
// Now all caches are ok, start to write all messages.
iovec* iovs = iovss; int nn_real_iovss = 0;
for (int i = 0; i < count; i++) {
SrsSharedPtrMessage* msg = msgs[i];

// cache all flv header.
// Cache FLV packet header.
if (msg->is_audio()) {
if (drop_if_not_match_ && !has_audio_) continue; // Ignore audio packets if no audio stream.
cache_audio(msg->timestamp, msg->payload, msg->size, cache);
} else if (msg->is_video()) {
if (drop_if_not_match_ && !has_video_) continue; // Ignore video packets if no video stream.
cache_video(msg->timestamp, msg->payload, msg->size, cache);
} else {
cache_metadata(SrsFrameTypeScript, msg->payload, msg->size, cache);
}

// cache all pts.
// Cache FLV pts.
cache_pts(SRS_FLV_TAG_HEADER_SIZE + msg->size, pts);

// all ioves.
// Set cache to iovec.
iovs[0].iov_base = cache;
iovs[0].iov_len = SRS_FLV_TAG_HEADER_SIZE;
iovs[1].iov_base = msg->payload;
iovs[1].iov_len = msg->size;
iovs[2].iov_base = pts;
iovs[2].iov_len = SRS_FLV_PREVIOUS_TAG_SIZE;

// move next.
// Move to next cache.
cache += SRS_FLV_TAG_HEADER_SIZE;
pts += SRS_FLV_PREVIOUS_TAG_SIZE;
iovs += 3;
iovs += 3; nn_real_iovss += 3;
}

if ((err = writer->writev(iovss, nb_iovss, NULL)) != srs_success) {

// Send out all data carried by iovec.
if ((err = writer->writev(iovss, nn_real_iovss, NULL)) != srs_success) {
return srs_error_wrap(err, "write flv tags failed");
}

Expand Down
Loading