From fe56f25782ded9667125b25f61a053f553d90ff9 Mon Sep 17 00:00:00 2001 From: Liam Grace Date: Thu, 13 Jun 2019 11:47:31 +0100 Subject: [PATCH] [FABN-1251] Added start/end block functionality - Ability to specify startBlock and endBlock when using fabric-network to handle events - Added configurable event hub timeout when creating listeners - Added configurable wait time between trying new event hubs - Updated documentation - Updated typescript definitions Change-Id: Ifca2d775e6b5f9d583ddb7bd5b81d9b8f0018bcb Signed-off-by: Liam Grace --- docs/tutorials/event-checkpointer.md | 153 ++++++++++-------- docs/tutorials/listening-to-events.md | 92 ++++++----- fabric-client/lib/ChannelEventHub.js | 2 +- fabric-network/lib/contract.js | 2 +- .../lib/impl/event/abstracteventlistener.js | 84 +++++++++- .../lib/impl/event/basecheckpointer.js | 3 +- .../lib/impl/event/blockeventlistener.js | 33 ++-- .../lib/impl/event/commiteventlistener.js | 11 +- .../lib/impl/event/contracteventlistener.js | 61 ++++--- .../lib/impl/event/eventhubmanager.js | 5 +- .../lib/impl/event/filesystemcheckpointer.js | 30 +++- fabric-network/lib/network.js | 20 ++- .../test/impl/event/abstracteventlistener.js | 125 +++++++++++--- .../test/impl/event/basecheckpointer.js | 4 +- .../test/impl/event/blockeventlistener.js | 24 +-- .../test/impl/event/commiteventlistener.js | 41 +++-- .../test/impl/event/contracteventlistener.js | 74 +++++++-- .../test/impl/event/filesystemcheckpointer.js | 38 ++++- fabric-network/test/network.js | 46 +++++- fabric-network/types/index.d.ts | 9 +- test/scenario/features/event.feature | 4 + test/scenario/features/lib/network.js | 15 +- test/scenario/features/steps/docker_steps.js | 4 +- test/scenario/features/steps/network_steps.js | 14 +- 24 files changed, 619 insertions(+), 275 deletions(-) diff --git a/docs/tutorials/event-checkpointer.md b/docs/tutorials/event-checkpointer.md index 0b26758cdc..2892d9f7e8 100644 --- a/docs/tutorials/event-checkpointer.md +++ b/docs/tutorials/event-checkpointer.md @@ -1,32 +1,40 @@ +# Event Checkpointing + This tutorial describes the approaches that can be selected by users of the fabric-network module for replaying missed events emitted by peers. -### Overview +## Overview Events are emitted by peers when blocks are committed. Two types of events support checkpointing: + 1. Contract events (also known as chaincode events) - Defined in transactions to be emitted. E.g. an event emitted when a commercial paper is sold 2. Block Events - Emitted when a block is committed -In the case of an application crashing and events being missed, applications may still want to execute the event callback for the event it missed. Peers in a Fabric network support event replay, and to support this, the fabric-network module supports checkpointing strategies that track the last block and transactions in that block, that have been seen by the client. +In the case of an application crashing and events being missed, applications may still want to execute the event callback for the event it missed. Peers in a Fabric network support event replay, and to support this, the fabric-network module supports checkpointing strategies that track the last block and transactions in that block, that have been seen by the client. + +### Disclaimer + +Checkpointing in its current form has not been tested to deal with all recovery scenarios, so it should be used alongside existing recovery infrastructure. {@link module:fabric-network~FileSystemCheckpointer} is designed for Proof of Technology projects, so we strongly suggest implementing your own checkpointer using the {@link module:fabric-network~BaseCheckpointer} interface. + +### Notes -#### Notes -`Block Number` = `Block Height - 1` +`Block Number` = `Block Height - 1` When using checkpointing: + - The listener will only catch up on events if the `startBlock` is less than the current `Block Number` - If the latest block in the checkpointer is block `n` the `startBlock` will be `n + 1` (e.g. for checkpoint `blockNumber=1`,`startBlock=2`) -### Checkpointers +## Checkpointers The `BaseCheckpoint` class is an interface that is to be used by all Checkpoint classes. fabric-network has one default class, {@link module:fabric-network~FileSystemCheckpointer} that is exported as a factory in the {@link module:fabric-network~CheckpointFactories}. The `FILE_SYSTEM_CHECKPOINTER` is the default checkpointer. -A checkpoint factory is a function that returns an instance with `BaseCheckpointer` as a parent class. These classes implement the `async save(channelName, listenerName)` and `async load()` functions. - -A checkpointer is called each time the event callback is triggered. +A checkpoint factory is a function that returns an instance with `BaseCheckpointer` as a parent class. These classes implement the `async save(channelName, listenerName)` and `async load()` functions. -The checkpointer can be set when connecting to a gateway or when creating the event listener. +`BaseCheckpointer.save()` is called after the async callback function given to the event listener has finished processing. ### Custom Checkpointer -Users can configure their own checkpointer. This requires two components to be created: +Configuring a custom checkpointer requires two components to be created: + 1. The Checkpointer class 2. The Factory @@ -36,76 +44,79 @@ const path = require('path'); const { Gateway } = require('fabric-network'); class FileSystemCheckpointer extends BaseCheckpointer { - constructor(channelName, listenerName, fsOptions) { - super(channelName, listenerName); - this.basePath = path.resolve(fsOptions.basePath); - this.channelName = channelName; - this.listenerName = listenerName; - } - - /** - * Initializes the checkpointer directory structure - */ - async _initialize() { - const cpPath = this._getCheckpointFileName() - } - - /** - * Constructs the checkpoint files name - */ - _getCheckpointFileName() { - let filePath = path.join(this._basePath, this._channelName); - if (this._chaincodeId) { - filePath = path.join(filePath, this._chaincodeId); - } - return path.join(filePath, this._listenerName); - } - - async save(transactionId, blockNumber) { - const cpPath = this._getCheckpointFileName() - if (!(await fs.exists(cpPath))) { - await this._initialize(); - } - const latestCheckpoint = await this.load(); - if (Number(latestCheckpoint.blockNumber) === Number(blockNumber)) { - const transactionIds = latestCheckpoint.transactionIds; - latestCheckpoint.transactionIds = transactionIds; - } else { - latestCheckpoint.blockNumber = blockNumber; - latestCheckpoint.transactionIds = [transactionIds]; - } - await fs.writeFile(cppPath, JSON.stringify(latestCheckpoint)); - } - - async load() { - const cpPath = this._getCheckpointFileName(this._chaincodeId); - if (!(await fs.exists(cpPath))) { - await this._initialize(); - } - const chkptBuffer = await fs.readFile(cpFile); - let checkpoint = checkpointBuffer.toString('utf8'); - if (!checkpoint) { - checkpoint = {}; - } else { - checkpoint = JSON.parse(checkpoint); - } - return checkpoint; - } + constructor(channelName, listenerName, fsOptions) { + super(channelName, listenerName); + this.basePath = path.resolve(fsOptions.basePath); + this.channelName = channelName; + this.listenerName = listenerName; + } + + /** + * Initializes the checkpointer directory structure + */ + async _initialize() { + const cpPath = this._getCheckpointFileName() + } + + /** + * Constructs the checkpoint files name + */ + _getCheckpointFileName() { + let filePath = path.join(this._basePath, this._channelName); + if (this._chaincodeId) { + filePath = path.join(filePath, this._chaincodeId); + } + return path.join(filePath, this._listenerName); + } + + async save(transactionId, blockNumber) { + const cpPath = this._getCheckpointFileName() + if (!(await fs.exists(cpPath))) { + await this._initialize(); + } + const latestCheckpoint = await this.load(); + if (Number(latestCheckpoint.blockNumber) === Number(blockNumber)) { + const transactionIds = latestCheckpoint.transactionIds; + latestCheckpoint.transactionIds = transactionIds; + } else { + latestCheckpoint.blockNumber = blockNumber; + latestCheckpoint.transactionIds = [transactionIds]; + } + await fs.writeFile(cppPath, JSON.stringify(latestCheckpoint)); + } + + async load() { + const cpPath = this._getCheckpointFileName(this._chaincodeId); + if (!(await fs.exists(cpPath))) { + await this._initialize(); + } + const chkptBuffer = await fs.readFile(cpFile); + let checkpoint = checkpointBuffer.toString('utf8'); + if (!checkpoint) { + checkpoint = {}; + } else { + checkpoint = JSON.parse(checkpoint); + } + return checkpoint; + } } function File_SYSTEM_CHECKPOINTER_FACTORY(channelName, listenerName, options) { - return new FileSystemCheckpointer(channelName, listenerName, options); + return new FileSystemCheckpointer(channelName, listenerName, options); } const gateway = new Gateway(); await gateway.connect({ - checkpointer: { - factory: FILE_SYSTEM_CHECKPOINTER_FACTORY, - options: {basePath: '/home/blockchain/checkpoints'} // These options will vary depending on the checkpointer implementation + checkpointer: { + factory: FILE_SYSTEM_CHECKPOINTER_FACTORY, + options: {basePath: '/home/blockchain/checkpoints'} // These options will vary depending on the checkpointer implementation }); ``` -`Note:` When using the filesystem checkpointer, use absolute paths rather than relative paths -When specifying a specific type of checkpointer for a listener, the `checkpointer` option in {@link module:fabric-network.Network~EventListenerOptions`} +In addition to `save()` and `load()` the `BaseCheckpointer` interface also has the `loadLatestCheckpoint()` function which, in the case that `load()` returns a list of checkpoints, will return the latest incomplete checkpoint (or whichever is most relevant for the specific implementation). + +`Note:` When using the filesystem checkpointer, use absolute paths rather than relative paths. + +When specifying a specific type of checkpointer for a listener, the `checkpointer` option in {@link module:fabric-network.Network~EventListenerOptions`}. diff --git a/docs/tutorials/listening-to-events.md b/docs/tutorials/listening-to-events.md index 4562eac39a..cd9a812b1d 100644 --- a/docs/tutorials/listening-to-events.md +++ b/docs/tutorials/listening-to-events.md @@ -1,30 +1,35 @@ + +# Listening to events with Fabric Network + This tutorial describes the different ways to listen to events emitted by a network using the fabric-network module. -### Overview +## Overview There are three event types that can be subscribed to: + 1. Contract events - Those emitted explicitly by the chaincode developer within a transaction 2. Transaction (Commit) events - Those emitted automatically when a transaction is committed after an invoke 3. Block events - Those emitted automatically when a block is committed Listening for these events allows the application to react without directly calling a transaction. This is ideal in use cases such as monitoring network analytics. -### Usage +## Usage Each listener type takes at least one parameter, the event callback. This is the function that is called when an event is received. The callback function given is expected to be a promise, meaning that the callback can perform asynchronous tasks without risking missing events. -### Options +## Options + {@link module:fabric-network.Network~EventListenerOptions}. *Note*: Listeners will connect to event hubs and ask to receive _unfiltered_ events by default. To receive _filtered_ events, set `EventListenerOptions.filtered: true`. -### Naming +## Naming All event listeners (including CommitEventListeners, which use the transaction ID) must have a unique name at the `Network` level -#### Contract events +## Contract events ```javascript const gateway = new Gateway(); @@ -39,16 +44,17 @@ const contract = network.getContract('my-contract'); * @param {module:fabric-network.Network~EventListenerOptions} options **/ const listener = await contract.addContractListener('my-contract-listener', 'sale', (err, event, blockNumber, transactionId, status) => { - if (err) { - console.error(err); - return; - } - console.log(`Block Number: ${blockNumber} Transaction ID: ${transactionId} Status: ${status}`); + if (err) { + console.error(err); + return; + } + console.log(`Block Number: ${blockNumber} Transaction ID: ${transactionId} Status: ${status}`); }) ``` + Notice that there is no need to specify an event hub, as the `EventHubSelectionStrategy` will select it automatically. -#### Block events +### Block events ```javascript const gateway = new Gateway(); @@ -61,20 +67,24 @@ const network = await gateway.getNetwork('mychannel'); * @param {module:fabric-network.Network~EventListenerOptions} options **/ const listener = await network.addBlockListener('my-block-listener', (error, block) => { - if (err) { - console.error(err); - return; - } - console.log(`Block: ${block}`); -}) + if (err) { + console.error(err); + return; + } + console.log(`Block: ${block}`); +}); ``` -When listening for block events, it is important to specify if you want a filtered or none filtered event, as this determines which event hub is compatible with the request. -#### Commit events +When listening for block events, it is important to specify if you want a filtered or none filtered event, as this determines which event hub is compatible with the request. + +## Commit events *Note*: The listener listener name is _transactionId_._\_ -#### Option 1: +There are two methods for subscribing to a transaction commit event. Using {@link module:fabric-network.Network} and directly, using {@link module:fabric-network.Transaction}. Using {@link module:fabric-network.Transaction} directly, abstracts away the need to specify which transaction ID you wish to listen for. + +### Option 1 + ```javascript const gateway = new Gateway(); await gateway.connect(connectionProfile, gatewayOptions); @@ -83,20 +93,21 @@ const contract = network.getContract('my-contract'); const transaction = contract.newTransaction('sell'); /** - * @param {String} transactionId the name of the event listener + * @param {String} transactionId the transaction ID * @param {Function} callback the callback function with signature (error, transactionId, status, blockNumber) * @param {Object} options **/ const listener = await network.addCommitListener(transaction.getTransactionID().getTransactionID(), (err, transactionId, status, blockNumber) => { - if (err) { - console.error(err); - return; - } - console.log(`Transaction ID: ${transactionId} Status: ${status} Block number: ${blockNumber}`); -}); + if (err) { + console.error(err); + return; + } + console.log(`Transaction ID: ${transactionId} Status: ${status} Block number: ${blockNumber}`); +}); ``` -#### Option 2: +### Option 2 + ```javascript const gateway = new Gateway(); await gateway.connect(connectionProfile, gatewayOptions); @@ -105,24 +116,29 @@ const contract = network.getContract('my-contract'); const transaction = contract.newTransaction('sell'); /** - * @param {String} transactionId the name of the event listener + * @param {String} transactionId the transaction ID * @param {Function} callback the callback function with signature (error, transactionId, status, blockNumber) * @param {Object} options **/ const listener = await transaction.addCommitListener((err, transactionId, status, blockNumber) => { - if (err) { - console.error(err); - return; - } - console.log(`Transaction ID: ${transactionId} Status: ${status} Block number: ${blockNumber}`); -}); + if (err) { + console.error(err); + return; + } + console.log(`Transaction ID: ${transactionId} Status: ${status} Block number: ${blockNumber}`); +}); ``` Both `Network.addCommitListener` and `Contract.addCommitListener` have an optional `eventHub` parameter. When set, the listener will only listen to that event hub, and in the event of an unforeseen disconnect, it will try and to reconnect without using the `EventHubSelectionStrategy`. -### Checkpointing +## Checkpointing + {@tutorial event-checkpointer} -### Unregistering listeners +## Start Block and End Block + +In the {@link module:fabric-network~EventListenerOptions} it is possible to specify a `startBlock` and an `endBlock`. This behaves in the same way as the same options on {@link ChannelEventHub} shown in the tutorial here {@tutorial channel-events}. Using `startBlock` and `endBlock` disables event replay using a checkpointer for the events received by that listener. + +## Unregistering listeners -`addContractListener`, `addBlockListener` and `addCommitListener` return a `ContractEventListener`, `BlockEventListener` and `CommitEventListener` respectively. Each has an `unregister()` function that removes the listener from the event hub, meaning no further events will be received from that listener until `register()` is called again +`addContractListener`, `addBlockListener` and `addCommitListener` return a `ContractEventListener`, `BlockEventListener` and `CommitEventListener` respectively. Each has an `unregister()` function that removes the listener from the event hub, meaning no further events will be received from that listener until `register()` is called again. diff --git a/fabric-client/lib/ChannelEventHub.js b/fabric-client/lib/ChannelEventHub.js index 076aca4840..3be1c12d2f 100644 --- a/fabric-client/lib/ChannelEventHub.js +++ b/fabric-client/lib/ChannelEventHub.js @@ -1481,7 +1481,7 @@ class ChannelEventHub { } if (trans_reg.disconnect) { logger.debug('_callTransactionListener - automatically disconnect'); - this._disconnect(new EventHubDisconnectError('Shutdown due to disconnect on transaction id registration')); + this._disconnect(new Error('Shutdown due to disconnect on transaction id registration')); } } diff --git a/fabric-network/lib/contract.js b/fabric-network/lib/contract.js index af1a0b30ed..16e9151020 100644 --- a/fabric-network/lib/contract.js +++ b/fabric-network/lib/contract.js @@ -192,7 +192,7 @@ class Contract { options.checkpointer = this.getCheckpointer(options); const listener = new ContractEventListener(this, listenerName, eventName, callback, options); const network = this.getNetwork(); - network.saveListener(listenerName, listener); + network._checkListenerNameIsUnique(listener.listenerName); await listener.register(); return listener; } diff --git a/fabric-network/lib/impl/event/abstracteventlistener.js b/fabric-network/lib/impl/event/abstracteventlistener.js index 06b0111203..428b290566 100644 --- a/fabric-network/lib/impl/event/abstracteventlistener.js +++ b/fabric-network/lib/impl/event/abstracteventlistener.js @@ -33,6 +33,11 @@ class AbstractEventListener { if (!options) { options = {}; } + options = Object.assign({ + eventHubConnectWait: 1000, + eventHubConnectTimeout: 30000 + }, options); + this.channel = network.getChannel(); this.network = network; this.listenerName = listenerName; @@ -40,11 +45,23 @@ class AbstractEventListener { this.options = options; this.clientOptions = {}; // fabric-client ChannelEventHub options + if (this.options.startBlock !== undefined && this.options.startBlock !== null) { + this.clientOptions.startBlock = Number(this.options.startBlock); + delete this.options.startBlock; + } + + if (this.options.endBlock !== undefined && this.options.endBlock !== null) { + this.clientOptions.endBlock = Number(this.options.endBlock); + delete this.options.endBlock; + } + this._registered = false; this._firstCheckpoint = {}; this._registration = null; this._filtered = typeof options.filtered === 'boolean' ? options.filtered : false; this._usingCheckpointer = false; + this._firstRegistrationAttempt = true; + this._abandonEventHubConnect = false; } /** @@ -65,6 +82,10 @@ class AbstractEventListener { if (this._registered) { throw new Error('Listener already registered'); } + + if (!this.network.listeners.has(this.listenerName)) { + this.network.saveListener(this.listenerName, this); + } if (this.eventHub && this.eventHub.isconnected()) { if (this.eventHub.isFiltered() !== this._filtered) { this.eventHub._filtered_stream = this._filtered; @@ -88,11 +109,16 @@ class AbstractEventListener { if (!this.getCheckpointer()) { logger.error('Opted to use checkpointing without defining a checkpointer'); } + if ((this.clientOptions.startBlock !== null) && (this.clientOptions.startBlock !== undefined) || + (this.clientOptions.endBlock !== null) && (this.clientOptions.endBlock !== undefined)) { + logger.debug('startBlock and/or endBlock were given. Disabling event replay'); + this.options.replay = false; + } } let checkpoint; if (this.useEventReplay() && this.checkpointer instanceof BaseCheckpointer) { - this._firstCheckpoint = checkpoint = await this.checkpointer.loadStartingCheckpoint(); + this._firstCheckpoint = checkpoint = await this.checkpointer.loadLatestCheckpoint(); const blockchainInfo = await this.channel.queryInfo(); if (checkpoint && checkpoint.hasOwnProperty('blockNumber') && !isNaN(checkpoint.blockNumber) && blockchainInfo.height - 1 > Number(checkpoint.blockNumber)) { logger.debug(`Requested Block Number: ${Number(checkpoint.blockNumber) + 1} Latest Block Number: ${blockchainInfo.height - 1}`); @@ -102,14 +128,16 @@ class AbstractEventListener { } /** - * Called by the super classes unregister function. Removes state from the listener so it - * can be reregistered at a later time + * Called by the super classes unregister function. */ unregister() { logger.debug(`Unregister event listener: ${this.listenerName}`); this._registered = false; + this._abandonEventHubConnect = true; this._firstCheckpoint = {}; - this.clientOptions = {}; + if (this.network.listeners.has(this.listenerName)) { + this.network.listeners.delete(this.listenerName); + } } /** @@ -162,6 +190,54 @@ class AbstractEventListener { } return false; } + + async _setEventHubConnectWait() { + logger.debug(`[${this.listenerName}]: The event hub connect failed. Waiting ${this.options.eventHubConnectWait}ms then trying again`); + await new Promise(resolve => setTimeout(resolve, this.options.eventHubConnectWait)); + } + + _setEventHubConnectTimeout() { + this._abandonEventHubConnect = false; + this._eventHubConnectTimeout = setTimeout(() => { + logger.error(`[${this.listenerName}]: The event hub failed to connect after ${this.options.eventHubConnectTimeout}ms`); + }, this.options.eventHubConnectTimeout); + } + + _unsetEventHubConnectTimeout() { + clearTimeout(this._eventHubConnectTimeout); + this._eventHubConnectTimeout = null; + } + + /** + * + * Finds a new event hub for the listener in the event of one shutting down. Will + * create a new instance if checkpointer is being used, or reuse one if not + * @private + */ + async _registerWithNewEventHub() { + this.unregister(); + if (this.options.fixedEventHub && !this.eventHub) { + throw new Error('No event hub given and option fixedEventHub is set'); + } + if (!this._firstRegistrationAttempt) { + await this._setEventHubConnectWait(); + } + + const useCheckpointing = this.useEventReplay() && this.checkpointer instanceof BaseCheckpointer || + (this.clientOptions.startBlock || this.clientOptions.endBlock); + + if (useCheckpointing && !this.options.fixedEventHub) { + this.eventHub = this.getEventHubManager().getReplayEventHub(); + } else if (useCheckpointing && this.options.fixedEventHub) { + this.eventHub = this.getEventHubManager().getReplayEventHub(this.eventHub._peer); + } else if (!useCheckpointing && this.options.fixedEventHub) { + this.eventHub = this.getEventHubManager().getEventHub(this.eventHub._peer); + } else { + this.eventHub = this.getEventHubManager().getEventHub(); + } + this._firstRegistrationAttempt = false; + await this.register(); + } } module.exports = AbstractEventListener; diff --git a/fabric-network/lib/impl/event/basecheckpointer.js b/fabric-network/lib/impl/event/basecheckpointer.js index d587269fb3..b06c9c166a 100644 --- a/fabric-network/lib/impl/event/basecheckpointer.js +++ b/fabric-network/lib/impl/event/basecheckpointer.js @@ -11,6 +11,7 @@ * @memberof module:fabric-network * @property {number} blockNumber * @property {string[]} transactionIds + * @property {number} [expectedTotal] The expected number of events in the block */ /** @@ -55,7 +56,7 @@ class BaseCheckpointer { * @return {module:fabric-network.BaseCheckpointer~Checkpoint} * @async */ - async loadStartingCheckpoint() { + async loadLatestCheckpoint() { return this.load(); } diff --git a/fabric-network/lib/impl/event/blockeventlistener.js b/fabric-network/lib/impl/event/blockeventlistener.js index 7f821c0f80..d31dca3f1b 100644 --- a/fabric-network/lib/impl/event/blockeventlistener.js +++ b/fabric-network/lib/impl/event/blockeventlistener.js @@ -39,8 +39,16 @@ class BlockEventListener extends AbstractEventListener { async register() { await super.register(); if (!this.eventHub) { - return this._registerWithNewEventHub(); + if (!this._eventHubConnectTimeout) { + this._setEventHubConnectTimeout(); + } + if (this._abandonEventHubConnect) { + this._unsetEventHubConnectTimeout(); + return; + } + return await this._registerWithNewEventHub(); } + this._unsetEventHubConnectTimeout(); this._registration = this.eventHub.registerBlockEvent( this._onEvent.bind(this), this._onError.bind(this), @@ -109,29 +117,6 @@ class BlockEventListener extends AbstractEventListener { } this.eventCallback(error); } - - /** - * Finds a new event hub for the listener in the event of one shutting down. Will - * create a new instance if checkpointer is being used, or reuse one if not - * @private - */ - async _registerWithNewEventHub() { - this.unregister(); - if (this.options.fixedEventHub && !this.eventHub) { - throw new Error('No event hub given and option fixedEventHub is set'); - } - const useCheckpointing = this.useEventReplay() && this.checkpointer instanceof BaseCheckpointer; - if (useCheckpointing && !this.options.fixedEventHub) { - this.eventHub = this.getEventHubManager().getReplayEventHub(); - } else if (useCheckpointing && this.options.fixedEventHub) { - this.eventHub = this.getEventHubManager().getReplayEventHub(this.eventHub._peer); - } else if (!useCheckpointing && this.options.fixedEventHub) { - this.eventHub = this.getEventHubManager().getEventHub(this.eventHub._peer); - } else { - this.eventHub = this.getEventHubManager().getEventHub(); - } - await this.register(); - } } module.exports = BlockEventListener; diff --git a/fabric-network/lib/impl/event/commiteventlistener.js b/fabric-network/lib/impl/event/commiteventlistener.js index c32df2fe7e..2e3e0389f6 100644 --- a/fabric-network/lib/impl/event/commiteventlistener.js +++ b/fabric-network/lib/impl/event/commiteventlistener.js @@ -34,8 +34,7 @@ class CommitEventListener extends AbstractEventListener { async register() { await super.register(); if (!this.eventHub) { - logger.debug('No event hub. Retrieving new one.'); - return await this._registerWithNewEventHub(); + return await this._registerWithNewCommitEventHub(); } if (this._isAlreadyRegistered(this.eventHub)) { // Prevents a transaction being registered twice with the same event hub logger.debug('Event hub already has registrations. Generating new event hub instance.'); @@ -45,13 +44,14 @@ class CommitEventListener extends AbstractEventListener { this.eventHub = this.getEventHubManager().getFixedEventHub(this.eventHub._peer); } } - const txid = this.eventHub.registerTxEvent( + this._unsetEventHubConnectTimeout(); + const txId = this.eventHub.registerTxEvent( this.transactionId, this._onEvent.bind(this), this._onError.bind(this), Object.assign({unregister: true}, this.clientOptions) ); - this._registration = this.eventHub._transactionRegistrations[txid]; + this._registration = this.eventHub._transactionRegistrations[txId]; this.eventHub.connect(!this._filtered); this._registered = true; } @@ -60,6 +60,7 @@ class CommitEventListener extends AbstractEventListener { super.unregister(); if (this.eventHub) { this.eventHub.unregisterTxEvent(this.transactionId); + this.network.listeners.delete(this.listenerName); } } @@ -83,7 +84,7 @@ class CommitEventListener extends AbstractEventListener { } - async _registerWithNewEventHub() { + async _registerWithNewCommitEventHub() { if (this.isregistered()) { this.unregister(); } diff --git a/fabric-network/lib/impl/event/contracteventlistener.js b/fabric-network/lib/impl/event/contracteventlistener.js index ba6190a6d7..552a4a1a34 100644 --- a/fabric-network/lib/impl/event/contracteventlistener.js +++ b/fabric-network/lib/impl/event/contracteventlistener.js @@ -33,9 +33,13 @@ class ContractEventListener extends AbstractEventListener { this.eventName = eventName; if (this.useEventReplay()) { - Object.assign(this.clientOptions, {as_array: true}); + Object.assign(this.clientOptions, { + as_array: true + }); } else { - Object.assign(this.clientOptions, {as_array: false}); + Object.assign(this.clientOptions, { + as_array: this.options.asArray ? this.options.asArray : false + }); } } @@ -45,8 +49,16 @@ class ContractEventListener extends AbstractEventListener { async register() { await super.register(this.contract.getChaincodeId()); if (!this.eventHub) { + if (!this._eventHubConnectTimeout) { + this._setEventHubConnectTimeout(); + } + if (this._abandonEventHubConnect) { + this._unsetEventHubConnectTimeout(); + return; + } return await this._registerWithNewEventHub(); } + this._unsetEventHubConnectTimeout(); this._registration = this.eventHub.registerChaincodeEvent( this.contract.getChaincodeId(), this.eventName, @@ -87,10 +99,8 @@ class ContractEventListener extends AbstractEventListener { blockNumber = Number(blockNumber); let useCheckpoint = false; if (this.useEventReplay() && this.checkpointer instanceof BaseCheckpointer) { - let checkpoint = await this.checkpointer.load(); - if (!checkpoint.hasOwnProperty('blockNumber') && Object.keys(checkpoint).length > 0) { - checkpoint = checkpoint[blockNumber]; - } + const checkpoint = await this.checkpointer.loadLatestCheckpoint(); + useCheckpoint = Number(checkpoint.blockNumber || 0) <= Number(blockNumber); if (checkpoint && checkpoint.transactionIds && checkpoint.transactionIds.includes(transactionId)) { logger.debug(util.format('_onEvent skipped transaction: %s', transactionId)); @@ -112,8 +122,16 @@ class ContractEventListener extends AbstractEventListener { } async _onEvents(events) { - logger.debug('Received contract events as array'); - await Promise.all(events.map((event) => this._onEvent(event.chaincode_event, event.block_num, event.tx_id, event.tx_status))); + logger.debug(`[${this.listenerName}]: Received contract events as array`); + if (!this.options.asArray) { + logger.debug(`[${this.listenerName}]: Splitting events`); + await Promise.all( + events.map((event) => this._onEvent(event.chaincode_event, event.block_num, event.tx_id, event.tx_status, events.length)) + ); + } else { + logger.debug(`[${this.listenerName}]: Sending events to callback as array`); + await this.eventCallback(events); + } } /** @@ -131,33 +149,8 @@ class ContractEventListener extends AbstractEventListener { await this._registerWithNewEventHub(); } } - this.eventCallback(error); + await this.eventCallback(error); } - - /** - * - * Finds a new event hub for the listener in the event of one shutting down. Will - * create a new instance if checkpointer is being used, or reuse one if not - * @private - */ - async _registerWithNewEventHub() { - this.unregister(); - if (this.options.fixedEventHub && !this.eventHub) { - throw new Error('No event hub given and option fixedEventHub is set'); - } - const useCheckpointing = this.useEventReplay() && this.checkpointer instanceof BaseCheckpointer; - if (useCheckpointing && !this.options.fixedEventHub) { - this.eventHub = this.getEventHubManager().getReplayEventHub(); - } else if (useCheckpointing && this.options.fixedEventHub) { - this.eventHub = this.getEventHubManager().getReplayEventHub(this.eventHub._peer); - } else if (!useCheckpointing && this.options.fixedEventHub) { - this.eventHub = this.getEventHubManager().getEventHub(this.eventHub._peer); - } else { - this.eventHub = this.getEventHubManager().getEventHub(); - } - await this.register(); - } - } module.exports = ContractEventListener; diff --git a/fabric-network/lib/impl/event/eventhubmanager.js b/fabric-network/lib/impl/event/eventhubmanager.js index bd722f8b38..92d8d3cd3d 100644 --- a/fabric-network/lib/impl/event/eventhubmanager.js +++ b/fabric-network/lib/impl/event/eventhubmanager.js @@ -24,6 +24,7 @@ class EventHubManager { this.channel = network.getChannel(); this.eventHubFactory = new EventHubFactory(this.channel); this.eventHubSelectionStrategy = network.getEventHubSelectionStrategy(); + this.newEventHubs = []; } /** @@ -64,7 +65,7 @@ class EventHubManager { getReplayEventHub(peer) { for (const index in this.newEventHubs) { const eventHub = this.newEventHubs[index]; - if (this._isNewEventHub(eventHub) && (!peer || eventHub.getName() === peer.getName())) { + if (!eventHub.isconnected() && this._isNewEventHub(eventHub) && (!peer || eventHub.getName() === peer.getName())) { this.newEventHubs.splice(index, 1); } } @@ -88,7 +89,7 @@ class EventHubManager { } /** - * When called with a peer, it updates the {@link EventHubSelectionStategy} with the + * When called with a peer, it updates the {@link EventHubSelectionStrategy} with the * new status of a peer to allow for intelligent strategies * @param {Peer} deadPeer A peer instance */ diff --git a/fabric-network/lib/impl/event/filesystemcheckpointer.js b/fabric-network/lib/impl/event/filesystemcheckpointer.js index ecc4d9abcd..436d346281 100644 --- a/fabric-network/lib/impl/event/filesystemcheckpointer.js +++ b/fabric-network/lib/impl/event/filesystemcheckpointer.js @@ -14,21 +14,22 @@ const BaseCheckpointer = require('./basecheckpointer'); /** * @typedef FileSystemCheckpointer~FileSystemCheckpointerOptions * @memberof module:fabric-network - * @property {string} basePath The directory that will store the checkpoint + * @property {string} [basePath] The directory that will store the checkpoint + * @property {number} [maxLength] The maximum number of blocks that can be in the checkpointer */ /** * Created a checkpointer in a file per event listener * @memberof module:fabric-network - * @extends module:fabric-network.BaseCheckpointer + * @extends module:fabric-network~BaseCheckpointer * @class */ class FileSystemCheckpointer extends BaseCheckpointer { /** * * @param {string} channelName - * @param {string} listenerName The name of the listener being checkpointed - * @param {module:fabric-network.FileSystemCheckpointer~FileSystemCheckpointerOptions} options + * @param {string} listenerName The name of the listener being checkpointer + * @param {module:fabric-network.FileSystemCheckpointer~FileSystemCheckpointerOptions} [options] */ constructor(channelName, listenerName, options = {}) { super(options); @@ -84,6 +85,7 @@ class FileSystemCheckpointer extends BaseCheckpointer { } if (hasExpectedTotal) { fullCheckpoint[blockNumber] = checkpoint; + fullCheckpoint = this._prune(fullCheckpoint); await fs.writeFile(checkpointPath, JSON.stringify(fullCheckpoint)); } else { await fs.writeFile(checkpointPath, JSON.stringify(checkpoint)); @@ -111,7 +113,7 @@ class FileSystemCheckpointer extends BaseCheckpointer { /** * @inheritdoc */ - async loadStartingCheckpoint() { + async loadLatestCheckpoint() { const checkpoint = await this.load(); const orderedBlockNumbers = Object.keys(checkpoint).sort(); if (checkpoint.hasOwnProperty('blockNumber') || orderedBlockNumbers.length === 0) { @@ -137,6 +139,24 @@ class FileSystemCheckpointer extends BaseCheckpointer { } return path.join(filePath, this._listenerName); } + + _prune(checkpoint) { + if (!checkpoint.hasOwnProperty('blockNumber')) { + checkpoint = Object.values(checkpoint).sort((a, b) => { + return b.blockNumber - a.blockNumber; + }); + + if (checkpoint.length > this.options.maxLength) { + checkpoint = checkpoint.slice(0, this.options.maxLength); + } + const rebuiltCheckpoint = {}; + for (const cp of checkpoint) { + rebuiltCheckpoint[cp.blockNumber] = cp; + } + return rebuiltCheckpoint; + } + return checkpoint; + } } module.exports = FileSystemCheckpointer; diff --git a/fabric-network/lib/network.js b/fabric-network/lib/network.js index a812cdf7d5..2d597b30de 100644 --- a/fabric-network/lib/network.js +++ b/fabric-network/lib/network.js @@ -23,6 +23,11 @@ const util = require('util'); * @property {boolean} [replay=false] event replay and checkpointing on listener * @property {boolean} [filtered=false] use receive filtered block events or not * @property {boolean} [unregister=false] unregisters the listener as soon as a single event is received + * @property {number} [startBlock] the first block to play events from + * @property {number} [endBlock] the final block to play events from + * @property {boolean} [asArray] will deliver all of the events in a block to the callback + * @property {number} [eventHubConnectWait=1000] the number of milliseconds before looking for a new event hub + * @property {number} [eventHubConnectTimeout=30000] the number of milliseconds before timing out event hub connect */ /** @@ -239,10 +244,7 @@ class Network { * @private */ saveListener(listenerName, listener) { - if (this.listeners.has(listenerName)) { - listener.unregister(); - throw new Error(`Listener already exists with the name ${listenerName}`); - } + this._checkListenerNameIsUnique(listenerName); this.listeners.set(listenerName, listener); } @@ -261,7 +263,7 @@ class Network { options.replay = options.replay ? true : false; options.checkpointer = this.getCheckpointer(options); const listener = new BlockEventListener(this, listenerName, callback, options); - this.saveListener(listenerName, listener); + this._checkListenerNameIsUnique(listener.listenerName); await listener.register(); return listener; } @@ -285,13 +287,19 @@ class Network { options.replay = false; options.checkpointer = null; const listener = new CommitEventListener(this, transactionId, callback, options); + this._checkListenerNameIsUnique(listener.listenerName); if (eventHub) { listener.setEventHub(eventHub, options.fixedEventHub); } - this.saveListener(listener.listenerName, listener); await listener.register(); return listener; } + + _checkListenerNameIsUnique(listenerName) { + if (this.listeners.has(listenerName)) { + throw new Error(`Listener already exists with the name ${listenerName}`); + } + } } module.exports = Network; diff --git a/fabric-network/test/impl/event/abstracteventlistener.js b/fabric-network/test/impl/event/abstracteventlistener.js index 89eeb69631..60e03716b9 100644 --- a/fabric-network/test/impl/event/abstracteventlistener.js +++ b/fabric-network/test/impl/event/abstracteventlistener.js @@ -24,7 +24,7 @@ describe('AbstractEventListener', () => { let testListener; let contractStub; - let networkStub; + let network; let checkpointerStub; let eventHubManagerStub; let channelStub; @@ -34,20 +34,20 @@ describe('AbstractEventListener', () => { eventHubManagerStub = sandbox.createStubInstance(EventHubManager); contractStub = sandbox.createStubInstance(Contract); - networkStub = sandbox.createStubInstance(Network); - networkStub.getEventHubManager.returns(eventHubManagerStub); - contractStub.getNetwork.returns(networkStub); + network = new Network(); + contractStub.getNetwork.returns(network); checkpointerStub = sandbox.createStubInstance(FileSystemCheckpointer); checkpointerStub.setChaincodeId = sandbox.stub(); channelStub = sandbox.createStubInstance(Channel); - networkStub.getChannel.returns(channelStub); + sandbox.stub(network, 'getChannel').returns(channelStub); channelStub.getName.returns('mychannel'); eventHubManagerStub.getPeers.returns(['peer1']); channelStub.queryInfo.returns({height: 10}); + sandbox.stub(network, 'getEventHubManager').returns(eventHubManagerStub); contractStub.getChaincodeId.returns('ccid'); const callback = (err) => {}; - testListener = new AbstractEventListener(networkStub, 'testListener', callback, {option: 'anoption', replay: true}); + testListener = new AbstractEventListener(network, 'testListener', callback, {option: 'anoption', replay: true}); }); @@ -58,11 +58,11 @@ describe('AbstractEventListener', () => { describe('#constructor', () => { it('should set the correct properties on instantiation', () => { const callback = (err) => {}; - const listener = new AbstractEventListener(networkStub, 'testlistener', callback, {option: 'anoption'}); - expect(listener.network).to.equal(networkStub); + const listener = new AbstractEventListener(network, 'testlistener', callback, {option: 'anoption'}); + expect(listener.network).to.equal(network); expect(listener.listenerName).to.equal('testlistener'); expect(listener.eventCallback).to.equal(callback); - expect(listener.options).to.deep.equal({option: 'anoption'}); + expect(listener.options).to.deep.equal({option: 'anoption', eventHubConnectTimeout: 30000, eventHubConnectWait: 1000}); expect(listener.checkpointer).to.be.undefined; expect(listener._registered).to.be.false; expect(listener._firstCheckpoint).to.deep.equal({}); @@ -72,33 +72,54 @@ describe('AbstractEventListener', () => { it('should set options if options is undefined', () => { const callback = (err) => {}; - const listener = new AbstractEventListener(networkStub, 'testlistener', callback); - expect(listener.options).to.deep.equal({}); + const listener = new AbstractEventListener(network, 'testlistener', callback); + expect(listener.options).to.deep.equal({eventHubConnectTimeout: 30000, eventHubConnectWait: 1000}); }); it('should set this.filtered to be options.filtered', () => { - const listener = new AbstractEventListener(networkStub, 'testListener', () => {}, {filtered: true}); + const listener = new AbstractEventListener(network, 'testListener', () => {}, {filtered: true}); expect(listener._filtered).to.be.true; }); + + it('should set and unset the startBlock from clientOptions and options respectively', () => { + const callback = (err) => {}; + const listener = new AbstractEventListener(network, 'testlistener', callback, {startBlock: 0}); + expect(listener.options).to.deep.equal({eventHubConnectTimeout: 30000, eventHubConnectWait: 1000}); + expect(listener.clientOptions).to.deep.equal({startBlock: 0}); + }); + + it('should set and unset the endBlock from clientOptions and options respectively', () => { + const callback = (err) => {}; + const listener = new AbstractEventListener(network, 'testlistener', callback, {endBlock: 0}); + expect(listener.options).to.deep.equal({eventHubConnectTimeout: 30000, eventHubConnectWait: 1000}); + expect(listener.clientOptions).to.deep.equal({endBlock: 0}); + }); + + it('should set and unset the startBlock and endBlock from clientOptions and options respectively', () => { + const callback = (err) => {}; + const listener = new AbstractEventListener(network, 'testlistener', callback, {startBlock: 0, endBlock: 10}); + expect(listener.options).to.deep.equal({eventHubConnectTimeout: 30000, eventHubConnectWait: 1000}); + expect(listener.clientOptions).to.deep.equal({startBlock: 0, endBlock: 10}); + }); }); describe('#register', () => { - it('should throw if the listener is already registered', () => { + it('should throw if the listener is already registered', async () => { testListener._registered = true; - expect(testListener.register()).to.be.rejectedWith('Listener already registered'); + await expect(testListener.register()).to.be.rejectedWith('Listener already registered'); }); - it('should not call checkpointer._initialize() or checkpointer.loadStartingCheckpoint()', async () => { + it('should not call checkpointer._initialize() or checkpointer.loadLatestCheckpoint()', async () => { await testListener.register(); - sinon.assert.notCalled(checkpointerStub.loadStartingCheckpoint); + sinon.assert.notCalled(checkpointerStub.loadLatestCheckpoint); }); it('should not call checkpointer.initialize()', async () => { const checkpoint = {transactionId: 'txid', blockNumber: '8'}; - checkpointerStub.loadStartingCheckpoint.returns(checkpoint); + checkpointerStub.loadLatestCheckpoint.returns(checkpoint); testListener.checkpointer = checkpointerStub; await testListener.register(); - sinon.assert.called(checkpointerStub.loadStartingCheckpoint); + sinon.assert.called(checkpointerStub.loadLatestCheckpoint); expect(testListener.clientOptions.startBlock.toNumber()).to.equal(9); // Start block is a Long expect(testListener._firstCheckpoint).to.deep.equal(checkpoint); }); @@ -121,7 +142,7 @@ describe('AbstractEventListener', () => { it('should call the checkpointer factory if it is set', async () => { const checkpointerFactoryStub = sinon.stub().returns(checkpointerStub); - const listener = new AbstractEventListener(networkStub, 'testlistener', () => {}, {replay: true, checkpointer: {factory: checkpointerFactoryStub}}); + const listener = new AbstractEventListener(network, 'testlistener', () => {}, {replay: true, checkpointer: {factory: checkpointerFactoryStub}}); await listener.register(); sinon.assert.calledWith(checkpointerFactoryStub, 'mychannel', 'testlistener'); sinon.assert.called(checkpointerStub.setChaincodeId); @@ -129,12 +150,12 @@ describe('AbstractEventListener', () => { }); it('should log an error if replay is enabled and no checkpointer is given', async () => { - const listener = new AbstractEventListener(networkStub, 'testlistener', () => {}, {replay: true}); + const listener = new AbstractEventListener(network, 'testlistener', () => {}, {replay: true}); await listener.register(); }); it('should not reset the event hub if it is fixed and filtered status doesn\'t match', async () => { - const listener = new AbstractEventListener(networkStub, 'testlistener', () => {}, {filtered: true}); + const listener = new AbstractEventListener(network, 'testlistener', () => {}, {filtered: true}); const eventHub = sandbox.createStubInstance(ChannelEventHub); eventHub.isFiltered.returns(false); eventHub.isconnected.returns(true); @@ -145,16 +166,52 @@ describe('AbstractEventListener', () => { it('should set startBlock of 1', async () => { checkpointerStub.load.returns({transactionId: 'txid', blockNumber: 0}); - checkpointerStub.loadStartingCheckpoint.returns({transactionId: 'txid', blockNumber: '0'}); + checkpointerStub.loadLatestCheckpoint.returns({transactionId: 'txid', blockNumber: '0'}); testListener.checkpointer = checkpointerStub; await testListener.register(); expect(testListener.clientOptions.startBlock.toInt()).to.equal(1); }); + + it('should register the listener with the network', async () => { + await testListener.register(); + expect(network.listeners.get(testListener.listenerName)).to.equal(testListener); + }); + + it('should do nothing if the event hub is connected and the filtered status has not changed', async () => { + const eventHub = sandbox.createStubInstance(ChannelEventHub); + eventHub.isconnected.returns(true); + eventHub.isFiltered.returns(false); + testListener.eventHub = eventHub; + await testListener.register(); + sinon.assert.notCalled(eventHub.disconnect); + }); + + it('should set replay to false if startBlock is set', async () => { + const eventHub = sandbox.createStubInstance(ChannelEventHub); + eventHub.isconnected.returns(true); + eventHub.isFiltered.returns(false); + testListener.eventHub = eventHub; + testListener.options.replay = true; + testListener.clientOptions.startBlock = 0; + await testListener.register(); + expect(testListener.options.replay).to.be.false; + }); + + it('should set replay to false if endBlock is set', async () => { + const eventHub = sandbox.createStubInstance(ChannelEventHub); + eventHub.isconnected.returns(true); + eventHub.isFiltered.returns(false); + testListener.eventHub = eventHub; + testListener.options.replay = true; + testListener.clientOptions.endBlock = 10; + await testListener.register(); + expect(testListener.options.replay).to.be.false; + }); }); describe('#unregister', () => { beforeEach(async () => { - checkpointerStub.loadStartingCheckpoint.returns({transactionId: 'txid', blockNumber: '10'}); + checkpointerStub.loadLatestCheckpoint.returns({transactionId: 'txid', blockNumber: '10'}); testListener.checkpointer = checkpointerStub; await testListener.register(); }); @@ -215,4 +272,26 @@ describe('AbstractEventListener', () => { expect(testListener._isShutdownMessage(new EventHubDisconnectError())).to.be.true; }); }); + + describe('#_setEventHubConnectWait', async () => { + it('should resolve', async () => { + testListener.options.eventHubConnectWait = 0; + expect(testListener._setEventHubConnectWait()).to.eventually.be.fulfilled; + }); + }); + + describe('#_setEventHubConnectTimeout', async () => { + it('should unset _abandonEventHubConnect', async () => { + testListener.options.eventHubConnectTimeout = 0; + testListener._setEventHubConnectTimeout(); + expect(testListener._abandonEventHubConnect).to.be.false; + }); + }); + + describe('#_unsetEventHubConnectTimeout', async () => { + it('should unset _eventHubConnectTimeout', async () => { + testListener._unsetEventHubConnectTimeout(); + expect(testListener._eventHubConnectTimeout).to.be.null; + }); + }); }); diff --git a/fabric-network/test/impl/event/basecheckpointer.js b/fabric-network/test/impl/event/basecheckpointer.js index 502d428da1..e9f09b7079 100644 --- a/fabric-network/test/impl/event/basecheckpointer.js +++ b/fabric-network/test/impl/event/basecheckpointer.js @@ -39,10 +39,10 @@ describe('BaseCheckpointer', () => { }); }); - describe('#loadStartingCheckpoint', () => { + describe('#loadLatestCheckpoint', () => { it('should throw an exception', () => { const checkpointer = new BaseCheckpointer(); - expect(checkpointer.loadStartingCheckpoint()).to.be.rejectedWith('Method has not been implemented'); + expect(checkpointer.loadLatestCheckpoint()).to.be.rejectedWith('Method has not been implemented'); }); }); diff --git a/fabric-network/test/impl/event/blockeventlistener.js b/fabric-network/test/impl/event/blockeventlistener.js index 99e6c21e74..5f6cce6d4f 100644 --- a/fabric-network/test/impl/event/blockeventlistener.js +++ b/fabric-network/test/impl/event/blockeventlistener.js @@ -25,7 +25,7 @@ describe('BlockEventListener', () => { let sandbox; let channelStub; let contractStub; - let networkStub; + let network; let eventHubStub; let checkpointerStub; let eventHubManagerStub; @@ -37,15 +37,15 @@ describe('BlockEventListener', () => { channelStub.getName.returns('mychannel'); contractStub = sandbox.createStubInstance(Contract); contractStub.getChaincodeId.returns('chaincodeid'); - networkStub = sandbox.createStubInstance(Network); - networkStub.getChannel.returns(channelStub); - contractStub.getNetwork.returns(networkStub); + network = new Network(); + sandbox.stub(network, 'getChannel').returns(channelStub); + contractStub.getNetwork.returns(network); eventHubManagerStub = sinon.createStubInstance(EventHubManager); eventHubManagerStub.getPeers.returns(['peer1']); - networkStub.getEventHubManager.returns(eventHubManagerStub); + sandbox.stub(network, 'getEventHubManager').returns(eventHubManagerStub); eventHubStub = sandbox.createStubInstance(ChannelEventHub); checkpointerStub = sandbox.createStubInstance(BaseCheckpointer); - blockEventListener = new BlockEventListener(networkStub, 'test', () => {}, {replay: true}); + blockEventListener = new BlockEventListener(network, 'blockTest', () => {}, {replay: true}); eventHubStub.isFiltered.returns(true); }); describe('#register', () => { @@ -67,7 +67,7 @@ describe('BlockEventListener', () => { }); it('should call _registerWithNewEventHub', async () => { - sandbox.stub(blockEventListener, '_registerWithNewEventHub'); + sandbox.spy(blockEventListener, '_registerWithNewEventHub'); await blockEventListener.register(); sinon.assert.called(blockEventListener._registerWithNewEventHub); }); @@ -230,7 +230,7 @@ describe('BlockEventListener', () => { let checkpointer; it('should set the block number and transaction ID', async () => { checkpointer = new InMemoryCheckpointer(); - blockEventListener = new BlockEventListener(networkStub, 'listener', (block) => {}, {replay: true, checkpointer: {factory: () => checkpointer}}); + blockEventListener = new BlockEventListener(network, 'listener', (block) => {}, {replay: true, checkpointer: {factory: () => checkpointer}}); blockEventListener.eventHub = eventHubStub; eventHubStub.registerBlockEvent.returns({}); await blockEventListener.register(); @@ -241,7 +241,7 @@ describe('BlockEventListener', () => { it('should update with a new block number (filtered)', async () => { checkpointer = new InMemoryCheckpointer(); - blockEventListener = new BlockEventListener(networkStub, 'listener', (block) => {}, {filtered: true, replay: true, checkpointer: {factory: () => checkpointer}}); + blockEventListener = new BlockEventListener(network, 'listener', (block) => {}, {filtered: true, replay: true, checkpointer: {factory: () => checkpointer}}); blockEventListener.eventHub = eventHubStub; eventHubStub.registerBlockEvent.returns({}); await blockEventListener.register(); @@ -253,7 +253,7 @@ describe('BlockEventListener', () => { it('should update with a new block number (unfiltered)', async () => { checkpointer = new InMemoryCheckpointer(); - blockEventListener = new BlockEventListener(networkStub, 'listener', (block) => {}, {replay: true, checkpointer: {factory: () => checkpointer}}); + blockEventListener = new BlockEventListener(network, 'listener', (block) => {}, {replay: true, checkpointer: {factory: () => checkpointer}}); blockEventListener.eventHub = eventHubStub; eventHubStub.registerBlockEvent.returns({}); await blockEventListener.register(); @@ -267,7 +267,7 @@ describe('BlockEventListener', () => { channelStub.queryInfo.resolves({height: '3'}); checkpointer = new InMemoryCheckpointer(); checkpointer.checkpoint = {blockNumber: '1'}; - blockEventListener = new BlockEventListener(networkStub, 'listener', (block) => {}, {replay: true, checkpointer: {factory: () => checkpointer}}); + blockEventListener = new BlockEventListener(network, 'listener', (block) => {}, {replay: true, checkpointer: {factory: () => checkpointer}}); blockEventListener.eventHub = eventHubStub; eventHubStub.registerBlockEvent.returns({}); await blockEventListener.register(); @@ -278,7 +278,7 @@ describe('BlockEventListener', () => { channelStub.queryInfo.resolves({height: '3'}); checkpointer = new InMemoryCheckpointer(); checkpointer.checkpoint = {blockNumber: '2'}; - blockEventListener = new BlockEventListener(networkStub, 'listener', (block) => {}, {replay: true, checkpointer: {factory: () => checkpointer}}); + blockEventListener = new BlockEventListener(network, 'listener', (block) => {}, {replay: true, checkpointer: {factory: () => checkpointer}}); blockEventListener.eventHub = eventHubStub; eventHubStub.registerBlockEvent.returns({}); await blockEventListener.register(); diff --git a/fabric-network/test/impl/event/commiteventlistener.js b/fabric-network/test/impl/event/commiteventlistener.js index 175c07f51e..32895211a8 100644 --- a/fabric-network/test/impl/event/commiteventlistener.js +++ b/fabric-network/test/impl/event/commiteventlistener.js @@ -14,6 +14,7 @@ const sinon = require('sinon'); const Channel = require('fabric-client/lib/Channel'); const Contract = require('fabric-network/lib/contract'); const Network = require('fabric-network/lib/network'); +const Gateway = require('fabric-network/lib/gateway'); const EventHubManager = require('fabric-network/lib/impl/event/eventhubmanager'); const ChannelEventHub = require('fabric-client/lib/ChannelEventHub'); const CommitEventListener = require('fabric-network/lib/impl/event/commiteventlistener'); @@ -23,7 +24,7 @@ describe('CommitEventListener', () => { let eventHubManagerStub; let eventHubStub; let contractStub; - let networkStub; + let network; let channelStub; let listener; let callback; @@ -33,17 +34,18 @@ describe('CommitEventListener', () => { eventHubStub = sandbox.createStubInstance(ChannelEventHub); eventHubStub._transactionRegistrations = {}; contractStub = sandbox.createStubInstance(Contract); - networkStub = sandbox.createStubInstance(Network); + // network = sandbox.createStubInstance(Network); + const gatewayStub = sandbox.createStubInstance(Gateway); channelStub = sandbox.createStubInstance(Channel); - networkStub.getChannel.returns(channelStub); - contractStub.getNetwork.returns(networkStub); + network = new Network(gatewayStub, channelStub); + contractStub.getNetwork.returns(network); eventHubManagerStub = sinon.createStubInstance(EventHubManager); eventHubManagerStub.getPeers.returns(['peer1']); - networkStub.getEventHubManager.returns(eventHubManagerStub); + sandbox.stub(network, 'getEventHubManager').returns(eventHubManagerStub); eventHubStub.isFiltered.returns(true); callback = () => {}; - listener = new CommitEventListener(networkStub, 'transactionId', callback, {}); + listener = new CommitEventListener(network, 'transactionId', callback, {}); }); afterEach(() => { @@ -59,12 +61,12 @@ describe('CommitEventListener', () => { describe('#register', () => { beforeEach(() => { - sandbox.stub(listener, '_registerWithNewEventHub'); + sandbox.stub(listener, '_registerWithNewCommitEventHub'); }); it('should grab a new event hub if one isnt given', async () => { await listener.register(); - sinon.assert.called(listener._registerWithNewEventHub); + sinon.assert.called(listener._registerWithNewCommitEventHub); }); it('should assign a new event hub if given on has registrations', async () => { @@ -108,12 +110,21 @@ describe('CommitEventListener', () => { sinon.assert.notCalled(eventHubStub.unregisterTxEvent); }); - it('should call ChannelEventHub.unregisterBlockEvent', () => { + it('should call ChannelEventHub.unregisterBlockEvent', async () => { listener.eventHub = eventHubStub; - listener.register(); + await listener.register(); listener.unregister(); sinon.assert.calledWith(eventHubStub.unregisterTxEvent, 'transactionId'); }); + + it('should remove the listener from the network', async () => { + network.listeners.set(listener.listenerName, listener); + listener.eventHub = eventHubStub; + await listener.register(); + expect(network.listeners.get(listener.listenerName)).to.equal(listener); + listener.unregister(); + expect(network.listeners.has(listener.listenerName)).to.be.false; + }); }); describe('#_onEvent', () => { @@ -178,7 +189,7 @@ describe('CommitEventListener', () => { }); }); - describe('#_registerWithNewEventHub', () => { + describe('#_registerWithNewCommitEventHub', () => { beforeEach(() => { listener._registration = {}; sandbox.spy(listener, 'unregister'); @@ -188,7 +199,7 @@ describe('CommitEventListener', () => { }); it('should call the correct methods', async () => { - await listener._registerWithNewEventHub(); + await listener._registerWithNewCommitEventHub(); sinon.assert.called(eventHubManagerStub.getReplayEventHub); expect(listener.eventHub).to.equal(eventHubStub); expect(listener.clientOptions.disconnect).to.be.true; @@ -199,19 +210,19 @@ describe('CommitEventListener', () => { eventHubStub._peer = 'peer'; listener.eventHub = eventHubStub; listener.options.fixedEventHub = true; - await listener._registerWithNewEventHub(); + await listener._registerWithNewCommitEventHub(); sinon.assert.calledWith(eventHubManagerStub.getFixedEventHub, eventHubStub._peer); }); it('should unregister if the listener is already registered', async () => { listener._registered = true; - await listener._registerWithNewEventHub(); + await listener._registerWithNewCommitEventHub(); sinon.assert.called(listener.unregister); }); it('should throw if options.fixedEventHub is set and no event hub is given', () => { listener.options.fixedEventHub = true; - return expect(listener._registerWithNewEventHub()).to.be.rejectedWith(); + return expect(listener._registerWithNewCommitEventHub()).to.be.rejectedWith(); }); }); diff --git a/fabric-network/test/impl/event/contracteventlistener.js b/fabric-network/test/impl/event/contracteventlistener.js index 02d130d3bf..f69b375e93 100644 --- a/fabric-network/test/impl/event/contracteventlistener.js +++ b/fabric-network/test/impl/event/contracteventlistener.js @@ -22,7 +22,7 @@ const InMemoryCheckpointer = require('./inmemorycheckpointer'); describe('ContractEventListener', () => { let sandbox; let contractStub; - let networkStub; + let network; let channelStub; let eventHubStub; let checkpointerStub; @@ -33,18 +33,22 @@ describe('ContractEventListener', () => { sandbox = sinon.createSandbox(); contractStub = sandbox.createStubInstance(Contract); contractStub.getChaincodeId.returns('chaincodeid'); - networkStub = sandbox.createStubInstance(Network); + network = new Network(); channelStub = sandbox.createStubInstance(Channel); channelStub.getName.returns('channelName'); - networkStub.getChannel.returns(channelStub); - contractStub.getNetwork.returns(networkStub); + sandbox.stub(network, 'getChannel').returns(channelStub); + contractStub.getNetwork.returns(network); eventHubManagerStub = sinon.createStubInstance(EventHubManager); - networkStub.getEventHubManager.returns(eventHubManagerStub); - eventHubManagerStub.getPeers.returns(['peer1']); + sandbox.stub(network, 'getEventHubManager').returns(eventHubManagerStub); eventHubStub = sandbox.createStubInstance(ChannelEventHub); - checkpointerStub = sandbox.createStubInstance(Checkpointer); + eventHubManagerStub.getPeers.returns(['peer1']); + eventHubManagerStub.getEventHub.returns(eventHubStub); + eventHubManagerStub.getReplayEventHub.returns(eventHubStub); + checkpointerStub = new Checkpointer(); + sandbox.stub(checkpointerStub, 'load'); + sandbox.stub(checkpointerStub, 'save'); checkpointerStub.load.returns({}); - contractEventListener = new ContractEventListener(contractStub, 'test', 'eventName', () => {}, {replay: true}); + contractEventListener = new ContractEventListener(contractStub, 'contractTest', 'eventName', () => {}, {replay: true, filtered: true}); eventHubStub.isFiltered.returns(true); }); @@ -56,6 +60,21 @@ describe('ContractEventListener', () => { it('should set the listener name', () => { expect(contractEventListener.eventName).to.equal('eventName'); }); + + it('should set as_array on clientOptions with replay on', () => { + const listener = new ContractEventListener(contractStub, 'contractAsArrayTest', 'eventName', () => {}, {asArray: true, replay: true}); + expect(listener.clientOptions.as_array).to.be.true; + }); + + it('should set as_array on clientOptions with replay off', () => { + const listener = new ContractEventListener(contractStub, 'contractReplayTest', 'eventName', () => {}, {asArray: true, replay: false}); + expect(listener.clientOptions.as_array).to.be.true; + }); + + it('should set as_array to false on clientOptions with replay off', () => { + const listener = new ContractEventListener(contractStub, 'contractReplayOffTest2', 'eventName', () => {}, {asArray: false, replay: false}); + expect(listener.clientOptions.as_array).to.be.false; + }); }); describe('#register', () => { @@ -79,7 +98,7 @@ describe('ContractEventListener', () => { }); it('should call _registerWithNewEventHub', async () => { - sandbox.stub(contractEventListener, '_registerWithNewEventHub'); + sandbox.spy(contractEventListener, '_registerWithNewEventHub'); await contractEventListener.register(); sinon.assert.called(contractEventListener._registerWithNewEventHub); }); @@ -96,6 +115,23 @@ describe('ContractEventListener', () => { {as_array: true} ); }); + + it('should return if _abandonEventHubConnect is true', async () => { + sandbox.spy(contractEventListener, '_registerWithNewEventHub'); + sandbox.spy(contractEventListener, '_unsetEventHubConnectTimeout'); + contractEventListener._eventHubConnectTimeout = true; + contractEventListener._abandonEventHubConnect = true; + await contractEventListener.register(); + sinon.assert.called(contractEventListener._unsetEventHubConnectTimeout); + sinon.assert.notCalled(contractEventListener._registerWithNewEventHub); + }); + + it('should call _setEventHubConnectWait if its not the first registration', async () => { + sandbox.stub(contractEventListener, '_setEventHubConnectWait'); + contractEventListener._firstRegistrationAttempt = false; + await contractEventListener.register(); + sinon.assert.called(contractEventListener._setEventHubConnectWait); + }); }); describe('#unregister', () => { @@ -182,6 +218,7 @@ describe('ContractEventListener', () => { const blockNumber = '10'; const transactionId = 'transactionId'; const status = 'VALID'; + contractEventListener.checkpointer = checkpointerStub; contractEventListener._registration.unregister = true; await contractEventListener._onEvent(event, blockNumber, transactionId, status); sinon.assert.calledWith(contractEventListener.eventCallback, null, event, Number(blockNumber), transactionId, status); @@ -214,6 +251,25 @@ describe('ContractEventListener', () => { await contractEventListener._onEvent(event, blockNumber, transactionId, status); sinon.assert.notCalled(contractEventListener.eventCallback); }); + + it('should deal with as_array checkpoints', async () => { + contractEventListener.checkpointer = checkpointerStub; + checkpointerStub.load.resolves({1: {blockNumber: 1, transactionIds: ['txId']}}); + const event = {name: 'eventName'}; + const blockNumber = '2'; + const transactionId = 'txId2'; + const status = 'VALID'; + contractEventListener.options.replay = true; + await contractEventListener._onEvent(event, blockNumber, transactionId, status); + sinon.assert.called(contractEventListener.eventCallback); + }); + + it('should emit a list of events if asArray is on', async () => { + contractEventListener.options.asArray = true; + const events = [{eventId: 1}, {eventId: 2}]; + await contractEventListener._onEvent(events); + sinon.assert.calledWith(contractEventListener.eventCallback, events); + }); }); describe('#_onError', () => { diff --git a/fabric-network/test/impl/event/filesystemcheckpointer.js b/fabric-network/test/impl/event/filesystemcheckpointer.js index 16581cdb70..ea54fa5167 100644 --- a/fabric-network/test/impl/event/filesystemcheckpointer.js +++ b/fabric-network/test/impl/event/filesystemcheckpointer.js @@ -134,6 +134,17 @@ describe('FileSystemCheckpointer', () => { }) ); }); + + it('should support loading a multi-block checkpoint', async () => { + fs.exists.resolves(true); + fs.readFile.resolves(JSON.stringify({1: {blockNumber: 1, transactionIds: ['transactionId'], expectedTotal: 10}})); + await checkpointer.save('transactionId2', 1, 10); + sinon.assert.calledWith( + fs.writeFile, + checkpointer._getCheckpointFileName(), + JSON.stringify({1: {blockNumber: 1, transactionIds: ['transactionId', 'transactionId2'], expectedTotal: 10}}) + ); + }); }); describe('#load', () => { @@ -161,12 +172,12 @@ describe('FileSystemCheckpointer', () => { }); }); - describe('#loadStartingCheckpoint', () => { + describe('#loadLatestCheckpoint', () => { it('should return the loaded checkpoint if there is only one checkpoint in the file', async () => { const checkpoint = {blockNumber: 1, transactionIds: []}; fs.exists.resolves(true); fs.readFile.resolves(JSON.stringify(checkpoint)); - const loadedCheckpoint = await checkpointer.loadStartingCheckpoint(); + const loadedCheckpoint = await checkpointer.loadLatestCheckpoint(); expect(loadedCheckpoint).to.deep.equal(checkpoint); }); @@ -174,7 +185,7 @@ describe('FileSystemCheckpointer', () => { const checkpoint = {1: {blockNumber: 1, transactionIds: ['transactionId1'], expectedNumber: 2}, 2: {blockNumber: 2, transactionIds: ['transactionId3'], expectedNumber: 1}}; fs.exists.resolves(true); fs.readFile.resolves(JSON.stringify(checkpoint)); - const loadedCheckpoint = await checkpointer.loadStartingCheckpoint(); + const loadedCheckpoint = await checkpointer.loadLatestCheckpoint(); expect(loadedCheckpoint).to.deep.equal({blockNumber: 1, transactionIds: ['transactionId1'], expectedNumber: 2}); }); @@ -182,7 +193,7 @@ describe('FileSystemCheckpointer', () => { const checkpoint = {1: {blockNumber: 1, transactionIds: ['transactionId1'], expectedNumber: 2}, 2: {blockNumber: 2, transactionIds: ['transactionId3'], expectedNumber: 2}}; fs.exists.resolves(true); fs.readFile.resolves(JSON.stringify(checkpoint)); - const loadedCheckpoint = await checkpointer.loadStartingCheckpoint(); + const loadedCheckpoint = await checkpointer.loadLatestCheckpoint(); expect(loadedCheckpoint).to.deep.equal({blockNumber: 1, transactionIds: ['transactionId1'], expectedNumber: 2}); }); @@ -190,7 +201,7 @@ describe('FileSystemCheckpointer', () => { const checkpoint = {1: {blockNumber: 1, transactionIds: ['transactionId1'], expectedNumber: 1}, 2: {blockNumber: 2, transactionIds: ['transactionId2'], expectedNumber: 1}}; fs.exists.resolves(true); fs.readFile.resolves(JSON.stringify(checkpoint)); - const loadedCheckpoint = await checkpointer.loadStartingCheckpoint(); + const loadedCheckpoint = await checkpointer.loadLatestCheckpoint(); expect(loadedCheckpoint).to.deep.equal({blockNumber: 2, transactionIds: ['transactionId2'], expectedNumber: 1}); }); @@ -198,7 +209,7 @@ describe('FileSystemCheckpointer', () => { const checkpoint = {1: {blockNumber: 1, transactionIds: ['transactionId1']}}; fs.exists.resolves(true); fs.readFile.resolves(JSON.stringify(checkpoint)); - const loadedCheckpoint = await checkpointer.loadStartingCheckpoint(); + const loadedCheckpoint = await checkpointer.loadLatestCheckpoint(); expect(loadedCheckpoint).to.deep.equal({blockNumber: 1, transactionIds: ['transactionId1']}); }); }); @@ -210,4 +221,19 @@ describe('FileSystemCheckpointer', () => { expect(checkpointer._getCheckpointFileName()).to.equal(`home/.hlf-checkpoint/${channelName}/${chaincodeId}/${listenerName}`); }); }); + + describe('#_prune', () => { + it('should prune a checkpoint to 1 block', () => { + checkpointer.options.maxLength = 1; + const checkpoint = [{blockNumber: 1}, {blockNumber: 2}]; + const prunedCheckpoint = checkpointer._prune(checkpoint); + expect(prunedCheckpoint).to.deep.equal({2: {blockNumber: 2}}); + }); + + it('should not prune when single checkpoint is given', () => { + const checkpoint = {blockNumber: 1}; + const prunedCheckpoint = checkpointer._prune(checkpoint); + expect(prunedCheckpoint).to.deep.equal({blockNumber: 1}); + }); + }); }); diff --git a/fabric-network/test/network.js b/fabric-network/test/network.js index 1097d47173..04cb99278f 100644 --- a/fabric-network/test/network.js +++ b/fabric-network/test/network.js @@ -28,6 +28,7 @@ const AbstractEventHubSelectionStrategy = require('fabric-network/lib/impl/event const EventHubManager = require('fabric-network/lib/impl/event/eventhubmanager'); const CommitEventListener = require('fabric-network/lib/impl/event/commiteventlistener'); const BlockEventListener = require('fabric-network/lib/impl/event/blockeventlistener'); +const BaseCheckpointer = require('fabric-network/lib/impl/event/basecheckpointer'); describe('Network', () => { let mockChannel, mockClient; @@ -39,6 +40,7 @@ describe('Network', () => { let stubEventHubSelectionStrategy; let mockEventHubManager; let mockEventHub; + let mockCheckpointer; beforeEach(() => { mockChannel = sinon.createStubInstance(InternalChannel); @@ -77,6 +79,7 @@ describe('Network', () => { mockPeer5.isInRole.withArgs(FABRIC_CONSTANTS.NetworkConfig.LEDGER_QUERY_ROLE).returns(false); peerArray = [mockPeer1, mockPeer2, mockPeer3, mockPeer4, mockPeer5]; mockChannel.getPeers.returns(peerArray); + mockCheckpointer = sinon.createStubInstance(BaseCheckpointer); stubQueryHandler = {}; @@ -345,6 +348,39 @@ describe('Network', () => { }); }); + describe('#getCheckpointer', () => { + beforeEach(() => { + network.checkpointer = mockCheckpointer; + }); + + it('should return the global checkpointer if it is undefined in options', () => { + const checkpointer = network.getCheckpointer(); + checkpointer.should.equal(mockCheckpointer); + }); + + it('should return the global checkpointer if it is undefined in options object', () => { + const checkpointer = network.getCheckpointer({}); + checkpointer.should.equal(mockCheckpointer); + }); + + it('should return the global checkpointer if it is true in options', () => { + const checkpointer = network.getCheckpointer({checkpointer: 'something'}); + checkpointer.should.equal(mockCheckpointer); + }); + + it('should return the checkpointer passed as an option', () => { + const options = {checkpointer: {factory: () => {}}}; + const checkpointer = network.getCheckpointer(options); + checkpointer.should.equal(options.checkpointer); + checkpointer.should.not.equal(mockCheckpointer); + }); + + it('should return null if checkpointer is false', () => { + const checkpointer = network.getCheckpointer({checkpointer: false}); + should().equal(checkpointer, null); + }); + }); + describe('#saveListener', () => { it ('should register a new listener if the name isnt taken', () => { const listener = {}; @@ -359,7 +395,15 @@ describe('Network', () => { (() => { network.saveListener('listener1', listener); }).should.throw('Listener already exists with the name listener1'); - sinon.assert.called(listener.unregister); + }); + }); + + describe('#_checkListenerNameIsUnique', () => { + it('should throw if the listener name has been used before', () => { + network.listeners.set('listenerName', {}); + (() => { + network._checkListenerNameIsUnique('listenerName'); + }).should.throw('Listener already exists with the name listenerName'); }); }); }); diff --git a/fabric-network/types/index.d.ts b/fabric-network/types/index.d.ts index 42c94fcd1c..f5bfb6d4ff 100644 --- a/fabric-network/types/index.d.ts +++ b/fabric-network/types/index.d.ts @@ -33,6 +33,11 @@ export interface EventListenerOptions { replay?: boolean; filtered?: boolean; unregister?: boolean; + startBlock?: number; + endBlock?: number; + asArray?: boolean; + eventHubConnectWait?: number; + eventHubConnectTimeout?: number; } export interface DiscoveryOptions { @@ -105,7 +110,7 @@ export interface Contract { createTransaction(name: string): Transaction; evaluateTransaction(name: string, ...args: string[]): Promise; submitTransaction(name: string, ...args: string[]): Promise; - addContractListener(listenerName: string, eventName: string, callback: (error: Error, event?: {[key: string]: any}, blockNumber?: string, transactionId?: string, status?: string) => Promise, options?: object): Promise; + addContractListener(listenerName: string, eventName: string, callback: (error: Error, event?: {[key: string]: any} | Array<{[key: string]: any}>, blockNumber?: string, transactionId?: string, status?: string) => Promise, options?: EventListenerOptions): Promise; } export interface TransientMap { @@ -199,7 +204,7 @@ export interface Checkpoint { export class BaseCheckpointer { public setChaincodeId(chaincodeId: string): void; - public loadStartingCheckpoint(): Promise; + public loadLatestCheckpoint(): Promise; } export class FileSystemCheckpointer extends BaseCheckpointer { diff --git a/test/scenario/features/event.feature b/test/scenario/features/event.feature index 43a6b763d1..4b6286bd72 100644 --- a/test/scenario/features/event.feature +++ b/test/scenario/features/event.feature @@ -38,3 +38,7 @@ Feature: Listen to events using a fabric-network When I use the gateway named test_gateway to submit 5 transactions with args [createValue] for chaincode events01 instantiated on channel mychannel # Waiting for 6 events as ChannelEventHub returns the last block connected Then I receive 6 events from the listener unfilteredListener + + Scenario: Using a Contract I can listen to block events emitted by networks between a start and end block + When I use the gateway named test_gateway to listen for unfiltered_block_events with listener unFilteredBlockListener between 0 and 5 on chaincode events01 instantiated on channel mychannel + Then I receive at least 5 events from the listener unFilteredBlockListener diff --git a/test/scenario/features/lib/network.js b/test/scenario/features/lib/network.js index 000017ad70..ec98892a95 100644 --- a/test/scenario/features/lib/network.js +++ b/test/scenario/features/lib/network.js @@ -279,7 +279,7 @@ async function createContractListener(gatewayName, channelName, ccName, eventNam testUtil.logMsg('Contract event error', err); return err; } - testUtil.logMsg('Received a contract event', listenerName); + testUtil.logMsg(`Received a contract event [${listenerName}]`); if (!filtered) { const [event] = args; expect(event.payload.toString('utf8')).to.equal('content'); @@ -294,7 +294,7 @@ async function createContractListener(gatewayName, channelName, ccName, eventNam listeners.set(listenerName, listenerInfo); } -async function createBlockListener(gatewayName, channelName, ccName, listenerName, filtered) { +async function createBlockListener(gatewayName, channelName, ccName, listenerName, filtered, replay, startBlock, endBlock) { const gateway = gateways.get(gatewayName).gateway; const contract = await retrieveContractFromGateway(gateway, channelName, ccName); const network = contract.getNetwork(); @@ -316,10 +316,17 @@ async function createBlockListener(gatewayName, channelName, ccName, listenerNam expect(block).to.have.property('data'); expect(block).to.have.property('metadata'); } + const blockNumber = filtered ? block.number : block.header.number; + if (startBlock) { + expect(Number(blockNumber)).to.be.greaterThan(Number(startBlock) - 1); + } + if (endBlock) { + expect(Number(blockNumber)).to.be.lessThan(Number(endBlock) + 1); + } const listenerInfo = listeners.get(listenerName); listenerInfo.payloads.push(block); listenerInfo.calls = listenerInfo.payloads.length; - }, {filtered, replay: true}); + }, {filtered, replay, startBlock, endBlock}); const listenerInfo = listeners.get(listenerName); listenerInfo.listener = listener; listeners.set(listenerName, listenerInfo); @@ -362,7 +369,7 @@ async function createCommitListener(transactionName, listenerName) { listenerInfo.payloads.push(args); listenerInfo.calls = listenerInfo.payloads.length; }); - listeners.set(listenerName, {listener, payloads: []}); + listeners.set(listenerName, {listener, payloads: [], calls: 0}); } async function submitExistingTransaction(transactionName, args) { diff --git a/test/scenario/features/steps/docker_steps.js b/test/scenario/features/steps/docker_steps.js index fd0e5f83d9..a6bd6f0c3e 100644 --- a/test/scenario/features/steps/docker_steps.js +++ b/test/scenario/features/steps/docker_steps.js @@ -15,7 +15,7 @@ module.exports = function () { const fabricState = StateStore.get('fabricState'); if (!fabricState || !fabricState.deployed || fabricState.type.localeCompare(type) !== 0) { - await testUtil.runShellCommand(undefined, 'rm -f ~/.hlf-checkpoint'); + await testUtil.runShellCommand(undefined, 'rm -rf ~/.hlf-checkpoint'); await testUtil.runShellCommand(undefined, 'docker kill $(docker ps -aq); docker rm $(docker ps -aq)'); if (type.localeCompare('non-tls') === 0) { await testUtil.runShellCommand(true, 'docker-compose -f ' + path.join(__dirname, '../../../fixtures/docker-compose/docker-compose.yaml') + ' -p node up -d'); @@ -29,7 +29,7 @@ module.exports = function () { }); this.Given(/^I have forcibly taken down all docker containers/, {timeout: testUtil.TIMEOUTS.LONG_STEP}, async () => { - await testUtil.runShellCommand(undefined, 'rm -f ~/.hlf-checkpoint'); + await testUtil.runShellCommand(undefined, 'rm -rf ~/.hlf-checkpoint'); await testUtil.runShellCommand(undefined, 'docker kill $(docker ps -aq); docker rm $(docker ps -aq)'); StateStore.set('fabricState', {deployed: false, type: null}); return await testUtil.sleep(testUtil.TIMEOUTS.SHORT_INC); diff --git a/test/scenario/features/steps/network_steps.js b/test/scenario/features/steps/network_steps.js index 74551333ce..c1a1b62919 100644 --- a/test/scenario/features/steps/network_steps.js +++ b/test/scenario/features/steps/network_steps.js @@ -91,19 +91,19 @@ module.exports = function () { }); // Events - this.Then(/^I use the gateway named (.+?) to listen for filtered ([^\s.]+?) events with listener (.+?) on chaincode (.+?) instantiated on channel (.+?)$/, {timeout: testUtil.TIMEOUTS.SHORT_STEP}, async (gatewayName, eventName, listenerName, ccName, channelName) => { - return await network_util.createContractListener(gatewayName, channelName, ccName, eventName, listenerName, false, true); + this.Then(/^I use the gateway named (.+?) to listen for (filtered|unfiltered) ([^\s.]+?) events with listener (.+?) on chaincode (.+?) instantiated on channel (.+?)$/, {timeout: testUtil.TIMEOUTS.SHORT_STEP}, async (gatewayName, eventType, eventName, listenerName, ccName, channelName) => { + return await network_util.createContractListener(gatewayName, channelName, ccName, eventName, listenerName, true, eventType === 'unfiltered'); }); - this.Then(/^I use the gateway named (.+?) to listen for unfiltered (.+?) events with listener (.+?) on chaincode (.+?) instantiated on channel (.+?)$/, {timeout: testUtil.TIMEOUTS.SHORT_STEP}, async (gatewayName, eventName, listenerName, ccName, channelName) => { - return await network_util.createContractListener(gatewayName, channelName, ccName, eventName, listenerName, false, false); + this.Then(/^I use the gateway named (.+?) to listen for filtered_block_events with listener ([a-zA-Z]+) on chaincode (.+?) instantiated on channel (.+?)$/, {timeout: testUtil.TIMEOUTS.SHORT_STEP}, async (gatewayName, listenerName, ccName, channelName) => { + return await network_util.createBlockListener(gatewayName, channelName, ccName, listenerName, true, true); }); - this.Then(/^I use the gateway named (.+?) to listen for filtered_block_events with listener (.+?) on chaincode (.+?) instantiated on channel (.+?)$/, {timeout: testUtil.TIMEOUTS.SHORT_STEP}, async (gatewayName, listenerName, ccName, channelName) => { - return await network_util.createBlockListener(gatewayName, channelName, ccName, listenerName, true); + this.Then(/^I use the gateway named (.+?) to listen for unfiltered_block_events with listener ([a-zA-Z]+) between ([0-9]+) and ([0-9]+) on chaincode (.+?) instantiated on channel (.+?)$/, {timeout: testUtil.TIMEOUTS.SHORT_STEP}, async (gatewayName, listenerName, startBlock, endBlock, ccName, channelName) => { + await network_util.createBlockListener(gatewayName, channelName, ccName, listenerName, false, false, startBlock, endBlock); }); - this.Then(/^I use the gateway named (.+?) to listen for unfiltered_block_events with listener (.+?) on chaincode (.+?) instantiated on channel (.+?)$/, {timeout: testUtil.TIMEOUTS.SHORT_STEP}, async (gatewayName, listenerName, ccName, channelName) => { + this.Then(/^I use the gateway named (.+?) to listen for unfiltered_block_events with listener ([a-zA-Z]+) on chaincode (.+?) instantiated on channel (.+?)$/, {timeout: testUtil.TIMEOUTS.SHORT_STEP}, async (gatewayName, listenerName, ccName, channelName) => { return await network_util.createBlockListener(gatewayName, channelName, ccName, listenerName, false); });