fix: prune and restore mempool transactions with equal nonces for the same sender #2091

Merged · 3 commits · Sep 26, 2024
176 changes: 110 additions & 66 deletions src/datastore/pg-write-store.ts
@@ -1,11 +1,5 @@
import * as assert from 'assert';
import {
getOrAdd,
I32_MAX,
getIbdBlockHeight,
getUintEnvOrDefault,
unwrapOptionalProp,
} from '../helpers';
import { getOrAdd, I32_MAX, getIbdBlockHeight, getUintEnvOrDefault } from '../helpers';
import {
DbBlock,
DbTx,
@@ -92,7 +86,6 @@ import { parseResolver, parseZoneFileTxt } from '../event-stream/bns/bns-helpers
import { SyntheticPoxEventName } from '../pox-helpers';
import { logger } from '../logger';
import {
PgJsonb,
PgSqlClient,
batchIterate,
connectPostgres,
@@ -122,6 +115,8 @@ class MicroblockGapError extends Error {
}
}

type TransactionHeader = { txId: string; sender: string; nonce: number };

/**
* Extends `PgStore` to provide data insertion functions. These added features are usually called by
* the `EventServer` upon receiving blockchain events from a Stacks node. It also deals with chain data
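
A minimal sketch of the idea behind the new `TransactionHeader` type (illustrative TypeScript, not part of this diff): a mempool entry competes with a mined transaction when it occupies the same (sender, nonce) slot under a different tx id, since a sender's nonce can be consumed only once.

// Illustrative only; `TransactionHeader` is repeated from the patch above
// so the snippet stands alone.
type TransactionHeader = { txId: string; sender: string; nonce: number };

// A sender's nonce is consumed exactly once, so a mempool tx that shares a
// mined tx's (sender, nonce) pair under a different id can never confirm.
function isCompeting(mined: TransactionHeader, mempoolTx: TransactionHeader): boolean {
  return (
    mined.sender === mempoolTx.sender &&
    mined.nonce === mempoolTx.nonce &&
    mined.txId !== mempoolTx.txId
  );
}
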
@@ -208,8 +203,12 @@ export class PgWriteStore extends PgStore {
if (!isCanonical) {
markBlockUpdateDataAsNonCanonical(data);
} else {
const txIds = data.txs.map(d => d.tx.tx_id);
await this.pruneMempoolTxs(sql, txIds);
const prunableTxs: TransactionHeader[] = data.txs.map(d => ({
txId: d.tx.tx_id,
sender: d.tx.sender_address,
nonce: d.tx.nonce,
}));
await this.pruneMempoolTxs(sql, prunableTxs);
}
setTotalBlockUpdateDataExecutionCost(data);

@@ -245,7 +244,11 @@
);
const restoredMempoolTxs = await this.restoreMempoolTxs(
sql,
orphanedAndMissingTxs.map(tx => tx.tx_id)
orphanedAndMissingTxs.map(tx => ({
txId: tx.tx_id,
sender: tx.sender_address,
nonce: tx.nonce,
}))
);
restoredMempoolTxs.restoredTxs.forEach(txId => {
logger.info(`Restored micro-orphaned tx to mempool ${txId}`);
@@ -688,15 +691,23 @@ export class PgWriteStore extends PgStore {
// Restore any micro-orphaned txs into the mempool
const restoredMempoolTxs = await this.restoreMempoolTxs(
sql,
microOrphanedTxs.map(tx => tx.tx_id)
microOrphanedTxs.map(tx => ({
txId: tx.tx_id,
sender: tx.sender_address,
nonce: tx.nonce,
}))
);
restoredMempoolTxs.restoredTxs.forEach(txId => {
logger.info(`Restored micro-orphaned tx to mempool ${txId}`);
});
}

const candidateTxIds = data.txs.map(d => d.tx.tx_id);
const removedTxsResult = await this.pruneMempoolTxs(sql, candidateTxIds);
const prunableTxs: TransactionHeader[] = data.txs.map(d => ({
txId: d.tx.tx_id,
sender: d.tx.sender_address,
nonce: d.tx.nonce,
}));
const removedTxsResult = await this.pruneMempoolTxs(sql, prunableTxs);
if (removedTxsResult.removedTxs.length > 0) {
logger.debug(
`Removed ${removedTxsResult.removedTxs.length} microblock-txs from mempool table during microblock ingestion`
@@ -2471,9 +2482,9 @@ export class PgWriteStore extends PgStore {
`;
// Any txs restored need to be pruned from the mempool
const updatedMbTxs = updatedMbTxsQuery.map(r => parseTxQueryResult(r));
const txsToPrune = updatedMbTxs
const txsToPrune: TransactionHeader[] = updatedMbTxs
.filter(tx => tx.canonical && tx.microblock_canonical)
.map(tx => tx.tx_id);
.map(tx => ({ txId: tx.tx_id, sender: tx.sender_address, nonce: tx.nonce }));
const removedTxsResult = await this.pruneMempoolTxs(sql, txsToPrune);
if (removedTxsResult.removedTxs.length > 0) {
logger.debug(
@@ -2648,17 +2659,31 @@ export class PgWriteStore extends PgStore {
* marked from canonical to non-canonical.
* @param txIds - List of transactions to update in the mempool
*/
async restoreMempoolTxs(sql: PgSqlClient, txIds: string[]): Promise<{ restoredTxs: string[] }> {
if (txIds.length === 0) return { restoredTxs: [] };
for (const txId of txIds) {
logger.debug(`Restoring mempool tx: ${txId}`);
}

async restoreMempoolTxs(
sql: PgSqlClient,
transactions: TransactionHeader[]
): Promise<{ restoredTxs: string[] }> {
if (transactions.length === 0) return { restoredTxs: [] };
if (logger.isLevelEnabled('debug'))
for (const tx of transactions)
logger.debug(`Restoring mempool tx: ${tx.txId} sender: ${tx.sender} nonce: ${tx.nonce}`);

// Also restore transactions for the same `sender_address` with the same `nonce`.
const inputData = transactions.map(t => [t.txId.replace('0x', '\\x'), t.sender, t.nonce]);
const updatedRows = await sql<{ tx_id: string }[]>`
WITH restored AS (
WITH input_data (tx_id, sender_address, nonce) AS (VALUES ${sql(inputData)}),
affected_mempool_tx_ids AS (
SELECT m.tx_id
FROM mempool_txs AS m
INNER JOIN input_data AS i
ON m.sender_address = i.sender_address AND m.nonce = i.nonce::int
UNION
SELECT tx_id::bytea FROM input_data
),
restored AS (
UPDATE mempool_txs
SET pruned = FALSE, status = ${DbTxStatus.Pending}
WHERE tx_id IN ${sql(txIds)} AND pruned = TRUE
SET pruned = false, status = ${DbTxStatus.Pending}
WHERE pruned = true AND tx_id IN (SELECT DISTINCT tx_id FROM affected_mempool_tx_ids)
RETURNING tx_id
),
count_update AS (
@@ -2668,60 +2693,67 @@ export class PgWriteStore extends PgStore {
)
SELECT tx_id FROM restored
`;

const updatedTxs = updatedRows.map(r => r.tx_id);
for (const tx of updatedTxs) {
logger.debug(`Updated mempool tx: ${tx}`);
}

let restoredTxs = updatedRows.map(r => r.tx_id);

// txs that didnt exist in the mempool need to be inserted into the mempool
if (updatedRows.length < txIds.length) {
const txsRequiringInsertion = txIds.filter(txId => !updatedTxs.includes(txId));

logger.debug(`To restore mempool txs, ${txsRequiringInsertion.length} txs require insertion`);

const restoredTxIds = updatedRows.map(r => r.tx_id);
if (logger.isLevelEnabled('debug'))
for (const txId of restoredTxIds) logger.debug(`Restored mempool tx: ${txId}`);

// Transactions that didn't exist in the mempool need to be inserted into the mempool
const txIdsRequiringInsertion = transactions
.filter(tx => !restoredTxIds.includes(tx.txId))
.map(tx => tx.txId);
if (txIdsRequiringInsertion.length) {
logger.debug(
`To restore mempool txs, ${txIdsRequiringInsertion.length} txs require insertion`
);
const txs: TxQueryResult[] = await sql`
SELECT DISTINCT ON(tx_id) ${sql(TX_COLUMNS)}
FROM txs
WHERE tx_id IN ${sql(txsRequiringInsertion)}
WHERE tx_id IN ${sql(txIdsRequiringInsertion)}
ORDER BY tx_id, block_height DESC, microblock_sequence DESC, tx_index DESC
`;

if (txs.length !== txsRequiringInsertion.length) {
if (txs.length !== txIdsRequiringInsertion.length) {
logger.error(`Not all txs requiring insertion were found`);
}

const mempoolTxs = convertTxQueryResultToDbMempoolTx(txs);

await this.updateMempoolTxs({ mempoolTxs });

restoredTxs = [...restoredTxs, ...txsRequiringInsertion];

for (const tx of mempoolTxs) {
logger.debug(`Inserted mempool tx: ${tx.tx_id}`);
}
if (logger.isLevelEnabled('debug'))
for (const tx of mempoolTxs) logger.debug(`Inserted non-existing mempool tx: ${tx.tx_id}`);
}

return { restoredTxs: restoredTxs };
return { restoredTxs: [...restoredTxIds, ...txIdsRequiringInsertion] };
}
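
The `affected_mempool_tx_ids` CTE above is the heart of the fix: the restore now targets the union of the explicit tx ids and every mempool row sharing a (sender, nonce) pair with one of them, and the `SELECT DISTINCT ON(tx_id)` re-insertion query picks the most recently mined copy of each missing tx by ordering on block height, microblock sequence, and tx index descending. A minimal in-memory mirror of that set computation (illustrative only; `pruneMempoolTxs` below builds the same set and flips `pruned` the other way):

// Illustrative mirror of the `affected_mempool_tx_ids` CTE, not part of
// the patch.
function affectedTxIds(
  inputs: TransactionHeader[],
  mempool: TransactionHeader[]
): Set<string> {
  // Arm 1 of the UNION: the tx ids passed in directly.
  const affected = new Set(inputs.map(t => t.txId));
  // Arm 2 of the UNION: mempool rows joined on sender_address and nonce.
  const slots = new Set(inputs.map(t => `${t.sender}:${t.nonce}`));
  for (const tx of mempool) {
    if (slots.has(`${tx.sender}:${tx.nonce}`)) affected.add(tx.txId);
  }
  return affected;
}
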

/**
* Remove transactions in the mempool table. This should be called when transactions are
* mined into a block.
* @param txIds - List of transactions to update in the mempool
*/
async pruneMempoolTxs(sql: PgSqlClient, txIds: string[]): Promise<{ removedTxs: string[] }> {
if (txIds.length === 0) return { removedTxs: [] };
for (const txId of txIds) {
logger.debug(`Pruning mempool tx: ${txId}`);
}
async pruneMempoolTxs(
sql: PgSqlClient,
transactions: TransactionHeader[]
): Promise<{ removedTxs: string[] }> {
if (transactions.length === 0) return { removedTxs: [] };
if (logger.isLevelEnabled('debug'))
for (const tx of transactions)
logger.debug(`Pruning mempool tx: ${tx.txId} sender: ${tx.sender} nonce: ${tx.nonce}`);

// Also prune transactions for the same `sender_address` with the same `nonce`.
const inputData = transactions.map(t => [t.txId.replace('0x', '\\x'), t.sender, t.nonce]);
const updateResults = await sql<{ tx_id: string }[]>`
WITH pruned AS (
WITH input_data (tx_id, sender_address, nonce) AS (VALUES ${sql(inputData)}),
affected_mempool_tx_ids AS (
SELECT m.tx_id
FROM mempool_txs AS m
INNER JOIN input_data AS i
ON m.sender_address = i.sender_address AND m.nonce = i.nonce::int
UNION
SELECT tx_id::bytea FROM input_data
),
pruned AS (
UPDATE mempool_txs
SET pruned = true
WHERE tx_id IN ${sql(txIds)} AND pruned = FALSE
WHERE pruned = false AND tx_id IN (SELECT DISTINCT tx_id FROM affected_mempool_tx_ids)
RETURNING tx_id
),
count_update AS (
@@ -2769,20 +2801,28 @@ export class PgWriteStore extends PgStore {
indexBlockHash: string,
canonical: boolean,
updatedEntities: ReOrgUpdatedEntities
): Promise<{ txsMarkedCanonical: string[]; txsMarkedNonCanonical: string[] }> {
const result: { txsMarkedCanonical: string[]; txsMarkedNonCanonical: string[] } = {
): Promise<{
txsMarkedCanonical: TransactionHeader[];
txsMarkedNonCanonical: TransactionHeader[];
}> {
const result: {
txsMarkedCanonical: TransactionHeader[];
txsMarkedNonCanonical: TransactionHeader[];
} = {
txsMarkedCanonical: [],
txsMarkedNonCanonical: [],
};

const q = new PgWriteQueue();
q.enqueue(async () => {
const txResult = await sql<{ tx_id: string; update_balances_count: number }[]>`
const txResult = await sql<
{ tx_id: string; sender_address: string; nonce: number; update_balances_count: number }[]
>`
WITH updated_txs AS (
UPDATE txs
SET canonical = ${canonical}
WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
RETURNING tx_id, sender_address, sponsor_address, fee_rate, sponsored, canonical
RETURNING tx_id, sender_address, nonce, sponsor_address, fee_rate, sponsored, canonical
),
affected_addresses AS (
SELECT
Expand Down Expand Up @@ -2817,22 +2857,26 @@ export class PgWriteStore extends PgStore {
SET balance = ft_balances.balance + EXCLUDED.balance
RETURNING ft_balances.address
)
SELECT tx_id, (SELECT COUNT(*)::int FROM update_ft_balances) AS update_balances_count
SELECT tx_id, sender_address, nonce, (SELECT COUNT(*)::int FROM update_ft_balances) AS update_balances_count
FROM updated_txs
`;
const txIds = txResult.map(row => row.tx_id);
const txs = txResult.map(row => ({
txId: row.tx_id,
sender: row.sender_address,
nonce: row.nonce,
}));
if (canonical) {
updatedEntities.markedCanonical.txs += txResult.count;
result.txsMarkedCanonical = txIds;
result.txsMarkedCanonical = txs;
} else {
updatedEntities.markedNonCanonical.txs += txResult.count;
result.txsMarkedNonCanonical = txIds;
result.txsMarkedNonCanonical = txs;
}
if (txResult.count) {
await sql`
UPDATE principal_stx_txs
SET canonical = ${canonical}
WHERE tx_id IN ${sql(txIds)}
WHERE tx_id IN ${sql(txs.map(t => t.txId))}
AND index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
`;
}
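
The canonical-marking helper above now returns full `TransactionHeader` rows rather than bare tx ids, so re-org handling can drive both sides of the mempool update. A hedged sketch of that flow; `applyReorgMempoolUpdates` is a hypothetical wrapper, not a function in this diff:

// Hypothetical glue (not part of the patch): txs knocked off the canonical
// chain are restored to the mempool, txs newly confirmed are pruned, and in
// both directions same-sender/same-nonce competitors are swept along.
async function applyReorgMempoolUpdates(
  store: PgWriteStore,
  sql: PgSqlClient,
  marked: {
    txsMarkedCanonical: TransactionHeader[];
    txsMarkedNonCanonical: TransactionHeader[];
  }
): Promise<void> {
  await store.restoreMempoolTxs(sql, marked.txsMarkedNonCanonical);
  await store.pruneMempoolTxs(sql, marked.txsMarkedCanonical);
}
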
2 changes: 1 addition & 1 deletion tests/api/datastore.test.ts
@@ -4352,7 +4352,7 @@ describe('postgres datastore', () => {
microblock_sequence: undefined,
tx_count: 2, // Tx from block 2b now counts, but compensates with tx from block 2
tx_count_unanchored: 2,
mempool_tx_count: 1,
mempool_tx_count: 0,
});

const b1 = await db.getBlock({ hash: block1.block_hash });
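
The updated expectation follows from the new pruning rule: presumably the remaining mempool transaction in this scenario shares its sender and nonce with a transaction mined in one of the re-orged blocks, so it is now pruned alongside it and the expected `mempool_tx_count` drops from 1 to 0.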