Skip to content

Commit

Permalink
feat(relay): configurable db_open retry mechanism added in relay and driver
Browse files Browse the repository at this point in the history
    To manage concurrent accesses to a LevelDB instance so that protocols
    don't fail because of temporary resource contention, added two env
    variables in the driver and config variables in the relay to configure
    the retry mechanism used while opening the database:

    - Max number of retries
    - Backoff time in milliseconds.

    Additionally, upgraded Go versions to v1.20 in the publish workflows.

Signed-off-by: Sandeep Nishad <[email protected]>
  • Loading branch information
sandeepnRES committed Apr 14, 2023
1 parent f006df8 commit ddf95a1
Show file tree
Hide file tree
Showing 35 changed files with 206 additions and 42 deletions.
10 changes: 5 additions & 5 deletions weaver/.github/workflows/deploy_go-pkgs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.16
go-version: '1.20.2'

- name: Set current date as env
run: echo "RELEASE_DATE=$(date +%b\ %d,\ %Y)" >> $GITHUB_ENV
Expand Down Expand Up @@ -129,7 +129,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.16
go-version: '1.20.2'

- name: Set current date as env
run: echo "RELEASE_DATE=$(date +%b\ %d,\ %Y)" >> $GITHUB_ENV
Expand Down Expand Up @@ -183,7 +183,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.16
go-version: '1.20.2'

- name: Set current date as env
run: echo "RELEASE_DATE=$(date +%b\ %d,\ %Y)" >> $GITHUB_ENV
Expand Down Expand Up @@ -237,7 +237,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.16
go-version: '1.20.2'

- name: Set current date as env
run: echo "RELEASE_DATE=$(date +%b\ %d,\ %Y)" >> $GITHUB_ENV
Expand Down Expand Up @@ -291,7 +291,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.16
go-version: '1.20.2'

