Skip to content

Commit

Permalink
[FABN-1251] Added start/end block functionality
Browse files Browse the repository at this point in the history
- Ability to specify startBlock and endBlock when using fabric-network to handle events
- Added configurable event hub timeout when creating listeners
- Added configurable wait time between trying new event hubs
- Updated documentation
- Updated typescript definitions

Change-Id: Ifca2d775e6b5f9d583ddb7bd5b81d9b8f0018bcb
Signed-off-by: Liam Grace <[email protected]>
  • Loading branch information
liam-grace committed Jun 14, 2019
1 parent d86d83b commit fe56f25
Show file tree
Hide file tree
Showing 24 changed files with 619 additions and 275 deletions.
153 changes: 82 additions & 71 deletions docs/tutorials/event-checkpointer.md
Original file line number Diff line number Diff line change
@@ -1,32 +1,40 @@
# Event Checkpointing

This tutorial describes the approaches that can be selected by users of the fabric-network module for replaying missed events emitted by peers.

### Overview
## Overview

Events are emitted by peers when blocks are committed. Two types of events support checkpointing:

1. Contract events (also known as chaincode events) - Defined in transactions to be emitted. E.g. an event emitted when a commercial paper is sold
2. Block Events - Emitted when a block is committed

In the case of an application crashing and events being missed, applications may still want to execute the event callback for the event it missed. Peers in a Fabric network support event replay, and to support this, the fabric-network module supports checkpointing strategies that track the last block and transactions in that block, that have been seen by the client.
In the case of an application crashing and events being missed, applications may still want to execute the event callback for the event it missed. Peers in a Fabric network support event replay, and to support this, the fabric-network module supports checkpointing strategies that track the last block and transactions in that block, that have been seen by the client.

### Disclaimer

Checkpointing in its current form has not been tested to deal with all recovery scenarios, so it should be used alongside existing recovery infrastructure. {@link module:fabric-network~FileSystemCheckpointer} is designed for Proof of Technology projects, so we strongly suggest implementing your own checkpointer using the {@link module:fabric-network~BaseCheckpointer} interface.

### Notes

#### Notes
`Block Number` = `Block Height - 1`
`Block Number` = `Block Height - 1`
When using checkpointing:

- The listener will only catch up on events if the `startBlock` is less than the current `Block Number`
- If the latest block in the checkpointer is block `n` the `startBlock` will be `n + 1` (e.g. for checkpoint `blockNumber=1`,`startBlock=2`)

### Checkpointers
## Checkpointers

The `BaseCheckpoint` class is an interface that is to be used by all Checkpoint classes. fabric-network has one default class, {@link module:fabric-network~FileSystemCheckpointer} that is exported as a factory in the {@link module:fabric-network~CheckpointFactories}. The `FILE_SYSTEM_CHECKPOINTER` is the default checkpointer.

A checkpoint factory is a function that returns an instance with `BaseCheckpointer` as a parent class. These classes implement the `async save(channelName, listenerName)` and `async load()` functions.

A checkpointer is called each time the event callback is triggered.
A checkpoint factory is a function that returns an instance with `BaseCheckpointer` as a parent class. These classes implement the `async save(channelName, listenerName)` and `async load()` functions.

The checkpointer can be set when connecting to a gateway or when creating the event listener.
`BaseCheckpointer.save()` is called after the async callback function given to the event listener has finished processing.

### Custom Checkpointer

Users can configure their own checkpointer. This requires two components to be created:
Configuring a custom checkpointer requires two components to be created:

1. The Checkpointer class
2. The Factory

Expand All @@ -36,76 +44,79 @@ const path = require('path');
const { Gateway } = require('fabric-network');

