From 4c213c72665bb023cbae881c396ecd626331112f Mon Sep 17 00:00:00 2001 From: Ge Gao Date: Wed, 2 Oct 2024 20:07:19 -0400 Subject: [PATCH] remove un-necessary trait --- crates/sui-indexer/src/handlers/mod.rs | 38 ++++++++----------- .../handlers/objects_snapshot_processor.rs | 29 ++++---------- 2 files changed, 22 insertions(+), 45 deletions(-) diff --git a/crates/sui-indexer/src/handlers/mod.rs b/crates/sui-indexer/src/handlers/mod.rs index c51afe46bed01..4e8f0e275ce38 100644 --- a/crates/sui-indexer/src/handlers/mod.rs +++ b/crates/sui-indexer/src/handlers/mod.rs @@ -62,14 +62,11 @@ impl CommonHandler { Self { handler } } - async fn start_transform_and_load( + async fn start_transform_and_load( &self, - cp_receiver: mysten_metrics::metered_channel::Receiver, + cp_receiver: mysten_metrics::metered_channel::Receiver<(u64, T)>, cancel: CancellationToken, - ) -> IndexerResult<()> - where - E: CommitDataEnvelope, - { + ) -> IndexerResult<()> { let checkpoint_commit_batch_size = std::env::var("CHECKPOINT_COMMIT_BATCH_SIZE") .unwrap_or(CHECKPOINT_COMMIT_BATCH_SIZE.to_string()) .parse::() @@ -78,7 +75,7 @@ impl CommonHandler { .ready_chunks(checkpoint_commit_batch_size); let mut unprocessed = BTreeMap::new(); - let mut envelope_batch = vec![]; + let mut tuple_batch = vec![]; let mut next_cp_to_process = self .handler .get_watermark_hi() @@ -91,34 +88,34 @@ impl CommonHandler { return Ok(()); } - // Try to fetch new data envelope from the stream + // Try to fetch new data tuple from the stream match stream.next().now_or_never() { - Some(Some(envelope_chunk)) => { + Some(Some(tuple_chunk)) => { if cancel.is_cancelled() { return Ok(()); } - for envelope in envelope_chunk { - unprocessed.insert(envelope.sequence_number(), envelope); + for tuple in tuple_chunk { + unprocessed.insert(tuple.0, tuple); } } Some(None) => break, // Stream has ended - None => {} // No new data envelope available right now + None => {} // No new data tuple available right now } // Process unprocessed checkpoints, even no new checkpoints from stream let checkpoint_lag_limiter = self.handler.get_checkpoint_lag_limiter().await?; while next_cp_to_process <= checkpoint_lag_limiter { - if let Some(data_envelope) = unprocessed.remove(&next_cp_to_process) { - envelope_batch.push(data_envelope); + if let Some(data_tuple) = unprocessed.remove(&next_cp_to_process) { + tuple_batch.push(data_tuple); next_cp_to_process += 1; } else { break; } } - if !envelope_batch.is_empty() { - let last_checkpoint_seq = envelope_batch.last().unwrap().sequence_number(); - let batch = envelope_batch.into_iter().map(|c| c.data()).collect(); + if !tuple_batch.is_empty() { + let last_checkpoint_seq = tuple_batch.last().unwrap().0; + let batch = tuple_batch.into_iter().map(|t| t.1).collect(); self.handler.load(batch).await.map_err(|e| { IndexerError::PostgresWriteError(format!( "Failed to load transformed data into DB for handler {}: {}", @@ -127,7 +124,7 @@ impl CommonHandler { )) })?; self.handler.set_watermark_hi(last_checkpoint_seq).await?; - envelope_batch = vec![]; + tuple_batch = vec![]; } } Err(IndexerError::ChannelClosed(format!( @@ -156,8 +153,3 @@ pub trait Handler: Send + Sync { Ok(u64::MAX) } } - -pub trait CommitDataEnvelope: Send + Sync { - fn sequence_number(&self) -> u64; - fn data(self) -> T; -} diff --git a/crates/sui-indexer/src/handlers/objects_snapshot_processor.rs b/crates/sui-indexer/src/handlers/objects_snapshot_processor.rs index 1271129778154..e845ddf4bd6e7 100644 --- a/crates/sui-indexer/src/handlers/objects_snapshot_processor.rs +++ b/crates/sui-indexer/src/handlers/objects_snapshot_processor.rs @@ -17,12 +17,12 @@ use crate::{metrics::IndexerMetrics, store::IndexerStore}; use super::checkpoint_handler::CheckpointHandler; use super::TransactionObjectChangesToCommit; -use super::{CommitDataEnvelope, CommonHandler, Handler}; +use super::{CommonHandler, Handler}; #[derive(Clone)] pub struct ObjectsSnapshotHandler { pub store: PgIndexerStore, - pub sender: Sender, + pub sender: Sender<(u64, TransactionObjectChangesToCommit)>, snapshot_config: SnapshotLagConfig, metrics: IndexerMetrics, } @@ -37,10 +37,10 @@ impl Worker for ObjectsSnapshotHandler { async fn process_checkpoint(&self, checkpoint: &CheckpointData) -> anyhow::Result<()> { let transformed_data = CheckpointHandler::index_objects(checkpoint, &self.metrics).await?; self.sender - .send(ObjectsSnapshotCommitDataEnvelope { - sequence_number: checkpoint.checkpoint_summary.sequence_number, - data: transformed_data, - }) + .send(( + checkpoint.checkpoint_summary.sequence_number, + transformed_data, + )) .await?; Ok(()) } @@ -113,7 +113,7 @@ pub async fn start_objects_snapshot_handler( impl ObjectsSnapshotHandler { pub fn new( store: PgIndexerStore, - sender: Sender, + sender: Sender<(u64, TransactionObjectChangesToCommit)>, metrics: IndexerMetrics, snapshot_config: SnapshotLagConfig, ) -> ObjectsSnapshotHandler { @@ -125,18 +125,3 @@ impl ObjectsSnapshotHandler { } } } - -pub struct ObjectsSnapshotCommitDataEnvelope { - pub sequence_number: u64, - pub data: TransactionObjectChangesToCommit, -} - -impl CommitDataEnvelope for ObjectsSnapshotCommitDataEnvelope { - fn sequence_number(&self) -> u64 { - self.sequence_number - } - - fn data(self) -> TransactionObjectChangesToCommit { - self.data - } -}