Skip to content

Commit

Permalink
shim: parallel submission
Browse files Browse the repository at this point in the history
rollkit demo submits a lot of blobs. In my testing I saw
dozens of blobs submitted at the same time. Ofc submitting
those serially won't cut it, esp. so if waiting until
finalization.

This changeset solves the problem by splitting the signing
part, which is still serial, and the submission, which is
now parallel.

As a drive-by fix, all the errors for rollkit are now
encapsulated into an enum.

Closes #259
  • Loading branch information
pepyakin committed Mar 4, 2024
1 parent 417ed72 commit 3cf1316
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 65 deletions.
6 changes: 5 additions & 1 deletion ikura/shim/src/cmd/query/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ pub async fn run(params: Params) -> anyhow::Result<()> {
let namespace = read_namespace(&namespace)?;
let client = connect_rpc(rpc).await?;
tracing::info!("submitting blob to namespace {}", namespace);
let (block_hash, _) = client.submit_blob(blob, namespace, key).await?;
let nonce = client.get_last_nonce(&key).await?;
let blob_extrinsic = client
.make_blob_extrinsic(blob, namespace, &key, nonce)
.await?;
let (block_hash, _) = client.submit_blob(&blob_extrinsic).await?;
tracing::info!("submitted blob to block hash 0x{}", hex::encode(block_hash));
Ok(())
}
Expand Down
181 changes: 134 additions & 47 deletions ikura/shim/src/dock/rollkit.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{collections::HashMap, fmt};
use std::{collections::HashMap, fmt, sync::Arc};
use tokio::sync::Mutex;
use tonic::{transport::Server, Request, Response, Status};
use tracing::info;

Expand Down Expand Up @@ -57,6 +58,7 @@ struct RollkitDock {
client: ikura_rpc::Client,
submit_key: Option<Keypair>,
namespace: Option<ikura_nmt::Namespace>,
cur_nonce: Arc<Mutex<Option<u64>>>,
}