class FileSystemCheckpointer extends BaseCheckpointer {
constructor(channelName, listenerName, fsOptions) {
super(channelName, listenerName);
this.basePath = path.resolve(fsOptions.basePath);
this.channelName = channelName;
this.listenerName = listenerName;
}

/**
* Initializes the checkpointer directory structure
*/
async _initialize() {
const cpPath = this._getCheckpointFileName()
}

/**
* Constructs the checkpoint files name
*/
_getCheckpointFileName() {
let filePath = path.join(this._basePath, this._channelName);
if (this._chaincodeId) {
filePath = path.join(filePath, this._chaincodeId);
}
return path.join(filePath, this._listenerName);
}

async save(transactionId, blockNumber) {
const cpPath = this._getCheckpointFileName()
if (!(await fs.exists(cpPath))) {
await this._initialize();
}
const latestCheckpoint = await this.load();
if (Number(latestCheckpoint.blockNumber) === Number(blockNumber)) {
const transactionIds = latestCheckpoint.transactionIds;
latestCheckpoint.transactionIds = transactionIds;
} else {
latestCheckpoint.blockNumber = blockNumber;
latestCheckpoint.transactionIds = [transactionIds];
}
await fs.writeFile(cppPath, JSON.stringify(latestCheckpoint));
}

async load() {
const cpPath = this._getCheckpointFileName(this._chaincodeId);
if (!(await fs.exists(cpPath))) {
await this._initialize();
}
const chkptBuffer = await fs.readFile(cpFile);
let checkpoint = checkpointBuffer.toString('utf8');
if (!checkpoint) {
checkpoint = {};
} else {
checkpoint = JSON.parse(checkpoint);
}
return checkpoint;
}
constructor(channelName, listenerName, fsOptions) {
super(channelName, listenerName);
this.basePath = path.resolve(fsOptions.basePath);
this.channelName = channelName;
this.listenerName = listenerName;
}

/**
* Initializes the checkpointer directory structure
*/
async _initialize() {
const cpPath = this._getCheckpointFileName()
}

/**
* Constructs the checkpoint files name
*/
_getCheckpointFileName() {
let filePath = path.join(this._basePath, this._channelName);
if (this._chaincodeId) {
filePath = path.join(filePath, this._chaincodeId);
}
return path.join(filePath, this._listenerName);
}

async save(transactionId, blockNumber) {
const cpPath = this._getCheckpointFileName()
if (!(await fs.exists(cpPath))) {
await this._initialize();
}
const latestCheckpoint = await this.load();
if (Number(latestCheckpoint.blockNumber) === Number(blockNumber)) {
const transactionIds = latestCheckpoint.transactionIds;
latestCheckpoint.transactionIds = transactionIds;
} else {
latestCheckpoint.blockNumber = blockNumber;
latestCheckpoint.transactionIds = [transactionIds];
}
await fs.writeFile(cppPath, JSON.stringify(latestCheckpoint));
}

async load() {
const cpPath = this._getCheckpointFileName(this._chaincodeId);
if (!(await fs.exists(cpPath))) {
await this._initialize();
}
const chkptBuffer = await fs.readFile(cpFile);
let checkpoint = checkpointBuffer.toString('utf8');
if (!checkpoint) {
checkpoint = {};
} else {
checkpoint = JSON.parse(checkpoint);
}
return checkpoint;
}
}

function File_SYSTEM_CHECKPOINTER_FACTORY(channelName, listenerName, options) {
return new FileSystemCheckpointer(channelName, listenerName, options);
return new FileSystemCheckpointer(channelName, listenerName, options);
}

