From 9aa182719c4ac65ef9286a22ad9808d0831f6629 Mon Sep 17 00:00:00 2001 From: Rafael Cardenas Date: Wed, 25 Sep 2024 15:41:03 -0600 Subject: [PATCH 1/3] fix: prune and restore --- src/datastore/pg-write-store.ts | 176 ++++++++++++++++++++------------ tests/api/mempool.test.ts | 60 +++++++++++ 2 files changed, 170 insertions(+), 66 deletions(-) diff --git a/src/datastore/pg-write-store.ts b/src/datastore/pg-write-store.ts index 32badbeb7..50d864877 100644 --- a/src/datastore/pg-write-store.ts +++ b/src/datastore/pg-write-store.ts @@ -1,11 +1,5 @@ import * as assert from 'assert'; -import { - getOrAdd, - I32_MAX, - getIbdBlockHeight, - getUintEnvOrDefault, - unwrapOptionalProp, -} from '../helpers'; +import { getOrAdd, I32_MAX, getIbdBlockHeight, getUintEnvOrDefault } from '../helpers'; import { DbBlock, DbTx, @@ -92,7 +86,6 @@ import { parseResolver, parseZoneFileTxt } from '../event-stream/bns/bns-helpers import { SyntheticPoxEventName } from '../pox-helpers'; import { logger } from '../logger'; import { - PgJsonb, PgSqlClient, batchIterate, connectPostgres, @@ -122,6 +115,8 @@ class MicroblockGapError extends Error { } } +type TransactionHeader = { txId: string; sender: string; nonce: number }; + /** * Extends `PgStore` to provide data insertion functions. These added features are usually called by * the `EventServer` upon receiving blockchain events from a Stacks node. It also deals with chain data @@ -208,8 +203,12 @@ export class PgWriteStore extends PgStore { if (!isCanonical) { markBlockUpdateDataAsNonCanonical(data); } else { - const txIds = data.txs.map(d => d.tx.tx_id); - await this.pruneMempoolTxs(sql, txIds); + const prunableTxs: TransactionHeader[] = data.txs.map(d => ({ + txId: d.tx.tx_id, + sender: d.tx.sender_address, + nonce: d.tx.nonce, + })); + await this.pruneMempoolTxs(sql, prunableTxs); } setTotalBlockUpdateDataExecutionCost(data); @@ -245,7 +244,11 @@ export class PgWriteStore extends PgStore { ); const restoredMempoolTxs = await this.restoreMempoolTxs( sql, - orphanedAndMissingTxs.map(tx => tx.tx_id) + orphanedAndMissingTxs.map(tx => ({ + txId: tx.tx_id, + sender: tx.sender_address, + nonce: tx.nonce, + })) ); restoredMempoolTxs.restoredTxs.forEach(txId => { logger.info(`Restored micro-orphaned tx to mempool ${txId}`); @@ -688,15 +691,23 @@ export class PgWriteStore extends PgStore { // Restore any micro-orphaned txs into the mempool const restoredMempoolTxs = await this.restoreMempoolTxs( sql, - microOrphanedTxs.map(tx => tx.tx_id) + microOrphanedTxs.map(tx => ({ + txId: tx.tx_id, + sender: tx.sender_address, + nonce: tx.nonce, + })) ); restoredMempoolTxs.restoredTxs.forEach(txId => { logger.info(`Restored micro-orphaned tx to mempool ${txId}`); }); } - const candidateTxIds = data.txs.map(d => d.tx.tx_id); - const removedTxsResult = await this.pruneMempoolTxs(sql, candidateTxIds); + const prunableTxs: TransactionHeader[] = data.txs.map(d => ({ + txId: d.tx.tx_id, + sender: d.tx.sender_address, + nonce: d.tx.nonce, + })); + const removedTxsResult = await this.pruneMempoolTxs(sql, prunableTxs); if (removedTxsResult.removedTxs.length > 0) { logger.debug( `Removed ${removedTxsResult.removedTxs.length} microblock-txs from mempool table during microblock ingestion` @@ -2471,9 +2482,9 @@ export class PgWriteStore extends PgStore { `; // Any txs restored need to be pruned from the mempool const updatedMbTxs = updatedMbTxsQuery.map(r => parseTxQueryResult(r)); - const txsToPrune = updatedMbTxs + const txsToPrune: TransactionHeader[] = updatedMbTxs .filter(tx => tx.canonical && tx.microblock_canonical) - .map(tx => tx.tx_id); + .map(tx => ({ txId: tx.tx_id, sender: tx.sender_address, nonce: tx.nonce })); const removedTxsResult = await this.pruneMempoolTxs(sql, txsToPrune); if (removedTxsResult.removedTxs.length > 0) { logger.debug( @@ -2648,17 +2659,31 @@ export class PgWriteStore extends PgStore { * marked from canonical to non-canonical. * @param txIds - List of transactions to update in the mempool */ - async restoreMempoolTxs(sql: PgSqlClient, txIds: string[]): Promise<{ restoredTxs: string[] }> { - if (txIds.length === 0) return { restoredTxs: [] }; - for (const txId of txIds) { - logger.debug(`Restoring mempool tx: ${txId}`); - } - + async restoreMempoolTxs( + sql: PgSqlClient, + transactions: TransactionHeader[] + ): Promise<{ restoredTxs: string[] }> { + if (transactions.length === 0) return { restoredTxs: [] }; + if (logger.isLevelEnabled('debug')) + for (const tx of transactions) + logger.debug(`Restoring mempool tx: ${tx.txId} sender: ${tx.sender} nonce: ${tx.nonce}`); + + // Also restore transactions for the same `sender_address` with the same `nonce`. + const inputData = transactions.map(t => [t.txId.replace('0x', '\\x'), t.sender, t.nonce]); const updatedRows = await sql<{ tx_id: string }[]>` - WITH restored AS ( + WITH input_data (tx_id, sender_address, nonce) AS (VALUES ${sql(inputData)}), + affected_mempool_tx_ids AS ( + SELECT m.tx_id + FROM mempool_txs AS m + INNER JOIN input_data AS i + ON m.sender_address = i.sender_address AND m.nonce = i.nonce::int + UNION + SELECT tx_id::bytea FROM input_data + ), + restored AS ( UPDATE mempool_txs - SET pruned = FALSE, status = ${DbTxStatus.Pending} - WHERE tx_id IN ${sql(txIds)} AND pruned = TRUE + SET pruned = false, status = ${DbTxStatus.Pending} + WHERE pruned = true AND tx_id IN (SELECT DISTINCT tx_id FROM affected_mempool_tx_ids) RETURNING tx_id ), count_update AS ( @@ -2668,43 +2693,35 @@ export class PgWriteStore extends PgStore { ) SELECT tx_id FROM restored `; - - const updatedTxs = updatedRows.map(r => r.tx_id); - for (const tx of updatedTxs) { - logger.debug(`Updated mempool tx: ${tx}`); - } - - let restoredTxs = updatedRows.map(r => r.tx_id); - - // txs that didnt exist in the mempool need to be inserted into the mempool - if (updatedRows.length < txIds.length) { - const txsRequiringInsertion = txIds.filter(txId => !updatedTxs.includes(txId)); - - logger.debug(`To restore mempool txs, ${txsRequiringInsertion.length} txs require insertion`); - + const restoredTxIds = updatedRows.map(r => r.tx_id); + if (logger.isLevelEnabled('debug')) + for (const txId of restoredTxIds) logger.debug(`Restored mempool tx: ${txId}`); + + // Transactions that didn't exist in the mempool need to be inserted into the mempool + const txIdsRequiringInsertion = transactions + .filter(tx => !restoredTxIds.includes(tx.txId)) + .map(tx => tx.txId); + if (txIdsRequiringInsertion.length) { + logger.debug( + `To restore mempool txs, ${txIdsRequiringInsertion.length} txs require insertion` + ); const txs: TxQueryResult[] = await sql` SELECT DISTINCT ON(tx_id) ${sql(TX_COLUMNS)} FROM txs - WHERE tx_id IN ${sql(txsRequiringInsertion)} + WHERE tx_id IN ${sql(txIdsRequiringInsertion)} ORDER BY tx_id, block_height DESC, microblock_sequence DESC, tx_index DESC `; - - if (txs.length !== txsRequiringInsertion.length) { + if (txs.length !== txIdsRequiringInsertion.length) { logger.error(`Not all txs requiring insertion were found`); } const mempoolTxs = convertTxQueryResultToDbMempoolTx(txs); - await this.updateMempoolTxs({ mempoolTxs }); - - restoredTxs = [...restoredTxs, ...txsRequiringInsertion]; - - for (const tx of mempoolTxs) { - logger.debug(`Inserted mempool tx: ${tx.tx_id}`); - } + if (logger.isLevelEnabled('debug')) + for (const tx of mempoolTxs) logger.debug(`Inserted non-existing mempool tx: ${tx.tx_id}`); } - return { restoredTxs: restoredTxs }; + return { restoredTxs: [...restoredTxIds, ...txIdsRequiringInsertion] }; } /** @@ -2712,16 +2729,31 @@ export class PgWriteStore extends PgStore { * mined into a block. * @param txIds - List of transactions to update in the mempool */ - async pruneMempoolTxs(sql: PgSqlClient, txIds: string[]): Promise<{ removedTxs: string[] }> { - if (txIds.length === 0) return { removedTxs: [] }; - for (const txId of txIds) { - logger.debug(`Pruning mempool tx: ${txId}`); - } + async pruneMempoolTxs( + sql: PgSqlClient, + transactions: TransactionHeader[] + ): Promise<{ removedTxs: string[] }> { + if (transactions.length === 0) return { removedTxs: [] }; + if (logger.isLevelEnabled('debug')) + for (const tx of transactions) + logger.debug(`Pruning mempool tx: ${tx.txId} sender: ${tx.sender} nonce: ${tx.nonce}`); + + // Also prune transactions for the same `sender_address` with the same `nonce`. + const inputData = transactions.map(t => [t.txId.replace('0x', '\\x'), t.sender, t.nonce]); const updateResults = await sql<{ tx_id: string }[]>` - WITH pruned AS ( + WITH input_data (tx_id, sender_address, nonce) AS (VALUES ${sql(inputData)}), + affected_mempool_tx_ids AS ( + SELECT m.tx_id + FROM mempool_txs AS m + INNER JOIN input_data AS i + ON m.sender_address = i.sender_address AND m.nonce = i.nonce::int + UNION + SELECT tx_id::bytea FROM input_data + ), + pruned AS ( UPDATE mempool_txs SET pruned = true - WHERE tx_id IN ${sql(txIds)} AND pruned = FALSE + WHERE pruned = false AND tx_id IN (SELECT DISTINCT tx_id FROM affected_mempool_tx_ids) RETURNING tx_id ), count_update AS ( @@ -2769,20 +2801,28 @@ export class PgWriteStore extends PgStore { indexBlockHash: string, canonical: boolean, updatedEntities: ReOrgUpdatedEntities - ): Promise<{ txsMarkedCanonical: string[]; txsMarkedNonCanonical: string[] }> { - const result: { txsMarkedCanonical: string[]; txsMarkedNonCanonical: string[] } = { + ): Promise<{ + txsMarkedCanonical: TransactionHeader[]; + txsMarkedNonCanonical: TransactionHeader[]; + }> { + const result: { + txsMarkedCanonical: TransactionHeader[]; + txsMarkedNonCanonical: TransactionHeader[]; + } = { txsMarkedCanonical: [], txsMarkedNonCanonical: [], }; const q = new PgWriteQueue(); q.enqueue(async () => { - const txResult = await sql<{ tx_id: string; update_balances_count: number }[]>` + const txResult = await sql< + { tx_id: string; sender_address: string; nonce: number; update_balances_count: number }[] + >` WITH updated_txs AS ( UPDATE txs SET canonical = ${canonical} WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical} - RETURNING tx_id, sender_address, sponsor_address, fee_rate, sponsored, canonical + RETURNING tx_id, sender_address, nonce, sponsor_address, fee_rate, sponsored, canonical ), affected_addresses AS ( SELECT @@ -2817,22 +2857,26 @@ export class PgWriteStore extends PgStore { SET balance = ft_balances.balance + EXCLUDED.balance RETURNING ft_balances.address ) - SELECT tx_id, (SELECT COUNT(*)::int FROM update_ft_balances) AS update_balances_count + SELECT tx_id, sender_address, nonce, (SELECT COUNT(*)::int FROM update_ft_balances) AS update_balances_count FROM updated_txs `; - const txIds = txResult.map(row => row.tx_id); + const txs = txResult.map(row => ({ + txId: row.tx_id, + sender: row.sender_address, + nonce: row.nonce, + })); if (canonical) { updatedEntities.markedCanonical.txs += txResult.count; - result.txsMarkedCanonical = txIds; + result.txsMarkedCanonical = txs; } else { updatedEntities.markedNonCanonical.txs += txResult.count; - result.txsMarkedNonCanonical = txIds; + result.txsMarkedNonCanonical = txs; } if (txResult.count) { await sql` UPDATE principal_stx_txs SET canonical = ${canonical} - WHERE tx_id IN ${sql(txIds)} + WHERE tx_id IN ${sql(txs.map(t => t.txId))} AND index_block_hash = ${indexBlockHash} AND canonical != ${canonical} `; } diff --git a/tests/api/mempool.test.ts b/tests/api/mempool.test.ts index 4db5dce75..efed62148 100644 --- a/tests/api/mempool.test.ts +++ b/tests/api/mempool.test.ts @@ -2059,4 +2059,64 @@ describe('mempool tests', () => { }, }); }); + + test('prunes transactions with nonces that were already confirmed', async () => { + // Initial block + await db.update( + new TestBlockBuilder({ + block_height: 1, + index_block_hash: `0x0001`, + parent_index_block_hash: `0x0000`, + }).build() + ); + + // Add tx with nonce = 1 to the mempool + const sender_address = 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6'; + const mempoolTx = testMempoolTx({ tx_id: `0xff0001`, sender_address, nonce: 1 }); + await db.updateMempoolTxs({ mempoolTxs: [mempoolTx] }); + const check0 = await supertest(api.server).get( + `/extended/v1/address/${sender_address}/mempool` + ); + expect(check0.body.results).toHaveLength(1); + + // Confirm a different tx with the same nonce = 1 by the same sender without it ever touching + // the mempool + await db.update( + new TestBlockBuilder({ + block_height: 2, + index_block_hash: `0x0002`, + parent_index_block_hash: `0x0001`, + }) + .addTx({ tx_id: `0xaa0001`, sender_address, nonce: 1 }) + .build() + ); + + // Mempool tx should now be pruned + const check1 = await supertest(api.server).get( + `/extended/v1/address/${sender_address}/mempool` + ); + expect(check1.body.results).toHaveLength(0); + + // Re-org block 2 + await db.update( + new TestBlockBuilder({ + block_height: 2, + index_block_hash: `0x00b2`, + parent_index_block_hash: `0x0001`, + }).build() + ); + await db.update( + new TestBlockBuilder({ + block_height: 3, + index_block_hash: `0x00b3`, + parent_index_block_hash: `0x00b2`, + }).build() + ); + + // Now both conflicting nonce txs should coexist in the mempool (like RBF) + const check2 = await supertest(api.server).get( + `/extended/v1/address/${sender_address}/mempool` + ); + expect(check2.body.results).toHaveLength(2); + }); }); From 69dc58df164077d5761c697055e6246eb03c315d Mon Sep 17 00:00:00 2001 From: Rafael Cardenas Date: Wed, 25 Sep 2024 15:55:10 -0600 Subject: [PATCH 2/3] fix: mempool tests --- tests/api/mempool.test.ts | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/tests/api/mempool.test.ts b/tests/api/mempool.test.ts index efed62148..8bf472bc9 100644 --- a/tests/api/mempool.test.ts +++ b/tests/api/mempool.test.ts @@ -53,7 +53,7 @@ describe('mempool tests', () => { index_block_hash: `0x0${block_height}`, parent_index_block_hash: `0x0${block_height - 1}`, }) - .addTx({ tx_id: `0x111${block_height}` }) + .addTx({ tx_id: `0x111${block_height}`, nonce: block_height }) .build(); await db.update(block); const mempoolTx = testMempoolTx({ tx_id: `0x0${block_height}` }); @@ -95,18 +95,21 @@ describe('mempool tests', () => { type_id: DbTxTypeId.TokenTransfer, fee_rate: BigInt(100 * block_height), raw_tx: '0x' + 'ff'.repeat(block_height), + nonce: block_height, }); const mempoolTx2 = testMempoolTx({ tx_id: `0x1${block_height}`, type_id: DbTxTypeId.ContractCall, fee_rate: BigInt(200 * block_height), raw_tx: '0x' + 'ff'.repeat(block_height + 10), + nonce: block_height + 1, }); const mempoolTx3 = testMempoolTx({ tx_id: `0x2${block_height}`, type_id: DbTxTypeId.SmartContract, fee_rate: BigInt(300 * block_height), raw_tx: '0x' + 'ff'.repeat(block_height + 20), + nonce: block_height + 2, }); await db.updateMempoolTxs({ mempoolTxs: [mempoolTx1, mempoolTx2, mempoolTx3] }); } @@ -1519,7 +1522,10 @@ describe('mempool tests', () => { .toString() .repeat(2)}${chainA_Suffix}`, }) - .addTx({ tx_id: `0x0${txId++}${chainA_Suffix}` }) + .addTx({ + tx_id: `0x0${txId++}${chainA_Suffix}`, + sender_address: `STACKS${chainA_BlockHeight}`, + }) .build(); await db.update(block); } @@ -1546,7 +1552,10 @@ describe('mempool tests', () => { .toString() .repeat(2)}${parentChainSuffix}`, }) - .addTx({ tx_id: `0x0${txId++}${chainB_Suffix}` }) // Txs that don't exist in the mempool and will be reorged + .addTx({ + tx_id: `0x0${txId++}${chainB_Suffix}`, + sender_address: `STACKS${chainB_BlockHeight + 1}`, + }) // Txs that don't exist in the mempool and will be reorged .build(); await db.update(block); } From 4a127dc44c645d182f6fbae4ec2762b9c6ac7f51 Mon Sep 17 00:00:00 2001 From: Rafael Cardenas Date: Wed, 25 Sep 2024 16:16:45 -0600 Subject: [PATCH 3/3] fix: other tests --- tests/api/datastore.test.ts | 2 +- tests/api/socket-io.test.ts | 18 +++++++++++++++--- tests/api/websocket.test.ts | 6 +++++- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/tests/api/datastore.test.ts b/tests/api/datastore.test.ts index e24e30ff2..740efceac 100644 --- a/tests/api/datastore.test.ts +++ b/tests/api/datastore.test.ts @@ -4352,7 +4352,7 @@ describe('postgres datastore', () => { microblock_sequence: undefined, tx_count: 2, // Tx from block 2b now counts, but compensates with tx from block 2 tx_count_unanchored: 2, - mempool_tx_count: 1, + mempool_tx_count: 0, }); const b1 = await db.getBlock({ hash: block1.block_hash }); diff --git a/tests/api/socket-io.test.ts b/tests/api/socket-io.test.ts index af2a127b1..1fb7b8fd2 100644 --- a/tests/api/socket-io.test.ts +++ b/tests/api/socket-io.test.ts @@ -86,7 +86,11 @@ describe('socket-io', () => { const block = new TestBlockBuilder().addTx().build(); await db.update(block); - const mempoolTx = testMempoolTx({ tx_id: '0x01', status: DbTxStatus.Pending }); + const mempoolTx = testMempoolTx({ + tx_id: '0x01', + status: DbTxStatus.Pending, + sender_address: 'TEST', + }); await db.updateMempoolTxs({ mempoolTxs: [mempoolTx] }); const mempoolResult = await mempoolWaiter; const txResult = await txWaiters[0]; @@ -195,7 +199,11 @@ describe('socket-io', () => { const block = new TestBlockBuilder().addTx().build(); await db.update(block); - const mempoolTx = testMempoolTx({ tx_id: '0x01', status: DbTxStatus.Pending }); + const mempoolTx = testMempoolTx({ + tx_id: '0x01', + status: DbTxStatus.Pending, + sender_address: 'TEST', + }); await db.updateMempoolTxs({ mempoolTxs: [mempoolTx] }); const mempoolResult = await mempoolWaiter; const txResult = await txWaiters[0]; @@ -242,7 +250,11 @@ describe('socket-io', () => { .addTx({ tx_id: '0x0101' }) .build(); await db.update(block1); - const mempoolTx = testMempoolTx({ tx_id: '0x01', status: DbTxStatus.Pending }); + const mempoolTx = testMempoolTx({ + tx_id: '0x01', + status: DbTxStatus.Pending, + sender_address: 'TEST', + }); await db.updateMempoolTxs({ mempoolTxs: [mempoolTx] }); const pendingResult = await txWaiters[0]; diff --git a/tests/api/websocket.test.ts b/tests/api/websocket.test.ts index e69c591f0..bfd3682c5 100644 --- a/tests/api/websocket.test.ts +++ b/tests/api/websocket.test.ts @@ -101,7 +101,11 @@ describe('websocket notifications', () => { const block = new TestBlockBuilder().addTx().build(); await db.update(block); - const mempoolTx = testMempoolTx({ tx_id: txId, status: DbTxStatus.Pending }); + const mempoolTx = testMempoolTx({ + tx_id: txId, + status: DbTxStatus.Pending, + sender_address: 'TEST', + }); await db.updateMempoolTxs({ mempoolTxs: [mempoolTx] }); const microblock = new TestMicroblockStreamBuilder()