diff --git a/weaver/.github/workflows/deploy_go-pkgs.yml b/weaver/.github/workflows/deploy_go-pkgs.yml index f4dd6ca0c80..f1f528578ea 100644 --- a/weaver/.github/workflows/deploy_go-pkgs.yml +++ b/weaver/.github/workflows/deploy_go-pkgs.yml @@ -75,7 +75,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v2 with: - go-version: 1.16 + go-version: '1.20.2' - name: Set current date as env run: echo "RELEASE_DATE=$(date +%b\ %d,\ %Y)" >> $GITHUB_ENV @@ -129,7 +129,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v2 with: - go-version: 1.16 + go-version: '1.20.2' - name: Set current date as env run: echo "RELEASE_DATE=$(date +%b\ %d,\ %Y)" >> $GITHUB_ENV @@ -183,7 +183,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v2 with: - go-version: 1.16 + go-version: '1.20.2' - name: Set current date as env run: echo "RELEASE_DATE=$(date +%b\ %d,\ %Y)" >> $GITHUB_ENV @@ -237,7 +237,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v2 with: - go-version: 1.16 + go-version: '1.20.2' - name: Set current date as env run: echo "RELEASE_DATE=$(date +%b\ %d,\ %Y)" >> $GITHUB_ENV @@ -291,7 +291,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v2 with: - go-version: 1.16 + go-version: '1.20.2' - name: Set current date as env run: echo "RELEASE_DATE=$(date +%b\ %d,\ %Y)" >> $GITHUB_ENV diff --git a/weaver/core/drivers/fabric-driver/.env.docker.template b/weaver/core/drivers/fabric-driver/.env.docker.template index 5402fd97e25..2b530d034db 100644 --- a/weaver/core/drivers/fabric-driver/.env.docker.template +++ b/weaver/core/drivers/fabric-driver/.env.docker.template @@ -12,6 +12,8 @@ INTEROP_CHAINCODE= DB_PATH=driverdbs WALLET_PATH= TLS_CREDENTIALS_DIR= +LEVELDB_LOCKED_MAX_RETRIES= +LEVELDB_LOCKED_RETRY_BACKOFF_MSEC= DOCKER_IMAGE_NAME=ghcr.io/hyperledger-labs/weaver-fabric-driver DOCKER_TAG=1.4.0 EXTERNAL_NETWORK= diff --git a/weaver/core/drivers/fabric-driver/.env.template b/weaver/core/drivers/fabric-driver/.env.template index 92259f161d4..1ef2bc31fc6 100644 --- a/weaver/core/drivers/fabric-driver/.env.template +++ b/weaver/core/drivers/fabric-driver/.env.template @@ -13,3 +13,5 @@ MOCK=false DB_PATH=driverdbs WALLET_PATH= DEBUG=true +LEVELDB_LOCKED_MAX_RETRIES= +LEVELDB_LOCKED_RETRY_BACKOFF_MSEC= \ No newline at end of file diff --git a/weaver/core/drivers/fabric-driver/docker-compose.yml b/weaver/core/drivers/fabric-driver/docker-compose.yml index 5fc91e75e79..dadee390bf9 100644 --- a/weaver/core/drivers/fabric-driver/docker-compose.yml +++ b/weaver/core/drivers/fabric-driver/docker-compose.yml @@ -24,6 +24,8 @@ services: - DRIVER_TLS_KEY_PATH=${DRIVER_TLS_KEY_PATH} - WALLET_PATH=${WALLET_PATH} - DEBUG=false + - LEVELDB_LOCKED_MAX_RETRIES=${LEVELDB_LOCKED_MAX_RETRIES} + - LEVELDB_LOCKED_RETRY_BACKOFF_MSEC=${LEVELDB_LOCKED_RETRY_BACKOFF_MSEC} volumes: - ${CONNECTION_PROFILE}:/fabric-driver/ccp.json - ${DRIVER_CONFIG}:/fabric-driver/config.json diff --git a/weaver/core/drivers/fabric-driver/server/dbConnector.ts b/weaver/core/drivers/fabric-driver/server/dbConnector.ts index 96c61fb1325..a1d3232e969 100644 --- a/weaver/core/drivers/fabric-driver/server/dbConnector.ts +++ b/weaver/core/drivers/fabric-driver/server/dbConnector.ts @@ -44,6 +44,8 @@ class LevelDBConnector implements DBConnector { DB_TYPE: string = "Level"; DB_NAME: string; dbHandle: any; + dbOpenMaxRetries: number; + dbRetryBackoffTime: number; constructor( dbName: string @@ -53,15 +55,20 @@ class LevelDBConnector implements DBConnector { } this.DB_NAME = dbName; this.dbHandle = new Level(path.join(process.env.DB_PATH ? process.env.DB_PATH : "./driverdbs", dbName), { valueEncoding: 'json' }); + // Retry max attempts, default 250, making it 5 seconds for retries + this.dbOpenMaxRetries = process.env.LEVELDB_LOCKED_MAX_RETRIES ? parseInt(process.env.LEVELDB_LOCKED_MAX_RETRIES) : 250; + // Retry back off time in ms, default 20ms + this.dbRetryBackoffTime = process.env.LEVELDB_LOCKED_RETRY_BACKOFF_MSEC ? parseInt(process.env.LEVELDB_LOCKED_RETRY_BACKOFF_MSEC) : 20; } async open( i: number = 0 ): Promise { + logger.debug(`attempt #${i} to open database ${this.DB_NAME}`) try { await this.dbHandle.open(); } catch (error: any) { - if (i>=10) { + if (i >= this.dbOpenMaxRetries) { logger.error(`failed to open database connection with error: ${error.toString()}`); if (error.code == 'LEVEL_DATABASE_NOT_OPEN' && error.cause && error.cause.code == 'LEVEL_LOCKED') { throw new DBLockedError(error.toString()); @@ -70,7 +77,8 @@ class LevelDBConnector implements DBConnector { } } else { - await delay(1000); + logger.debug(`failed to open database connection with error: ${error.toString()}`); + await delay(this.dbRetryBackoffTime); await this.open(i+1); } } diff --git a/weaver/core/drivers/fabric-driver/server/events.ts b/weaver/core/drivers/fabric-driver/server/events.ts index a928a248600..7389892e71e 100644 --- a/weaver/core/drivers/fabric-driver/server/events.ts +++ b/weaver/core/drivers/fabric-driver/server/events.ts @@ -164,7 +164,7 @@ async function addEventSubscription( var subscriptionsSerialized: string = await db.read(key) as string; subscriptions = JSON.parse(subscriptionsSerialized); - logger.debug(`subscriptions.length: ${subscriptions.length}`); + logger.debug(`existing subscriptions.length: ${subscriptions.length}`); // check if the event to be subscribed is already present in the DB for (const subscriptionSerialized of subscriptions) { var subscription: queryPb.Query = queryPb.Query.deserializeBinary(Buffer.from(subscriptionSerialized, 'base64')); @@ -200,14 +200,14 @@ async function addEventSubscription( } } - logger.debug(`subscriptions.length: ${subscriptions.length}`); + logger.debug(`new subscriptions.length: ${subscriptions.length}`); subscriptionsSerialized = JSON.stringify(subscriptions); // insert the value against key in the DB (it can be the scenario of a new key addition, or update to the value of an existing key) await db.insert(key, subscriptionsSerialized); await db.close(); // TODO: register the event with fabric sdk - logger.info(`end addEventSubscription() .. requestId: ${query.getRequestId()}`); + logger.debug(`end addEventSubscription() .. requestId: ${query.getRequestId()}`); return query.getRequestId(); } catch(error: any) { @@ -269,7 +269,7 @@ const deleteEventSubscription = async ( } await db.close(); - logger.info(`end deleteEventSubscription() .. retVal: ${JSON.stringify(retVal.toObject())}`); + logger.debug(`end deleteEventSubscription() .. retVal: ${JSON.stringify(retVal.toObject())}`); return retVal; } catch(error: any) { logger.error(`Error during delete: ${error.toString()}`); @@ -280,10 +280,11 @@ const deleteEventSubscription = async ( function filterEventMatcher(keySerialized: string, eventMatcher: eventsPb.EventMatcher) : boolean { var item: eventsPb.EventMatcher = eventsPb.EventMatcher.deserializeBinary(Buffer.from(keySerialized, 'base64')); + logger.debug(`eventMatcher from db: ${JSON.stringify(item.toObject())}`) if ((eventMatcher.getEventClassId() == '*' || eventMatcher.getEventClassId() == item.getEventClassId()) && (eventMatcher.getTransactionContractId() == '*' || eventMatcher.getTransactionContractId() == item.getTransactionContractId()) && (eventMatcher.getTransactionLedgerId() == '*' || eventMatcher.getTransactionLedgerId() == item.getTransactionLedgerId()) && - (eventMatcher.getTransactionFunc() == '*' || eventMatcher.getTransactionFunc() == item.getTransactionFunc().toLowerCase())) { + (eventMatcher.getTransactionFunc() == '*' || eventMatcher.getTransactionFunc().toLowerCase() == item.getTransactionFunc().toLowerCase())) { return true; } else { @@ -294,7 +295,7 @@ function filterEventMatcher(keySerialized: string, eventMatcher: eventsPb.EventM async function lookupEventSubscriptions( eventMatcher: eventsPb.EventMatcher ): Promise> { - logger.info(`finding the subscriptions with eventMatcher: ${JSON.stringify(eventMatcher.toObject())}`); + logger.debug(`finding the subscriptions with eventMatcher: ${JSON.stringify(eventMatcher.toObject())}`); var subscriptions: Array; var returnSubscriptions: Array = new Array(); let db: DBConnector; @@ -313,8 +314,8 @@ async function lookupEventSubscriptions( } } - logger.debug(`returnSubscriptions.length: ${returnSubscriptions.length}`); - logger.info(`end lookupEventSubscriptions()`); + logger.info(`found ${returnSubscriptions.length} matching subscriptions`); + logger.debug(`end lookupEventSubscriptions()`); await db.close(); return returnSubscriptions; @@ -324,7 +325,7 @@ async function lookupEventSubscriptions( if (error instanceof DBKeyNotFoundError) { // case of read failing due to key not found returnSubscriptions = new Array(); - logger.debug(`returnSubscriptions.length: ${returnSubscriptions.length}`); + logger.info(`found ${returnSubscriptions.length} matching subscriptions`); return returnSubscriptions; } else { // case of read failing due to some other issue @@ -347,6 +348,7 @@ async function readAllEventMatchers(): Promise> { const eventMatcher = eventsPb.EventMatcher.deserializeBinary(Uint8Array.from(Buffer.from(key, 'base64'))) returnMatchers.push(eventMatcher) } + logger.info(`found ${returnMatchers.length} eventMatchers`); logger.debug(`end readAllEventMatchers()`); await db.close(); return returnMatchers; @@ -434,7 +436,7 @@ async function writeExternalStateHelper( ccArgs: ccArgsStr, contractName: ctx.getContractId() } - logger.info(`invokeObject.ccArgs: ${invokeObject.ccArgs}`) + logger.info(`writing external state to contract: ${ctx.getContractId()} with function: ${ctx.getFunc()}, and args: ${invokeObject.ccArgs} on channel: ${ctx.getLedgerId()}`); const [ response, responseError ] = await handlePromise(InteroperableHelper.submitTransactionWithRemoteViews( interopContract, @@ -450,7 +452,8 @@ async function writeExternalStateHelper( gateway.disconnect(); throw responseError; } - gateway.disconnect(); + logger.debug(`write successful`); + gateway.disconnect(); } else { const errorString: string = `erroneous viewPayload identified in WriteExternalState processing`; logger.error(`error viewPayload.getError(): ${JSON.stringify(viewPayload.getError())}`); diff --git a/weaver/core/drivers/fabric-driver/server/listener.ts b/weaver/core/drivers/fabric-driver/server/listener.ts index f6fd2d2af83..7f18fad7af6 100644 --- a/weaver/core/drivers/fabric-driver/server/listener.ts +++ b/weaver/core/drivers/fabric-driver/server/listener.ts @@ -52,7 +52,7 @@ const initBlockEventListenerForChannel = async ( // const responsePayload = transaction.payload.action.proposal_response_payload.extension.events.payload; // Get transaction function name: first argument according to convention const chaincodeFunc = transaction.payload.chaincode_proposal_payload.input.chaincode_spec.input.args[0].toString(); - logger.info(`Trying to find match for channel: ${channelId}, chaincode: ${chaincodeId}, function: ${chaincodeFunc}`); + logger.info(`Event Listener: Trying to find match for channel: ${channelId}, chaincode: ${chaincodeId}, function: ${chaincodeFunc}`); // Find all matching event subscriptions stored in the database let eventMatcher = new events_pb.EventMatcher(); eventMatcher.setEventType(events_pb.EventType.LEDGER_STATE); @@ -68,6 +68,7 @@ const initBlockEventListenerForChannel = async ( } catch(error) { let errorString: string = error.toString(); if (!(error instanceof DBLockedError)) { // Check if contention was preventing DB access + console.error(`Event Listener: ${error}`) throw(error); } await new Promise(f => setTimeout(f, 2000)); // Sleep 2 seconds @@ -75,7 +76,7 @@ const initBlockEventListenerForChannel = async ( } // Iterate through the view requests in the matching event subscriptions eventSubscriptionQueries.forEach(async (eventSubscriptionQuery: query_pb.Query) => { - logger.info(`Generating view and collecting proof for channel: ${channelId}, chaincode: ${chaincodeId}, function: ${chaincodeFunc}, responsePayload: ${responsePayload.toString()}`); + logger.info(`Event Listener: Generating view and collecting proof for channel: ${channelId}, chaincode: ${chaincodeId}, function: ${chaincodeFunc}, responsePayload: ${responsePayload.toString()}`); // Trigger proof collection const [result, invokeError] = await handlePromise( invoke( @@ -90,7 +91,7 @@ const initBlockEventListenerForChannel = async ( const client = getRelayClientForEventPublish(); const viewPayload = packageFabricView(eventSubscriptionQuery, result); - logger.info('Sending block event'); + logger.info('Event Listener: Sending block event'); // Sending the Fabric event to the relay. client.sendDriverState(viewPayload, relayCallback); } @@ -115,7 +116,7 @@ const initContractEventListener = ( chaincodeId: string, ): any => { const listener: ContractListener = async (event: ContractEvent) => { - logger.info(`Trying to find match for channel: ${channelId}, chaincode: ${chaincodeId}, event class: ${event.eventName}`); + logger.info(`Event Listener: Trying to find match for channel: ${channelId}, chaincode: ${chaincodeId}, event class: ${event.eventName}`); // Find all matching event subscriptions stored in the database let eventMatcher = new events_pb.EventMatcher(); eventMatcher.setEventType(events_pb.EventType.LEDGER_STATE); @@ -131,6 +132,7 @@ const initContractEventListener = ( } catch(error) { let errorString: string = error.toString(); if (!(error instanceof DBLockedError)) { // Check if contention was preventing DB access + logger.error(`Event Listener: ${error}`); throw(error); } await new Promise(f => setTimeout(f, 2000)); // Sleep 2 seconds @@ -138,7 +140,7 @@ const initContractEventListener = ( } // Iterate through the view requests in the matching event subscriptions eventSubscriptionQueries.forEach(async (eventSubscriptionQuery: query_pb.Query) => { - logger.info(`Generating view and collecting proof for event class: ${event.eventName}, channel: ${channelId}, chaincode: ${chaincodeId}, event.payload: ${event.payload.toString()}`); + logger.info(`Event Listener: Generating view and collecting proof for event class: ${event.eventName}, channel: ${channelId}, chaincode: ${chaincodeId}, event.payload: ${event.payload.toString()}`); // Trigger proof collection const [result, invokeError] = await handlePromise( invoke( @@ -153,7 +155,7 @@ const initContractEventListener = ( const client = getRelayClientForEventPublish(); const viewPayload = packageFabricView(eventSubscriptionQuery, result); - logger.info('Sending contract event'); + logger.info('Event Listener: Sending contract event'); // Sending the Fabric event to the relay. client.sendDriverState(viewPayload, relayCallback); } diff --git a/weaver/core/network/fabric-interop-cc/Dockerfile b/weaver/core/network/fabric-interop-cc/Dockerfile index a413d31711a..c962f078720 100644 --- a/weaver/core/network/fabric-interop-cc/Dockerfile +++ b/weaver/core/network/fabric-interop-cc/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.16 AS build +FROM golang:1.20 AS build COPY . /fabric-interop-cc WORKDIR /fabric-interop-cc diff --git a/weaver/core/relay/.env.template.2 b/weaver/core/relay/.env.template.2 index dbc0291cd4a..b74e7c748ac 100644 --- a/weaver/core/relay/.env.template.2 +++ b/weaver/core/relay/.env.template.2 @@ -6,6 +6,8 @@ DRIVER_PORT= DRIVER_NAME= RELAY_NAME= RELAY_PORT= +DB_OPEN_MAX_RETRIES= +DB_OPEN_RETRY_BACKOFF_MSEC= DOCKER_IMAGE_NAME=ghcr.io/hyperledger-labs/weaver-relay-server DOCKER_TAG=1.4.2 EXTERNAL_NETWORK= diff --git a/weaver/core/relay/config/Corda_Relay.toml b/weaver/core/relay/config/Corda_Relay.toml index 5316f022c86..ea47a431793 100644 --- a/weaver/core/relay/config/Corda_Relay.toml +++ b/weaver/core/relay/config/Corda_Relay.toml @@ -5,6 +5,10 @@ hostname="localhost" db_path="db/Corda_Relay/requests" # This will be replaced by the task queue. remote_db_path="db/Corda_Relay/remote_request" +# max retries opening sled db if it is locked +db_open_max_retries=500 +# retry back off time in ms if sled db is locked +db_open_retry_backoff_msec=10 # FOR TLS cert_path="credentials/fabric_cert.pem" diff --git a/weaver/core/relay/config/Corda_Relay2.toml b/weaver/core/relay/config/Corda_Relay2.toml index 2549287bdc9..84293fe036c 100644 --- a/weaver/core/relay/config/Corda_Relay2.toml +++ b/weaver/core/relay/config/Corda_Relay2.toml @@ -5,6 +5,10 @@ hostname="localhost" db_path="db/Corda_Relay2/requests" # This will be replaced by the task queue. remote_db_path="db/Corda_Relay2/remote_request" +# max retries opening sled db if it is locked +db_open_max_retries=500 +# retry back off time in ms if sled db is locked +db_open_retry_backoff_msec=10 # FOR TLS cert_path="credentials/fabric_cert.pem" diff --git a/weaver/core/relay/config/Dummy_Relay.toml b/weaver/core/relay/config/Dummy_Relay.toml index 9aaf75db2e8..5d68da94403 100644 --- a/weaver/core/relay/config/Dummy_Relay.toml +++ b/weaver/core/relay/config/Dummy_Relay.toml @@ -4,6 +4,10 @@ hostname="localhost" db_path="db/Dummy_Relay/requests" # This will be replaced by the task queue. remote_db_path="db/Dummy_Relay/remote_request" +# max retries opening sled db if it is locked +db_open_max_retries=500 +# retry back off time in ms if sled db is locked +db_open_retry_backoff_msec=10 # FOR TLS cert_path="credentials/fabric_cert.pem" diff --git a/weaver/core/relay/config/Dummy_Relay_tls.toml b/weaver/core/relay/config/Dummy_Relay_tls.toml index b47955ae184..17677e6f2c0 100644 --- a/weaver/core/relay/config/Dummy_Relay_tls.toml +++ b/weaver/core/relay/config/Dummy_Relay_tls.toml @@ -4,6 +4,10 @@ hostname="localhost" db_path="db/Dummy_Relay_tls/requests" # This will be replaced by the task queue. remote_db_path="db/Dummy_Relay_tls/remote_request" +# max retries opening sled db if it is locked +db_open_max_retries=500 +# retry back off time in ms if sled db is locked +db_open_retry_backoff_msec=10 # FOR TLS cert_path="credentials/fabric_cert.pem" diff --git a/weaver/core/relay/config/Fabric_Relay.toml b/weaver/core/relay/config/Fabric_Relay.toml index ee7036ba6ad..98536a597df 100644 --- a/weaver/core/relay/config/Fabric_Relay.toml +++ b/weaver/core/relay/config/Fabric_Relay.toml @@ -4,6 +4,10 @@ hostname="localhost" db_path="db/Fabric_Relay/requests" # This will be replaced by the task queue. remote_db_path="db/Fabric_Relay/remote_request" +# max retries opening sled db if it is locked +db_open_max_retries=500 +# retry back off time in ms if sled db is locked +db_open_retry_backoff_msec=10 # FOR TLS diff --git a/weaver/core/relay/config/Fabric_Relay2.toml b/weaver/core/relay/config/Fabric_Relay2.toml index 4c002ae7326..995f7250e84 100644 --- a/weaver/core/relay/config/Fabric_Relay2.toml +++ b/weaver/core/relay/config/Fabric_Relay2.toml @@ -4,6 +4,10 @@ hostname="localhost" db_path="db/Fabric_Relay2/requests" # This will be replaced by the task queue. remote_db_path="db/Fabric_Relay2/remote_request" +# max retries opening sled db if it is locked +db_open_max_retries=500 +# retry back off time in ms if sled db is locked +db_open_retry_backoff_msec=10 # FOR TLS diff --git a/weaver/core/relay/config/Settings.toml b/weaver/core/relay/config/Settings.toml index 4d9e198ea05..4628dba61fd 100644 --- a/weaver/core/relay/config/Settings.toml +++ b/weaver/core/relay/config/Settings.toml @@ -4,6 +4,10 @@ hostname="localhost" db_path="db/requests" # This will be replaced by the task queue. remote_db_path="db/remote_requests" +# max retries opening sled db if it is locked +db_open_max_retries=500 +# retry back off time in ms if sled db is locked +db_open_retry_backoff_msec=10 [networks] [networks.Fabric_Network] diff --git a/weaver/core/relay/docker-compose.yaml b/weaver/core/relay/docker-compose.yaml index da6a295d561..22875683f0e 100644 --- a/weaver/core/relay/docker-compose.yaml +++ b/weaver/core/relay/docker-compose.yaml @@ -93,6 +93,11 @@ services: # working ok, or if you want to add additional assets such as # custom configurations and additional remote relay definitions # + # - DB_OPEN_MAX_RETRIES=${DB_OPEN_MAX_RETRIES} + # max retries opening sled db if it is locked + # - DB_OPEN_RETRY_BACKOFF_MSEC=${DB_OPEN_RETRY_BACKOFF_MSEC} + # retries back off time for opening sled db if it is locked + # volumes: # # Uncomment these two files if you want to mount your specialised diff --git a/weaver/core/relay/docker/server.template.toml b/weaver/core/relay/docker/server.template.toml index 29a231df91a..d0e29f1c28b 100644 --- a/weaver/core/relay/docker/server.template.toml +++ b/weaver/core/relay/docker/server.template.toml @@ -9,6 +9,8 @@ port="${RELAY_PORT}" hostname="0.0.0.0" db_path="db/${RELAY_NAME}/requests" remote_db_path="db/${RELAY_NAME}/remote_request" +db_open_max_retries="${DB_OPEN_MAX_RETRIES}" +db_open_retry_backoff_msec="${DB_OPEN_RETRY_BACKOFF_MSEC}" cert_path="${RELAY_TLS_CERT_PATH}" key_path="${RELAY_TLS_KEY_PATH}" tls="${RELAY_TLS}" diff --git a/weaver/core/relay/docker/testnet-envs/.env.corda b/weaver/core/relay/docker/testnet-envs/.env.corda index ddba41ec24a..5b9e7b491d0 100644 --- a/weaver/core/relay/docker/testnet-envs/.env.corda +++ b/weaver/core/relay/docker/testnet-envs/.env.corda @@ -6,6 +6,8 @@ DRIVER_PORT=9099 DRIVER_NAME=corda-driver-Corda_Network RELAY_NAME=relay-corda RELAY_PORT=9081 +DB_OPEN_MAX_RETRIES=500 +DB_OPEN_RETRY_BACKOFF_MSEC=10 DOCKER_IMAGE_NAME=ghcr.io/hyperledger-labs/weaver-relay-server DOCKER_TAG=latest EXTERNAL_NETWORK=corda_default diff --git a/weaver/core/relay/docker/testnet-envs/.env.corda.tls b/weaver/core/relay/docker/testnet-envs/.env.corda.tls index df5e570de6d..bcb0443aa2d 100644 --- a/weaver/core/relay/docker/testnet-envs/.env.corda.tls +++ b/weaver/core/relay/docker/testnet-envs/.env.corda.tls @@ -6,6 +6,8 @@ DRIVER_PORT=9099 DRIVER_NAME=corda-driver-Corda_Network RELAY_NAME=relay-corda RELAY_PORT=9081 +DB_OPEN_MAX_RETRIES=500 +DB_OPEN_RETRY_BACKOFF_MSEC=10 DOCKER_IMAGE_NAME=ghcr.io/hyperledger-labs/weaver-relay-server DOCKER_TAG=latest EXTERNAL_NETWORK=corda_default diff --git a/weaver/core/relay/docker/testnet-envs/.env.corda2 b/weaver/core/relay/docker/testnet-envs/.env.corda2 index 436b5e5e212..ed7c9a7c4f8 100644 --- a/weaver/core/relay/docker/testnet-envs/.env.corda2 +++ b/weaver/core/relay/docker/testnet-envs/.env.corda2 @@ -6,6 +6,8 @@ DRIVER_PORT=9098 DRIVER_NAME=corda-driver-Corda_Network2 RELAY_NAME=relay-corda2 RELAY_PORT=9082 +DB_OPEN_MAX_RETRIES=500 +DB_OPEN_RETRY_BACKOFF_MSEC=10 DOCKER_IMAGE_NAME=ghcr.io/hyperledger-labs/weaver-relay-server DOCKER_TAG=latest EXTERNAL_NETWORK=corda_network2_default diff --git a/weaver/core/relay/docker/testnet-envs/.env.corda2.tls b/weaver/core/relay/docker/testnet-envs/.env.corda2.tls index 0c8183111e4..167d693a1ba 100644 --- a/weaver/core/relay/docker/testnet-envs/.env.corda2.tls +++ b/weaver/core/relay/docker/testnet-envs/.env.corda2.tls @@ -6,6 +6,8 @@ DRIVER_PORT=9098 DRIVER_NAME=corda-driver-Corda_Network2 RELAY_NAME=relay-corda2 RELAY_PORT=9082 +DB_OPEN_MAX_RETRIES=500 +DB_OPEN_RETRY_BACKOFF_MSEC=10 DOCKER_IMAGE_NAME=ghcr.io/hyperledger-labs/weaver-relay-server DOCKER_TAG=latest EXTERNAL_NETWORK=corda_network2_default diff --git a/weaver/core/relay/docker/testnet-envs/.env.n1 b/weaver/core/relay/docker/testnet-envs/.env.n1 index 94f4189536b..26dd3e27cd4 100644 --- a/weaver/core/relay/docker/testnet-envs/.env.n1 +++ b/weaver/core/relay/docker/testnet-envs/.env.n1 @@ -6,6 +6,8 @@ DRIVER_PORT=9090 DRIVER_NAME=fabric-driver-network1 RELAY_NAME=relay-network1 RELAY_PORT=9080 +DB_OPEN_MAX_RETRIES=500 +DB_OPEN_RETRY_BACKOFF_MSEC=10 DOCKER_IMAGE_NAME=ghcr.io/hyperledger-labs/weaver-relay-server DOCKER_TAG=latest EXTERNAL_NETWORK=network1_net diff --git a/weaver/core/relay/docker/testnet-envs/.env.n1.tls b/weaver/core/relay/docker/testnet-envs/.env.n1.tls index e7d63b13f60..bcf327d9c24 100644 --- a/weaver/core/relay/docker/testnet-envs/.env.n1.tls +++ b/weaver/core/relay/docker/testnet-envs/.env.n1.tls @@ -6,6 +6,8 @@ DRIVER_PORT=9090 DRIVER_NAME=fabric-driver-network1 RELAY_NAME=relay-network1 RELAY_PORT=9080 +DB_OPEN_MAX_RETRIES=500 +DB_OPEN_RETRY_BACKOFF_MSEC=10 DOCKER_IMAGE_NAME=ghcr.io/hyperledger-labs/weaver-relay-server DOCKER_TAG=latest EXTERNAL_NETWORK=network1_net diff --git a/weaver/core/relay/docker/testnet-envs/.env.n2 b/weaver/core/relay/docker/testnet-envs/.env.n2 index bbe7c89ecdf..7d59b6ea7c7 100644 --- a/weaver/core/relay/docker/testnet-envs/.env.n2 +++ b/weaver/core/relay/docker/testnet-envs/.env.n2 @@ -6,6 +6,8 @@ DRIVER_PORT=9095 DRIVER_NAME=fabric-driver-network2 RELAY_NAME=relay-network2 RELAY_PORT=9083 +DB_OPEN_MAX_RETRIES=500 +DB_OPEN_RETRY_BACKOFF_MSEC=10 DOCKER_IMAGE_NAME=ghcr.io/hyperledger-labs/weaver-relay-server DOCKER_TAG=latest EXTERNAL_NETWORK=network2_net diff --git a/weaver/core/relay/docker/testnet-envs/.env.n2.tls b/weaver/core/relay/docker/testnet-envs/.env.n2.tls index 9c08d2671de..6e37806a699 100644 --- a/weaver/core/relay/docker/testnet-envs/.env.n2.tls +++ b/weaver/core/relay/docker/testnet-envs/.env.n2.tls @@ -6,6 +6,8 @@ DRIVER_PORT=9095 DRIVER_NAME=fabric-driver-network2 RELAY_NAME=relay-network2 RELAY_PORT=9083 +DB_OPEN_MAX_RETRIES=500 +DB_OPEN_RETRY_BACKOFF_MSEC=10 DOCKER_IMAGE_NAME=ghcr.io/hyperledger-labs/weaver-relay-server DOCKER_TAG=latest EXTERNAL_NETWORK=network2_net diff --git a/weaver/core/relay/scripts/convert-compose.sh b/weaver/core/relay/scripts/convert-compose.sh index ce4d62e40ea..75b9fb723f1 100755 --- a/weaver/core/relay/scripts/convert-compose.sh +++ b/weaver/core/relay/scripts/convert-compose.sh @@ -28,15 +28,19 @@ then uncomment 69,73 docker-compose.yaml uncomment 79,80 docker-compose.yaml uncomment 89 docker-compose.yaml - uncomment 110 docker-compose.yaml - comment 109 docker-compose.yaml + uncomment 96 docker-compose.yaml + uncomment 98 docker-compose.yaml + uncomment 115 docker-compose.yaml + comment 114 docker-compose.yaml else comment 59,61 docker-compose.yaml comment 69,73 docker-compose.yaml comment 79,80 docker-compose.yaml comment 89 docker-compose.yaml - comment 110 docker-compose.yaml - uncomment 109 docker-compose.yaml + comment 96 docker-compose.yaml + comment 98 docker-compose.yaml + comment 115 docker-compose.yaml + uncomment 114 docker-compose.yaml fi rm -f docker-compose.yaml.scriptbak diff --git a/weaver/core/relay/src/db.rs b/weaver/core/relay/src/db.rs index 60cca380591..538efab2a71 100644 --- a/weaver/core/relay/src/db.rs +++ b/weaver/core/relay/src/db.rs @@ -7,6 +7,8 @@ use crate::error::Error; /// Struct for managing all db interactions pub struct Database { pub db_path: String, + pub db_open_max_retries: u32, + pub db_open_retry_backoff_msec: u32, } impl Database { @@ -15,16 +17,16 @@ impl Database { match req_db_result { Ok(db) => Ok(db), Err(error) => { - println!("Db open error: {:?}", error.to_string()); - if retry.clone() >= 10 { + if retry.clone() >= self.db_open_max_retries.clone() { println!("Db open error: {:?}", error); return Err(Error::SledError(error)); } let retry_error = "Resource temporarily unavailable"; return match error.to_string().find(retry_error) { Some(_index) => { - sleep(time::Duration::from_millis(1000)); - println!("Retrying DB open..."); + println!("Db locked temporarily with error: {:?}", error.to_string()); + sleep(time::Duration::from_millis(self.db_open_retry_backoff_msec.clone() as u64)); + println!("Retrying DB open attempt #{:?}...", retry.clone()+1); let db_result = self.open_db(retry+1); db_result }, diff --git a/weaver/core/relay/src/services/data_transfer_service.rs b/weaver/core/relay/src/services/data_transfer_service.rs index 584c886f75d..cc7eb2c93b7 100644 --- a/weaver/core/relay/src/services/data_transfer_service.rs +++ b/weaver/core/relay/src/services/data_transfer_service.rs @@ -39,6 +39,8 @@ impl DataTransfer for DataTransferService { // Database access/storage let remote_db = Database { db_path: conf.get_str("remote_db_path").unwrap(), + db_open_max_retries: conf.get_int("db_open_max_retries").unwrap_or(500) as u32, + db_open_retry_backoff_msec: conf.get_int("db_open_retry_backoff_msec").unwrap_or(10) as u32, }; match request_state_helper(remote_db, request_id.to_string(), query, conf.clone()) { Ok(ack) => { @@ -76,6 +78,8 @@ impl DataTransfer for DataTransferService { // Database access/storage let remote_db = Database { db_path: conf.get_str("remote_db_path").unwrap(), + db_open_max_retries: conf.get_int("db_open_max_retries").unwrap_or(500) as u32, + db_open_retry_backoff_msec: conf.get_int("db_open_retry_backoff_msec").unwrap_or(10) as u32, }; let result = @@ -114,6 +118,8 @@ impl DataTransfer for DataTransferService { // Database access/storage let db = Database { db_path: conf.get_str("db_path").unwrap(), + db_open_max_retries: conf.get_int("db_open_max_retries").unwrap_or(500) as u32, + db_open_retry_backoff_msec: conf.get_int("db_open_retry_backoff_msec").unwrap_or(10) as u32, }; let result = send_state_helper(request_view_payload.state, request_id.to_string(), db); @@ -280,6 +286,8 @@ fn spawn_request_driver_state(query: Query, driver_info: Driver, conf: config::C // Database access/storage let remote_db = Database { db_path: conf.get_str("remote_db_path").unwrap(), + db_open_max_retries: conf.get_int("db_open_max_retries").unwrap_or(500) as u32, + db_open_retry_backoff_msec: conf.get_int("db_open_retry_backoff_msec").unwrap_or(10) as u32, }; let error_state = ViewPayload { request_id: query.request_id.to_string(), diff --git a/weaver/core/relay/src/services/event_publish_service.rs b/weaver/core/relay/src/services/event_publish_service.rs index 9d6fe03ac19..ef9a7781d8f 100644 --- a/weaver/core/relay/src/services/event_publish_service.rs +++ b/weaver/core/relay/src/services/event_publish_service.rs @@ -46,6 +46,8 @@ impl EventPublish for EventPublishService { // Database access/storage let remote_db = Database { db_path: conf.get_str("remote_db_path").unwrap(), + db_open_max_retries: conf.get_int("db_open_max_retries").unwrap_or(500) as u32, + db_open_retry_backoff_msec: conf.get_int("db_open_retry_backoff_msec").unwrap_or(10) as u32, }; let result = @@ -84,6 +86,8 @@ impl EventPublish for EventPublishService { // Database access/storage let db = Database { db_path: conf.get_str("db_path").unwrap(), + db_open_max_retries: conf.get_int("db_open_max_retries").unwrap_or(500) as u32, + db_open_retry_backoff_msec: conf.get_int("db_open_retry_backoff_msec").unwrap_or(10) as u32, }; let result = send_state_helper(request_view_payload, request_id.to_string(), db, conf); @@ -295,6 +299,8 @@ fn spawn_handle_event(state: ViewPayload, publication_spec: EventPublication, re event_id, request_state::Status::EventWritten, conf.get_str("db_path").unwrap(), + conf.get_int("db_open_max_retries").unwrap_or(500) as u32, + conf.get_int("db_open_retry_backoff_msec").unwrap_or(10) as u32, message.to_string(), ) } @@ -308,6 +314,8 @@ fn spawn_handle_event(state: ViewPayload, publication_spec: EventPublication, re event_id, request_state::Status::EventWriteError, conf.get_str("db_path").unwrap(), + conf.get_int("db_open_max_retries").unwrap_or(500) as u32, + conf.get_int("db_open_retry_backoff_msec").unwrap_or(10) as u32, format!("Write Error: {:?}", e), ) } @@ -329,7 +337,7 @@ async fn handle_event( match result { Ok(driver_info) => { let client = get_driver_client(driver_info).await?; - println!("Sending Received Event to driver: {:?}", state.clone()); + println!("Sending Received Event to driver: {:?}", state.clone().request_id.to_string()); let write_external_state_message: WriteExternalStateMessage = WriteExternalStateMessage { view_payload: Some(state), ctx: Some(ctx), diff --git a/weaver/core/relay/src/services/event_subscribe_service.rs b/weaver/core/relay/src/services/event_subscribe_service.rs index 3ec0feeaf6a..70fc066417e 100644 --- a/weaver/core/relay/src/services/event_subscribe_service.rs +++ b/weaver/core/relay/src/services/event_subscribe_service.rs @@ -40,6 +40,8 @@ impl EventSubscribe for EventSubscribeService { // Database access/storage let remote_db = Database { db_path: conf.get_str("remote_db_path").unwrap(), + db_open_max_retries: conf.get_int("db_open_max_retries").unwrap_or(500) as u32, + db_open_retry_backoff_msec: conf.get_int("db_open_retry_backoff_msec").unwrap_or(10) as u32, }; match subscribe_event_helper(remote_db, request_id.to_string(), event_subscription, conf.clone()) { Ok(ack) => { @@ -75,6 +77,8 @@ impl EventSubscribe for EventSubscribeService { // Database access/storage let remote_db = Database { db_path: conf.get_str("remote_db_path").unwrap(), + db_open_max_retries: conf.get_int("db_open_max_retries").unwrap_or(500) as u32, + db_open_retry_backoff_msec: conf.get_int("db_open_retry_backoff_msec").unwrap_or(10) as u32, }; let result = @@ -109,8 +113,16 @@ impl EventSubscribe for EventSubscribeService { let conf = self.config_lock.read().await.clone(); // Database access/storage let db_path = conf.get_str("db_path").unwrap(); + let db_open_max_retries = conf.get_int("db_open_max_retries").unwrap_or(500) as u32; + let db_open_retry_backoff_msec = conf.get_int("db_open_retry_backoff_msec").unwrap_or(10) as u32; - let result = send_subscription_status_helper(request_ack, request_id.clone().to_string(), db_path); + let result = send_subscription_status_helper( + request_ack, + request_id.clone().to_string(), + db_path, + db_open_max_retries, + db_open_retry_backoff_msec + ); match result { Ok(_) => println!("Successfully set event subscription status in DB."), @@ -174,6 +186,8 @@ fn spawn_driver_subscribe_event(event_subscription: EventSubscription, driver_in // Database access/storage let remote_db = Database { db_path: conf.get_str("remote_db_path").unwrap(), + db_open_max_retries: conf.get_int("db_open_max_retries").unwrap_or(500) as u32, + db_open_retry_backoff_msec: conf.get_int("db_open_retry_backoff_msec").unwrap_or(10) as u32, }; let error_ack = Ack { status: ack::Status::Error as i32, @@ -358,18 +372,24 @@ fn send_subscription_status_helper( request_ack: Ack, request_id: String, db_path: String, + db_open_max_retries: u32, + db_open_retry_backoff_msec: u32, ) -> Result<(), Error> { match ack::Status::from_i32(request_ack.status) { Some(status) => update_event_subscription_status( request_id.to_string(), status, db_path.to_string(), + db_open_max_retries, + db_open_retry_backoff_msec, request_ack.message.to_string(), ), None => update_event_subscription_status( request_id.to_string(), ack::Status::Error, db_path.to_string(), + db_open_max_retries, + db_open_retry_backoff_msec, "Status is not supported or is invalid".to_string(), ), }; diff --git a/weaver/core/relay/src/services/helpers.rs b/weaver/core/relay/src/services/helpers.rs index 87e6a028909..cb1825e0d52 100644 --- a/weaver/core/relay/src/services/helpers.rs +++ b/weaver/core/relay/src/services/helpers.rs @@ -22,6 +22,8 @@ pub fn update_event_subscription_status( curr_request_id: String, new_status: ack::Status, curr_db_path: String, + db_open_max_retries: u32, + db_open_retry_backoff_msec: u32, message: String, ) { let driver_error_constants = fs::read_to_string("./driver/driver-error-constants.json").expect("Unable to read file: ./driver/driver-error-constants.json"); @@ -32,6 +34,8 @@ pub fn update_event_subscription_status( let db = Database { db_path: curr_db_path, + db_open_max_retries: db_open_max_retries, + db_open_retry_backoff_msec: db_open_retry_backoff_msec, }; let event_sub_key = get_event_subscription_key(curr_request_id.clone()); let result = db.get::(event_sub_key.to_string()); @@ -171,7 +175,6 @@ pub fn update_event_subscription_status( .expect("Failed to insert into DB"); println!("Successfully written EventSubscriptionState to database"); println!("{:?}\n", db.get::(event_sub_key.to_string()).unwrap()) - }, Err(e) => { println!("EventSubscription Request not found. Error: {:?}", e); @@ -277,10 +280,14 @@ pub fn update_event_state( event_id: String, new_status: request_state::Status, curr_db_path: String, + db_open_max_retries: u32, + db_open_retry_backoff_msec: u32, message: String, ) { let db = Database { db_path: curr_db_path, + db_open_max_retries: db_open_max_retries, + db_open_retry_backoff_msec: db_open_retry_backoff_msec, }; let event_publish_key = get_event_publication_key(request_id.to_string()); let result = db.get::(event_publish_key.to_string()); @@ -309,7 +316,6 @@ pub fn update_event_state( db.set(&event_publish_key.to_string(), &updated_event_states) .expect("Failed to insert into DB"); println!("Successfully updated EventStates in database"); - println!("{:?}\n", updated_event_states); }, Err(e) => { println!("EventStates not found. Error: {:?}", e); @@ -361,10 +367,14 @@ pub fn mark_event_states_deleted(fetched_event_states: EventStates, request_id: pub fn delete_event_pub_spec( request_id: String, event_pub_spec: EventPublication, - curr_db_path: String + curr_db_path: String, + db_open_max_retries: u32, + db_open_retry_backoff_msec: u32, ) -> u8 { let db = Database { db_path: curr_db_path, + db_open_max_retries: db_open_max_retries, + db_open_retry_backoff_msec: db_open_retry_backoff_msec, }; let mut event_sub_key = get_event_subscription_key(request_id.to_string()); let mut event_sub_state = db.get::(event_sub_key.to_string()) diff --git a/weaver/core/relay/src/services/network_service.rs b/weaver/core/relay/src/services/network_service.rs index 9207123569b..0f4d14e0e26 100644 --- a/weaver/core/relay/src/services/network_service.rs +++ b/weaver/core/relay/src/services/network_service.rs @@ -37,6 +37,8 @@ impl Network for NetworkService { let conf = self.config_lock.read().await.clone(); let db = Database { db_path: conf.get_str("db_path").unwrap(), + db_open_max_retries: conf.get_int("db_open_max_retries").unwrap_or(500) as u32, + db_open_retry_backoff_msec: conf.get_int("db_open_retry_backoff_msec").unwrap_or(10) as u32, }; let request_id = request.into_inner().request_id; let result = db.get::(request_id.to_string()); @@ -123,6 +125,8 @@ impl Network for NetworkService { // Database access/storage let db = Database { db_path: conf.get_str("db_path").unwrap(), + db_open_max_retries: conf.get_int("db_open_max_retries").unwrap_or(500) as u32, + db_open_retry_backoff_msec: conf.get_int("db_open_retry_backoff_msec").unwrap_or(10) as u32, }; let request_id = Uuid::new_v4(); @@ -201,6 +205,8 @@ impl Network for NetworkService { // Database access/storage let db = Database { db_path: conf.get_str("db_path").unwrap(), + db_open_max_retries: conf.get_int("db_open_max_retries").unwrap_or(500) as u32, + db_open_retry_backoff_msec: conf.get_int("db_open_retry_backoff_msec").unwrap_or(10) as u32, }; let request_id = Uuid::new_v4(); @@ -252,6 +258,8 @@ impl Network for NetworkService { let conf = self.config_lock.read().await.clone(); let db = Database { db_path: conf.get_str("db_path").unwrap(), + db_open_max_retries: conf.get_int("db_open_max_retries").unwrap_or(500) as u32, + db_open_retry_backoff_msec: conf.get_int("db_open_retry_backoff_msec").unwrap_or(10) as u32, }; let event_sub_key = get_event_subscription_key(request.into_inner().request_id); let result = db.get::(event_sub_key.to_string()); @@ -298,6 +306,8 @@ impl Network for NetworkService { // Database access/storage let db = Database { db_path: conf.get_str("db_path").unwrap(), + db_open_max_retries: conf.get_int("db_open_max_retries").unwrap_or(500) as u32, + db_open_retry_backoff_msec: conf.get_int("db_open_retry_backoff_msec").unwrap_or(10) as u32, }; let net_event_sub = request.into_inner().clone(); @@ -305,7 +315,13 @@ impl Network for NetworkService { let request_id = net_event_sub.request_id.to_string(); let requested_unsub_pub_spec = network_event_subscription.event_publication_spec.clone().expect("No event publication spec provided for unsubscription request."); - let delete_pub_spec_status = delete_event_pub_spec(request_id.to_string(), requested_unsub_pub_spec, conf.get_str("db_path").unwrap().to_string()); + let delete_pub_spec_status = delete_event_pub_spec( + request_id.to_string(), + requested_unsub_pub_spec, + conf.get_str("db_path").unwrap().to_string(), + conf.get_int("db_open_max_retries").unwrap_or(500) as u32, + conf.get_int("db_open_retry_backoff_msec").unwrap_or(10) as u32 + ); if delete_pub_spec_status == 0 { let reply = Ack { @@ -372,6 +388,8 @@ impl Network for NetworkService { let conf = self.config_lock.read().await.clone(); let db = Database { db_path: conf.get_str("db_path").unwrap(), + db_open_max_retries: conf.get_int("db_open_max_retries").unwrap_or(500) as u32, + db_open_retry_backoff_msec: conf.get_int("db_open_retry_backoff_msec").unwrap_or(10) as u32, }; let request_id = request.into_inner().request_id; let event_publish_key = get_event_publication_key(request_id.to_string()); @@ -528,10 +546,14 @@ fn spawn_send_request( curr_request_id: String, new_status: request_state::Status, curr_db_path: String, + db_open_max_retries: u32, + db_open_retry_backoff_msec: u32, state: Option, ) { let db = Database { db_path: curr_db_path, + db_open_max_retries: db_open_max_retries, + db_open_retry_backoff_msec: db_open_retry_backoff_msec, }; let target: RequestState = RequestState { status: new_status as i32, @@ -584,12 +606,16 @@ fn spawn_send_request( request_id.to_string(), request_state::Status::Pending, db_path.to_string(), + conf.get_int("db_open_max_retries").unwrap_or(500) as u32, + conf.get_int("db_open_retry_backoff_msec").unwrap_or(10) as u32, None, ), ack::Status::Error => update_request_status( request_id.to_string(), request_state::Status::Error, db_path.to_string(), + conf.get_int("db_open_max_retries").unwrap_or(500) as u32, + conf.get_int("db_open_retry_backoff_msec").unwrap_or(10) as u32, Some(request_state::State::Error( ack_response_into_inner.message.to_string(), )), @@ -599,6 +625,8 @@ fn spawn_send_request( request_id.to_string(), request_state::Status::Error, db_path.to_string(), + conf.get_int("db_open_max_retries").unwrap_or(500) as u32, + conf.get_int("db_open_retry_backoff_msec").unwrap_or(10) as u32, Some(request_state::State::Error( "Status is not supported or is invalid".to_string(), )), @@ -609,6 +637,8 @@ fn spawn_send_request( request_id.to_string(), request_state::Status::Error, db_path.to_string(), + conf.get_int("db_open_max_retries").unwrap_or(500) as u32, + conf.get_int("db_open_retry_backoff_msec").unwrap_or(10) as u32, Some(request_state::State::Error(format!("{:?}", result_error))), ), } @@ -674,6 +704,8 @@ fn spawn_send_event_subscription_request( // Spawning new thread to make the subscribe_event_call to remote relay tokio::spawn(async move { let db_path = conf.get_str("db_path").unwrap(); + let db_open_max_retries = conf.get_int("db_open_max_retries").unwrap_or(500) as u32; + let db_open_retry_backoff_msec = conf.get_int("db_open_retry_backoff_msec").unwrap_or(10) as u32; // Iterate through the relay entries in the configuration to find a match let relays_table = conf.get_table("relays").unwrap(); @@ -707,12 +739,16 @@ fn spawn_send_event_subscription_request( request_id.to_string(), status, db_path.to_string(), + db_open_max_retries.clone(), + db_open_retry_backoff_msec.clone(), ack_response_into_inner.message.to_string(), ), None => update_event_subscription_status( request_id.to_string(), ack::Status::Error, db_path.to_string(), + db_open_max_retries.clone(), + db_open_retry_backoff_msec.clone(), "Status is not supported or is invalid".to_string(), ), } @@ -721,6 +757,8 @@ fn spawn_send_event_subscription_request( request_id.to_string(), ack::Status::Error, db_path.to_string(), + db_open_max_retries.clone(), + db_open_retry_backoff_msec.clone(), format!("{:?}", result_error).to_string(), ), } diff --git a/weaver/sdks/corda/src/main/kotlin/org/hyperledger/cacti/weaver/corda/sdk/InteroperableHelper.kt b/weaver/sdks/corda/src/main/kotlin/org/hyperledger/cacti/weaver/corda/sdk/InteroperableHelper.kt index f197ae587dd..5410c8434c5 100644 --- a/weaver/sdks/corda/src/main/kotlin/org/hyperledger/cacti/weaver/corda/sdk/InteroperableHelper.kt +++ b/weaver/sdks/corda/src/main/kotlin/org/hyperledger/cacti/weaver/corda/sdk/InteroperableHelper.kt @@ -329,7 +329,7 @@ class InteroperableHelper { if (retryCount > num) { Left(Error("Error: Timeout, remote network took longer than $num seconds to respond")) } else { - delay(1000L) + delay(500L) val requestState = async { client.getState(requestId) }.await() logger.debug("Response from getState: $requestState") when (requestState.status.toString()) { diff --git a/weaver/sdks/fabric/interoperation-node-sdk/src/Relay.ts b/weaver/sdks/fabric/interoperation-node-sdk/src/Relay.ts index d85a26a9f5c..b94d2eb3d94 100644 --- a/weaver/sdks/fabric/interoperation-node-sdk/src/Relay.ts +++ b/weaver/sdks/fabric/interoperation-node-sdk/src/Relay.ts @@ -25,6 +25,8 @@ class Relay { _endPoint = ""; _useTls = false; _tlsRootCACerts = ''; + // TODO: make this configurable parameter + backOffMSec = 500; /** * Construct a Relay object with the given url. A Relay object @@ -172,7 +174,7 @@ class Relay { if (dateObj.getTime() < Date.now()) { throw new Error("Timeout: State is still pending."); } else { - await helpers.delay(1000); + await helpers.delay(this.backOffMSec); return await this.recursiveState(requestID, dateObj); } } else { @@ -306,7 +308,7 @@ class Relay { if (dateObj.getTime() < Date.now()) { throw new Error("Timeout: State is still pending."); } else { - await helpers.delay(1000); + await helpers.delay(this.backOffMSec); return await this.recursiveEventSubscriptionState(requestID, dateObj); } } else {