- name: Set current date as env
run: echo "RELEASE_DATE=$(date +%b\ %d,\ %Y)" >> $GITHUB_ENV
Expand Down
2 changes: 2 additions & 0 deletions weaver/core/drivers/fabric-driver/.env.docker.template
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ INTEROP_CHAINCODE=<interop-chaincode-name>
DB_PATH=driverdbs
WALLET_PATH=
TLS_CREDENTIALS_DIR=<dir-with-tls-cert-and-key>
LEVELDB_LOCKED_MAX_RETRIES=<max-attempts-in-retry>
LEVELDB_LOCKED_RETRY_BACKOFF_MSEC=<retry-back-off-time-in-ms>
DOCKER_IMAGE_NAME=ghcr.io/hyperledger-labs/weaver-fabric-driver
DOCKER_TAG=1.4.0
EXTERNAL_NETWORK=<docker-bridge-network>
Expand Down
2 changes: 2 additions & 0 deletions weaver/core/drivers/fabric-driver/.env.template
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ MOCK=false
DB_PATH=driverdbs
WALLET_PATH=
DEBUG=true
LEVELDB_LOCKED_MAX_RETRIES=
LEVELDB_LOCKED_RETRY_BACKOFF_MSEC=
2 changes: 2 additions & 0 deletions weaver/core/drivers/fabric-driver/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ services:
- DRIVER_TLS_KEY_PATH=${DRIVER_TLS_KEY_PATH}
- WALLET_PATH=${WALLET_PATH}
- DEBUG=false
- LEVELDB_LOCKED_MAX_RETRIES=${LEVELDB_LOCKED_MAX_RETRIES}
- LEVELDB_LOCKED_RETRY_BACKOFF_MSEC=${LEVELDB_LOCKED_RETRY_BACKOFF_MSEC}
volumes:
- ${CONNECTION_PROFILE}:/fabric-driver/ccp.json
- ${DRIVER_CONFIG}:/fabric-driver/config.json
Expand Down
12 changes: 10 additions & 2 deletions weaver/core/drivers/fabric-driver/server/dbConnector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class LevelDBConnector implements DBConnector {
DB_TYPE: string = "Level";
DB_NAME: string;
dbHandle: any;
dbOpenMaxRetries: number;
dbRetryBackoffTime: number;

constructor(
dbName: string
Expand All @@ -53,15 +55,20 @@ class LevelDBConnector implements DBConnector {
}
this.DB_NAME = dbName;
this.dbHandle = new Level(path.join(process.env.DB_PATH ? process.env.DB_PATH : "./driverdbs", dbName), { valueEncoding: 'json' });
// Retry max attempts, default 250, making it 5 seconds for retries
this.dbOpenMaxRetries = process.env.LEVELDB_LOCKED_MAX_RETRIES ? parseInt(process.env.LEVELDB_LOCKED_MAX_RETRIES) : 250;
// Retry back off time in ms, default 20ms
this.dbRetryBackoffTime = process.env.LEVELDB_LOCKED_RETRY_BACKOFF_MSEC ? parseInt(process.env.LEVELDB_LOCKED_RETRY_BACKOFF_MSEC) : 20;
}

async open(
i: number = 0
): Promise<boolean> {
logger.debug(`attempt #${i} to open database ${this.DB_NAME}`)
try {
await this.dbHandle.open();
} catch (error: any) {
if (i>=10) {
if (i >= this.dbOpenMaxRetries) {
logger.error(`failed to open database connection with error: ${error.toString()}`);
if (error.code == 'LEVEL_DATABASE_NOT_OPEN' && error.cause && error.cause.code == 'LEVEL_LOCKED') {
throw new DBLockedError(error.toString());
Expand All @@ -70,7 +77,8 @@ class LevelDBConnector implements DBConnector {
}
}
else {
await delay(1000);
logger.debug(`failed to open database connection with error: ${error.toString()}`);
await delay(this.dbRetryBackoffTime);
await this.open(i+1);
}
}
Expand Down
25 changes: 14 additions & 11 deletions weaver/core/drivers/fabric-driver/server/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ async function addEventSubscription(
var subscriptionsSerialized: string = await db.read(key) as string;
subscriptions = JSON.parse(subscriptionsSerialized);

logger.debug(`subscriptions.length: ${subscriptions.length}`);
logger.debug(`existing subscriptions.length: ${subscriptions.length}`);
// check if the event to be subscribed is already present in the DB
for (const subscriptionSerialized of subscriptions) {
var subscription: queryPb.Query = queryPb.Query.deserializeBinary(Buffer.from(subscriptionSerialized, 'base64'));
Expand Down Expand Up @@ -200,14 +200,14 @@ async function addEventSubscription(
}
}

logger.debug(`subscriptions.length: ${subscriptions.length}`);
logger.debug(`new subscriptions.length: ${subscriptions.length}`);
subscriptionsSerialized = JSON.stringify(subscriptions);
// insert the value against key in the DB (it can be the scenario of a new key addition, or update to the value of an existing key)
await db.insert(key, subscriptionsSerialized);
await db.close();

// TODO: register the event with fabric sdk
logger.info(`end addEventSubscription() .. requestId: ${query.getRequestId()}`);
logger.debug(`end addEventSubscription() .. requestId: ${query.getRequestId()}`);
return query.getRequestId();

} catch(error: any) {
Expand Down Expand Up @@ -269,7 +269,7 @@ const deleteEventSubscription = async (
}

await db.close();
logger.info(`end deleteEventSubscription() .. retVal: ${JSON.stringify(retVal.toObject())}`);
logger.debug(`end deleteEventSubscription() .. retVal: ${JSON.stringify(retVal.toObject())}`);
return retVal;
} catch(error: any) {
logger.error(`Error during delete: ${error.toString()}`);
Expand All @@ -280,10 +280,11 @@ const deleteEventSubscription = async (

function filterEventMatcher(keySerialized: string, eventMatcher: eventsPb.EventMatcher) : boolean {
var item: eventsPb.EventMatcher = eventsPb.EventMatcher.deserializeBinary(Buffer.from(keySerialized, 'base64'));
logger.debug(`eventMatcher from db: ${JSON.stringify(item.toObject())}`)
if ((eventMatcher.getEventClassId() == '*' || eventMatcher.getEventClassId() == item.getEventClassId()) &&
(eventMatcher.getTransactionContractId() == '*' || eventMatcher.getTransactionContractId() == item.getTransactionContractId()) &&
(eventMatcher.getTransactionLedgerId() == '*' || eventMatcher.getTransactionLedgerId() == item.getTransactionLedgerId()) &&
(eventMatcher.getTransactionFunc() == '*' || eventMatcher.getTransactionFunc() == item.getTransactionFunc().toLowerCase())) {
(eventMatcher.getTransactionFunc() == '*' || eventMatcher.getTransactionFunc().toLowerCase() == item.getTransactionFunc().toLowerCase())) {

return true;
} else {
Expand All @@ -294,7 +295,7 @@ function filterEventMatcher(keySerialized: string, eventMatcher: eventsPb.EventM
async function lookupEventSubscriptions(
eventMatcher: eventsPb.EventMatcher
): Promise<Array<queryPb.Query>> {
logger.info(`finding the subscriptions with eventMatcher: ${JSON.stringify(eventMatcher.toObject())}`);
logger.debug(`finding the subscriptions with eventMatcher: ${JSON.stringify(eventMatcher.toObject())}`);
var subscriptions: Array<string>;
var returnSubscriptions: Array<queryPb.Query> = new Array<queryPb.Query>();
let db: DBConnector;
Expand All @@ -313,8 +314,8 @@ async function lookupEventSubscriptions(
}
}

logger.debug(`returnSubscriptions.length: ${returnSubscriptions.length}`);
logger.info(`end lookupEventSubscriptions()`);
logger.info(`found ${returnSubscriptions.length} matching subscriptions`);
logger.debug(`end lookupEventSubscriptions()`);
await db.close();
return returnSubscriptions;

Expand All @@ -324,7 +325,7 @@ async function lookupEventSubscriptions(
if (error instanceof DBKeyNotFoundError) {
// case of read failing due to key not found
returnSubscriptions = new Array<queryPb.Query>();
logger.debug(`returnSubscriptions.length: ${returnSubscriptions.length}`);
logger.info(`found ${returnSubscriptions.length} matching subscriptions`);
return returnSubscriptions;
} else {
// case of read failing due to some other issue
Expand All @@ -347,6 +348,7 @@ async function readAllEventMatchers(): Promise<Array<eventsPb.EventMatcher>> {
const eventMatcher = eventsPb.EventMatcher.deserializeBinary(Uint8Array.from(Buffer.from(key, 'base64')))
returnMatchers.push(eventMatcher)
}
logger.info(`found ${returnMatchers.length} eventMatchers`);
logger.debug(`end readAllEventMatchers()`);
await db.close();
return returnMatchers;
Expand Down Expand Up @@ -434,7 +436,7 @@ async function writeExternalStateHelper(
ccArgs: ccArgsStr,
contractName: ctx.getContractId()
}
logger.info(`invokeObject.ccArgs: ${invokeObject.ccArgs}`)
logger.info(`writing external state to contract: ${ctx.getContractId()} with function: ${ctx.getFunc()}, and args: ${invokeObject.ccArgs} on channel: ${ctx.getLedgerId()}`);

const [ response, responseError ] = await handlePromise(InteroperableHelper.submitTransactionWithRemoteViews(
interopContract,
Expand All @@ -450,7 +452,8 @@ async function writeExternalStateHelper(
gateway.disconnect();
throw responseError;
}
gateway.disconnect();
logger.debug(`write successful`);
gateway.disconnect();
} else {
const errorString: string = `erroneous viewPayload identified in WriteExternalState processing`;
logger.error(`error viewPayload.getError(): ${JSON.stringify(viewPayload.getError())}`);
Expand Down
14 changes: 8 additions & 6 deletions weaver/core/drivers/fabric-driver/server/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ const initBlockEventListenerForChannel = async (
// const responsePayload = transaction.payload.action.proposal_response_payload.extension.events.payload;
// Get transaction function name: first argument according to convention
const chaincodeFunc = transaction.payload.chaincode_proposal_payload.input.chaincode_spec.input.args[0].toString();
logger.info(`Trying to find match for channel: ${channelId}, chaincode: ${chaincodeId}, function: ${chaincodeFunc}`);
logger.info(`Event Listener: Trying to find match for channel: ${channelId}, chaincode: ${chaincodeId}, function: ${chaincodeFunc}`);
// Find all matching event subscriptions stored in the database
let eventMatcher = new events_pb.EventMatcher();
eventMatcher.setEventType(events_pb.EventType.LEDGER_STATE);
Expand All @@ -68,14 +68,15 @@ const initBlockEventListenerForChannel = async (
} catch(error) {
let errorString: string = error.toString();
if (!(error instanceof DBLockedError)) { // Check if contention was preventing DB access
console.error(`Event Listener: ${error}`)
throw(error);
}
await new Promise(f => setTimeout(f, 2000)); // Sleep 2 seconds
}
}
// Iterate through the view requests in the matching event subscriptions
eventSubscriptionQueries.forEach(async (eventSubscriptionQuery: query_pb.Query) => {
logger.info(`Generating view and collecting proof for channel: ${channelId}, chaincode: ${chaincodeId}, function: ${chaincodeFunc}, responsePayload: ${responsePayload.toString()}`);
logger.info(`Event Listener: Generating view and collecting proof for channel: ${channelId}, chaincode: ${chaincodeId}, function: ${chaincodeFunc}, responsePayload: ${responsePayload.toString()}`);
// Trigger proof collection
const [result, invokeError] = await handlePromise(
invoke(
Expand All @@ -90,7 +91,7 @@ const initBlockEventListenerForChannel = async (
const client = getRelayClientForEventPublish();
const viewPayload = packageFabricView(eventSubscriptionQuery, result);

logger.info('Sending block event');
logger.info('Event Listener: Sending block event');
// Sending the Fabric event to the relay.
client.sendDriverState(viewPayload, relayCallback);
}
Expand All @@ -115,7 +116,7 @@ const initContractEventListener = (
chaincodeId: string,
): any => {
const listener: ContractListener = async (event: ContractEvent) => {
logger.info(`Trying to find match for channel: ${channelId}, chaincode: ${chaincodeId}, event class: ${event.eventName}`);
logger.info(`Event Listener: Trying to find match for channel: ${channelId}, chaincode: ${chaincodeId}, event class: ${event.eventName}`);
// Find all matching event subscriptions stored in the database
let eventMatcher = new events_pb.EventMatcher();
eventMatcher.setEventType(events_pb.EventType.LEDGER_STATE);
Expand All @@ -131,14 +132,15 @@ const initContractEventListener = (
} catch(error) {
let errorString: string = error.toString();
if (!(error instanceof DBLockedError)) { // Check if contention was preventing DB access
logger.error(`Event Listener: ${error}`);
throw(error);
}
await new Promise(f => setTimeout(f, 2000)); // Sleep 2 seconds
}
}
// Iterate through the view requests in the matching event subscriptions
eventSubscriptionQueries.forEach(async (eventSubscriptionQuery: query_pb.Query) => {
logger.info(`Generating view and collecting proof for event class: ${event.eventName}, channel: ${channelId}, chaincode: ${chaincodeId}, event.payload: ${event.payload.toString()}`);
logger.info(`Event Listener: Generating view and collecting proof for event class: ${event.eventName}, channel: ${channelId}, chaincode: ${chaincodeId}, event.payload: ${event.payload.toString()}`);
// Trigger proof collection
const [result, invokeError] = await handlePromise(
invoke(
Expand All @@ -153,7 +155,7 @@ const initContractEventListener = (
const client = getRelayClientForEventPublish();
const viewPayload = packageFabricView(eventSubscriptionQuery, result);

logger.info('Sending contract event');
logger.info('Event Listener: Sending contract event');
// Sending the Fabric event to the relay.
client.sendDriverState(viewPayload, relayCallback);
}
Expand Down
2 changes: 1 addition & 1 deletion weaver/core/network/fabric-interop-cc/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.16 AS build
FROM golang:1.20 AS build

COPY . /fabric-interop-cc
WORKDIR /fabric-interop-cc
Expand Down
2 changes: 2 additions & 0 deletions weaver/core/relay/.env.template.2
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ DRIVER_PORT=<driver-server-port>
DRIVER_NAME=<driver-name>
RELAY_NAME=<docker-relay-name>
RELAY_PORT=<relay-grpc-server-port>
DB_OPEN_MAX_RETRIES=<max-retries-for-opening-db-when-locked>
DB_OPEN_RETRY_BACKOFF_MSEC=<retry-backoff-time-for-opening-db-when-locked>
DOCKER_IMAGE_NAME=ghcr.io/hyperledger-labs/weaver-relay-server
DOCKER_TAG=1.4.2
EXTERNAL_NETWORK=<docker-bridge-network>
Expand Down
4 changes: 4 additions & 0 deletions weaver/core/relay/config/Corda_Relay.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ hostname="localhost"
db_path="db/Corda_Relay/requests"
# This will be replaced by the task queue.
remote_db_path="db/Corda_Relay/remote_request"
# max retries opening sled db if it is locked
db_open_max_retries=500
# retry back off time in ms if sled db is locked
db_open_retry_backoff_msec=10

# FOR TLS
cert_path="credentials/fabric_cert.pem"
Expand Down
4 changes: 4 additions & 0 deletions weaver/core/relay/config/Corda_Relay2.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ hostname="localhost"
db_path="db/Corda_Relay2/requests"
# This will be replaced by the task queue.
remote_db_path="db/Corda_Relay2/remote_request"
# max retries opening sled db if it is locked
db_open_max_retries=500
# retry back off time in ms if sled db is locked
db_open_retry_backoff_msec=10

# FOR TLS
cert_path="credentials/fabric_cert.pem"
Expand Down
4 changes: 4 additions & 0 deletions weaver/core/relay/config/Dummy_Relay.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ hostname="localhost"
db_path="db/Dummy_Relay/requests"
# This will be replaced by the task queue.
remote_db_path="db/Dummy_Relay/remote_request"
# max retries opening sled db if it is locked
db_open_max_retries=500
# retry back off time in ms if sled db is locked
db_open_retry_backoff_msec=10

# FOR TLS
cert_path="credentials/fabric_cert.pem"
Expand Down
4 changes: 4 additions & 0 deletions weaver/core/relay/config/Dummy_Relay_tls.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ hostname="localhost"
db_path="db/Dummy_Relay_tls/requests"
# This will be replaced by the task queue.
remote_db_path="db/Dummy_Relay_tls/remote_request"
# max retries opening sled db if it is locked
db_open_max_retries=500
# retry back off time in ms if sled db is locked
db_open_retry_backoff_msec=10

# FOR TLS
cert_path="credentials/fabric_cert.pem"
Expand Down
4 changes: 4 additions & 0 deletions weaver/core/relay/config/Fabric_Relay.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ hostname="localhost"
db_path="db/Fabric_Relay/requests"
# This will be replaced by the task queue.
remote_db_path="db/Fabric_Relay/remote_request"
# max retries opening sled db if it is locked
db_open_max_retries=500
# retry back off time in ms if sled db is locked
db_open_retry_backoff_msec=10


# FOR TLS
Expand Down
4 changes: 4 additions & 0 deletions weaver/core/relay/config/Fabric_Relay2.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ hostname="localhost"
db_path="db/Fabric_Relay2/requests"
# This will be replaced by the task queue.
remote_db_path="db/Fabric_Relay2/remote_request"
# max retries opening sled db if it is locked
db_open_max_retries=500
# retry back off time in ms if sled db is locked
db_open_retry_backoff_msec=10


# FOR TLS
Expand Down
4 changes: 4 additions & 0 deletions weaver/core/relay/config/Settings.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ hostname="localhost"
db_path="db/requests"
# This will be replaced by the task queue.
remote_db_path="db/remote_requests"
# max retries opening sled db if it is locked
db_open_max_retries=500
# retry back off time in ms if sled db is locked
db_open_retry_backoff_msec=10

[networks]
[networks.Fabric_Network]
Expand Down
5 changes: 5 additions & 0 deletions weaver/core/relay/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ services:
# working ok, or if you want to add additional assets such as
# custom configurations and additional remote relay definitions
#
# - DB_OPEN_MAX_RETRIES=${DB_OPEN_MAX_RETRIES}
# max retries opening sled db if it is locked
# - DB_OPEN_RETRY_BACKOFF_MSEC=${DB_OPEN_RETRY_BACKOFF_MSEC}
# retries back off time for opening sled db if it is locked
#
volumes:
#
# Uncomment these two files if you want to mount your specialised
Expand Down
2 changes: 2 additions & 0 deletions weaver/core/relay/docker/server.template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ port="${RELAY_PORT}"
hostname="0.0.0.0"
db_path="db/${RELAY_NAME}/requests"
remote_db_path="db/${RELAY_NAME}/remote_request"
db_open_max_retries="${DB_OPEN_MAX_RETRIES}"
db_open_retry_backoff_msec="${DB_OPEN_RETRY_BACKOFF_MSEC}"
cert_path="${RELAY_TLS_CERT_PATH}"
key_path="${RELAY_TLS_KEY_PATH}"
tls="${RELAY_TLS}"
Expand Down
Loading

0 comments on commit ddf95a1

Please sign in to comment.