Skip to content

Commit

Permalink
feat(eth-watch): do not query events from earliest block (#2810)
Browse files Browse the repository at this point in the history
## What ❔

Removes querying from the earliest batch in eth watch. Instead queries
for constant block range and splits queried range in parts if needed

## Why ❔

Vanilla reth doesn't allow eth_logs requests where block range is
greater than 1_000_000. This changes allows eth watch to work with this
limitation.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
  • Loading branch information
perekopskiy committed Sep 5, 2024
1 parent 958dfdc commit 1da3f7e
Showing 1 changed file with 78 additions and 61 deletions.
139 changes: 78 additions & 61 deletions core/node/eth_watch/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,75 +88,34 @@ impl EthHttpQueryClient {
}
}

async fn get_filter_logs(
fn get_default_address_list(&self) -> Vec<Address> {
[
Some(self.diamond_proxy_addr),
Some(self.governance_address),
self.state_transition_manager_address,
self.chain_admin_address,
]
.into_iter()
.flatten()
.collect()
}

async fn get_events_inner(
&self,
from: BlockNumber,
to: BlockNumber,
topics: Vec<H256>,
topics1: Vec<H256>,
topics2: Vec<H256>,
addresses: Vec<Address>,
retries_left: usize,
) -> EnrichedClientResult<Vec<Log>> {
let filter = FilterBuilder::default()
.address(
[
Some(self.diamond_proxy_addr),
Some(self.governance_address),
self.state_transition_manager_address,
self.chain_admin_address,
]
.into_iter()
.flatten()
.collect(),
)
.from_block(from)
.to_block(to)
.topics(Some(topics), None, None, None)
.topics(Some(topics1), Some(topics2), None, None)
.address(addresses)
.build();
self.client.logs(&filter).await
}
}

#[async_trait::async_trait]
impl EthClient for EthHttpQueryClient {
async fn scheduler_vk_hash(
&self,
verifier_address: Address,
) -> Result<H256, ContractCallError> {
// New verifier returns the hash of the verification key.
CallFunctionArgs::new("verificationKeyHash", ())
.for_contract(verifier_address, &self.verifier_contract_abi)
.call(&self.client)
.await
}

async fn diamond_cut_by_version(
&self,
packed_version: H256,
) -> EnrichedClientResult<Option<Vec<u8>>> {
let Some(state_transition_manager_address) = self.state_transition_manager_address else {
return Ok(None);
};

let filter = FilterBuilder::default()
.address(vec![state_transition_manager_address])
.from_block(BlockNumber::Earliest)
.to_block(BlockNumber::Latest)
.topics(
Some(vec![self.new_upgrade_cut_data_signature]),
Some(vec![packed_version]),
None,
None,
)
.build();
let logs = self.client.logs(&filter).await?;
Ok(logs.into_iter().next().map(|log| log.data.0))
}

async fn get_events(
&self,
from: BlockNumber,
to: BlockNumber,
retries_left: usize,
) -> EnrichedClientResult<Vec<Log>> {
let mut result = self.get_filter_logs(from, to, self.topics.clone()).await;
let mut result = self.client.logs(&filter).await;

// This code is compatible with both Infura and Alchemy API providers.
// Note: we don't handle rate-limits here - assumption is that we're never going to hit them.
Expand Down Expand Up @@ -225,6 +184,64 @@ impl EthClient for EthHttpQueryClient {

result
}
}

#[async_trait::async_trait]
impl EthClient for EthHttpQueryClient {
async fn scheduler_vk_hash(
&self,
verifier_address: Address,
) -> Result<H256, ContractCallError> {
// New verifier returns the hash of the verification key.
CallFunctionArgs::new("verificationKeyHash", ())
.for_contract(verifier_address, &self.verifier_contract_abi)
.call(&self.client)
.await
}

async fn diamond_cut_by_version(
&self,
packed_version: H256,
) -> EnrichedClientResult<Option<Vec<u8>>> {
const LOOK_BACK_BLOCK_RANGE: u64 = 1_000_000;

let Some(state_transition_manager_address) = self.state_transition_manager_address else {
return Ok(None);
};

let to_block = self.client.block_number().await?;
let from_block = to_block.saturating_sub((LOOK_BACK_BLOCK_RANGE - 1).into());

let logs = self
.get_events_inner(
from_block.into(),
to_block.into(),
vec![self.new_upgrade_cut_data_signature],
vec![packed_version],
vec![state_transition_manager_address],
RETRY_LIMIT,
)
.await?;

Ok(logs.into_iter().next().map(|log| log.data.0))
}

async fn get_events(
&self,
from: BlockNumber,
to: BlockNumber,
retries_left: usize,
) -> EnrichedClientResult<Vec<Log>> {
self.get_events_inner(
from,
to,
self.topics.clone(),
Vec::new(),
self.get_default_address_list(),
retries_left,
)
.await
}

async fn finalized_block_number(&self) -> EnrichedClientResult<u64> {
if let Some(confirmations) = self.confirmations_for_eth_event {
Expand Down

0 comments on commit 1da3f7e

Please sign in to comment.