Skip to content

Commit

Permalink
Merge branch 'main' into kurtosis-setup-and-docs
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigo-o committed Jul 9, 2024
2 parents 2fe05e3 + 8e450d0 commit 5adf232
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 74 deletions.
26 changes: 26 additions & 0 deletions bench/deposit_tree.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
alias LambdaEthereumConsensus.Execution.ExecutionChain

# The --mode db flag is needed to run this benchmark.

compressed_tree = File.read!("deposit_tree_file")
{:ok, encoded_tree} = :snappyer.decompress(compressed_tree)
deposit_tree = :erlang.binary_to_term(encoded_tree)

Benchee.run(
%{
"ExecutionChain.put" => fn v -> ExecutionChain.put("", v) end
},
warmup: 2,
time: 5,
inputs: %{
"DepositTree" => deposit_tree
}
)

Benchee.run(
%{
"ExecutionChain.get" => fn -> ExecutionChain.get("") end
},
warmup: 2,
time: 5
)
Binary file added deposit_tree_file
Binary file not shown.
4 changes: 2 additions & 2 deletions lib/beacon_api/controllers/v1/beacon_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ defmodule BeaconApi.V1.BeaconController do
alias BeaconApi.ErrorController
alias BeaconApi.Helpers
alias BeaconApi.Utils
alias LambdaEthereumConsensus.ForkChoice
alias LambdaEthereumConsensus.Store.BlockDb
alias LambdaEthereumConsensus.Store.Blocks
alias LambdaEthereumConsensus.Store.StoreDb

plug(OpenApiSpex.Plug.CastAndValidate, json_render_error_v2: true)

