Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add change bundler mode api #90

Merged
merged 6 commits into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions bin/bundler-rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use aa_bundler::{
debug::DebugApiServerImpl, debug_api::DebugApiServer, eth::EthApiServerImpl,
eth_api::EthApiServer,
},
uopool::server::uopool::uo_pool_client::UoPoolClient,
uopool::server::{
bundler::bundler_client::BundlerClient, uopool::uo_pool_client::UoPoolClient,
},
};

#[derive(Parser)]
Expand All @@ -23,6 +25,9 @@ pub struct Opt {
#[clap(long, default_value = "127.0.0.1:3001")]
pub uopool_grpc_listen_address: String,

#[clap(long, default_value = "127.0.0.1:3002")]
pub bundler_grpc_listen_address: String,

#[clap(long, value_delimiter=',', default_value = "eth", value_parser = ["eth", "debug"])]
pub rpc_api: Vec<String>,
}
Expand Down Expand Up @@ -54,7 +59,15 @@ async fn main() -> Result<()> {
}

if rpc_api.contains("debug") {
api.merge(DebugApiServerImpl { uopool_grpc_client }.into_rpc())?;
let bundler_grpc_client =
BundlerClient::connect(format!("http://{}", opt.bundler_grpc_listen_address)).await?;
api.merge(
DebugApiServerImpl {
uopool_grpc_client,
bundler_grpc_client,
}
.into_rpc(),
)?;
}

