Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Indexer] Index Package original ID, version, cp sequence number #17726

Merged
merged 2 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
CREATE TABLE packages
(
package_id blob NOT NULL,
package_id BLOB NOT NULL,
original_id BLOB NOT NULL,
package_version BIGINT NOT NULL,
-- bcs serialized MovePackage
move_package MEDIUMBLOB NOT NULL,
CONSTRAINT packages_pk PRIMARY KEY (package_id(255))
move_package MEDIUMBLOB NOT NULL,
checkpoint_sequence_number BIGINT NOT NULL,
CONSTRAINT packages_pk PRIMARY KEY (package_id(32), original_id(32), package_version),
CONSTRAINT packages_unique_package_id UNIQUE (package_id(32))
);

CREATE INDEX packages_cp_id_version ON packages (checkpoint_sequence_number, original_id(32), package_version);
CREATE INDEX packages_id_version_cp ON packages (original_id(32), package_version, checkpoint_sequence_number);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS objects_version;
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- The Postgres version of this table is partitioned by the first byte
-- of object_id, but this kind of partition is not easily supported in
-- MySQL, so this variant is unpartitioned for now.
CREATE TABLE objects_version (
object_id BLOB NOT NULL,
object_version BIGINT NOT NULL,
cp_sequence_number BIGINT NOT NULL,
PRIMARY KEY (object_id(32), object_version)
)
14 changes: 11 additions & 3 deletions crates/sui-indexer/migrations/pg/2023-08-19-060729_packages/up.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
CREATE TABLE packages
CREATE TABLE packages
(
package_id bytea PRIMARY KEY,
package_id bytea NOT NULL,
original_id bytea NOT NULL,
package_version bigint NOT NULL,
-- bcs serialized MovePackage
move_package bytea NOT NULL
move_package bytea NOT NULL,
checkpoint_sequence_number bigint NOT NULL,
CONSTRAINT packages_pkey PRIMARY KEY (package_id, original_id, package_version),
CONSTRAINT packages_unique_package_id UNIQUE (package_id)
);

CREATE INDEX packages_cp_id_version ON packages (checkpoint_sequence_number, original_id, package_version);
CREATE INDEX packages_id_version_cp ON packages (original_id, package_version, checkpoint_sequence_number);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS objects_version;
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
-- Indexing table mapping an object's ID and version to its checkpoint
-- sequence number, partitioned by the first byte of its Object ID.
CREATE TABLE objects_version (
object_id bytea NOT NULL,
object_version bigint NOT NULL,
cp_sequence_number bigint NOT NULL,
PRIMARY KEY (object_id, object_version)
) PARTITION BY RANGE (object_id);

-- Create a partition for each first byte value.
DO $$
DECLARE
lo text;
hi text;
BEGIN
FOR i IN 0..254 LOOP
lo := LPAD(TO_HEX(i), 2, '0');
hi := LPAD(TO_HEX(i + 1), 2, '0');
EXECUTE FORMAT($F$
CREATE TABLE objects_version_%1$s PARTITION OF objects_version FOR VALUES
FROM (E'\\x%1$s00000000000000000000000000000000000000000000000000000000000000')
TO (E'\\x%2$s00000000000000000000000000000000000000000000000000000000000000');
$F$, lo, hi);
END LOOP;
END;
$$ LANGUAGE plpgsql;

-- Special case for the last partition, because of the upper bound.
CREATE TABLE objects_version_ff PARTITION OF objects_version FOR VALUES
FROM (E'\\xff00000000000000000000000000000000000000000000000000000000000000')
TO (MAXVALUE);
1 change: 1 addition & 0 deletions crates/sui-indexer/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod checkpoints;
pub mod display;
pub mod epoch;
pub mod events;
pub mod obj_indices;
pub mod objects;
pub mod packages;
pub mod transactions;
Expand Down
40 changes: 40 additions & 0 deletions crates/sui-indexer/src/models/obj_indices.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use diesel::prelude::*;

use crate::schema::objects_version;

use super::objects::StoredDeletedObject;
use super::objects::StoredObject;

/// Model types related to tables that support efficient execution of queries on the `objects`,
/// `objects_history` and `objects_snapshot` tables.

#[derive(Queryable, Insertable, Debug, Identifiable, Clone, QueryableByName)]
#[diesel(table_name = objects_version, primary_key(object_id, object_version))]
pub struct StoredObjectVersion {
pub object_id: Vec<u8>,
pub object_version: i64,
pub cp_sequence_number: i64,
}

impl From<&StoredObject> for StoredObjectVersion {
fn from(o: &StoredObject) -> Self {
Self {
object_id: o.object_id.clone(),
object_version: o.object_version,
cp_sequence_number: o.checkpoint_sequence_number,
}
}
}

impl From<&StoredDeletedObject> for StoredObjectVersion {
fn from(o: &StoredDeletedObject) -> Self {
Self {
object_id: o.object_id.clone(),
object_version: o.object_version,
cp_sequence_number: o.checkpoint_sequence_number,
}
}
}
6 changes: 6 additions & 0 deletions crates/sui-indexer/src/models/packages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,20 @@ use diesel::prelude::*;
#[diesel(table_name = packages, primary_key(package_id))]
pub struct StoredPackage {
pub package_id: Vec<u8>,
pub original_id: Vec<u8>,
pub package_version: i64,
pub move_package: Vec<u8>,
pub checkpoint_sequence_number: i64,
}

