From 20a87ead2ad79c58da5e1b99b9e5ee6a279fbde9 Mon Sep 17 00:00:00 2001 From: Ashok Menon Date: Tue, 30 Apr 2024 21:46:39 +0100 Subject: [PATCH] [pick 2024.4][GraphQL] Object DataLoader (#17332) (#17424) ## Description Implement data loaders for fetching historical object versions, objects bounded by their parent versions, and the latest versions of an object at a given checkpoint. By implementing an Object DataLoader, we also implicitly get support for data-loading all derived types (`MoveObject`, `MovePackage`, `MoveModule`, `DynamicField`, `Coin`, etc). These implementations (particularly historical queries and queries where the version can be bounded by a parent version) can be made even more efficient with the existence of an index/side table that maps an object's ID and version to the checkpoint it is part of. This change has not been included in this PR, but we will follow up on this as part of Object query benchmarking. As part of this change, I enabled queries for historical objects outside the available range. Later (with the use of an `obj_version` index) it will also be possible to enable dynamic field look-ups on historical objects as well. ## Test Plan ``` sui$ cargo nextest run -p sui-graphql-rpc sui$ cargo nextest run -p sui-graphql-e2e-tests --features pg_integration ``` Run the following query -- after this change, it takes about 8s to complete on the server, fetching about 80 objects, while previously it would either timeout or squeak in *just* under the 40s timeout. I expect this number to improve further once we have an efficient way to map object ids and versions to a checkpoint sequence number. ```graphql query { transactionBlocks(last: 5) { nodes { effects { objectChanges(first: 50) { pageInfo { hasNextPage } nodes { idCreated idDeleted inputState { asMoveObject { contents { json } } } outputState { asMoveObject { contents { json } } } } } } } } } ``` --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [x] GraphQL: Queries for historical versions of objects will now return data even if that version of the object is outside the available range. - [ ] CLI: - [ ] Rust SDK: --- .../dynamic_fields/dynamic_fields.exp | 43 +- .../tests/consistency/object_at_version.exp | 21 +- .../tests/consistency/object_at_version.move | 1 + .../sui-graphql-rpc/src/types/checkpoint.rs | 7 +- .../src/types/dynamic_field.rs | 31 +- crates/sui-graphql-rpc/src/types/event.rs | 2 +- crates/sui-graphql-rpc/src/types/gas.rs | 10 +- .../src/types/move_function.rs | 18 +- .../sui-graphql-rpc/src/types/move_module.rs | 16 +- .../sui-graphql-rpc/src/types/move_object.rs | 8 +- .../sui-graphql-rpc/src/types/move_package.rs | 9 +- .../sui-graphql-rpc/src/types/move_struct.rs | 2 +- crates/sui-graphql-rpc/src/types/object.rs | 478 +++++++++--------- .../src/types/object_change.rs | 19 +- .../sui-graphql-rpc/src/types/object_read.rs | 12 +- crates/sui-graphql-rpc/src/types/owner.rs | 9 +- crates/sui-graphql-rpc/src/types/query.rs | 25 +- .../src/types/suins_registration.rs | 6 +- .../transaction_block_kind/programmable.rs | 2 +- crates/sui-graphql-rpc/src/types/validator.rs | 14 +- .../src/types/zklogin_verify_signature.rs | 2 +- 21 files changed, 380 insertions(+), 355 deletions(-) diff --git a/crates/sui-graphql-e2e-tests/tests/consistency/dynamic_fields/dynamic_fields.exp b/crates/sui-graphql-e2e-tests/tests/consistency/dynamic_fields/dynamic_fields.exp index b427d796ca598..a5130682191d9 100644 --- a/crates/sui-graphql-e2e-tests/tests/consistency/dynamic_fields/dynamic_fields.exp +++ b/crates/sui-graphql-e2e-tests/tests/consistency/dynamic_fields/dynamic_fields.exp @@ -819,7 +819,24 @@ Checkpoint created: 7 task 33 'run-graphql'. lines 445-495: Response: { "data": { - "parent_version_4_outside_consistent_range": null, + "parent_version_4_outside_consistent_range": { + "dynamicFields": { + "edges": [ + { + "cursor": "IPzS8naECYD4KyjqOrwvPZSgKp0kO3zeSAqOIyZX0G0bBwAAAAAAAAA=", + "node": { + "name": { + "bcs": "pAEAAAAAAAA=", + "type": { + "repr": "u64" + } + }, + "value": null + } + } + ] + } + }, "parent_version_4_paginated_outside_consistent_range": null, "parent_version_6_no_df_1_2_3": { "dynamicFields": { @@ -886,13 +903,33 @@ Response: { "edges": [] } } - } + }, + "errors": [ + { + "message": "Requested data is outside the available range", + "locations": [ + { + "line": 34, + "column": 5 + } + ], + "path": [ + "parent_version_4_paginated_outside_consistent_range", + "dynamicFields" + ], + "extensions": { + "code": "BAD_USER_INPUT" + } + } + ] } task 34 'run-graphql'. lines 497-528: Response: { "data": { - "parent_version_4": null, + "parent_version_4": { + "dfAtParentVersion4_outside_range": null + }, "parent_version_6": { "dfAtParentVersion6": null } diff --git a/crates/sui-graphql-e2e-tests/tests/consistency/object_at_version.exp b/crates/sui-graphql-e2e-tests/tests/consistency/object_at_version.exp index 5ef87778fb781..d1a1dcea0d4ec 100644 --- a/crates/sui-graphql-e2e-tests/tests/consistency/object_at_version.exp +++ b/crates/sui-graphql-e2e-tests/tests/consistency/object_at_version.exp @@ -177,7 +177,7 @@ Objects snapshot updated to [0 to 5) task 18 'create-checkpoint'. lines 219-219: Checkpoint created: 6 -task 19 'run-graphql'. lines 221-259: +task 19 'run-graphql'. lines 221-260: Response: { "data": { "object_within_available_range": { @@ -192,7 +192,22 @@ Response: { } } }, - "object_outside_available_range": null, - "object_not_in_snapshot": null + "object_outside_available_range": { + "status": "WRAPPED_OR_DELETED", + "version": 5, + "asMoveObject": null + }, + "object_not_in_snapshot": { + "status": "INDEXED", + "version": 3, + "asMoveObject": { + "contents": { + "json": { + "id": "0x7a166d93d67f2815b463e90d909798a5c572b3e15cdbd6dc8fc10f9fc88618f9", + "value": "0" + } + } + } + } } } diff --git a/crates/sui-graphql-e2e-tests/tests/consistency/object_at_version.move b/crates/sui-graphql-e2e-tests/tests/consistency/object_at_version.move index d0e1c6f861142..c540d91309c30 100644 --- a/crates/sui-graphql-e2e-tests/tests/consistency/object_at_version.move +++ b/crates/sui-graphql-e2e-tests/tests/consistency/object_at_version.move @@ -219,6 +219,7 @@ module Test::M1 { //# create-checkpoint //# run-graphql +# Querying objects by version doesn't require it to be in the snapshot table. { object_within_available_range: object( address: "@{obj_2_0}" diff --git a/crates/sui-graphql-rpc/src/types/checkpoint.rs b/crates/sui-graphql-rpc/src/types/checkpoint.rs index 774ae83344650..a0e149471cb18 100644 --- a/crates/sui-graphql-rpc/src/types/checkpoint.rs +++ b/crates/sui-graphql-rpc/src/types/checkpoint.rs @@ -35,8 +35,8 @@ pub(crate) struct CheckpointId { pub sequence_number: Option, } -/// DataLoader key for fetching a `Checkpoint` by its sequence number, optionally constrained by a -/// consistency cursor. +/// DataLoader key for fetching a `Checkpoint` by its sequence number, constrained by a consistency +/// cursor. #[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)] struct SeqNumKey { pub sequence_number: u64, @@ -46,8 +46,7 @@ struct SeqNumKey { pub checkpoint_viewed_at: u64, } -/// DataLoader key for fetching a `Checkpoint` by its digest, optionally constrained by a -/// consistency cursor. +/// DataLoader key for fetching a `Checkpoint` by its digest, constrained by a consistency cursor. #[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)] struct DigestKey { pub digest: Digest, diff --git a/crates/sui-graphql-rpc/src/types/dynamic_field.rs b/crates/sui-graphql-rpc/src/types/dynamic_field.rs index a12455ea8cad8..46b2cee59051c 100644 --- a/crates/sui-graphql-rpc/src/types/dynamic_field.rs +++ b/crates/sui-graphql-rpc/src/types/dynamic_field.rs @@ -10,7 +10,7 @@ use sui_types::dynamic_field::{derive_dynamic_field_id, DynamicFieldInfo, Dynami use super::available_range::AvailableRange; use super::cursor::{Page, Target}; -use super::object::{self, deserialize_move_struct, Object, ObjectKind, ObjectLookupKey}; +use super::object::{self, deserialize_move_struct, Object, ObjectKind, ObjectLookup}; use super::type_filter::ExactTypeFilter; use super::{ base64::Base64, move_object::MoveObject, move_value::MoveValue, sui_address::SuiAddress, @@ -105,12 +105,14 @@ impl DynamicField { // object. Thus, we use the version of the field object to bound the value object at the // correct version. let obj = MoveObject::query( - ctx.data_unchecked(), + ctx, self.df_object_id, - ObjectLookupKey::LatestAtParentVersion { - version: self.super_.super_.version_impl(), - checkpoint_viewed_at: self.super_.super_.checkpoint_viewed_at, - }, + Object::under_parent( + // TODO (RPC-131): The dynamic object field value's version should be bounded by + // the field's parent version, not the version of the field object itself. + self.super_.super_.version_impl(), + self.super_.super_.checkpoint_viewed_at, + ), ) .await .extend()?; @@ -152,7 +154,7 @@ impl DynamicField { /// before the provided version. If `parent_version` is not provided, the latest version of the /// field is returned as bounded by the `checkpoint_viewed_at` parameter. pub(crate) async fn query( - db: &Db, + ctx: &Context<'_>, parent: SuiAddress, parent_version: Option, name: DynamicFieldName, @@ -169,16 +171,15 @@ impl DynamicField { let field_id = derive_dynamic_field_id(parent, &type_, &name.bcs.0) .map_err(|e| Error::Internal(format!("Failed to derive dynamic field id: {e}")))?; - use ObjectLookupKey as K; - let key = match parent_version { - None => K::LatestAt(checkpoint_viewed_at), - Some(version) => K::LatestAtParentVersion { - version, + let super_ = MoveObject::query( + ctx, + SuiAddress::from(field_id), + ObjectLookup::LatestAt { + parent_version, checkpoint_viewed_at, }, - }; - - let super_ = MoveObject::query(db, SuiAddress::from(field_id), key).await?; + ) + .await?; super_.map(Self::try_from).transpose() } diff --git a/crates/sui-graphql-rpc/src/types/event.rs b/crates/sui-graphql-rpc/src/types/event.rs index cfb9d93babe8c..cd4680d9308a1 100644 --- a/crates/sui-graphql-rpc/src/types/event.rs +++ b/crates/sui-graphql-rpc/src/types/event.rs @@ -99,7 +99,7 @@ impl Event { /// the sending module would be A::m1. async fn sending_module(&self, ctx: &Context<'_>) -> Result> { MoveModule::query( - ctx.data_unchecked(), + ctx, self.native.package_id.into(), &self.native.transaction_module.to_string(), self.checkpoint_viewed_at, diff --git a/crates/sui-graphql-rpc/src/types/gas.rs b/crates/sui-graphql-rpc/src/types/gas.rs index 0d78fcd7e5010..9969b70b9060e 100644 --- a/crates/sui-graphql-rpc/src/types/gas.rs +++ b/crates/sui-graphql-rpc/src/types/gas.rs @@ -1,7 +1,6 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::types::object::Object; use async_graphql::connection::Connection; use async_graphql::*; use sui_types::{ @@ -10,7 +9,7 @@ use sui_types::{ transaction::GasData, }; -use super::{address::Address, big_int::BigInt, object::ObjectLookupKey, sui_address::SuiAddress}; +use super::{address::Address, big_int::BigInt, object::Object, sui_address::SuiAddress}; use super::{ cursor::Page, object::{self, ObjectFilter, ObjectKey}, @@ -130,12 +129,9 @@ impl GasCostSummary { impl GasEffects { async fn gas_object(&self, ctx: &Context<'_>) -> Result> { Object::query( - ctx.data_unchecked(), + ctx, self.object_id, - ObjectLookupKey::VersionAt { - version: self.object_version, - checkpoint_viewed_at: self.checkpoint_viewed_at, - }, + Object::at_version(self.object_version, self.checkpoint_viewed_at), ) .await .extend() diff --git a/crates/sui-graphql-rpc/src/types/move_function.rs b/crates/sui-graphql-rpc/src/types/move_function.rs index 7325cbb2c3aa7..a9473efc712b1 100644 --- a/crates/sui-graphql-rpc/src/types/move_function.rs +++ b/crates/sui-graphql-rpc/src/types/move_function.rs @@ -4,7 +4,7 @@ use async_graphql::*; use sui_package_resolver::FunctionDef; -use crate::{data::Db, error::Error}; +use crate::error::Error; use super::{ move_module::MoveModule, @@ -34,14 +34,10 @@ pub(crate) struct MoveFunctionTypeParameter { impl MoveFunction { /// The module this function was defined in. async fn module(&self, ctx: &Context<'_>) -> Result { - let Some(module) = MoveModule::query( - ctx.data_unchecked(), - self.package, - &self.module, - self.checkpoint_viewed_at, - ) - .await - .extend()? + let Some(module) = + MoveModule::query(ctx, self.package, &self.module, self.checkpoint_viewed_at) + .await + .extend()? else { return Err(Error::Internal(format!( "Failed to load module for function: {}::{}::{}", @@ -123,13 +119,13 @@ impl MoveFunction { } pub(crate) async fn query( - db: &Db, + ctx: &Context<'_>, address: SuiAddress, module: &str, function: &str, checkpoint_viewed_at: u64, ) -> Result, Error> { - let Some(module) = MoveModule::query(db, address, module, checkpoint_viewed_at).await? + let Some(module) = MoveModule::query(ctx, address, module, checkpoint_viewed_at).await? else { return Ok(None); }; diff --git a/crates/sui-graphql-rpc/src/types/move_module.rs b/crates/sui-graphql-rpc/src/types/move_module.rs index 677e5d5644bc7..a59b058238dae 100644 --- a/crates/sui-graphql-rpc/src/types/move_module.rs +++ b/crates/sui-graphql-rpc/src/types/move_module.rs @@ -7,14 +7,13 @@ use move_disassembler::disassembler::Disassembler; use move_ir_types::location::Loc; use crate::consistency::{ConsistentIndexCursor, ConsistentNamedCursor}; -use crate::data::Db; use crate::error::Error; use sui_package_resolver::Module as ParsedMoveModule; use super::cursor::{JsonCursor, Page}; use super::move_function::MoveFunction; use super::move_struct::MoveStruct; -use super::object::ObjectLookupKey; +use super::object::Object; use super::{base64::Base64, move_package::MovePackage, sui_address::SuiAddress}; #[derive(Clone)] @@ -37,9 +36,9 @@ impl MoveModule { /// The package that this Move module was defined in async fn package(&self, ctx: &Context<'_>) -> Result { MovePackage::query( - ctx.data_unchecked(), + ctx, self.storage_id, - ObjectLookupKey::LatestAt(self.checkpoint_viewed_at), + Object::latest_at(self.checkpoint_viewed_at), ) .await .extend()? @@ -88,9 +87,9 @@ impl MoveModule { let runtime_id = *bytecode.self_id().address(); let Some(package) = MovePackage::query( - ctx.data_unchecked(), + ctx, self.storage_id, - ObjectLookupKey::LatestAt(checkpoint_viewed_at), + Object::latest_at(checkpoint_viewed_at), ) .await .extend()? @@ -315,14 +314,13 @@ impl MoveModule { } pub(crate) async fn query( - db: &Db, + ctx: &Context<'_>, address: SuiAddress, name: &str, checkpoint_viewed_at: u64, ) -> Result, Error> { let Some(package) = - MovePackage::query(db, address, ObjectLookupKey::LatestAt(checkpoint_viewed_at)) - .await? + MovePackage::query(ctx, address, Object::latest_at(checkpoint_viewed_at)).await? else { return Ok(None); }; diff --git a/crates/sui-graphql-rpc/src/types/move_object.rs b/crates/sui-graphql-rpc/src/types/move_object.rs index 1453b9d01a60d..faa67dd7d3238 100644 --- a/crates/sui-graphql-rpc/src/types/move_object.rs +++ b/crates/sui-graphql-rpc/src/types/move_object.rs @@ -11,7 +11,7 @@ use super::display::DisplayEntry; use super::dynamic_field::{DynamicField, DynamicFieldName}; use super::move_type::MoveType; use super::move_value::MoveValue; -use super::object::{self, ObjectFilter, ObjectImpl, ObjectLookupKey, ObjectOwner, ObjectStatus}; +use super::object::{self, ObjectFilter, ObjectImpl, ObjectLookup, ObjectOwner, ObjectStatus}; use super::owner::OwnerImpl; use super::stake::StakedSuiDowncastError; use super::sui_address::SuiAddress; @@ -423,11 +423,11 @@ impl MoveObjectImpl<'_> { impl MoveObject { pub(crate) async fn query( - db: &Db, + ctx: &Context<'_>, address: SuiAddress, - key: ObjectLookupKey, + key: ObjectLookup, ) -> Result, Error> { - let Some(object) = Object::query(db, address, key).await? else { + let Some(object) = Object::query(ctx, address, key).await? else { return Ok(None); }; diff --git a/crates/sui-graphql-rpc/src/types/move_package.rs b/crates/sui-graphql-rpc/src/types/move_package.rs index 115c639645f09..0bf275a97ac86 100644 --- a/crates/sui-graphql-rpc/src/types/move_package.rs +++ b/crates/sui-graphql-rpc/src/types/move_package.rs @@ -9,7 +9,7 @@ use super::cursor::{JsonCursor, Page}; use super::move_module::MoveModule; use super::move_object::MoveObject; use super::object::{ - self, Object, ObjectFilter, ObjectImpl, ObjectLookupKey, ObjectOwner, ObjectStatus, + self, Object, ObjectFilter, ObjectImpl, ObjectLookup, ObjectOwner, ObjectStatus, }; use super::owner::OwnerImpl; use super::stake::StakedSui; @@ -18,7 +18,6 @@ use super::suins_registration::{DomainFormat, SuinsRegistration}; use super::transaction_block::{self, TransactionBlock, TransactionBlockFilter}; use super::type_filter::ExactTypeFilter; use crate::consistency::ConsistentNamedCursor; -use crate::data::Db; use crate::error::Error; use async_graphql::connection::{Connection, CursorType, Edge}; use async_graphql::*; @@ -413,11 +412,11 @@ impl MovePackage { } pub(crate) async fn query( - db: &Db, + ctx: &Context<'_>, address: SuiAddress, - key: ObjectLookupKey, + key: ObjectLookup, ) -> Result, Error> { - let Some(object) = Object::query(db, address, key).await? else { + let Some(object) = Object::query(ctx, address, key).await? else { return Ok(None); }; diff --git a/crates/sui-graphql-rpc/src/types/move_struct.rs b/crates/sui-graphql-rpc/src/types/move_struct.rs index 8bae7ec46c1d9..c10a44786f242 100644 --- a/crates/sui-graphql-rpc/src/types/move_struct.rs +++ b/crates/sui-graphql-rpc/src/types/move_struct.rs @@ -43,7 +43,7 @@ impl MoveStruct { /// The module this struct was originally defined in. async fn module(&self, ctx: &Context<'_>) -> Result { let Some(module) = MoveModule::query( - ctx.data_unchecked(), + ctx, self.defining_id, &self.module, self.checkpoint_viewed_at, diff --git a/crates/sui-graphql-rpc/src/types/object.rs b/crates/sui-graphql-rpc/src/types/object.rs index 1bfe5bb72ab92..0336d5667a023 100644 --- a/crates/sui-graphql-rpc/src/types/object.rs +++ b/crates/sui-graphql-rpc/src/types/object.rs @@ -1,7 +1,7 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::fmt::Write; use super::available_range::AvailableRange; @@ -9,7 +9,7 @@ use super::balance::{self, Balance}; use super::big_int::BigInt; use super::coin::Coin; use super::coin_metadata::CoinMetadata; -use super::cursor::{self, Page, Paginated, RawPaginated, Target}; +use super::cursor::{self, Page, RawPaginated, Target}; use super::digest::Digest; use super::display::{Display, DisplayEntry}; use super::dynamic_field::{DynamicField, DynamicFieldName}; @@ -24,20 +24,21 @@ use super::type_filter::{ExactTypeFilter, TypeFilter}; use super::{owner::Owner, sui_address::SuiAddress, transaction_block::TransactionBlock}; use crate::consistency::{build_objects_query, Checkpointed, View}; use crate::data::package_resolver::PackageResolver; -use crate::data::{self, Db, DbConnection, QueryExecutor}; +use crate::data::{DataLoader, Db, DbConnection, QueryExecutor}; use crate::error::Error; use crate::raw_query::RawQuery; use crate::types::base64::Base64; use crate::types::intersect; use crate::{filter, or_filter}; use async_graphql::connection::{CursorType, Edge}; +use async_graphql::dataloader::Loader; use async_graphql::{connection::Connection, *}; -use diesel::{CombineDsl, ExpressionMethods, OptionalExtension, QueryDsl}; +use diesel::{BoolExpressionMethods, ExpressionMethods, QueryDsl}; use move_core_types::annotated_value::{MoveStruct, MoveTypeLayout}; use move_core_types::language_storage::StructTag; use serde::{Deserialize, Serialize}; -use sui_indexer::models::objects::{StoredDeletedHistoryObject, StoredHistoryObject, StoredObject}; -use sui_indexer::schema::{objects, objects_history, objects_snapshot}; +use sui_indexer::models::objects::{StoredDeletedHistoryObject, StoredHistoryObject}; +use sui_indexer::schema::objects_history; use sui_indexer::types::ObjectStatus as NativeObjectStatus; use sui_indexer::types::OwnerType; use sui_types::object::bounded_visitor::BoundedVisitor; @@ -129,7 +130,7 @@ pub(crate) struct ObjectKey { /// The object's owner type: Immutable, Shared, Parent, or Address. #[derive(Union, Clone)] -pub enum ObjectOwner { +pub(crate) enum ObjectOwner { Immutable(Immutable), Shared(Shared), Parent(Parent), @@ -139,7 +140,7 @@ pub enum ObjectOwner { /// An immutable object is an object that can't be mutated, transferred, or deleted. /// Immutable objects have no owner, so anyone can use them. #[derive(SimpleObject, Clone)] -pub struct Immutable { +pub(crate) struct Immutable { #[graphql(name = "_")] dummy: Option, } @@ -147,7 +148,7 @@ pub struct Immutable { /// A shared object is an object that is shared using the 0x2::transfer::share_object function. /// Unlike owned objects, once an object is shared, it stays mutable and is accessible by anyone. #[derive(SimpleObject, Clone)] -pub struct Shared { +pub(crate) struct Shared { initial_shared_version: u64, } @@ -155,7 +156,7 @@ pub struct Shared { /// the dynamic field, or the intermediate Field object itself). Also note that if the owner /// is a parent, then it's guaranteed to be an object. #[derive(SimpleObject, Clone)] -pub struct Parent { +pub(crate) struct Parent { parent: Option, } @@ -163,28 +164,28 @@ pub struct Parent { /// either an account address (derived from a particular signature scheme) or /// an object ID. An address-owned object is accessible only to its owner and no others. #[derive(SimpleObject, Clone)] -pub struct AddressOwner { +pub(crate) struct AddressOwner { owner: Option, } -pub(crate) enum ObjectLookupKey { - LatestAt(u64), - VersionAt { - version: u64, - /// The checkpoint sequence number at which this was viewed at. +pub(crate) enum ObjectLookup { + LatestAt { + /// The parent version to be used as an optional upper bound for the query. Look for the + /// latest version of a child object that is less than or equal to this upper bound. + parent_version: Option, + /// The checkpoint sequence number at which this was viewed at checkpoint_viewed_at: u64, }, - LatestAtParentVersion { - /// The parent version to be used as the upper bound for the query. Look for the latest - /// version of a child object that is less than or equal to this upper bound. + + VersionAt { + /// The exact version of the object to be fetched. version: u64, - /// The checkpoint sequence number at which this was viewed at + /// The checkpoint sequence number at which this was viewed at. checkpoint_viewed_at: u64, }, } pub(crate) type Cursor = cursor::BcsCursor; -type Query = data::Query; /// The inner struct for the `Object`'s cursor. The `object_id` is used as the cursor, while the /// `checkpoint_viewed_at` sets the consistent upper bound for the cursor. @@ -259,6 +260,26 @@ pub(crate) enum IObject { SuinsRegistration(SuinsRegistration), } +/// DataLoader key for fetching an `Object` at a specific version, constrained by a consistency +/// cursor (if that version was created after the checkpoint the query is viewing at, then it will +/// fail). +#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)] +struct HistoricalKey { + id: SuiAddress, + version: u64, + checkpoint_viewed_at: u64, +} + +/// DataLoader key for fetching the latest version of an `Object` as of a consistency cursor. The +/// query can optionally be bounded by a `parent_version` which imposes an additional requirement +/// that the object's version is bounded above by the parent version. +#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)] +struct LatestAtKey { + id: SuiAddress, + parent_version: Option, + checkpoint_viewed_at: u64, +} + /// An object in Sui is a package (set of Move bytecode modules) or object (typed data structure /// with fields) with additional metadata detailing its id, version, transaction digest, owner /// field indicating how this object can be accessed. @@ -525,9 +546,9 @@ impl ObjectImpl<'_> { O::Immutable => Some(ObjectOwner::Immutable(Immutable { dummy: None })), O::ObjectOwner(address) => { let parent = Object::query( - ctx.data_unchecked(), + ctx, address.into(), - ObjectLookupKey::LatestAt(self.0.checkpoint_viewed_at), + Object::latest_at(self.0.checkpoint_viewed_at), ) .await .ok() @@ -756,200 +777,64 @@ impl Object { Ok(conn) } - /// Query for the object at a specific version, at the checkpoint_viewed_at if given, else - /// against the latest checkpoint. - /// - /// `checkpoint_viewed_at` represents the checkpoint sequence number at which this `Object` was - /// queried in. This is stored on `Object` so that when viewing that entity's state, it will be - /// as if it was read at the same checkpoint. - async fn query_at_version( - db: &Db, - address: SuiAddress, - version: u64, - checkpoint_viewed_at: u64, - ) -> Result, Error> { - use objects_history::dsl as history; - use objects_snapshot::dsl as snapshot; - - let version = version as i64; - - let stored_objs: Option> = db - .execute_repeatable(move |conn| { - let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)? else { - return Ok::<_, diesel::result::Error>(None); - }; - - conn.results(move || { - // If an object was created or mutated in a checkpoint outside the current - // available range, and never touched again, it will not show up in the - // objects_history table. Thus, we always need to check the objects_snapshot - // table as well. - let snapshot_query = snapshot::objects_snapshot - .filter(snapshot::object_id.eq(address.into_vec())) - .filter(snapshot::object_version.eq(version)); - - let historical_query = history::objects_history - .filter(history::object_id.eq(address.into_vec())) - .filter(history::object_version.eq(version)) - .filter( - history::checkpoint_sequence_number - .between(range.first as i64, range.last as i64), - ) - .order_by(history::object_version.desc()) - .limit(1); - - snapshot_query.union(historical_query) - }) - // Return optional to match the state when checkpoint_viewed_at is out of range - .optional() - }) - .await?; - - let Some(stored_objs) = stored_objs else { - return Ok(None); - }; - - // Select the max by key after the union query, because Diesel currently does not support - // order_by on union - stored_objs - .into_iter() - .max_by_key(|o| o.object_version) - .map(|obj| Self::try_from_stored_history_object(obj, checkpoint_viewed_at)) - .transpose() + /// Look-up the latest version of the object as of a given checkpoint. + pub(crate) fn latest_at(checkpoint_viewed_at: u64) -> ObjectLookup { + ObjectLookup::LatestAt { + parent_version: None, + checkpoint_viewed_at, + } } - /// Query for the latest version of an object bounded by the provided `parent_version`. - /// - /// `checkpoint_viewed_at` represents the checkpoint sequence number at which this `Object` was - /// queried in. This is stored on `Object` so that when viewing that entity's state, it will be - /// as if it was read at the same checkpoint. - async fn query_latest_at_version( - db: &Db, - address: SuiAddress, - parent_version: u64, - checkpoint_viewed_at: u64, - ) -> Result, Error> { - use objects_history::dsl as history; - use objects_snapshot::dsl as snapshot; - - let version = parent_version as i64; - let stored_objs: Option> = db - .execute_repeatable(move |conn| { - let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)? else { - return Ok::<_, diesel::result::Error>(None); - }; - - conn.results(move || { - // If an object was created or mutated in a checkpoint outside the current - // available range, and never touched again, it will not show up in the - // objects_history table. Thus, we always need to check the objects_snapshot - // table as well. - let snapshot_query = snapshot::objects_snapshot - .filter(snapshot::object_id.eq(address.into_vec())) - .filter(snapshot::object_version.le(version)); - - let historical_query = history::objects_history - .filter(history::object_id.eq(address.into_vec())) - .filter(history::object_version.le(version)) - .filter( - history::checkpoint_sequence_number - .between(range.first as i64, range.last as i64), - ) - .order_by(history::object_version.desc()) - .limit(1); - - snapshot_query.union(historical_query) - }) - // Return optional to match the state when checkpoint_viewed_at is out of range - .optional() - }) - .await?; - - let Some(stored_objs) = stored_objs else { - return Ok(None); - }; - - // Select the max by key after the union query, because Diesel currently does not support - // order_by on union - stored_objs - .into_iter() - .max_by_key(|o| o.object_version) - .map(|obj| Self::try_from_stored_history_object(obj, checkpoint_viewed_at)) - .transpose() + /// Look-up the latest version of an object whose version is less than or equal to its parent's + /// version, as of a given checkpoint. + pub(crate) fn under_parent(parent_version: u64, checkpoint_viewed_at: u64) -> ObjectLookup { + ObjectLookup::LatestAt { + parent_version: Some(parent_version), + checkpoint_viewed_at, + } } - /// Query for the object at the latest version at the checkpoint sequence number if given, else - /// the latest version of the object against the latest checkpoint. - async fn query_latest_at_checkpoint( - db: &Db, - address: SuiAddress, - checkpoint_viewed_at: u64, - ) -> Result, Error> { - use objects_history::dsl as history; - use objects_snapshot::dsl as snapshot; - - let stored_objs: Option> = db - .execute_repeatable(move |conn| { - let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)? else { - return Ok::<_, diesel::result::Error>(None); - }; - - conn.results(move || { - // If an object was created or mutated in a checkpoint outside the current - // available range, and never touched again, it will not show up in the - // objects_history table. Thus, we always need to check the objects_snapshot - // table as well. - let snapshot_query = snapshot::objects_snapshot - .filter(snapshot::object_id.eq(address.into_vec())); - - let historical_query = history::objects_history - .filter(history::object_id.eq(address.into_vec())) - .filter( - history::checkpoint_sequence_number - .between(range.first as i64, range.last as i64), - ) - .order_by(history::object_version.desc()) - .limit(1); - - snapshot_query.union(historical_query) - }) - // Return optional to match the state when checkpoint_viewed_at is out of range - .optional() - }) - .await?; - - let Some(stored_objs) = stored_objs else { - return Ok(None); - }; - - // Select the max by key after the union query, because Diesel currently does not support - // order_by on union - stored_objs - .into_iter() - .max_by_key(|o| o.object_version) - .map(|obj| Self::try_from_stored_history_object(obj, checkpoint_viewed_at)) - .transpose() + /// Look-up a specific version of the object, as of a given checkpoint. + pub(crate) fn at_version(version: u64, checkpoint_viewed_at: u64) -> ObjectLookup { + ObjectLookup::VersionAt { + version, + checkpoint_viewed_at, + } } pub(crate) async fn query( - db: &Db, - address: SuiAddress, - key: ObjectLookupKey, + ctx: &Context<'_>, + id: SuiAddress, + key: ObjectLookup, ) -> Result, Error> { + let DataLoader(loader) = &ctx.data_unchecked(); + match key { - ObjectLookupKey::LatestAt(checkpoint_sequence_number) => { - Self::query_latest_at_checkpoint(db, address, checkpoint_sequence_number).await - } - ObjectLookupKey::VersionAt { + ObjectLookup::VersionAt { version, checkpoint_viewed_at, - } => Self::query_at_version(db, address, version, checkpoint_viewed_at).await, - ObjectLookupKey::LatestAtParentVersion { - version, + } => { + loader + .load_one(HistoricalKey { + id, + version, + checkpoint_viewed_at, + }) + .await + } + ObjectLookup::LatestAt { + parent_version, checkpoint_viewed_at, - } => Self::query_latest_at_version(db, address, version, checkpoint_viewed_at).await, + } => { + loader + .load_one(LatestAtKey { + id, + parent_version, + checkpoint_viewed_at, + }) + .await + } } - .map_err(|e| Error::Internal(format!("Failed to fetch object: {e}"))) } /// Query for a singleton object identified by its type. Note: the object is assumed to be a @@ -1192,36 +1077,6 @@ impl Checkpointed for Cursor { } } -impl Paginated for StoredObject { - type Source = objects::table; - - fn filter_ge(cursor: &Cursor, query: Query) -> Query { - query.filter(objects::dsl::object_id.ge(cursor.object_id.clone())) - } - - fn filter_le(cursor: &Cursor, query: Query) -> Query { - query.filter(objects::dsl::object_id.le(cursor.object_id.clone())) - } - - fn order(asc: bool, query: Query) -> Query { - use objects::dsl; - if asc { - query.order_by(dsl::object_id.asc()) - } else { - query.order_by(dsl::object_id.desc()) - } - } -} - -impl Target for StoredObject { - fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor { - Cursor::new(HistoricalObjectCursor::new( - self.object_id.clone(), - checkpoint_viewed_at, - )) - } -} - impl RawPaginated for StoredHistoryObject { fn filter_ge(cursor: &Cursor, query: RawQuery) -> RawQuery { filter!( @@ -1261,6 +1116,160 @@ impl Target for StoredHistoryObject { } } +#[async_trait::async_trait] +impl Loader for Db { + type Value = Object; + type Error = Error; + + async fn load(&self, keys: &[HistoricalKey]) -> Result, Error> { + use objects_history::dsl; + + let id_versions: BTreeSet<_> = keys + .iter() + .map(|key| (key.id.into_vec(), key.version as i64)) + .collect(); + + let objects: Vec = self + .execute(move |conn| { + conn.results(move || { + let mut query = dsl::objects_history.into_boxed(); + + // TODO: Speed up using an `obj_version` table. + for (id, version) in id_versions.iter().cloned() { + query = query + .or_filter(dsl::object_id.eq(id).and(dsl::object_version.eq(version))); + } + + query + }) + }) + .await + .map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?; + + let mut id_version_to_stored = BTreeMap::new(); + for stored in objects { + let key = (addr(&stored.object_id)?, stored.object_version as u64); + id_version_to_stored.insert(key, stored); + } + + let mut result = HashMap::new(); + for key in keys { + let Some(stored) = id_version_to_stored.get(&(key.id, key.version)) else { + continue; + }; + + // Filter by key's checkpoint viewed at here. Doing this in memory because it should be + // quite rare that this query actually filters something, but encoding it in SQL is + // complicated. + if key.checkpoint_viewed_at < stored.checkpoint_sequence_number as u64 { + continue; + } + + let object = + Object::try_from_stored_history_object(stored.clone(), key.checkpoint_viewed_at)?; + result.insert(*key, object); + } + + Ok(result) + } +} + +#[async_trait::async_trait] +impl Loader for Db { + type Value = Object; + type Error = Error; + + async fn load(&self, keys: &[LatestAtKey]) -> Result, Error> { + // Group keys by checkpoint viewed at and parent version -- we'll issue a separate query for + // each group. + #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Copy)] + struct GroupKey { + checkpoint_viewed_at: u64, + parent_version: Option, + } + + let mut keys_by_cursor_and_parent_version: BTreeMap<_, BTreeSet<_>> = BTreeMap::new(); + for key in keys { + let group_key = GroupKey { + checkpoint_viewed_at: key.checkpoint_viewed_at, + parent_version: key.parent_version, + }; + + keys_by_cursor_and_parent_version + .entry(group_key) + .or_default() + .insert(key.id); + } + + // Issue concurrent reads for each group of keys. + let futures = keys_by_cursor_and_parent_version + .into_iter() + .map(|(group_key, ids)| { + self.execute_repeatable(move |conn| { + let Some(range) = AvailableRange::result(conn, group_key.checkpoint_viewed_at)? + else { + return Ok::, diesel::result::Error>( + vec![], + ); + }; + + let filter = ObjectFilter { + object_ids: Some(ids.iter().cloned().collect()), + ..Default::default() + }; + + // TODO: Implement queries that use a parent version bound using an + // `obj_version` table. + let apply_parent_bound = |q: RawQuery| { + if let Some(parent_version) = group_key.parent_version { + filter!(q, format!("object_version <= {parent_version}")) + } else { + q + } + }; + + Ok(conn + .results(move || { + build_objects_query( + View::Consistent, + range, + &Page::bounded(ids.len() as u64), + |q| apply_parent_bound(filter.apply(q)), + apply_parent_bound, + ) + .into_boxed() + })? + .into_iter() + .map(|r| (group_key, r)) + .collect()) + }) + }); + + // Wait for the reads to all finish, and gather them into the result map. + let groups = futures::future::join_all(futures).await; + + let mut results = HashMap::new(); + for group in groups { + for (group_key, stored) in + group.map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))? + { + let object = + Object::try_from_stored_history_object(stored, group_key.checkpoint_viewed_at)?; + + let key = LatestAtKey { + id: object.address, + checkpoint_viewed_at: group_key.checkpoint_viewed_at, + parent_version: group_key.parent_version, + }; + + results.insert(key, object); + } + } + + Ok(results) + } +} + impl From<&ObjectKind> for ObjectStatus { fn from(kind: &ObjectKind) -> Self { match kind { @@ -1328,6 +1337,7 @@ fn objects_query(filter: &ObjectFilter, range: AvailableRange, page: &Page) -> Result> { Object::query( - ctx.data_unchecked(), + ctx, self.address_impl(), - ObjectLookupKey::VersionAt { - version: self.version_impl(), - checkpoint_viewed_at: self.checkpoint_viewed_at, - }, + Object::at_version(self.version_impl(), self.checkpoint_viewed_at), ) .await .extend() diff --git a/crates/sui-graphql-rpc/src/types/owner.rs b/crates/sui-graphql-rpc/src/types/owner.rs index ca509dea582cf..be94e38311fb9 100644 --- a/crates/sui-graphql-rpc/src/types/owner.rs +++ b/crates/sui-graphql-rpc/src/types/owner.rs @@ -7,7 +7,6 @@ use super::cursor::Page; use super::dynamic_field::DynamicField; use super::dynamic_field::DynamicFieldName; use super::move_package::MovePackage; -use super::object::ObjectLookupKey; use super::stake::StakedSui; use super::suins_registration::{DomainFormat, NameService, SuinsRegistration}; use crate::data::Db; @@ -235,9 +234,9 @@ impl Owner { async fn as_object(&self, ctx: &Context<'_>) -> Result> { Object::query( - ctx.data_unchecked(), + ctx, self.address, - ObjectLookupKey::LatestAt(self.checkpoint_viewed_at), + Object::latest_at(self.checkpoint_viewed_at), ) .await .extend() @@ -446,7 +445,7 @@ impl OwnerImpl { ) -> Result> { use DynamicFieldType as T; DynamicField::query( - ctx.data_unchecked(), + ctx, self.address, parent_version, name, @@ -465,7 +464,7 @@ impl OwnerImpl { ) -> Result> { use DynamicFieldType as T; DynamicField::query( - ctx.data_unchecked(), + ctx, self.address, parent_version, name, diff --git a/crates/sui-graphql-rpc/src/types/query.rs b/crates/sui-graphql-rpc/src/types/query.rs index 75cf4bfe44cf6..b7a56dd2c22ae 100644 --- a/crates/sui-graphql-rpc/src/types/query.rs +++ b/crates/sui-graphql-rpc/src/types/query.rs @@ -26,7 +26,7 @@ use super::{ epoch::Epoch, event::{self, Event, EventFilter}, move_type::MoveType, - object::{self, Object, ObjectFilter, ObjectLookupKey}, + object::{self, Object, ObjectFilter}, owner::Owner, protocol_config::ProtocolConfigs, sui_address::SuiAddress, @@ -194,23 +194,12 @@ impl Query { let Watermark { checkpoint, .. } = *ctx.data()?; match version { - Some(version) => Object::query( - ctx.data_unchecked(), - address, - ObjectLookupKey::VersionAt { - version, - checkpoint_viewed_at: checkpoint, - }, - ) - .await - .extend(), - None => Object::query( - ctx.data_unchecked(), - address, - ObjectLookupKey::LatestAt(checkpoint), - ) - .await - .extend(), + Some(version) => Object::query(ctx, address, Object::at_version(version, checkpoint)) + .await + .extend(), + None => Object::query(ctx, address, Object::latest_at(checkpoint)) + .await + .extend(), } } diff --git a/crates/sui-graphql-rpc/src/types/suins_registration.rs b/crates/sui-graphql-rpc/src/types/suins_registration.rs index c67afcf683726..145a29ea7d093 100644 --- a/crates/sui-graphql-rpc/src/types/suins_registration.rs +++ b/crates/sui-graphql-rpc/src/types/suins_registration.rs @@ -15,7 +15,7 @@ use super::{ dynamic_field::{DynamicField, DynamicFieldName}, move_object::{MoveObject, MoveObjectImpl}, move_value::MoveValue, - object::{self, Object, ObjectFilter, ObjectImpl, ObjectLookupKey, ObjectOwner, ObjectStatus}, + object::{self, Object, ObjectFilter, ObjectImpl, ObjectOwner, ObjectStatus}, owner::OwnerImpl, stake::StakedSui, string_input::impl_string_input, @@ -411,9 +411,9 @@ impl NameService { let reverse_record_id = config.reverse_record_field_id(address.as_slice()); let Some(object) = MoveObject::query( - ctx.data_unchecked(), + ctx, reverse_record_id.into(), - ObjectLookupKey::LatestAt(checkpoint_viewed_at), + Object::latest_at(checkpoint_viewed_at), ) .await? else { diff --git a/crates/sui-graphql-rpc/src/types/transaction_block_kind/programmable.rs b/crates/sui-graphql-rpc/src/types/transaction_block_kind/programmable.rs index c4d185f841a8b..c59c02f9c9c94 100644 --- a/crates/sui-graphql-rpc/src/types/transaction_block_kind/programmable.rs +++ b/crates/sui-graphql-rpc/src/types/transaction_block_kind/programmable.rs @@ -283,7 +283,7 @@ impl MoveCallTransaction { /// The function being called, resolved. async fn function(&self, ctx: &Context<'_>) -> Result> { MoveFunction::query( - ctx.data_unchecked(), + ctx, self.native.package.into(), self.native.module.as_str(), self.native.function.as_str(), diff --git a/crates/sui-graphql-rpc/src/types/validator.rs b/crates/sui-graphql-rpc/src/types/validator.rs index 44c8f1d7d73e9..de01a9bb10bf2 100644 --- a/crates/sui-graphql-rpc/src/types/validator.rs +++ b/crates/sui-graphql-rpc/src/types/validator.rs @@ -8,7 +8,7 @@ use async_graphql::connection::{Connection, CursorType, Edge}; use super::big_int::BigInt; use super::move_object::MoveObject; -use super::object::ObjectLookupKey; +use super::object::Object; use super::sui_address::SuiAddress; use super::validator_credentials::ValidatorCredentials; use super::{address::Address, base64::Base64}; @@ -96,9 +96,9 @@ impl Validator { /// can then update the reference gas price and tallying rule on behalf of the validator. async fn operation_cap(&self, ctx: &Context<'_>) -> Result> { MoveObject::query( - ctx.data_unchecked(), + ctx, self.operation_cap_id(), - ObjectLookupKey::LatestAt(self.checkpoint_viewed_at), + Object::latest_at(self.checkpoint_viewed_at), ) .await .extend() @@ -108,9 +108,9 @@ impl Validator { /// and to compound staking rewards. async fn staking_pool(&self, ctx: &Context<'_>) -> Result> { MoveObject::query( - ctx.data_unchecked(), + ctx, self.staking_pool_id(), - ObjectLookupKey::LatestAt(self.checkpoint_viewed_at), + Object::latest_at(self.checkpoint_viewed_at), ) .await .extend() @@ -120,9 +120,9 @@ impl Validator { /// the amount of SUI tokens that each past SUI staker can withdraw in the future. async fn exchange_rates(&self, ctx: &Context<'_>) -> Result> { MoveObject::query( - ctx.data_unchecked(), + ctx, self.exchange_rates_id(), - ObjectLookupKey::LatestAt(self.checkpoint_viewed_at), + Object::latest_at(self.checkpoint_viewed_at), ) .await .extend() diff --git a/crates/sui-graphql-rpc/src/types/zklogin_verify_signature.rs b/crates/sui-graphql-rpc/src/types/zklogin_verify_signature.rs index 95063b53531de..fd64b731177ae 100644 --- a/crates/sui-graphql-rpc/src/types/zklogin_verify_signature.rs +++ b/crates/sui-graphql-rpc/src/types/zklogin_verify_signature.rs @@ -77,7 +77,7 @@ pub(crate) async fn verify_zklogin_signature( // fetch on-chain JWKs from dynamic field of system object. let df = DynamicField::query( - ctx.data_unchecked(), + ctx, SUI_AUTHENTICATOR_STATE_ADDRESS.into(), None, DynamicFieldName {