diff --git a/fabric-client/lib/EventHub.js b/fabric-client/lib/EventHub.js index 6cf03243a2..fe504496ba 100644 --- a/fabric-client/lib/EventHub.js +++ b/fabric-client/lib/EventHub.js @@ -69,57 +69,88 @@ var ChainCodeCBE = class { }; /** - * The EventHub class is used to distribute events from an - * event source(peer). - *

Sample usage: -
-var eh = client.newEventHub();
-eh.setPeerAddr(
-	'grpcs://localhost:9999',
-	{
-		pem: Buffer.from(certdata).toString(),
-		'ssl-target-name-override': 'peer1']
-	}
-);
-eh.connect();
-  eh.registerTxEvent(
-  	transactionId,
-	(tx, code) => {
-		eh.unregisterTxEvent(transactionId);
-		console.log('Transaction ' + transactionId +
-		'has completed');
-	},
-	(err) => {
-		eh.unregisterTxEvent(transactionId);
-		console.log('Transaction listener has been closed on ' +
-		eh.getPeerAddr());
-	}
-);
-

- * Use the "newEventHub" method on {@link Client} to get a new EventHub instance. - * Use the "setPeerAddr" method on EventHub to indicate to the EventHub - * instance the Peer's event hub address. - * Use the "connect" method on EventHub to connect to the Peer's event - * hub. This operation will be asynchronous and as such the call will not - * fail if there is an issue with the connection. - * Use the "registerTxEvent", "registerChaincodeEvent", or "registerBlockEvent" - * calls to register your callback listeners to be notified when this EventHub - * receives an event. Notice in the example there is both a callback for processing - * the event and one to process error issues. The primary error to watch for is - * a network issue that will cause the connection to close. Registering - * an error callback will guarantee that you get notified of network issues, - * otherwise there is no path available for this EventHub to notify the - * listeners. + * Transaction processing in fabric v1.0 is a long operation spanning multiple + * components (application, endorsing peer, orderer, committing peer) and takes + * a relatively lengthy period of time (think seconds instead of milliseconds) + * to complete. As a result the applications must design their handling of the + * transaction lifecyle in an asynchrous fashion. After the transaction proposal + * has been successfully [endorsed]{@link Channel#sendTransactionProposal}, and before + * the transaction message has been successfully [broadcast]{@link Channel#sendTransaction} + * to the orderer, the application should register a listener to be nofified of + * the event when the transaction achieves finality, which is when the block + * containing the transaction gets added to the peer's ledger/blockchain. + *

+ * Fabric committing peers provides an event stream to publish events to registered + * listeners. As of v1.0, the only events that get published are Block events. A + * Block event gets published whenever the committing peer adds a validated block + * to the ledger. There are three ways to register a listener to get notified: + *
  • register a "block listener" to get called for every block event on all channels. The listener + * will be passed a fully decoded {@link Block} object. See [registerBlockEvent]{@link EventHub#registerBlockEvent} + *
  • register a "transaction listener" to get called when the specific transaction + * by id is committed (discovered inside a block event). The listener will be + * passed the transaction id and the [validation code]{@link https://github.com/hyperledger/fabric/blob/v1.0.0-beta/protos/peer/transaction.proto#L125}. + * See [registerTxEvent]{@link EventHub#registerTxEvent} + *
  • register a "chaincode event listener" to get called when a specific + * [chaincode event]{@link https://github.com/hyperledger/fabric/blob/v1.0.0-beta/examples/chaincode/go/eventsender/eventsender.go} + * has arrived. The listener will be passed the {@link ChaincodeEvent}. See + * [registerChaincodeEvent]{@link EventHub#registerChaincodeEvent} + *

    + * The events are ephemeral, such that if a registered listener + * crashed when the event is published, the listener will miss the event. + * There are several techniques to compensate for missed events due to client crashes: + *
  • register block event listeners and record the block numbers received, such that + * when the next block arrives and its number is not the next in sequence, then + * the application knows exactly which block events have been missed. It can then use + * [queryBlock]{@link Channel#queryBlock} to get those missed blocks from the target peer. + *
  • use a message queue to catch all the block events. With many robust message queue + * implementations available today, you will be guaranteed to not miss an event. A + * fabric event listener can be written in any programming language. The following + * implementations can be used as reference to write the necessary glue code between + * the fabric event stream and a message queue: + * + * + * @example + * var eh = client.newEventHub(); + * eh.setPeerAddr( + * 'grpcs://localhost:7053', + * { + * pem: Buffer.from(certdata).toString(), + * 'ssl-target-name-override': 'peer1'] + * } + * ); + * + * // register the listeners before calling "connect()" so that we can + * // have the error callback ready to process an error in case the + * // connect() call fails + * eh.registerTxEvent( + * transactionId, + * (tx, code) => { + * eh.unregisterTxEvent(transactionId); + * console.log(util.format('Transaction %s has completed', transactionId); + * }, + * (err) => { + * eh.unregisterTxEvent(transactionId); + * console.log(util.format('Error %s! Transaction listener for %s has been ' + + * 'deregistered with %s', transactionId, err, eh.getPeerAddr())); + * } + * ); + * + * eh.connect(); + * * @class */ var EventHub = class { /** - * Constructs an unconnected EventHub + * Constructs an EventHub object * - * @param {Client} clientContext An instance of the Client class + * @param {Client} clientContext - An instance of the Client class * which has already been initialzed with a userContext. - * + * @returns {EventHub} An instance of this class */ constructor(clientContext) { @@ -160,17 +191,14 @@ var EventHub = class { } /** - * Set peer url for event source

    - * @param {string} peeraddr peer url - * @param {Object} opts An Object that may contain options to pass to grpcs calls - *
    - pem {string} The certificate file, in PEM format, - * to use with the gRPC protocol (that is, with TransportCredentials). - * Required when using the grpcs protocol. - *
    - ssl-target-name-override {string} Used in test environment only, when the server certificate's - * hostname (in the 'CN' field) does not match the actual host endpoint that the server process runs - * at, the application can work around the client TLS verify failure by setting this property to the - * value of the server certificate's hostname - *
    - any other standard grpc stream options will be passed to the grpc service calls directly + * @typedef {Object} EventRegistrationRequest + */ + + /** + * Set peer event source url. + * + * @param {string} peeraddr - grpc or grpcs URL for the target peer's event source + * @param {ConnectionOpts} opts - The options for the connection to the peer. */ setPeerAddr(peerUrl, opts) { logger.debug('setPeerAddr - %s',peerUrl); @@ -180,7 +208,7 @@ var EventHub = class { } /** - * Get the peer url for this event source + * Return the peer url of this event hub object */ getPeerAddr() { var addr = null; @@ -192,19 +220,26 @@ var EventHub = class { } /** - * Get connected state of eventhub - * @returns true if connected to event source, false otherwise + * Is the event hub connected to the event source? + * @returns {boolean} True if connected to the event source, false otherwise */ isconnected() { return this._connected; } /** - * Establishes a connection with the peer event source - * The peer address must be set using the "setPeerAddr" + * Establishes a connection with the peer event source. + * The peer address must be set by calling the [setPeerAddr()]{@link EventHub#setPeerAddr} * method before calling this method. * - * The connection will be established asynchronously. + * The connection will be established asynchronously. If the connection fails to + * get established, the application will be notified via the error callbacks + * from the registerXXXEvent() methods. It is recommended that an application always + * registers at least one event listener with an error callback, by calling any one of the + * [registerBlockEvent]{@link EventHub#registerBlockEvent}, + * [registerTxEvent]{@link EventHub#registerTxEvent} or + * [registerChaincodeEvent]{@link EventHub#registerChaincodeEvent} + * methods, before calling connect(). */ connect(){ logger.debug('connect - start'); @@ -291,7 +326,7 @@ var EventHub = class { clearTimeout(send_timeout); logger.debug('on.end - event stream:%s _current_stream:%s',stream_id, self._current_stream); if(stream_id != self._current_stream) { - logger.debug('on.end - incoming event was from a cancel stream'); + logger.debug('on.end - incoming event was from a canceled stream'); return; } @@ -306,7 +341,7 @@ var EventHub = class { clearTimeout(send_timeout); logger.debug('on.error - event stream:%s _current_stream:%s',stream_id, self._current_stream); if(stream_id != self._current_stream) { - logger.debug('on.error - incoming event was from a cancel stream'); + logger.debug('on.error - incoming event was from a canceled stream'); return; } @@ -326,8 +361,9 @@ var EventHub = class { } /** - * Disconnects the connection to the peer event source. - * Will close all event listeners and send an `Error` to + * Disconnects the event hub from the peer event source. + * Will close all event listeners and send an Error object + * with the message "EventHub has been shutdown" to * all listeners that provided an "onError" callback. */ disconnect() { @@ -470,26 +506,37 @@ var EventHub = class { } /** - * Register a callback function to receive chaincode events. - * This EventHub instance must be connected to a remote - * peer's event hub before registering for events by calling - * the "connect()" method. + * @typedef {Object} ChaincodeEvent + * @property {string} chaincode_id + * @property {string} tx_id + * @property {string} event_name + * @property {byte[]} payload - Application-specific byte array that the chaincode set + * when it called stub.SetEvent(event_name, payload) + */ + + /** + * Register a listener to receive chaincode events. + *

    * An error may be thrown by this call if no "onError" callback * is provided and this EventHub has noticed that the connection has not been * established. However since the connection establishment is running * asynchronously, a register call could be made before this EventHub has been * notified of the network issue. The best practice would be to provide an * "onError" callback to be notified when this EventHub has an issue. - * @param {string} ccid - string chaincode id - * @param {string} eventname - string The regex string used to filter events - * @param {function} onEvent - callback function for filter matches - * that takes a single parameter which is a json object representation - * of type "message ChaincodeEvent" from lib/proto/chaincode_event.proto - * @param {function} onError - optional callback function to be notified when this - * event hub is shutdown. The shutdown may be caused by a network error or by - * a call to the "disconnect()" method. - * @returns {Object} ChainCodeCBE object that should be treated as an opaque - * handle used to unregister (see unregisterChaincodeEvent) + * + * @param {string} ccid - Id of the chaincode of interest + * @param {string} eventname - The exact name of the chaincode event (must match + * the name given to the target chaincode's call to + * stub.SetEvent(name, payload)), or a + * regex string to match more than one event by this + * chaincode + * @param {function} onEvent - callback function for matched events. It gets passed + * a single parameter which is a {@link ChaincodeEvent} object + * @param {function} onError - Optional callback function to be notified when this event hub + * is shutdown. The shutdown may be caused by a network error or by + * a call to the "disconnect()" method or a connection error. + * @returns {Object} An object that should be treated as an opaque handle used + * to unregister (see unregisterChaincodeEvent) */ registerChaincodeEvent(ccid, eventname, onEvent, onError) { logger.debug('registerChaincodeEvent - start'); @@ -525,45 +572,66 @@ var EventHub = class { } /** - * Unregister chaincode event registration - * @param {Object} cbe - ChainCodeCBE handle return from call to - * registerChaincodeEvent. + * Unregister the chaincode event listener represented by + * the listener_handle object returned by + * the registerChaincodeEvent() method + * + * @param {Object} listener_handle - The handle object returned from the call to + * registerChaincodeEvent. */ - unregisterChaincodeEvent(cbe) { + unregisterChaincodeEvent(listener_handle) { logger.debug('unregisterChaincodeEvent - start'); - if(!cbe) { - throw new Error('Missing "cbe" parameter'); + if(!listener_handle) { + throw new Error('Missing "listener_handle" parameter'); } - var cbtable = this._chaincodeRegistrants[cbe.ccid]; + var cbtable = this._chaincodeRegistrants[listener_handle.ccid]; if (!cbtable) { - logger.debug('No event registration for ccid %s ', cbe.ccid); + logger.debug('No event registration for ccid %s ', listener_handle.ccid); return; } - cbtable.delete(cbe); + cbtable.delete(listener_handle); if (cbtable.size <= 0) { - delete this._chaincodeRegistrants[cbe.ccid]; + delete this._chaincodeRegistrants[listener_handle.ccid]; } } /** - * Register a callback function to receive block events. - * This EventHub instance must be connected to a remote - * peer's event hub before registering for events by calling - * the "connect()" method. + * Register a listener to receive all block events from all the channels that + * the target peer is part of. The listener's "onEvent" callback gets called + * on the arrival of every block. If the target peer is expected to participate + * in more than one channel, then care must be taken in the listener's implementation + * to differentiate blocks from different channels. See the example below on + * how to accomplish that. + *

    * An error may be thrown by this call if no "onError" callback * is provided and this EventHub has noticed that the connection has not been * established. However since the connection establishment is running * asynchronously, a register call could be made before this EventHub has been * notified of the network issue. The best practice would be to provide an * "onError" callback to be notified when this EventHub has an issue. - * @param {function} onEvent Function that takes a single parameter - * which is a JSON object representation of type GRPC message "Block" - * from lib/proto/common/common.proto. - * @see {@link Block} - * @param {function} onError - optional callback function to be notified when this - * event hub is shutdown. + * + * @param {function} onEvent - Callback function that takes a single parameter + * of a {@link Block} object + * @param {function} onError - Optional callback function to be notified when this event hub + * is shutdown. The shutdown may be caused by a network error or by + * a call to the "disconnect()" method or a connection error. * @returns {int} This is the block registration number that must be - * used to unregister (see unregisterBlockEvent) + * sed to unregister (see unregisterBlockEvent) + * + * @example Find out the channel Id of the arriving block + * eh.registerBlockEvent( + * (block) => { + * var first_tx = block.data.data[0]; // get the first transaction + * var header = first_tx.payload.header; // the "header" object contains metadata of the transaction + * var channel_id = header.channel_header.channel_id; + * if ("mychannel" !== channel_id) return; + * + * // do useful processing of the block + * }, + * (err) => { + * console.log('Oh snap!'); + * } + * ); */ registerBlockEvent(onEvent, onError) { logger.debug('registerBlockEvent - start'); @@ -589,10 +657,12 @@ var EventHub = class { } /** - * Unregister the block event listener with the block - * registration number. + * Unregister the block event listener using the block + * registration number that is returned by the call to + * the registerBlockEvent() method. + * * @param {int} The block registration number that was returned - * during registration. + * during registration. */ unregisterBlockEvent(block_registration_number) { logger.debug('unregisterBlockEvent - start %s',block_registration_number); @@ -604,23 +674,24 @@ var EventHub = class { } /** - * Register a callback function to receive transactional events. - * This EventHub instance must be connected to a remote - * peer's event hub before registering for events by calling - * the "connect()" method. + * Register a callback function to receive a notification when the transaction + * by the given id has been committed into a block. + *

    * An error may be thrown by this call if no "onError" callback * is provided and this EventHub has noticed that the connection has not been * established. However since the connection establishment is running * asynchronously, a register call could be made before this EventHub has been * notified of the network issue. The best practice would be to provide an * "onError" callback to be notified when this EventHub has an issue. - * @param {string} txid string transaction id - * @param {function} onEvent Function that takes a parameter which - * is a json object representation of type "message Transaction" - * from lib/proto/fabric.proto and a parameter which is a boolean - * that indicates if the transaction is invalid (true=invalid) - * @param {function} onError - optional callback function to be notified when this - * event hub is shutdown. + * + * @param {string} txid - Transaction id string + * @param {function} onEvent - Callback function that takes a parameter of type + * {@link Transaction}, and a string parameter which + * indicates if the transaction is valid (code = 'VALID'), + * or not (code string indicating the reason for invalid transaction) + * @param {function} onError - Optional callback function to be notified when this event hub + * is shutdown. The shutdown may be caused by a network error or by + * a call to the "disconnect()" method or a connection error. */ registerTxEvent(txid, onEvent, onError) { logger.debug('registerTxEvent txid ' + txid); @@ -646,8 +717,8 @@ var EventHub = class { } /** - * Unregister transactional event registration. - * @param txid string transaction id + * Unregister transaction event listener for the transaction id. + * @param {string} txid - The transaction id */ unregisterTxEvent(txid) { logger.debug('unregisterTxEvent txid ' + txid); diff --git a/test/unit/event-hub.js b/test/unit/event-hub.js index 1b124f3821..c17a98a8d5 100644 --- a/test/unit/event-hub.js +++ b/test/unit/event-hub.js @@ -164,8 +164,8 @@ test('\n\n** EventHub tests\n\n', (t) => { () => { eh.unregisterChaincodeEvent(); }, - /Missing "cbe" parameter/, - 'Check the Missing "cbe" parameter' + /Missing "listener_handle" parameter/, + 'Check the Missing "listener_handle" parameter' ); t.throws( () => {