diff --git a/README.md b/README.md index bf09139ef7..59422f63c1 100644 --- a/README.md +++ b/README.md @@ -92,10 +92,10 @@ For a high-level design specificiation for Fabric SDKs of all languages, visit [ fabric-client and fabric-ca-client are written in CommonJS modules and take advantage of ECMAScript 2015 class syntax. -* The main top-level class is **Channel**. It is the client's view of a fabric [channel](https://docs.google.com/document/d/1eRNxxQ0P8yp4Wh__Vi6ddaN_vhN2RQHP-IruHNUwyhc/). The SDK allows you to interact with multiple channels. A channel object can be configured with a different ordering service or share a common ordering service, depending on how the target blockchain network is set up. A channel object has a _KeyValueStore_ to store private keys and certificates for authenticated users. Through the channel object the application can perform +* The main top-level class is **Channel**. It is the client's view of a fabric [channel](https://docs.google.com/document/d/1eRNxxQ0P8yp4Wh__Vi6ddaN_vhN2RQHP-IruHNUwyhc/). The SDK allows you to interact with multiple channels. A channel object can be configured with a different ordering service or share a common ordering service, depending on how the target blockchain network is set up. A channel object has a _KeyValueStore_ to store private keys and certificates for authenticated users. Through the channel object the application can perform * The **KeyValueStore** is a very simple interface which SDK uses to store and retrieve all persistent data. This data includes private keys, so it is very important to keep this storage secure. The default implementation is a simple file-based version found in the _FileKeyValueStore_ class. The SDK also provides an implementation based on CouchDB which can be configured to use a local CouchDB database or a remote deployment including a Cloudant database. * The **User** class represents an end user who transacts on the channel. The user object must have a valid enrollment configured in order to properly sign transaction requests. The enrollment materials can either be obtained from enrolling with fabric-ca or an external Certificate Authority. -* The **EventHub** class encapsulates the interaction with the network peers' event streams. +* The **ChannelEventHub** class encapsulates the interaction with the network peers' event streams. * The **FabricCAClientImpl** class provides security and identity related features such as user registration and enrollment, transaction certificate issuance. The Hyperledger Fabric has a built-in implementation that issues _ECerts_ (enrollment certificates) and _TCerts_ (transaction certificates). ECerts are for enrollment identity and TCerts are for transactions. ### Pluggability diff --git a/docs/index.md b/docs/index.md index be4c150a57..ee4b98754c 100755 --- a/docs/index.md +++ b/docs/index.md @@ -37,10 +37,10 @@ The SDK's list of features include: * [transaction-by-id]{@link Channel#queryTransaction} * [channel configuration data]{@link Channel#getChannelConfig} * monitoring events: - * [connect to a peer's event stream]{@link EventHub#connect} - * listen on [block events]{@link EventHub#registerBlockEvent} - * listen on [transactions events]{@link EventHub#registerTxEvent} and find out if the transaction was successfully committed to the ledger or marked invalid - * listen on [custom events]{@link EventHub#registerChaincodeEvent} produced by chaincodes + * [connect to a peer's event stream]{@link ChannelEventHub#connect} + * listen on [block events]{@link ChannelEventHub#registerBlockEvent} + * listen on [transactions events]{@link ChannelEventHub#registerTxEvent} and find out if the transaction was successfully committed to the ledger or marked invalid + * listen on [custom events]{@link ChannelEventHub#registerChaincodeEvent} produced by chaincodes * serializable [User]{@link User} object with signing capabilities * [hierarchical configuration]{@link Client.getConfigSetting} settings with multiple layers of overrides: files, environment variable, program arguments, in-memory settings * [logging utility]{@link Client.setLogger} with a built-in logger (winston) and can be overriden with a number of popular loggers including log4js and bunyan diff --git a/fabric-client/lib/Client.js b/fabric-client/lib/Client.js index 12587bc11b..a38bbb6400 100644 --- a/fabric-client/lib/Client.js +++ b/fabric-client/lib/Client.js @@ -16,7 +16,6 @@ const User = require('./User.js'); const Channel = require('./Channel.js'); const Packager = require('./Packager.js'); const Peer = require('./Peer.js'); -const EventHub = require('./EventHub.js'); const ChannelEventHub = require('./ChannelEventHub'); const Orderer = require('./Orderer.js'); const TransactionID = require('./TransactionID.js'); @@ -382,60 +381,6 @@ const Client = class extends BaseClient { return orderer; } - /** - * Returns an {@link EventHub} object. An event hub object encapsulates the - * properties of an event stream on a peer node, through which the peer publishes - * notifications of blocks being committed in the channel's ledger. - * - * @returns {EventHub} The EventHub instance - */ - newEventHub() { - return new EventHub(this); - } - - /** - * Returns an {@link EventHub} object based on the event hub address - * as defined in the currently loaded network configuration for the - * peer by the name parameter. The named peer must have the "eventUrl" - * setting or a null will be returned. - * - * @param {string} peer_name - The name of the peer that has an event hub defined - * @returns {EventHub} The EventHub instance that has had the event hub address assigned - */ - getEventHub(peer_name) { - if (this._network_config) { - return this._network_config.getEventHub(peer_name); - } else { - return null; - } - } - - /** - * Returns a list of {@link EventHub} for an organization as defined - * in the currently loaded network configuration. If no organization mspid is - * provided then the organization referenced in the currently active network - * configuration's client section will be used. The list will be based on - * the peers in the organization that have the "eventUrl" setting. - * - * @param {string} mspid - Optional - The mspid of an organization - * @returns {EventHub[]} An array of EventHub instances that are defined for this organization - */ - getEventHubsForOrg(mspid) { - let event_hubs = []; - let _mspid = mspid; - if (!mspid) { - _mspid = this._mspid; - } - if (_mspid && this._network_config) { - const organization = this._network_config.getOrganizationByMspId(_mspid); - if (organization) { - event_hubs = organization.getEventHubs(); - } - } - - return event_hubs; - } - /* * Private utility method to get target peers. The peers will be in the organization of this client, * (meaning the peer has the same mspid). If this client is not assigned a mspid, then all @@ -723,7 +668,7 @@ const Client = class extends BaseClient { * @returns {Promise} Promise for a result object with status on the acceptance of the update request * by the orderer. A channel update is finally completed when the new channel configuration * block created by the orderer has been committed to the channel's peers. To be notified - * of the successful update of the channel, an application should use the {@link EventHub} + * of the successful update of the channel, an application should use the {@link ChannelEventHub} * to connect to the peers and register a block listener. */ updateChannel(request) { @@ -1812,7 +1757,6 @@ function _getNetworkConfig(config, client) { module.exports = Client; module.exports.Peer = Peer; -module.exports.EventHub = EventHub; module.exports.ChannelEventHub = ChannelEventHub; module.exports.Orderer = Orderer; module.exports.Channel = Channel; diff --git a/fabric-client/lib/EventHub.js b/fabric-client/lib/EventHub.js deleted file mode 100644 index c06c332bd8..0000000000 --- a/fabric-client/lib/EventHub.js +++ /dev/null @@ -1,846 +0,0 @@ -/* - Copyright 2016, 2018 London Stock Exchange All Rights Reserved. - - SPDX-License-Identifier: Apache-2.0 - -*/ - -'use strict'; - -var utils = require('./utils.js'); -var Remote = require('./Remote.js'); -var BlockDecoder = require('./BlockDecoder.js'); -var clientUtils = require('./client-utils.js'); -var grpc = require('grpc'); -var logger = utils.getLogger('EventHub.js'); - -var _events = grpc.load(__dirname + '/protos/peer/events.proto').protos; -var _common = grpc.load(__dirname + '/protos/common/common.proto').common; -var _transProto = grpc.load(__dirname + '/protos/peer/transaction.proto').protos; -const five_minutes_ms = 5*60*1000; - -var _validation_codes = {}; -var keys = Object.keys(_transProto.TxValidationCode); -for(var i = 0;i
- * Fabric committing peers provides an event stream to publish events to registered - * listeners. As of v1.0, the only events that get published are Block events. A - * Block event gets published whenever the committing peer adds a validated block - * to the ledger. There are three ways to register a listener to get notified: - *
  • register a "block listener" to get called for every block event on all channels. The listener - * will be passed a fully decoded {@link Block} object. See [registerBlockEvent]{@link EventHub#registerBlockEvent} - *
  • register a "transaction listener" to get called when the specific transaction - * by id is committed (discovered inside a block event). The listener will be - * passed the transaction id and the [validation code]{@link https://github.com/hyperledger/fabric/blob/v1.0.0/protos/peer/transaction.proto#L125}. - * See [registerTxEvent]{@link EventHub#registerTxEvent} - *
  • register a "chaincode event listener" to get called when a specific - * [chaincode event]{@link https://github.com/hyperledger/fabric/blob/v1.0.0/examples/chaincode/go/eventsender/eventsender.go#L65} - * has arrived. The listener will be passed the {@link ChaincodeEvent}. See - * [registerChaincodeEvent]{@link EventHub#registerChaincodeEvent} - *

    - * The events are ephemeral, such that if a registered listener - * crashed when the event is published, the listener will miss the event. - * There are several techniques to compensate for missed events due to client crashes: - *
  • register block event listeners and record the block numbers received, such that - * when the next block arrives and its number is not the next in sequence, then - * the application knows exactly which block events have been missed. It can then use - * [queryBlock]{@link Channel#queryBlock} to get those missed blocks from the target peer. - *
  • use a message queue to catch all the block events. With many robust message queue - * implementations available today, you will be guaranteed to not miss an event. A - * fabric event listener can be written in any programming language. The following - * implementations can be used as reference to write the necessary glue code between - * the fabric event stream and a message queue: - * - * - * @example - * var eh = client.newEventHub(); - * eh.setPeerAddr( - * 'grpcs://localhost:7053', - * { - * pem: Buffer.from(certdata).toString(), - * 'ssl-target-name-override': 'peer1'] - * } - * ); - * - * // register the listeners before calling "connect()" so that we can - * // have the error callback ready to process an error in case the - * // connect() call fails - * eh.registerTxEvent( - * transactionId, - * (tx, code) => { - * eh.unregisterTxEvent(transactionId); - * console.log(util.format('Transaction %s has completed', transactionId)); - * }, - * (err) => { - * eh.unregisterTxEvent(transactionId); - * console.log(util.format('Error %s! Transaction listener for %s has been ' + - * 'deregistered with %s', transactionId, err, eh.getPeerAddr())); - * } - * ); - * - * eh.connect(); - * - * @class - */ -var EventHub = class { - - /** - * Constructs an EventHub object - * - * @param {Client} clientContext - An instance of the Client class - * which has already been initialzed with a userContext. - * @returns {EventHub} An instance of this class - */ - - constructor(clientContext) { - logger.debug('const '); - // hashtable of clients registered for chaincode events - this._chaincodeRegistrants = {}; - // set of clients registered for block events - this._block_registrant_count = 1; - this._blockOnEvents = {}; - this._blockOnErrors = {}; - // hashtable of clients registered for transactional events - this._transactionOnEvents = {}; - this._transactionOnErrors = {}; - // peer node to connect to - this._ep = null; - // grpc event client interface - this._event_client = null; - // grpc chat streaming interface - this._stream = null; - // fabric connection state of this eventhub - this._connected = false; - this._connect_running = false; - this._disconnect_running = false; - // should this event hub reconnect on registrations - this._force_reconnect = true; - // connect count for this instance - this._current_stream = 0; - // reference to the client instance holding critical context such as signing identity - if (typeof clientContext === 'undefined' || clientContext === null || clientContext === '') - throw new Error('Missing required argument: clientContext'); - - this._clientContext = clientContext; - } - - /** - * @typedef {Object} EventRegistrationRequest - */ - - /** - * Set peer event source url. - * - * @param {string} peeraddr - grpc or grpcs URL for the target peer's event source - * @param {ConnectionOpts} opts - The options for the connection to the peer. - */ - setPeerAddr(peerUrl, opts) { - logger.debug('setPeerAddr - %s',peerUrl); - //clean up - this._disconnect(new Error('EventHub has been shutdown due to new Peer address assignment')); - this._ep = new Remote(peerUrl, opts); - } - - /** - * Return the peer url of this event hub object - */ - getPeerAddr() { - var addr = null; - if(this._ep) { - addr = this._ep._endpoint.addr; - } - - return addr; - } - - /** - * Is the event hub connected to the event source? - * @returns {boolean} True if connected to the event source, false otherwise - */ - isconnected() { - return this._connected; - } - - /** - * Establishes a connection with the peer event source. - * The peer address must be set by calling the [setPeerAddr()]{@link EventHub#setPeerAddr} - * method before calling this method. - * - * The connection will be established asynchronously. If the connection fails to - * get established, the application will be notified via the error callbacks - * from the registerXXXEvent() methods. It is recommended that an application always - * registers at least one event listener with an error callback, by calling any one of the - * [registerBlockEvent]{@link EventHub#registerBlockEvent}, - * [registerTxEvent]{@link EventHub#registerTxEvent} or - * [registerChaincodeEvent]{@link EventHub#registerChaincodeEvent} - * methods, before calling connect(). - */ - connect(){ - logger.debug('connect - start'); - this._connect_running = false; //override a running connect - - if (typeof this._clientContext.getUserContext !== 'function') - throw new Error('Invalid clientContext argument: missing required function "getUserContext"'); - - if (typeof this._clientContext._getSigningIdentity(true) === 'undefined') - throw new Error('The clientContext has not been properly initialized, missing identity'); - - this._connect(); - } - - /* - * Internal use only - * Establishes a connection with the peer event source - * @param {boolean} force - internal use only, will reestablish the - * the connection to the peer event hub - */ - _connect(force) { - logger.debug('_connect - start - %s', new Date()); - if(this._connect_running) { - logger.debug('_connect - connect is running'); - return; - } - if (!force && this._connected) { - logger.debug('_connect - end - already conneted'); - return; - } - if (!this._ep) throw Error('Must set peer address before connecting.'); - - // close out the old stream - if(this._stream) { - this._stream.end(); - this._stream = null; - } - - this._connect_running = true; - this._current_stream++; - var stream_id = this._current_stream; - logger.debug('_connect - start stream:',stream_id); - var self = this; // for callback context - - var send_timeout = setTimeout(function(){ - logger.error('_connect - timed out after:%s', self._ep._request_timeout); - self._connect_running = false; - self._disconnect(new Error('Unable to connect to the peer event hub')); - }, self._ep._request_timeout); - - // check on the keep alive options - // the keep alive interval - var options = utils.checkAndAddConfigSetting('grpc.http2.keepalive_time', 360, this._ep._options); //grpc 1.2.4 - options = utils.checkAndAddConfigSetting('grpc.keepalive_time_ms', 360000, options); //grpc 1.3.7 - // how long should we wait for the keep alive response - let request_timeout_ms = utils.getConfigSetting('request-timeout', 3000); - let request_timeout = request_timeout_ms / 1000; - options = utils.checkAndAddConfigSetting('grpc.http2.keepalive_timeout', request_timeout, options); //grpc 1.2.4 - options = utils.checkAndAddConfigSetting('grpc.keepalive_timeout_ms', request_timeout_ms, options); //grpc 1.3.7 - options = utils.checkAndAddConfigSetting('grpc.http2.min_time_between_pings_ms', five_minutes_ms, options); //default 5 - - logger.debug('_connect - options %j', options); - this._event_client = new _events.Events(this._ep._endpoint.addr, this._ep._endpoint.creds, options); - this._stream = this._event_client.chat(); - - this._stream.on('data', function(event) { - self._connect_running = false; - clearTimeout(send_timeout); - logger.debug('on.data - event stream:%s _current_stream:%s',stream_id, self._current_stream); - if(stream_id != self._current_stream) { - logger.debug('on.data - incoming event was from a cancel stream'); - return; - } - - let state = getStreamState(self); - logger.debug('on.data - grpc stream state :%s',state); - if (event.Event == 'block') { - var block = BlockDecoder.decodeBlock(event.block); - self._processBlockOnEvents(block); - self._processTxOnEvents(block); - self._processChainCodeOnEvents(block); - } - else if (event.Event == 'register'){ - logger.debug('on.data - register event received'); - self._connected = true; - } - else if (event.Event == 'unregister'){ - if(self._connected) self._disconnect(new Error('Peer event hub has disconnected due to an "unregister" event')); - logger.debug('on.data - unregister event received'); - } - else { - logger.debug('on.data - unknown event %s',event.Event); - } - }); - - this._stream.on('end', function() { - self._connect_running = false; - clearTimeout(send_timeout); - logger.debug('on.end - event stream:%s _current_stream:%s',stream_id, self._current_stream); - if(stream_id != self._current_stream) { - logger.debug('on.end - incoming event was from a canceled stream'); - return; - } - - let state = getStreamState(self); - logger.debug('on.end - grpc stream state :%s',state); - self._disconnect(new Error('Peer event hub has disconnected due to an "end" event')); - }); - - this._stream.on('error', function(err) { - self._connect_running = false; - clearTimeout(send_timeout); - logger.debug('on.error - event stream:%s _current_stream:%s',stream_id, self._current_stream); - if(stream_id != self._current_stream) { - logger.debug('on.error - incoming event was from a canceled stream'); - logger.debug('on.error - %s %s',new Date(),err); - return; - } - - let state = getStreamState(self); - logger.debug('on.error - grpc stream state :%s',state); - if(err instanceof Error) { - self._disconnect(err); - } - else { - self._disconnect(new Error(err)); - } - }); - - this._sendRegistration(true); - logger.debug('_connect - end stream:',stream_id); - } - - /** - * Disconnects the event hub from the peer event source. - * Will close all event listeners and send an Error object - * with the message "EventHub has been shutdown" to - * all listeners that provided an "onError" callback. - */ - disconnect() { - if(this._disconnect_running) { - logger.debug('disconnect - is running, exit'); - return; - } - // to be sure applications do not call again as sometimes the disconnect - // has been placed in the callback causing an endless loop - this._disconnect_running = true; - let err = new Error('EventHub has been shutdown'); - if(this._connected || this._connect_running) { - this._disconnect(err); - } else { - // close and report to all the listeners - this._closeAllCallbacks(err); - logger.debug('disconnect - EventHub is not connected'); - } - this._disconnect_running = false; - } - - /* Internal method - * Disconnects the connection to the peer event source. - * Will close all event listeners and send an `Error` to - * all listeners that provided an "onError" callback. - */ - _disconnect(err) { - logger.debug('_disconnect - start -- called due to:: %s',err.message); - this._connected = false; - this._closeAllCallbacks(err); - if(this._stream) { - logger.debug('_disconnect - shutdown existing stream'); - this._sendRegistration(false); - this._stream.end(); - this._stream = null; - } - if(this._event_client) { - this._event_client.close(); - } - } - - /* - * Internal method - * Builds a signed event registration - * and sends it to the peer's event hub. - */ - _sendRegistration(register) { - var identity = this._clientContext._getSigningIdentity(true); - var signedEvent = new _events.SignedEvent(); - var event = new _events.Event(); - var reg = {events: [{event_type: 'BLOCK'}]}; - - if(register) { - event.setRegister(reg); - } - else { - event.setUnregister(reg); - } - - event.setCreator(identity.serialize()); - event.setTimestamp(clientUtils.buildCurrentTimestamp()); - let client_cert_hash = this._clientContext.getClientCertHash(); - if(client_cert_hash) { - event.setTlsCertHash(client_cert_hash); - } - signedEvent.setEventBytes(event.toBuffer()); - var sig = identity.sign(event.toBuffer()); - signedEvent.setSignature(Buffer.from(sig)); - this._stream.write(signedEvent); - } - - /* - * Internal method - * Will close out all callbacks - * Sends an error to all registered event "onError" callbacks - */ - _closeAllCallbacks(err) { - logger.debug('_closeAllCallbacks - start'); - - var closer = function(list) { - for (let key in list) { - let cb = list[key]; - logger.debug('_closeAllCallbacks - closing this callback %s',key); - cb(err); - } - }; - - logger.debug('_closeAllCallbacks - blockOnErrors %s', Object.keys(this._blockOnErrors).length); - closer(this._blockOnErrors); - this._blockOnEvents = {}; - this._blockOnErrors = {}; - - logger.debug('_closeAllCallbacks - transactionOnErrors %s', Object.keys(this._transactionOnErrors).length); - closer(this._transactionOnErrors); - this._transactionOnEvents = {}; - this._transactionOnErrors = {}; - - var self = this; - var cc_closer = function(key) { - var cbtable = self._chaincodeRegistrants[key]; - cbtable.forEach(function(cbe) { - logger.debug('_closeAllCallbacks - closing this chaincode event ccid:%s eventNameFilter:%s',cbe.ccid, cbe.eventNameFilter); - if(cbe.onError) { - cbe.onError(err); - } - }); - }; - - logger.debug('_closeAllCallbacks - chaincodeRegistrants %s', Object.keys(this._chaincodeRegistrants).length); - Object.keys(this._chaincodeRegistrants).forEach(cc_closer); - this._chaincodeRegistrants = {}; - } - - /* - * Internal method - * checks for a connection and will restart - */ - _checkConnection(throw_error, force_reconnect) { - logger.debug('_checkConnection - start throw_error %s, force_reconnect %s',throw_error, force_reconnect); - if(!this._stream) { - // when there is no stream, then wait for the user to do a 'connect' - return; - } - let state = getStreamState(this); - if(this._connected || this._connect_running || state == 2) { - logger.debug('_checkConnection - this hub %s is connected or trying to connect with stream channel state %s', this._ep.getUrl(), state); - } - else { - logger.debug('_checkConnection - this hub %s is not connected with stream channel state %s', this._ep.getUrl(), state); - if(throw_error && !force_reconnect) { - throw new Error('The event hub has not been connected to the event source'); - } - } - - //reconnect will only happen when there is error callback - if(force_reconnect) { - try { - var is_paused = this._stream.isPaused(); - logger.debug('_checkConnection - grpc isPaused :%s',is_paused); - if(is_paused) { - this._stream.resume(); - logger.debug('_checkConnection - grpc resuming '); - } else if(state != 2) { - // try to reconnect - this._connected = false; - this._connect(true); - } - } - catch(error) { - logger.error('_checkConnection - error ::' + error.stack ? error.stack : error); - var err = new Error('Event hub is not connected '); - this._disconnect(err); - } - } - } - - /** - * @typedef {Object} ChaincodeEvent - * @property {string} chaincode_id - * @property {string} tx_id - * @property {string} event_name - * @property {byte[]} payload - Application-specific byte array that the chaincode set - * when it called stub.SetEvent(event_name, payload) - */ - - /** - * Register a listener to receive chaincode events. - *

    - * An error may be thrown by this call if no "onError" callback - * is provided and this EventHub has noticed that the connection has not been - * established. However since the connection establishment is running - * asynchronously, a register call could be made before this EventHub has been - * notified of the network issue. The best practice would be to provide an - * "onError" callback to be notified when this EventHub has an issue. - * - * @param {string} ccid - Id of the chaincode of interest - * @param {string} eventname - The exact name of the chaincode event (must match - * the name given to the target chaincode's call to - * stub.SetEvent(name, payload)), or a - * regex string to match more than one event by this - * chaincode - * @param {function} onEvent - callback function for matched events. It gets passed - * a single parameter which is a {@link ChaincodeEvent} object - * @param {function} onError - Optional callback function to be notified when this event hub - * is shutdown. The shutdown may be caused by a network error or by - * a call to the "disconnect()" method or a connection error. - * @returns {Object} An object that should be treated as an opaque handle used - * to unregister (see unregisterChaincodeEvent) - */ - registerChaincodeEvent(ccid, eventname, onEvent, onError) { - logger.debug('registerChaincodeEvent - start'); - if(!ccid) { - throw new Error('Missing "ccid" parameter'); - } - if(!eventname) { - throw new Error('Missing "eventname" parameter'); - } - if(!onEvent) { - throw new Error('Missing "onEvent" parameter'); - } - var have_error_cb = onError ? true : false; - // when there is no error callback throw an error - // when this hub is not connected - this._checkConnection(!have_error_cb, false); - - var cbe = new ChainCodeCBE(ccid, eventname, onEvent, onError); - var cbtable = this._chaincodeRegistrants[ccid]; - if (!cbtable) { - cbtable = new Set(); - this._chaincodeRegistrants[ccid] = cbtable; - } - cbtable.add(cbe); - - // when there is an error callback try to reconnect this - // event hub if is not connected - if(have_error_cb) { - this._checkConnection(false, this._force_reconnect); - } - - return cbe; - } - - /** - * Unregister the chaincode event listener represented by - * the listener_handle object returned by - * the registerChaincodeEvent() method - * - * @param {Object} listener_handle - The handle object returned from the call to - * registerChaincodeEvent. - */ - unregisterChaincodeEvent(listener_handle) { - logger.debug('unregisterChaincodeEvent - start'); - if(!listener_handle) { - throw new Error('Missing "listener_handle" parameter'); - } - var cbtable = this._chaincodeRegistrants[listener_handle.ccid]; - if (!cbtable) { - logger.debug('No event registration for ccid %s ', listener_handle.ccid); - return; - } - cbtable.delete(listener_handle); - if (cbtable.size <= 0) { - delete this._chaincodeRegistrants[listener_handle.ccid]; - } - } - - /** - * Register a listener to receive all block events from all the channels that - * the target peer is part of. The listener's "onEvent" callback gets called - * on the arrival of every block. If the target peer is expected to participate - * in more than one channel, then care must be taken in the listener's implementation - * to differentiate blocks from different channels. See the example below on - * how to accomplish that. - *

    - * An error may be thrown by this call if no "onError" callback - * is provided and this EventHub has noticed that the connection has not been - * established. However since the connection establishment is running - * asynchronously, a register call could be made before this EventHub has been - * notified of the network issue. The best practice would be to provide an - * "onError" callback to be notified when this EventHub has an issue. - * - * @param {function} onEvent - Callback function that takes a single parameter - * of a {@link Block} object - * @param {function} onError - Optional callback function to be notified when this event hub - * is shutdown. The shutdown may be caused by a network error or by - * a call to the "disconnect()" method or a connection error. - * @returns {int} This is the block registration number that must be - * sed to unregister (see unregisterBlockEvent) - * - * @example Find out the channel Id of the arriving block - * eh.registerBlockEvent( - * (block) => { - * var first_tx = block.data.data[0]; // get the first transaction - * var header = first_tx.payload.header; // the "header" object contains metadata of the transaction - * var channel_id = header.channel_header.channel_id; - * if ("mychannel" !== channel_id) return; - * - * // do useful processing of the block - * }, - * (err) => { - * console.log('Oh snap!'); - * } - * ); - */ - registerBlockEvent(onEvent, onError) { - logger.debug('registerBlockEvent - start'); - if(!onEvent) { - throw new Error('Missing "onEvent" parameter'); - } - var have_error_cb = onError ? true : false; - // when there is no error callback throw and error - // when this hub is not connected - this._checkConnection(!have_error_cb, false); - - var block_registration_number = this._block_registrant_count++; - this._blockOnEvents[block_registration_number] = onEvent; - - // when there is an error callback try to reconnect this - // event hub if is not connected - if(have_error_cb) { - this._blockOnErrors[block_registration_number] = onError; - this._checkConnection(false, this._force_reconnect); - } - - return block_registration_number; - } - - /** - * Unregister the block event listener using the block - * registration number that is returned by the call to - * the registerBlockEvent() method. - * - * @param {int} The block registration number that was returned - * during registration. - */ - unregisterBlockEvent(block_registration_number) { - logger.debug('unregisterBlockEvent - start %s',block_registration_number); - if(!block_registration_number) { - throw new Error('Missing "block_registration_number" parameter'); - } - delete this._blockOnEvents[block_registration_number]; - delete this._blockOnErrors[block_registration_number]; - } - - /** - * Register a callback function to receive a notification when the transaction - * by the given id has been committed into a block. - *

    - * An error may be thrown by this call if no "onError" callback - * is provided and this EventHub has noticed that the connection has not been - * established. However since the connection establishment is running - * asynchronously, a register call could be made before this EventHub has been - * notified of the network issue. The best practice would be to provide an - * "onError" callback to be notified when this EventHub has an issue. - * - * @param {string} txid - Transaction id string - * @param {function} onEvent - Callback function that takes a parameter of type - * {@link Transaction}, and a string parameter which - * indicates if the transaction is valid (code = 'VALID'), - * or not (code string indicating the reason for invalid transaction) - * @param {function} onError - Optional callback function to be notified when this event hub - * is shutdown. The shutdown may be caused by a network error or by - * a call to the "disconnect()" method or a connection error. - */ - registerTxEvent(txid, onEvent, onError) { - logger.debug('registerTxEvent txid ' + txid); - if(!txid) { - throw new Error('Missing "txid" parameter'); - } - if(!onEvent) { - throw new Error('Missing "onEvent" parameter'); - } - var have_error_cb = onError ? true : false; - // when there is no onError callback throw and error - // when this hub is not connected - this._checkConnection(!have_error_cb, false); - - this._transactionOnEvents[txid] = onEvent; - - // when there is an onError callback try to reconnect this - // event hub if is not connected - if(have_error_cb) { - this._transactionOnErrors[txid] = onError; - this._checkConnection(false, this._force_reconnect); - } - } - - /** - * Unregister transaction event listener for the transaction id. - * @param {string} txid - The transaction id - */ - unregisterTxEvent(txid) { - logger.debug('unregisterTxEvent txid ' + txid); - if(!txid) { - throw new Error('Missing "txid" parameter'); - } - delete this._transactionOnEvents[txid]; - delete this._transactionOnErrors[txid]; - } - - /* - * private internal method for processing block events - * @param {Object} block protobuf object - */ - _processBlockOnEvents(block) { - logger.debug('_processBlockOnEvents block=%s', block.header.number); - if(Object.keys(this._blockOnEvents).length == 0) { - logger.debug('_processBlockOnEvents - no registered block event "listeners"'); - return; - } - - // send to all registered block listeners - let self = this; - Object.keys(this._blockOnEvents).forEach(function(key) { - var cb = self._blockOnEvents[key]; - cb(block); - }); - } - - /* - * private internal method for processing tx events - * @param {Object} block protobuf object which might contain the tx from the fabric - */ - _processTxOnEvents(block) { - logger.debug('_processTxOnEvents block=%s', block.header.number); - if(Object.keys(this._transactionOnEvents).length == 0) { - logger.debug('_processTxOnEvents - no registered transaction event "listeners"'); - return; - } - - var txStatusCodes = block.metadata.metadata[_common.BlockMetadataIndex.TRANSACTIONS_FILTER]; - - for (var index=0; index < block.data.data.length; index++) { - logger.debug('_processTxOnEvents - trans index=%s',index); - var channel_header = block.data.data[index].payload.header.channel_header; - var val_code = convertValidationCode(txStatusCodes[index]); - logger.debug('_processTxOnEvents - txid=%s val_code=%s', channel_header.tx_id, val_code); - var cb = this._transactionOnEvents[channel_header.tx_id]; - if (cb){ - logger.debug('_processTxOnEvents - about to stream the transaction call back for code=%s tx=%s', val_code, channel_header.tx_id); - cb(channel_header.tx_id, val_code); - } - } - } - - /* - * private internal method for processing chaincode events - * @param {Object} block protobuf object which might contain the chaincode event from the fabric - */ - _processChainCodeOnEvents(block) { - logger.debug('_processChainCodeOnEvents block=%s', block.header.number); - if(Object.keys(this._chaincodeRegistrants).length == 0) { - logger.debug('_processChainCodeOnEvents - no registered chaincode event "listeners"'); - return; - } - - for (var index=0; index < block.data.data.length; index++) { - logger.debug('_processChainCodeOnEvents - trans index=%s',index); - try { - var env = block.data.data[index]; - var payload = env.payload; - var channel_header = payload.header.channel_header; - if (channel_header.type === 3) { - var tx = payload.data; - var chaincodeActionPayload = tx.actions[0].payload; - var propRespPayload = chaincodeActionPayload.action.proposal_response_payload; - var caPayload = propRespPayload.extension; - var ccEvent = caPayload.events; - logger.debug('_processChainCodeOnEvents - ccEvent %s',ccEvent); - var cbtable = this._chaincodeRegistrants[ccEvent.chaincode_id]; - if (!cbtable) { - continue; - } - cbtable.forEach(function(cbe) { - if (cbe.eventNameFilter.test(ccEvent.event_name)) { - cbe.onEvent(ccEvent); - } - }); - } - } catch (err) { - logger.error('on.data - Error unmarshalling transaction=', err); - } - } - } -}; - -/* - * Utility method to get the state of the GRPC stream - */ -function getStreamState(self) { - let state = -1; - if(self._stream && self._stream.call && self._stream.call.channel_) { - state = self._stream.call.channel_.getConnectivityState(); - } - - return state; -} - -function convertValidationCode(code) { - return _validation_codes[code]; -} - -module.exports = EventHub; diff --git a/fabric-client/lib/Organization.js b/fabric-client/lib/Organization.js index 8af774edfd..4167dc6c05 100644 --- a/fabric-client/lib/Organization.js +++ b/fabric-client/lib/Organization.js @@ -76,25 +76,6 @@ var Organization = class { return this._peers; } - /** - * Add a {@link EventHub} to this organizations - * - * @param {EventHub} event_hub - The event hub instance to add to this - * organization's list of event hubs - */ - addEventHub(event_hub) { - this._event_hubs.push(event_hub); - } - - /** - * Gets the list of this organization's {@link EventHub} - * - * @returns [{EventHub}] An array of {@link EventHub} objects - */ - getEventHubs() { - return this._event_hubs; - } - /** * Add a {@link CertificateAuthority} to this organization * diff --git a/fabric-client/lib/impl/NetworkConfig_1_0.js b/fabric-client/lib/impl/NetworkConfig_1_0.js index a4d3a2c860..4dfd433b34 100644 --- a/fabric-client/lib/impl/NetworkConfig_1_0.js +++ b/fabric-client/lib/impl/NetworkConfig_1_0.js @@ -12,7 +12,6 @@ var path = require('path'); var utils = require('../utils'); var Constants = require('../Constants.js'); var Channel = require('../Channel.js'); -var EventHub = require('../EventHub.js'); var Organization = require('../Organization.js'); var CertificateAuthority = require('../CertificateAuthority.js'); @@ -207,26 +206,6 @@ const NetworkConfig_1_0 = class { } } - getEventHub(name) { - const method = 'getEventHub'; - logger.debug('%s - name %s',method, name); - let event_hub = null; - if(this._network_config && this._network_config[PEERS_CONFIG]) { - const peer_config = this._network_config[PEERS_CONFIG][name]; - if(peer_config && peer_config[EVENT_URL]) { - const opts = {}; - opts.pem = getTLSCACert(peer_config); - this._client_context.addTlsClientCertAndKey(opts); - Object.assign(opts, peer_config[GRPC_CONNECTION_OPTIONS]); - this.addTimeout(opts, EVENTREG); - event_hub = new EventHub(this._client_context); - event_hub.setPeerAddr(peer_config[EVENT_URL], opts); - } - } - - return event_hub; - } - getOrderer(name) { const method = 'getOrderer'; logger.debug('%s - name %s',method, name); @@ -271,10 +250,6 @@ const NetworkConfig_1_0 = class { const peer = this.getPeer(peer_name); if(peer) { organization.addPeer(peer); - const event_hub = this.getEventHub(peer_name); - if(event_hub) { - organization.addEventHub(event_hub); - } } } } diff --git a/fabric-client/types/index.d.ts b/fabric-client/types/index.d.ts index 57de62e401..7aab0eac2f 100644 --- a/fabric-client/types/index.d.ts +++ b/fabric-client/types/index.d.ts @@ -46,9 +46,6 @@ declare class Client extends BaseClient { newChannel(name: string): Client.Channel; getChannel(name?: string, throwError?: boolean): Client.Channel; newPeer(url: string, opts?: Client.ConnectionOptions): Client.Peer; - newEventHub(): Client.EventHub; - getEventHub(peer_name: string): Client.EventHub; - getEventHubsForOrg(org_name: string): Client.EventHub[]; getPeersForOrg(org_name: string): Client.Peer[]; newOrderer(url: string, opts?: Client.ConnectionOptions): Client.Orderer; getCertificateAuthority(): FabricCAServices; @@ -290,20 +287,6 @@ declare namespace Client { sendDeliver(envelope: Buffer): Promise; } - export class EventHub { - constructor(clientContext: Client); - connect(): void; - disconnect(): void; - getPeerAddr(): string; - setPeerAddr(url: string, opts: ConnectionOptions): void; - isconnected(): boolean; - registerBlockEvent(onEvent: (block: Block) => void, onError?: (err: Error) => void): number; - registerTxEvent(txId: string, onEvent: (txId: any, code: string) => void, onError?: (err: Error) => void): void; - registerChaincodeEvent(ccid: string, eventname: string, onEvent: (event: ChaincodeEvent) => void, onError?: (err: Error) => void): ChaincodeEventHandle; - unregisterBlockEvent(regNumber: number): void; - unregisterTxEvent(txId: string): void; - unregisterChaincodeEvent(handle: ChaincodeEventHandle): void; - } interface MSPConstructorConfig { rootCerts: IIdentity[]; intermediateCerts: IIdentity[]; diff --git a/test/fixtures/network.yaml b/test/fixtures/network.yaml index a2d54dcaea..8f9240891f 100644 --- a/test/fixtures/network.yaml +++ b/test/fixtures/network.yaml @@ -175,9 +175,6 @@ peers: # this URL is used to send endorsement and query requests url: grpcs://localhost:7051 - # this URL is used to connect the EventHub and registering event listeners - eventUrl: grpcs://localhost:7053 - grpcOptions: ssl-target-name-override: peer0.org1.example.com @@ -186,7 +183,6 @@ peers: peer0.org2.example.com: url: grpcs://localhost:8051 - eventUrl: grpcs://localhost:8053 grpcOptions: ssl-target-name-override: peer0.org2.example.com tlsCACerts: diff --git a/test/fixtures/org1.yaml b/test/fixtures/org1.yaml index 107964c2e5..7c29f24a66 100644 --- a/test/fixtures/org1.yaml +++ b/test/fixtures/org1.yaml @@ -31,7 +31,7 @@ client: endorser: 120 # the timeout in seconds to be used by applications when waiting for an # event to occur. This time should be used in a javascript timer object - # that will cancel the event registration with the event hub instance. + # that will cancel the event registration with the channel event hub instance. eventHub: 60 # the timeout in seconds to be used when setting up the connection # with the peer's event hub. If the peer does not acknowledge the diff --git a/test/integration/events.js b/test/integration/events.js deleted file mode 100644 index 1cb6dc35fa..0000000000 --- a/test/integration/events.js +++ /dev/null @@ -1,285 +0,0 @@ -/** - * Copyright 2017 London Stock Exchange All Rights Reserved. - * - * SPDX-License-Identifier: Apache-2.0 - */ - -'use strict'; - -var utils = require('fabric-client/lib/utils.js'); -var logger = utils.getLogger('events'); - -var tape = require('tape'); -var _test = require('tape-promise').default; -var test = _test(tape); - -var path = require('path'); -var fs = require('fs'); - -var Client = require('fabric-client'); -var testUtil = require('../unit/util.js'); -var eputil = require('./eventutil.js'); -var e2eUtils = require('./e2e/e2eUtils.js'); - -var client = new Client(); -var channel = client.newChannel(testUtil.END2END.channel); - -var chaincode_id = testUtil.getUniqueVersion('events_unit_test'); -var chaincode_version = testUtil.getUniqueVersion(); -var request = null; - -test('Test chaincode instantiate with event, transaction invocation with chaincode event, and query number of chaincode events', (t) => { - testUtil.resetDefaults(); - testUtil.setupChaincodeDeploy(); - Client.addConfigFile(path.join(__dirname, 'e2e', 'config.json')); - var ORGS = Client.getConfigSetting('test-network'); - Client.setConfigSetting('request-timeout', 30000); - - var caRootsPath = ORGS.orderer.tls_cacerts; - let data = fs.readFileSync(path.join(__dirname, 'e2e', caRootsPath)); - let caroots = Buffer.from(data).toString(); - let org = 'org1'; - let orgName = ORGS[org].name; - let targets = []; - - // must use an array to track the event hub instances so that when this gets - // passed into the overriden t.end() closure below it will get properly updated - // later when the eventhub instances are created - var eventhubs = []; - var eh; - var req1 = null; - var req2 = null; - var tls_data = null; - var tlsInfo = null; - - // override t.end function so it'll always disconnect the event hub - t.end = ((context, ehs, f) => { - return function() { - for(var key in ehs) { - var eventhub = ehs[key]; - if (eventhub && eventhub.isconnected()) { - logger.debug('Disconnecting the event hub'); - eventhub.disconnect(); - } - } - - f.apply(context, arguments); - }; - })(t, eventhubs, t.end); - - e2eUtils.tlsEnroll(org) - .then((enrollment) => { - t.pass('Successfully retrieved TLS certificate'); - tlsInfo = enrollment; - client.setTlsClientCertAndKey(tlsInfo.certificate, tlsInfo.key); - return Client.newDefaultKeyValueStore({path: testUtil.storePathForOrg(orgName)}); - }).then((store) => { - client.setStateStore(store); - - // get the peer org's admin required to send install chaincode requests - - return testUtil.getSubmitter(client, t, true /* get peer org admin */, org); - }).then(() => { - t.pass('Successfully enrolled user \'admin\''); - - channel.addOrderer( - client.newOrderer( - ORGS.orderer.url, - { - 'pem': caroots, - 'ssl-target-name-override': ORGS.orderer['server-hostname'] - } - ) - ); - - for (let key in ORGS[org]) { - if (ORGS[org].hasOwnProperty(key)) { - if (key.indexOf('peer') === 0) { - let data = fs.readFileSync(path.join(__dirname, 'e2e', ORGS[org][key]['tls_cacerts'])); - let peer = client.newPeer( - ORGS[org][key].requests, - { - pem: Buffer.from(data).toString(), - 'ssl-target-name-override': ORGS[org][key]['server-hostname'] - }); - channel.addPeer(peer); - targets.push(peer); - break; //just add one - } - } - } - - // setup event hub to get notified when transactions are committed - tls_data = fs.readFileSync(path.join(__dirname, 'e2e', ORGS[org].peer1['tls_cacerts'])); - - eh = client.newEventHub(); - - // first do one that fails - eh.setPeerAddr( - 'grpcs://localhost:9999', - { - pem: Buffer.from(tls_data).toString(), - 'clientCert': tlsInfo.certificate, - 'clientKey': tlsInfo.key, - 'ssl-target-name-override': ORGS[org].peer1['server-hostname'] - }); - try { - eh.registerBlockEvent( - () => { - t.fail('this success function should not be called'); - }, - (err) => { - if(err.toString().indexOf('Connect Failed') >= 0) { - t.pass('this error function should be called ' + err); - } - else { - t.fail('Error function was called but found an unknown error '+err); - } - } - ); - eh.connect(); - } - catch(err) { - t.fail('this catch should not have been called'); - } - - return sleep(5000); - }).then(() =>{ - - // now one that fails but not wait for it fail - eh.setPeerAddr( - 'grpcs://localhost:9999', - { - pem: Buffer.from(tls_data).toString(), - 'clientCert': tlsInfo.certificate, - 'clientKey': tlsInfo.key, - 'ssl-target-name-override': ORGS[org].peer1['server-hostname'] - }); - try { - eh.registerBlockEvent( - () => { - t.fail('this success function should not be called'); - }, - (err) => { - if(err.toString().indexOf('Peer address') >= 0) { - t.pass('this error function should be called ' + err); - } - else { - t.fail('Error function was called but found an unknown error '+err); - } - } - ); - eh.connect(); - } - catch(err) { - t.fail('this catch should not have been called'); - } - - // now do one that works - eh.setPeerAddr( - ORGS[org].peer1.events, - { - pem: Buffer.from(tls_data).toString(), - 'clientCert': tlsInfo.certificate, - 'clientKey': tlsInfo.key, - 'ssl-target-name-override': ORGS[org].peer1['server-hostname'] - }); - eh.connect(); - eventhubs.push(eh); - - request = eputil.createRequest(client, chaincode_id, targets, '', ''); - request.chaincodePath = 'github.com/events_cc'; - request.chaincodeVersion = chaincode_version; - Client.setConfigSetting('request-timeout', 60000); - - return client.installChaincode(request); - }).then((results) => { - if ( eputil.checkProposal(results)) { - t.pass('Successfully endorsed the installed chaincode proposal'); - // read the config block from the peer for the channel - // and initialize the verify MSPs based on the participating - // organizations - return channel.initialize(); - } else { - t.fail(' Failed to install install chaincode'); - return Promise.reject('failed to endorse the install chaincode proposal:' + results); - } - }).then(() => { - t.pass('Successfully initialized the channel'); - request = eputil.createRequest(client, chaincode_id, targets, 'init', []); - request.chaincodePath = 'github.com/events_cc'; - request.chaincodeVersion = chaincode_version; - - return channel.sendInstantiateProposal(request, 120000); - }).then((results) => { - if ( eputil.checkProposal(results)) { - t.pass('Successfully endorsed the instantiate chaincode proposal'); - var tmo = 60000; - return Promise.all([eputil.registerTxEvent(eh, request.txId.getTransactionID().toString(), tmo), - eputil.sendTransaction(channel, results)]); - } else { - t.fail('Failed to endorse the instantiate chaincode proposal'); - return Promise.reject('Failed to endorse the instatiate chaincode proposal:' + results); - } - }).then(() => { - t.pass('Successfully instantiated chaincode.'); - - request = eputil.createRequest(client, chaincode_id, targets, 'invoke', ['invoke', 'SEVERE']); - - return channel.sendTransactionProposal(request); - }).then((results) => { - t.pass('Successfully sent transaction to orderer to instantiate chaincode.'); - - var tmo = 20000; - return Promise.all([ - eputil.registerCCEvent(eh, chaincode_id.toString(), '^evtsender*', tmo, t, 'first chaincode event'), - eputil.registerCCEvent(eh, chaincode_id.toString(), '^evtsender*', tmo, t, 'second chaincode event'), - eputil.sendTransaction(channel, results) - ]); - }).then(() => { - t.pass('Successfully received chaincode events.'); - - request = eputil.createRequest(client, chaincode_id, targets, 'invoke', ['query']); - - return channel.queryByChaincode(request); - }).then((response_payloads) => { - t.pass('Successfully queried chaincode.'); - - if(!response_payloads) { - Promise.reject('No response_payloads returned'); - } - for (let i = 0; i < response_payloads.length; i++) { - t.equal(response_payloads[i].toString('utf8'), '1', 'checking query results are number of events generated'); - } - - // Test invalid transaction - // create 2 invoke requests in quick succession that modify - // the same state variable which should cause one invoke to - // be invalid - req1 = eputil.createRequest(client, chaincode_id, targets, 'invoke', ['invoke', 'SEVERE']); - req2 = eputil.createRequest(client, chaincode_id, targets, 'invoke', ['invoke', 'SEVERE']); - return Promise.all([channel.sendTransactionProposal(req1), - channel.sendTransactionProposal(req2)]); - }).then(([results1, results2]) => { - var tmo = 20000; - return Promise.all([eputil.registerTxEvent(eh, req1.txId.getTransactionID().toString(), tmo), - eputil.registerTxEvent(eh, req2.txId.getTransactionID().toString(), tmo), - eputil.sendTransaction(channel, results1), - eputil.sendTransaction(channel, results2) - ]); - }).then(() => { - t.fail('Failed to generate an invalid transaction'); - t.end(); - }, (err) => { - t.equal(err, 'invalid', 'Expecting a rejected promise from the 2nd transaction should be invalid'); - t.end(); - }).catch((err) => { - if(err) t.fail('Unexpected error. ' + err.stack ? err.stack : err); - else t.fail('Unexpected error with no error object in catch clause'); - t.end(); - }); -}); - -function sleep(ms) { - return new Promise(resolve => setTimeout(resolve, ms)); -} diff --git a/test/integration/invoke.js b/test/integration/invoke.js index badaa81176..1d681569ec 100644 --- a/test/integration/invoke.js +++ b/test/integration/invoke.js @@ -11,25 +11,25 @@ // and checks that it succeeds. 'use strict'; -var tape = require('tape'); -var _test = require('tape-promise').default; -var test = _test(tape); +const tape = require('tape'); +const _test = require('tape-promise').default; +const test = _test(tape); -var Client = require('fabric-client'); -var utils = require('fabric-client/lib/utils.js'); -var testUtil = require('../unit/util.js'); -var e2e = testUtil.END2END; -var e2eUtils = require('./e2e/e2eUtils.js'); +const Client = require('fabric-client'); +const utils = require('fabric-client/lib/utils.js'); +const testUtil = require('../unit/util.js'); +const e2e = testUtil.END2END; +const e2eUtils = require('./e2e/e2eUtils.js'); -var path = require('path'); -var fs = require('fs'); -var util = require('util'); +const path = require('path'); +const fs = require('fs'); +const util = require('util'); -var logger = utils.getLogger('E2E testing'); +const logger = utils.getLogger('E2E testing'); -var ORGS; -var tx_id = null; -var peers = []; +let ORGS; +let tx_id = null; +const peers = []; init(); @@ -82,8 +82,8 @@ function init() { for (let key in ORGS) { if (ORGS.hasOwnProperty(key) && typeof ORGS[key].peer1 !== 'undefined') { - let data = fs.readFileSync(path.join(__dirname, 'e2e', ORGS[key].peer1['tls_cacerts'])); - var org = ORGS[key].peer1; + const data = fs.readFileSync(path.join(__dirname, 'e2e', ORGS[key].peer1['tls_cacerts'])); + const org = ORGS[key].peer1; org.pem = Buffer.from(data).toString(); peers.push(org); } @@ -93,15 +93,15 @@ function init() { function invokeChaincode(userOrg, version, t, shouldFail, peers){ logger.debug('invokeChaincode begin'); Client.setConfigSetting('request-timeout', 60000); - var channel_name = Client.getConfigSetting('E2E_CONFIGTX_CHANNEL_NAME', testUtil.END2END.channel); + const channel_name = Client.getConfigSetting('E2E_CONFIGTX_CHANNEL_NAME', testUtil.END2END.channel); - var eventhubs = []; + const eventhubs = []; // override t.end function so it'll always disconnect the event hub t.end = ((context, ehs, f) => { return function() { - for(var key in ehs) { - var eventhub = ehs[key]; + for(let key in ehs) { + const eventhub = ehs[key]; if (eventhub && eventhub.isconnected()) { logger.debug('Disconnecting the event hub'); eventhub.disconnect(); @@ -116,12 +116,12 @@ function invokeChaincode(userOrg, version, t, shouldFail, peers){ // submit the request. intentionally we are using a different org // than the one that instantiated the chaincode, although either org // should work properly - var client = new Client(); - var channel = client.newChannel(channel_name); + const client = new Client(); + const channel = client.newChannel(channel_name); - var caRootsPath = ORGS.orderer.tls_cacerts; - let data = fs.readFileSync(path.join(__dirname, 'e2e', caRootsPath)); - let caroots = Buffer.from(data).toString(); + const caRootsPath = ORGS.orderer.tls_cacerts; + const data = fs.readFileSync(path.join(__dirname, 'e2e', caRootsPath)); + const caroots = Buffer.from(data).toString(); let tlsInfo = null; return e2eUtils.tlsEnroll(userOrg) @@ -138,51 +138,32 @@ function invokeChaincode(userOrg, version, t, shouldFail, peers){ ORGS.orderer.url, { 'pem': caroots, - 'clientCert': tlsInfo.certificate, - 'clientKey': tlsInfo.key, 'ssl-target-name-override': ORGS.orderer['server-hostname'] } ) ); for (let key in peers) { - let peer = client.newPeer( + const peer = client.newPeer( peers[key].requests, { pem: peers[key].pem, - 'clientCert': tlsInfo.certificate, - 'clientKey': tlsInfo.key, 'ssl-target-name-override': peers[key]['server-hostname'], }); channel.addPeer(peer); + eventhubs.push(channel.newChannelEventHub(peer)); } - // an event listener can only register with a peer in its own org - let data = fs.readFileSync(path.join(__dirname, 'e2e', ORGS[userOrg].peer1['tls_cacerts'])); - let eh = client.newEventHub(); - eh.setPeerAddr( - ORGS[userOrg].peer1.events, - { - pem: Buffer.from(data).toString(), - 'clientCert': tlsInfo.certificate, - 'clientKey': tlsInfo.key, - 'ssl-target-name-override': ORGS[userOrg].peer1['server-hostname'], - 'grpc.http2.keepalive_time' : 15 - } - ); - eh.connect(); - eventhubs.push(eh); - return channel.initialize(); }).then(() => { tx_id = client.newTransactionID(); // send proposal to endorser - var request = { + const request = { chaincodeId : e2e.chaincodeId, fcn: 'move', - args: ['a', 'b','100'], + args: ['a', 'b', '100'], txId: tx_id, }; return channel.sendTransactionProposal(request); @@ -192,13 +173,13 @@ function invokeChaincode(userOrg, version, t, shouldFail, peers){ throw new Error('Failed to enroll user \'admin\'. ' + err); }).then((results) => { - var proposalResponses = results[0]; - var proposal = results[1]; - var all_good = true; + const proposalResponses = results[0]; + const proposal = results[1]; + let all_good = true; - for(var i in proposalResponses) { + for(let i in proposalResponses) { let one_good = false; - let proposal_response = proposalResponses[i]; + const proposal_response = proposalResponses[i]; if( proposal_response.response && proposal_response.response.status === 200) { t.pass('transaction proposal has response status of good'); one_good = channel.verifyProposalResponse(proposal_response); @@ -227,7 +208,7 @@ function invokeChaincode(userOrg, version, t, shouldFail, peers){ t.pass('Successfully sent Proposal and received ProposalResponse'); logger.debug(util.format('Successfully sent Proposal and received ProposalResponse: Status - %s, message - "%s", metadata - "%s", endorsement signature: %s', proposalResponses[0].response.status, proposalResponses[0].response.message, proposalResponses[0].response.payload, proposalResponses[0].endorsement.signature)); - var request = { + const request = { proposalResponses: proposalResponses, proposal: proposal }; @@ -235,12 +216,12 @@ function invokeChaincode(userOrg, version, t, shouldFail, peers){ // set the transaction listener and set a timeout of 30sec // if the transaction did not get committed within the timeout period, // fail the test - var deployId = tx_id.getTransactionID(); + const deployId = tx_id.getTransactionID(); - var eventPromises = []; + const eventPromises = []; eventhubs.forEach((eh) => { - let txPromise = new Promise((resolve, reject) => { - let handle = setTimeout(reject, 120000); + const txPromise = new Promise((resolve, reject) => { + const handle = setTimeout(reject, 120000); eh.registerTxEvent(deployId.toString(), (tx, code) => { @@ -267,16 +248,17 @@ function invokeChaincode(userOrg, version, t, shouldFail, peers){ }, () => { clearTimeout(handle); - t.pass('Successfully received notification of the event call back being cancelled for '+ deployId); + t.fail('Failed -- received notification of the event call back being cancelled for '+ deployId); resolve(); } ); }); + eh.connect(); eventPromises.push(txPromise); }); - var sendPromise = channel.sendTransaction(request); + const sendPromise = channel.sendTransaction(request); return Promise.all([sendPromise].concat(eventPromises)) .then((results) => { logger.debug('event promise all complete and testing complete'); diff --git a/test/integration/memory.js b/test/integration/memory.js index dd9f3fca72..3c8fbe123c 100644 --- a/test/integration/memory.js +++ b/test/integration/memory.js @@ -424,7 +424,7 @@ async function actions(t) { admin : false }; - var eventhub = client.getEventHub('peer0.org1.example.com'); + var eventhub = channel.getChannelEventHub('peer0.org1.example.com'); response = await invoke(t, request, tx_id, client, channel, eventhub); //logged in as org2 user if (!(response[0] instanceof Error) && response[0].status === 'SUCCESS') { @@ -550,28 +550,21 @@ function invoke(t, request, tx_id, client, channel, eventhub) { var promises = []; promises.push(channel.sendTransaction(request)); - eventhub.disconnect(); // clean up any old registered events - eventhub.connect(); - let txPromise = new Promise((resolve, reject) => { let handle = setTimeout(() => { eventhub.disconnect(); t.fail('REQUEST_TIMEOUT -- eventhub did not respond'); - reject(new Error('REQUEST_TIMEOUT:' + eventhub._ep._endpoint.addr)); + reject(new Error('REQUEST_TIMEOUT:' + eventhub.getPeerAddr())); }, 30000); eventhub.registerTxEvent(transactionID, (tx, code) => { clearTimeout(handle); - eventhub.unregisterTxEvent(tx); // if we do not unregister then when - // when we shutdown the eventhub the - // error call back will get called - eventhub.disconnect(); // all done if (code !== 'VALID') { t.fail('transaction was invalid, code = ' + code); reject(new Error('INVALID:' + code)); } else { - t.pass('transaction has been committed on peer ' + eventhub._ep._endpoint.addr); + t.pass('transaction has been committed on peer ' + eventhub.getPeerAddr()); resolve(); } }, (error) => { @@ -579,7 +572,11 @@ function invoke(t, request, tx_id, client, channel, eventhub) { t.fail('Event registration for this transaction was invalid ::' + error); reject(error); - }); + }, + {disconnect: true} + ); + + eventhub.connect(); }); promises.push(txPromise); diff --git a/test/integration/upgrade.js b/test/integration/upgrade.js index 4d08db9a67..095866c461 100644 --- a/test/integration/upgrade.js +++ b/test/integration/upgrade.js @@ -156,21 +156,6 @@ test('\n\n **** E R R O R T E S T I N G on upgrade call', (t) => { }); test('\n\n **** Testing re-initializing states during upgrade ****', (t) => { - let eventhubs = []; - // override t.end function so it'll always disconnect the event hub - t.end = ((context, ehs, f) => { - return function() { - for(var key in ehs) { - var eventhub = ehs[key]; - if (eventhub && eventhub.isconnected()) { - logger.debug('Disconnecting the event hub'); - eventhub.disconnect(); - } - } - - f.apply(context, arguments); - }; - })(t, eventhubs, t.end); let VER = 'v3'; diff --git a/test/typescript/test.ts b/test/typescript/test.ts index 312651b58f..61896b76e1 100644 --- a/test/typescript/test.ts +++ b/test/typescript/test.ts @@ -39,7 +39,6 @@ import { BlockchainInfo, Peer, Orderer, - EventHub, ICryptoSuite, ICryptoKeyStore, } from 'fabric-client'; @@ -72,9 +71,6 @@ test('test Peer', (t) => { let channel: Channel = new Channel('mychannel', client); t.equal(channel.constructor.name, 'Channel'); - let eh = new EventHub(client); - t.equal(eh.constructor.name, 'EventHub'); - let ceh = new ChannelEventHub(channel, p); t.equal(ceh.constructor.name, 'ChannelEventHub'); diff --git a/test/unit/client.js b/test/unit/client.js index 8252b095d6..657f86332b 100644 --- a/test/unit/client.js +++ b/test/unit/client.js @@ -63,19 +63,6 @@ test('\n\n ** index.js **\n\n', (t) => { t.end(); }); -test('\n\n ** eventhub **\n\n', (t) => { - t.doesNotThrow( - () => { - const c = new Client(); - c._userContext = new User('name'); - c.newEventHub(); - }, - null, - 'Should be able to call "newEventHub" on the new instance of "hfc"'); - - t.end(); -}); - const channelKeyValStorePath = path.join(testutil.getTempDir(), 'channelKeyValStorePath'); const testKey = 'keyValFileStoreName'; @@ -93,10 +80,9 @@ test('\n\n ** config **\n\n', (t) => { t.equals(c.getConfigSetting('something', 'ABC'), 'ABC', 'Check getting default config setting value'); c.setConfigSetting('something', 'DEF'); t.equals(c.getConfigSetting('something', 'ABC'), 'DEF', 'Check getting a set config setting value'); - c.newEventHub(); }, null, - 'Should be able to call "newEventHub" on the new instance of "hfc"'); + 'Should be able to call "getConfigSetting" on the new instance of "hfc"'); t.end(); }); diff --git a/test/unit/event-hub.js b/test/unit/event-hub.js deleted file mode 100644 index 749108669d..0000000000 --- a/test/unit/event-hub.js +++ /dev/null @@ -1,810 +0,0 @@ -/** - * Copyright 2016-2017 IBM All Rights Reserved. - * - * SPDX-License-Identifier: Apache-2.0 - */ - -'use strict'; - -var tape = require('tape'); -var _test = require('tape-promise').default; -var test = _test(tape); - -var testutil = require('./util.js'); -var User = require('fabric-client/lib/User.js'); -var utils = require('fabric-client/lib/utils.js'); -var test_user = require('./user.js'); - -var Client = require('fabric-client'); -var EventHub = require('fabric-client/lib/EventHub.js'); -var sdkUtils = require('fabric-client/lib/utils.js'); - -test('\n\n** EventHub tests\n\n', (t) => { - testutil.resetDefaults(); - - var eh; - - t.throws( - () => { - eh = new EventHub(); - }, - /Missing required argument: clientContext/, - 'Must pass in a clientContext' - ); - - t.throws( - () => { - eh = new EventHub({}); - eh.connect(); - }, - /Invalid clientContext argument: missing required function "getUserContext"/, - 'Must pass in a clientContext that has the getUserContext() function' - ); - - t.throws( - () => { - eh = new EventHub({ getUserContext: function() {}, _getSigningIdentity: function() {} }); - eh.connect(); - }, - /The clientContext has not been properly initialized, missing identity/, - 'Must pass in a clientContext that has the user context already initialized' - ); - - eh = new EventHub({ getUserContext: function() {}, _getSigningIdentity: function() { return 'dummyUser'; } }); - t.throws( - () => { - eh.connect(); - }, - /Must set peer address before connecting/, - 'Must not allow connect() when peer address has not been set' - ); - - t.throws( - () => { - eh.setPeerAddr('badUrl'); - }, - /InvalidProtocol: Invalid protocol: undefined/, - 'Must not allow a bad url without protocol to be set' - ); - - t.throws( - () => { - eh.setPeerAddr('http://badUrl'); - }, - /InvalidProtocol: Invalid protocol: http/, - 'Must not allow an http url to be set' - ); - - t.throws( - () => { - eh.setPeerAddr('https://badUrl'); - }, - /InvalidProtocol: Invalid protocol: https/, - 'Must not allow an https url to be set' - ); - - t.doesNotThrow( - () => { - eh.setPeerAddr('grpc://localhost:7053'); - }, - null, - 'Test valid url connect and disconnect' - ); - - t.throws( - () => { - eh.registerBlockEvent(); - }, - /Missing "onEvent" parameter/, - 'Check the Missing "onEvent" parameter' - ); - - t.throws( - () => { - eh.unregisterBlockEvent(); - }, - /Missing "block_registration_number" parameter/, - 'Check the Missing "block_registration_number" parameter' - ); - t.throws( - () => { - eh.registerTxEvent(); - }, - /Missing "txid" parameter/, - 'Check the Missing "txid" parameter' - ); - t.throws( - () => { - eh.registerTxEvent('txid'); - }, - /Missing "onEvent" parameter/, - 'Check the Missing "onEvent" parameter' - ); - t.throws( - () => { - eh.unregisterTxEvent(); - }, - /Missing "txid" parameter/, - 'Check the Missing "txid" parameter' - ); - t.throws( - () => { - eh.registerChaincodeEvent(); - }, - /Missing "ccid" parameter/, - 'Check the Missing "ccid" parameter' - ); - t.throws( - () => { - eh.registerChaincodeEvent('ccid'); - }, - /Missing "eventname" parameter/, - 'Check the Missing "eventname" parameter' - ); - t.throws( - () => { - eh.registerChaincodeEvent('ccid','eventname'); - }, - /Missing "onEvent" parameter/, - 'Check the Missing "onEvent" parameter' - ); - t.throws( - () => { - eh.unregisterChaincodeEvent(); - }, - /Missing "listener_handle" parameter/, - 'Check the Missing "listener_handle" parameter' - ); - t.end(); -}); - -test('\n\n** EventHub block callback \n\n', (t) => { - var eh = new EventHub({ getUserContext: function() { return 'dummyUser'; } }); - eh.setPeerAddr('grpc://localhost:7053'); - eh._connected = true; //force this into connected state - eh._force_reconnect = false; - - var index = eh.registerBlockEvent(() => { - t.fail('Should not have called success callback when disconnect() is called'); - t.end(); - }, () =>{ - t.pass('Successfully called error callback from disconnect()'); - t.end(); - }); - - t.pass('successfully registered block callbacks'); - t.equal(index, 1, 'Check the first block listener is at index 1'); - - index = eh.registerBlockEvent(() => { - // empty method body - }, () => { - // empty method body - }); - - t.equal(index, 2, 'Check the 2nd block listener is at index 2'); - t.equal(Object.keys(eh._blockOnEvents).length, 2, 'Check the size of the blockOnEvents hash table'); - t.equal(Object.keys(eh._blockOnErrors).length, 2, 'Check the size of the blockOnErrors hash table'); - - eh.disconnect(); -}); - -test('\n\n** EventHub transaction callback \n\n', (t) => { - var eh = new EventHub({ getUserContext: function() { return 'dummyUser'; } }); - eh.setPeerAddr('grpc://localhost:7053'); - eh._connected = true; //force this into connected state - eh._force_reconnect = false; - - eh.registerTxEvent('txid1', () => { - // empty method body - }, () =>{ - // empty method body - }); - t.pass('successfully registered transaction callbacks'); - t.equal(Object.keys(eh._transactionOnEvents).length, 1, 'Check the size of the transactionOnEvents hash table'); - t.equal(Object.keys(eh._transactionOnErrors).length, 1, 'Check the size of the transactionOnErrors hash table'); - - eh.registerTxEvent('txid1', () => { - t.fail('Should not have called success callback'); - t.end(); - }, () =>{ - t.pass('Successfully called transaction error callback'); - t.end(); - }); - t.equal(Object.keys(eh._transactionOnEvents).length, 1, - 'Size of the transactionOnEvents hash table should still be 1 since the listeners are for the same txId'); - t.equal(Object.keys(eh._transactionOnErrors).length, 1, - 'Size of the transactionOnErrors hash table should still be 1 since the listeners are for the same txId'); - - eh.registerTxEvent('txid2', () => { - // empty method body - }, () =>{ - // empty method body - }); - - t.equal(Object.keys(eh._transactionOnEvents).length, 2, 'Check the size of the transactionOnEvents hash table'); - t.equal(Object.keys(eh._transactionOnErrors).length, 2, 'Check the size of the transactionOnErrors hash table'); - - eh.disconnect(); -}); - -test('\n\n** EventHub chaincode callback \n\n', (t) => { - var eh = new EventHub({ getUserContext: function() { return 'dummyUser'; } }); - eh.setPeerAddr('grpc://localhost:7053'); - eh._connected = true; //force this into connected state - eh._force_reconnect = false; - - eh.registerChaincodeEvent('ccid1', 'eventfilter', () => { - t.fail('Should not have called success callback'); - t.end(); - }, () =>{ - t.pass('Successfully called chaincode error callback'); - t.end(); - }); - t.pass('successfully registered chaincode callbacks'); - - t.equal(Object.keys(eh._chaincodeRegistrants).length, 1, 'Check the size of the chaincodeRegistrants hash table'); - - eh.registerChaincodeEvent('ccid1', 'eventfilter', () => { - // empty method body - }, () =>{ - // empty method body - }); - - t.equal(Object.keys(eh._chaincodeRegistrants).length, 1, - 'Size of the chaincodeRegistrants hash table should still be 1 because both listeners are for the same chaincode'); - - eh.registerChaincodeEvent('ccid2', 'eventfilter', () => { - // empty method body - }, () =>{ - // empty method body - }); - - t.equal(Object.keys(eh._chaincodeRegistrants).length, 2, - 'Size of the chaincodeRegistrants hash table should still be 2'); - - eh.disconnect(); -}); - -test('\n\n** EventHub block callback no Error callback \n\n', (t) => { - var eh = new EventHub({ getUserContext: function() { return 'dummyUser'; } }); - eh.setPeerAddr('grpc://localhost:7053'); - eh._connected = true; //force this into connected state - eh._force_reconnect = false; - - eh.registerBlockEvent(() => { - t.fail('Should not have called block no error success callback'); - t.end(); - }); - t.pass('successfully registered block callbacks'); - eh.disconnect(); - t.end(); -}); - -test('\n\n** EventHub transaction callback no Error callback \n\n', (t) => { - var eh = new EventHub({ getUserContext: function() { return 'dummyUser'; } }); - eh.setPeerAddr('grpc://localhost:7053'); - eh._connected = true; //force this into connected state - eh._force_reconnect = false; - - eh.registerTxEvent('txid', () => { - t.fail('Should not have called transaction no error success callback'); - t.end(); - }); - t.pass('successfully registered transaction callbacks'); - eh.disconnect(); - t.end(); -}); - -test('\n\n** EventHub chaincode callback no Error callback \n\n', (t) => { - var eh = new EventHub({ getUserContext: function() { return 'dummyUser'; } }); - eh.setPeerAddr('grpc://localhost:7053'); - eh._connected = true; //force this into connected state - eh._force_reconnect = false; - - eh.registerChaincodeEvent('ccid', 'eventfilter', () => { - t.fail('Should not have called chaincode no error success callback'); - t.end(); - }); - t.pass('successfully registered chaincode callbacks'); - eh.disconnect(); - t.end(); -}); - -test('\n\n** EventHub remove block callback \n\n', (t) => { - var eh = new EventHub({ getUserContext: function() { return 'dummyUser'; } }); - eh.setPeerAddr('grpc://localhost:7053'); - eh._connected = true; //force this into connected state - eh._force_reconnect = false; - - var blockcallback = () => { - t.fail('Should not have called block success callback (on remove)'); - t.end(); - }; - var blockerrorcallback = () =>{ - t.fail('Should not have called block error callback (on remove)'); - t.end(); - }; - var brn = eh.registerBlockEvent( blockcallback, blockerrorcallback); - t.pass('successfully registered block callbacks'); - eh.unregisterBlockEvent(brn); - t.equal(Object.keys(eh._blockOnEvents).length, 0, 'Check the size of the blockOnEvents hash table'); - t.pass('successfuly unregistered block callback'); - eh.disconnect(); - t.pass('successfuly disconnected eventhub'); - t.end(); -}); - -test('\n\n** EventHub remove transaction callback \n\n', (t) => { - var eh = new EventHub({ getUserContext: function() { return 'dummyUser'; } }); - eh.setPeerAddr('grpc://localhost:7053'); - eh._connected = true; //force this into connected state - eh._force_reconnect = false; - - var txid = 'txid'; - eh.registerTxEvent(txid, () => { - t.fail('Should not have called transaction success callback (on remove)'); - t.end(); - }, () =>{ - t.fail('Should not have called transaction error callback (on remove)'); - t.end(); - }); - t.pass('successfully registered transaction callbacks'); - eh.unregisterTxEvent(txid); - t.pass('successfuly unregistered transaction callback'); - t.equal(Object.keys(eh._transactionOnEvents).length, 0, 'Check the size of the transactionOnEvents hash table'); - eh.disconnect(); - t.pass('successfuly disconnected eventhub'); - t.end(); -}); - -test('\n\n** EventHub remove chaincode callback \n\n', (t) => { - var eh = new EventHub({ getUserContext: function() { return 'dummyUser'; } }); - eh.setPeerAddr('grpc://localhost:7053'); - eh._connected = true; //force this into connected state - eh._force_reconnect = false; - - var cbe = eh.registerChaincodeEvent('ccid', 'eventfilter', () => { - t.fail('Should not have called chaincode success callback (on remove)'); - t.end(); - }, () =>{ - t.fail('Should not have called chaincode error callback (on remove)'); - t.end(); - }); - t.pass('successfully registered chaincode callbacks'); - eh.unregisterChaincodeEvent(cbe); - t.pass('successfuly unregistered chaincode callback'); - t.equal(Object.keys(eh._chaincodeRegistrants).length, 0, - 'Size of the chaincodeRegistrants hash table should be 0'); - eh.disconnect(); - t.pass('successfuly disconnected eventhub'); - t.end(); -}); - - -test('\n\n** EventHub remove block callback no Error callback \n\n', (t) => { - var eh = new EventHub({ getUserContext: function() { return 'dummyUser'; } }); - eh.setPeerAddr('grpc://localhost:7053'); - eh._connected = true; //force this into connected state - eh._force_reconnect = false; - - var blockcallback = () => { - t.fail('Should not have called block success callback (remove with no error callback)'); - t.end(); - }; - var brn = eh.registerBlockEvent( blockcallback); - t.pass('successfully registered block callbacks'); - eh.unregisterBlockEvent(brn); - t.pass('successfuly unregistered block callback'); - eh.disconnect(); - t.pass('successfuly disconnected eventhub'); - t.end(); -}); - -test('\n\n** EventHub remove transaction callback no Error callback\n\n', (t) => { - var eh = new EventHub({ getUserContext: function() { return 'dummyUser'; } }); - eh.setPeerAddr('grpc://localhost:7053'); - eh._connected = true; //force this into connected state - eh._force_reconnect = false; - - var txid = 'txid'; - eh.registerTxEvent(txid, () => { - t.fail('Should not have called transaction success callback (remove with no error callback)'); - t.end(); - }); - t.pass('successfully registered transaction callbacks'); - eh.unregisterTxEvent(txid); - t.pass('successfuly unregistered transaction callback'); - eh.disconnect(); - t.pass('successfuly disconnected eventhub'); - t.end(); -}); - -test('\n\n** EventHub remove chaincode callback no Error callback \n\n', (t) => { - var eh = new EventHub({ getUserContext: function() { return 'dummyUser'; } }); - eh.setPeerAddr('grpc://localhost:7053'); - eh._connected = true; //force this into connected state - eh._force_reconnect = false; - var cbe = eh.registerChaincodeEvent('ccid', 'eventfilter', () => { - t.fail('Should not have called chaincode success callback (remove with no error callback)'); - t.end(); - }); - t.pass('successfully registered chaincode callbacks'); - eh.unregisterChaincodeEvent(cbe); - t.pass('successfuly unregistered chaincode callback'); - eh.disconnect(); - t.pass('successfuly disconnected eventhub'); - t.end(); -}); - -test('\n\n** Test the add and remove utilty used by the EventHub to add a setting to the options \n\n', (t) => { - var only_options = sdkUtils.checkAndAddConfigSetting('opt1', 'default1', null); - t.equals(only_options['opt1'], 'default1', 'Checking that new options has the setting with the incoming value and options are null'); - - var options = { opt1 : 'incoming1', opt4 : 'incoming4'}; - - // case where incoming options does have the setting - var updated_options = sdkUtils.checkAndAddConfigSetting('opt1', 'default1', options); - // case where incoming options does not have setting and config does not - updated_options = sdkUtils.checkAndAddConfigSetting('opt2', 'default2', updated_options); - // case where incoming options does not have setting and config does - sdkUtils.setConfigSetting('opt3', 'config3'); - updated_options = sdkUtils.checkAndAddConfigSetting('opt3', 'default3', updated_options); - - // case where incoming options does not have setting and config does have - t.equals(updated_options['opt1'], 'incoming1', 'Checking that new options has the setting with the incoming value'); - t.equals(updated_options['opt2'], 'default2', 'Checking that new options has the setting with the default value'); - t.equals(updated_options['opt3'], 'config3', 'Checking that new options has the setting with the value from the config'); - t.equals(updated_options['opt4'], 'incoming4', 'Checking that new options has setting not looked at'); - - t.end(); -}); - -// test actions after connect fails -// 1. register for event with no delay and no error callback -// 2. register for event with no delay and error callback -// 3. register for event with delay and no error callback -// 4. register for event with delay and error callback -test('\n\n** EventHub test actions when connect failures on transaction registration \n\n', (t) => { - var client = new Client(); - var event_hub = null; - var member = new User('user1'); - var crypto_suite = utils.newCryptoSuite(); - crypto_suite.setCryptoKeyStore(utils.newCryptoKeyStore()); - member.setCryptoSuite(crypto_suite); - crypto_suite.generateKey() - .then(function (key) { - return member.setEnrollment(key, test_user.TEST_CERT_PEM, 'DEFAULT'); - }).then(() => { - //var id = member.getIdentity(); - client.setUserContext(member, true); - - // tx test 1 - event_hub = client.newEventHub(); - event_hub.setPeerAddr('grpc://localhost:9999'); - event_hub.connect(); - t.doesNotThrow( - () => { - event_hub.registerTxEvent('123', () => { - t.fail('Failed callback should not have been called - tx test 1'); - }); - }, - null, - 'Check for The event hub has not been connected to the event source - tx test 1' - ); - - // tx test 2 - event_hub = client.newEventHub(); - event_hub.setPeerAddr('grpc://localhost:9999'); - event_hub.connect(); - t.doesNotThrow( - () => { - event_hub.registerTxEvent('123', - () => { - t.fail('Failed callback should not have been called - tx test 2'); - }, - (error) =>{ - if(error.toString().indexOf('Connect Failed')) { - t.pass('Successfully got the error call back tx test 2 ::'+error); - } else { - t.failed('Failed to get connection failed error tx test 2 ::'+error); - } - }); - }, - null, - 'Check for The event hub has not been connected to the event source - tx test 2' - ); - - // tx test 3 - event_hub = client.newEventHub(); - event_hub.setPeerAddr('grpc://localhost:9999'); - event_hub.connect(); - - let sleep_time = 3000; - t.comment('about to sleep '+sleep_time); - return sleep(sleep_time); - }).then(() => { - t.pass('Sleep complete'); - // eventhub is now actually not connected - - t.doesNotThrow( - () => { - event_hub.registerTxEvent('123', - () => { - t.fail('Failed callback should not have been called - tx test 3'); - }, - (error) =>{ - if(error.toString().indexOf('Shutdown')) { - t.pass('Successfully got the error call back tx test 3 ::'+error); - } else { - t.failed('Failed to get shutdown error tx test 3 :: '+error); - } - }); - }, - null, - 'Check for The event hub has been shutdown - tx test 3' - ); - event_hub.disconnect(); - }).then(() => { - t.pass('Sleep complete'); - // eventhub is now actually not connected - - t.doesNotThrow( - () => { - event_hub.registerTxEvent('123', - () => { - t.fail('Failed callback should not have been called - tx test 4'); - }, - (error) =>{ - if(error.toString().indexOf('Connect Failed')) { - t.pass('Successfully got the error call back tx test 4 ::'+error); - } else { - t.failed('Failed to get connection failed error tx test 4 :: '+error); - } - }); - }, - null, - 'Check for The event hub has not been connected to the event source - tx test 4' - ); - - t.end(); - }).catch((err) => { - t.fail(err.stack ? err.stack : err); - t.end(); - }); - -}); - -// test actions after connect fails -// 1. register for event with no delay and no error callback -// 2. register for event with no delay and error callback -// 3. register for event with delay and no error callback -// 4. register for event with delay and error callback -test('\n\n** EventHub test actions when connect failures on block registration \n\n', (t) => { - var client = new Client(); - var event_hub = null; - var member = new User('user1'); - var crypto_suite = utils.newCryptoSuite(); - crypto_suite.setCryptoKeyStore(utils.newCryptoKeyStore()); - member.setCryptoSuite(crypto_suite); - crypto_suite.generateKey() - .then(function (key) { - return member.setEnrollment(key, test_user.TEST_CERT_PEM, 'DEFAULT'); - }).then(() => { - //var id = member.getIdentity(); - client.setUserContext(member, true); - - // test 1 - event_hub = client.newEventHub(); - event_hub.setPeerAddr('grpc://localhost:9997'); - event_hub.connect(); - t.doesNotThrow( - () => { - event_hub.registerBlockEvent(() => { - t.fail('Failed callback should not have been called - block test 1'); - }); - }, - null, - 'Check for The event hub has not been connected to the event source - block test 1' - ); - - // block test 2 - event_hub = client.newEventHub(); - event_hub.setPeerAddr('grpc://localhost:9997'); - event_hub.connect(); - t.doesNotThrow( - () => { - event_hub.registerBlockEvent( - () => { - t.fail('Failed callback should not have been called - block test 2'); - }, - (error) =>{ - if(error.toString().indexOf('Connect Failed')) { - t.pass('Successfully got the error call back block test 2 ::'+error); - } else { - t.failed('Failed to get connection failed error block test 2 ::'+error); - } - }); - }, - null, - 'Check for The event hub has not been connected to the event source - block test 2' - ); - - // block test 3 - event_hub = client.newEventHub(); - event_hub.setPeerAddr('grpc://localhost:9997'); - event_hub.connect(); - - let sleep_time = 3000; - t.comment('about to sleep '+sleep_time); - return sleep(sleep_time); - }).then(() => { - t.pass('Sleep complete'); - // eventhub is now actually not connected - - t.doesNotThrow( - () => { - event_hub.registerBlockEvent( - () => { - t.fail('Failed callback should not have been called - block test 3'); - }, - (error) =>{ - if(error.toString().indexOf('Shutdown')) { - t.pass('Successfully got the error call back block test 3 ::'+error); - } else { - t.failed('Failed to get Shutdown error block test 3 :: '+error); - } - }); - }, - null, - 'Check for The event hub disconnect - block test 3' - ); - event_hub.disconnect(); - }).then(() => { - t.pass('Sleep complete'); - // eventhub is now actually not connected - - t.doesNotThrow( - () => { - event_hub.registerBlockEvent( - () => { - t.fail('Failed callback should not have been called - block test 4'); - }, - (error) =>{ - if(error.toString().indexOf('Connect Failed')) { - t.pass('Successfully got the error call back block test 4 ::'+error); - } else { - t.failed('Failed to get connection failed error block test 4 :: '+error); - } - }); - }, - null, - 'Check for The event hub has not been connected to the event source - block test 4' - ); - - t.end(); - }).catch((err) => { - t.fail(err.stack ? err.stack : err); - t.end(); - }); - -}); - -// chaincode test actions after connect fails -// 1. register for event with no delay and no error callback -// 2. register for event with no delay and error callback -// 3. register for event with delay and no error callback -// 4. register for event with delay and error callback -test('\n\n** EventHub test actions when connect failures on chaincode registration \n\n', (t) => { - var client = new Client(); - var event_hub = null; - var member = new User('user1'); - var crypto_suite = utils.newCryptoSuite(); - crypto_suite.setCryptoKeyStore(utils.newCryptoKeyStore()); - member.setCryptoSuite(crypto_suite); - crypto_suite.generateKey() - .then(function (key) { - return member.setEnrollment(key, test_user.TEST_CERT_PEM, 'DEFAULT'); - }).then(() => { - //var id = member.getIdentity(); - client.setUserContext(member, true); - - // chaincode test 1 - event_hub = client.newEventHub(); - event_hub.setPeerAddr('grpc://localhost:9998'); - event_hub.connect(); - t.doesNotThrow( - () => { - event_hub.registerChaincodeEvent('123', 'event', () => { - t.fail('Failed callback should not have been called - chaincode test 1'); - }); - }, - null, - 'Check for The event hub has not been connected to the event source - chaincode test 1' - ); - - // chaincode test 2 - event_hub = client.newEventHub(); - event_hub.setPeerAddr('grpc://localhost:9998'); - event_hub.connect(); - t.doesNotThrow( - () => { - event_hub.registerChaincodeEvent('123', 'event', - () => { - t.fail('Failed callback should not have been called - chaincode test 2'); - }, - (error) =>{ - if(error.toString().indexOf('Connect Failed')) { - t.pass('Successfully got the error call back chaincode test 2 ::'+error); - } else { - t.failed('Failed to get connection failed error chaincode test 2 ::'+error); - } - }); - }, - null, - 'Check for The event hub has not been connected to the event source - chaincode test 2' - ); - - // chaincode test 3 - event_hub = client.newEventHub(); - event_hub.setPeerAddr('grpc://localhost:9998'); - event_hub.connect(); - - let sleep_time = 3000; - t.comment('about to sleep '+sleep_time); - return sleep(sleep_time); - }).then(() => { - t.pass('Sleep complete'); - // eventhub is now actually not connected - - t.doesNotThrow( - () => { - event_hub.registerChaincodeEvent('123', 'event', - () => { - t.fail('Failed callback should not have been called - chaincode test 3'); - }, - (error) =>{ - if(error.toString().indexOf('Shutdown')) { - t.pass('Successfully got the error call back chaincode test 3 ::'+error); - } else { - t.failed('Failed to get Shutdown error chaincode test 3:: '+error); - } - }); - }, - null, - 'Check for The event hub disconnect- chaincode test 4' - ); - event_hub.disconnect(); - - }).then(() => { - t.pass('Sleep complete'); - // eventhub is now actually not connected - - t.doesNotThrow( - () => { - event_hub.registerChaincodeEvent('123', 'event', - () => { - t.fail('Failed callback should not have been called - chaincode test 4'); - }, - (error) =>{ - if(error.toString().indexOf('Connect Failed')) { - t.pass('Successfully got the error call back chaincode test 4 ::'+error); - } else { - t.failed('Failed to get connection failed error chaincode test 4 :: '+error); - } - }); - }, - null, - 'Check for The event hub has not been connected to the event source - chaincode test 4' - ); - - t.end(); - }).catch((err) => { - t.fail(err.stack ? err.stack : err); - t.end(); - }); - -}); - -function sleep(ms) { - return new Promise(resolve => setTimeout(resolve, ms)); -} diff --git a/test/unit/network-config.js b/test/unit/network-config.js index 0ca9ac1043..4b4be1d70f 100644 --- a/test/unit/network-config.js +++ b/test/unit/network-config.js @@ -112,10 +112,6 @@ test('\n\n ** configuration testing **\n\n', function (t) { client.setCryptoSuite(Client.newCryptoSuite()); client.setUserContext(new User('testUser'), true); const channel = client.getChannel('mychannel2'); - let event_hubs = client.getEventHubsForOrg(); - t.equals('localhost:8053', event_hubs[0].getPeerAddr(), ' Check to see if we got the right event hub for org2 by default'); - event_hubs = client.getEventHubsForOrg('Org1MSP'); - t.equals('localhost:7053', event_hubs[0].getPeerAddr(), ' Check to see if we got the right event hub for org1 by specifically asking for mspid of org1'); let peers = client.getPeersForOrg(); t.equals('grpcs://localhost:8051', peers[0].getUrl(), ' Check to see if we got the right peer for org2 by default'); peers = client.getPeersForOrg('Org1MSP'); @@ -161,8 +157,6 @@ test('\n\n ** configuration testing **\n\n', function (t) { t.equals(peer._options['request-timeout'],120000, ' check that we get this peer endorser timeout set'); let orderer = client._network_config.getOrderer('orderer.example.com'); t.equals(orderer._options['request-timeout'],30000, ' check that we get this orderer timeout set'); - let eventHub = client._network_config.getEventHub('peer0.org1.example.com'); - t.equals(eventHub._ep._options['request-timeout'],3000, ' check that we get this eventHub timeout set'); delete client._network_config._network_config.certificateAuthorities['ca-org1'].tlsCACerts; delete client._network_config._network_config.certificateAuthorities['ca-org1'].httpOptions; @@ -528,7 +522,6 @@ test('\n\n ** configuration testing **\n\n', function (t) { let organization = client._network_config.getOrganization(organizations[0].getName()); let ca = organization.getCertificateAuthorities()[0]; t.equals('ca1',ca.getName(),'check the ca name'); - t.equals(organization.getEventHubs().length,0,'Check that there are no event hubs'); organization = client._network_config.getOrganization(organizations[1].getName()); ca = organization.getCertificateAuthorities()[0]; @@ -537,7 +530,6 @@ test('\n\n ** configuration testing **\n\n', function (t) { organization = client._network_config.getOrganizationByMspId(organizations[0].getMspid()); ca = organization.getCertificateAuthorities()[0]; t.equals('ca1',ca.getName(),'check the ca name'); - t.equals(organization.getEventHubs().length,0,'Check that there are no event hubs'); }, null, 'Should be able to get organizations' diff --git a/test/unit/util.js b/test/unit/util.js index 4b630fc52d..fabe5c886c 100644 --- a/test/unit/util.js +++ b/test/unit/util.js @@ -544,43 +544,8 @@ module.exports.setupChannel = async function(t, client_org1, client_org2, channe module.exports.buildJoinEventMonitor = function(t, client, channel_name, peer_name) { - const event_hub = client.getEventHub(peer_name); - const event_block_promise = new Promise((resolve, reject) => { - let registration_id = null; - const event_timeout = setTimeout(() => { - let message = 'REQUEST_TIMEOUT:' + event_hub._ep._endpoint.addr; - logger.error(message); - event_hub.disconnect(); - reject(new Error(message)); - }, 30000); - registration_id = event_hub.registerBlockEvent((block) => { - clearTimeout(event_timeout); - // A peer may have more than one channel, check that this block came - // is from the channel that is being joined. - // ... also this will be the first block channel, and the channel may - // have many more blocks - if (block.data.data.length === 1) { - const channel_header = block.data.data[0].payload.header.channel_header; - if (channel_header.channel_id === channel_name) { - const message = util.format('EventHub %s has reported a block update for channel %s',event_hub._ep._endpoint.addr,channel_name); - t.pass(message); - event_hub.unregisterBlockEvent(registration_id); - event_hub.disconnect(); - t.pass(util.format('EventHub %s has been disconnected',event_hub._ep._endpoint.addr)); - resolve(message); - } else { - t.pass('Keep waiting for the right block'); - } - } - }, (err) => { - clearTimeout(event_timeout); - const message = 'Problem setting up the event hub :'+ err.toString(); - t.fail(message); - event_hub.disconnect(); - reject(new Error(message)); - }); - event_hub.connect(); - }); + // no event available ... just going to wait + const event_block_promise = new Promise(resolve => setTimeout(resolve, 10000)); return event_block_promise; };