Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(util): add restore_read function in rpc_message #442

Merged
merged 3 commits into from
Apr 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/dsn/cpp/rpc_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,4 @@ class rpc_write_stream : public binary_writer
int _last_write_next_total_size;
};
typedef ::dsn::ref_ptr<rpc_write_stream> rpc_write_stream_ptr;
}
} // namespace dsn
6 changes: 6 additions & 0 deletions include/dsn/tool-api/rpc_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,12 @@ class message_ex : public ref_counter,
size_t body_size() { return (size_t)header->body_length; }
DSN_API void *rw_ptr(size_t offset_begin);

// rpc_read_stream can read a msg many times by restore()
// rpc_read_stream stream1(msg)
// msg->restore_read()
// rpc_read_stream stream2(msg)
DSN_API void restore_read();

bool is_backup_request() const { return header->context.u.is_backup_request; }

private:
Expand Down
7 changes: 7 additions & 0 deletions src/core/core/rpc_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,13 @@ void message_ex::read_commit(size_t size)
this->_rw_committed = true;
}

void message_ex::restore_read()
{
_rw_index = -1;
_rw_committed = true;
_rw_offset = 0;
}

void *message_ex::rw_ptr(size_t offset_begin)
{
// printf("%p %s\n", this, __FUNCTION__);
Expand Down
12 changes: 12 additions & 0 deletions src/core/tests/rpc_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include <dsn/utility/crc.h>
#include <dsn/utility/transient_memory.h>
#include <dsn/tool-api/rpc_message.h>
#include <core/core/message_utils.cpp>
#include <gtest/gtest.h>

using namespace ::dsn;
Expand Down Expand Up @@ -183,3 +184,14 @@ TEST(core, message_ex)
request->release_ref();
}
}

TEST(rpc_message, restore_read)
{
using namespace dsn;
configuration_query_by_index_request request, result;
message_ptr msg = from_thrift_request_to_received_message(request, RPC_CODE_FOR_TEST);
for (int i = 0; i < 10; i++) {
unmarshall(msg, result);
msg->restore_read();
}
}