From 5794439508396fbaf2c505afefa1b492b8f17fea Mon Sep 17 00:00:00 2001
From: Ge Gao
Date: Wed, 2 Oct 2024 18:09:43 -0400
Subject: [PATCH] transform to worker

---
 crates/sui-indexer/src/handlers/mod.rs       | 55 +++++++++----------
 .../handlers/objects_snapshot_processor.rs   | 51 +++++++++--------
 2 files changed, 55 insertions(+), 51 deletions(-)

diff --git a/crates/sui-indexer/src/handlers/mod.rs b/crates/sui-indexer/src/handlers/mod.rs
index a596d9ee6277a7..c51afe46bed014 100644
--- a/crates/sui-indexer/src/handlers/mod.rs
+++ b/crates/sui-indexer/src/handlers/mod.rs
@@ -6,7 +6,6 @@ use std::collections::BTreeMap;
 
 use async_trait::async_trait;
 use futures::{FutureExt, StreamExt};
-use sui_rest_api::CheckpointData;
 use tokio_util::sync::CancellationToken;
 
 use crate::{
@@ -63,11 +62,14 @@ impl<T> CommonHandler<T> {
         Self { handler }
     }
 
-    async fn start_transform_and_load(
+    async fn start_transform_and_load<E>(
         &self,
-        cp_receiver: mysten_metrics::metered_channel::Receiver<CheckpointData>,
+        cp_receiver: mysten_metrics::metered_channel::Receiver<E>,
         cancel: CancellationToken,
-    ) -> IndexerResult<()> {
+    ) -> IndexerResult<()>
+    where
+        E: CommitDataEnvelope<T>,
+    {
         let checkpoint_commit_batch_size = std::env::var("CHECKPOINT_COMMIT_BATCH_SIZE")
             .unwrap_or(CHECKPOINT_COMMIT_BATCH_SIZE.to_string())
             .parse::<usize>()
@@ -76,7 +78,7 @@ impl<T> CommonHandler<T> {
             .ready_chunks(checkpoint_commit_batch_size);
 
         let mut unprocessed = BTreeMap::new();
-        let mut batch: Vec<CheckpointData> = vec![];
+        let mut envelope_batch = vec![];
        let mut next_cp_to_process = self
             .handler
             .get_watermark_hi()
@@ -89,42 +91,35 @@ impl<T> CommonHandler<T> {
                 return Ok(());
             }
 
-            // Try to fetch new checkpoints from the stream
+            // Try to fetch new data envelopes from the stream
             match stream.next().now_or_never() {
-                Some(Some(indexed_checkpoint_batch)) => {
+                Some(Some(envelope_chunk)) => {
                     if cancel.is_cancelled() {
                         return Ok(());
                     }
-                    for checkpoint in indexed_checkpoint_batch {
-                        unprocessed
-                            .insert(checkpoint.checkpoint_summary.sequence_number, checkpoint);
+                    for envelope in envelope_chunk {
+                        unprocessed.insert(envelope.sequence_number(), envelope);
                     }
                 }
                 Some(None) => break, // Stream has ended
-                None => {} // No new checkpoints available right now
+                None => {} // No new data envelopes available right now
             }
 
             // Process unprocessed checkpoints, even no new checkpoints from stream
             let checkpoint_lag_limiter = self.handler.get_checkpoint_lag_limiter().await?;
             while next_cp_to_process <= checkpoint_lag_limiter {
-                if let Some(checkpoint) = unprocessed.remove(&next_cp_to_process) {
-                    batch.push(checkpoint);
+                if let Some(data_envelope) = unprocessed.remove(&next_cp_to_process) {
+                    envelope_batch.push(data_envelope);
                     next_cp_to_process += 1;
                 } else {
                     break;
                 }
             }
 
-            if !batch.is_empty() {
-                let last_checkpoint_seq = batch.last().unwrap().checkpoint_summary.sequence_number;
-                let transformed_data = self.handler.transform(batch).await.map_err(|e| {
-                    IndexerError::DataTransformationError(format!(
-                        "Failed to transform checkpoint batch: {}. Handler: {}",
-                        e,
-                        self.handler.name()
-                    ))
-                })?;
-                self.handler.load(transformed_data).await.map_err(|e| {
+            if !envelope_batch.is_empty() {
+                let last_checkpoint_seq = envelope_batch.last().unwrap().sequence_number();
+                let batch = envelope_batch.into_iter().map(|c| c.data()).collect();
+                self.handler.load(batch).await.map_err(|e| {
                     IndexerError::PostgresWriteError(format!(
                         "Failed to load transformed data into DB for handler {}: {}",
                         self.handler.name(),
@@ -132,7 +127,7 @@ impl<T> CommonHandler<T> {
                     ))
                 })?;
                 self.handler.set_watermark_hi(last_checkpoint_seq).await?;
-                batch = vec![];
+                envelope_batch = vec![];
             }
         }
         Err(IndexerError::ChannelClosed(format!(
@@ -147,11 +142,8 @@ pub trait Handler<T>: Send + Sync {
     /// return handler name
     fn name(&self) -> String;
 
-    /// transform data from `CheckpointData` to .*ToCommit
-    async fn transform(&self, cp_batch: Vec<CheckpointData>) -> IndexerResult<Vec<T>>;
-
-    /// commit .*ToCommit to DB
-    async fn load(&self, tranformed_data: Vec<T>) -> IndexerResult<()>;
+    /// commit batch of transformed data to DB
+    async fn load(&self, batch: Vec<T>) -> IndexerResult<()>;
 
     /// read high watermark of the table DB
     async fn get_watermark_hi(&self) -> IndexerResult<Option<u64>>;
@@ -164,3 +156,8 @@
         Ok(u64::MAX)
     }
 }
+
+pub trait CommitDataEnvelope<T>: Send + Sync {
+    fn sequence_number(&self) -> u64;
+    fn data(self) -> T;
+}
diff --git a/crates/sui-indexer/src/handlers/objects_snapshot_processor.rs b/crates/sui-indexer/src/handlers/objects_snapshot_processor.rs
index dd6edb128a4d8b..1271129778154f 100644
--- a/crates/sui-indexer/src/handlers/objects_snapshot_processor.rs
+++ b/crates/sui-indexer/src/handlers/objects_snapshot_processor.rs
@@ -17,12 +17,12 @@ use crate::{metrics::IndexerMetrics, store::IndexerStore};
 
 use super::checkpoint_handler::CheckpointHandler;
 use super::TransactionObjectChangesToCommit;
-use super::{CommonHandler, Handler};
+use super::{CommitDataEnvelope, CommonHandler, Handler};
 
 #[derive(Clone)]
 pub struct ObjectsSnapshotHandler {
     pub store: PgIndexerStore,
-    pub cp_sender: Sender<CheckpointData>,
+    pub sender: Sender<ObjectsSnapshotCommitDataEnvelope>,
     snapshot_config: SnapshotLagConfig,
     metrics: IndexerMetrics,
 }
@@ -34,9 +34,14 @@ pub struct CheckpointObjectChanges {
 
 #[async_trait]
 impl Worker for ObjectsSnapshotHandler {
-    // ?? change Worker trait to use Arc to avoid clone
     async fn process_checkpoint(&self, checkpoint: &CheckpointData) -> anyhow::Result<()> {
-        self.cp_sender.send(checkpoint.clone()).await?;
+        let transformed_data = CheckpointHandler::index_objects(checkpoint, &self.metrics).await?;
+        self.sender
+            .send(ObjectsSnapshotCommitDataEnvelope {
+                sequence_number: checkpoint.checkpoint_summary.sequence_number,
+                data: transformed_data,
+            })
+            .await?;
         Ok(())
     }
 }
@@ -47,19 +52,6 @@ impl Handler<TransactionObjectChangesToCommit> for ObjectsSnapshotHandler {
         "objects_snapshot_handler".to_string()
     }
 
-    async fn transform(
-        &self,
-        cp_batch: Vec<CheckpointData>,
-    ) -> IndexerResult<Vec<TransactionObjectChangesToCommit>> {
-        futures::future::join_all(cp_batch.into_iter().map(|checkpoint| {
-            let metrics = self.metrics.clone();
-            async move { CheckpointHandler::index_objects(&checkpoint, &metrics).await }
-        }))
-        .await
-        .into_iter()
-        .collect::<Result<Vec<_>, _>>()
-    }
-
     async fn load(
         &self,
         transformed_data: Vec<TransactionObjectChangesToCommit>,
@@ -102,7 +94,7 @@ pub async fn start_objects_snapshot_handler(
     info!("Starting object snapshot handler...");
 
     let global_metrics = get_metrics().unwrap();
-    let (cp_sender, cp_receiver) = mysten_metrics::metered_channel::channel(
+    let (sender, receiver) = mysten_metrics::metered_channel::channel(
         600,
         &global_metrics
             .channel_inflight
@@ -110,26 +102,41 @@ pub async fn start_objects_snapshot_handler(
     );
 
     let objects_snapshot_handler =
-        ObjectsSnapshotHandler::new(store.clone(), cp_sender, metrics.clone(), snapshot_config);
+        ObjectsSnapshotHandler::new(store.clone(), sender, metrics.clone(), snapshot_config);
 
     let watermark_hi = objects_snapshot_handler.get_watermark_hi().await?;
     let common_handler = CommonHandler::new(Box::new(objects_snapshot_handler.clone()));
-    spawn_monitored_task!(common_handler.start_transform_and_load(cp_receiver, cancel));
+    spawn_monitored_task!(common_handler.start_transform_and_load(receiver, cancel));
     Ok((objects_snapshot_handler, watermark_hi.unwrap_or_default()))
 }
 
 impl ObjectsSnapshotHandler {
     pub fn new(
         store: PgIndexerStore,
-        cp_sender: Sender<CheckpointData>,
+        sender: Sender<ObjectsSnapshotCommitDataEnvelope>,
         metrics: IndexerMetrics,
         snapshot_config: SnapshotLagConfig,
     ) -> ObjectsSnapshotHandler {
         Self {
             store,
-            cp_sender,
+            sender,
             metrics,
             snapshot_config,
         }
     }
 }
+
+pub struct ObjectsSnapshotCommitDataEnvelope {
+    pub sequence_number: u64,
+    pub data: TransactionObjectChangesToCommit,
+}
+
+impl CommitDataEnvelope<TransactionObjectChangesToCommit> for ObjectsSnapshotCommitDataEnvelope {
+    fn sequence_number(&self) -> u64 {
+        self.sequence_number
+    }
+
+    fn data(self) -> TransactionObjectChangesToCommit {
+        self.data
+    }
+}
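
Note (illustrative, not part of the patch): after this refactor, adding a new pipeline takes only a Worker that transforms checkpoints and sends envelopes, plus a Handler<T> that loads the already-transformed batches. The sketch below shows the handler side. It assumes the crate's IndexerResult lives at crate::errors and that set_watermark_hi takes a u64, both inferred from call sites in the diff; TxCount, TxCountEnvelope, and TxCountHandler are hypothetical names, and the in-memory watermark stands in for the store reads/writes a real handler would do.

// Hypothetical follow-on handler, sketched against the traits introduced
// in this patch. Names and module paths are assumptions, not crate API.
use std::sync::Mutex;

use async_trait::async_trait;

use crate::errors::IndexerResult; // assumed path for IndexerResult
use crate::handlers::{CommitDataEnvelope, Handler};

/// Already-transformed data, produced by a Worker in `process_checkpoint`.
pub struct TxCount {
    pub sequence_number: u64,
    pub tx_count: usize,
}

/// Envelope pairing the data with its checkpoint sequence number so that
/// `CommonHandler::start_transform_and_load` can order and batch it.
pub struct TxCountEnvelope {
    pub sequence_number: u64,
    pub data: TxCount,
}

impl CommitDataEnvelope<TxCount> for TxCountEnvelope {
    fn sequence_number(&self) -> u64 {
        self.sequence_number
    }

    fn data(self) -> TxCount {
        self.data
    }
}

pub struct TxCountHandler {
    // In-memory watermark for illustration only; a real handler would read
    // and write it through the store, as ObjectsSnapshotHandler does.
    watermark_hi: Mutex<Option<u64>>,
}

#[async_trait]
impl Handler<TxCount> for TxCountHandler {
    fn name(&self) -> String {
        "tx_count_handler".to_string()
    }

    async fn load(&self, batch: Vec<TxCount>) -> IndexerResult<()> {
        // `batch` arrives already transformed and in checkpoint order; a
        // real implementation would persist it to the DB here.
        tracing::info!("would commit {} tx-count rows", batch.len());
        Ok(())
    }

    async fn get_watermark_hi(&self) -> IndexerResult<Option<u64>> {
        Ok(*self.watermark_hi.lock().unwrap())
    }

    async fn set_watermark_hi(&self, watermark_hi: u64) -> IndexerResult<()> {
        *self.watermark_hi.lock().unwrap() = Some(watermark_hi);
        Ok(())
    }
}

Ordering, batching, and watermark bookkeeping stay in CommonHandler, keyed off CommitDataEnvelope::sequence_number(), so a handler like this only has to describe how to persist its rows; get_checkpoint_lag_limiter keeps its Ok(u64::MAX) default unless the pipeline needs to lag, as the objects snapshot does.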