Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make EthService generic over engine types and block executor #10212

Merged
merged 2 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 32 additions & 31 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ members = [
"crates/consensus/debug-client/",
"crates/e2e-test-utils/",
"crates/engine/primitives/",
"crates/engine/service",
"crates/engine/tree/",
"crates/engine/util/",
"crates/errors/",
Expand All @@ -35,7 +36,6 @@ members = [
"crates/ethereum/cli/",
"crates/ethereum/consensus/",
"crates/ethereum/engine-primitives/",
"crates/ethereum/engine/",
"crates/ethereum/evm",
"crates/ethereum/node",
"crates/ethereum/payload/",
Expand Down Expand Up @@ -297,13 +297,13 @@ reth-e2e-test-utils = { path = "crates/e2e-test-utils" }
reth-ecies = { path = "crates/net/ecies" }
reth-engine-primitives = { path = "crates/engine/primitives" }
reth-engine-tree = { path = "crates/engine/tree" }
reth-engine-service = { path = "crates/engine/service" }
reth-engine-util = { path = "crates/engine/util" }
reth-errors = { path = "crates/errors" }
reth-eth-wire = { path = "crates/net/eth-wire" }
reth-eth-wire-types = { path = "crates/net/eth-wire-types" }
reth-ethereum-cli = { path = "crates/ethereum/cli" }
reth-ethereum-consensus = { path = "crates/ethereum/consensus" }
reth-ethereum-engine = { path = "crates/ethereum/engine" }
reth-ethereum-engine-primitives = { path = "crates/ethereum/engine-primitives" }
reth-ethereum-forks = { path = "crates/ethereum-forks" }
reth-ethereum-payload-builder = { path = "crates/ethereum/payload" }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "reth-ethereum-engine"
name = "reth-engine-service"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
Expand All @@ -14,17 +14,18 @@ workspace = true
# reth
reth-beacon-consensus.workspace = true
reth-chainspec.workspace = true
reth-consensus.workspace = true
reth-db-api.workspace = true
reth-engine-primitives.workspace = true
reth-engine-tree.workspace = true
reth-ethereum-engine-primitives.workspace = true
reth-evm-ethereum.workspace = true
reth-evm.workspace = true
reth-network-p2p.workspace = true
reth-payload-builder.workspace = true
reth-payload-validator.workspace = true
reth-provider.workspace = true
reth-prune.workspace = true
reth-stages-api.workspace = true
reth-tasks.workspace = true
reth-payload-builder.workspace = true

# async
futures.workspace = true
Expand All @@ -38,9 +39,10 @@ thiserror.workspace = true
reth-blockchain-tree.workspace = true
reth-consensus.workspace = true
reth-engine-tree = { workspace = true, features = ["test-utils"] }
reth-evm.workspace = true
reth-ethereum-engine-primitives.workspace = true
reth-evm-ethereum.workspace = true
reth-exex-types.workspace = true
reth-primitives.workspace = true
reth-prune-types.workspace = true

tokio = { workspace = true, features = ["sync"] }
tokio = { workspace = true, features = ["sync"] }
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
//! Ethereum engine implementation.
//! Engine service implementation.

#![doc(
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]

/// Ethereum engine service.
/// Engine Service
pub mod service;
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use futures::{Stream, StreamExt};
use pin_project::pin_project;
use reth_beacon_consensus::{BeaconConsensusEngineEvent, BeaconEngineMessage, EthBeaconConsensus};
use reth_beacon_consensus::{BeaconConsensusEngineEvent, BeaconEngineMessage};
use reth_chainspec::ChainSpec;
use reth_consensus::Consensus;
use reth_db_api::database::Database;
use reth_engine_primitives::EngineTypes;
use reth_engine_tree::{
backfill::PipelineSync,
download::BasicBlockDownloader,
Expand All @@ -14,8 +16,7 @@ pub use reth_engine_tree::{
chain::{ChainEvent, ChainOrchestrator},
engine::EngineApiEvent,
};
use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_evm_ethereum::execute::EthExecutorProvider;
use reth_evm::execute::BlockExecutorProvider;
use reth_network_p2p::BlockClient;
use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_validator::ExecutionPayloadValidator;
Expand All @@ -24,58 +25,64 @@ use reth_prune::Pruner;
use reth_stages_api::Pipeline;
use reth_tasks::TaskSpawner;
use std::{
marker::PhantomData,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio_stream::wrappers::UnboundedReceiverStream;

/// Alias for Ethereum chain orchestrator.
type EthServiceType<DB, Client> = ChainOrchestrator<
/// Alias for chain orchestrator.
type EngineServiceType<DB, Client, T> = ChainOrchestrator<
EngineHandler<
EngineApiRequestHandler<EngineApiRequest<EthEngineTypes>>,
UnboundedReceiverStream<BeaconEngineMessage<EthEngineTypes>>,
EngineApiRequestHandler<EngineApiRequest<T>>,
UnboundedReceiverStream<BeaconEngineMessage<T>>,
BasicBlockDownloader<Client>,
>,
PipelineSync<DB>,
>;

/// The type that drives the Ethereum chain forward and communicates progress.
/// The type that drives the chain forward and communicates progress.
#[pin_project]
#[allow(missing_debug_implementations)]
pub struct EthService<DB, Client>
pub struct EngineService<DB, Client, E, T>
where
DB: Database + 'static,
Client: BlockClient + 'static,
E: BlockExecutorProvider + 'static,
T: EngineTypes,
{
orchestrator: EthServiceType<DB, Client>,
orchestrator: EngineServiceType<DB, Client, T>,
_marker: PhantomData<E>,
}

impl<DB, Client> EthService<DB, Client>
impl<DB, Client, E, T> EngineService<DB, Client, E, T>
where
DB: Database + 'static,
Client: BlockClient + 'static,
E: BlockExecutorProvider + 'static,
T: EngineTypes + 'static,
{
/// Constructor for `EthService`.
/// Constructor for `EngineService`.
#[allow(clippy::too_many_arguments)]
pub fn new(
consensus: Arc<dyn Consensus>,
executor_factory: E,
chain_spec: Arc<ChainSpec>,
client: Client,
incoming_requests: UnboundedReceiverStream<BeaconEngineMessage<EthEngineTypes>>,
incoming_requests: UnboundedReceiverStream<BeaconEngineMessage<T>>,
pipeline: Pipeline<DB>,
pipeline_task_spawner: Box<dyn TaskSpawner>,
provider: ProviderFactory<DB>,
blockchain_db: BlockchainProvider2<DB>,
pruner: Pruner<DB, ProviderFactory<DB>>,
payload_builder: PayloadBuilderHandle<EthEngineTypes>,
payload_builder: PayloadBuilderHandle<T>,
tree_config: TreeConfig,
) -> Self {
let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));
let downloader = BasicBlockDownloader::new(client, consensus.clone());

let persistence_handle = PersistenceHandle::spawn_service(provider, pruner);
let payload_validator = ExecutionPayloadValidator::new(chain_spec.clone());
let executor_factory = EthExecutorProvider::ethereum(chain_spec);
let payload_validator = ExecutionPayloadValidator::new(chain_spec);

let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();

Expand All @@ -95,19 +102,24 @@ where

let backfill_sync = PipelineSync::new(pipeline, pipeline_task_spawner);

Self { orchestrator: ChainOrchestrator::new(handler, backfill_sync) }
Self {
orchestrator: ChainOrchestrator::new(handler, backfill_sync),
_marker: Default::default(),
}
}

/// Returns a mutable reference to the orchestrator.
pub fn orchestrator_mut(&mut self) -> &mut EthServiceType<DB, Client> {
pub fn orchestrator_mut(&mut self) -> &mut EngineServiceType<DB, Client, T> {
&mut self.orchestrator
}
}

impl<DB, Client> Stream for EthService<DB, Client>
impl<DB, Client, E, T> Stream for EngineService<DB, Client, E, T>
where
DB: Database + 'static,
Client: BlockClient + 'static,
E: BlockExecutorProvider + 'static,
T: EngineTypes + 'static,
{
type Item = ChainEvent<BeaconConsensusEngineEvent>;

Expand All @@ -117,17 +129,19 @@ where
}
}

/// Potential error returned by `EthService`.
/// Potential error returned by `EngineService`.
#[derive(Debug, thiserror::Error)]
#[error("Eth service error.")]
pub struct EthServiceError {}
#[error("Engine service error.")]
pub struct EngineServiceError {}

#[cfg(test)]
mod tests {
use super::*;
use reth_beacon_consensus::EthBeaconConsensus;
use reth_chainspec::{ChainSpecBuilder, MAINNET};
use reth_engine_tree::test_utils::TestPipelineBuilder;
use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_evm_ethereum::execute::EthExecutorProvider;
use reth_exex_types::FinishedExExHeight;
use reth_network_p2p::test_utils::TestFullBlockClient;
use reth_primitives::SealedHeader;
Expand All @@ -145,6 +159,7 @@ mod tests {
.paris_activated()
.build(),
);
let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));

