Skip to content
This repository has been archived by the owner on Dec 2, 2022. It is now read-only.

Commit

Permalink
gRPC state traversal (#292)
Browse files Browse the repository at this point in the history
  • Loading branch information
vorot93 authored Sep 14, 2022
1 parent 5938dad commit 196fc1d
Show file tree
Hide file tree
Showing 7 changed files with 324 additions and 35 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 8 additions & 3 deletions bin/akula-rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use akula::{
binutil::AkulaDataDir,
kv::{mdbx::*, MdbxWithDirHandle},
rpc::{
erigon::ErigonApiServerImpl, eth::EthApiServerImpl, net::NetApiServerImpl,
otterscan::OtterscanApiServerImpl, parity::ParityApiServerImpl, trace::TraceApiServerImpl,
web3::Web3ApiServerImpl,
debug::DebugApiServerImpl, erigon::ErigonApiServerImpl, eth::EthApiServerImpl,
net::NetApiServerImpl, otterscan::OtterscanApiServerImpl, parity::ParityApiServerImpl,
trace::TraceApiServerImpl, web3::Web3ApiServerImpl,
},
};
use anyhow::format_err;
Expand Down Expand Up @@ -142,6 +142,11 @@ async fn main() -> anyhow::Result<()> {
async move {
info!("Starting gRPC server on {}", opt.grpc_listen_address);
tonic::transport::Server::builder()
.add_service(
ethereum_interfaces::web3::debug_api_server::DebugApiServer::new(
DebugApiServerImpl { db: db.clone() },
),
)
.add_service(
ethereum_interfaces::web3::trace_api_server::TraceApiServer::new(
TraceApiServerImpl {
Expand Down
13 changes: 10 additions & 3 deletions bin/akula.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use akula::{
models::*,
p2p::node::NodeBuilder,
rpc::{
erigon::ErigonApiServerImpl, eth::EthApiServerImpl, net::NetApiServerImpl,
otterscan::OtterscanApiServerImpl, parity::ParityApiServerImpl, trace::TraceApiServerImpl,
web3::Web3ApiServerImpl,
debug::DebugApiServerImpl, erigon::ErigonApiServerImpl, eth::EthApiServerImpl,
net::NetApiServerImpl, otterscan::OtterscanApiServerImpl, parity::ParityApiServerImpl,
trace::TraceApiServerImpl, web3::Web3ApiServerImpl,
},
stagedsync,
stages::*,
Expand Down Expand Up @@ -312,6 +312,13 @@ fn main() -> anyhow::Result<()> {
async move {
info!("Starting gRPC server on {}", opt.grpc_listen_address);
tonic::transport::Server::builder()
.add_service(
ethereum_interfaces::web3::debug_api_server::DebugApiServer::new(
DebugApiServerImpl {
db: db.clone(),
}
)
)
.add_service(
ethereum_interfaces::web3::trace_api_server::TraceApiServer::new(
TraceApiServerImpl {
Expand Down
85 changes: 82 additions & 3 deletions src/accessors/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{

pub mod account {
use super::*;
use crate::kv::tables::BitmapKey;

pub fn read<K: TransactionKind, E: EnvironmentKind>(
tx: &MdbxTransaction<'_, K, E>,
Expand Down Expand Up @@ -45,6 +46,54 @@ pub mod account {
Ok(None)
}
}

pub fn walk<'db, 'tx, K: TransactionKind, E: EnvironmentKind>(
tx: &'tx MdbxTransaction<'db, K, E>,
offset: Option<Address>,
block: Option<BlockNumber>,
) -> impl Iterator<Item = anyhow::Result<(Address, Account)>> + 'tx
where
'db: 'tx,
{
TryGenIter::from(move || {
if let Some(block_number) = block {
// Traverse history index and add to set if non-zero at our block

let mut index = tx
.cursor(tables::AccountHistory)?
.walk(offset.map(|offset| BitmapKey {
inner: offset,
block_number: BlockNumber(0),
}));

let mut last_entry = None;

while let Some((BitmapKey { inner: address, .. }, _)) = index.next().transpose()? {
if last_entry != Some(address) {
continue;
}

last_entry = Some(address);

let v =
crate::accessors::state::account::read(tx, address, Some(block_number))?;

if let Some(account) = v {
yield (address, account);
}
}
} else {
// Simply traverse the current state
let mut walker = tx.cursor(tables::Account)?.walk(offset);

while let Some(v) = walker.next().transpose()? {
yield v;
}
}

Ok(())
})
}
}

pub mod storage {
Expand Down Expand Up @@ -89,12 +138,15 @@ pub mod storage {
.unwrap_or(U256::ZERO))
}

pub fn walk<'tx, K: TransactionKind, E: EnvironmentKind>(
tx: &'tx MdbxTransaction<'_, K, E>,
pub fn walk<'db, 'tx, K: TransactionKind, E: EnvironmentKind>(
tx: &'tx MdbxTransaction<'db, K, E>,
searched_address: Address,
offset: Option<H256>,
block: Option<BlockNumber>,
) -> impl Iterator<Item = anyhow::Result<(H256, U256)>> + 'tx {
) -> impl Iterator<Item = anyhow::Result<(H256, U256)>> + 'tx
where
'db: 'tx,
{
TryGenIter::from(move || {
if let Some(block_number) = block {
// Traverse history index and add to set if non-zero at our block
Expand All @@ -104,6 +156,8 @@ pub mod storage {
block_number: BlockNumber(0),
}));

let mut last_entry = None;

while let Some((
BitmapKey {
inner: (address, slot),
Expand All @@ -116,6 +170,12 @@ pub mod storage {
break;
}

if last_entry != Some((address, slot)) {
continue;
}

last_entry = Some((address, slot));

let v = crate::accessors::state::storage::read(
tx,
address,
Expand Down Expand Up @@ -143,6 +203,25 @@ pub mod storage {
}
}

pub mod code {
use super::*;
use anyhow::format_err;
use bytes::Bytes;

pub fn read<K: TransactionKind, E: EnvironmentKind>(
tx: &MdbxTransaction<'_, K, E>,
code_hash: H256,
) -> anyhow::Result<Bytes> {
if code_hash == EMPTY_HASH {
Ok(Bytes::new())
} else {
Ok(tx
.get(tables::Code, code_hash)?
.ok_or_else(|| format_err!("code expected but not found"))?)
}
}
}

pub mod history_index {
use super::*;
use crate::kv::{mdbx::MdbxTransaction, tables::BitmapKey};
Expand Down
188 changes: 188 additions & 0 deletions src/rpc/debug.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
use super::helpers;
use crate::kv::{mdbx::*, MdbxWithDirHandle};
use async_trait::async_trait;
use ethereum_interfaces::web3::{
debug_api_server::DebugApi, AccountStreamRequest, StorageSlot, StorageStreamRequest,
};
use ethereum_jsonrpc::types;
use futures::stream::BoxStream;
use std::sync::Arc;

pub struct DebugApiServerImpl<SE>
where
SE: EnvironmentKind,
{
pub db: Arc<MdbxWithDirHandle<SE>>,
}

#[async_trait]
impl<DB> DebugApi for DebugApiServerImpl<DB>
where
DB: EnvironmentKind,
{
type AccountStreamStream =
BoxStream<'static, Result<ethereum_interfaces::web3::Account, tonic::Status>>;
type StorageStreamStream =
BoxStream<'static, Result<ethereum_interfaces::web3::StorageSlot, tonic::Status>>;

async fn account_stream(
&self,
request: tonic::Request<AccountStreamRequest>,
) -> Result<tonic::Response<Self::AccountStreamStream>, tonic::Status> {
let AccountStreamRequest { block_id, offset } = request.into_inner();

let db = self.db.clone();

let (res_tx, rx) = tokio::sync::mpsc::channel(1);

tokio::task::spawn_blocking(move || {
let f = {
let res_tx = res_tx.clone();
move || {
let tx = db
.begin()
.map_err(|e| tonic::Status::internal(e.to_string()))?;

let block = match helpers::grpc_block_id(
block_id
.ok_or_else(|| tonic::Status::invalid_argument("expected block id"))?,
) {
None
| Some(types::BlockId::Number(types::BlockNumber::Latest))
| Some(types::BlockId::Number(types::BlockNumber::Pending)) => None,
Some(block_id) => {
if let Some((block_number, _)) =
helpers::resolve_block_id(&tx, block_id)
.map_err(|e| tonic::Status::internal(e.to_string()))?
{
Some(block_number)
} else {
return Ok(());
}
}
};

let mut it = crate::accessors::state::account::walk(
&tx,
offset.map(|v| v.into()),
block,
);

while let Some((
address,
crate::models::Account {
nonce,
balance,
code_hash,
},
)) = it
.next()
.transpose()
.map_err(|e| tonic::Status::internal(e.to_string()))?
{
if res_tx
.blocking_send(Ok(ethereum_interfaces::web3::Account {
address: Some(address.into()),
balance: Some(balance.into()),
nonce,
code: crate::accessors::state::code::read(&tx, code_hash)
.map_err(|e| tonic::Status::internal(e.to_string()))?,
}))
.is_err()
{
return Ok(());
}
}

Ok(())
}
};
if let Err::<_, tonic::Status>(e) = (f)() {
let _ = res_tx.blocking_send(Err(e));
}
});

Ok(tonic::Response::new(Box::pin(
tokio_stream::wrappers::ReceiverStream::new(rx),
)))
}

async fn storage_stream(
&self,
request: tonic::Request<StorageStreamRequest>,
) -> Result<tonic::Response<Self::StorageStreamStream>, tonic::Status> {
let StorageStreamRequest {
block_id,
address,
offset,
} = request.into_inner();

let db = self.db.clone();

let (res_tx, rx) = tokio::sync::mpsc::channel(1);

tokio::task::spawn_blocking(move || {
let f = {
let res_tx = res_tx.clone();
move || {
let tx = db
.begin()
.map_err(|e| tonic::Status::internal(e.to_string()))?;

let block = match helpers::grpc_block_id(
block_id
.ok_or_else(|| tonic::Status::invalid_argument("expected block id"))?,
) {
None
| Some(types::BlockId::Number(types::BlockNumber::Latest))
| Some(types::BlockId::Number(types::BlockNumber::Pending)) => None,
Some(block_id) => {
if let Some((block_number, _)) =
helpers::resolve_block_id(&tx, block_id)
.map_err(|e| tonic::Status::internal(e.to_string()))?
{
Some(block_number)
} else {
return Ok(());
}
}
};

let mut it = crate::accessors::state::storage::walk(
&tx,
address
.ok_or_else(|| tonic::Status::invalid_argument("expected address"))?
.into(),
offset.map(|v| v.into()),
block,
);

while let Some((key, value)) = it
.next()
.transpose()
.map_err(|e| tonic::Status::internal(e.to_string()))?
{
if res_tx
.blocking_send(Ok(StorageSlot {
key: Some(key.into()),
value: Some(value.into()),
}))
.is_err()
{
return Ok(());
}
}

Ok(())
}
};
if let Err::<_, tonic::Status>(e) = (f)() {
let _ = res_tx.blocking_send(Err(e));
}
});

Ok(tonic::Response::new(Box::pin(
tokio_stream::wrappers::ReceiverStream::new(rx),
)))
}
}
Loading

0 comments on commit 196fc1d

Please sign in to comment.