Skip to content

Commit

Permalink
feat(prover): WitnessGenerator refactoring #2 (#2899)
Browse files Browse the repository at this point in the history
## What ❔

Introduce WitnessGenerator trait
Rename some methods

## Why ❔

<!-- Why are these changes done? What goal do they contribute to? What
are the principles behind them? -->
<!-- Example: PR templates ensure PR reviewers, observers, and future
maintainers are in context about the evolution of repos. -->

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
  • Loading branch information
Artemka374 authored Sep 19, 2024
1 parent 8363c1d commit 36e5340
Show file tree
Hide file tree
Showing 20 changed files with 602 additions and 530 deletions.
24 changes: 6 additions & 18 deletions prover/crates/bin/witness_generator/src/artifacts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,45 +6,33 @@ use zksync_prover_dal::{ConnectionPool, Prover};

/// Blob URLs produced when aggregation-round artifacts are saved to object storage.
/// NOTE(review): this is a rendered diff — the two `aggregation*_urls` fields below
/// are the pre-change and post-change spellings of the SAME field (this commit
/// renames it); only one exists in the real file at any revision.
#[derive(Debug)]
pub(crate) struct AggregationBlobUrls {
// removed by this commit (old spelling):
pub aggregations_urls: String,
// added by this commit (new spelling):
pub aggregation_urls: String,
// (circuit_id, blob_url) pairs for the circuits saved in this round
pub circuit_ids_and_urls: Vec<(u8, String)>,
}

/// Blob URLs gathered for the scheduler round of witness generation.
/// NOTE(review): this commit appears to delete this struct in favor of the
/// per-implementor `ArtifactsManager::BlobUrls` associated type — its import is
/// removed in basic_circuits below; confirm against the full diff.
#[derive(Debug)]
pub(crate) struct SchedulerBlobUrls {
// (circuit_id, blob_url) pairs for prover jobs
pub circuit_ids_and_urls: Vec<(u8, String)>,
// (circuit_id, blob_url, count) triples for closed-form inputs
pub closed_form_inputs_and_urls: Vec<(u8, String, usize)>,
// URL of the serialized scheduler partial-input witness
pub scheduler_witness_url: String,
}

/// Round-specific payload formerly returned by `save_artifacts`.
/// NOTE(review): this commit replaces this enum with the `ArtifactsManager::BlobUrls`
/// associated type (see the trait below), letting each round name its concrete URL
/// type instead of matching on a shared enum with `unreachable!()` arms.
pub(crate) enum BlobUrls {
// single URL (used by the basic-circuits round)
Url(String),
// URLs for an aggregation round
Aggregation(AggregationBlobUrls),
// URLs for the scheduler round
Scheduler(SchedulerBlobUrls),
}

