Skip to content

Commit

Permalink
transform to worker
Browse files Browse the repository at this point in the history
  • Loading branch information
gegaowp committed Oct 2, 2024
1 parent 1011b7b commit 5794439
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 51 deletions.
55 changes: 26 additions & 29 deletions crates/sui-indexer/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::collections::BTreeMap;
use async_trait::async_trait;
use futures::{FutureExt, StreamExt};

use sui_rest_api::CheckpointData;
use tokio_util::sync::CancellationToken;

use crate::{
Expand Down Expand Up @@ -63,11 +62,14 @@ impl<T> CommonHandler<T> {
Self { handler }
}

async fn start_transform_and_load(
async fn start_transform_and_load<E>(
&self,
cp_receiver: mysten_metrics::metered_channel::Receiver<CheckpointData>,
cp_receiver: mysten_metrics::metered_channel::Receiver<E>,
cancel: CancellationToken,
) -> IndexerResult<()> {
) -> IndexerResult<()>
where
E: CommitDataEnvelope<T>,
{
let checkpoint_commit_batch_size = std::env::var("CHECKPOINT_COMMIT_BATCH_SIZE")
.unwrap_or(CHECKPOINT_COMMIT_BATCH_SIZE.to_string())
.parse::<usize>()
Expand All @@ -76,7 +78,7 @@ impl<T> CommonHandler<T> {
.ready_chunks(checkpoint_commit_batch_size);

let mut unprocessed = BTreeMap::new();
let mut batch: Vec<CheckpointData> = vec![];
let mut envelope_batch = vec![];
let mut next_cp_to_process = self
.handler
.get_watermark_hi()
Expand All @@ -89,50 +91,43 @@ impl<T> CommonHandler<T> {
return Ok(());
}

// Try to fetch new checkpoints from the stream
// Try to fetch new data envelope from the stream
match stream.next().now_or_never() {
Some(Some(indexed_checkpoint_batch)) => {
Some(Some(envelope_chunk)) => {
if cancel.is_cancelled() {
return Ok(());
}
for checkpoint in indexed_checkpoint_batch {
unprocessed
.insert(checkpoint.checkpoint_summary.sequence_number, checkpoint);
for envelope in envelope_chunk {
unprocessed.insert(envelope.sequence_number(), envelope);
}
}
Some(None) => break, // Stream has ended
None => {} // No new checkpoints available right now
None => {} // No new data envelope available right now
}

// Process unprocessed checkpoints, even if no new checkpoints arrived from the stream
let checkpoint_lag_limiter = self.handler.get_checkpoint_lag_limiter().await?;
while next_cp_to_process <= checkpoint_lag_limiter {
if let Some(checkpoint) = unprocessed.remove(&next_cp_to_process) {
batch.push(checkpoint);
if let Some(data_envelope) = unprocessed.remove(&next_cp_to_process) {
envelope_batch.push(data_envelope);
next_cp_to_process += 1;
} else {
break;
}
}

if !batch.is_empty() {
let last_checkpoint_seq = batch.last().unwrap().checkpoint_summary.sequence_number;
let transformed_data = self.handler.transform(batch).await.map_err(|e| {
IndexerError::DataTransformationError(format!(
"Failed to transform checkpoint batch: {}. Handler: {}",
e,
self.handler.name()
))
})?;
self.handler.load(transformed_data).await.map_err(|e| {
if !envelope_batch.is_empty() {
let last_checkpoint_seq = envelope_batch.last().unwrap().sequence_number();
let batch = envelope_batch.into_iter().map(|c| c.data()).collect();
self.handler.load(batch).await.map_err(|e| {
IndexerError::PostgresWriteError(format!(
"Failed to load transformed data into DB for handler {}: {}",
self.handler.name(),
e
))
})?;
self.handler.set_watermark_hi(last_checkpoint_seq).await?;
batch = vec![];
envelope_batch = vec![];
}
}
Err(IndexerError::ChannelClosed(format!(
Expand All @@ -147,11 +142,8 @@ pub trait Handler<T>: Send + Sync {
/// return handler name
fn name(&self) -> String;

/// transform data from `CheckpointData` to .*ToCommit
async fn transform(&self, cp_batch: Vec<CheckpointData>) -> IndexerResult<Vec<T>>;

/// commit .*ToCommit to DB
async fn load(&self, tranformed_data: Vec<T>) -> IndexerResult<()>;
/// commit batch of transformed data to DB
async fn load(&self, batch: Vec<T>) -> IndexerResult<()>;

/// read high watermark of the table DB
async fn get_watermark_hi(&self) -> IndexerResult<Option<u64>>;
Expand All @@ -164,3 +156,8 @@ pub trait Handler<T>: Send + Sync {
Ok(u64::MAX)
}
}

/// Envelope pairing a unit of pre-transformed commit data with the checkpoint
/// sequence number it was derived from, so the generic `CommonHandler` can
/// order envelopes and advance its watermark without knowing the concrete
/// data type `T`.
pub trait CommitDataEnvelope<T>: Send + Sync {
/// Checkpoint sequence number this data was derived from; used as the
/// ordering key for buffering and for watermark bookkeeping.
fn sequence_number(&self) -> u64;
/// Consumes the envelope and yields the inner data to be committed.
fn data(self) -> T;
}
51 changes: 29 additions & 22 deletions crates/sui-indexer/src/handlers/objects_snapshot_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ use crate::{metrics::IndexerMetrics, store::IndexerStore};

use super::checkpoint_handler::CheckpointHandler;
use super::TransactionObjectChangesToCommit;
use super::{CommonHandler, Handler};
use super::{CommitDataEnvelope, CommonHandler, Handler};

#[derive(Clone)]
pub struct ObjectsSnapshotHandler {
pub store: PgIndexerStore,
pub cp_sender: Sender<CheckpointData>,
pub sender: Sender<ObjectsSnapshotCommitDataEnvelope>,
snapshot_config: SnapshotLagConfig,
metrics: IndexerMetrics,
}
Expand All @@ -34,9 +34,14 @@ pub struct CheckpointObjectChanges {

#[async_trait]
impl Worker for ObjectsSnapshotHandler {
// ?? change Worker trait to use Arc<CheckpointData> to avoid clone
async fn process_checkpoint(&self, checkpoint: &CheckpointData) -> anyhow::Result<()> {
self.cp_sender.send(checkpoint.clone()).await?;
let transformed_data = CheckpointHandler::index_objects(checkpoint, &self.metrics).await?;
self.sender
.send(ObjectsSnapshotCommitDataEnvelope {
sequence_number: checkpoint.checkpoint_summary.sequence_number,
data: transformed_data,
})
.await?;
Ok(())
}
}
Expand All @@ -47,19 +52,6 @@ impl Handler<TransactionObjectChangesToCommit> for ObjectsSnapshotHandler {
"objects_snapshot_handler".to_string()
}

async fn transform(
&self,
cp_batch: Vec<CheckpointData>,
) -> IndexerResult<Vec<TransactionObjectChangesToCommit>> {
futures::future::join_all(cp_batch.into_iter().map(|checkpoint| {
let metrics = self.metrics.clone();
async move { CheckpointHandler::index_objects(&checkpoint, &metrics).await }
}))
.await
.into_iter()
.collect::<Result<Vec<TransactionObjectChangesToCommit>, _>>()
}

async fn load(
&self,
transformed_data: Vec<TransactionObjectChangesToCommit>,
Expand Down Expand Up @@ -102,34 +94,49 @@ pub async fn start_objects_snapshot_handler(
info!("Starting object snapshot handler...");

let global_metrics = get_metrics().unwrap();
let (cp_sender, cp_receiver) = mysten_metrics::metered_channel::channel(
let (sender, receiver) = mysten_metrics::metered_channel::channel(
600,
&global_metrics
.channel_inflight
.with_label_values(&["objects_snapshot_handler_checkpoint_data"]),
);

let objects_snapshot_handler =
ObjectsSnapshotHandler::new(store.clone(), cp_sender, metrics.clone(), snapshot_config);
ObjectsSnapshotHandler::new(store.clone(), sender, metrics.clone(), snapshot_config);

let watermark_hi = objects_snapshot_handler.get_watermark_hi().await?;
let common_handler = CommonHandler::new(Box::new(objects_snapshot_handler.clone()));
spawn_monitored_task!(common_handler.start_transform_and_load(cp_receiver, cancel));
spawn_monitored_task!(common_handler.start_transform_and_load(receiver, cancel));
Ok((objects_snapshot_handler, watermark_hi.unwrap_or_default()))
}

impl ObjectsSnapshotHandler {
pub fn new(
store: PgIndexerStore,
cp_sender: Sender<CheckpointData>,
sender: Sender<ObjectsSnapshotCommitDataEnvelope>,
metrics: IndexerMetrics,
snapshot_config: SnapshotLagConfig,
) -> ObjectsSnapshotHandler {
Self {
store,
cp_sender,
sender,
metrics,
snapshot_config,
}
}
}

/// Concrete `CommitDataEnvelope` for the objects-snapshot pipeline: carries
/// the indexed object changes of a single checkpoint together with that
/// checkpoint's sequence number.
pub struct ObjectsSnapshotCommitDataEnvelope {
// Sequence number of the checkpoint the enclosed data was indexed from.
pub sequence_number: u64,
// Object changes produced by CheckpointHandler::index_objects for that checkpoint.
pub data: TransactionObjectChangesToCommit,
}

// Lets the generic CommonHandler drive the objects-snapshot pipeline: it
// reads the checkpoint sequence number for ordering/watermark updates,
// then unwraps the pre-indexed object changes for loading.
impl CommitDataEnvelope<TransactionObjectChangesToCommit> for ObjectsSnapshotCommitDataEnvelope {
/// Consumes the envelope, yielding the object changes to commit.
fn data(self) -> TransactionObjectChangesToCommit {
self.data
}

/// Checkpoint sequence number the enclosed changes were indexed from.
fn sequence_number(&self) -> u64 {
self.sequence_number
}
}

0 comments on commit 5794439

Please sign in to comment.