Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
Merge pull request #3831 from LiskHQ/3809-fix_disappearing_broadcasts
Browse files Browse the repository at this point in the history
Fix disappearing broadcasts from broadcaster queue - Closes #3809
  • Loading branch information
shuse2 authored Jun 19, 2019
2 parents bad41a2 + 51b48e8 commit f805908
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 31 deletions.
21 changes: 11 additions & 10 deletions elements/lisk-transaction-pool/src/transaction_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,6 @@ export class TransactionPool extends EventEmitter {
public addPendingTransaction(transaction: Transaction): AddTransactionResult {
const pendingQueue: QueueNames = 'pending';

this.emit(EVENT_VERIFIED_TRANSACTION_ONCE, {
action: ACTION_ADD_PENDING_TRANSACTIONS,
payload: [transaction],
});

return this.addTransactionToQueue(pendingQueue, transaction);
}

Expand All @@ -250,11 +245,6 @@ export class TransactionPool extends EventEmitter {
): AddTransactionResult {
const verifiedQueue: QueueNames = 'verified';

this.emit(EVENT_VERIFIED_TRANSACTION_ONCE, {
action: ACTION_ADD_VERIFIED_TRANSACTIONS,
payload: [transaction],
});

return this.addTransactionToQueue(verifiedQueue, transaction);
}

Expand Down Expand Up @@ -472,6 +462,17 @@ export class TransactionPool extends EventEmitter {
payload: [transaction],
});

// If transaction is added to one of the queues which semantically mean that transactions are verified, then fire the event.
if (queueName === 'verified' || queueName === 'pending') {
this.emit(EVENT_VERIFIED_TRANSACTION_ONCE, {
action:
queueName === 'verified'
? ACTION_ADD_VERIFIED_TRANSACTIONS
: ACTION_ADD_PENDING_TRANSACTIONS,
payload: [transaction],
});
}

return {
isFull: false,
alreadyExists: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
TransactionPool,
TransactionPoolConfiguration,
Transaction,
EVENT_VERIFIED_TRANSACTION_ONCE,
} from '../../src/transaction_pool';
import {
fakeCheckFunctionGenerator,
Expand Down Expand Up @@ -317,5 +318,49 @@ describe('transaction pool events', () => {
});
});
});

