From 69605615a4474f547eeb4d93d45ba30f484743ac Mon Sep 17 00:00:00 2001 From: Sandeep Nishad Date: Fri, 21 Apr 2023 00:40:56 +0530 Subject: [PATCH] feat(driver): added Monitor to fabric driver for missed events Signed-off-by: Sandeep Nishad --- .../fabric-driver/.env.docker.template | 2 + .../core/drivers/fabric-driver/.env.template | 4 +- .../drivers/fabric-driver/docker-compose.yml | 2 + .../fabric-driver/server/dbConnector.ts | 6 +- .../drivers/fabric-driver/server/listener.ts | 446 ++++++++++-------- .../drivers/fabric-driver/server/server.ts | 22 +- .../fabric-driver/server/walletSetup.ts | 2 +- .../core/drivers/fabric-driver/tsconfig.json | 2 +- 8 files changed, 292 insertions(+), 194 deletions(-) diff --git a/weaver/core/drivers/fabric-driver/.env.docker.template b/weaver/core/drivers/fabric-driver/.env.docker.template index 2b530d034db..89421845f4f 100644 --- a/weaver/core/drivers/fabric-driver/.env.docker.template +++ b/weaver/core/drivers/fabric-driver/.env.docker.template @@ -14,6 +14,8 @@ WALLET_PATH= TLS_CREDENTIALS_DIR= LEVELDB_LOCKED_MAX_RETRIES= LEVELDB_LOCKED_RETRY_BACKOFF_MSEC= +ENABLE_MONITOR= +MONITOR_SYNC_PERIOD= DOCKER_IMAGE_NAME=ghcr.io/hyperledger-labs/weaver-fabric-driver DOCKER_TAG=1.4.0 EXTERNAL_NETWORK= diff --git a/weaver/core/drivers/fabric-driver/.env.template b/weaver/core/drivers/fabric-driver/.env.template index 1ef2bc31fc6..6485969c1df 100644 --- a/weaver/core/drivers/fabric-driver/.env.template +++ b/weaver/core/drivers/fabric-driver/.env.template @@ -14,4 +14,6 @@ DB_PATH=driverdbs WALLET_PATH= DEBUG=true LEVELDB_LOCKED_MAX_RETRIES= -LEVELDB_LOCKED_RETRY_BACKOFF_MSEC= \ No newline at end of file +LEVELDB_LOCKED_RETRY_BACKOFF_MSEC= +ENABLE_MONITOR=false +MONITOR_SYNC_PERIOD= \ No newline at end of file diff --git a/weaver/core/drivers/fabric-driver/docker-compose.yml b/weaver/core/drivers/fabric-driver/docker-compose.yml index dadee390bf9..e2a64f9f27c 100644 --- a/weaver/core/drivers/fabric-driver/docker-compose.yml +++ b/weaver/core/drivers/fabric-driver/docker-compose.yml @@ -26,6 +26,8 @@ services: - DEBUG=false - LEVELDB_LOCKED_MAX_RETRIES=${LEVELDB_LOCKED_MAX_RETRIES} - LEVELDB_LOCKED_RETRY_BACKOFF_MSEC=${LEVELDB_LOCKED_RETRY_BACKOFF_MSEC} + - ENABLE_MONITOR=${ENABLE_MONITOR} + - MONITOR_SYNC_PERIOD=${MONITOR_SYNC_PERIOD} volumes: - ${CONNECTION_PROFILE}:/fabric-driver/ccp.json - ${DRIVER_CONFIG}:/fabric-driver/config.json diff --git a/weaver/core/drivers/fabric-driver/server/dbConnector.ts b/weaver/core/drivers/fabric-driver/server/dbConnector.ts index a1d3232e969..2db36bb0d82 100644 --- a/weaver/core/drivers/fabric-driver/server/dbConnector.ts +++ b/weaver/core/drivers/fabric-driver/server/dbConnector.ts @@ -48,7 +48,8 @@ class LevelDBConnector implements DBConnector { dbRetryBackoffTime: number; constructor( - dbName: string + dbName: string, + retryTimeout: number = 0 ) { if (!dbName || dbName.length == 0) { dbName = "driverdb"; @@ -59,6 +60,9 @@ class LevelDBConnector implements DBConnector { this.dbOpenMaxRetries = process.env.LEVELDB_LOCKED_MAX_RETRIES ? parseInt(process.env.LEVELDB_LOCKED_MAX_RETRIES) : 250; // Retry back off time in ms, default 20ms this.dbRetryBackoffTime = process.env.LEVELDB_LOCKED_RETRY_BACKOFF_MSEC ? parseInt(process.env.LEVELDB_LOCKED_RETRY_BACKOFF_MSEC) : 20; + if (retryTimeout > 0) { + this.dbOpenMaxRetries = Math.floor(retryTimeout / this.dbRetryBackoffTime); + } } async open( diff --git a/weaver/core/drivers/fabric-driver/server/listener.ts b/weaver/core/drivers/fabric-driver/server/listener.ts index 7f18fad7af6..7e881642ec3 100644 --- a/weaver/core/drivers/fabric-driver/server/listener.ts +++ b/weaver/core/drivers/fabric-driver/server/listener.ts @@ -5,165 +5,181 @@ */ import * as fabproto6 from 'fabric-protos'; -import { Gateway, Network, Contract, ContractEvent, BlockListener, ContractListener, BlockEvent } from 'fabric-network'; +import { BlockDecoder } from 'fabric-common/index'; +import { Gateway, Network, Contract, ContractEvent, BlockListener, ContractListener, BlockEvent, ListenerOptions } from 'fabric-network'; import query_pb from '@hyperledger-labs/weaver-protos-js/common/query_pb'; import events_pb from '@hyperledger-labs/weaver-protos-js/common/events_pb'; import { lookupEventSubscriptions, readAllEventMatchers } from './events'; import { invoke, getNetworkGateway, packageFabricView } from './fabric-code'; -import { handlePromise, relayCallback, getRelayClientForEventPublish } from './utils'; -import { DBLockedError } from './dbConnector'; +import { handlePromise, relayCallback, getRelayClientForEventPublish, delay } from './utils'; +import { DBConnector, LevelDBConnector, DBLockedError, DBKeyNotFoundError } from './dbConnector'; import logger from './logger'; let networkGatewayMap = new Map(); let networkChannelMap = new Map(); -let networkChannelContractMap = new Map(); let channelBlockListenerMap = new Map(); -let channelBlockListenerCount = new Map(); -let channelContractListenerMap = new Map(); -let channelContractListenerCount = new Map(); +let channelContractListenerMap = new Map(); +let globalLedgerListenerCount = new Map(); + +const DB_NAME: string = "BLOCKCHAIN_DB"; +const DB_OPEN_TIMEOUT = 20000; +const BH_KEY = "LAST_BLOCK_HEIGHT"; function getChannelContractKey(channelId: string, contractId: string) { return channelId + ':' + contractId; } +function getBHKey(channelId: string) { + return BH_KEY + ':' + channelId +} -/** - * Register a block listener with a callback - **/ -const initBlockEventListenerForChannel = async ( - network: Network, - networkName: string, +async function setLastReadBlockNumber(db: DBConnector, channelId: string, blockNum: number) { + logger.debug(`Set last read block number ${blockNum} on channel: ${channelId}`); + await db.insert(getBHKey(channelId), blockNum); +} +async function getLastReadBlockNumber(db: DBConnector, channelId: string): Promise { + try { + const blockNum = await db.read(getBHKey(channelId)); + return blockNum; + } catch(error: any) { + logger.error(`Error during GET block number in db: ${error.toString()}`); + throw error; + } +} + +function createEventMatcher(eventName, channelId, chaincodeId, functionName) { + let eventMatcher = new events_pb.EventMatcher(); + eventMatcher.setEventType(events_pb.EventType.LEDGER_STATE); + eventMatcher.setEventClassId(eventName); + eventMatcher.setTransactionLedgerId(channelId); + eventMatcher.setTransactionContractId(chaincodeId); + eventMatcher.setTransactionFunc(functionName); + return eventMatcher; +} + +async function eventHandler(eventMatcher: events_pb.EventMatcher, eventPayload: Buffer, networkName: string, loggerName: string) { + logger.info(`${loggerName}: Trying to find subscriptions for ${JSON.stringify(eventMatcher.toObject())}`); + // Find all matching event subscriptions stored in the database + const eventSubscriptionQueries = await lookupEventSubscriptions(eventMatcher); + // Iterate through the view requests in the matching event subscriptions + eventSubscriptionQueries.forEach(async (eventSubscriptionQuery: query_pb.Query) => { + logger.info(`${loggerName}: Generating view and collecting proof for event matcher: ${JSON.stringify(eventMatcher.toObject())} with event payload: ${eventPayload.toString()}`); + // Trigger proof collection + const [result, invokeError] = await handlePromise( + invoke( + eventSubscriptionQuery, + networkName, + 'HandleEventRequest', + eventPayload + ), + ); + if (!invokeError) { + // Package view and send to relay + const client = getRelayClientForEventPublish(); + const viewPayload = packageFabricView(eventSubscriptionQuery, result); + + logger.info(`${loggerName}: Sending event`); + // Sending the Fabric event to the relay. + client.sendDriverState(viewPayload, relayCallback); + } + }) +} + +/* + * For all VALID transactions in a block: + * 1. Emits all subscribed Transaction Events + * 2. Emits all subscribed Contract Events + */ +const processBlockForEvents = async ( + block: any, channelId: string, -): Promise => { - const listener: BlockListener = async (event: BlockEvent) => { - // Parse the block data; typically there is only one element in this array but we will interate over it just to be safe - const blockData = ((event.blockData as fabproto6.common.Block).data as fabproto6.common.BlockData).data; - blockData.forEach((item) => { - const payload = Object.values(item)[Object.keys(item).indexOf('payload')]; - const payloadData = Object.values(payload)[Object.keys(payload).indexOf('data')]; - const transactions = payloadData.actions; + networkName: string, + loggerName: string = "" +) => { + // Parse the block data; typically there is only one element in this array but we will interate over it just to be safe + const blockNum = block.header.number; + const blockData = ((block as fabproto6.common.Block).data as fabproto6.common.BlockData).data; + const blockMetadata = ((block as fabproto6.common.Block).metadata as fabproto6.common.BlockMetadata).metadata; + const txFilterIndex = fabproto6.common.BlockMetadataIndex.TRANSACTIONS_FILTER; + const txValid = blockMetadata[txFilterIndex]; + logger.debug(`Event ${loggerName}: block #${blockNum}, #Transactions: ${blockData.length}`); + logger.debug(`Event ${loggerName}: block #${blockNum}, TxValidity: ${JSON.stringify(txValid)}`); + + blockData.forEach((item, index) => { + const payload = item['payload']; + const transactions = payload['data'].actions; + const tx_id = payload['header'].channel_header.tx_id; + logger.debug(`Event ${loggerName}: Transaction with TxId: ${tx_id}, index: ${index}`); + + if (txValid[index] === fabproto6.protos.TxValidationCode.VALID) { // Iterate through the transaction list transactions.forEach(async (transaction: any) => { + // Check if block subscription and then call handle block event + if (channelBlockListenerMap.has(channelId)) { + if (transaction.payload.chaincode_proposal_payload.input.chaincode_spec.input.args.length > 0) { + // Get transaction chaincode ID + const chaincodeId = transaction.payload.chaincode_proposal_payload.input.chaincode_spec.chaincode_id.name; + // below way of fetching payload requires that the response has been set by the chaincode function via return value + const responsePayload = transaction.payload.action.proposal_response_payload.extension.response.payload; + // Get transaction function name: first argument according to convention + const chaincodeFunc = transaction.payload.chaincode_proposal_payload.input.chaincode_spec.input.args[0].toString(); + const eventMatcher = createEventMatcher('', channelId, chaincodeId, chaincodeFunc); + eventHandler(eventMatcher, responsePayload, networkName, `BlockEvent ${loggerName}`); + } + } + + // Check if contract subscription and then call handle contract event // Get transaction chaincode ID const chaincodeId = transaction.payload.chaincode_proposal_payload.input.chaincode_spec.chaincode_id.name; - if (transaction.payload.chaincode_proposal_payload.input.chaincode_spec.input.args.length > 0) { - // below way of fetching payload requires that the response has been set by the chaincode function via return value - const responsePayload = transaction.payload.action.proposal_response_payload.extension.response.payload; + if (channelContractListenerMap.has(getChannelContractKey(channelId, chaincodeId))) { // below way of fetching payload is similar to ContractEventListener in which we fetch event.payload - // const responsePayload = transaction.payload.action.proposal_response_payload.extension.events.payload; - // Get transaction function name: first argument according to convention - const chaincodeFunc = transaction.payload.chaincode_proposal_payload.input.chaincode_spec.input.args[0].toString(); - logger.info(`Event Listener: Trying to find match for channel: ${channelId}, chaincode: ${chaincodeId}, function: ${chaincodeFunc}`); - // Find all matching event subscriptions stored in the database - let eventMatcher = new events_pb.EventMatcher(); - eventMatcher.setEventType(events_pb.EventType.LEDGER_STATE); - eventMatcher.setEventClassId(''); // Only match subscriptions where class ID is not specified - eventMatcher.setTransactionLedgerId(channelId); - eventMatcher.setTransactionContractId(chaincodeId); - eventMatcher.setTransactionFunc(chaincodeFunc); - let eventSubscriptionQueries; - for (let i = 0 ; i < 10 ; i++) { - try{ - eventSubscriptionQueries = await lookupEventSubscriptions(eventMatcher); - break; - } catch(error) { - let errorString: string = error.toString(); - if (!(error instanceof DBLockedError)) { // Check if contention was preventing DB access - console.error(`Event Listener: ${error}`) - throw(error); - } - await new Promise(f => setTimeout(f, 2000)); // Sleep 2 seconds - } - } - // Iterate through the view requests in the matching event subscriptions - eventSubscriptionQueries.forEach(async (eventSubscriptionQuery: query_pb.Query) => { - logger.info(`Event Listener: Generating view and collecting proof for channel: ${channelId}, chaincode: ${chaincodeId}, function: ${chaincodeFunc}, responsePayload: ${responsePayload.toString()}`); - // Trigger proof collection - const [result, invokeError] = await handlePromise( - invoke( - eventSubscriptionQuery, - networkName, - 'HandleEventRequest', - responsePayload - ), - ); - if (!invokeError) { - // Package view and send to relay - const client = getRelayClientForEventPublish(); - const viewPayload = packageFabricView(eventSubscriptionQuery, result); - - logger.info('Event Listener: Sending block event'); - // Sending the Fabric event to the relay. - client.sendDriverState(viewPayload, relayCallback); - } - }) + const eventPayload = transaction.payload.action.proposal_response_payload.extension.events.payload; + const eventName = transaction.payload.action.proposal_response_payload.extension.events.event_name; + const eventMatcher = createEventMatcher(eventName, channelId, chaincodeId, '*'); + eventHandler(eventMatcher, eventPayload, networkName, `ContractEvent ${loggerName}`); } }) - }) - }; - await network.addBlockListener(listener); - channelBlockListenerMap.set(channelId, listener); - logger.info(`Added block listener for channel ${channelId}`); - return listener; + } else { + logger.error(`Event ${loggerName}: Transaction with TxId: ${tx_id} is invalid with code ${txValid[index]}:${fabproto6.protos.TxValidationCode[txValid[index]]}. Discarding.`); + } + }) } /** - * Register a chaincode event listener with a callback + * Register a block listener with a callback **/ -const initContractEventListener = ( - contract: Contract, +const initBlockEventListenerForChannel = async ( + network: Network, networkName: string, channelId: string, - chaincodeId: string, -): any => { - const listener: ContractListener = async (event: ContractEvent) => { - logger.info(`Event Listener: Trying to find match for channel: ${channelId}, chaincode: ${chaincodeId}, event class: ${event.eventName}`); - // Find all matching event subscriptions stored in the database - let eventMatcher = new events_pb.EventMatcher(); - eventMatcher.setEventType(events_pb.EventType.LEDGER_STATE); - eventMatcher.setEventClassId(event.eventName); - eventMatcher.setTransactionLedgerId(channelId); - eventMatcher.setTransactionContractId(chaincodeId); - eventMatcher.setTransactionFunc('*'); - let eventSubscriptionQueries; - for (let i = 0 ; i < 10 ; i++) { - try{ - eventSubscriptionQueries = await lookupEventSubscriptions(eventMatcher); - break; - } catch(error) { - let errorString: string = error.toString(); - if (!(error instanceof DBLockedError)) { // Check if contention was preventing DB access - logger.error(`Event Listener: ${error}`); - throw(error); - } - await new Promise(f => setTimeout(f, 2000)); // Sleep 2 seconds +): Promise => { + const listener: BlockListener = async (event: BlockEvent) => { + let bh_db: DBConnector; + bh_db = new LevelDBConnector(DB_NAME!, DB_OPEN_TIMEOUT); + await bh_db.open(); + try { + const lastBlockNum = await getLastReadBlockNumber(bh_db, channelId); + const currBlockNum = event.blockNumber.toNumber(); + // Log failed events for debugging purpose + event.getTransactionEvents().forEach((txEvent) => { + logger.info(`BlockEvent Listener: Block #${currBlockNum} event; LastReadBlock #${lastBlockNum}\n + TxId: ${txEvent.transactionId}, + TxStatus: ${txEvent.status}, + isTxValid: ${txEvent.isValid}\n`) + }); + if (currBlockNum === lastBlockNum + 1) { + await processBlockForEvents(event.blockData, channelId, networkName, 'Listener'); + // Set current block number as listener block height + await setLastReadBlockNumber(bh_db, channelId, currBlockNum); } + } catch(error: any) { + logger.error(`BlockEvent Listener: ${error.toString()}`); + } finally { + await bh_db.close(); } - // Iterate through the view requests in the matching event subscriptions - eventSubscriptionQueries.forEach(async (eventSubscriptionQuery: query_pb.Query) => { - logger.info(`Event Listener: Generating view and collecting proof for event class: ${event.eventName}, channel: ${channelId}, chaincode: ${chaincodeId}, event.payload: ${event.payload.toString()}`); - // Trigger proof collection - const [result, invokeError] = await handlePromise( - invoke( - eventSubscriptionQuery, - networkName, - 'HandleEventRequest', - event.payload - ), - ); - if (!invokeError) { - // Package view and send to relay - const client = getRelayClientForEventPublish(); - const viewPayload = packageFabricView(eventSubscriptionQuery, result); - - logger.info('Event Listener: Sending contract event'); - // Sending the Fabric event to the relay. - client.sendDriverState(viewPayload, relayCallback); - } - }) }; - contract.addContractListener(listener); - channelContractListenerMap.set(getChannelContractKey(channelId, chaincodeId), listener); - logger.info(`Added contract listener for channel: ${channelId}, and contract: ${chaincodeId}`); + await network.addBlockListener(listener); + channelBlockListenerMap.set(channelId, listener); + logger.info(`Added block listener for channel ${channelId}`); return listener; } @@ -175,6 +191,7 @@ const registerListenerForEventSubscription = async ( networkName: string, ): Promise => { const channelId = eventMatcher.getTransactionLedgerId(); + const chaincodeId = eventMatcher.getTransactionContractId(); let gateway: Gateway, network: Network; if (networkGatewayMap.has(networkName)) { gateway = networkGatewayMap.get(networkName); @@ -188,38 +205,29 @@ const registerListenerForEventSubscription = async ( network = await gateway.getNetwork(channelId); networkChannelMap.set(channelId, network); } - // Check if the event_class_id is specified in the event matcher field. - if (eventMatcher.getEventClassId().length === 0) { - // Check if there is an active block listener for the channel specified in this event subscription. - if (channelBlockListenerMap.has(channelId)) { - channelBlockListenerCount.set(channelId, channelBlockListenerCount.get(channelId) + 1); - } else { - // Start a block listener. - const listener = await initBlockEventListenerForChannel(network, networkName, channelId); - channelBlockListenerCount.set(channelId, 1); - return listener; - } + let listener = null; + if (globalLedgerListenerCount.has(channelId)) { + globalLedgerListenerCount.set(channelId, globalLedgerListenerCount.get(channelId) + 1); } else { - // Check if there is an active contract listener for the contract function specified in this event subscription. - const contractId = eventMatcher.getTransactionContractId(); - const channelContractKey = getChannelContractKey(channelId, contractId); - let contract: Contract; - if (networkChannelContractMap.has(channelContractKey)) { - contract = networkChannelContractMap.get(channelContractKey); - } else { - contract = network.getContract(contractId); - networkChannelContractMap.set(channelContractKey, contract); - } - if (channelContractListenerMap.has(channelContractKey)) { - channelContractListenerCount.set(channelContractKey, channelContractListenerCount.get(channelContractKey) + 1); - } else { - // Start a contract listener. - const listener = initContractEventListener(contract, networkName, channelId, contractId); - channelContractListenerCount.set(channelContractKey, 1); - return listener; + let bh_db: DBConnector; + bh_db = new LevelDBConnector(DB_NAME!, DB_OPEN_TIMEOUT); + await bh_db.open(); + try { + const currBlockNum = await getCurrBlockNumber(network, channelId); + await setLastReadBlockNumber(bh_db, channelId, currBlockNum); + listener = await initBlockEventListenerForChannel(network, networkName, channelId); + if (eventMatcher.getEventClassId().length > 0) { + channelContractListenerMap.set(getChannelContractKey(channelId, chaincodeId), true); + } + globalLedgerListenerCount.set(channelId, 1); + } catch(error: any) { + logger.error(`registerListenerForEventSubscription: ${error.toString()}`); + throw error + } finally { + await bh_db.close(); } } - return null; // Listener was already running. Nothing to do. + return listener; // If null, Listener was already running. Nothing to do. } /** @@ -230,6 +238,7 @@ const unregisterListenerForEventSubscription = async ( networkName: string, ): Promise => { const channelId = eventMatcher.getTransactionLedgerId(); + const chaincodeId = eventMatcher.getTransactionContractId(); let gateway: Gateway, network: Network; if (networkGatewayMap.has(networkName)) { gateway = networkGatewayMap.get(networkName); @@ -243,39 +252,32 @@ const unregisterListenerForEventSubscription = async ( network = await gateway.getNetwork(channelId); networkChannelMap.set(channelId, network); } - // Check if the event_class_id is specified in the event matcher field. - if (eventMatcher.getEventClassId().length === 0) { - if (!channelBlockListenerMap.has(channelId)) { - return false; - } - channelBlockListenerCount.set(channelId, channelBlockListenerCount.get(channelId) - 1); - if (channelBlockListenerCount.get(channelId) === 0) { // Remove listener and counter + if (!channelBlockListenerMap.has(channelId)) { + return false; + } + // Update Global Listener count for the ledger + if (globalLedgerListenerCount.get(channelId) > 1) { + globalLedgerListenerCount.set(channelId, globalLedgerListenerCount.get(channelId) - 1); + return true; + } else { + let bh_db: DBConnector; + bh_db = new LevelDBConnector(DB_NAME!, DB_OPEN_TIMEOUT); + await bh_db.open(); + try { + // Set DB Height to -1 if no listener running + await setLastReadBlockNumber(bh_db, channelId, -1); network.removeBlockListener(channelBlockListenerMap.get(channelId)); - channelBlockListenerCount.delete(channelId); - return channelBlockListenerMap.delete(channelId); - } else { + if (eventMatcher.getEventClassId().length > 0) { + channelContractListenerMap.delete(getChannelContractKey(channelId, chaincodeId)); + } + channelBlockListenerMap.delete(channelId); + globalLedgerListenerCount.set(channelId, 0); return true; - } - } else { - const contractId = eventMatcher.getTransactionContractId(); - const channelContractKey = getChannelContractKey(channelId, contractId); - let contract: Contract; - if (networkChannelContractMap.has(channelContractKey)) { - contract = networkChannelContractMap.get(channelContractKey); - } else { - contract = network.getContract(contractId); - networkChannelContractMap.set(channelContractKey, contract); - } - if (!channelContractListenerMap.has(channelContractKey)) { + } catch(error: any) { + logger.error(`unregisterListenerForEventSubscription: ${error.toString()}`) return false; - } - channelContractListenerCount.set(channelContractKey, channelContractListenerCount.get(channelContractKey) - 1); - if (channelContractListenerCount.get(channelContractKey) === 0) { // Remove listener and counter - contract.removeContractListener(channelContractListenerMap.get(channelContractKey)); - channelContractListenerCount.delete(channelContractKey); - return channelContractListenerMap.delete(channelContractKey); - } else { - return true; + } finally { + await bh_db.close(); } } } @@ -306,4 +308,72 @@ const loadEventSubscriptionsFromStorage = async (networkName: string): Promise { + const contract = network.getContract("qscc"); + + let [result, invokeError] = await handlePromise( + contract.evaluateTransaction("GetBlockByNumber", channelId, `${blockNum}`) + ); + if (invokeError) { + throw invokeError + } + const block = BlockDecoder.decode(result); + return block; +} + +/* + * Get latest block number using query to QSCC + */ +async function getCurrBlockNumber(network: Network, channelId: string): Promise { + const contract = network.getContract("qscc"); + + let [result, invokeError] = await handlePromise( + contract.evaluateTransaction("GetChainInfo", channelId) + ); + if (invokeError) { + throw invokeError + } + const blockHeight = fabproto6.common.BlockchainInfo.decode(Buffer.from(result)).height as number; + const blockNum = blockHeight - 1; + logger.debug(`getCurrBlockNumber: Get current block number: ${blockNum}`); + return blockNum; +} + +/* + * Monitor to handle block events that were missed/discarded by listener. + */ +const monitorBlockForMissedEvents = async (networkName: string) => { + logger.debug("############### Monitor Begin #################") + // Create connection to a database + let bh_db: DBConnector; + bh_db = new LevelDBConnector(DB_NAME!, DB_OPEN_TIMEOUT); + await bh_db.open(); + try { + if (networkGatewayMap.has(networkName)) { + const gateway = networkGatewayMap.get(networkName); + // Handle Block Events + for (let [channelId, network] of networkChannelMap) { + const currBlockNum = await getCurrBlockNumber(network, channelId); + const lastBlockNum = await getLastReadBlockNumber(bh_db, channelId); + logger.debug(`Monitor: Current Block #${currBlockNum}; LastReadBlock #${lastBlockNum}`); + if (currBlockNum > lastBlockNum) { + for (let bnum=lastBlockNum+1; bnum<=currBlockNum; bnum++) { + const block = await getBlockByNum(network, channelId, bnum); + await processBlockForEvents(block, channelId, networkName, "Monitor"); + } + // Update block number in DB + await setLastReadBlockNumber(bh_db, channelId, currBlockNum); + } + } + } + } catch(error: any) { + logger.error(`Monitor Error: ${error}`) + } + await bh_db.close(); + logger.debug("############### Monitor End ###################"); +} + +export { registerListenerForEventSubscription, unregisterListenerForEventSubscription, loadEventSubscriptionsFromStorage, monitorBlockForMissedEvents }; diff --git a/weaver/core/drivers/fabric-driver/server/server.ts b/weaver/core/drivers/fabric-driver/server/server.ts index 8a02f5244fd..58cbddb8539 100644 --- a/weaver/core/drivers/fabric-driver/server/server.ts +++ b/weaver/core/drivers/fabric-driver/server/server.ts @@ -16,11 +16,11 @@ import events_grpc_pb from '@hyperledger-labs/weaver-protos-js/relay/events_grpc import state_pb from '@hyperledger-labs/weaver-protos-js/common/state_pb'; import { invoke, packageFabricView } from './fabric-code'; import 'dotenv/config'; -import { loadEventSubscriptionsFromStorage } from './listener' +import { loadEventSubscriptionsFromStorage, monitorBlockForMissedEvents } from './listener' import { walletSetup } from './walletSetup'; import { subscribeEventHelper, unsubscribeEventHelper, signEventSubscriptionQuery, writeExternalStateHelper } from "./events" import * as path from 'path'; -import { handlePromise, relayCallback, getRelayClientForQueryResponse, getRelayClientForEventSubscription } from './utils'; +import { handlePromise, relayCallback, getRelayClientForQueryResponse, getRelayClientForEventSubscription, delay } from './utils'; import { dbConnectionTest, eventSubscriptionTest } from "./tests" import driverPb from '@hyperledger-labs/weaver-protos-js/driver/driver_pb'; import logger from './logger'; @@ -253,6 +253,22 @@ const configSetup = async () => { logger.info(`Load Event Subscriptions Status: ${status}`); }; +const monitorService = async () => { + const delayTime: number = parseInt(process.env.MONITOR_SYNC_PERIOD ? process.env.MONITOR_SYNC_PERIOD : '30'); + const networkName = process.env.NETWORK_NAME ? process.env.NETWORK_NAME : 'network1'; + const flagEnable = process.env.ENABLE_MONITOR === 'false' ? false : true; + if (flagEnable) { + logger.info("Starting monitor..."); + logger.info("Monitor sync period: ", delayTime); + } else { + logger.info("Monitor disabled."); + } + while (flagEnable) { + await monitorBlockForMissedEvents(networkName); + await delay(delayTime * 1000); + } +} + // SERVER: Start the server with the provided url. // TODO: We should have credentials locally to ensure that the driver can only communicate with the local relay. if (process.env.DRIVER_TLS === 'true') { @@ -268,6 +284,7 @@ if (process.env.DRIVER_TLS === 'true') { configSetup().then(() => { logger.info('Starting server with TLS'); server.start(); + monitorService(); }); }); } else { @@ -275,6 +292,7 @@ if (process.env.DRIVER_TLS === 'true') { configSetup().then(() => { logger.info('Starting server without TLS'); server.start(); + monitorService(); }); }); } diff --git a/weaver/core/drivers/fabric-driver/server/walletSetup.ts b/weaver/core/drivers/fabric-driver/server/walletSetup.ts index b44173aaeea..af1353a8a9c 100644 --- a/weaver/core/drivers/fabric-driver/server/walletSetup.ts +++ b/weaver/core/drivers/fabric-driver/server/walletSetup.ts @@ -117,7 +117,7 @@ const getDriverKeyCert = async (): Promise => { const walletPath = process.env.WALLET_PATH ? process.env.WALLET_PATH : path.join(process.cwd(), `wallet-${process.env.NETWORK_NAME ? process.env.NETWORK_NAME : 'network1'}`); const config = getConfig(); const wallet = await Wallets.newFileSystemWallet(walletPath); - logger.debug(`Wallet path: ${walletPath}`); + logger.info(`Wallet path: ${walletPath}, relay id: ${config.relay.name}`); const [keyCert, keyCertError] = await handlePromise( InteroperableHelper.getKeyAndCertForRemoteRequestbyUserName(wallet, config.relay.name) diff --git a/weaver/core/drivers/fabric-driver/tsconfig.json b/weaver/core/drivers/fabric-driver/tsconfig.json index 37e3f701e53..01861158f12 100644 --- a/weaver/core/drivers/fabric-driver/tsconfig.json +++ b/weaver/core/drivers/fabric-driver/tsconfig.json @@ -4,7 +4,7 @@ /* Basic Options */ // "incremental": true, /* Enable incremental compilation */ - "target": "es5", /* Specify ECMAScript target version: 'ES3' (default), 'ES5', 'ES2015', 'ES2016', 'ES2017', 'ES2018', 'ES2019', 'ES2020', or 'ESNEXT'. */ + "target": "es2015", /* Specify ECMAScript target version: 'ES3' (default), 'ES5', 'ES2015', 'ES2016', 'ES2017', 'ES2018', 'ES2019', 'ES2020', or 'ESNEXT'. */ "module": "commonjs", /* Specify module code generation: 'none', 'commonjs', 'amd', 'system', 'umd', 'es2015', 'es2020', or 'ESNext'. */ // "lib": [], /* Specify library files to be included in the compilation. */ // "allowJs": true, /* Allow javascript files to be compiled. */