From 3cf1316d34ab761a15fd8cb835073ad0d6fc57a6 Mon Sep 17 00:00:00 2001 From: Sergei Shulepov Date: Thu, 29 Feb 2024 16:52:45 +0700 Subject: [PATCH] shim: parallel submission rollkit demo submits a lot of blobs. In my testing I saw dozens of blobs submitted at the same time. Ofc submitting those serially won't cut it, esp. so if waiting until finalization. This changeset solves the problem by splitting the signing part, which is still serial, and the submission, which is now parallel. As a drive-by fix, all the errors for rollkit are now encapsulated into an enum. Closes #259 --- ikura/shim/src/cmd/query/submit.rs | 6 +- ikura/shim/src/dock/rollkit.rs | 181 +++++++++++++++++++++-------- ikura/shim/src/dock/rpc_error.rs | 16 +++ ikura/shim/src/dock/sovereign.rs | 34 +++++- ikura/shim/src/ikura_rpc/mod.rs | 74 +++++++++--- 5 files changed, 246 insertions(+), 65 deletions(-) diff --git a/ikura/shim/src/cmd/query/submit.rs b/ikura/shim/src/cmd/query/submit.rs index 206c0eb..2885621 100644 --- a/ikura/shim/src/cmd/query/submit.rs +++ b/ikura/shim/src/cmd/query/submit.rs @@ -20,7 +20,11 @@ pub async fn run(params: Params) -> anyhow::Result<()> { let namespace = read_namespace(&namespace)?; let client = connect_rpc(rpc).await?; tracing::info!("submitting blob to namespace {}", namespace); - let (block_hash, _) = client.submit_blob(blob, namespace, key).await?; + let nonce = client.get_last_nonce(&key).await?; + let blob_extrinsic = client + .make_blob_extrinsic(blob, namespace, &key, nonce) + .await?; + let (block_hash, _) = client.submit_blob(&blob_extrinsic).await?; tracing::info!("submitted blob to block hash 0x{}", hex::encode(block_hash)); Ok(()) } diff --git a/ikura/shim/src/dock/rollkit.rs b/ikura/shim/src/dock/rollkit.rs index a26db07..7c05b88 100644 --- a/ikura/shim/src/dock/rollkit.rs +++ b/ikura/shim/src/dock/rollkit.rs @@ -1,4 +1,5 @@ -use std::{collections::HashMap, fmt}; +use std::{collections::HashMap, fmt, sync::Arc}; +use tokio::sync::Mutex; use tonic::{transport::Server, Request, Response, Status}; use tracing::info; @@ -57,6 +58,7 @@ struct RollkitDock { client: ikura_rpc::Client, submit_key: Option, namespace: Option, + cur_nonce: Arc>>, } impl RollkitDock { @@ -69,6 +71,7 @@ impl RollkitDock { client, submit_key, namespace, + cur_nonce: Arc::new(Mutex::new(None)), } } } @@ -91,8 +94,8 @@ impl da_service_server::DaService for RollkitDock { let mut cache = HashMap::new(); let mut response = GetResponse { blobs: vec![] }; for (index, id) in ids.into_iter().enumerate() { - let blob_id = BlobId::try_from(id) - .map_err(|_| Status::invalid_argument(format!("not a valid ID at {index}")))?; + let blob_id = + BlobId::try_from(id).map_err(|_| RollkitDockError::GetInvalidBlobId { index })?; let block_number = blob_id.block_number; if !cache.contains_key(&block_number) { let block_hash = self.client.await_finalized_height(block_number).await; @@ -100,9 +103,7 @@ impl da_service_server::DaService for RollkitDock { .client .await_block_at(Some(block_hash)) .await - .map_err(|_| { - Status::internal("failed to retrieve block number {block_number}") - })?; + .map_err(|_| RollkitDockError::GetRetrieveBlock { block_number })?; cache.insert(blob_id.block_number, block); } // unwrap: at this point we know the block is in the cache, because at this point @@ -117,7 +118,7 @@ impl da_service_server::DaService for RollkitDock { value: needle.data.clone(), }); } else { - return Err(Status::not_found(format!("blob not found at {blob_id}"))); + return Err(RollkitDockError::CantResolveBlobId(blob_id).into()); } } Ok(Response::new(response)) @@ -158,57 +159,73 @@ impl da_service_server::DaService for RollkitDock { .submit_key .as_ref() .cloned() - .ok_or_else(|| Status::failed_precondition("no key for signing blobs"))?; - let namespace = self.namespace.ok_or_else(|| { - Status::failed_precondition("no namespace provided, and no default namespace set") - })?; + .ok_or_else(|| RollkitDockError::NoSigningKey)?; + let namespace = self + .namespace + .ok_or_else(|| RollkitDockError::NamespaceNotProvided)?; let SubmitRequest { blobs, gas_price: _, } = request.into_inner(); - let mut response = SubmitResponse { - ids: vec![], - proofs: vec![], - }; let blob_n = blobs.len(); + + // First, prepare a list of extrinsics to submit. + let mut extrinsics = vec![]; for (i, blob) in blobs.into_iter().enumerate() { let data_hash = sha2_hash(&blob.value); - info!( - "submitting blob {i}/{blob_n} (0x{}) to namespace {}", - hex::encode(&data_hash), - namespace, - ); - let (block_hash, extrinsic_index) = self - .client - .submit_blob(blob.value, namespace, submit_key.clone()) + let nonce = self + .gen_nonce() .await - .map_err(|err| Status::internal(format!("failed to submit blob: {err}")))?; - // TODO: getting the whole block is a bit inefficient, consider optimizing. - let block_number = match self + .map_err(RollkitDockError::NonceGeneration)?; + let extrinsic = self .client - .await_block_at(Some(block_hash)) + .make_blob_extrinsic(blob.value, namespace, &submit_key, nonce) .await - .map(|block| block.number) - { - Ok(block_number) => block_number, - Err(err) => { - return Err(Status::internal(format!( - "failed to obtain block number for 0x{}: {:?}", - hex::encode(&block_hash), - err, - ))); - } - }; - let blob_id = BlobId { - block_number, - extrinsic_index, - data_hash, - }; - info!("blob landed: {blob_id}"); - response.ids.push(blob_id.into()); - response.proofs.push(pbda::Proof { value: vec![] }); + .map_err(RollkitDockError::MakeSubmitBlobExtrinsic)?; + extrinsics.push((i, data_hash, extrinsic)); } - Ok(Response::new(response)) + + // Then, submit the extrinsics in parallel and collect the results. + let futs = extrinsics + .into_iter() + .map(|(i, data_hash, extrinsic)| async move { + info!( + "submitting blob {i}/{blob_n} (0x{}) to namespace {}", + hex::encode(&data_hash), + namespace + ); + let (block_hash, extrinsic_index) = self + .client + .submit_blob(&extrinsic) + .await + .map_err(RollkitDockError::SubmitBlob)?; + // TODO: getting the whole block is a bit inefficient, consider optimizing. + let block_number = match self + .client + .await_block_at(Some(block_hash)) + .await + .map(|block| block.number) + { + Ok(block_number) => block_number, + Err(err) => { + return Err(RollkitDockError::SubmitRetrieveBlockNumber { + block_hash, + err, + }); + } + }; + let blob_id = BlobId { + block_number, + extrinsic_index, + data_hash, + }; + info!("blob landed: {blob_id}"); + Ok(blob_id.into()) + }); + + let ids: Vec<_> = futures::future::try_join_all(futs).await?; + let proofs = ids.iter().map(|_| pbda::Proof { value: vec![] }).collect(); + Ok(Response::new(SubmitResponse { proofs, ids })) } async fn validate( @@ -241,11 +258,81 @@ impl da_service_server::DaService for RollkitDock { } } +impl RollkitDock { + /// Generates a new nonce suitable for signing an extrinsic from the signer. + async fn gen_nonce(&self) -> anyhow::Result { + let submit_key = self + .submit_key + .as_ref() + .ok_or_else(|| anyhow::anyhow!("no key for signing blobs"))?; // should be unreachable + let mut cur_nonce = self.cur_nonce.lock().await; + let nonce = match *cur_nonce { + Some(nonce) => nonce, + None => self.client.get_last_nonce(&submit_key).await?, + }; + cur_nonce.replace(nonce + 1); + Ok(nonce) + } +} + fn sha2_hash(data: &[u8]) -> [u8; 32] { use sha2::Digest; sha2::Sha256::digest(data).into() } +enum RollkitDockError { + NoSigningKey, + MakeSubmitBlobExtrinsic(anyhow::Error), + SubmitBlob(anyhow::Error), + NonceGeneration(anyhow::Error), + GetInvalidBlobId { + index: usize, + }, + GetRetrieveBlock { + block_number: u64, + }, + SubmitRetrieveBlockNumber { + block_hash: [u8; 32], + err: anyhow::Error, + }, + CantResolveBlobId(BlobId), + NamespaceNotProvided, +} + +impl From for Status { + fn from(me: RollkitDockError) -> Status { + use RollkitDockError::*; + match me { + NoSigningKey => { + Status::failed_precondition("the key for signing blobs is not provided") + } + MakeSubmitBlobExtrinsic(err) => { + Status::internal(format!("failed to create a submit blob extrinsic: {err}")) + } + SubmitBlob(err) => Status::internal(format!("failed to submit blob: {err}")), + NonceGeneration(err) => Status::internal(format!("failed to generate a nonce: {err}")), + GetInvalidBlobId { index } => { + Status::invalid_argument(format!("not a valid blob ID at index {index}")) + } + GetRetrieveBlock { block_number } => { + Status::internal(format!("failed to retrieve block number {block_number}")) + } + SubmitRetrieveBlockNumber { block_hash, err } => Status::internal(format!( + "failed to obtain block number for 0x{}: {}", + hex::encode(block_hash), + err, + )), + CantResolveBlobId(blob_id) => { + Status::not_found(format!("cannot resolve blob ID: {blob_id}")) + } + NamespaceNotProvided => Status::failed_precondition( + "no namespace provided, and no default names + pace set", + ), + } + } +} + struct BlobId { /// The block number at which the blob in question has been landed. block_number: u64, diff --git a/ikura/shim/src/dock/rpc_error.rs b/ikura/shim/src/dock/rpc_error.rs index 34b6f49..8993733 100644 --- a/ikura/shim/src/dock/rpc_error.rs +++ b/ikura/shim/src/dock/rpc_error.rs @@ -8,6 +8,22 @@ pub fn no_signing_key() -> ErrorObjectOwned { ) } +pub fn nonce_obtain_error(e: anyhow::Error) -> ErrorObjectOwned { + ErrorObjectOwned::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + format!("Internal Error: failed to obtain nonce: {:?}", e), + None::<()>, + ) +} + +pub fn submit_extrinsic_error(e: anyhow::Error) -> ErrorObjectOwned { + ErrorObjectOwned::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + format!("Internal Error: failed to create a submit blob extrinsic: {:?}", e), + None::<()>, + ) +} + pub fn submission_error(e: anyhow::Error) -> ErrorObjectOwned { ErrorObjectOwned::owned( jsonrpsee::types::error::INTERNAL_ERROR_CODE, diff --git a/ikura/shim/src/dock/sovereign.rs b/ikura/shim/src/dock/sovereign.rs index 9edcc05..d483131 100644 --- a/ikura/shim/src/dock/sovereign.rs +++ b/ikura/shim/src/dock/sovereign.rs @@ -1,5 +1,8 @@ +use std::sync::Arc; + use ikura_shim_common_sovereign::{Block, SovereignRPCServer}; use jsonrpsee::{server::Server, types::ErrorObjectOwned}; +use tokio::sync::Mutex; use tracing::info; use super::rpc_error as err; @@ -32,11 +35,16 @@ pub async fn run(config: Config) -> anyhow::Result<()> { struct SovereignDock { client: ikura_rpc::Client, submit_key: Option, + cur_nonce: Arc>>, } impl SovereignDock { fn new(client: ikura_rpc::Client, submit_key: Option) -> Self { - Self { client, submit_key } + Self { + client, + submit_key, + cur_nonce: Arc::new(Mutex::new(None)), + } } } @@ -81,14 +89,36 @@ impl SovereignRPCServer for SovereignDock { .as_ref() .cloned() .ok_or_else(err::no_signing_key)?; + let nonce = self.gen_nonce().await.map_err(err::nonce_obtain_error)?; + let blob_extrinsic = self + .client + .make_blob_extrinsic(blob, namespace, &submit_key, nonce) + .await + .map_err(err::submit_extrinsic_error)?; self.client - .submit_blob(blob, namespace, submit_key) + .submit_blob(&blob_extrinsic) .await .map_err(err::submission_error)?; Ok(()) } } +impl SovereignDock { + async fn gen_nonce(&self) -> anyhow::Result { + let submit_key = self + .submit_key + .as_ref() + .ok_or_else(|| anyhow::anyhow!("no key for signing blobs"))?; // should be unreachable + let mut cur_nonce = self.cur_nonce.lock().await; + let nonce = match *cur_nonce { + Some(nonce) => nonce, + None => self.client.get_last_nonce(&submit_key).await?, + }; + cur_nonce.replace(nonce + 1); + Ok(nonce) + } +} + /// Creates a namespace proof for the given namespace in the given block. fn make_namespace_proof( block: &ikura_rpc::Block, diff --git a/ikura/shim/src/ikura_rpc/mod.rs b/ikura/shim/src/ikura_rpc/mod.rs index 088e7d0..018b39b 100644 --- a/ikura/shim/src/ikura_rpc/mod.rs +++ b/ikura/shim/src/ikura_rpc/mod.rs @@ -5,8 +5,16 @@ use anyhow::Context; use ikura_nmt::Namespace; use ikura_subxt::{ ikura::runtime_types::pallet_ikura_blobs::namespace_param::UnvalidatedNamespace, Header, + IkuraConfig, +}; +use subxt::{ + config::Header as _, + error::BlockError, + rpc_params, + tx::{Signer, SubmittableExtrinsic}, + utils::H256, + OnlineClient, }; -use subxt::{config::Header as _, error::BlockError, rpc_params, utils::H256}; use tokio::sync::watch; use tracing::Level; @@ -193,40 +201,76 @@ impl Client { Block::from_header_and_extrinsics(self.await_header_and_extrinsics(block_hash).await) } - /// Submit a blob with the given namespace and signed with the given key. The block is submitted - /// at best effort. Not much is done to ensure that the blob is actually included. If this - /// function returned an error, that does not mean that the blob was not included. - /// - /// Returns a block hash in which the extrinsic was included and the extrinsic index. - #[tracing::instrument(level = Level::DEBUG, skip(self))] - pub async fn submit_blob( + /// Creates a submit blob extrinsic with the given data, namespace and signed with the given key + /// and nonce. + pub async fn make_blob_extrinsic( &self, blob: Vec, namespace: ikura_nmt::Namespace, - key: Keypair, - ) -> anyhow::Result<([u8; 32], u32)> { + key: &Keypair, + nonce: u64, + ) -> anyhow::Result { + let conn = self.connector.ensure_connected().await; let extrinsic = ikura_subxt::ikura::tx() .blobs() .submit_blob(UnvalidatedNamespace(namespace.to_raw_bytes()), blob); - - let conn = self.connector.ensure_connected().await; let signed = conn .subxt .tx() - .create_signed(&extrinsic, &key, Default::default()) - .await + .create_signed_with_nonce(&extrinsic, key, nonce, Default::default()) .with_context(|| format!("failed to validate or sign extrinsic"))?; + Ok(BlobExtrinsic(signed)) + } + + /// Submit a blob with the given namespace and signed with the given key. The block is submitted + /// at best effort. Not much is done to ensure that the blob is actually included. If this + /// function returned an error, that does not mean that the blob was not included. + /// + /// Returns a block hash in which the extrinsic was included and the extrinsic index. + #[tracing::instrument(level = Level::DEBUG, skip(self))] + pub async fn submit_blob( + &self, + blob_extrinsic: &BlobExtrinsic, + ) -> anyhow::Result<([u8; 32], u32)> { + let BlobExtrinsic(signed) = blob_extrinsic; let events = signed .submit_and_watch() .await .with_context(|| format!("failed to submit extrinsic"))? .wait_for_finalized_success() .await?; - let block_hash = events.block_hash(); let extrinsic_index = events.extrinsic_index(); Ok((block_hash.0, extrinsic_index)) } + + /// Returns the last nonce observed on the account of the signer. + pub async fn get_last_nonce(&self, key: &Keypair) -> anyhow::Result { + let conn = self.connector.ensure_connected().await; + let nonce = conn + .subxt + .tx() + .account_nonce( + &>::account_id(key), + ) + .await?; + Ok(nonce) + } +} + +/// Signed blob extrinsic. The extirnsic is signed against a certain nonce value. +/// The extrinsic is ready to be submitted to the network. +pub struct BlobExtrinsic(SubmittableExtrinsic>); + +impl fmt::Debug for BlobExtrinsic { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let hash = self.0.hash(); + let len = self.0.encoded().len(); + f.debug_struct("BlobExtrinsic") + .field("hash", &hash) + .field("len", &len) + .finish() + } } /// Iterates over the extrinsics in a block and extracts the timestamp.