Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Edge: Improve stability for state and fd closing. v5.0.214 v6.0.139 #4126

Merged
merged 3 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions trunk/src/app/srs_app_edge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,10 @@ srs_error_t SrsEdgeForwarder::start()

url = srs_generate_rtmp_url(server, port, req->host, vhost, req->app, req->stream, req->param);
}

// We must stop the coroutine before disposing the sdk.
srs_freep(trd);
trd = new SrsSTCoroutine("edge-fwr", this, _srs_context->get_id());

// open socket.
srs_freep(sdk);
Expand All @@ -806,10 +810,8 @@ srs_error_t SrsEdgeForwarder::start()
if ((err = sdk->publish(_srs_config->get_chunk_size(req->vhost), false, &stream)) != srs_success) {
return srs_error_wrap(err, "sdk publish");
}

srs_freep(trd);
trd = new SrsSTCoroutine("edge-fwr", this, _srs_context->get_id());


// Start the forwarding coroutine.
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "coroutine");
}
Expand All @@ -821,9 +823,12 @@ srs_error_t SrsEdgeForwarder::start()

void SrsEdgeForwarder::stop()
{
// Make sure the coroutine is stopped before disposing the sdk,
// for sdk is used by coroutine.
trd->stop();
queue->clear();
srs_freep(sdk);

queue->clear();
}

// when error, edge ingester sleep for a while and retry.
Expand All @@ -840,7 +845,13 @@ srs_error_t SrsEdgeForwarder::cycle()
return srs_error_wrap(err, "thread pull");
}

if ((err = do_cycle()) != srs_success) {
// If coroutine stopping, we should always set the quit error code.
err = do_cycle();
if (send_error_code == 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (send_error_code == 0) {
if ((err = do_cycle()) != srs_success) {
if (send_error_code == ERROR_SUCCESS) {
// If coroutine stopping, we should always set the quit error code.
send_error_code = srs_error_code(err);
}
return srs_error_wrap(err, "do cycle");
}

suggest another way.

About send_error_code, I believe there is a dead loop:

while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "edge forward pull");
}
if (send_error_code != ERROR_SUCCESS) {
srs_usleep(SRS_EDGE_FORWARDER_TIMEOUT);
continue;
}
// read from client.
if (true) {
SrsCommonMessage* msg = NULL;
err = sdk->recv_message(&msg);
if (err != srs_success && srs_error_code(err) != ERROR_SOCKET_TIMEOUT) {
srs_error("edge push get server control message failed. err=%s", srs_error_desc(err).c_str());
send_error_code = srs_error_code(err);
srs_error_reset(err);
continue;
}

there are no place to set send_error_code = ERROR_SUCCESS in the loop, only set the value when sdk->recv_message(&msg).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not dead loop, the publish owner will stop it when error.

send_error_code = srs_error_code(err);
}

if (err != srs_success) {
return srs_error_wrap(err, "do cycle");
}

Expand Down
4 changes: 3 additions & 1 deletion trunk/src/app/srs_app_rtmp_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -959,7 +959,9 @@ srs_error_t SrsRtmpConn::publishing(SrsSharedPtr<SrsLiveSource> source)
// but failed, so we must cleanup it.
// @see https://github.com/ossrs/srs/issues/474
// @remark when stream is busy, should never release it.
if (srs_error_code(err) != ERROR_SYSTEM_STREAM_BUSY) {
// @remark If state is invalid, should not release it because it's not published by this session.
int code = srs_error_code(err);
if (code != ERROR_SYSTEM_STREAM_BUSY && code != ERROR_RTMP_EDGE_PUBLISH_STATE) {
release_publish(source);
}

Expand Down
Loading