Skip to content

Commit

Permalink
rename back to scan_limit
Browse files Browse the repository at this point in the history
  • Loading branch information
wlmyng committed Jun 28, 2024
1 parent a11abd0 commit f114258
Show file tree
Hide file tree
Showing 12 changed files with 70 additions and 81 deletions.
33 changes: 19 additions & 14 deletions crates/sui-graphql-rpc/src/raw_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,25 @@ macro_rules! or_filter {
}};
}

/// Accepts two `Rawquery` instances and a third expression consisting of which columns to join on.
#[macro_export]
macro_rules! inner_join {
($lhs:expr, $rhs:ident => ($rhs_query:expr, $alias:expr), using: [$using:expr $(, $more_using:expr)*]) => {{
use $crate::raw_query::RawQuery;

let (lhs_sql, mut binds) = $lhs.finish();
let (rhs_sql, rhs_binds) = $rhs_query.finish();

binds.extend(rhs_binds);

let aliased = format!("({}) AS {}", rhs_sql, $alias);
let using_clause = format!("USING ({})", stringify!($using $(, $more_using)*));
let sql = format!("{} INNER JOIN {} {}", lhs_sql, aliased, using_clause);

RawQuery::new(sql, binds)
}};
}

/// Accepts a `SELECT FROM` format string and optional subqueries. If subqueries are provided, there
/// should be curly braces `{}` in the format string to interpolate each subquery's sql string into.
/// Concatenates subqueries to the `SELECT FROM` clause, and creates a new `RawQuery` from the
Expand All @@ -192,20 +211,6 @@ macro_rules! query {
$crate::raw_query::RawQuery::new($select, vec![])
};

// Handle a select clause with ongoing subquery and a new subquery with alias
($select:expr, $ongoing_subquery:expr, aliased => ($new_subquery:expr, $alias:expr)) => {{
use $crate::raw_query::RawQuery;
let (ongoing_sql, mut ongoing_binds) = $ongoing_subquery.finish();
let (new_sql, new_binds) = $new_subquery.finish();

ongoing_binds.extend(new_binds);

let new_subquery_sql = format!("({}) AS {}", new_sql, $alias);
let select_formatted = format!($select, ongoing_sql, new_subquery_sql);

RawQuery::new(select_formatted, ongoing_binds)
}};

