Skip to content

Commit

Permalink
fix(proof_data_handler): TEE blob fetching error handling (#2674)
Browse files Browse the repository at this point in the history
We ran into a problem in the staging environment where TEE blob fetching
failed because of a 30-day retention policy on blobs in Google Cloud
Storage. The TEE prover was failing for all old batches
(`l1_batch_number < 58300`). This commit fixes the issue by adding
better error handling when the blob for a given batch number isn't
available.

## What ❔

Graceful error handling for the TEE proof data handler when there is no
blob in Google Cloud Storage for the specified batch number.

## Why ❔

We need more robust error handling.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
  • Loading branch information
pbeza authored Aug 26, 2024
1 parent 62d7e19 commit c162510
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 54 deletions.
2 changes: 1 addition & 1 deletion core/bin/zksync_tee_prover/src/tee_prover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ impl Task for TeeProver {
if !err.is_retriable() || retries > self.config.max_retries {
return Err(err.into());
}
retries += 1;
tracing::warn!(%err, "Failed TEE prover step function {retries}/{}, retrying in {} milliseconds.", self.config.max_retries, backoff.as_millis());
retries += 1;
backoff = std::cmp::min(
backoff.mul_f32(self.config.retry_backoff_multiplier),
self.config.max_backoff,
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion core/lib/dal/doc/TeeProofGenerationDal.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ title: Status Diagram
---
stateDiagram-v2
[*] --> ready_to_be_proven : insert_tee_proof_generation_job
ready_to_be_proven --> picked_by_prover : get_next_batch_to_be_proven
ready_to_be_proven --> picked_by_prover : lock_batch_for_proving
picked_by_prover --> generated : save_proof_artifacts_metadata
generated --> [*]
picked_by_prover --> unpicked : unlock_batch
unpicked --> [*]
```
46 changes: 41 additions & 5 deletions core/lib/dal/src/tee_proof_generation_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
use std::time::Duration;

use zksync_db_connection::{
connection::Connection, error::DalResult, instrument::Instrumented,
connection::Connection,
error::DalResult,
instrument::{InstrumentExt, Instrumented},
utils::pg_interval_from_duration,
};
use zksync_types::{tee_types::TeeType, L1BatchNumber};
Expand All @@ -18,12 +20,14 @@ pub struct TeeProofGenerationDal<'a, 'c> {
}

impl TeeProofGenerationDal<'_, '_> {
pub async fn get_next_batch_to_be_proven(
pub async fn lock_batch_for_proving(
&mut self,
tee_type: TeeType,
processing_timeout: Duration,
min_batch_number: Option<L1BatchNumber>,
) -> DalResult<Option<L1BatchNumber>> {
let processing_timeout = pg_interval_from_duration(processing_timeout);
let min_batch_number = min_batch_number.map_or(0, |num| i64::from(num.0));
let query = sqlx::query!(
r#"
UPDATE tee_proof_generation_details
Expand All @@ -48,6 +52,7 @@ impl TeeProofGenerationDal<'_, '_> {
AND proofs.prover_taken_at < NOW() - $3::INTERVAL
)
)
AND proofs.l1_batch_number >= $4
ORDER BY
l1_batch_number ASC
LIMIT
Expand All @@ -58,13 +63,16 @@ impl TeeProofGenerationDal<'_, '_> {
RETURNING
tee_proof_generation_details.l1_batch_number
"#,
&tee_type.to_string(),
tee_type.to_string(),
TeeVerifierInputProducerJobStatus::Successful as TeeVerifierInputProducerJobStatus,
&processing_timeout,
processing_timeout,
min_batch_number
);
let batch_number = Instrumented::new("get_next_batch_to_be_proven")

let batch_number = Instrumented::new("lock_batch_for_proving")
.with_arg("tee_type", &tee_type)
.with_arg("processing_timeout", &processing_timeout)
.with_arg("l1_batch_number", &min_batch_number)
.with(query)
.fetch_optional(self.storage)
.await?
Expand All @@ -73,6 +81,34 @@ impl TeeProofGenerationDal<'_, '_> {
Ok(batch_number)
}

pub async fn unlock_batch(
&mut self,
l1_batch_number: L1BatchNumber,
tee_type: TeeType,
) -> DalResult<()> {
let batch_number = i64::from(l1_batch_number.0);
sqlx::query!(
r#"
UPDATE tee_proof_generation_details
SET
status = 'unpicked',
updated_at = NOW()
WHERE
l1_batch_number = $1
AND tee_type = $2
"#,
batch_number,
tee_type.to_string()
)
.instrument("unlock_batch")
.with_arg("l1_batch_number", &batch_number)
.with_arg("tee_type", &tee_type)
.execute(self.storage)
.await?;

Ok(())
}

pub async fn save_proof_artifacts_metadata(
&mut self,
batch_number: L1BatchNumber,
Expand Down
6 changes: 6 additions & 0 deletions core/node/proof_data_handler/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ pub(crate) enum RequestProcessorError {
Dal(DalError),
}

impl From<DalError> for RequestProcessorError {
fn from(err: DalError) -> Self {
RequestProcessorError::Dal(err)
}
}

impl IntoResponse for RequestProcessorError {
fn into_response(self) -> Response {
let (status_code, message) = match self {
Expand Down
120 changes: 76 additions & 44 deletions core/node/proof_data_handler/src/tee_request_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@ use std::sync::Arc;
use axum::{extract::Path, Json};
use zksync_config::configs::ProofDataHandlerConfig;
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_object_store::ObjectStore;
use zksync_prover_interface::{
api::{
RegisterTeeAttestationRequest, RegisterTeeAttestationResponse, SubmitProofResponse,
SubmitTeeProofRequest, TeeProofGenerationDataRequest, TeeProofGenerationDataResponse,
},
inputs::TeeVerifierInput,
use zksync_object_store::{ObjectStore, ObjectStoreError};
use zksync_prover_interface::api::{
RegisterTeeAttestationRequest, RegisterTeeAttestationResponse, SubmitProofResponse,
SubmitTeeProofRequest, TeeProofGenerationDataRequest, TeeProofGenerationDataResponse,
};
use zksync_types::L1BatchNumber;
use zksync_types::{tee_types::TeeType, L1BatchNumber};

use crate::errors::RequestProcessorError;

Expand Down Expand Up @@ -41,32 +38,77 @@ impl TeeRequestProcessor {
) -> Result<Json<TeeProofGenerationDataResponse>, RequestProcessorError> {
tracing::info!("Received request for proof generation data: {:?}", request);

let mut connection = self
.pool
.connection()
.await
.map_err(RequestProcessorError::Dal)?;

let l1_batch_number_result = connection
.tee_proof_generation_dal()
.get_next_batch_to_be_proven(request.tee_type, self.config.proof_generation_timeout())
.await
.map_err(RequestProcessorError::Dal)?;

let l1_batch_number = match l1_batch_number_result {
Some(number) => number,
None => return Ok(Json(TeeProofGenerationDataResponse(None))),
let mut min_batch_number: Option<L1BatchNumber> = None;
let mut missing_range: Option<(L1BatchNumber, L1BatchNumber)> = None;

let result = loop {
let l1_batch_number = match self
.lock_batch_for_proving(request.tee_type, min_batch_number)
.await?
{
Some(number) => number,
None => break Ok(Json(TeeProofGenerationDataResponse(None))),
};

match self.blob_store.get(l1_batch_number).await {
Ok(input) => break Ok(Json(TeeProofGenerationDataResponse(Some(Box::new(input))))),
Err(ObjectStoreError::KeyNotFound(_)) => {
missing_range = match missing_range {
Some((start, _)) => Some((start, l1_batch_number)),
None => Some((l1_batch_number, l1_batch_number)),
};
self.unlock_batch(l1_batch_number, request.tee_type).await?;
min_batch_number = Some(min_batch_number.unwrap_or(l1_batch_number) + 1);
}
Err(err) => {
self.unlock_batch(l1_batch_number, request.tee_type).await?;
break Err(RequestProcessorError::ObjectStore(err));
}
}
};

let tee_verifier_input: TeeVerifierInput = self
.blob_store
.get(l1_batch_number)
.await
.map_err(RequestProcessorError::ObjectStore)?;
if let Some((start, end)) = missing_range {
tracing::warn!(
"Blobs for batch numbers {} to {} not found in the object store. Marked as unpicked.",
start,
end
);
}

result
}

let response = TeeProofGenerationDataResponse(Some(Box::new(tee_verifier_input)));
async fn lock_batch_for_proving(
&self,
tee_type: TeeType,
min_batch_number: Option<L1BatchNumber>,
) -> Result<Option<L1BatchNumber>, RequestProcessorError> {
let result = self
.pool
.connection()
.await?
.tee_proof_generation_dal()
.lock_batch_for_proving(
tee_type,
self.config.proof_generation_timeout(),
min_batch_number,
)
.await?;
Ok(result)
}

Ok(Json(response))
async fn unlock_batch(
&self,
l1_batch_number: L1BatchNumber,
tee_type: TeeType,
) -> Result<(), RequestProcessorError> {
self.pool
.connection()
.await?
.tee_proof_generation_dal()
.unlock_batch(l1_batch_number, tee_type)
.await?;
Ok(())
}

pub(crate) async fn submit_proof(
Expand All @@ -75,11 +117,7 @@ impl TeeRequestProcessor {
Json(proof): Json<SubmitTeeProofRequest>,
) -> Result<Json<SubmitProofResponse>, RequestProcessorError> {
let l1_batch_number = L1BatchNumber(l1_batch_number);
let mut connection = self
.pool
.connection()
.await
.map_err(RequestProcessorError::Dal)?;
let mut connection = self.pool.connection().await?;
let mut dal = connection.tee_proof_generation_dal();

tracing::info!(
Expand All @@ -94,8 +132,7 @@ impl TeeRequestProcessor {
&proof.0.signature,
&proof.0.proof,
)
.await
.map_err(RequestProcessorError::Dal)?;
.await?;

Ok(Json(SubmitProofResponse::Success))
}
Expand All @@ -106,16 +143,11 @@ impl TeeRequestProcessor {
) -> Result<Json<RegisterTeeAttestationResponse>, RequestProcessorError> {
tracing::info!("Received attestation: {:?}", payload);

let mut connection = self
.pool
.connection()
.await
.map_err(RequestProcessorError::Dal)?;
let mut connection = self.pool.connection().await?;
let mut dal = connection.tee_proof_generation_dal();

dal.save_attestation(&payload.pubkey, &payload.attestation)
.await
.map_err(RequestProcessorError::Dal)?;
.await?;

Ok(Json(RegisterTeeAttestationResponse::Success))
}
Expand Down

0 comments on commit c162510

Please sign in to comment.