Skip to content

Commit

Permalink
Merge pull request #90 from zsluedem/add-bundler-mode
Browse files Browse the repository at this point in the history
Add change bundler mode api
  • Loading branch information
zsluedem committed Mar 30, 2023
2 parents 07fb714 + b44a5de commit c98f891
Show file tree
Hide file tree
Showing 14 changed files with 383 additions and 79 deletions.
17 changes: 15 additions & 2 deletions bin/bundler-rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use aa_bundler::{
debug::DebugApiServerImpl, debug_api::DebugApiServer, eth::EthApiServerImpl,
eth_api::EthApiServer,
},
uopool::server::uopool::uo_pool_client::UoPoolClient,
uopool::server::{
bundler::bundler_client::BundlerClient, uopool::uo_pool_client::UoPoolClient,
},
};

#[derive(Parser)]
Expand All @@ -23,6 +25,9 @@ pub struct Opt {
#[clap(long, default_value = "127.0.0.1:3001")]
pub uopool_grpc_listen_address: String,

#[clap(long, default_value = "127.0.0.1:3002")]
pub bundler_grpc_listen_address: String,

#[clap(long, value_delimiter=',', default_value = "eth", value_parser = ["eth", "debug"])]
pub rpc_api: Vec<String>,
}
Expand Down Expand Up @@ -54,7 +59,15 @@ async fn main() -> Result<()> {
}

if rpc_api.contains("debug") {
api.merge(DebugApiServerImpl { uopool_grpc_client }.into_rpc())?;
let bundler_grpc_client =
BundlerClient::connect(format!("http://{}", opt.bundler_grpc_listen_address)).await?;
api.merge(
DebugApiServerImpl {
uopool_grpc_client,
bundler_grpc_client,
}
.into_rpc(),
)?;
}