/// Per-round artifact persistence: fetch inputs from object storage, save outputs,
/// and record results in the prover database.
/// NOTE(review): rendered diff — adjacent duplicate signature lines below are the
/// pre-/post-change versions of the same method (this commit renames
/// `save_artifacts` -> `save_to_bucket` and `update_database` -> `save_to_database`,
/// and swaps the shared `BlobUrls` enum for the `Self::BlobUrls` associated type).
#[async_trait]
pub(crate) trait ArtifactsManager {
// key identifying the job's input in object storage
type InputMetadata;
// deserialized input artifacts for the round
type InputArtifacts;
// artifacts produced by processing the job
type OutputArtifacts;
// added by this commit: round-specific blob-URL payload type
type BlobUrls;

// Fetch the round's input artifacts from object storage.
async fn get_artifacts(
metadata: &Self::InputMetadata,
object_store: &dyn ObjectStore,
) -> anyhow::Result<Self::InputArtifacts>;

// Persist output artifacts to object storage, returning their blob URLs.
// removed by this commit:
async fn save_artifacts(
// added by this commit:
async fn save_to_bucket(
job_id: u32,
artifacts: Self::OutputArtifacts,
object_store: &dyn ObjectStore,
// removed by this commit:
) -> BlobUrls;
// added by this commit:
) -> Self::BlobUrls;

// Record the saved artifacts' URLs and job status in the prover DB.
// removed by this commit:
async fn update_database(
// added by this commit:
async fn save_to_database(
connection_pool: &ConnectionPool<Prover>,
job_id: u32,
started_at: Instant,
// removed by this commit:
blob_urls: BlobUrls,
// added by this commit:
blob_urls: Self::BlobUrls,
artifacts: Self::OutputArtifacts,
) -> anyhow::Result<()>;
}
30 changes: 12 additions & 18 deletions prover/crates/bin/witness_generator/src/basic_circuits/artifacts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use zksync_prover_fri_utils::get_recursive_layer_circuit_id_for_base_layer;
use zksync_types::{basic_fri_types::AggregationRound, L1BatchNumber};

use crate::{
artifacts::{ArtifactsManager, BlobUrls},
artifacts::ArtifactsManager,
basic_circuits::{BasicCircuitArtifacts, BasicWitnessGenerator, BasicWitnessGeneratorJob},
utils::SchedulerPartialInputWrapper,
};
Expand All @@ -18,6 +18,7 @@ impl ArtifactsManager for BasicWitnessGenerator {
type InputMetadata = L1BatchNumber;
type InputArtifacts = BasicWitnessGeneratorJob;
type OutputArtifacts = BasicCircuitArtifacts;
type BlobUrls = String;

async fn get_artifacts(
metadata: &Self::InputMetadata,
Expand All @@ -31,38 +32,31 @@ impl ArtifactsManager for BasicWitnessGenerator {
})
}

async fn save_artifacts(
async fn save_to_bucket(
job_id: u32,
artifacts: Self::OutputArtifacts,
object_store: &dyn ObjectStore,
) -> BlobUrls {
) -> String {
let aux_output_witness_wrapper = AuxOutputWitnessWrapper(artifacts.aux_output_witness);
object_store
.put(L1BatchNumber(job_id), &aux_output_witness_wrapper)
.await
.unwrap();
let wrapper = SchedulerPartialInputWrapper(artifacts.scheduler_witness);
let url = object_store
object_store
.put(L1BatchNumber(job_id), &wrapper)
.await
.unwrap();

BlobUrls::Url(url)
.unwrap()
}

#[tracing::instrument(skip_all, fields(l1_batch = %job_id))]
async fn update_database(
async fn save_to_database(
connection_pool: &ConnectionPool<Prover>,
job_id: u32,
started_at: Instant,
blob_urls: BlobUrls,
_artifacts: Self::OutputArtifacts,
blob_urls: String,
artifacts: Self::OutputArtifacts,
) -> anyhow::Result<()> {
let blob_urls = match blob_urls {
BlobUrls::Scheduler(blobs) => blobs,
_ => unreachable!(),
};

let mut connection = connection_pool
.connection()
.await
Expand All @@ -79,7 +73,7 @@ impl ArtifactsManager for BasicWitnessGenerator {
.fri_prover_jobs_dal()
.insert_prover_jobs(
L1BatchNumber(job_id),
blob_urls.circuit_ids_and_urls,
artifacts.circuit_urls,
AggregationRound::BasicCircuits,
0,
protocol_version_id,
Expand All @@ -89,8 +83,8 @@ impl ArtifactsManager for BasicWitnessGenerator {
.fri_witness_generator_dal()
.create_aggregation_jobs(
L1BatchNumber(job_id),
&blob_urls.closed_form_inputs_and_urls,
&blob_urls.scheduler_witness_url,
&artifacts.queue_urls,
&blob_urls,
get_recursive_layer_circuit_id_for_base_layer,
protocol_version_id,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ use anyhow::Context as _;
use tracing::Instrument;
use zksync_prover_dal::ProverDal;
use zksync_prover_fri_types::{get_current_pod_name, AuxOutputWitnessWrapper};
use zksync_prover_keystore::keystore::Keystore;
use zksync_queued_job_processor::{async_trait, JobProcessor};
use zksync_types::{basic_fri_types::AggregationRound, L1BatchNumber};

use crate::{
artifacts::{ArtifactsManager, BlobUrls, SchedulerBlobUrls},
artifacts::ArtifactsManager,
basic_circuits::{BasicCircuitArtifacts, BasicWitnessGenerator, BasicWitnessGeneratorJob},
metrics::WITNESS_GENERATOR_METRICS,
witness_generator::WitnessGenerator,
};

#[async_trait]
Expand All @@ -35,19 +37,15 @@ impl JobProcessor for BasicWitnessGenerator {
)
.await
{
Some(block_number) => {
tracing::info!(
"Processing FRI basic witness-gen for block {}",
block_number
);
let started_at = Instant::now();
let job = Self::get_artifacts(&block_number, &*self.object_store).await?;

WITNESS_GENERATOR_METRICS.blob_fetch_time[&AggregationRound::BasicCircuits.into()]
.observe(started_at.elapsed());

Ok(Some((block_number, job)))
}
Some(block_number) => Ok(Some((
block_number,
<Self as WitnessGenerator>::prepare_job(
block_number,
&*self.object_store,
Keystore::locate(), // todo: this should be removed
)
.await?,
))),
None => Ok(None),
}
}
Expand All @@ -73,11 +71,15 @@ impl JobProcessor for BasicWitnessGenerator {
let max_circuits_in_flight = self.config.max_circuits_in_flight;
tokio::spawn(async move {
let block_number = job.block_number;
Ok(
Self::process_job_impl(object_store, job, started_at, max_circuits_in_flight)
.instrument(tracing::info_span!("basic_circuit", %block_number))
.await,
<Self as WitnessGenerator>::process_job(
job,
object_store,
Some(max_circuits_in_flight),
started_at,
)
.instrument(tracing::info_span!("basic_circuit", %block_number))
.await
.map(Some)
})
}

Expand All @@ -92,8 +94,6 @@ impl JobProcessor for BasicWitnessGenerator {
None => Ok(()),
Some(artifacts) => {
let blob_started_at = Instant::now();
let circuit_urls = artifacts.circuit_urls.clone();
let queue_urls = artifacts.queue_urls.clone();

let aux_output_witness_wrapper =
AuxOutputWitnessWrapper(artifacts.aux_output_witness.clone());
Expand All @@ -105,26 +105,17 @@ impl JobProcessor for BasicWitnessGenerator {
.unwrap();
}

let scheduler_witness_url =
match Self::save_artifacts(job_id.0, artifacts.clone(), &*self.object_store)
.await
{
BlobUrls::Url(url) => url,
_ => unreachable!(),
};
let blob_urls =
Self::save_to_bucket(job_id.0, artifacts.clone(), &*self.object_store).await;

WITNESS_GENERATOR_METRICS.blob_save_time[&AggregationRound::BasicCircuits.into()]
.observe(blob_started_at.elapsed());

Self::update_database(
Self::save_to_database(
&self.prover_connection_pool,
job_id.0,
started_at,
BlobUrls::Scheduler(SchedulerBlobUrls {
circuit_ids_and_urls: circuit_urls,
closed_form_inputs_and_urls: queue_urls,
scheduler_witness_url,
}),
blob_urls,
artifacts,
)
.await?;
Expand Down
Loading

0 comments on commit 36e5340

Please sign in to comment.