Skip to content

Commit

Permalink
[FAB-2864] Replace hashtable module
Browse files Browse the repository at this point in the history
This module seems to be causing problems in certain platforms
(windows, node.js buildpack in cloudfoundry), replacing with
logic based on native associated arrays.

Change-Id: I16b31c5bbcce74ef136f4737aa756aeb49f9c256
Signed-off-by: Jim Zhang <[email protected]>
  • Loading branch information
jimthematrix committed May 23, 2017
1 parent b02f7a0 commit 392dd9f
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 46 deletions.
84 changes: 45 additions & 39 deletions fabric-client/lib/EventHub.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ var utils = require('./utils.js');
var Remote = require('./Remote.js');
var BlockDecoder = require('./BlockDecoder.js');
var grpc = require('grpc');
var HashTable = require('hashtable');
var logger = utils.getLogger('EventHub.js');

var _events = grpc.load(__dirname + '/protos/peer/events.proto').protos;
Expand Down Expand Up @@ -87,14 +86,14 @@ var EventHub = class {
constructor(clientContext) {
logger.debug('const ');
// hashtable of clients registered for chaincode events
this.chaincodeRegistrants = new HashTable();
this.chaincodeRegistrants = {};
// set of clients registered for block events
this.block_registrant_count = 1;
this.blockOnEvents = new HashTable();
this.blockOnErrors = new HashTable();
this.blockOnEvents = {};
this.blockOnErrors = {};
// hashtable of clients registered for transactional events
this.transactionOnEvents = new HashTable();
this.transactionOnErrors = new HashTable();
this.transactionOnEvents = {};
this.transactionOnErrors = {};
// peer node to connect to
this.ep = null;
// grpc event client interface
Expand Down Expand Up @@ -265,22 +264,27 @@ var EventHub = class {
_closeAllCallbacks(err) {
logger.debug('_closeAllCallbacks - start');

var closer = function(key, cb) {
logger.debug('_closeAllCallbacks - closing this callback %s',key);
cb(err);
var closer = function(list) {
for (let key in list) {
let cb = list[key];
logger.debug('_closeAllCallbacks - closing this callback %s',key);
cb(err);
}
};

logger.debug('_closeAllCallbacks - blockOnErrors %s',this.blockOnErrors.size());
this.blockOnErrors.forEach(closer);
this.blockOnEvents.clear();
this.blockOnErrors.clear();
logger.debug('_closeAllCallbacks - blockOnErrors %s', Object.keys(this.blockOnErrors).length);
closer(this.blockOnErrors);
this.blockOnEvents = {};
this.blockOnErrors = {};

logger.debug('_closeAllCallbacks - transactionOnErrors %s',this.transactionOnErrors.size());
this.transactionOnErrors.forEach(closer);
this.transactionOnEvents.clear();
this.transactionOnErrors.clear();
logger.debug('_closeAllCallbacks - transactionOnErrors %s', Object.keys(this.transactionOnErrors).length);
closer(this.transactionOnErrors);
this.transactionOnEvents = {};
this.transactionOnErrors = {};

var cc_closer = function(key, cbtable) {
var self = this;
var cc_closer = function(key) {
var cbtable = self.chaincodeRegistrants[key];
cbtable.forEach(function(cbe) {
logger.debug('_closeAllCallbacks - closing this chaincode event %s %s',cbe.ccid, cbe.eventNameFilter);
if(cbe.onError) {
Expand All @@ -289,9 +293,9 @@ var EventHub = class {
});
};

logger.debug('_closeAllCallbacks - chaincodeRegistrants %s',this.chaincodeRegistrants.size());
this.chaincodeRegistrants.forEach(cc_closer);
this.chaincodeRegistrants.clear();
logger.debug('_closeAllCallbacks - chaincodeRegistrants %s', Object.keys(this.chaincodeRegistrants).length);
Object.keys(this.chaincodeRegistrants).forEach(cc_closer);
this.chaincodeRegistrants = {};
}

/*
Expand Down Expand Up @@ -368,10 +372,10 @@ var EventHub = class {
this._checkConnection(!have_error_cb, false);

var cbe = new ChainCodeCBE(ccid, eventname, onEvent, onError);
var cbtable = this.chaincodeRegistrants.get(ccid);
var cbtable = this.chaincodeRegistrants[ccid];
if (!cbtable) {
cbtable = new Set();
this.chaincodeRegistrants.put(ccid, cbtable);
this.chaincodeRegistrants[ccid] = cbtable;
}
cbtable.add(cbe);

Expand All @@ -394,14 +398,14 @@ var EventHub = class {
if(!cbe) {
throw new Error('Missing "cbe" parameter');
}
var cbtable = this.chaincodeRegistrants.get(cbe.ccid);
var cbtable = this.chaincodeRegistrants[cbe.ccid];
if (!cbtable) {
logger.debug('No event registration for ccid %s ', cbe.ccid);
return;
}
cbtable.delete(cbe);
if (cbtable.size <= 0) {
this.chaincodeRegistrants.remove(cbe.ccid);
delete this.chaincodeRegistrants[cbe.ccid];
}
}

Expand Down Expand Up @@ -430,12 +434,12 @@ var EventHub = class {
this._checkConnection(!have_error_cb, false);

var block_registration_number = this.block_registrant_count++;
this.blockOnEvents.put(block_registration_number, onEvent);
this.blockOnEvents[block_registration_number] = onEvent;

// when there is an error callback try to reconnect this
// event hub if is not connected
if(have_error_cb) {
this.blockOnErrors.put(block_registration_number, onError);
this.blockOnErrors[block_registration_number] = onError;
this._checkConnection(false, this.force_reconnect);
}

Expand All @@ -453,8 +457,8 @@ var EventHub = class {
if(!block_registration_number) {
throw new Error('Missing "block_registration_number" parameter');
}
this.blockOnEvents.remove(block_registration_number);
this.blockOnErrors.remove(block_registration_number);
delete this.blockOnEvents[block_registration_number];
delete this.blockOnErrors[block_registration_number];
}

/**
Expand Down Expand Up @@ -483,12 +487,12 @@ var EventHub = class {
// when this hub is not connected
this._checkConnection(!have_error_cb, false);

this.transactionOnEvents.put(txid, onEvent);
this.transactionOnEvents[txid] = onEvent;

// when there is an onError callback try to reconnect this
// event hub if is not connected
if(have_error_cb) {
this.transactionOnErrors.put(txid, onError);
this.transactionOnErrors[txid] = onError;
this._checkConnection(false, this.force_reconnect);
}
}
Expand All @@ -502,8 +506,8 @@ var EventHub = class {
if(!txid) {
throw new Error('Missing "txid" parameter');
}
this.transactionOnEvents.remove(txid);
this.transactionOnErrors.remove(txid);
delete this.transactionOnEvents[txid];
delete this.transactionOnErrors[txid];
}

/*
Expand All @@ -512,13 +516,15 @@ var EventHub = class {
*/
_processBlockOnEvents(block) {
logger.debug('_processBlockOnEvents block=%s', block.header.number);
if(this.blockOnEvents.size() == 0) {
if(Object.keys(this.blockOnEvents).length == 0) {
logger.debug('_processBlockOnEvents - no registered block event "listeners"');
return;
}

// send to all registered block listeners
this.blockOnEvents.forEach(function(key, cb) {
let self = this;
Object.keys(this.blockOnEvents).forEach(function(key) {
var cb = self.blockOnEvents[key];
cb(block);
});
}
Expand All @@ -529,7 +535,7 @@ var EventHub = class {
*/
_processTxOnEvents(block) {
logger.debug('_processTxOnEvents block=%s', block.header.number);
if(this.transactionOnEvents.size() == 0) {
if(Object.keys(this.transactionOnEvents).length == 0) {
logger.debug('_processTxOnEvents - no registered transaction event "listeners"');
return;
}
Expand All @@ -541,7 +547,7 @@ var EventHub = class {
var channel_header = block.data.data[index].payload.header.channel_header;
var val_code = convertValidationCode(txStatusCodes[index]);
logger.debug('_processTxOnEvents - txid=%s val_code=%s', channel_header.tx_id, val_code);
var cb = this.transactionOnEvents.get(channel_header.tx_id);
var cb = this.transactionOnEvents[channel_header.tx_id];
if (cb){
logger.debug('_processTxOnEvents - about to stream the transaction call back for code=%s tx=%s', val_code, channel_header.tx_id);
cb(channel_header.tx_id, val_code);
Expand All @@ -555,7 +561,7 @@ var EventHub = class {
*/
_processChainCodeOnEvents(block) {
logger.debug('_processChainCodeOnEvents block=%s', block.header.number);
if(this.chaincodeRegistrants.size() == 0) {
if(Object.keys(this.chaincodeRegistrants).length == 0) {
logger.debug('_processChainCodeOnEvents - no registered chaincode event "listeners"');
return;
}
Expand All @@ -573,7 +579,7 @@ var EventHub = class {
var caPayload = propRespPayload.extension;
var ccEvent = caPayload.events;
logger.debug('_processChainCodeOnEvents - ccEvent %s',ccEvent);
var cbtable = this.chaincodeRegistrants.get(ccEvent.chaincode_id);
var cbtable = this.chaincodeRegistrants[ccEvent.chaincode_id];
if (!cbtable) {
return;
}
Expand Down
1 change: 0 additions & 1 deletion fabric-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
"fs": "0.0.2",
"fs-extra": ">=0.30.0 <0.31.0",
"grpc": ">=1.1.2 <1.3.0",
"hashtable": "^2.0.2",
"js-sha3": "^0.5.1",
"jsrsasign": "6.2.2",
"jssha": "^2.1.0",
Expand Down
72 changes: 66 additions & 6 deletions test/unit/event-hub.js
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,27 @@ test('\n\n** EventHub block callback \n\n', (t) => {
eh.connected = true; //force this into connected state
eh.force_reconnect = false;

eh.registerBlockEvent((block) => {
t.fail('Should not have called success callback');
var index = eh.registerBlockEvent((block) => {
t.fail('Should not have called success callback when disconnect() is called');
t.end();
}, (error) =>{
t.pass('Successfully called error callback');
t.pass('Successfully called error callback from disconnect()');
t.end();
});

t.pass('successfully registered block callbacks');
t.equal(index, 1, 'Check the first block listener is at index 1');

index = eh.registerBlockEvent(() => {
// empty method body
}, () => {
// empty method body
});

t.equal(index, 2, 'Check the 2nd block listener is at index 2');
t.equal(Object.keys(eh.blockOnEvents).length, 2, 'Check the size of the blockOnEvents hash table');
t.equal(Object.keys(eh.blockOnErrors).length, 2, 'Check the size of the blockOnErrors hash table');

eh.disconnect();
});

Expand All @@ -213,14 +226,36 @@ test('\n\n** EventHub transaction callback \n\n', (t) => {
eh.connected = true; //force this into connected state
eh.force_reconnect = false;

eh.registerTxEvent('txid', (block) => {
eh.registerTxEvent('txid1', (block) => {
// empty method body
}, (error) =>{
// empty method body
});
t.pass('successfully registered transaction callbacks');
t.equal(Object.keys(eh.transactionOnEvents).length, 1, 'Check the size of the transactionOnEvents hash table');
t.equal(Object.keys(eh.transactionOnErrors).length, 1, 'Check the size of the transactionOnErrors hash table');

eh.registerTxEvent('txid1', (block) => {
t.fail('Should not have called success callback');
t.end();
}, (error) =>{
t.pass('Successfully called error callback');
t.end();
});
t.pass('successfully registered transaction callbacks');
t.equal(Object.keys(eh.transactionOnEvents).length, 1,
'Size of the transactionOnEvents hash table should still be 1 since the listeners are for the same txId');
t.equal(Object.keys(eh.transactionOnErrors).length, 1,
'Size of the transactionOnErrors hash table should still be 1 since the listeners are for the same txId');

eh.registerTxEvent('txid2', (block) => {
// empty method body
}, (error) =>{
// empty method body
});

t.equal(Object.keys(eh.transactionOnEvents).length, 2, 'Check the size of the transactionOnEvents hash table');
t.equal(Object.keys(eh.transactionOnErrors).length, 2, 'Check the size of the transactionOnErrors hash table');

eh.disconnect();
});

Expand All @@ -230,14 +265,35 @@ test('\n\n** EventHub chaincode callback \n\n', (t) => {
eh.connected = true; //force this into connected state
eh.force_reconnect = false;

eh.registerChaincodeEvent('ccid', 'eventfilter', (block) => {
eh.registerChaincodeEvent('ccid1', 'eventfilter', (block) => {
t.fail('Should not have called success callback');
t.end();
}, (error) =>{
t.pass('Successfully called error callback');
t.end();
});
t.pass('successfully registered chaincode callbacks');

t.equal(Object.keys(eh.chaincodeRegistrants).length, 1, 'Check the size of the chaincodeRegistrants hash table');

eh.registerChaincodeEvent('ccid1', 'eventfilter', (block) => {
// empty method body
}, (error) =>{
// empty method body
});

t.equal(Object.keys(eh.chaincodeRegistrants).length, 1,
'Size of the chaincodeRegistrants hash table should still be 1 because both listeners are for the same chaincode');

eh.registerChaincodeEvent('ccid2', 'eventfilter', (block) => {
// empty method body
}, (error) =>{
// empty method body
});

t.equal(Object.keys(eh.chaincodeRegistrants).length, 2,
'Size of the chaincodeRegistrants hash table should still be 2');

eh.disconnect();
});

Expand Down Expand Up @@ -303,6 +359,7 @@ test('\n\n** EventHub remove block callback \n\n', (t) => {
var brn = eh.registerBlockEvent( blockcallback, blockerrorcallback);
t.pass('successfully registered block callbacks');
eh.unregisterBlockEvent(brn);
t.equal(Object.keys(eh.blockOnEvents).length, 0, 'Check the size of the blockOnEvents hash table');
t.pass('successfuly unregistered block callback');
eh.disconnect();
t.pass('successfuly disconnected eventhub');
Expand All @@ -326,6 +383,7 @@ test('\n\n** EventHub remove transaction callback \n\n', (t) => {
t.pass('successfully registered transaction callbacks');
eh.unregisterTxEvent(txid);
t.pass('successfuly unregistered transaction callback');
t.equal(Object.keys(eh.transactionOnEvents).length, 0, 'Check the size of the transactionOnEvents hash table');
eh.disconnect();
t.pass('successfuly disconnected eventhub');
t.end();
Expand All @@ -347,6 +405,8 @@ test('\n\n** EventHub remove chaincode callback \n\n', (t) => {
t.pass('successfully registered chaincode callbacks');
eh.unregisterChaincodeEvent(cbe);
t.pass('successfuly unregistered chaincode callback');
t.equal(Object.keys(eh.chaincodeRegistrants).length, 0,
'Size of the chaincodeRegistrants hash table should be 0');
eh.disconnect();
t.pass('successfuly disconnected eventhub');
t.end();
Expand Down

0 comments on commit 392dd9f

Please sign in to comment.