let _jsonrpc_server_handle = jsonrpc_server.start(api.clone())?;
Expand Down
48 changes: 35 additions & 13 deletions bin/bundler.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use aa_bundler::{
bundler::Bundler,
bundler::BundlerService,
models::wallet::Wallet,
rpc::{
debug::DebugApiServerImpl, debug_api::DebugApiServer, eth::EthApiServerImpl,
eth_api::EthApiServer,
},
uopool::server::uopool::uo_pool_client::UoPoolClient,
uopool::server::{
bundler::bundler_client::BundlerClient, uopool::uo_pool_client::UoPoolClient,
},
utils::{parse_address, parse_u256},
};
use anyhow::Result;
Expand Down Expand Up @@ -107,16 +109,25 @@ fn main() -> Result<()> {
.await?;
info!("Connected to uopool grpc");

for entry_point in opt.entry_points.iter() {
let _bundler = Bundler::new(
&wallet,
opt.bundler_opts.beneficiary,
uopool_grpc_client.clone(),
opt.bundler_opts.bundle_interval,
*entry_point,
opt.eth_client_address.clone(),
);
}
let bundler_manager = BundlerService::new(
zsluedem marked this conversation as resolved.
Show resolved Hide resolved
wallet,
opt.bundler_opts.beneficiary,
uopool_grpc_client.clone(),
opt.entry_points,
opt.eth_client_address.clone(),
opt.bundler_opts.bundle_interval,
);
info!("Starting bundler manager");
zsluedem marked this conversation as resolved.
Show resolved Hide resolved
bundler_manager.start_bundling();
info!("Starting bundler rpc server");
aa_bundler::bundler::service::run_server(
bundler_manager,
opt.bundler_opts.bundler_grpc_listen_address,
);
info!(
"Starting bundler rpc server at {:}",
opt.bundler_opts.bundler_grpc_listen_address
);

if !opt.no_rpc {
info!("Starting rpc server with bundler");
Expand All @@ -142,7 +153,18 @@ fn main() -> Result<()> {
}

if rpc_api.contains("debug") {
api.merge(DebugApiServerImpl { uopool_grpc_client }.into_rpc())?;
let bundler_grpc_client = BundlerClient::connect(format!(
"http://{}",
opt.bundler_opts.bundler_grpc_listen_address
))
.await?;
api.merge(
DebugApiServerImpl {
uopool_grpc_client,
bundler_grpc_client,
}
.into_rpc(),
)?;
}

let _jsonrpc_server_handle = jsonrpc_server.start(api.clone())?;
Expand Down
176 changes: 147 additions & 29 deletions src/bundler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,46 @@
use std::{sync::Arc, time::Duration};
pub mod service;

use std::{net::SocketAddr, sync::Arc, time::Duration};

use clap::Parser;
use ethers::{
prelude::SignerMiddleware,
providers::{Http, Middleware, Provider},
signers::Signer,
types::{transaction::eip2718::TypedTransaction, Address, U256},
types::{transaction::eip2718::TypedTransaction, Address, H256, U256},
};
use tokio::time;
use tracing::debug;
use parking_lot::Mutex;
use serde::Deserialize;
use tracing::{debug, error, info};

use crate::{
contracts::gen::EntryPointAPI,
models::wallet::Wallet,
types::user_operation::UserOperation,
uopool::server::uopool::{uo_pool_client::UoPoolClient, GetSortedRequest},
uopool::server::{
bundler::Mode as GrpcMode,
uopool::{uo_pool_client::UoPoolClient, GetSortedRequest},
},
utils::{parse_address, parse_u256},
};

#[derive(Debug, Deserialize)]
pub enum Mode {
#[serde(rename = "auto")]
Auto,
#[serde(rename = "manual")]
Manual,
}

impl From<Mode> for GrpcMode {
fn from(value: Mode) -> Self {
match value {
Mode::Auto => Self::Auto,
Mode::Manual => Self::Manual,
}
}
}

#[derive(Debug, Parser, PartialEq)]
pub struct BundlerOpts {
#[clap(long, value_parser=parse_address)]
Expand All @@ -30,55 +53,45 @@ pub struct BundlerOpts {
pub min_balance: U256,

#[clap(long, default_value = "127.0.0.1:3002")]
pub bundler_grpc_listen_address: String,
pub bundler_grpc_listen_address: SocketAddr,

#[clap(long, default_value = "10")]
pub bundle_interval: u64,
}

pub struct Bundler<'a> {
pub wallet: &'a Wallet,
#[derive(Clone)]
pub struct Bundler {
pub wallet: Wallet,
pub beneficiary: Address,
pub uopool_grpc_client: UoPoolClient<tonic::transport::Channel>,
pub bundle_interval: u64,
pub entry_point: Address,
pub eth_client_address: String,
}

impl<'a> Bundler<'a> {
impl Bundler {
pub fn new(
wallet: &'a Wallet,
wallet: Wallet,
beneficiary: Address,
uopool_grpc_client: UoPoolClient<tonic::transport::Channel>,
bundle_interval: u64,
entry_point: Address,
eth_client_address: String,
) -> Self {
Self {
wallet,
beneficiary,
uopool_grpc_client,
bundle_interval,
entry_point,
eth_client_address,
}
}

pub async fn run(mut self) -> anyhow::Result<()> {
let mut interval = time::interval(Duration::from_secs(self.bundle_interval));

loop {
interval.tick().await;
self.send_next_bundle().await?;
}
}

async fn create_bundle(&mut self) -> anyhow::Result<Vec<UserOperation>> {
async fn create_bundle(&self) -> anyhow::Result<Vec<UserOperation>> {
let request = tonic::Request::new(GetSortedRequest {
entry_point: Some(self.entry_point.into()),
});
let response = self
.uopool_grpc_client
.clone()
.get_sorted_user_operations(request)
.await?;
let user_operations: Vec<UserOperation> = response
Expand All @@ -90,8 +103,10 @@ impl<'a> Bundler<'a> {
Ok(user_operations)
}

async fn send_next_bundle(&mut self) -> anyhow::Result<()> {
async fn send_next_bundle(&self) -> anyhow::Result<H256> {
info!("Creating the next bundle");
let bundles = self.create_bundle().await?;
info!("Got {} bundles", bundles.len());
let provider = Provider::<Http>::try_from(self.eth_client_address.clone())?;
let client = Arc::new(SignerMiddleware::new(provider, self.wallet.signer.clone()));
let entry_point = EntryPointAPI::new(self.entry_point, client.clone());
Expand All @@ -118,17 +133,117 @@ impl<'a> Bundler<'a> {
tx.set_gas_price(max_fee_per_gas);
}
};
let res = client.send_transaction(tx, None).await?.await?;
let tx = client.send_transaction(tx, None).await?;
let tx_hash = tx.tx_hash();
debug!("Send bundles with transaction: {tx:?}");

debug!("Send bundles with ret: {res:?}");
Ok(())
let res = tx.await?;
debug!("Send bundles with receipt: {res:?}");
Ok(tx_hash)
}
}

pub struct BundlerService {
pub bundlers: Vec<Bundler>,
pub bundle_interval: u64,
pub running: Arc<Mutex<bool>>,
}

fn is_running(running: Arc<Mutex<bool>>) -> bool {
let r = running.lock();
*r
}

impl BundlerService {
pub fn new(
wallet: Wallet,
beneficiary: Address,
uopool_grpc_client: UoPoolClient<tonic::transport::Channel>,
entry_points: Vec<Address>,
eth_client_address: String,
bundle_interval: u64,
) -> Self {
let bundlers: Vec<Bundler> = entry_points
.iter()
.map(|entry_point| {
Bundler::new(
wallet.clone(),
beneficiary,
uopool_grpc_client.clone(),
*entry_point,
eth_client_address.clone(),
)
})
.collect();

Self {
bundlers,
bundle_interval,
running: Arc::new(Mutex::new(false)),
}
}

pub async fn send_bundles_now(&self) -> anyhow::Result<H256> {
info!("Sending bundle now");
let mut tx_hashes: Vec<H256> = vec![];
for bundler in self.bundlers.iter() {
info!("Sending bundle for entry point: {:?}", bundler.entry_point);
let tx_hash = bundler.send_next_bundle().await?;
tx_hashes.push(tx_hash)
}

// FIXME: Because currently the bundler support multiple bundler and
// we don't have a way to know which bundler is the one that is
zsluedem marked this conversation as resolved.
Show resolved Hide resolved
Ok(tx_hashes
.into_iter()
.next()
.expect("Must have at least one tx hash"))
}

pub fn stop_bundling(&self) {
info!("Stopping auto bundling");
let mut r = self.running.lock();
*r = false;
}

pub fn is_running(&self) -> bool {
is_running(self.running.clone())
}

pub fn start_bundling(&self) {
if !self.is_running() {
for bundler in self.bundlers.iter() {
info!(
"Starting auto bundling process for entry point: {:?}",
bundler.entry_point
);
let bundler_own = bundler.clone();
let interval = self.bundle_interval;
let running_lock = self.running.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(interval));
loop {
if !is_running(running_lock.clone()) {
break;
}
interval.tick().await;

if let Err(e) = bundler_own.send_next_bundle().await {
error!("Error while sending bundle: {e:?}");
}
}
});
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;
use std::{
net::{IpAddr, Ipv4Addr},
str::FromStr,
};

#[test]
fn bundler_opts() {
Expand All @@ -151,7 +266,10 @@ mod tests {
.unwrap(),
gas_factor: U256::from(600),
min_balance: U256::from(1),
bundler_grpc_listen_address: String::from("127.0.0.1:3002"),
bundler_grpc_listen_address: SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
3002
),
bundle_interval: 10,
},
BundlerOpts::try_parse_from(args).unwrap()
Expand Down
Loading