Skip to content

Commit

Permalink
[Indexer] Create objects_version table. (#17542)
Browse files Browse the repository at this point in the history
## Description

This table maps an object's ID and version to a checkpoint sequence
number, in a table partitioned by the first byte of the object ID. This
speeds up look ups into `objects_history` by offering a path for a first
look-up to the correct partition in that table for a given object's ID
and version.

This PR introduces the table, and the logic to populate it in the
indexer.

## Test plan

```
sui$ cargo nextest run -p sui-indexer
sui$ cargo nextest run -p sui-graphql-rpc
sui$ cargo nextest run -p sui-graphql-e2e-tests --features pg_integration
```

A future PR will make use of this table from GraphQL, which will test it
further.
## Stack

- #17686
- #17687
- #17688
- #17689
- #17691
- #17694
- #17695

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol:
- [ ] Nodes (Validators and Full nodes):
- [ ] Indexer:
- [ ] JSON-RPC:
- [ ] GraphQL:
- [ ] CLI:
- [ ] Rust SDK:
  • Loading branch information
amnn committed May 14, 2024
1 parent 408b7e9 commit 2462bbe
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS objects_version;
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- The Postgres version of this table is partitioned by the first byte
-- of object_id, but this kind of partition is not easily supported in
-- MySQL, so this variant is unpartitioned for now.
CREATE TABLE objects_version (
object_id BLOB NOT NULL,
object_version BIGINT NOT NULL,
cp_sequence_number BIGINT NOT NULL,
PRIMARY KEY (object_id(32), object_version)
)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS objects_version;
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
-- Indexing table mapping an object's ID and version to its checkpoint
-- sequence number, partitioned by the first byte of its Object ID.
CREATE TABLE objects_version (
object_id bytea NOT NULL,
object_version bigint NOT NULL,
cp_sequence_number bigint NOT NULL,
PRIMARY KEY (object_id, object_version)
) PARTITION BY RANGE (object_id);

-- Create a partition for each first byte value.
DO $$
DECLARE
lo text;
hi text;
BEGIN
FOR i IN 0..254 LOOP
lo := LPAD(TO_HEX(i), 2, '0');
hi := LPAD(TO_HEX(i + 1), 2, '0');
EXECUTE FORMAT($F$
CREATE TABLE objects_version_%1$s PARTITION OF objects_version FOR VALUES
FROM (E'\\x%1$s00000000000000000000000000000000000000000000000000000000000000')
TO (E'\\x%2$s00000000000000000000000000000000000000000000000000000000000000');
$F$, lo, hi);
END LOOP;
END;
$$ LANGUAGE plpgsql;

-- Special case for the last partition, because of the upper bound.
CREATE TABLE objects_version_ff PARTITION OF objects_version FOR VALUES
FROM (E'\\xff00000000000000000000000000000000000000000000000000000000000000')
TO (MAXVALUE);
1 change: 1 addition & 0 deletions crates/sui-indexer/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod checkpoints;
pub mod display;
pub mod epoch;
pub mod events;
pub mod obj_indices;
pub mod objects;
pub mod packages;
pub mod transactions;
Expand Down
40 changes: 40 additions & 0 deletions crates/sui-indexer/src/models/obj_indices.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use diesel::prelude::*;

use crate::schema::objects_version;

use super::objects::StoredDeletedObject;
use super::objects::StoredObject;

/// Model types related to tables that support efficient execution of queries on the `objects`,
/// `objects_history` and `objects_snapshot` tables.

#[derive(Queryable, Insertable, Debug, Identifiable, Clone, QueryableByName)]
#[diesel(table_name = objects_version, primary_key(object_id, object_version))]
pub struct StoredObjectVersion {
pub object_id: Vec<u8>,
pub object_version: i64,
pub cp_sequence_number: i64,
}

impl From<&StoredObject> for StoredObjectVersion {
fn from(o: &StoredObject) -> Self {
Self {
object_id: o.object_id.clone(),
object_version: o.object_version,
cp_sequence_number: o.checkpoint_sequence_number,
}
}
}

impl From<&StoredDeletedObject> for StoredObjectVersion {
fn from(o: &StoredDeletedObject) -> Self {
Self {
object_id: o.object_id.clone(),
object_version: o.object_version,
cp_sequence_number: o.checkpoint_sequence_number,
}
}
}
3 changes: 3 additions & 0 deletions crates/sui-indexer/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod inner {
pub use crate::schema::pg::objects;
pub use crate::schema::pg::objects_history;
pub use crate::schema::pg::objects_snapshot;
pub use crate::schema::pg::objects_version;
pub use crate::schema::pg::packages;
pub use crate::schema::pg::transactions;
pub use crate::schema::pg::tx_calls;
Expand All @@ -39,6 +40,7 @@ mod inner {
pub use crate::schema::mysql::objects;
pub use crate::schema::mysql::objects_history;
pub use crate::schema::mysql::objects_snapshot;
pub use crate::schema::mysql::objects_version;
pub use crate::schema::mysql::packages;
pub use crate::schema::mysql::transactions;
pub use crate::schema::mysql::tx_calls;
Expand All @@ -56,6 +58,7 @@ pub use inner::events;
pub use inner::objects;
pub use inner::objects_history;
pub use inner::objects_snapshot;
pub use inner::objects_version;
pub use inner::packages;
pub use inner::transactions;
pub use inner::tx_calls;
Expand Down
8 changes: 8 additions & 0 deletions crates/sui-indexer/src/schema/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@ diesel::table! {
}
}

diesel::table! {
objects_version (object_id, object_version) {
object_id -> Blob,
object_version -> Bigint,
cp_sequence_number -> Bigint,
}
}

diesel::table! {
packages (package_id) {
package_id -> Blob,
Expand Down
9 changes: 9 additions & 0 deletions crates/sui-indexer/src/schema/pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,14 @@ diesel::table! {
}
}

diesel::table! {
objects_version (object_id, object_version) {
object_id -> Bytea,
object_version -> Int8,
cp_sequence_number -> Int8,
}
}

diesel::table! {
packages (package_id) {
package_id -> Bytea,
Expand Down Expand Up @@ -282,6 +290,7 @@ diesel::allow_tables_to_appear_in_same_query!(
objects_history,
objects_history_partition_0,
objects_snapshot,
objects_version,
packages,
transactions,
transactions_partition_0,
Expand Down
15 changes: 12 additions & 3 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,17 @@ use crate::models::checkpoints::StoredCheckpoint;
use crate::models::display::StoredDisplay;
use crate::models::epoch::StoredEpochInfo;
use crate::models::events::StoredEvent;
use crate::models::obj_indices::StoredObjectVersion;
use crate::models::objects::{
StoredDeletedHistoryObject, StoredDeletedObject, StoredHistoryObject, StoredObject,
StoredObjectSnapshot,
};
use crate::models::packages::StoredPackage;
use crate::models::transactions::StoredTransaction;
use crate::schema::{
checkpoints, display, epochs, events, objects, objects_history, objects_snapshot, packages,
transactions, tx_calls, tx_changed_objects, tx_digests, tx_input_objects, tx_recipients,
tx_senders,
checkpoints, display, epochs, events, objects, objects_history, objects_snapshot,
objects_version, packages, transactions, tx_calls, tx_changed_objects, tx_digests,
tx_input_objects, tx_recipients, tx_senders,
};
use crate::types::{IndexedCheckpoint, IndexedEvent, IndexedPackage, IndexedTransaction, TxIndex};
use crate::{
Expand Down Expand Up @@ -437,13 +438,16 @@ impl<T: R2D2Connection + 'static> PgIndexerStore<T> {
.checkpoint_db_commit_latency_objects_history_chunks
.start_timer();
let mut mutated_objects: Vec<StoredHistoryObject> = vec![];
let mut object_versions: Vec<StoredObjectVersion> = vec![];
let mut deleted_object_ids: Vec<StoredDeletedHistoryObject> = vec![];
for object in objects {
match object {
ObjectChangeToCommit::MutatedObject(stored_object) => {
object_versions.push(StoredObjectVersion::from(&stored_object));
mutated_objects.push(stored_object.into());
}
ObjectChangeToCommit::DeletedObject(stored_deleted_object) => {
object_versions.push(StoredObjectVersion::from(&stored_deleted_object));
deleted_object_ids.push(stored_deleted_object.into());
}
}
Expand All @@ -462,6 +466,11 @@ impl<T: R2D2Connection + 'static> PgIndexerStore<T> {
);
}

for object_version_chunk in object_versions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
insert_or_ignore_into!(objects_version::table, object_version_chunk, conn);
}

for deleted_objects_chunk in
deleted_object_ids.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
Expand Down

0 comments on commit 2462bbe

Please sign in to comment.