From 8e450d03a36c906999b81a6880d21a320bf0c458 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Avila=20Gast=C3=B3n?= <72628438+avilagaston9@users.noreply.github.com> Date: Mon, 8 Jul 2024 19:53:58 -0300 Subject: [PATCH] refactor: remove execution_chain GenServer (#1185) --- bench/deposit_tree.exs | 26 ++++ deposit_tree_file | Bin 0 -> 1385 bytes .../controllers/v1/beacon_controller.ex | 4 +- .../beacon/beacon_node.ex | 18 ++- .../execution/execution_chain.ex | 129 +++++++++++------- .../fork_choice/fork_choice.ex | 8 +- .../store/store_db.ex | 13 ++ test/unit/beacon_api/beacon_api_v1_test.exs | 5 +- 8 files changed, 129 insertions(+), 74 deletions(-) create mode 100644 bench/deposit_tree.exs create mode 100644 deposit_tree_file diff --git a/bench/deposit_tree.exs b/bench/deposit_tree.exs new file mode 100644 index 000000000..2ca331fee --- /dev/null +++ b/bench/deposit_tree.exs @@ -0,0 +1,26 @@ +alias LambdaEthereumConsensus.Execution.ExecutionChain + +# The --mode db flag is needed to run this benchmark. + +compressed_tree = File.read!("deposit_tree_file") +{:ok, encoded_tree} = :snappyer.decompress(compressed_tree) +deposit_tree = :erlang.binary_to_term(encoded_tree) + +Benchee.run( + %{ + "ExecutionChain.put" => fn v -> ExecutionChain.put("", v) end + }, + warmup: 2, + time: 5, + inputs: %{ + "DepositTree" => deposit_tree + } +) + +Benchee.run( + %{ + "ExecutionChain.get" => fn -> ExecutionChain.get("") end + }, + warmup: 2, + time: 5 +) diff --git a/deposit_tree_file b/deposit_tree_file new file mode 100644 index 0000000000000000000000000000000000000000..8b3a32d36d8e32c683b7607fa564c22a2b16ac43 GIT binary patch literal 1385 zcmXZce>7BC00;0pV`gcnjKs|N{p(aJ6_WJBn2beB$d9OuH{$h5XTkWVt!E?a9I~mTR{MDAl$(xZeLe zWGahqp;+h3Lw?DxtwD*`_K1%{=(hKV+-ZMm*Tte%V0DJ*CVlIk`>#7?|8IlBNpFjt zKcX`kllwi>x|(9b-0sR|OnVR0O9V!FzDkh7a;ePMG725sCoF8MDz0!wwDjcX+-Rx!FI-()b`XFyT)~X*nxPLq;AVkz9C-4>zjvtR}on@*h`esmA$O``#EljZ(CJ4vn||d>{3#z z^T$4>nI!5^+xt2qXbBv*fbcMm0w|jNLp^`5m-kOl<*+p^yWF%h^KSZru;c}bl~bh! zv7JO(2SoS*VHi-zP@5!1M{a4>rwHdr@L(+9mK&GaB_iM(XrG(J&wIMV;zgy+D1M(B7gDN=PncHwj(7cZtlv4 zC6F@gh=0BV=|11A%M_n)Ds|?WC*mau2X4X-eMY{ z5%dU)2#X0u1Y-h&z$CB;CInN08Nr-j;YHTb8x`ZslieG6a3U;_Nl~$g_x(SVKh*H< z!s*iaUDRiZlGWX>R(?buW*eK_&U~9P`qcGWkBwm_pX8kEfYSvs41+~d9xP>@*YWD5 z78QIirBO2h&$>V-EgSR(6W^*frXq942PnahR;rrx;jSug96xtw>uGlT4*@ json(%{ "data" => %{ - "genesis_time" => ForkChoice.get_genesis_time(), + "genesis_time" => StoreDb.fetch_genesis_time!(), "genesis_validators_root" => ChainSpec.get_genesis_validators_root() |> Utils.hex_encode(), "genesis_fork_version" => ChainSpec.get("GENESIS_FORK_VERSION") |> Utils.hex_encode() diff --git a/lib/lambda_ethereum_consensus/beacon/beacon_node.ex b/lib/lambda_ethereum_consensus/beacon/beacon_node.ex index cc5fa3578..79463f0c8 100644 --- a/lib/lambda_ethereum_consensus/beacon/beacon_node.ex +++ b/lib/lambda_ethereum_consensus/beacon/beacon_node.ex @@ -30,12 +30,11 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do ForkChoice.init_store(store, time) - validator_children = - get_validator_children( + validator_manager = + get_validator_manager( deposit_tree_snapshot, store.head_slot, - store.head_root, - store.genesis_time + store.head_root ) children = @@ -46,23 +45,22 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do {Task.Supervisor, name: PruneStatesSupervisor}, {Task.Supervisor, name: PruneBlocksSupervisor}, {Task.Supervisor, name: PruneBlobsSupervisor} - ] ++ validator_children + ] ++ validator_manager Supervisor.init(children, strategy: :one_for_all) end - defp get_validator_children(nil, _, _, _) do + defp get_validator_manager(nil, _, _) do Logger.warning("Deposit data not found. Validator will be disabled.") - [] end - defp get_validator_children(snapshot, slot, head_root, genesis_time) do + defp get_validator_manager(snapshot, slot, head_root) do %BeaconState{eth1_data_votes: votes} = BlockStates.get_state_info!(head_root).beacon_state + LambdaEthereumConsensus.Execution.ExecutionChain.init(snapshot, votes) # TODO: move checkpoint sync outside and move this to application.ex [ - {ValidatorManager, {slot, head_root}}, - {LambdaEthereumConsensus.Execution.ExecutionChain, {genesis_time, snapshot, votes}} + {ValidatorManager, {slot, head_root}} ] end diff --git a/lib/lambda_ethereum_consensus/execution/execution_chain.ex b/lib/lambda_ethereum_consensus/execution/execution_chain.ex index 1cfe0b0f5..52fc7b376 100644 --- a/lib/lambda_ethereum_consensus/execution/execution_chain.ex +++ b/lib/lambda_ethereum_consensus/execution/execution_chain.ex @@ -4,9 +4,9 @@ defmodule LambdaEthereumConsensus.Execution.ExecutionChain do stores the canonical Eth1 chain for block proposing. """ require Logger - use GenServer alias LambdaEthereumConsensus.Execution.ExecutionClient + alias LambdaEthereumConsensus.Store.KvSchema alias LambdaEthereumConsensus.Store.StoreDb alias Types.Deposit alias Types.DepositTree @@ -14,18 +14,43 @@ defmodule LambdaEthereumConsensus.Execution.ExecutionChain do alias Types.Eth1Data alias Types.ExecutionPayload - @spec start_link(Types.uint64()) :: GenServer.on_start() - def start_link(opts) do - GenServer.start_link(__MODULE__, opts, name: __MODULE__) - end + use KvSchema, prefix: "execution_chain" + + @type state :: %{ + eth1_data_votes: map(), + eth1_chain: list(map()), + current_eth1_data: %Types.Eth1Data{}, + deposit_tree: %Types.DepositTree{}, + last_period: integer() + } + + @impl KvSchema + @spec encode_key(String.t()) :: {:ok, binary()} | {:error, binary()} + def encode_key(key), do: {:ok, key} + + @impl KvSchema + @spec decode_key(binary()) :: {:ok, String.t()} | {:error, binary()} + def decode_key(key), do: {:ok, key} + + @impl KvSchema + @spec encode_value(map()) :: {:ok, binary()} | {:error, binary()} + def encode_value(state), do: {:ok, :erlang.term_to_binary(state)} + + @impl KvSchema + @spec decode_value(binary()) :: {:ok, map()} | {:error, binary()} + def decode_value(bin), do: {:ok, :erlang.binary_to_term(bin)} @spec get_eth1_vote(Types.slot()) :: {:ok, Eth1Data.t() | nil} | {:error, any} def get_eth1_vote(slot) do - GenServer.call(__MODULE__, {:get_eth1_vote, slot}) + state = fetch_execution_state!() + compute_eth1_vote(state, slot) end - @spec get_eth1_vote(Types.slot()) :: DepositTreeSnapshot.t() - def get_deposit_snapshot(), do: GenServer.call(__MODULE__, :get_deposit_snapshot) + @spec get_deposit_snapshot() :: DepositTreeSnapshot.t() + def get_deposit_snapshot() do + state = fetch_execution_state!() + DepositTree.get_snapshot(state.deposit_tree) + end @spec get_deposits(Eth1Data.t(), Eth1Data.t(), Range.t()) :: {:ok, [Deposit.t()] | nil} | {:error, any} @@ -33,23 +58,37 @@ defmodule LambdaEthereumConsensus.Execution.ExecutionChain do if Range.size(deposit_range) == 0 do {:ok, []} else - GenServer.call(__MODULE__, {:get_deposits, current_eth1_data, eth1_vote, deposit_range}) + state = fetch_execution_state!() + votes = state.eth1_data_votes + + eth1_data = + if Map.has_key?(votes, eth1_vote) and has_majority?(votes, eth1_vote), + do: eth1_vote, + else: current_eth1_data + + compute_deposits(state, eth1_data, deposit_range) end end @spec notify_new_block(Types.slot(), Eth1Data.t(), ExecutionPayload.t()) :: :ok def notify_new_block(slot, eth1_data, %ExecutionPayload{} = execution_payload) do payload_info = Map.take(execution_payload, [:block_hash, :block_number, :timestamp]) - GenServer.cast(__MODULE__, {:new_block, slot, eth1_data, payload_info}) + + fetch_execution_state!() + |> prune_state(slot) + |> update_state_with_payload(payload_info) + |> update_state_with_vote(eth1_data) + |> persist_execution_state() end - @impl true - def init({genesis_time, %DepositTreeSnapshot{} = snapshot, eth1_votes}) do + @doc """ + Initializes the table in the db by storing the initial state of the execution chain. + """ + def init(%DepositTreeSnapshot{} = snapshot, eth1_votes) do state = %{ # PERF: we could use some kind of ordered map for storing votes eth1_data_votes: %{}, eth1_chain: [], - genesis_time: genesis_time, current_eth1_data: DepositTreeSnapshot.get_eth1_data(snapshot), deposit_tree: DepositTree.from_snapshot(snapshot), last_period: 0 @@ -59,44 +98,14 @@ defmodule LambdaEthereumConsensus.Execution.ExecutionChain do StoreDb.persist_deposits_snapshot(snapshot) - {:ok, updated_state} - end - - @impl true - def handle_call({:get_eth1_vote, slot}, _from, state) do - {:reply, compute_eth1_vote(state, slot), state} - end - - @impl true - def handle_call(:get_deposit_snapshot, _from, state) do - {:reply, DepositTree.get_snapshot(state.deposit_tree), state} - end - - def handle_call({:get_deposits, current_eth1_data, eth1_vote, deposit_range}, _from, state) do - votes = state.eth1_data_votes - - eth1_data = - if Map.has_key?(votes, eth1_vote) and has_majority?(votes, eth1_vote), - do: eth1_vote, - else: current_eth1_data - - {:reply, compute_deposits(state, eth1_data, deposit_range), state} - end - - @impl true - def handle_cast({:new_block, slot, eth1_data, payload_info}, state) do - state - |> prune_state(slot) - |> update_state_with_payload(payload_info) - |> update_state_with_vote(eth1_data) - |> then(&{:noreply, &1}) + persist_execution_state(updated_state) end - defp prune_state(%{genesis_time: genesis_time, last_period: last_period} = state, slot) do + defp prune_state(%{last_period: last_period} = state, slot) do current_period = compute_period(slot) if current_period > last_period do - new_chain = drop_old_payloads(state.eth1_chain, genesis_time, slot) + new_chain = drop_old_payloads(state.eth1_chain, slot) %{state | eth1_data_votes: %{}, eth1_chain: new_chain, last_period: current_period} else state @@ -107,8 +116,8 @@ defmodule LambdaEthereumConsensus.Execution.ExecutionChain do %{state | eth1_chain: [payload_info | eth1_chain]} end - defp drop_old_payloads(eth1_chain, genesis_time, slot) do - period_start = voting_period_start_time(slot, genesis_time) + defp drop_old_payloads(eth1_chain, slot) do + period_start = voting_period_start_time(slot) follow_time_distance = ChainSpec.get("SECONDS_PER_ETH1_BLOCK") * ChainSpec.get("ETH1_FOLLOW_DISTANCE") @@ -172,22 +181,23 @@ defmodule LambdaEthereumConsensus.Execution.ExecutionChain do end end - defp validate_range(%{deposit_count: count}, _..deposit_end) when deposit_end >= count, do: :ok + defp validate_range(%{deposit_count: count}, _..deposit_end//_) when deposit_end >= count, + do: :ok + defp validate_range(_, _), do: {:error, "deposit range out of bounds"} - defp compute_eth1_vote(%{eth1_data_votes: []}, _), do: {:ok, nil} + defp compute_eth1_vote(%{eth1_data_votes: map}, _) when map == %{}, do: {:ok, nil} defp compute_eth1_vote(%{eth1_chain: []}, _), do: {:ok, nil} defp compute_eth1_vote( %{ eth1_chain: eth1_chain, eth1_data_votes: seen_votes, - genesis_time: genesis_time, deposit_tree: deposit_tree }, slot ) do - period_start = voting_period_start_time(slot, genesis_time) + period_start = voting_period_start_time(slot) follow_time = ChainSpec.get("SECONDS_PER_ETH1_BLOCK") * ChainSpec.get("ETH1_FOLLOW_DISTANCE") blocks_to_consider = @@ -257,7 +267,8 @@ defmodule LambdaEthereumConsensus.Execution.ExecutionChain do timestamp in (period_start - follow_time * 2)..(period_start - follow_time) end - defp voting_period_start_time(slot, genesis_time) do + defp voting_period_start_time(slot) do + genesis_time = StoreDb.fetch_genesis_time!() period_start_slot = slot - rem(slot, slots_per_eth1_voting_period()) genesis_time + period_start_slot * ChainSpec.get("SECONDS_PER_SLOT") end @@ -266,4 +277,16 @@ defmodule LambdaEthereumConsensus.Execution.ExecutionChain do defp slots_per_eth1_voting_period(), do: ChainSpec.get("EPOCHS_PER_ETH1_VOTING_PERIOD") * ChainSpec.get("SLOTS_PER_EPOCH") + + @spec persist_execution_state(state()) :: :ok | {:error, binary()} + defp persist_execution_state(state), do: put("", state) + + @spec fetch_execution_state() :: {:ok, state()} | {:error, binary()} | :not_found + defp fetch_execution_state(), do: get("") + + @spec fetch_execution_state!() :: state() + defp fetch_execution_state!() do + {:ok, state} = fetch_execution_state() + state + end end diff --git a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex index 72028d5fe..2dc812e5e 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex @@ -114,16 +114,10 @@ defmodule LambdaEthereumConsensus.ForkChoice do persist_store(new_store) end - @spec get_genesis_time() :: Types.uint64() - def get_genesis_time() do - %{genesis_time: genesis_time} = fetch_store!() - genesis_time - end - @spec get_current_chain_slot() :: Types.slot() def get_current_chain_slot() do time = Clock.get_current_time() - genesis_time = get_genesis_time() + genesis_time = StoreDb.fetch_genesis_time!() compute_current_slot(time, genesis_time) end diff --git a/lib/lambda_ethereum_consensus/store/store_db.ex b/lib/lambda_ethereum_consensus/store/store_db.ex index 792e86d18..f439b9452 100644 --- a/lib/lambda_ethereum_consensus/store/store_db.ex +++ b/lib/lambda_ethereum_consensus/store/store_db.ex @@ -21,6 +21,19 @@ defmodule LambdaEthereumConsensus.Store.StoreDb do end) end + @spec fetch_genesis_time() :: {:ok, Types.uint64()} | :not_found + def fetch_genesis_time() do + with {:ok, store} <- fetch_store() do + store.genesis_time + end + end + + @spec fetch_genesis_time!() :: Types.uint64() + def fetch_genesis_time!() do + {:ok, %{genesis_time: genesis_time}} = fetch_store() + genesis_time + end + @spec fetch_deposits_snapshot() :: {:ok, Types.DepositTreeSnapshot.t()} | :not_found def fetch_deposits_snapshot(), do: get(@snapshot_prefix) diff --git a/test/unit/beacon_api/beacon_api_v1_test.exs b/test/unit/beacon_api/beacon_api_v1_test.exs index 12d47567d..eefe28d02 100644 --- a/test/unit/beacon_api/beacon_api_v1_test.exs +++ b/test/unit/beacon_api/beacon_api_v1_test.exs @@ -8,6 +8,7 @@ defmodule Unit.BeaconApiTest.V1 do alias LambdaEthereumConsensus.ForkChoice alias LambdaEthereumConsensus.Store.BlockDb alias LambdaEthereumConsensus.Store.Db + alias LambdaEthereumConsensus.Store.StoreDb alias Types.BlockInfo @moduletag :beacon_api_case @@ -35,7 +36,7 @@ defmodule Unit.BeaconApiTest.V1 do start_link_supervised!({Db, dir: tmp_dir}) patch(ForkChoice, :get_current_status_message, status_message) - patch(ForkChoice, :get_genesis_time, 42) + patch(StoreDb, :fetch_genesis_time!, 42) :ok end @@ -132,7 +133,7 @@ defmodule Unit.BeaconApiTest.V1 do test "get genesis data" do expected_response = %{ "data" => %{ - "genesis_time" => ForkChoice.get_genesis_time(), + "genesis_time" => StoreDb.fetch_genesis_time!(), "genesis_validators_root" => ChainSpec.get_genesis_validators_root() |> Utils.hex_encode(), "genesis_fork_version" => ChainSpec.get("GENESIS_FORK_VERSION") |> Utils.hex_encode()