let client = TestFullBlockClient::default();

Expand All @@ -155,6 +170,7 @@ mod tests {
let pipeline_task_spawner = Box::<TokioTaskExecutor>::default();
let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());

let executor_factory = EthExecutorProvider::ethereum(chain_spec.clone());
let blockchain_db =
BlockchainProvider2::with_latest(provider_factory.clone(), SealedHeader::default())
.unwrap();
Expand All @@ -164,7 +180,9 @@ mod tests {
Pruner::<_, ProviderFactory<_>>::new(provider_factory.clone(), vec![], 0, 0, None, rx);

let (tx, _rx) = unbounded_channel();
let _eth_service = EthService::new(
let _eth_service = EngineService::new(
consensus,
executor_factory,
chain_spec,
client,
incoming_requests,
Expand Down
2 changes: 1 addition & 1 deletion crates/ethereum/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ workspace = true
# reth
reth-payload-builder.workspace = true
reth-ethereum-engine-primitives.workspace = true
reth-ethereum-engine.workspace = true
reth-engine-service.workspace = true
reth-basic-payload-builder.workspace = true
reth-ethereum-payload-builder.workspace = true
reth-node-builder.workspace = true
Expand Down
6 changes: 4 additions & 2 deletions crates/ethereum/node/src/launch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use reth_beacon_consensus::{
BeaconConsensusEngineHandle,
};
use reth_blockchain_tree::BlockchainTreeConfig;
use reth_engine_service::service::{ChainEvent, EngineService};
use reth_engine_tree::tree::TreeConfig;
use reth_ethereum_engine::service::{ChainEvent, EthService};
use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_exex::ExExManagerHandle;
use reth_network::{
Expand Down Expand Up @@ -173,7 +173,9 @@ where
info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized");

// Configure the consensus engine
let mut eth_service = EthService::new(
let mut eth_service = EngineService::new(
ctx.consensus(),
ctx.components().block_executor().clone(),
Comment on lines +176 to +178
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

ctx.chain_spec(),
network_client.clone(),
UnboundedReceiverStream::new(consensus_engine_rx),
Expand Down
Loading