From 29324fab469e0f7cef9ad04ffdbce832ac7dd9ff Mon Sep 17 00:00:00 2001 From: winlin Date: Thu, 4 Dec 2014 13:05:13 +0800 Subject: [PATCH] fix #248, improve about 15% performance for fast buffer. 2.0.49 --- trunk/src/core/srs_core.hpp | 2 +- trunk/src/kernel/srs_kernel_error.hpp | 1 + trunk/src/rtmp/srs_protocol_buffer.cpp | 93 +++++++++++++++++--------- trunk/src/rtmp/srs_protocol_buffer.hpp | 43 ++++++------ trunk/src/rtmp/srs_protocol_stack.cpp | 91 +++++++++++-------------- trunk/src/rtmp/srs_protocol_stack.hpp | 9 +-- 6 files changed, 129 insertions(+), 110 deletions(-) diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index df7b42d152..2bc63f30e2 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR 2 #define VERSION_MINOR 0 -#define VERSION_REVISION 48 +#define VERSION_REVISION 49 // server info. #define RTMP_SIG_SRS_KEY "SRS" #define RTMP_SIG_SRS_ROLE "origin/edge server" diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index aa6aa41093..75db04fafa 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -134,6 +134,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_OpenSslSha256DigestSize 2037 #define ERROR_OpenSslGetPeerPublicKey 2038 #define ERROR_OpenSslComputeSharedKey 2039 +#define ERROR_RTMP_BUFFER_OVERFLOW 2040 // // system control message, // not an error, but special control logic. diff --git a/trunk/src/rtmp/srs_protocol_buffer.cpp b/trunk/src/rtmp/srs_protocol_buffer.cpp index fa38f87779..12cc48b305 100644 --- a/trunk/src/rtmp/srs_protocol_buffer.cpp +++ b/trunk/src/rtmp/srs_protocol_buffer.cpp @@ -27,6 +27,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +// the max header size, +// @see SrsProtocol::read_message_header(). +#define SRS_RTMP_MAX_MESSAGE_HEADER 11 + SrsSimpleBuffer::SrsSimpleBuffer() { } @@ -81,8 +85,10 @@ SrsFastBuffer::SrsFastBuffer() merged_read = false; _handler = NULL; - nb_buffer = SOCKET_READ_SIZE; - buffer = new char[nb_buffer]; + p = end = buffer = NULL; + nb_buffer = 0; + + reset_buffer(SOCKET_READ_SIZE); } SrsFastBuffer::~SrsFastBuffer() @@ -90,37 +96,34 @@ SrsFastBuffer::~SrsFastBuffer() srs_freep(buffer); } -int SrsFastBuffer::length() +char SrsFastBuffer::read_1byte() { - int len = (int)data.size(); - srs_assert(len >= 0); - return len; -} - -char* SrsFastBuffer::bytes() -{ - return (length() == 0)? NULL : &data.at(0); + srs_assert(end - p >= 1); + return *p++; } -void SrsFastBuffer::erase(int size) +char* SrsFastBuffer::read_slice(int size) { - if (size <= 0) { - return; - } + srs_assert(end - p >= size); + srs_assert(p + size > buffer); - if (size >= length()) { - data.clear(); - return; - } + char* ptr = p; + p += size; - data.erase(data.begin(), data.begin() + size); + // reset when consumed all. + if (p == end) { + p = end = buffer; + srs_verbose("all consumed, reset fast buffer"); + } + + return ptr; } -void SrsFastBuffer::append(const char* bytes, int size) +void SrsFastBuffer::skip(int size) { - srs_assert(size > 0); - - data.insert(data.end(), bytes, bytes + size); + srs_assert(end - p >= size); + srs_assert(p + size > buffer); + p += size; } int SrsFastBuffer::grow(ISrsBufferReader* reader, int required_size) @@ -133,9 +136,27 @@ int SrsFastBuffer::grow(ISrsBufferReader* reader, int required_size) return ret; } - while (length() < required_size) { + // when read payload and need to grow, reset buffer. + if (end - p < required_size && required_size > SRS_RTMP_MAX_MESSAGE_HEADER) { + int nb_cap = end - p; + srs_verbose("move fast buffer %d bytes", nb_cap); + buffer = (char*)memmove(buffer, p, nb_cap); + p = buffer; + end = p + nb_cap; + } + + while (end - p < required_size) { + // the max to read is the left bytes. + size_t max_to_read = buffer + nb_buffer - end; + + if (max_to_read <= 0) { + ret = ERROR_RTMP_BUFFER_OVERFLOW; + srs_error("buffer overflow, required=%d, max=%d, ret=%d", required_size, nb_buffer, ret); + return ret; + } + ssize_t nread; - if ((ret = reader->read(buffer, nb_buffer, &nread)) != ERROR_SUCCESS) { + if ((ret = reader->read(end, max_to_read, &nread)) != ERROR_SUCCESS) { return ret; } @@ -149,8 +170,9 @@ int SrsFastBuffer::grow(ISrsBufferReader* reader, int required_size) _handler->on_read(nread); } + // we just move the ptr to next. srs_assert((int)nread > 0); - append(buffer, (int)nread); + end += nread; } return ret; @@ -198,8 +220,19 @@ int SrsFastBuffer::buffer_size() void SrsFastBuffer::reset_buffer(int size) { + // remember the cap. + int nb_cap = end - p; + + // atleast to put the old data. + nb_buffer = srs_max(nb_cap, size); + + // copy old data to buf. + char* buf = new char[nb_buffer]; + if (nb_cap > 0) { + memcpy(buf, p, nb_cap); + } + srs_freep(buffer); - - nb_buffer = size; - buffer = new char[nb_buffer]; + p = buffer = buf; + end = p + nb_cap; } diff --git a/trunk/src/rtmp/srs_protocol_buffer.hpp b/trunk/src/rtmp/srs_protocol_buffer.hpp index 72047ac1ba..6cae2915aa 100644 --- a/trunk/src/rtmp/srs_protocol_buffer.hpp +++ b/trunk/src/rtmp/srs_protocol_buffer.hpp @@ -116,39 +116,40 @@ class SrsFastBuffer // the merged handler bool merged_read; IMergeReadHandler* _handler; - // data and socket buffer - std::vector data; + // the user-space buffer to fill by reader, + // which use fast index and reset when chunk body read ok. + // @see https://github.com/winlinvip/simple-rtmp-server/issues/248 + // ptr to the current read position. + char* p; + // ptr to the content end. + char* end; + // ptr to the buffer. + // buffer <= p <= end <= buffer+nb_buffer char* buffer; + // the max size of buffer. int nb_buffer; public: SrsFastBuffer(); virtual ~SrsFastBuffer(); public: /** - * get the length of buffer. empty if zero. - * @remark assert length() is not negative. - */ - virtual int length(); - /** - * get the buffer bytes. - * @return the bytes, NULL if empty. + * read 1byte from buffer, move to next bytes. + * @remark assert buffer already grow(1). */ - virtual char* bytes(); -public: + virtual char read_1byte(); /** - * erase size of bytes from begin. - * @param size to erase size of bytes. - * clear if size greater than or equals to length() - * @remark ignore size is not positive. + * read a slice in size bytes, move to next bytes. + * user can use this char* ptr directly, and should never free it. + * @remark assert buffer already grow(size). + * @remark the ptr returned maybe invalid after grow(x). */ - virtual void erase(int size); -private: + virtual char* read_slice(int size); /** - * append specified bytes to buffer. - * @param size the size of bytes - * @remark assert size is positive. + * skip some bytes in buffer. + * @param size the bytes to skip. positive to next; negative to previous. + * @remark assert buffer already grow(size). */ - virtual void append(const char* bytes, int size); + virtual void skip(int size); public: /** * grow buffer to the required size, loop to read from skt to fill. diff --git a/trunk/src/rtmp/srs_protocol_stack.cpp b/trunk/src/rtmp/srs_protocol_stack.cpp index 40a001e132..609753992f 100644 --- a/trunk/src/rtmp/srs_protocol_stack.cpp +++ b/trunk/src/rtmp/srs_protocol_stack.cpp @@ -1075,14 +1075,13 @@ int SrsProtocol::recv_interlaced_message(SrsMessage** pmsg) // chunk stream basic header. char fmt = 0; int cid = 0; - int bh_size = 0; - if ((ret = read_basic_header(fmt, cid, bh_size)) != ERROR_SUCCESS) { + if ((ret = read_basic_header(fmt, cid)) != ERROR_SUCCESS) { if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { srs_error("read basic header failed. ret=%d", ret); } return ret; } - srs_verbose("read basic header success. fmt=%d, cid=%d, bh_size=%d", fmt, cid, bh_size); + srs_verbose("read basic header success. fmt=%d, cid=%d", fmt, cid); // once we got the chunk message header, // that is there is a real message in cache, @@ -1115,8 +1114,7 @@ int SrsProtocol::recv_interlaced_message(SrsMessage** pmsg) } // chunk stream message header - int mh_size = 0; - if ((ret = read_message_header(chunk, fmt, bh_size, mh_size)) != ERROR_SUCCESS) { + if ((ret = read_message_header(chunk, fmt)) != ERROR_SUCCESS) { if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { srs_error("read message header failed. ret=%d", ret); } @@ -1129,8 +1127,7 @@ int SrsProtocol::recv_interlaced_message(SrsMessage** pmsg) // read msg payload from chunk stream. SrsMessage* msg = NULL; - int payload_size = 0; - if ((ret = read_message_payload(chunk, bh_size, mh_size, payload_size, &msg)) != ERROR_SUCCESS) { + if ((ret = read_message_payload(chunk, &msg)) != ERROR_SUCCESS) { if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { srs_error("read message payload failed. ret=%d", ret); } @@ -1203,59 +1200,52 @@ int SrsProtocol::recv_interlaced_message(SrsMessage** pmsg) * Chunk stream IDs with values 64-319 could be represented by both 2- * byte version and 3-byte version of this field. */ -int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size) +int SrsProtocol::read_basic_header(char& fmt, int& cid) { int ret = ERROR_SUCCESS; - int required_size = 1; - if ((ret = in_buffer->grow(skt, required_size)) != ERROR_SUCCESS) { + if ((ret = in_buffer->grow(skt, 1)) != ERROR_SUCCESS) { if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { - srs_error("read 1bytes basic header failed. required_size=%d, ret=%d", required_size, ret); + srs_error("read 1bytes basic header failed. required_size=%d, ret=%d", 1, ret); } return ret; } - char* p = in_buffer->bytes(); - - fmt = (*p >> 6) & 0x03; - cid = *p & 0x3f; - bh_size = 1; + fmt = in_buffer->read_1byte(); + cid = fmt & 0x3f; + fmt = (fmt >> 6) & 0x03; // 2-63, 1B chunk header if (cid > 1) { - srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid); + srs_verbose("basic header parsed. fmt=%d, cid=%d", fmt, cid); return ret; } // 64-319, 2B chunk header if (cid == 0) { - required_size = 2; - if ((ret = in_buffer->grow(skt, required_size)) != ERROR_SUCCESS) { + if ((ret = in_buffer->grow(skt, 1)) != ERROR_SUCCESS) { if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { - srs_error("read 2bytes basic header failed. required_size=%d, ret=%d", required_size, ret); + srs_error("read 2bytes basic header failed. required_size=%d, ret=%d", 1, ret); } return ret; } cid = 64; - cid += (u_int8_t)*(++p); - bh_size = 2; - srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid); + cid += (u_int8_t)in_buffer->read_1byte(); + srs_verbose("2bytes basic header parsed. fmt=%d, cid=%d", fmt, cid); // 64-65599, 3B chunk header } else if (cid == 1) { - required_size = 3; - if ((ret = in_buffer->grow(skt, 3)) != ERROR_SUCCESS) { + if ((ret = in_buffer->grow(skt, 2)) != ERROR_SUCCESS) { if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { - srs_error("read 3bytes basic header failed. required_size=%d, ret=%d", required_size, ret); + srs_error("read 3bytes basic header failed. required_size=%d, ret=%d", 2, ret); } return ret; } cid = 64; - cid += (u_int8_t)*(++p); - cid += ((u_int8_t)*(++p)) * 256; - bh_size = 3; - srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid); + cid += (u_int8_t)in_buffer->read_1byte(); + cid += ((u_int8_t)in_buffer->read_1byte()) * 256; + srs_verbose("3bytes basic header parsed. fmt=%d, cid=%d", fmt, cid); } else { srs_error("invalid path, impossible basic header."); srs_assert(false); @@ -1276,7 +1266,7 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size) * fmt=2, 0x8X * fmt=3, 0xCX */ -int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size) +int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt) { int ret = ERROR_SUCCESS; @@ -1344,17 +1334,15 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz // read message header from socket to buffer. static char mh_sizes[] = {11, 7, 3, 0}; - mh_size = mh_sizes[(int)fmt]; + int mh_size = mh_sizes[(int)fmt]; srs_verbose("calc chunk message header size. fmt=%d, mh_size=%d", fmt, mh_size); - int required_size = bh_size + mh_size; - if ((ret = in_buffer->grow(skt, required_size)) != ERROR_SUCCESS) { + if (mh_size > 0 && (ret = in_buffer->grow(skt, mh_size)) != ERROR_SUCCESS) { if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { - srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret); + srs_error("read %dbytes message header failed. ret=%d", mh_size, ret); } return ret; } - char* p = in_buffer->bytes() + bh_size; /** * parse the message header. @@ -1370,6 +1358,8 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz */ // see also: ngx_rtmp_recv if (fmt <= RTMP_FMT_TYPE2) { + char* p = in_buffer->read_slice(mh_size); + char* pp = (char*)&chunk->header.timestamp_delta; pp[2] = *p++; pp[1] = *p++; @@ -1466,14 +1456,16 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz // read extended-timestamp if (chunk->extended_timestamp) { mh_size += 4; - required_size = bh_size + mh_size; srs_verbose("read header ext time. fmt=%d, ext_time=%d, mh_size=%d", fmt, chunk->extended_timestamp, mh_size); - if ((ret = in_buffer->grow(skt, required_size)) != ERROR_SUCCESS) { + if ((ret = in_buffer->grow(skt, 4)) != ERROR_SUCCESS) { if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { - srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret); + srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, 4, ret); } return ret; } + // the ptr to the slice maybe invalid when grow() + // reset the p to get 4bytes slice. + char* p = in_buffer->read_slice(4); u_int32_t timestamp = 0x00; char* pp = (char*)×tamp; @@ -1515,6 +1507,7 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz */ if (!is_first_chunk_of_msg && chunk_timestamp > 0 && chunk_timestamp != timestamp) { mh_size -= 4; + in_buffer->skip(-4); srs_info("no 4bytes extended timestamp in the continued chunk"); } else { chunk->header.timestamp = timestamp; @@ -1557,15 +1550,12 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz return ret; } -int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, SrsMessage** pmsg) +int SrsProtocol::read_message_payload(SrsChunkStream* chunk, SrsMessage** pmsg) { int ret = ERROR_SUCCESS; // empty message if (chunk->header.payload_length <= 0) { - // need erase the header in buffer. - in_buffer->erase(bh_size + mh_size); - srs_trace("get an empty RTMP " "message(type=%d, size=%d, time=%"PRId64", sid=%d)", chunk->header.message_type, chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id); @@ -1578,7 +1568,7 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh srs_assert(chunk->header.payload_length > 0); // the chunk payload size. - payload_size = chunk->header.payload_length - chunk->msg->size; + int payload_size = chunk->header.payload_length - chunk->msg->size; payload_size = srs_min(payload_size, in_chunk_size); srs_verbose("chunk payload size is %d, message_size=%d, received_size=%d, in_chunk_size=%d", payload_size, chunk->header.payload_length, chunk->msg->size, in_chunk_size); @@ -1586,23 +1576,20 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh // create msg payload if not initialized if (!chunk->msg->payload) { chunk->msg->payload = new char[chunk->header.payload_length]; - memset(chunk->msg->payload, 0, chunk->header.payload_length); - srs_verbose("create empty payload for RTMP message. size=%d", chunk->header.payload_length); + srs_verbose("create payload for RTMP message. size=%d", chunk->header.payload_length); } // read payload to buffer - int required_size = bh_size + mh_size + payload_size; - if ((ret = in_buffer->grow(skt, required_size)) != ERROR_SUCCESS) { + if ((ret = in_buffer->grow(skt, payload_size)) != ERROR_SUCCESS) { if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { - srs_error("read payload failed. required_size=%d, ret=%d", required_size, ret); + srs_error("read payload failed. required_size=%d, ret=%d", payload_size, ret); } return ret; } - memcpy(chunk->msg->payload + chunk->msg->size, in_buffer->bytes() + bh_size + mh_size, payload_size); - in_buffer->erase(bh_size + mh_size + payload_size); + memcpy(chunk->msg->payload + chunk->msg->size, in_buffer->read_slice(payload_size), payload_size); chunk->msg->size += payload_size; - srs_verbose("chunk payload read completed. bh_size=%d, mh_size=%d, payload_size=%d", bh_size, mh_size, payload_size); + srs_verbose("chunk payload read completed. payload_size=%d", payload_size); // got entire RTMP message? if (chunk->header.payload_length == chunk->msg->size) { diff --git a/trunk/src/rtmp/srs_protocol_stack.hpp b/trunk/src/rtmp/srs_protocol_stack.hpp index 9acbecb010..18531224f7 100644 --- a/trunk/src/rtmp/srs_protocol_stack.hpp +++ b/trunk/src/rtmp/srs_protocol_stack.hpp @@ -434,21 +434,18 @@ class SrsProtocol /** * read the chunk basic header(fmt, cid) from chunk stream. * user can discovery a SrsChunkStream by cid. - * @bh_size return the chunk basic header size, to remove the used bytes when finished. */ - virtual int read_basic_header(char& fmt, int& cid, int& bh_size); + virtual int read_basic_header(char& fmt, int& cid); /** * read the chunk message header(timestamp, payload_length, message_type, stream_id) * from chunk stream and save to SrsChunkStream. - * @mh_size return the chunk message header size, to remove the used bytes when finished. */ - virtual int read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size); + virtual int read_message_header(SrsChunkStream* chunk, char fmt); /** * read the chunk payload, remove the used bytes in buffer, * if got entire message, set the pmsg. - * @payload_size read size in this roundtrip, generally a chunk size or left message size. */ - virtual int read_message_payload(SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, SrsMessage** pmsg); + virtual int read_message_payload(SrsChunkStream* chunk, SrsMessage** pmsg); /** * when recv message, update the context. */