const gateway = new Gateway();
await gateway.connect({
checkpointer: {
factory: FILE_SYSTEM_CHECKPOINTER_FACTORY,
options: {basePath: '/home/blockchain/checkpoints'} // These options will vary depending on the checkpointer implementation
checkpointer: {
factory: FILE_SYSTEM_CHECKPOINTER_FACTORY,
options: {basePath: '/home/blockchain/checkpoints'} // These options will vary depending on the checkpointer implementation
});

```
`Note:` When using the filesystem checkpointer, use absolute paths rather than relative paths
When specifying a specific type of checkpointer for a listener, the `checkpointer` option in {@link module:fabric-network.Network~EventListenerOptions`}
In addition to `save()` and `load()` the `BaseCheckpointer` interface also has the `loadLatestCheckpoint()` function which, in the case that `load()` returns a list of checkpoints, will return the latest incomplete checkpoint (or whichever is most relevant for the specific implementation).
`Note:` When using the filesystem checkpointer, use absolute paths rather than relative paths.
When specifying a specific type of checkpointer for a listener, the `checkpointer` option in {@link module:fabric-network.Network~EventListenerOptions`}.

92 changes: 54 additions & 38 deletions docs/tutorials/listening-to-events.md
Original file line number Diff line number Diff line change
@@ -1,30 +1,35 @@

# Listening to events with Fabric Network

This tutorial describes the different ways to listen to events emitted by a network using the fabric-network module.

### Overview
## Overview

There are three event types that can be subscribed to:

1. Contract events - Those emitted explicitly by the chaincode developer within a transaction
2. Transaction (Commit) events - Those emitted automatically when a transaction is committed after an invoke
3. Block events - Those emitted automatically when a block is committed

Listening for these events allows the application to react without directly calling a transaction. This is ideal in use cases such as monitoring network analytics.

### Usage
## Usage

Each listener type takes at least one parameter, the event callback. This is the function that is called when an event is received.

The callback function given is expected to be a promise, meaning that the callback can perform asynchronous tasks without risking missing events.

### Options
## Options

{@link module:fabric-network.Network~EventListenerOptions}.

*Note*: Listeners will connect to event hubs and ask to receive _unfiltered_ events by default. To receive _filtered_ events, set `EventListenerOptions.filtered: true`.

### Naming
## Naming

All event listeners (including CommitEventListeners, which use the transaction ID) must have a unique name at the `Network` level

#### Contract events
## Contract events

```javascript
const gateway = new Gateway();
Expand All @@ -39,16 +44,17 @@ const contract = network.getContract('my-contract');
* @param {module:fabric-network.Network~EventListenerOptions} options
**/
const listener = await contract.addContractListener('my-contract-listener', 'sale', (err, event, blockNumber, transactionId, status) => {
if (err) {
console.error(err);
return;
}
console.log(`Block Number: ${blockNumber} Transaction ID: ${transactionId} Status: ${status}`);
if (err) {
console.error(err);
return;
}
console.log(`Block Number: ${blockNumber} Transaction ID: ${transactionId} Status: ${status}`);
})
```

Notice that there is no need to specify an event hub, as the `EventHubSelectionStrategy` will select it automatically.

#### Block events
### Block events

```javascript
const gateway = new Gateway();
Expand All @@ -61,20 +67,24 @@ const network = await gateway.getNetwork('mychannel');
* @param {module:fabric-network.Network~EventListenerOptions} options
**/
const listener = await network.addBlockListener('my-block-listener', (error, block) => {
if (err) {
console.error(err);
return;
}
console.log(`Block: ${block}`);
})
if (err) {
console.error(err);
return;
}
console.log(`Block: ${block}`);
});
```
When listening for block events, it is important to specify if you want a filtered or none filtered event, as this determines which event hub is compatible with the request.

#### Commit events
When listening for block events, it is important to specify if you want a filtered or none filtered event, as this determines which event hub is compatible with the request.

## Commit events

*Note*: The listener listener name is _transactionId_._\<some random string\>_

#### Option 1:
There are two methods for subscribing to a transaction commit event. Using {@link module:fabric-network.Network} and directly, using {@link module:fabric-network.Transaction}. Using {@link module:fabric-network.Transaction} directly, abstracts away the need to specify which transaction ID you wish to listen for.

### Option 1

```javascript
const gateway = new Gateway();
await gateway.connect(connectionProfile, gatewayOptions);
Expand All @@ -83,20 +93,21 @@ const contract = network.getContract('my-contract');

