Skip to content

Commit

Permalink
remove un-necessary trait
Browse files Browse the repository at this point in the history
  • Loading branch information
gegaowp committed Oct 3, 2024
1 parent 5794439 commit 4c213c7
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 45 deletions.
38 changes: 15 additions & 23 deletions crates/sui-indexer/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,11 @@ impl<T> CommonHandler<T> {
Self { handler }
}

async fn start_transform_and_load<E>(
async fn start_transform_and_load(
&self,
cp_receiver: mysten_metrics::metered_channel::Receiver<E>,
cp_receiver: mysten_metrics::metered_channel::Receiver<(u64, T)>,
cancel: CancellationToken,
) -> IndexerResult<()>
where
E: CommitDataEnvelope<T>,
{
) -> IndexerResult<()> {
let checkpoint_commit_batch_size = std::env::var("CHECKPOINT_COMMIT_BATCH_SIZE")
.unwrap_or(CHECKPOINT_COMMIT_BATCH_SIZE.to_string())
.parse::<usize>()
Expand All @@ -78,7 +75,7 @@ impl<T> CommonHandler<T> {
.ready_chunks(checkpoint_commit_batch_size);

let mut unprocessed = BTreeMap::new();
let mut envelope_batch = vec![];
let mut tuple_batch = vec![];
let mut next_cp_to_process = self
.handler
.get_watermark_hi()
Expand All @@ -91,34 +88,34 @@ impl<T> CommonHandler<T> {
return Ok(());
}

// Try to fetch new data envelope from the stream
// Try to fetch new data tuple from the stream
match stream.next().now_or_never() {
Some(Some(envelope_chunk)) => {
Some(Some(tuple_chunk)) => {
if cancel.is_cancelled() {
return Ok(());
}
for envelope in envelope_chunk {
unprocessed.insert(envelope.sequence_number(), envelope);
for tuple in tuple_chunk {
unprocessed.insert(tuple.0, tuple);
}
}
Some(None) => break, // Stream has ended
None => {} // No new data envelope available right now
None => {} // No new data tuple available right now
}

// Process unprocessed checkpoints, even no new checkpoints from stream
let checkpoint_lag_limiter = self.handler.get_checkpoint_lag_limiter().await?;
while next_cp_to_process <= checkpoint_lag_limiter {
if let Some(data_envelope) = unprocessed.remove(&next_cp_to_process) {
envelope_batch.push(data_envelope);
if let Some(data_tuple) = unprocessed.remove(&next_cp_to_process) {
tuple_batch.push(data_tuple);
next_cp_to_process += 1;
} else {
break;
}
}

if !envelope_batch.is_empty() {
let last_checkpoint_seq = envelope_batch.last().unwrap().sequence_number();
let batch = envelope_batch.into_iter().map(|c| c.data()).collect();
if !tuple_batch.is_empty() {
let last_checkpoint_seq = tuple_batch.last().unwrap().0;
let batch = tuple_batch.into_iter().map(|t| t.1).collect();
self.handler.load(batch).await.map_err(|e| {
IndexerError::PostgresWriteError(format!(
"Failed to load transformed data into DB for handler {}: {}",
Expand All @@ -127,7 +124,7 @@ impl<T> CommonHandler<T> {
))
})?;
self.handler.set_watermark_hi(last_checkpoint_seq).await?;
envelope_batch = vec![];
tuple_batch = vec![];
}
}
Err(IndexerError::ChannelClosed(format!(
Expand Down Expand Up @@ -156,8 +153,3 @@ pub trait Handler<T>: Send + Sync {
Ok(u64::MAX)
}
}

pub trait CommitDataEnvelope<T>: Send + Sync {
fn sequence_number(&self) -> u64;
fn data(self) -> T;
}
29 changes: 7 additions & 22 deletions crates/sui-indexer/src/handlers/objects_snapshot_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ use crate::{metrics::IndexerMetrics, store::IndexerStore};

use super::checkpoint_handler::CheckpointHandler;
use super::TransactionObjectChangesToCommit;
use super::{CommitDataEnvelope, CommonHandler, Handler};
use super::{CommonHandler, Handler};

#[derive(Clone)]
pub struct ObjectsSnapshotHandler {
pub store: PgIndexerStore,
pub sender: Sender<ObjectsSnapshotCommitDataEnvelope>,
pub sender: Sender<(u64, TransactionObjectChangesToCommit)>,
snapshot_config: SnapshotLagConfig,
metrics: IndexerMetrics,
}
Expand All @@ -37,10 +37,10 @@ impl Worker for ObjectsSnapshotHandler {
async fn process_checkpoint(&self, checkpoint: &CheckpointData) -> anyhow::Result<()> {
let transformed_data = CheckpointHandler::index_objects(checkpoint, &self.metrics).await?;
self.sender
.send(ObjectsSnapshotCommitDataEnvelope {
sequence_number: checkpoint.checkpoint_summary.sequence_number,
data: transformed_data,
})
.send((
checkpoint.checkpoint_summary.sequence_number,
transformed_data,
))
.await?;
Ok(())
}
Expand Down Expand Up @@ -113,7 +113,7 @@ pub async fn start_objects_snapshot_handler(
impl ObjectsSnapshotHandler {
pub fn new(
store: PgIndexerStore,
sender: Sender<ObjectsSnapshotCommitDataEnvelope>,
sender: Sender<(u64, TransactionObjectChangesToCommit)>,
metrics: IndexerMetrics,
snapshot_config: SnapshotLagConfig,
) -> ObjectsSnapshotHandler {
Expand All @@ -125,18 +125,3 @@ impl ObjectsSnapshotHandler {
}
}
}

pub struct ObjectsSnapshotCommitDataEnvelope {
pub sequence_number: u64,
pub data: TransactionObjectChangesToCommit,
}

impl CommitDataEnvelope<TransactionObjectChangesToCommit> for ObjectsSnapshotCommitDataEnvelope {
fn sequence_number(&self) -> u64 {
self.sequence_number
}

fn data(self) -> TransactionObjectChangesToCommit {
self.data
}
}

0 comments on commit 4c213c7

Please sign in to comment.