Skip to content

Commit

Permalink
for bug #251, refine the send use cond wait.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Dec 5, 2014
1 parent 4c1d5c0 commit dde05c6
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 7 deletions.
13 changes: 7 additions & 6 deletions trunk/src/app/srs_app_rtmp_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,10 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd)
// collect elapse for pithy print.
pithy_print.elapse();

// wait for message to incoming.
// @see https://github.com/winlinvip/simple-rtmp-server/issues/251
consumer->wait(SRS_PERF_MW_MIN_MSGS, mw_sleep);

// get messages from consumer.
// each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
int count = 0;
Expand All @@ -606,12 +610,9 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd)
return ret;
}

// no message to send, sleep a while.
if (count <= 0) {
srs_verbose("sleep for no messages to send");
st_usleep(mw_sleep * 1000);
}
srs_info("got %d msgs, mw=%d", count, mw_sleep);
// we use wait to get messages, so the count must be positive.
srs_assert(count > 0);
srs_info("got %d msgs, min=%d, mw=%d", count, SRS_PERF_MW_MIN_MSGS, mw_sleep);

// reportable
if (pithy_print.can_print()) {
Expand Down
38 changes: 38 additions & 0 deletions trunk/src/app/srs_app_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,16 @@ SrsMessageQueue::~SrsMessageQueue()
clear();
}

int SrsMessageQueue::count()
{
return (int)msgs.size();
}

int SrsMessageQueue::duration()
{
return (int)(av_end_time - av_start_time);
}

void SrsMessageQueue::set_queue_size(double queue_size)
{
queue_size_ms = (int)(queue_size * 1000);
Expand Down Expand Up @@ -290,13 +300,19 @@ SrsConsumer::SrsConsumer(SrsSource* _source)
jitter = new SrsRtmpJitter();
queue = new SrsMessageQueue();
should_update_source_id = false;

mw_wait = st_cond_new();
mw_min_msgs = 0;
mw_duration = 0;
mw_waiting = false;
}

SrsConsumer::~SrsConsumer()
{
source->on_consumer_destroy(this);
srs_freep(jitter);
srs_freep(queue);
st_cond_destroy(mw_wait);
}

void SrsConsumer::set_queue_size(double queue_size)
Expand Down Expand Up @@ -329,6 +345,12 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, S
return ret;
}

// fire the mw when msgs is enough.
if (mw_waiting && queue->count() > mw_min_msgs && queue->duration() > mw_duration) {
st_cond_signal(mw_wait);
mw_waiting = false;
}

return ret;
}

Expand All @@ -349,6 +371,22 @@ int SrsConsumer::dump_packets(int max_count, SrsMessage** pmsgs, int& count)
return queue->dump_packets(max_count, pmsgs, count);
}

void SrsConsumer::wait(int nb_msgs, int duration)
{
mw_min_msgs = nb_msgs;
mw_duration = duration;

// already ok, donot wait.
if (queue->count() > mw_min_msgs && queue->duration() > mw_duration) {
return;
}

// the enqueue will notify this cond.
mw_waiting = true;

st_cond_wait(mw_wait);
}

int SrsConsumer::on_play_client_pause(bool is_pause)
{
int ret = ERROR_SUCCESS;
Expand Down
20 changes: 20 additions & 0 deletions trunk/src/app/srs_app_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,14 @@ class SrsMessageQueue
SrsMessageQueue();
virtual ~SrsMessageQueue();
public:
/**
* get the count of queue.
*/
virtual int count();
/**
* get duration of queue.
*/
virtual int duration();
/**
* set the queue size
* @param queue_size the queue size in seconds.
Expand Down Expand Up @@ -154,6 +162,12 @@ class SrsConsumer
bool paused;
// when source id changed, notice all consumers
bool should_update_source_id;
// the cond wait for mw.
// @see https://github.com/winlinvip/simple-rtmp-server/issues/251
st_cond_t mw_wait;
bool mw_waiting;
int mw_min_msgs;
int mw_duration;
public:
SrsConsumer(SrsSource* _source);
virtual ~SrsConsumer();
Expand Down Expand Up @@ -189,6 +203,12 @@ class SrsConsumer
*/
virtual int dump_packets(int max_count, SrsMessage** pmsgs, int& count);
/**
* wait for messages incomming, atleast nb_msgs and in duration.
* @param nb_msgs the messages count to wait.
* @param duration the messgae duration to wait.
*/
virtual void wait(int nb_msgs, int duration);
/**
* when client send the pause message.
*/
virtual int on_play_client_pause(bool is_pause);
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/core/srs_core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version
#define VERSION_MAJOR 2
#define VERSION_MINOR 0
#define VERSION_REVISION 55
#define VERSION_REVISION 56
// server info.
#define RTMP_SIG_SRS_KEY "SRS"
#define RTMP_SIG_SRS_ROLE "origin/edge server"
Expand Down
5 changes: 5 additions & 0 deletions trunk/src/core/srs_core_performance.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
* @remark, recomment to 156.
*/
#define SRS_PERF_MW_MSGS 156
/**
* how many msgs atleast to send.
* @remark, recomment to 8.
*/
#define SRS_PERF_MW_MIN_MSGS 8

/**
* how many chunk stream to cache, [0, N].
Expand Down

0 comments on commit dde05c6

Please sign in to comment.