Skip to content

Commit

Permalink
FABN-865: Plug-in event handlers
Browse files Browse the repository at this point in the history
User plug-in point by passing their own factory function as the
'strategy' option on Gateway creation. Sample user implementation
included in fabric-network integration tests.

Change-Id: I961b3e09dbfbc17d92b9acef22593388611bf166
Signed-off-by: Mark S. Lewis <[email protected]>
  • Loading branch information
bestbeforetoday committed Oct 12, 2018
1 parent ddf9c61 commit 792d86f
Show file tree
Hide file tree
Showing 22 changed files with 929 additions and 527 deletions.
25 changes: 20 additions & 5 deletions fabric-network/lib/contract.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ const util = require('util');
*/
class Contract {

constructor(channel, chaincodeId, gateway, queryHandler, eventHandlerFactory, namespace='') {
constructor(network, chaincodeId, gateway, queryHandler, namespace='') {
logger.debug('in Contract constructor');

this.channel = channel;
this.network = network;
this.channel = network.getChannel();
this.chaincodeId = chaincodeId;
this.gateway = gateway;
this.queryHandler = queryHandler;
this.eventHandlerFactory = eventHandlerFactory;
this.eventHandlerOptions = gateway.getOptions().eventHandlerOptions;
this.namespace = namespace;
}

Expand Down Expand Up @@ -94,8 +95,7 @@ class Contract {
this._verifyTransactionDetails('submitTransaction', fullTxName, parameters);

const txId = this.gateway.getClient().newTransactionID();
// createTxEventHandler() will return null if no event handler is requested
const eventHandler = this.eventHandlerFactory ? this.eventHandlerFactory.createTxEventHandler(txId.getTransactionID()) : null;
const eventHandler = this._createTxEventHandler(txId.getTransactionID());

// Submit the transaction to the endorsers.
const request = {
Expand Down Expand Up @@ -138,6 +138,21 @@ class Contract {
return result;
}

/**
* Create an appropriate transaction event handler, if one is configured.
* @private
* @param {String} transactionId The transation ID to listen for.
* @returns {TransactionEventHandler|null} A transactionevent handler, or null if non is configured.
*/
_createTxEventHandler(transactionId) {
const eventHandlerFactoryFn = this.eventHandlerOptions.strategy;
if (eventHandlerFactoryFn) {
return eventHandlerFactoryFn(transactionId, this.network, this.eventHandlerOptions);
} else {
return null;
}
}

/**
* Verify the supplied transaction details.
* @private
Expand Down
39 changes: 29 additions & 10 deletions fabric-network/lib/gateway.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,38 @@ class Gateway {

/**
* @typedef {Object} GatewayOptions
* @property {Wallet} wallet The identity wallet implementation for use with this Gateway instance
* @property {string} identity The identity in the wallet for all interactions on this Gateway instance
* @property {string} [clientTlsIdentity] the identity in the wallet to use as the client TLS identity
* @property {DefaultEventHandlerOptions|Object} [eventHandlerOptions] This defines options for the inbuilt default
* event handler capability
* @property {Wallet} wallet The identity wallet implementation for use with this Gateway instance.
* @property {string} identity The identity in the wallet for all interactions on this Gateway instance.
* @property {string} [clientTlsIdentity] the identity in the wallet to use as the client TLS identity.
* @property {DefaultEventHandlerOptions} [eventHandlerOptions] This defines options for the inbuilt default
* event handler capability.
*/

/**
* @typedef {Object} DefaultEventHandlerOptions
* @property {number} [commitTimeout = 300] The timeout period in seconds to wait for commit notification to complete
* @property {MSPID_SCOPE_ALLFORTX|MSPID_SCOPE_ANYFORTX|NETWORK_SCOPE_ALLFORTX|NETWORK_SCOPE_ANYFORTX} [strategy] Event
* handling strategy to identify successful transaction commits. A null value
* indicates that no event handling is desired. The default is {@link MSPID_SCOPE_ALLFORTX}.
* @property {number} [commitTimeout = 300] The timeout period in seconds to wait for commit notification to
* complete.
* @property {?TxEventHandlerFactory} [strategy = MSPID_SCOPE_ALLFORTX] Event handling strategy to identify
* successful transaction commits. A null value indicates that no event handling is desired. The default is
* {@link MSPID_SCOPE_ALLFORTX}.
*/

/**
* @typedef {Function} TxEventHandlerFactory
* @param {String} transactionId The transaction ID for which the handler should listen.
* @param {Network} network The network on which this transaction is being submitted.
* @returns {TxEventHandler} A transaction event handler.
*/

/**
* @typedef {Object} TxEventHandler
* @property {Function} startLstening Async function that resolves when the handler has started listening for
* transaction commit events. Called after the transaction proposal has been accepted and prior to submission of
* the transaction to the orderer.
* @property {Function} waitForEvents Async function that resolves (or rejects) when suitable transaction
* commit events have been received. Called after submission of the transaction to the orderer.
* @property {Function} cancelListening Cancel listening. Called if submission of the transaction to the orderer
* fails.
*/

/**
Expand Down Expand Up @@ -215,7 +234,7 @@ class Gateway {

async _createQueryHandler(channel, peerMap) {
if (this.queryHandlerClass) {
const currentmspId = this.getCurrentIdentity()._mspId;
const currentmspId = this.getCurrentIdentity().getIdentity().getMSPId();
const queryHandler = new this.queryHandlerClass(
channel,
currentmspId,
Expand Down
2 changes: 1 addition & 1 deletion fabric-network/lib/impl/event/abstracteventstrategy.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class AbstractEventStrategy {
* Called by event handler to obtain the event hubs to which it should listen. Gives an opportunity for
* the strategy to store information on the events it expects to receive for later use in event handling.
* @async
* @returns ChannelEventHubs[] connected event hubs.
* @returns {ChannelEventHubs[]} connected event hubs.
* @throws {Error} if the connected event hubs do not satisfy the strategy.
*/
async getConnectedEventHubs() {
Expand Down
64 changes: 0 additions & 64 deletions fabric-network/lib/impl/event/defaulteventhandlermanager.js

This file was deleted.

37 changes: 23 additions & 14 deletions fabric-network/lib/impl/event/defaulteventhandlerstrategies.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@

const AllForTxStrategy = require('fabric-network/lib/impl/event/allfortxstrategy');
const AnyForTxStrategy = require('fabric-network/lib/impl/event/anyfortxstrategy');
const TransactionEventHandler = require('fabric-network/lib/impl/event/transactioneventhandler');

function getOrganizationEventHubs(network) {
const peers = network.getChannel().getPeersForOrg();
return network.getEventHubFactory().getEventHubs(peers);
}

function getNetworkEventHubs(network) {
const peers = network.getChannel().getPeers();
return network.getEventHubFactory().getEventHubs(peers);
}

/**
* Default event handler strategy.<br>
Expand All @@ -16,9 +27,9 @@ const AnyForTxStrategy = require('fabric-network/lib/impl/event/anyfortxstrategy
* until <b>all</b> of the events to be received from
* all of the event hubs that are still connected (minimum 1).
*/
function MSPID_SCOPE_ALLFORTX(eventHubFactory, network, mspId) {
const peers = network.getPeerMap().get(mspId);
return new AllForTxStrategy(eventHubFactory.getEventHubs(peers));
function MSPID_SCOPE_ALLFORTX(transactionId, network, options) {
const eventStrategy = new AllForTxStrategy(getOrganizationEventHubs(network));
return new TransactionEventHandler(transactionId, eventStrategy, options);
}

/**
Expand All @@ -27,9 +38,9 @@ function MSPID_SCOPE_ALLFORTX(eventHubFactory, network, mspId) {
* The [submitTransaction]{@link Contract#submitTransaction} method will wait
* until the first event from <b>any</b> of the event hubs that are still connected.
*/
function MSPID_SCOPE_ANYFORTX(eventHubFactory, network, mspId) {
const peers = network.getPeerMap().get(mspId);
return new AnyForTxStrategy(eventHubFactory.getEventHubs(peers));
function MSPID_SCOPE_ANYFORTX(transactionId, network, options) {
const eventStrategy = new AnyForTxStrategy(getOrganizationEventHubs(network));
return new TransactionEventHandler(transactionId, eventStrategy, options);
}

/**
Expand All @@ -39,10 +50,9 @@ function MSPID_SCOPE_ANYFORTX(eventHubFactory, network, mspId) {
* until <b>all</b> of the events to be received from
* all of the event hubs that are still connected (minimum 1).
*/
//eslint-disable-next-line no-unused-vars
function NETWORK_SCOPE_ALLFORTX(eventHubFactory, network, mspId) {
const peers = network.getChannel().getPeers();
return new AllForTxStrategy(eventHubFactory.getEventHubs(peers));
function NETWORK_SCOPE_ALLFORTX(transactionId, network, options) {
const eventStrategy = new AllForTxStrategy(getNetworkEventHubs(network));
return new TransactionEventHandler(transactionId, eventStrategy, options);
}

/**
Expand All @@ -51,10 +61,9 @@ function NETWORK_SCOPE_ALLFORTX(eventHubFactory, network, mspId) {
* The [submitTransaction]{@link Contract#submitTransaction} method will wait
* until the first event from <b>any</b> of the event hubs that are still connected.
*/
//eslint-disable-next-line no-unused-vars
function NETWORK_SCOPE_ANYFORTX(eventHubFactory, network, mspId) {
const peers = network.getChannel().getPeers();
return new AnyForTxStrategy(eventHubFactory.getEventHubs(peers));
function NETWORK_SCOPE_ANYFORTX(transactionId, network, options) {
const eventStrategy = new AnyForTxStrategy(getNetworkEventHubs(network));
return new TransactionEventHandler(transactionId, eventStrategy, options);
}

module.exports = {
Expand Down
2 changes: 1 addition & 1 deletion fabric-network/lib/impl/event/eventhubfactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class EventHubFactory {
/**
* Gets event hubs for all specified peers. Where possible, the event hubs will be connected.
* @async
* @param {ChannelPeer} peers Peers for which event hubs should be connected.
* @param {ChannelPeer[]} peers Peers for which event hubs should be connected.
* @returns {ChannelEventHub[]} Event hubs, which may or may not have successfully connected.
*/
async getEventHubs(peers) {
Expand Down
8 changes: 4 additions & 4 deletions fabric-network/lib/impl/wallet/basewallet.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ class BaseWallet extends Wallet {
// a mixin can override the getCryptoSuite
//========================================

async getStateStore(label) {
async getStateStore(label) { // eslint-disable-line no-unused-vars
throw new Error('Not implemented');
}

async getCryptoSuite(label) {
async getCryptoSuite(label) { // eslint-disable-line no-unused-vars
throw new Error('Not implemented');
}

Expand Down Expand Up @@ -173,11 +173,11 @@ class BaseWallet extends Wallet {
//=========================================================


async delete(label) {
async delete(label) { // eslint-disable-line no-unused-vars
throw new Error('Not implemented');
}

async exists(label) {
async exists(label) { // eslint-disable-line no-unused-vars
throw new Error('Not implemented');
}

Expand Down
42 changes: 14 additions & 28 deletions fabric-network/lib/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
'use strict';
const FabricConstants = require('fabric-client/lib/Constants');
const Contract = require('./contract');
const EventHubFactory = require('fabric-network/lib/impl/event/eventhubfactory');

const logger = require('./logger').getLogger('Network');
const DefaultEventHandlerManager = require('./impl/event/defaulteventhandlermanager');
const util = require('util');

/**
Expand All @@ -31,6 +32,7 @@ class Network {
this.gateway = gateway;
this.channel = channel;
this.contracts = new Map();
this.eventHubFactory = new EventHubFactory(channel);
this.initialized = false;
}

Expand Down Expand Up @@ -127,9 +129,8 @@ class Network {
}

await this._initializeInternalChannel();
this.peerMap = this._mapPeersToMSPid();
this.eventHandlerManager = await this._createEventHandlerManager();
this.queryHandler = await this.gateway._createQueryHandler(this.channel, this.peerMap);
const peerMap = this._mapPeersToMSPid();
this.queryHandler = await this.gateway._createQueryHandler(this.channel, peerMap);
this.initialized = true;
}

Expand All @@ -139,12 +140,6 @@ class Network {
return this.channel;
}

getPeerMap() {
logger.debug('in getPeerMap');

return this.peerMap;
}

/**
* Returns an instance of a contract (chaincode) on the current network
* @param {string} chaincodeId the chaincode Identifier
Expand All @@ -161,34 +156,17 @@ class Network {
let contract = this.contracts.get(key);
if (!contract) {
contract = new Contract(
this.channel,
this,
chaincodeId,
this.gateway,
this.queryHandler,
this.eventHandlerManager,
namespace
);
this.contracts.set(key, contract);
}
return contract;
}

async _createEventHandlerManager() {
const createEventStrategyFn = this.gateway.getOptions().eventHandlerOptions.strategy;
if (createEventStrategyFn) {
const currentmspId = this.gateway.getCurrentIdentity()._mspId;
const eventHandlerManager = new DefaultEventHandlerManager(
this,
currentmspId,
this.gateway.getOptions().eventHandlerOptions
);
await eventHandlerManager.initialize();
return eventHandlerManager;
}
return null;

}

_dispose() {
logger.debug('in _dispose');

Expand All @@ -206,6 +184,14 @@ class Network {
this.initialized = false;
}

/**
* Get the event hub factory for this network.
* @private
* @returns {EventHubFactory} An event hub factory.
*/
getEventHubFactory() {
return this.eventHubFactory;
}
}

module.exports = Network;
Loading

0 comments on commit 792d86f

Please sign in to comment.