impl From<IndexedPackage> for StoredPackage {
fn from(p: IndexedPackage) -> Self {
Self {
package_id: p.package_id.to_vec(),
original_id: p.move_package.original_package_id().to_vec(),
package_version: p.move_package.version().value() as i64,
move_package: bcs::to_bytes(&p.move_package).unwrap(),
checkpoint_sequence_number: p.checkpoint_sequence_number as i64,
}
}
}
3 changes: 3 additions & 0 deletions crates/sui-indexer/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod inner {
pub use crate::schema::pg::objects;
pub use crate::schema::pg::objects_history;
pub use crate::schema::pg::objects_snapshot;
pub use crate::schema::pg::objects_version;
pub use crate::schema::pg::packages;
pub use crate::schema::pg::transactions;
pub use crate::schema::pg::tx_calls;
Expand All @@ -39,6 +40,7 @@ mod inner {
pub use crate::schema::mysql::objects;
pub use crate::schema::mysql::objects_history;
pub use crate::schema::mysql::objects_snapshot;
pub use crate::schema::mysql::objects_version;
pub use crate::schema::mysql::packages;
pub use crate::schema::mysql::transactions;
pub use crate::schema::mysql::tx_calls;
Expand All @@ -56,6 +58,7 @@ pub use inner::events;
pub use inner::objects;
pub use inner::objects_history;
pub use inner::objects_snapshot;
pub use inner::objects_version;
pub use inner::packages;
pub use inner::transactions;
pub use inner::tx_calls;
Expand Down
11 changes: 11 additions & 0 deletions crates/sui-indexer/src/schema/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,21 @@ diesel::table! {
}
}

diesel::table! {
objects_version (object_id, object_version) {
object_id -> Blob,
object_version -> Bigint,
cp_sequence_number -> Bigint,
}
}

diesel::table! {
packages (package_id) {
package_id -> Blob,
original_id -> Blob,
package_version -> Bigint,
move_package -> Mediumblob,
checkpoint_sequence_number -> Bigint,
}
}

Expand Down
12 changes: 12 additions & 0 deletions crates/sui-indexer/src/schema/pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,21 @@ diesel::table! {
}
}

diesel::table! {
objects_version (object_id, object_version) {
object_id -> Bytea,
object_version -> Int8,
cp_sequence_number -> Int8,
}
}

diesel::table! {
packages (package_id) {
package_id -> Bytea,
original_id -> Bytea,
package_version -> Int8,
move_package -> Bytea,
checkpoint_sequence_number -> Int8,
}
}

Expand Down Expand Up @@ -282,6 +293,7 @@ diesel::allow_tables_to_appear_in_same_query!(
objects_history,
objects_history_partition_0,
objects_snapshot,
objects_version,
packages,
transactions,
transactions_partition_0,
Expand Down
15 changes: 12 additions & 3 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,17 @@ use crate::models::checkpoints::StoredCheckpoint;
use crate::models::display::StoredDisplay;
use crate::models::epoch::StoredEpochInfo;
use crate::models::events::StoredEvent;
use crate::models::obj_indices::StoredObjectVersion;
use crate::models::objects::{
StoredDeletedHistoryObject, StoredDeletedObject, StoredHistoryObject, StoredObject,
StoredObjectSnapshot,
};
use crate::models::packages::StoredPackage;
use crate::models::transactions::StoredTransaction;
use crate::schema::{
checkpoints, display, epochs, events, objects, objects_history, objects_snapshot, packages,
transactions, tx_calls, tx_changed_objects, tx_digests, tx_input_objects, tx_recipients,
tx_senders,
checkpoints, display, epochs, events, objects, objects_history, objects_snapshot,
objects_version, packages, transactions, tx_calls, tx_changed_objects, tx_digests,
tx_input_objects, tx_recipients, tx_senders,
};
use crate::types::{IndexedCheckpoint, IndexedEvent, IndexedPackage, IndexedTransaction, TxIndex};
use crate::{
Expand Down Expand Up @@ -437,13 +438,16 @@ impl<T: R2D2Connection + 'static> PgIndexerStore<T> {
.checkpoint_db_commit_latency_objects_history_chunks
.start_timer();
let mut mutated_objects: Vec<StoredHistoryObject> = vec![];
let mut object_versions: Vec<StoredObjectVersion> = vec![];
let mut deleted_object_ids: Vec<StoredDeletedHistoryObject> = vec![];
for object in objects {
match object {
ObjectChangeToCommit::MutatedObject(stored_object) => {
object_versions.push(StoredObjectVersion::from(&stored_object));
mutated_objects.push(stored_object.into());
}
ObjectChangeToCommit::DeletedObject(stored_deleted_object) => {
object_versions.push(StoredObjectVersion::from(&stored_deleted_object));
deleted_object_ids.push(stored_deleted_object.into());
}
}
Expand All @@ -462,6 +466,11 @@ impl<T: R2D2Connection + 'static> PgIndexerStore<T> {
);
}

for object_version_chunk in object_versions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
insert_or_ignore_into!(objects_version::table, object_version_chunk, conn);
}

for deleted_objects_chunk in
deleted_object_ids.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
Expand Down
Loading