// Expects a select clause and one or more subqueries. The select clause should contain curly
// braces for subqueries to be interpolated into. Use when the subqueries can be aliased
// directly in the select statement.
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-graphql-rpc/src/types/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ impl Address {
before: Option<transaction_block::Cursor>,
relation: Option<AddressTransactionBlockRelationship>,
filter: Option<TransactionBlockFilter>,
within_checkpoints: Option<u64>,
scan_limit: Option<u64>,
) -> Result<Connection<String, TransactionBlock>> {
use AddressTransactionBlockRelationship as R;
let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
Expand All @@ -169,7 +169,7 @@ impl Address {
page,
filter,
self.checkpoint_viewed_at,
Some(within_checkpoints.unwrap_or(10000000)),
Some(scan_limit.unwrap_or(10000000)),
)
.await
.extend()
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-graphql-rpc/src/types/coin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl Coin {
last: Option<u64>,
before: Option<transaction_block::Cursor>,
filter: Option<TransactionBlockFilter>,
within_checkpoints: Option<u64>,
scan_limit: Option<u64>,
) -> Result<Connection<String, TransactionBlock>> {
ObjectImpl(&self.super_.super_)
.received_transaction_blocks(
Expand All @@ -210,7 +210,7 @@ impl Coin {
last,
before,
filter,
within_checkpoints,
scan_limit,
)
.await
}
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-graphql-rpc/src/types/coin_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ impl CoinMetadata {
last: Option<u64>,
before: Option<transaction_block::Cursor>,
filter: Option<TransactionBlockFilter>,
within_checkpoints: Option<u64>,
scan_limit: Option<u64>,
) -> Result<Connection<String, TransactionBlock>> {
ObjectImpl(&self.super_.super_)
.received_transaction_blocks(
Expand All @@ -199,7 +199,7 @@ impl CoinMetadata {
last,
before,
filter,
within_checkpoints,
scan_limit,
)
.await
}
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-graphql-rpc/src/types/epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ impl Epoch {
return Ok(Connection::new(false, false));
};

let within_checkpoints = self
let scan_limit = self
.stored
.last_checkpoint_id
.map(|id| id as u64)
Expand All @@ -262,7 +262,7 @@ impl Epoch {
page,
filter,
self.checkpoint_viewed_at,
Some(within_checkpoints),
Some(scan_limit),
)
.await
.extend()
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-graphql-rpc/src/types/move_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ impl MoveObject {
last: Option<u64>,
before: Option<transaction_block::Cursor>,
filter: Option<TransactionBlockFilter>,
within_checkpoints: Option<u64>,
scan_limit: Option<u64>,
) -> Result<Connection<String, TransactionBlock>> {
ObjectImpl(&self.super_)
.received_transaction_blocks(
Expand All @@ -278,7 +278,7 @@ impl MoveObject {
last,
before,
filter,
within_checkpoints,
scan_limit,
)
.await
}
Expand Down
12 changes: 2 additions & 10 deletions crates/sui-graphql-rpc/src/types/move_package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,18 +243,10 @@ impl MovePackage {
last: Option<u64>,
before: Option<transaction_block::Cursor>,
filter: Option<TransactionBlockFilter>,
within_checkpoints: Option<u64>,
scan_limit: Option<u64>,
) -> Result<Connection<String, TransactionBlock>> {
ObjectImpl(&self.super_)
.received_transaction_blocks(
ctx,
first,
after,
last,
before,
filter,
within_checkpoints,
)
.received_transaction_blocks(ctx, first, after, last, before, filter, scan_limit)
.await
}

Expand Down
10 changes: 5 additions & 5 deletions crates/sui-graphql-rpc/src/types/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ pub(crate) struct HistoricalObjectCursor {
arg(name = "last", ty = "Option<u64>"),
arg(name = "before", ty = "Option<transaction_block::Cursor>"),
arg(name = "filter", ty = "Option<TransactionBlockFilter>"),
arg(name = "within_checkpoints", ty = "Option<u64>"),
arg(name = "scan_limit", ty = "Option<u64>"),
ty = "Connection<String, TransactionBlock>",
desc = "The transaction blocks that sent objects to this object."
),
Expand Down Expand Up @@ -433,7 +433,7 @@ impl Object {
last: Option<u64>,
before: Option<transaction_block::Cursor>,
filter: Option<TransactionBlockFilter>,
within_checkpoints: Option<u64>,
scan_limit: Option<u64>,
) -> Result<Connection<String, TransactionBlock>> {
ObjectImpl(self)
.received_transaction_blocks(
Expand All @@ -443,7 +443,7 @@ impl Object {
last,
before,
filter,
within_checkpoints,
scan_limit,
)
.await
}
Expand Down Expand Up @@ -602,7 +602,7 @@ impl ObjectImpl<'_> {
last: Option<u64>,
before: Option<transaction_block::Cursor>,
filter: Option<TransactionBlockFilter>,
within_checkpoints: Option<u64>,
scan_limit: Option<u64>,
) -> Result<Connection<String, TransactionBlock>> {
let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;

Expand All @@ -621,7 +621,7 @@ impl ObjectImpl<'_> {
page,
filter,
self.0.checkpoint_viewed_at,
Some(within_checkpoints.unwrap_or(10000000)),
Some(scan_limit.unwrap_or(10000000)),
)
.await
.extend()
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-graphql-rpc/src/types/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ impl Query {
last: Option<u64>,
before: Option<transaction_block::Cursor>,
filter: Option<TransactionBlockFilter>,
within_checkpoints: Option<u64>,
scan_limit: Option<u64>,
) -> Result<Connection<String, TransactionBlock>> {
let Watermark { checkpoint, .. } = *ctx.data()?;

Expand All @@ -323,7 +323,7 @@ impl Query {
page,
filter.unwrap_or_default(),
checkpoint,
Some(within_checkpoints.unwrap_or(10000000)),
Some(scan_limit.unwrap_or(10000000)),
)
.await
.extend()
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-graphql-rpc/src/types/stake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ impl StakedSui {
last: Option<u64>,
before: Option<transaction_block::Cursor>,
filter: Option<TransactionBlockFilter>,
within_checkpoints: Option<u64>,
scan_limit: Option<u64>,
) -> Result<Connection<String, TransactionBlock>> {
ObjectImpl(&self.super_.super_)
.received_transaction_blocks(
Expand All @@ -218,7 +218,7 @@ impl StakedSui {
last,
before,
filter,
within_checkpoints,
scan_limit,
)
.await
}
Expand Down
12 changes: 2 additions & 10 deletions crates/sui-graphql-rpc/src/types/suins_registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,18 +245,10 @@ impl SuinsRegistration {
last: Option<u64>,
before: Option<transaction_block::Cursor>,
filter: Option<TransactionBlockFilter>,
within_checkpoints: Option<u64>,
scan_limit: Option<u64>,
) -> Result<Connection<String, TransactionBlock>> {
ObjectImpl(&self.super_.super_)
.received_transaction_blocks(
ctx,
first,
after,
last,
before,
filter,
within_checkpoints,
)
.received_transaction_blocks(ctx, first, after, last, before, filter, scan_limit)
.await
}

Expand Down
56 changes: 28 additions & 28 deletions crates/sui-graphql-rpc/src/types/transaction_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::{
consistency::Checkpointed,
data::{self, DataLoader, Db, DbConnection, QueryExecutor},
error::Error,
filter, query,
filter, inner_join, query,
raw_query::RawQuery,
server::watermark_task::Watermark,
tx_lookups::{
Expand Down Expand Up @@ -307,13 +307,14 @@ impl TransactionBlock {
/// the cursor if they are consistent.
///
/// Filters that involve a combination of `recvAddress`, `inputObject`, `changedObject`, and
/// `function` should provide a value for `within_checkpoints`.
/// `function` should provide a value for `scan_limit`. This indicates how many transactions to
/// scan through per the filter conditions.
pub(crate) async fn paginate(
ctx: &Context<'_>,
page: Page<Cursor>,
filter: TransactionBlockFilter,
checkpoint_viewed_at: u64,
within_checkpoints: Option<u64>,
scan_limit: Option<u64>,
) -> Result<Connection<String, TransactionBlock>, Error> {
filter.is_consistent()?;
// If cursors are provided, defer to the `checkpoint_viewed_at` in the cursor if they are
Expand Down Expand Up @@ -344,10 +345,10 @@ impl TransactionBlock {
)
.unwrap();

// If `within_checkpoints` is set, we need to adjust the lower and upper bounds. It is up to
// the caller of `TransactionBlock::paginate` to determine whether `within_checkpoints` is
// If `scan_limit` is set, we need to adjust the lower and upper bounds. It is up to
// the caller of `TransactionBlock::paginate` to determine whether `scan_limit` is
// required.
if let Some(scan_limit) = within_checkpoints {
if let Some(scan_limit) = scan_limit {
if page.is_from_front() {
hi_cp = std::cmp::min(hi_cp, lo_cp.saturating_add(scan_limit));
} else {
Expand All @@ -362,19 +363,19 @@ impl TransactionBlock {

let (prev, next, transactions): (bool, bool, Vec<StoredTransaction>) = db
.execute_repeatable(move |conn| {

// The min or first `tx_sequence_number` of a checkpoint is the previous
// checkpoint's `network_total_transactions`. Because this refers to a historical
// checkpoint, if we yield a `None` result, we can return early.
let lo_tx = match lo_cp {
0 => 0,
_ => {
let sequence_number: Option<i64> = conn
.first(move || {
cp::checkpoints
.select(cp::network_total_transactions)
.filter(cp::sequence_number.eq((lo_cp - 1) as i64))
}).optional()?;
.first(move || {
cp::checkpoints
.select(cp::network_total_transactions)
.filter(cp::sequence_number.eq((lo_cp - 1) as i64))
})
.optional()?;

match sequence_number {
Some(sequence_number) => sequence_number as u64,
Expand All @@ -393,7 +394,8 @@ impl TransactionBlock {
cp::checkpoints
.select(cp::network_total_transactions)
.filter(cp::sequence_number.eq(hi_cp as i64))
}).optional()?;
})
.optional()?;

match sequence_number {
Some(sequence_number) => (sequence_number as u64).saturating_sub(1),
Expand All @@ -408,11 +410,17 @@ impl TransactionBlock {
// If `transaction_ids` is specified, we can use that in lieu of the range under the
// assumption that it will further constrain the rows we look up.
if let Some(txs) = &filter.transaction_ids {
let transaction_ids: Vec<TxLookup> = conn.results(move || select_ids(txs, &bound).into_boxed())?;
let transaction_ids: Vec<TxLookup> =
conn.results(move || select_ids(txs, &bound).into_boxed())?;
if transaction_ids.is_empty() {
return Ok::<_, diesel::result::Error>((false, false, vec![]));
}
bound = TxLookupBound::from_set(transaction_ids.into_iter().map(|tx| tx.tx_sequence_number as u64).collect());
bound = TxLookupBound::from_set(
transaction_ids
.into_iter()
.map(|tx| tx.tx_sequence_number as u64)
.collect(),
);
}

let mut subqueries = vec![];
Expand Down Expand Up @@ -458,11 +466,9 @@ impl TransactionBlock {
subqueries.push((select_sender(sender, &bound), "tx_senders"));
}
// And if there are no filters at all, we can operate directly on the main table
// TODO (wlmyng): at this point we can directly make the query. we just need to produce the `tx_sequence_numbers: Vec<i64>`
else {
subqueries.push((
select_tx(None, &bound, "transactions"),
"transactions",
));
subqueries.push((select_tx(None, &bound, "transactions"), "transactions"));
}
}

Expand All @@ -477,13 +483,9 @@ impl TransactionBlock {
let mut subquery = subqueries.pop().unwrap().0;

if !subqueries.is_empty() {
subquery = query!(
"SELECT tx_sequence_number FROM ({}) AS initial",
subquery
);
subquery = query!("SELECT tx_sequence_number FROM ({}) AS initial", subquery);
while let Some((subselect, alias)) = subqueries.pop() {
subquery =
query!("{} INNER JOIN {} USING (tx_sequence_number)", subquery, aliased => (subselect, alias));
subquery = inner_join!(subquery, rhs => (subselect, alias), using: ["tx_sequence_number"]);
}
}

Expand All @@ -494,9 +496,7 @@ impl TransactionBlock {

let tx_sequence_numbers = results
.into_iter()
.map(|x| {
x.tx_sequence_number
})
.map(|x| x.tx_sequence_number)
.collect::<Vec<i64>>();

// then just do a multi-get
Expand Down

0 comments on commit f114258

Please sign in to comment.