let _jsonrpc_server_handle = jsonrpc_server.start(api.clone())?;
Expand Down
47 changes: 34 additions & 13 deletions bin/bundler.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use aa_bundler::{
bundler::Bundler,
bundler::BundlerService,
models::wallet::Wallet,
rpc::{
debug::DebugApiServerImpl, debug_api::DebugApiServer, eth::EthApiServerImpl,
eth_api::EthApiServer,
},
uopool::server::uopool::uo_pool_client::UoPoolClient,
uopool::server::{
bundler::bundler_client::BundlerClient, uopool::uo_pool_client::UoPoolClient,
},
utils::{parse_address, parse_u256},
};
use anyhow::Result;
Expand Down Expand Up @@ -107,16 +109,24 @@ fn main() -> Result<()> {
.await?;
info!("Connected to uopool grpc");

for entry_point in opt.entry_points.iter() {
let _bundler = Bundler::new(
&wallet,
opt.bundler_opts.beneficiary,
uopool_grpc_client.clone(),
opt.bundler_opts.bundle_interval,
*entry_point,
opt.eth_client_address.clone(),
);
}
let bundler_manager = BundlerService::new(
wallet,
opt.bundler_opts.beneficiary,
uopool_grpc_client.clone(),
opt.entry_points,
opt.eth_client_address.clone(),
);
info!("Starting bundler manager");
bundler_manager.start_bundling(opt.bundler_opts.bundle_interval);
info!("Starting bundler rpc server");
aa_bundler::bundler::service::run_server(
bundler_manager,
opt.bundler_opts.bundler_grpc_listen_address,
);
info!(
"Starting bundler rpc server at {:}",
opt.bundler_opts.bundler_grpc_listen_address
);

if !opt.no_rpc {
info!("Starting rpc server with bundler");
Expand All @@ -142,7 +152,18 @@ fn main() -> Result<()> {
}

if rpc_api.contains("debug") {
api.merge(DebugApiServerImpl { uopool_grpc_client }.into_rpc())?;
let bundler_grpc_client = BundlerClient::connect(format!(
"http://{}",
opt.bundler_opts.bundler_grpc_listen_address
))
.await?;
api.merge(
DebugApiServerImpl {
uopool_grpc_client,
bundler_grpc_client,
}
.into_rpc(),
)?;
}

let _jsonrpc_server_handle = jsonrpc_server.start(api.clone())?;
Expand Down
1 change: 1 addition & 0 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ fn make_protos(protos: &[&str]) {
let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap());
tonic_build::configure()
.server_mod_attribute("uopool", r#"#[allow(clippy::unwrap_used)]"#)
.server_mod_attribute("bundler", r#"#[allow(clippy::unwrap_used)]"#)
.file_descriptor_set_path(out_dir.join("descriptor.bin"))
.compile_with_config(config(), protos, &["./src/proto"])
.unwrap();
Expand Down
172 changes: 143 additions & 29 deletions src/bundler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,46 @@
use std::{sync::Arc, time::Duration};
pub mod service;

use std::{net::SocketAddr, sync::Arc, time::Duration};

use clap::Parser;
use ethers::{
prelude::SignerMiddleware,
providers::{Http, Middleware, Provider},
signers::Signer,
types::{transaction::eip2718::TypedTransaction, Address, U256},
types::{transaction::eip2718::TypedTransaction, Address, H256, U256},
};
use tokio::time;
use tracing::debug;
use parking_lot::Mutex;
use serde::Deserialize;
use tracing::{debug, error, info};

use crate::{
contracts::gen::EntryPointAPI,
models::wallet::Wallet,
types::user_operation::UserOperation,
uopool::server::uopool::{uo_pool_client::UoPoolClient, GetSortedRequest},
uopool::server::{
bundler::Mode as GrpcMode,
uopool::{uo_pool_client::UoPoolClient, GetSortedRequest},
},
utils::{parse_address, parse_u256},
};

#[derive(Debug, Deserialize)]
pub enum Mode {
#[serde(rename = "auto")]
Auto,
#[serde(rename = "manual")]
Manual,
}

impl From<Mode> for GrpcMode {
fn from(value: Mode) -> Self {
match value {
Mode::Auto => Self::Auto,
Mode::Manual => Self::Manual,
}
}
}

#[derive(Debug, Parser, PartialEq)]
pub struct BundlerOpts {
#[clap(long, value_parser=parse_address)]
Expand All @@ -30,55 +53,45 @@ pub struct BundlerOpts {
pub min_balance: U256,

#[clap(long, default_value = "127.0.0.1:3002")]
pub bundler_grpc_listen_address: String,
pub bundler_grpc_listen_address: SocketAddr,

#[clap(long, default_value = "10")]
pub bundle_interval: u64,
}

pub struct Bundler<'a> {
pub wallet: &'a Wallet,
#[derive(Clone)]
pub struct Bundler {
pub wallet: Wallet,
pub beneficiary: Address,
pub uopool_grpc_client: UoPoolClient<tonic::transport::Channel>,
pub bundle_interval: u64,
pub entry_point: Address,
pub eth_client_address: String,
}

impl<'a> Bundler<'a> {
impl Bundler {
pub fn new(
wallet: &'a Wallet,
wallet: Wallet,
beneficiary: Address,
uopool_grpc_client: UoPoolClient<tonic::transport::Channel>,
bundle_interval: u64,
entry_point: Address,
eth_client_address: String,
) -> Self {
Self {
wallet,
beneficiary,
uopool_grpc_client,
bundle_interval,
entry_point,
eth_client_address,
}
}

pub async fn run(mut self) -> anyhow::Result<()> {
let mut interval = time::interval(Duration::from_secs(self.bundle_interval));

loop {
interval.tick().await;
self.send_next_bundle().await?;
}
}

async fn create_bundle(&mut self) -> anyhow::Result<Vec<UserOperation>> {
async fn create_bundle(&self) -> anyhow::Result<Vec<UserOperation>> {
let request = tonic::Request::new(GetSortedRequest {
entry_point: Some(self.entry_point.into()),
});
let response = self
.uopool_grpc_client
.clone()
.get_sorted_user_operations(request)
.await?;
let user_operations: Vec<UserOperation> = response
Expand All @@ -90,8 +103,10 @@ impl<'a> Bundler<'a> {
Ok(user_operations)
}

async fn send_next_bundle(&mut self) -> anyhow::Result<()> {
async fn send_next_bundle(&self) -> anyhow::Result<H256> {
info!("Creating the next bundle");
let bundles = self.create_bundle().await?;
info!("Got {} bundles", bundles.len());
let provider = Provider::<Http>::try_from(self.eth_client_address.clone())?;
let client = Arc::new(SignerMiddleware::new(provider, self.wallet.signer.clone()));
let entry_point = EntryPointAPI::new(self.entry_point, client.clone());
Expand All @@ -118,17 +133,113 @@ impl<'a> Bundler<'a> {
tx.set_gas_price(max_fee_per_gas);
}
};
let res = client.send_transaction(tx, None).await?.await?;
let tx = client.send_transaction(tx, None).await?;
let tx_hash = tx.tx_hash();
debug!("Send bundles with transaction: {tx:?}");

debug!("Send bundles with ret: {res:?}");
Ok(())
let res = tx.await?;
debug!("Send bundles with receipt: {res:?}");
Ok(tx_hash)
}
}

pub struct BundlerService {
pub bundlers: Vec<Bundler>,
pub running: Arc<Mutex<bool>>,
}

fn is_running(running: Arc<Mutex<bool>>) -> bool {
let r = running.lock();
*r
}

impl BundlerService {
pub fn new(
wallet: Wallet,
beneficiary: Address,
uopool_grpc_client: UoPoolClient<tonic::transport::Channel>,
entry_points: Vec<Address>,
eth_client_address: String,
) -> Self {
let bundlers: Vec<Bundler> = entry_points
.iter()
.map(|entry_point| {
Bundler::new(
wallet.clone(),
beneficiary,
uopool_grpc_client.clone(),
*entry_point,
eth_client_address.clone(),
)
})
.collect();

Self {
bundlers,
running: Arc::new(Mutex::new(false)),
}
}

pub async fn send_bundles_now(&self) -> anyhow::Result<H256> {
info!("Sending bundle now");
let mut tx_hashes: Vec<H256> = vec![];
for bundler in self.bundlers.iter() {
info!("Sending bundle for entry point: {:?}", bundler.entry_point);
let tx_hash = bundler.send_next_bundle().await?;
tx_hashes.push(tx_hash)
}

// FIXME: Because currently the bundler support multiple bundler and
// we don't have a way to know which bundler is the one that is
Ok(tx_hashes
.into_iter()
.next()
.expect("Must have at least one tx hash"))
}

pub fn stop_bundling(&self) {
info!("Stopping auto bundling");
let mut r = self.running.lock();
*r = false;
}

pub fn is_running(&self) -> bool {
is_running(self.running.clone())
}

pub fn start_bundling(&self, interval: u64) {
if !self.is_running() {
for bundler in self.bundlers.iter() {
info!(
"Starting auto bundling process for entry point: {:?}",
bundler.entry_point
);
let bundler_own = bundler.clone();
let running_lock = self.running.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(interval));
loop {
if !is_running(running_lock.clone()) {
break;
}
interval.tick().await;

if let Err(e) = bundler_own.send_next_bundle().await {
error!("Error while sending bundle: {e:?}");
}
}
});
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;
use std::{
net::{IpAddr, Ipv4Addr},
str::FromStr,
};

#[test]
fn bundler_opts() {
Expand All @@ -151,7 +262,10 @@ mod tests {
.unwrap(),
gas_factor: U256::from(600),
min_balance: U256::from(1),
bundler_grpc_listen_address: String::from("127.0.0.1:3002"),
bundler_grpc_listen_address: SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
3002
),
bundle_interval: 10,
},
BundlerOpts::try_parse_from(args).unwrap()
Expand Down
Loading

0 comments on commit c98f891

Please sign in to comment.