Skip to content

Commit

Permalink
Introduce get_blocks_request_v1 to support fetch_finality_data parame…
Browse files Browse the repository at this point in the history
…ter; add tests for previous SHiP clients interacting with new format
  • Loading branch information
linh2931 committed Mar 20, 2024
1 parent a3a0a6f commit bbe8735
Show file tree
Hide file tree
Showing 7 changed files with 490 additions and 329 deletions.
14 changes: 13 additions & 1 deletion libraries/state_history/abi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ extern const char* const state_history_plugin_abi = R"({
},
{
"name": "get_blocks_request_v0", "fields": [
{ "name": "start_block_num", "type": "uint32" },
{ "name": "end_block_num", "type": "uint32" },
{ "name": "max_messages_in_flight", "type": "uint32" },
{ "name": "have_positions", "type": "block_position[]" },
{ "name": "irreversible_only", "type": "bool" },
{ "name": "fetch_block", "type": "bool" },
{ "name": "fetch_traces", "type": "bool" },
{ "name": "fetch_deltas", "type": "bool" }
]
},
{
"name": "get_blocks_request_v1", "fields": [
{ "name": "start_block_num", "type": "uint32" },
{ "name": "end_block_num", "type": "uint32" },
{ "name": "max_messages_in_flight", "type": "uint32" },
Expand Down Expand Up @@ -554,7 +566,7 @@ extern const char* const state_history_plugin_abi = R"({
{ "new_type_name": "transaction_id", "type": "checksum256" }
],
"variants": [
{ "name": "request", "types": ["get_status_request_v0", "get_blocks_request_v0", "get_blocks_ack_request_v0"] },
{ "name": "request", "types": ["get_status_request_v0", "get_blocks_request_v0", "get_blocks_request_v1", "get_blocks_ack_request_v0"] },
{ "name": "result", "types": ["get_status_result_v0", "get_blocks_result_v0"] },
{ "name": "action_receipt", "types": ["action_receipt_v0"] },
Expand Down
11 changes: 8 additions & 3 deletions libraries/state_history/include/eosio/state_history/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ struct get_blocks_request_v0 {
bool fetch_block = false;
bool fetch_traces = false;
bool fetch_deltas = false;
bool fetch_finality_data = false;
};

struct get_blocks_request_v1 : get_blocks_request_v0 {
bool fetch_finality_data = false;;
};

struct get_blocks_ack_request_v0 {
Expand All @@ -123,7 +126,8 @@ struct get_blocks_result_v0 : get_blocks_result_base {
std::optional<bytes> finality_data;
};

using state_request = std::variant<get_status_request_v0, get_blocks_request_v0, get_blocks_ack_request_v0>;
using state_request = std::variant<get_status_request_v0, get_blocks_request_v0, get_blocks_request_v1, get_blocks_ack_request_v0>;
using get_blocks_request = std::variant<get_blocks_request_v0, get_blocks_request_v1>;
using state_result = std::variant<get_status_result_v0, get_blocks_result_v0>;

} // namespace state_history
Expand All @@ -134,7 +138,8 @@ FC_REFLECT(eosio::state_history::table_delta, (struct_version)(name)(rows));
FC_REFLECT(eosio::state_history::block_position, (block_num)(block_id));
FC_REFLECT_EMPTY(eosio::state_history::get_status_request_v0);
FC_REFLECT(eosio::state_history::get_status_result_v0, (head)(last_irreversible)(trace_begin_block)(trace_end_block)(chain_state_begin_block)(chain_state_end_block)(chain_id));
FC_REFLECT(eosio::state_history::get_blocks_request_v0, (start_block_num)(end_block_num)(max_messages_in_flight)(have_positions)(irreversible_only)(fetch_block)(fetch_traces)(fetch_deltas)(fetch_finality_data));
FC_REFLECT(eosio::state_history::get_blocks_request_v0, (start_block_num)(end_block_num)(max_messages_in_flight)(have_positions)(irreversible_only)(fetch_block)(fetch_traces)(fetch_deltas));
FC_REFLECT_DERIVED(eosio::state_history::get_blocks_request_v1, (eosio::state_history::get_blocks_request_v0), (fetch_finality_data));
FC_REFLECT(eosio::state_history::get_blocks_ack_request_v0, (num_messages));
FC_REFLECT(eosio::state_history::get_blocks_result_base, (head)(last_irreversible)(this_block)(prev_block)(block));
FC_REFLECT_DERIVED(eosio::state_history::get_blocks_result_v0, (eosio::state_history::get_blocks_result_base), (traces)(deltas)(finality_data));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ struct session_base {
virtual void send_update(const chain::signed_block_ptr& block, const chain::block_id_type& id) = 0;
virtual ~session_base() = default;

std::optional<state_history::get_blocks_request_v0> current_request;
std::optional<state_history::get_blocks_request> current_request;
bool need_to_send_update = false;
};

Expand Down Expand Up @@ -165,18 +165,25 @@ class blocks_ack_request_send_queue_entry : public send_queue_entry_base {
, req(std::move(r)) {}

void send_entry() override {
session->current_request->max_messages_in_flight += req.num_messages;
assert(session->current_request);
assert(std::holds_alternative<state_history::get_blocks_request_v0>(*session->current_request) ||
std::holds_alternative<state_history::get_blocks_request_v1>(*session->current_request));

std::visit(chain::overloaded{
[&](eosio::state_history::get_blocks_request_v0& request) { request.max_messages_in_flight += req.num_messages;},
[&](eosio::state_history::get_blocks_request_v1& request) { request.max_messages_in_flight += req.num_messages;} },
*session->current_request);
session->send_update(false);
}
};

template <typename Session>
class blocks_request_send_queue_entry : public send_queue_entry_base {
std::shared_ptr<Session> session;
eosio::state_history::get_blocks_request_v0 req;
eosio::state_history::get_blocks_request req;

public:
blocks_request_send_queue_entry(std::shared_ptr<Session> s, state_history::get_blocks_request_v0&& r)
blocks_request_send_queue_entry(std::shared_ptr<Session> s, state_history::get_blocks_request&& r)
: session(std::move(s))
, req(std::move(r)) {}

Expand Down Expand Up @@ -433,6 +440,14 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
session_mgr.add_send_queue(std::move(self), std::move(entry_ptr));
}

void process(state_history::get_blocks_request_v1& req) {
fc_dlog(plugin.get_logger(), "received get_blocks_request_v1 = ${req}", ("req", req));

auto self = this->shared_from_this();
auto entry_ptr = std::make_unique<blocks_request_send_queue_entry<session>>(self, std::move(req));
session_mgr.add_send_queue(std::move(self), std::move(entry_ptr));
}

void process(state_history::get_blocks_ack_request_v0& req) {
fc_dlog(plugin.get_logger(), "received get_blocks_ack_request_v0 = ${req}", ("req", req));
if (!current_request) {
Expand Down Expand Up @@ -468,7 +483,7 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
return result;
}

void update_current_request(state_history::get_blocks_request_v0& req) {
void update_current_request_impl(state_history::get_blocks_request_v0& req) {
fc_dlog(plugin.get_logger(), "replying get_blocks_request_v0 = ${req}", ("req", req));
to_send_block_num = std::max(req.start_block_num, plugin.get_first_available_block_num());
for (auto& cp : req.have_positions) {
Expand All @@ -492,24 +507,35 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
if( !req.have_positions.empty() ) {
position_it = req.have_positions.begin();
}
}

void update_current_request(state_history::get_blocks_request& req) {
assert(std::holds_alternative<state_history::get_blocks_request_v0>(req) ||
std::holds_alternative<state_history::get_blocks_request_v1>(req));

current_request = std::move(req);
std::visit(chain::overloaded{
[&](state_history::get_blocks_request_v0& request) {
update_current_request_impl(request);
current_request = std::move(req);},
[&](state_history::get_blocks_request_v1& request) {
update_current_request_impl(request);
fc_dlog(plugin.get_logger(), "replying get_blocks_request_v1, fetch_finality_data = ${fetch_finality_data}", ("fetch_finality_data", request.fetch_finality_data));
current_request = std::move(req);} },
req);
}

void send_update(state_history::get_blocks_result_v0 result, const chain::signed_block_ptr& block, const chain::block_id_type& id) {
void send_update(state_history::get_blocks_request_v0& request, bool fetch_finality_data, state_history::get_blocks_result_v0 result, const chain::signed_block_ptr& block, const chain::block_id_type& id) {
need_to_send_update = true;
if (!current_request || !current_request->max_messages_in_flight) {
session_mgr.pop_entry(false);
return;
}

result.last_irreversible = plugin.get_last_irreversible();
uint32_t current =
current_request->irreversible_only ? result.last_irreversible.block_num : result.head.block_num;
request.irreversible_only ? result.last_irreversible.block_num : result.head.block_num;

if (to_send_block_num > current || to_send_block_num >= current_request->end_block_num) {
fc_dlog( plugin.get_logger(), "Not sending, to_send_block_num: ${s}, current: ${c} current_request.end_block_num: ${b}",
("s", to_send_block_num)("c", current)("b", current_request->end_block_num) );
fc_dlog( plugin.get_logger(), "irreversible_only: ${i}, last_irreversible: ${p}, head.block_num: ${h}", ("i", request.irreversible_only)("p", result.last_irreversible.block_num)("h", result.head.block_num));
fc_dlog( plugin.get_logger(), "recved result: ${r}", ("r", result));
if (to_send_block_num > current || to_send_block_num >= request.end_block_num) {
fc_dlog( plugin.get_logger(), "Not sending, to_send_block_num: ${s}, current: ${c} request.end_block_num: ${b}",
("s", to_send_block_num)("c", current)("b", request.end_block_num) );
session_mgr.pop_entry(false);
return;
}
Expand All @@ -526,7 +552,7 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
auto& itr = *position_it;
auto block_id_seen_by_client = itr->block_id;
++itr;
if (itr == current_request->have_positions.end())
if (itr == request.have_positions.end())
position_it.reset();

if(block_id_seen_by_client == *block_id) {
Expand All @@ -541,15 +567,15 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
auto prev_block_id = plugin.get_block_id(to_send_block_num - 1);
if (prev_block_id)
result.prev_block = state_history::block_position{to_send_block_num - 1, *prev_block_id};
if (current_request->fetch_block) {
if (request.fetch_block) {
uint32_t block_num = block ? block->block_num() : 0; // block can be nullptr in testing
plugin.get_block(to_send_block_num, block_num, block, result.block);
}
if (current_request->fetch_traces && plugin.get_trace_log())
if (request.fetch_traces && plugin.get_trace_log())
result.traces.emplace();
if (current_request->fetch_deltas && plugin.get_chain_state_log())
if (request.fetch_deltas && plugin.get_chain_state_log())
result.deltas.emplace();
if (current_request->fetch_finality_data && plugin.get_finality_data_log()) {
if (fetch_finality_data && plugin.get_finality_data_log()) {
result.finality_data.emplace(); // create finality_data (it's an optional field)
}
}
Expand All @@ -567,15 +593,48 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
("this_id", result.this_block ? fc::variant{result.this_block->block_id} : fc::variant{}));
}

--current_request->max_messages_in_flight;
--request.max_messages_in_flight;
need_to_send_update = to_send_block_num <= current &&
to_send_block_num < current_request->end_block_num;
to_send_block_num < request.end_block_num;

std::make_shared<blocks_result_send_queue_entry<session>>(this->shared_from_this(), std::move(result))->send_entry();
}

bool no_request_or_not_max_messages_in_flight() {
if (!current_request)
return true;

uint32_t max_messages_in_flight = std::visit(
chain::overloaded{
[&](state_history::get_blocks_request_v0& request) -> uint32_t {
return request.max_messages_in_flight; },
[&](state_history::get_blocks_request_v1& request) -> uint32_t {
return request.max_messages_in_flight; }},
*current_request);

return !max_messages_in_flight;
}

void send_update(state_history::get_blocks_result_v0 result, const chain::signed_block_ptr& block, const chain::block_id_type& id) {
if (no_request_or_not_max_messages_in_flight()) {
session_mgr.pop_entry(false);
return;
}

assert(current_request);
assert(std::holds_alternative<state_history::get_blocks_request_v0>(*current_request) ||
std::holds_alternative<state_history::get_blocks_request_v1>(*current_request));

std::visit(eosio::chain::overloaded{
[&](eosio::state_history::get_blocks_request_v0& request) {
send_update(request, false, result, block, id); },
[&](eosio::state_history::get_blocks_request_v1& request) {
send_update(request, request.fetch_finality_data, result, block, id); } },
*current_request);
}

void send_update(const chain::signed_block_ptr& block, const chain::block_id_type& id) override {
if (!current_request || !current_request->max_messages_in_flight) {
if (no_request_or_not_max_messages_in_flight()) {
session_mgr.pop_entry(false);
return;
}
Expand Down
Loading

0 comments on commit bbe8735

Please sign in to comment.