Skip to content

Commit

Permalink
Merge pull request #1996 from AntelopeIO/clean_up_controller_signalling
Browse files Browse the repository at this point in the history
Clean up controller signals to improve
  • Loading branch information
linh2931 authored Dec 19, 2023
2 parents 9658bd1 + 4938640 commit 9924152
Show file tree
Hide file tree
Showing 35 changed files with 379 additions and 375 deletions.
29 changes: 10 additions & 19 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,9 @@ struct controller_impl {
set_activation_handler<builtin_protocol_feature_t::bls_primitives>();
set_activation_handler<builtin_protocol_feature_t::disable_deferred_trxs_stage_2>();

self.irreversible_block.connect([this](const block_state_legacy_ptr& bsp) {
wasmif.current_lib(bsp->block_num);
self.irreversible_block.connect([this](const block_signal_params& t) {
const auto& [ block, id] = t;
wasmif.current_lib(block->block_num());
});


Expand All @@ -368,7 +369,7 @@ struct controller_impl {
}

/**
* Plugins / observers listening to signals emited (such as accepted_transaction) might trigger
* Plugins / observers listening to signals emited might trigger
* errors and throw exceptions. Unless those exceptions are caught it could impact consensus and/or
* cause a node to fork.
*
Expand Down Expand Up @@ -448,7 +449,7 @@ struct controller_impl {
apply_block( br, *bitr, controller::block_status::complete, trx_meta_cache_lookup{} );
}

emit( self.irreversible_block, *bitr );
emit( self.irreversible_block, std::tie((*bitr)->block, (*bitr)->id) );

// blog.append could fail due to failures like running out of space.
// Do it before commit so that in case it throws, DB can be rolled back.
Expand Down Expand Up @@ -1336,7 +1337,6 @@ struct controller_impl {
pending->_block_report.total_cpu_usage_us += billed_cpu_time_us;
pending->_block_report.total_elapsed_time += trace->elapsed;
pending->_block_report.total_time += trace->elapsed;
emit( self.accepted_transaction, trx );
dmlog_applied_transaction(trace);
emit( self.applied_transaction, std::tie(trace, trx->packed_trx()) );
undo_session.squash();
Expand Down Expand Up @@ -1402,7 +1402,6 @@ struct controller_impl {

trace->account_ram_delta = account_delta( gtrx.payer, trx_removal_ram_delta );

emit( self.accepted_transaction, trx );
dmlog_applied_transaction(trace);
emit( self.applied_transaction, std::tie(trace, trx->packed_trx()) );

Expand Down Expand Up @@ -1447,7 +1446,6 @@ struct controller_impl {
if( !trace->except_ptr ) {
trace->account_ram_delta = account_delta( gtrx.payer, trx_removal_ram_delta );
trace->elapsed = fc::time_point::now() - start;
emit( self.accepted_transaction, trx );
dmlog_applied_transaction(trace);
emit( self.applied_transaction, std::tie(trace, trx->packed_trx()) );
undo_session.squash();
Expand Down Expand Up @@ -1493,13 +1491,11 @@ struct controller_impl {
trace->receipt = push_receipt(gtrx.trx_id, transaction_receipt::hard_fail, cpu_time_to_bill_us, 0);
trace->account_ram_delta = account_delta( gtrx.payer, trx_removal_ram_delta );

emit( self.accepted_transaction, trx );
dmlog_applied_transaction(trace);
emit( self.applied_transaction, std::tie(trace, trx->packed_trx()) );

undo_session.squash();
} else {
emit( self.accepted_transaction, trx );
dmlog_applied_transaction(trace);
emit( self.applied_transaction, std::tie(trace, trx->packed_trx()) );
}
Expand Down Expand Up @@ -1633,7 +1629,6 @@ struct controller_impl {
// call the accept signal but only once for this transaction
if (!trx->accepted) {
trx->accepted = true;
emit(self.accepted_transaction, trx);
}

dmlog_applied_transaction(trace, &trn);
Expand Down Expand Up @@ -1680,7 +1675,6 @@ struct controller_impl {
}

if (!trx->is_transient()) {
emit(self.accepted_transaction, trx);
dmlog_applied_transaction(trace);
emit(self.applied_transaction, std::tie(trace, trx->packed_trx()));

Expand Down Expand Up @@ -1950,7 +1944,7 @@ struct controller_impl {
if( s == controller::block_status::incomplete ) {
fork_db.add( bsp );
fork_db.mark_valid( bsp );
emit( self.accepted_block_header, bsp );
emit( self.accepted_block_header, std::tie(bsp->block, bsp->id) );
EOS_ASSERT( bsp == fork_db.head(), fork_database_exception, "committed block did not become the new head in fork database");
} else if (s != controller::block_status::irreversible) {
fork_db.mark_valid( bsp );
Expand All @@ -1962,7 +1956,7 @@ struct controller_impl {
dm_logger->on_accepted_block(bsp);
}

emit( self.accepted_block, bsp );
emit( self.accepted_block, std::tie(bsp->block, bsp->id) );

if( s == controller::block_status::incomplete ) {
log_irreversible();
Expand Down Expand Up @@ -2258,15 +2252,13 @@ struct controller_impl {
return;
}

emit( self.pre_accepted_block, b );

fork_db.add( bsp );

if (self.is_trusted_producer(b->producer)) {
trusted_producer_light_validation = true;
};

emit( self.accepted_block_header, bsp );
emit( self.accepted_block_header, std::tie(bsp->block, bsp->id) );

if( read_mode != db_read_mode::IRREVERSIBLE ) {
maybe_switch_forks( br, fork_db.pending_head(), s, forked_branch_cb, trx_lookup );
Expand All @@ -2293,7 +2285,6 @@ struct controller_impl {
return;
}

emit( self.pre_accepted_block, b );
const bool skip_validate_signee = !conf.force_all_checks;

auto bsp = std::make_shared<block_state_legacy>(
Expand All @@ -2311,15 +2302,15 @@ struct controller_impl {
fork_db.add( bsp, true );
}

emit( self.accepted_block_header, bsp );
emit( self.accepted_block_header, std::tie(bsp->block, bsp->id) );

controller::block_report br;
if( s == controller::block_status::irreversible ) {
apply_block( br, bsp, s, trx_meta_cache_lookup{} );

// On replay, log_irreversible is not called and so no irreversible_block signal is emitted.
// So emit it explicitly here.
emit( self.irreversible_block, bsp );
emit( self.irreversible_block, std::tie(bsp->block, bsp->id) );

if (!self.skip_db_sessions(s)) {
db.commit(bsp->block_num);
Expand Down
23 changes: 6 additions & 17 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ namespace eosio { namespace chain {
// lookup transaction_metadata via supplied function to avoid re-creation
using trx_meta_cache_lookup = std::function<transaction_metadata_ptr( const transaction_id_type&)>;

using block_signal_params = std::tuple<const signed_block_ptr&, const block_id_type&>;

class fork_database;

enum class db_read_mode {
Expand Down Expand Up @@ -326,24 +328,11 @@ namespace eosio { namespace chain {

static std::optional<uint64_t> convert_exception_to_error_code( const fc::exception& e );

signal<void(uint32_t)> block_start; // block_num
signal<void(const signed_block_ptr&)> pre_accepted_block;
signal<void(const block_state_legacy_ptr&)> accepted_block_header;
signal<void(const block_state_legacy_ptr&)> accepted_block;
signal<void(const block_state_legacy_ptr&)> irreversible_block;
signal<void(const transaction_metadata_ptr&)> accepted_transaction;
signal<void(uint32_t)> block_start;
signal<void(const block_signal_params&)> accepted_block_header;
signal<void(const block_signal_params&)> accepted_block;
signal<void(const block_signal_params&)> irreversible_block;
signal<void(std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&>)> applied_transaction;
signal<void(const int&)> bad_alloc;

/*
signal<void()> pre_apply_block;
signal<void()> post_apply_block;
signal<void()> abort_apply_block;
signal<void(const transaction_metadata_ptr&)> pre_apply_transaction;
signal<void(const transaction_trace_ptr&)> post_apply_transaction;
signal<void(const transaction_trace_ptr&)> pre_apply_action;
signal<void(const transaction_trace_ptr&)> post_apply_action;
*/

const apply_handler* find_apply_handler( account_name contract, scope_name scope, action_name act )const;
wasm_interface& get_wasm_interface();
Expand Down
11 changes: 5 additions & 6 deletions libraries/chain/include/eosio/chain/subjective_billing.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#include <eosio/chain/types.hpp>
#include <eosio/chain/exceptions.hpp>
#include <eosio/chain/block_state_legacy.hpp>
#include <eosio/chain/transaction.hpp>
#include <eosio/chain/resource_limits.hpp>
#include <eosio/chain/resource_limits_private.hpp>
Expand Down Expand Up @@ -81,9 +80,9 @@ class subjective_billing {
}
}

void remove_subjective_billing( const chain::block_state_legacy_ptr& bsp, uint32_t time_ordinal ) {
void remove_subjective_billing( const chain::signed_block_ptr& block, uint32_t time_ordinal ) {
if( !_trx_cache_index.empty() ) {
for( const auto& receipt : bsp->block->transactions ) {
for( const auto& receipt : block->transactions ) {
if( std::holds_alternative<chain::packed_transaction>(receipt.trx) ) {
const auto& pt = std::get<chain::packed_transaction>(receipt.trx);
remove_subjective_billing( pt.id(), time_ordinal );
Expand Down Expand Up @@ -151,11 +150,11 @@ class subjective_billing {
}
}

void on_block( fc::logger& log, const chain::block_state_legacy_ptr& bsp, const fc::time_point& now ) {
if( bsp == nullptr || _disabled ) return;
void on_block( fc::logger& log, const chain::signed_block_ptr& block, const fc::time_point& now ) {
if( block == nullptr || _disabled ) return;
const auto time_ordinal = time_ordinal_for(now);
const auto orig_count = _account_subjective_bill_cache.size();
remove_subjective_billing( bsp, time_ordinal );
remove_subjective_billing( block, time_ordinal );
if (orig_count > 0) {
fc_dlog( log, "Subjective billed accounts ${n} removed ${r}",
("n", orig_count)("r", orig_count - _account_subjective_bill_cache.size()) );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,10 @@ class unapplied_transaction_queue {
return true;
}

void clear_applied( const block_state_legacy_ptr& bs ) {
void clear_applied( const signed_block_ptr& block ) {
if( empty() ) return;
auto& idx = queue.get<by_trx_id>();
for( const auto& receipt : bs->block->transactions ) {
for( const auto& receipt : block->transactions ) {
if( std::holds_alternative<packed_transaction>(receipt.trx) ) {
const auto& pt = std::get<packed_transaction>(receipt.trx);
auto itr = idx.find( pt.id() );
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
#pragma once

#include <eosio/chain/block_state_legacy.hpp>
#include <eosio/state_history/types.hpp>
#include <boost/iostreams/filtering_streambuf.hpp>

namespace eosio {
namespace state_history {

using chain::block_state_legacy_ptr;
using chain::transaction_id_type;

struct trace_converter {
std::map<transaction_id_type, augmented_transaction_trace> cached_traces;
std::optional<augmented_transaction_trace> onblock_trace;

void add_transaction(const transaction_trace_ptr& trace, const chain::packed_transaction_ptr& transaction);
void pack(boost::iostreams::filtering_ostreambuf& ds, bool trace_debug_mode, const block_state_legacy_ptr& block_state_legacy);
void pack(boost::iostreams::filtering_ostreambuf& ds, bool trace_debug_mode, const chain::signed_block_ptr& block);
};

} // namespace state_history
Expand Down
4 changes: 2 additions & 2 deletions libraries/state_history/trace_converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ void trace_converter::add_transaction(const transaction_trace_ptr& trace, const
}
}

void trace_converter::pack(boost::iostreams::filtering_ostreambuf& obuf, bool trace_debug_mode, const block_state_legacy_ptr& block_state) {
void trace_converter::pack(boost::iostreams::filtering_ostreambuf& obuf, bool trace_debug_mode, const chain::signed_block_ptr& block) {
std::vector<augmented_transaction_trace> traces;
if (onblock_trace)
traces.push_back(*onblock_trace);
for (auto& r : block_state->block->transactions) {
for (auto& r : block->transactions) {
transaction_id_type id;
if (std::holds_alternative<transaction_id_type>(r.trx))
id = std::get<transaction_id_type>(r.trx);
Expand Down
7 changes: 4 additions & 3 deletions libraries/testing/tester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,10 @@ namespace eosio { namespace testing {
control->add_indices();
if (lambda) lambda();
chain_transactions.clear();
control->accepted_block.connect([this]( const block_state_legacy_ptr& block_state ){
FC_ASSERT( block_state->block );
for( auto receipt : block_state->block->transactions ) {
control->accepted_block.connect([this]( block_signal_params t ){
const auto& [ block, id ] = t;
FC_ASSERT( block );
for( auto receipt : block->transactions ) {
if( std::holds_alternative<packed_transaction>(receipt.trx) ) {
auto &pt = std::get<packed_transaction>(receipt.trx);
chain_transactions[pt.get_transaction().id()] = std::move(receipt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@ namespace eosio::chain::plugin_interface {
struct chain_plugin_interface;

namespace channels {
using pre_accepted_block = channel_decl<struct pre_accepted_block_tag, signed_block_ptr>;
using rejected_block = channel_decl<struct rejected_block_tag, signed_block_ptr>;
using accepted_block_header = channel_decl<struct accepted_block_header_tag, block_state_legacy_ptr>;
using accepted_block = channel_decl<struct accepted_block_tag, block_state_legacy_ptr>;
using irreversible_block = channel_decl<struct irreversible_block_tag, block_state_legacy_ptr>;
using accepted_transaction = channel_decl<struct accepted_transaction_tag, transaction_metadata_ptr>;
using accepted_block_header = channel_decl<struct accepted_block_header_tag, block_signal_params>;
using accepted_block = channel_decl<struct accepted_block_tag, block_signal_params>;
using irreversible_block = channel_decl<struct irreversible_block_tag,block_signal_params>;
using applied_transaction = channel_decl<struct applied_transaction_tag, transaction_trace_ptr>;
}

Expand Down
Loading

0 comments on commit 9924152

Please sign in to comment.