Expand All @@ -30,7 +30,7 @@ defmodule BeaconApi.V1.BeaconController do
conn
|> json(%{
"data" => %{
"genesis_time" => ForkChoice.get_genesis_time(),
"genesis_time" => StoreDb.fetch_genesis_time!(),
"genesis_validators_root" =>
ChainSpec.get_genesis_validators_root() |> Utils.hex_encode(),
"genesis_fork_version" => ChainSpec.get("GENESIS_FORK_VERSION") |> Utils.hex_encode()
Expand Down
18 changes: 8 additions & 10 deletions lib/lambda_ethereum_consensus/beacon/beacon_node.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do

ForkChoice.init_store(store, time)

validator_children =
get_validator_children(
validator_manager =
get_validator_manager(
deposit_tree_snapshot,
store.head_slot,
store.head_root,
store.genesis_time
store.head_root
)

children =
Expand All @@ -46,23 +45,22 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
{Task.Supervisor, name: PruneStatesSupervisor},
{Task.Supervisor, name: PruneBlocksSupervisor},
{Task.Supervisor, name: PruneBlobsSupervisor}
] ++ validator_children
] ++ validator_manager

Supervisor.init(children, strategy: :one_for_all)
end

defp get_validator_children(nil, _, _, _) do
defp get_validator_manager(nil, _, _) do
Logger.warning("Deposit data not found. Validator will be disabled.")

[]
end

defp get_validator_children(snapshot, slot, head_root, genesis_time) do
defp get_validator_manager(snapshot, slot, head_root) do
%BeaconState{eth1_data_votes: votes} = BlockStates.get_state_info!(head_root).beacon_state
LambdaEthereumConsensus.Execution.ExecutionChain.init(snapshot, votes)
# TODO: move checkpoint sync outside and move this to application.ex
[
{ValidatorManager, {slot, head_root}},
{LambdaEthereumConsensus.Execution.ExecutionChain, {genesis_time, snapshot, votes}}
{ValidatorManager, {slot, head_root}}
]
end

Expand Down
129 changes: 76 additions & 53 deletions lib/lambda_ethereum_consensus/execution/execution_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,52 +4,91 @@ defmodule LambdaEthereumConsensus.Execution.ExecutionChain do
stores the canonical Eth1 chain for block proposing.
"""
require Logger
use GenServer

alias LambdaEthereumConsensus.Execution.ExecutionClient
alias LambdaEthereumConsensus.Store.KvSchema
alias LambdaEthereumConsensus.Store.StoreDb
alias Types.Deposit
alias Types.DepositTree
alias Types.DepositTreeSnapshot
alias Types.Eth1Data
alias Types.ExecutionPayload

@spec start_link(Types.uint64()) :: GenServer.on_start()
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
use KvSchema, prefix: "execution_chain"

@type state :: %{
eth1_data_votes: map(),
eth1_chain: list(map()),
current_eth1_data: %Types.Eth1Data{},
deposit_tree: %Types.DepositTree{},
last_period: integer()
}

@impl KvSchema
@spec encode_key(String.t()) :: {:ok, binary()} | {:error, binary()}
def encode_key(key), do: {:ok, key}

@impl KvSchema
@spec decode_key(binary()) :: {:ok, String.t()} | {:error, binary()}
def decode_key(key), do: {:ok, key}

@impl KvSchema
@spec encode_value(map()) :: {:ok, binary()} | {:error, binary()}
def encode_value(state), do: {:ok, :erlang.term_to_binary(state)}

@impl KvSchema
@spec decode_value(binary()) :: {:ok, map()} | {:error, binary()}
def decode_value(bin), do: {:ok, :erlang.binary_to_term(bin)}

@spec get_eth1_vote(Types.slot()) :: {:ok, Eth1Data.t() | nil} | {:error, any}
def get_eth1_vote(slot) do
GenServer.call(__MODULE__, {:get_eth1_vote, slot})
state = fetch_execution_state!()
compute_eth1_vote(state, slot)
end

@spec get_eth1_vote(Types.slot()) :: DepositTreeSnapshot.t()
def get_deposit_snapshot(), do: GenServer.call(__MODULE__, :get_deposit_snapshot)
@spec get_deposit_snapshot() :: DepositTreeSnapshot.t()
def get_deposit_snapshot() do
state = fetch_execution_state!()
DepositTree.get_snapshot(state.deposit_tree)
end

@spec get_deposits(Eth1Data.t(), Eth1Data.t(), Range.t()) ::
{:ok, [Deposit.t()] | nil} | {:error, any}
def get_deposits(current_eth1_data, eth1_vote, deposit_range) do
if Range.size(deposit_range) == 0 do
{:ok, []}
else
GenServer.call(__MODULE__, {:get_deposits, current_eth1_data, eth1_vote, deposit_range})
state = fetch_execution_state!()
votes = state.eth1_data_votes

eth1_data =
if Map.has_key?(votes, eth1_vote) and has_majority?(votes, eth1_vote),
do: eth1_vote,
else: current_eth1_data

compute_deposits(state, eth1_data, deposit_range)
end
end

@spec notify_new_block(Types.slot(), Eth1Data.t(), ExecutionPayload.t()) :: :ok
def notify_new_block(slot, eth1_data, %ExecutionPayload{} = execution_payload) do
payload_info = Map.take(execution_payload, [:block_hash, :block_number, :timestamp])
GenServer.cast(__MODULE__, {:new_block, slot, eth1_data, payload_info})

fetch_execution_state!()
|> prune_state(slot)
|> update_state_with_payload(payload_info)
|> update_state_with_vote(eth1_data)
|> persist_execution_state()
end

@impl true
def init({genesis_time, %DepositTreeSnapshot{} = snapshot, eth1_votes}) do
@doc """
Initializes the table in the db by storing the initial state of the execution chain.
"""
def init(%DepositTreeSnapshot{} = snapshot, eth1_votes) do
state = %{
# PERF: we could use some kind of ordered map for storing votes
eth1_data_votes: %{},
eth1_chain: [],
genesis_time: genesis_time,
current_eth1_data: DepositTreeSnapshot.get_eth1_data(snapshot),
deposit_tree: DepositTree.from_snapshot(snapshot),
last_period: 0
Expand All @@ -59,44 +98,14 @@ defmodule LambdaEthereumConsensus.Execution.ExecutionChain do

StoreDb.persist_deposits_snapshot(snapshot)

{:ok, updated_state}
end

@impl true
def handle_call({:get_eth1_vote, slot}, _from, state) do
{:reply, compute_eth1_vote(state, slot), state}
end

@impl true
def handle_call(:get_deposit_snapshot, _from, state) do
{:reply, DepositTree.get_snapshot(state.deposit_tree), state}
end

def handle_call({:get_deposits, current_eth1_data, eth1_vote, deposit_range}, _from, state) do
votes = state.eth1_data_votes

eth1_data =
if Map.has_key?(votes, eth1_vote) and has_majority?(votes, eth1_vote),
do: eth1_vote,
else: current_eth1_data

{:reply, compute_deposits(state, eth1_data, deposit_range), state}
end

@impl true
def handle_cast({:new_block, slot, eth1_data, payload_info}, state) do
state
|> prune_state(slot)
|> update_state_with_payload(payload_info)
|> update_state_with_vote(eth1_data)
|> then(&{:noreply, &1})
persist_execution_state(updated_state)
end

defp prune_state(%{genesis_time: genesis_time, last_period: last_period} = state, slot) do
defp prune_state(%{last_period: last_period} = state, slot) do
current_period = compute_period(slot)

if current_period > last_period do
new_chain = drop_old_payloads(state.eth1_chain, genesis_time, slot)
new_chain = drop_old_payloads(state.eth1_chain, slot)
%{state | eth1_data_votes: %{}, eth1_chain: new_chain, last_period: current_period}
else
state
Expand All @@ -107,8 +116,8 @@ defmodule LambdaEthereumConsensus.Execution.ExecutionChain do
%{state | eth1_chain: [payload_info | eth1_chain]}
end

defp drop_old_payloads(eth1_chain, genesis_time, slot) do
period_start = voting_period_start_time(slot, genesis_time)
defp drop_old_payloads(eth1_chain, slot) do
period_start = voting_period_start_time(slot)

follow_time_distance =
ChainSpec.get("SECONDS_PER_ETH1_BLOCK") * ChainSpec.get("ETH1_FOLLOW_DISTANCE")
Expand Down Expand Up @@ -172,22 +181,23 @@ defmodule LambdaEthereumConsensus.Execution.ExecutionChain do
end
end

defp validate_range(%{deposit_count: count}, _..deposit_end) when deposit_end >= count, do: :ok
defp validate_range(%{deposit_count: count}, _..deposit_end//_) when deposit_end >= count,
do: :ok

defp validate_range(_, _), do: {:error, "deposit range out of bounds"}

defp compute_eth1_vote(%{eth1_data_votes: []}, _), do: {:ok, nil}
defp compute_eth1_vote(%{eth1_data_votes: map}, _) when map == %{}, do: {:ok, nil}
defp compute_eth1_vote(%{eth1_chain: []}, _), do: {:ok, nil}

defp compute_eth1_vote(
%{
eth1_chain: eth1_chain,
eth1_data_votes: seen_votes,
genesis_time: genesis_time,
deposit_tree: deposit_tree
},
slot
) do
period_start = voting_period_start_time(slot, genesis_time)
period_start = voting_period_start_time(slot)
follow_time = ChainSpec.get("SECONDS_PER_ETH1_BLOCK") * ChainSpec.get("ETH1_FOLLOW_DISTANCE")

blocks_to_consider =
Expand Down Expand Up @@ -257,7 +267,8 @@ defmodule LambdaEthereumConsensus.Execution.ExecutionChain do
timestamp in (period_start - follow_time * 2)..(period_start - follow_time)
end

defp voting_period_start_time(slot, genesis_time) do
defp voting_period_start_time(slot) do
genesis_time = StoreDb.fetch_genesis_time!()
period_start_slot = slot - rem(slot, slots_per_eth1_voting_period())
genesis_time + period_start_slot * ChainSpec.get("SECONDS_PER_SLOT")
end
Expand All @@ -266,4 +277,16 @@ defmodule LambdaEthereumConsensus.Execution.ExecutionChain do

defp slots_per_eth1_voting_period(),
do: ChainSpec.get("EPOCHS_PER_ETH1_VOTING_PERIOD") * ChainSpec.get("SLOTS_PER_EPOCH")

@spec persist_execution_state(state()) :: :ok | {:error, binary()}
defp persist_execution_state(state), do: put("", state)

@spec fetch_execution_state() :: {:ok, state()} | {:error, binary()} | :not_found
defp fetch_execution_state(), do: get("")

@spec fetch_execution_state!() :: state()
defp fetch_execution_state!() do
{:ok, state} = fetch_execution_state()
state
end
end
8 changes: 1 addition & 7 deletions lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,10 @@ defmodule LambdaEthereumConsensus.ForkChoice do
persist_store(new_store)
end

@spec get_genesis_time() :: Types.uint64()
def get_genesis_time() do
%{genesis_time: genesis_time} = fetch_store!()
genesis_time
end

@spec get_current_chain_slot() :: Types.slot()
def get_current_chain_slot() do
time = Clock.get_current_time()
genesis_time = get_genesis_time()
genesis_time = StoreDb.fetch_genesis_time!()
compute_current_slot(time, genesis_time)
end

Expand Down
13 changes: 13 additions & 0 deletions lib/lambda_ethereum_consensus/store/store_db.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,19 @@ defmodule LambdaEthereumConsensus.Store.StoreDb do
end)
end

@spec fetch_genesis_time() :: {:ok, Types.uint64()} | :not_found
def fetch_genesis_time() do
with {:ok, store} <- fetch_store() do
store.genesis_time
end
end

@spec fetch_genesis_time!() :: Types.uint64()
def fetch_genesis_time!() do
{:ok, %{genesis_time: genesis_time}} = fetch_store()
genesis_time
end

@spec fetch_deposits_snapshot() :: {:ok, Types.DepositTreeSnapshot.t()} | :not_found
def fetch_deposits_snapshot(), do: get(@snapshot_prefix)

Expand Down
5 changes: 3 additions & 2 deletions test/unit/beacon_api/beacon_api_v1_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Unit.BeaconApiTest.V1 do
alias LambdaEthereumConsensus.ForkChoice
alias LambdaEthereumConsensus.Store.BlockDb
alias LambdaEthereumConsensus.Store.Db
alias LambdaEthereumConsensus.Store.StoreDb
alias Types.BlockInfo

@moduletag :beacon_api_case
Expand Down Expand Up @@ -35,7 +36,7 @@ defmodule Unit.BeaconApiTest.V1 do
start_link_supervised!({Db, dir: tmp_dir})

patch(ForkChoice, :get_current_status_message, status_message)
patch(ForkChoice, :get_genesis_time, 42)
patch(StoreDb, :fetch_genesis_time!, 42)

:ok
end
Expand Down Expand Up @@ -132,7 +133,7 @@ defmodule Unit.BeaconApiTest.V1 do
test "get genesis data" do
expected_response = %{
"data" => %{
"genesis_time" => ForkChoice.get_genesis_time(),
"genesis_time" => StoreDb.fetch_genesis_time!(),
"genesis_validators_root" =>
ChainSpec.get_genesis_validators_root() |> Utils.hex_encode(),
"genesis_fork_version" => ChainSpec.get("GENESIS_FORK_VERSION") |> Utils.hex_encode()
Expand Down

0 comments on commit 5adf232

Please sign in to comment.