impl RollkitDock {
Expand All @@ -69,6 +71,7 @@ impl RollkitDock {
client,
submit_key,
namespace,
cur_nonce: Arc::new(Mutex::new(None)),
}
}
}
Expand All @@ -91,18 +94,16 @@ impl da_service_server::DaService for RollkitDock {
let mut cache = HashMap::new();
let mut response = GetResponse { blobs: vec![] };
for (index, id) in ids.into_iter().enumerate() {
let blob_id = BlobId::try_from(id)
.map_err(|_| Status::invalid_argument(format!("not a valid ID at {index}")))?;
let blob_id =
BlobId::try_from(id).map_err(|_| RollkitDockError::GetInvalidBlobId { index })?;
let block_number = blob_id.block_number;
if !cache.contains_key(&block_number) {
let block_hash = self.client.await_finalized_height(block_number).await;
let block = self
.client
.await_block_at(Some(block_hash))
.await
.map_err(|_| {
Status::internal("failed to retrieve block number {block_number}")
})?;
.map_err(|_| RollkitDockError::GetRetrieveBlock { block_number })?;
cache.insert(blob_id.block_number, block);
}
// unwrap: at this point we know the block is in the cache, because at this point
Expand All @@ -117,7 +118,7 @@ impl da_service_server::DaService for RollkitDock {
value: needle.data.clone(),
});
} else {
return Err(Status::not_found(format!("blob not found at {blob_id}")));
return Err(RollkitDockError::CantResolveBlobId(blob_id).into());
}
}
Ok(Response::new(response))
Expand Down Expand Up @@ -158,57 +159,73 @@ impl da_service_server::DaService for RollkitDock {
.submit_key
.as_ref()
.cloned()
.ok_or_else(|| Status::failed_precondition("no key for signing blobs"))?;
let namespace = self.namespace.ok_or_else(|| {
Status::failed_precondition("no namespace provided, and no default namespace set")
})?;
.ok_or_else(|| RollkitDockError::NoSigningKey)?;
let namespace = self
.namespace
.ok_or_else(|| RollkitDockError::NamespaceNotProvided)?;
let SubmitRequest {
blobs,
gas_price: _,
} = request.into_inner();
let mut response = SubmitResponse {
ids: vec![],
proofs: vec![],
};
let blob_n = blobs.len();

// First, prepare a list of extrinsics to submit.
let mut extrinsics = vec![];
for (i, blob) in blobs.into_iter().enumerate() {
let data_hash = sha2_hash(&blob.value);
info!(
"submitting blob {i}/{blob_n} (0x{}) to namespace {}",
hex::encode(&data_hash),
namespace,
);
let (block_hash, extrinsic_index) = self
.client
.submit_blob(blob.value, namespace, submit_key.clone())
let nonce = self
.gen_nonce()
.await
.map_err(|err| Status::internal(format!("failed to submit blob: {err}")))?;
// TODO: getting the whole block is a bit inefficient, consider optimizing.
let block_number = match self
.map_err(RollkitDockError::NonceGeneration)?;
let extrinsic = self
.client
.await_block_at(Some(block_hash))
.make_blob_extrinsic(blob.value, namespace, &submit_key, nonce)
.await
.map(|block| block.number)
{
Ok(block_number) => block_number,
Err(err) => {
return Err(Status::internal(format!(
"failed to obtain block number for 0x{}: {:?}",
hex::encode(&block_hash),
err,
)));
}
};
let blob_id = BlobId {
block_number,
extrinsic_index,
data_hash,
};
info!("blob landed: {blob_id}");
response.ids.push(blob_id.into());
response.proofs.push(pbda::Proof { value: vec![] });
.map_err(RollkitDockError::MakeSubmitBlobExtrinsic)?;
extrinsics.push((i, data_hash, extrinsic));
}
Ok(Response::new(response))

// Then, submit the extrinsics in parallel and collect the results.
let futs = extrinsics
.into_iter()
.map(|(i, data_hash, extrinsic)| async move {
info!(
"submitting blob {i}/{blob_n} (0x{}) to namespace {}",
hex::encode(&data_hash),
namespace
);
let (block_hash, extrinsic_index) = self
.client
.submit_blob(&extrinsic)
.await
.map_err(RollkitDockError::SubmitBlob)?;
// TODO: getting the whole block is a bit inefficient, consider optimizing.
let block_number = match self
.client
.await_block_at(Some(block_hash))
.await
.map(|block| block.number)
{
Ok(block_number) => block_number,
Err(err) => {
return Err(RollkitDockError::SubmitRetrieveBlockNumber {
block_hash,
err,
});
}
};
let blob_id = BlobId {
block_number,
extrinsic_index,
data_hash,
};
info!("blob landed: {blob_id}");
Ok(blob_id.into())
});

let ids: Vec<_> = futures::future::try_join_all(futs).await?;
let proofs = ids.iter().map(|_| pbda::Proof { value: vec![] }).collect();
Ok(Response::new(SubmitResponse { proofs, ids }))
}

async fn validate(
Expand Down Expand Up @@ -241,11 +258,81 @@ impl da_service_server::DaService for RollkitDock {
}
}

impl RollkitDock {
/// Generates a new nonce suitable for signing an extrinsic from the signer.
async fn gen_nonce(&self) -> anyhow::Result<u64> {
let submit_key = self
.submit_key
.as_ref()
.ok_or_else(|| anyhow::anyhow!("no key for signing blobs"))?; // should be unreachable
let mut cur_nonce = self.cur_nonce.lock().await;
let nonce = match *cur_nonce {
Some(nonce) => nonce,
None => self.client.get_last_nonce(&submit_key).await?,
};
cur_nonce.replace(nonce + 1);
Ok(nonce)
}
}

fn sha2_hash(data: &[u8]) -> [u8; 32] {
use sha2::Digest;
sha2::Sha256::digest(data).into()
}

enum RollkitDockError {
NoSigningKey,
MakeSubmitBlobExtrinsic(anyhow::Error),
SubmitBlob(anyhow::Error),
NonceGeneration(anyhow::Error),
GetInvalidBlobId {
index: usize,
},
GetRetrieveBlock {
block_number: u64,
},
SubmitRetrieveBlockNumber {
block_hash: [u8; 32],
err: anyhow::Error,
},
CantResolveBlobId(BlobId),
NamespaceNotProvided,
}