const transaction = contract.newTransaction('sell');
/**
* @param {String} transactionId the name of the event listener
* @param {String} transactionId the transaction ID
* @param {Function} callback the callback function with signature (error, transactionId, status, blockNumber)
* @param {Object} options
**/
const listener = await network.addCommitListener(transaction.getTransactionID().getTransactionID(), (err, transactionId, status, blockNumber) => {
if (err) {
console.error(err);
return;
}
console.log(`Transaction ID: ${transactionId} Status: ${status} Block number: ${blockNumber}`);
});
if (err) {
console.error(err);
return;
}
console.log(`Transaction ID: ${transactionId} Status: ${status} Block number: ${blockNumber}`);
});
```

#### Option 2:
### Option 2

```javascript
const gateway = new Gateway();
await gateway.connect(connectionProfile, gatewayOptions);
Expand All @@ -105,24 +116,29 @@ const contract = network.getContract('my-contract');

const transaction = contract.newTransaction('sell');
/**
* @param {String} transactionId the name of the event listener
* @param {String} transactionId the transaction ID
* @param {Function} callback the callback function with signature (error, transactionId, status, blockNumber)
* @param {Object} options
**/
const listener = await transaction.addCommitListener((err, transactionId, status, blockNumber) => {
if (err) {
console.error(err);
return;
}
console.log(`Transaction ID: ${transactionId} Status: ${status} Block number: ${blockNumber}`);
});
if (err) {
console.error(err);
return;
}
console.log(`Transaction ID: ${transactionId} Status: ${status} Block number: ${blockNumber}`);
});
```

Both `Network.addCommitListener` and `Contract.addCommitListener` have an optional `eventHub` parameter. When set, the listener will only listen to that event hub, and in the event of an unforeseen disconnect, it will try and to reconnect without using the `EventHubSelectionStrategy`.

### Checkpointing
## Checkpointing

{@tutorial event-checkpointer}

### Unregistering listeners
## Start Block and End Block

In the {@link module:fabric-network~EventListenerOptions} it is possible to specify a `startBlock` and an `endBlock`. This behaves in the same way as the same options on {@link ChannelEventHub} shown in the tutorial here {@tutorial channel-events}. Using `startBlock` and `endBlock` disables event replay using a checkpointer for the events received by that listener.

## Unregistering listeners

`addContractListener`, `addBlockListener` and `addCommitListener` return a `ContractEventListener`, `BlockEventListener` and `CommitEventListener` respectively. Each has an `unregister()` function that removes the listener from the event hub, meaning no further events will be received from that listener until `register()` is called again
`addContractListener`, `addBlockListener` and `addCommitListener` return a `ContractEventListener`, `BlockEventListener` and `CommitEventListener` respectively. Each has an `unregister()` function that removes the listener from the event hub, meaning no further events will be received from that listener until `register()` is called again.
2 changes: 1 addition & 1 deletion fabric-client/lib/ChannelEventHub.js
Original file line number Diff line number Diff line change
Expand Up @@ -1481,7 +1481,7 @@ class ChannelEventHub {
}
if (trans_reg.disconnect) {
logger.debug('_callTransactionListener - automatically disconnect');
this._disconnect(new EventHubDisconnectError('Shutdown due to disconnect on transaction id registration'));
this._disconnect(new Error('Shutdown due to disconnect on transaction id registration'));
}
}

Expand Down
2 changes: 1 addition & 1 deletion fabric-network/lib/contract.js
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ class Contract {
options.checkpointer = this.getCheckpointer(options);
const listener = new ContractEventListener(this, listenerName, eventName, callback, options);
const network = this.getNetwork();
network.saveListener(listenerName, listener);
network._checkListenerNameIsUnique(listener.listenerName);
await listener.register();
return listener;
}
Expand Down
Loading

0 comments on commit fe56f25

Please sign in to comment.