From c162510598b45dc062c2c91085868f8aa966360e Mon Sep 17 00:00:00 2001 From: Patrick Date: Mon, 26 Aug 2024 10:53:32 +0200 Subject: [PATCH] fix(proof_data_handler): TEE blob fetching error handling (#2674) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We ran into a problem in the staging environment where TEE blob fetching failed because of a 30-day retention policy on blobs in Google Cloud Storage. The TEE prover was failing for all old batches (`l1_batch_number < 58300`). This commit fixes the issue by adding better error handling when the blob for a given batch number isn't available. ## What ❔ Graceful error handling for the TEE proof data handler when there is no blob in Google Cloud Storage for the specified batch number. ## Why ❔ We need more robust error handling. ## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [ ] Tests for the changes have been added / updated. - [ ] Documentation comments have been added / updated. - [x] Code has been formatted via `zk fmt` and `zk lint`. --- core/bin/zksync_tee_prover/src/tee_prover.rs | 2 +- ...fee3209a950943dc2b4da82c324e1c09132f.json} | 7 +- ...468765628fd2c3b7c2a408d18b5aba0df9a30.json | 15 +++ core/lib/dal/doc/TeeProofGenerationDal.md | 4 +- core/lib/dal/src/tee_proof_generation_dal.rs | 46 ++++++- core/node/proof_data_handler/src/errors.rs | 6 + .../src/tee_request_processor.rs | 120 +++++++++++------- 7 files changed, 146 insertions(+), 54 deletions(-) rename core/lib/dal/.sqlx/{query-286f27e32a152c293d07e7c22e893c6f5a43386d4183745a4668507cf672b3f6.json => query-47975cc0b5e4f3a6b5224cb452b8fee3209a950943dc2b4da82c324e1c09132f.json} (75%) create mode 100644 core/lib/dal/.sqlx/query-5c5bdb0e419049f9fb4d8b3bbec468765628fd2c3b7c2a408d18b5aba0df9a30.json diff --git a/core/bin/zksync_tee_prover/src/tee_prover.rs b/core/bin/zksync_tee_prover/src/tee_prover.rs index 64a3a9c5749..7f874533b4b 100644 --- a/core/bin/zksync_tee_prover/src/tee_prover.rs +++ b/core/bin/zksync_tee_prover/src/tee_prover.rs @@ -201,8 +201,8 @@ impl Task for TeeProver { if !err.is_retriable() || retries > self.config.max_retries { return Err(err.into()); } - retries += 1; tracing::warn!(%err, "Failed TEE prover step function {retries}/{}, retrying in {} milliseconds.", self.config.max_retries, backoff.as_millis()); + retries += 1; backoff = std::cmp::min( backoff.mul_f32(self.config.retry_backoff_multiplier), self.config.max_backoff, diff --git a/core/lib/dal/.sqlx/query-286f27e32a152c293d07e7c22e893c6f5a43386d4183745a4668507cf672b3f6.json b/core/lib/dal/.sqlx/query-47975cc0b5e4f3a6b5224cb452b8fee3209a950943dc2b4da82c324e1c09132f.json similarity index 75% rename from core/lib/dal/.sqlx/query-286f27e32a152c293d07e7c22e893c6f5a43386d4183745a4668507cf672b3f6.json rename to core/lib/dal/.sqlx/query-47975cc0b5e4f3a6b5224cb452b8fee3209a950943dc2b4da82c324e1c09132f.json index 540660bddf3..7e5f9e1713c 100644 --- a/core/lib/dal/.sqlx/query-286f27e32a152c293d07e7c22e893c6f5a43386d4183745a4668507cf672b3f6.json +++ b/core/lib/dal/.sqlx/query-47975cc0b5e4f3a6b5224cb452b8fee3209a950943dc2b4da82c324e1c09132f.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE tee_proof_generation_details\n SET\n status = 'picked_by_prover',\n updated_at = NOW(),\n prover_taken_at = NOW()\n WHERE\n tee_type = $1\n AND l1_batch_number = (\n SELECT\n proofs.l1_batch_number\n FROM\n tee_proof_generation_details AS proofs\n JOIN tee_verifier_input_producer_jobs AS inputs ON proofs.l1_batch_number = inputs.l1_batch_number\n WHERE\n inputs.status = $2\n AND (\n proofs.status = 'ready_to_be_proven'\n OR (\n proofs.status = 'picked_by_prover'\n AND proofs.prover_taken_at < NOW() - $3::INTERVAL\n )\n )\n ORDER BY\n l1_batch_number ASC\n LIMIT\n 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING\n tee_proof_generation_details.l1_batch_number\n ", + "query": "\n UPDATE tee_proof_generation_details\n SET\n status = 'picked_by_prover',\n updated_at = NOW(),\n prover_taken_at = NOW()\n WHERE\n tee_type = $1\n AND l1_batch_number = (\n SELECT\n proofs.l1_batch_number\n FROM\n tee_proof_generation_details AS proofs\n JOIN tee_verifier_input_producer_jobs AS inputs ON proofs.l1_batch_number = inputs.l1_batch_number\n WHERE\n inputs.status = $2\n AND (\n proofs.status = 'ready_to_be_proven'\n OR (\n proofs.status = 'picked_by_prover'\n AND proofs.prover_taken_at < NOW() - $3::INTERVAL\n )\n )\n AND proofs.l1_batch_number >= $4\n ORDER BY\n l1_batch_number ASC\n LIMIT\n 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING\n tee_proof_generation_details.l1_batch_number\n ", "describe": { "columns": [ { @@ -26,12 +26,13 @@ } } }, - "Interval" + "Interval", + "Int8" ] }, "nullable": [ false ] }, - "hash": "286f27e32a152c293d07e7c22e893c6f5a43386d4183745a4668507cf672b3f6" + "hash": "47975cc0b5e4f3a6b5224cb452b8fee3209a950943dc2b4da82c324e1c09132f" } diff --git a/core/lib/dal/.sqlx/query-5c5bdb0e419049f9fb4d8b3bbec468765628fd2c3b7c2a408d18b5aba0df9a30.json b/core/lib/dal/.sqlx/query-5c5bdb0e419049f9fb4d8b3bbec468765628fd2c3b7c2a408d18b5aba0df9a30.json new file mode 100644 index 00000000000..2d9a24d6d79 --- /dev/null +++ b/core/lib/dal/.sqlx/query-5c5bdb0e419049f9fb4d8b3bbec468765628fd2c3b7c2a408d18b5aba0df9a30.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE tee_proof_generation_details\n SET\n status = 'unpicked',\n updated_at = NOW()\n WHERE\n l1_batch_number = $1\n AND tee_type = $2\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Text" + ] + }, + "nullable": [] + }, + "hash": "5c5bdb0e419049f9fb4d8b3bbec468765628fd2c3b7c2a408d18b5aba0df9a30" +} diff --git a/core/lib/dal/doc/TeeProofGenerationDal.md b/core/lib/dal/doc/TeeProofGenerationDal.md index 23474d5cb5c..167e6b3c42c 100644 --- a/core/lib/dal/doc/TeeProofGenerationDal.md +++ b/core/lib/dal/doc/TeeProofGenerationDal.md @@ -12,8 +12,10 @@ title: Status Diagram --- stateDiagram-v2 [*] --> ready_to_be_proven : insert_tee_proof_generation_job -ready_to_be_proven --> picked_by_prover : get_next_batch_to_be_proven +ready_to_be_proven --> picked_by_prover : lock_batch_for_proving picked_by_prover --> generated : save_proof_artifacts_metadata generated --> [*] +picked_by_prover --> unpicked : unlock_batch +unpicked --> [*] ``` diff --git a/core/lib/dal/src/tee_proof_generation_dal.rs b/core/lib/dal/src/tee_proof_generation_dal.rs index 2bd73323eb1..80e364273f6 100644 --- a/core/lib/dal/src/tee_proof_generation_dal.rs +++ b/core/lib/dal/src/tee_proof_generation_dal.rs @@ -2,7 +2,9 @@ use std::time::Duration; use zksync_db_connection::{ - connection::Connection, error::DalResult, instrument::Instrumented, + connection::Connection, + error::DalResult, + instrument::{InstrumentExt, Instrumented}, utils::pg_interval_from_duration, }; use zksync_types::{tee_types::TeeType, L1BatchNumber}; @@ -18,12 +20,14 @@ pub struct TeeProofGenerationDal<'a, 'c> { } impl TeeProofGenerationDal<'_, '_> { - pub async fn get_next_batch_to_be_proven( + pub async fn lock_batch_for_proving( &mut self, tee_type: TeeType, processing_timeout: Duration, + min_batch_number: Option, ) -> DalResult> { let processing_timeout = pg_interval_from_duration(processing_timeout); + let min_batch_number = min_batch_number.map_or(0, |num| i64::from(num.0)); let query = sqlx::query!( r#" UPDATE tee_proof_generation_details @@ -48,6 +52,7 @@ impl TeeProofGenerationDal<'_, '_> { AND proofs.prover_taken_at < NOW() - $3::INTERVAL ) ) + AND proofs.l1_batch_number >= $4 ORDER BY l1_batch_number ASC LIMIT @@ -58,13 +63,16 @@ impl TeeProofGenerationDal<'_, '_> { RETURNING tee_proof_generation_details.l1_batch_number "#, - &tee_type.to_string(), + tee_type.to_string(), TeeVerifierInputProducerJobStatus::Successful as TeeVerifierInputProducerJobStatus, - &processing_timeout, + processing_timeout, + min_batch_number ); - let batch_number = Instrumented::new("get_next_batch_to_be_proven") + + let batch_number = Instrumented::new("lock_batch_for_proving") .with_arg("tee_type", &tee_type) .with_arg("processing_timeout", &processing_timeout) + .with_arg("l1_batch_number", &min_batch_number) .with(query) .fetch_optional(self.storage) .await? @@ -73,6 +81,34 @@ impl TeeProofGenerationDal<'_, '_> { Ok(batch_number) } + pub async fn unlock_batch( + &mut self, + l1_batch_number: L1BatchNumber, + tee_type: TeeType, + ) -> DalResult<()> { + let batch_number = i64::from(l1_batch_number.0); + sqlx::query!( + r#" + UPDATE tee_proof_generation_details + SET + status = 'unpicked', + updated_at = NOW() + WHERE + l1_batch_number = $1 + AND tee_type = $2 + "#, + batch_number, + tee_type.to_string() + ) + .instrument("unlock_batch") + .with_arg("l1_batch_number", &batch_number) + .with_arg("tee_type", &tee_type) + .execute(self.storage) + .await?; + + Ok(()) + } + pub async fn save_proof_artifacts_metadata( &mut self, batch_number: L1BatchNumber, diff --git a/core/node/proof_data_handler/src/errors.rs b/core/node/proof_data_handler/src/errors.rs index f170b3b53e7..15ef393294a 100644 --- a/core/node/proof_data_handler/src/errors.rs +++ b/core/node/proof_data_handler/src/errors.rs @@ -10,6 +10,12 @@ pub(crate) enum RequestProcessorError { Dal(DalError), } +impl From for RequestProcessorError { + fn from(err: DalError) -> Self { + RequestProcessorError::Dal(err) + } +} + impl IntoResponse for RequestProcessorError { fn into_response(self) -> Response { let (status_code, message) = match self { diff --git a/core/node/proof_data_handler/src/tee_request_processor.rs b/core/node/proof_data_handler/src/tee_request_processor.rs index d85591dd2c9..4ae1a5026f1 100644 --- a/core/node/proof_data_handler/src/tee_request_processor.rs +++ b/core/node/proof_data_handler/src/tee_request_processor.rs @@ -3,15 +3,12 @@ use std::sync::Arc; use axum::{extract::Path, Json}; use zksync_config::configs::ProofDataHandlerConfig; use zksync_dal::{ConnectionPool, Core, CoreDal}; -use zksync_object_store::ObjectStore; -use zksync_prover_interface::{ - api::{ - RegisterTeeAttestationRequest, RegisterTeeAttestationResponse, SubmitProofResponse, - SubmitTeeProofRequest, TeeProofGenerationDataRequest, TeeProofGenerationDataResponse, - }, - inputs::TeeVerifierInput, +use zksync_object_store::{ObjectStore, ObjectStoreError}; +use zksync_prover_interface::api::{ + RegisterTeeAttestationRequest, RegisterTeeAttestationResponse, SubmitProofResponse, + SubmitTeeProofRequest, TeeProofGenerationDataRequest, TeeProofGenerationDataResponse, }; -use zksync_types::L1BatchNumber; +use zksync_types::{tee_types::TeeType, L1BatchNumber}; use crate::errors::RequestProcessorError; @@ -41,32 +38,77 @@ impl TeeRequestProcessor { ) -> Result, RequestProcessorError> { tracing::info!("Received request for proof generation data: {:?}", request); - let mut connection = self - .pool - .connection() - .await - .map_err(RequestProcessorError::Dal)?; - - let l1_batch_number_result = connection - .tee_proof_generation_dal() - .get_next_batch_to_be_proven(request.tee_type, self.config.proof_generation_timeout()) - .await - .map_err(RequestProcessorError::Dal)?; - - let l1_batch_number = match l1_batch_number_result { - Some(number) => number, - None => return Ok(Json(TeeProofGenerationDataResponse(None))), + let mut min_batch_number: Option = None; + let mut missing_range: Option<(L1BatchNumber, L1BatchNumber)> = None; + + let result = loop { + let l1_batch_number = match self + .lock_batch_for_proving(request.tee_type, min_batch_number) + .await? + { + Some(number) => number, + None => break Ok(Json(TeeProofGenerationDataResponse(None))), + }; + + match self.blob_store.get(l1_batch_number).await { + Ok(input) => break Ok(Json(TeeProofGenerationDataResponse(Some(Box::new(input))))), + Err(ObjectStoreError::KeyNotFound(_)) => { + missing_range = match missing_range { + Some((start, _)) => Some((start, l1_batch_number)), + None => Some((l1_batch_number, l1_batch_number)), + }; + self.unlock_batch(l1_batch_number, request.tee_type).await?; + min_batch_number = Some(min_batch_number.unwrap_or(l1_batch_number) + 1); + } + Err(err) => { + self.unlock_batch(l1_batch_number, request.tee_type).await?; + break Err(RequestProcessorError::ObjectStore(err)); + } + } }; - let tee_verifier_input: TeeVerifierInput = self - .blob_store - .get(l1_batch_number) - .await - .map_err(RequestProcessorError::ObjectStore)?; + if let Some((start, end)) = missing_range { + tracing::warn!( + "Blobs for batch numbers {} to {} not found in the object store. Marked as unpicked.", + start, + end + ); + } + + result + } - let response = TeeProofGenerationDataResponse(Some(Box::new(tee_verifier_input))); + async fn lock_batch_for_proving( + &self, + tee_type: TeeType, + min_batch_number: Option, + ) -> Result, RequestProcessorError> { + let result = self + .pool + .connection() + .await? + .tee_proof_generation_dal() + .lock_batch_for_proving( + tee_type, + self.config.proof_generation_timeout(), + min_batch_number, + ) + .await?; + Ok(result) + } - Ok(Json(response)) + async fn unlock_batch( + &self, + l1_batch_number: L1BatchNumber, + tee_type: TeeType, + ) -> Result<(), RequestProcessorError> { + self.pool + .connection() + .await? + .tee_proof_generation_dal() + .unlock_batch(l1_batch_number, tee_type) + .await?; + Ok(()) } pub(crate) async fn submit_proof( @@ -75,11 +117,7 @@ impl TeeRequestProcessor { Json(proof): Json, ) -> Result, RequestProcessorError> { let l1_batch_number = L1BatchNumber(l1_batch_number); - let mut connection = self - .pool - .connection() - .await - .map_err(RequestProcessorError::Dal)?; + let mut connection = self.pool.connection().await?; let mut dal = connection.tee_proof_generation_dal(); tracing::info!( @@ -94,8 +132,7 @@ impl TeeRequestProcessor { &proof.0.signature, &proof.0.proof, ) - .await - .map_err(RequestProcessorError::Dal)?; + .await?; Ok(Json(SubmitProofResponse::Success)) } @@ -106,16 +143,11 @@ impl TeeRequestProcessor { ) -> Result, RequestProcessorError> { tracing::info!("Received attestation: {:?}", payload); - let mut connection = self - .pool - .connection() - .await - .map_err(RequestProcessorError::Dal)?; + let mut connection = self.pool.connection().await?; let mut dal = connection.tee_proof_generation_dal(); dal.save_attestation(&payload.pubkey, &payload.attestation) - .await - .map_err(RequestProcessorError::Dal)?; + .await?; Ok(Json(RegisterTeeAttestationResponse::Success)) }