impl From<RollkitDockError> for Status {
fn from(me: RollkitDockError) -> Status {
use RollkitDockError::*;
match me {
NoSigningKey => {
Status::failed_precondition("the key for signing blobs is not provided")
}
MakeSubmitBlobExtrinsic(err) => {
Status::internal(format!("failed to create a submit blob extrinsic: {err}"))
}
SubmitBlob(err) => Status::internal(format!("failed to submit blob: {err}")),
NonceGeneration(err) => Status::internal(format!("failed to generate a nonce: {err}")),
GetInvalidBlobId { index } => {
Status::invalid_argument(format!("not a valid blob ID at index {index}"))
}
GetRetrieveBlock { block_number } => {
Status::internal(format!("failed to retrieve block number {block_number}"))
}
SubmitRetrieveBlockNumber { block_hash, err } => Status::internal(format!(
"failed to obtain block number for 0x{}: {}",
hex::encode(block_hash),
err,
)),
CantResolveBlobId(blob_id) => {
Status::not_found(format!("cannot resolve blob ID: {blob_id}"))
}
NamespaceNotProvided => Status::failed_precondition(
"no namespace provided, and no default names
pace set",
),
}
}
}

struct BlobId {
/// The block number at which the blob in question has been landed.
block_number: u64,
Expand Down
16 changes: 16 additions & 0 deletions ikura/shim/src/dock/rpc_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,22 @@ pub fn no_signing_key() -> ErrorObjectOwned {
)
}

pub fn nonce_obtain_error(e: anyhow::Error) -> ErrorObjectOwned {
ErrorObjectOwned::owned(
jsonrpsee::types::error::INTERNAL_ERROR_CODE,
format!("Internal Error: failed to obtain nonce: {:?}", e),
None::<()>,
)
}

pub fn submit_extrinsic_error(e: anyhow::Error) -> ErrorObjectOwned {
ErrorObjectOwned::owned(
jsonrpsee::types::error::INTERNAL_ERROR_CODE,
format!("Internal Error: failed to create a submit blob extrinsic: {:?}", e),
None::<()>,
)
}

pub fn submission_error(e: anyhow::Error) -> ErrorObjectOwned {
ErrorObjectOwned::owned(
jsonrpsee::types::error::INTERNAL_ERROR_CODE,
Expand Down
34 changes: 32 additions & 2 deletions ikura/shim/src/dock/sovereign.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::sync::Arc;

use ikura_shim_common_sovereign::{Block, SovereignRPCServer};
use jsonrpsee::{server::Server, types::ErrorObjectOwned};
use tokio::sync::Mutex;
use tracing::info;

use super::rpc_error as err;
Expand Down Expand Up @@ -32,11 +35,16 @@ pub async fn run(config: Config) -> anyhow::Result<()> {
struct SovereignDock {
client: ikura_rpc::Client,
submit_key: Option<Keypair>,
cur_nonce: Arc<Mutex<Option<u64>>>,
}

impl SovereignDock {
fn new(client: ikura_rpc::Client, submit_key: Option<Keypair>) -> Self {
Self { client, submit_key }
Self {
client,
submit_key,
cur_nonce: Arc::new(Mutex::new(None)),
}
}
}

Expand Down Expand Up @@ -81,14 +89,36 @@ impl SovereignRPCServer for SovereignDock {
.as_ref()
.cloned()
.ok_or_else(err::no_signing_key)?;
let nonce = self.gen_nonce().await.map_err(err::nonce_obtain_error)?;
let blob_extrinsic = self
.client
.make_blob_extrinsic(blob, namespace, &submit_key, nonce)
.await
.map_err(err::submit_extrinsic_error)?;
self.client
.submit_blob(blob, namespace, submit_key)
.submit_blob(&blob_extrinsic)
.await
.map_err(err::submission_error)?;
Ok(())
}
}

impl SovereignDock {
async fn gen_nonce(&self) -> anyhow::Result<u64> {
let submit_key = self
.submit_key
.as_ref()
.ok_or_else(|| anyhow::anyhow!("no key for signing blobs"))?; // should be unreachable
let mut cur_nonce = self.cur_nonce.lock().await;
let nonce = match *cur_nonce {
Some(nonce) => nonce,
None => self.client.get_last_nonce(&submit_key).await?,
};
cur_nonce.replace(nonce + 1);
Ok(nonce)
}
}

/// Creates a namespace proof for the given namespace in the given block.
fn make_namespace_proof(
block: &ikura_rpc::Block,
Expand Down
Loading

0 comments on commit 3cf1316

Please sign in to comment.