describe('on adding transactions to transaction pool', () => {
it('should not fire event EVENT_VERIFIED_TRANSACTION_ONCE if transaction unable to add to the pending queue', done => {
transactionPool.addPendingTransaction(transactions[0]);
transactionPool.on(EVENT_VERIFIED_TRANSACTION_ONCE, () => {
done('should not be called');
});
const { alreadyExists } = transactionPool.addPendingTransaction(
transactions[0],
);
expect(alreadyExists).to.equal(true);
// wait 1 second to ensure that event is not called for transaction
setTimeout(done, 1000);
});

it('should fire event EVENT_VERIFIED_TRANSACTION_ONCE if transaction is added to the pending queue after adding transaction', done => {
transactionPool.on(EVENT_VERIFIED_TRANSACTION_ONCE, ({ payload }) => {
expect(payload[0]).to.eql(transactions[0]);
done();
});
transactionPool.addPendingTransaction(transactions[0]);
});

it('should not fire event EVENT_VERIFIED_TRANSACTION_ONCE if transaction unable to add to the verified queue', done => {
transactionPool.addVerifiedTransaction(transactions[0]);
transactionPool.on(EVENT_VERIFIED_TRANSACTION_ONCE, () => {
done('should not be called');
});
const { alreadyExists } = transactionPool.addVerifiedTransaction(
transactions[0],
);
expect(alreadyExists).to.equal(true);
// wait 1 second to ensure that event is not called for transaction
setTimeout(done, 1000);
});

it('should fire event EVENT_VERIFIED_TRANSACTION_ONCE if transaction is added to the verified queue after adding transaction', done => {
transactionPool.on(EVENT_VERIFIED_TRANSACTION_ONCE, ({ payload }) => {
expect(payload[0]).to.eql(transactions[0]);
done();
});
transactionPool.addVerifiedTransaction(transactions[0]);
});
});
});
});
64 changes: 43 additions & 21 deletions framework/src/modules/chain/logic/broadcaster.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,49 +150,71 @@ class Broadcaster {
async filterQueue() {
library.logger.debug(`Broadcasts before filtering: ${this.queue.length}`);

this.queue = await this.queue.reduce(async (prev, broadcast) => {
const filteredBroadcast = await prev;
const transactionIdsNotInPool = [];
this.queue = this.queue.filter(broadcast => {
if (broadcast.options.immediate) {
return filteredBroadcast;
return false;
}

if (broadcast.options.data) {
let transactionId;
if (broadcast.options.data.transaction) {
// Look for a transaction of a given "id" when broadcasting transactions
// Look for a transaction of a given "id"
transactionId = broadcast.options.data.transaction.id;
} else if (broadcast.options.data.signature) {
// Look for a corresponding "transactionId" of a given signature when broadcasting signatures
// Look for a corresponding "transactionId" of a given signature
transactionId = broadcast.options.data.signature.transactionId;
}
if (!transactionId) {
return filteredBroadcast;
return false;
}
// Broadcast if transaction is in transaction pool
if (modules.transactions.transactionInPool(transactionId)) {
filteredBroadcast.push(broadcast);
return filteredBroadcast;
if (!modules.transactions.transactionInPool(transactionId)) {
transactionIdsNotInPool.push(transactionId);
}
// Don't broadcast if transaction is already confirmed
return true;
}
return true;
});

const persistedTransactionIds = (await Promise.all(
transactionIdsNotInPool.map(async transactionId => {
try {
const isPersisted = await library.storage.entities.Transaction.isPersisted(
{
id: transactionId,
}
);
if (!isPersisted) {
filteredBroadcast.push(broadcast);
}
return filteredBroadcast;
} catch (err) {
return filteredBroadcast;
return {
transactionId,
isPersisted,
};
} catch (e) {
// if there is an error for transaction id then remove it from the broadcasts
return {
transactionId,
isPersisted: true,
};
}
}

filteredBroadcast.push(broadcast);
})
))
.filter(({ isPersisted }) => isPersisted)
.map(({ transactionId }) => transactionId);

return filteredBroadcast;
}, []);
this.queue = this.queue.filter(broadcast => {
if (broadcast.options.data) {
let transactionId;
if (broadcast.options.data.transaction) {
// Look for a transaction of a given "id"
transactionId = broadcast.options.data.transaction.id;
} else if (broadcast.options.data.signature) {
// Look for a corresponding "transactionId" of a given signature
transactionId = broadcast.options.data.signature.transactionId;
}
return !persistedTransactionIds.includes(transactionId);
}
return true;
});

library.logger.debug(`Broadcasts after filtering: ${this.queue.length}`);

Expand Down
4 changes: 4 additions & 0 deletions framework/src/modules/chain/logic/transaction_pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,10 @@ class TransactionPool {
]);
}

if (transaction.bundled) {
return this.addBundledTransaction(transaction, cb);
}

return this.verifyTransactions([transaction]).then(
({ transactionsResponses }) => {
if (transactionsResponses[0].status === TransactionStatus.OK) {
Expand Down
14 changes: 14 additions & 0 deletions framework/test/mocha/unit/modules/chain/logic/broadcaster.js
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,20 @@ describe('Broadcaster', () => {
.to.eql(auxBroadcasts);
});
});

describe('when all transactions are confirmed', () => {
beforeEach(async () => {
modulesStub.transactions.transactionInPool.returns(false);
library.storage.entities.Transaction.isPersisted.resolves(true);
});

it('should remove all of them from broadcaster.queue', async () => {
await broadcaster.filterQueue();
expect(broadcaster.queue)
.to.be.an('Array')
.to.eql([]);
});
});
});
});

Expand Down
11 changes: 11 additions & 0 deletions framework/test/mocha/unit/modules/chain/logic/transaction_pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,17 @@ describe('transactionPool', () => {
});
});

it('should add transaction to the received queue if the bundled property = true', done => {
transaction.bundled = true;
const addBundledTransactionStub = sinonSandbox
.stub(transactionPool, 'addBundledTransaction')
.callsArg(1);
transactionPool.processUnconfirmedTransaction(transaction, false, () => {
expect(addBundledTransactionStub).to.be.calledWith(transaction);
done();
});
});

it('should add transaction to the verified queue when status is OK', done => {
const transactionsResponses = [
{
Expand Down

0 comments on commit f805908

Please sign in to comment.