Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: no halting 🚀 #178

Merged
merged 8 commits into from
Nov 17, 2022
9 changes: 9 additions & 0 deletions schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,12 @@ type Contract @entity {
storeMessage: StoreContractMessage!
instantiateMessage: InstantiateContractMessage!
}

type UnprocessedEntity @entity {
id: ID!
error: String!
event: Event
message: Message
transaction: Transaction
block: Block
}
6 changes: 5 additions & 1 deletion src/mappings/authz/exec.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import {messageId} from "../utils";
import {attemptHandling, messageId, unprocessedEventHandler} from "../utils";
import {AuthzExecMsg} from "../types";
import {CosmosMessage} from "@subql/types-cosmos";

import {AuthzExec, AuthzExecMessage, Message} from "../../types";
import allModuleTypes from "../../cosmjs/proto";

export async function handleAuthzExec(msg: CosmosMessage<AuthzExecMsg>): Promise<void> {
await attemptHandling(msg, _handleAuthzExec, unprocessedEventHandler);
}

async function _handleAuthzExec(msg: CosmosMessage<AuthzExecMsg>): Promise<void> {
logger.info(`[handleAuthzExec] (tx ${msg.tx.hash}): indexing message ${msg.idx + 1} / ${msg.tx.decodedTx.body.messages.length}`);
logger.debug(`[handleAuthzExec] (msg.msg): ${JSON.stringify(msg.msg, null, 2)}`);

Expand Down
17 changes: 15 additions & 2 deletions src/mappings/bank/balanceChange.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import {CosmosEvent} from "@subql/types-cosmos";
import {NativeBalanceChange, Transaction} from "../../types";
import {checkBalancesAccount, messageId} from "../utils";
import {
attemptHandling,
checkBalancesAccount,
messageId,
unprocessedEventHandler
} from "../utils";
import {parseCoins} from "../../cosmjs/utils";

export async function saveNativeBalanceEvent(id: string, address: string, amount: bigint, denom: string, event: CosmosEvent) {
Expand Down Expand Up @@ -28,6 +33,14 @@ async function saveNativeFeesEvent(event: CosmosEvent) {
}

export async function handleNativeBalanceDecrement(event: CosmosEvent): Promise<void> {
await attemptHandling(event, _handleNativeBalanceDecrement, unprocessedEventHandler);
}

export async function handleNativeBalanceIncrement(event: CosmosEvent): Promise<void> {
await attemptHandling(event, _handleNativeBalanceDecrement, unprocessedEventHandler);
}

async function _handleNativeBalanceDecrement(event: CosmosEvent): Promise<void> {
logger.info(`[handleNativeBalanceDecrement] (tx ${event.tx.hash}): indexing event ${event.idx + 1} / ${event.tx.tx.events.length}`);
logger.debug(`[handleNativeBalanceDecrement] (event.event): ${JSON.stringify(event.event, null, 2)}`);
logger.debug(`[handleNativeBalanceDecrement] (event.log): ${JSON.stringify(event.log, null, 2)}`);
Expand Down Expand Up @@ -65,7 +78,7 @@ export async function handleNativeBalanceDecrement(event: CosmosEvent): Promise<
await saveNativeFeesEvent(event);
}

export async function handleNativeBalanceIncrement(event: CosmosEvent): Promise<void> {
async function _handleNativeBalanceIncrement(event: CosmosEvent): Promise<void> {
logger.info(`[handleNativeBalanceIncrement] (tx ${event.tx.hash}): indexing event ${event.idx + 1} / ${event.tx.tx.events.length}`);
logger.debug(`[handleNativeBalanceIncrement] (event.event): ${JSON.stringify(event.event, null, 2)}`);
logger.debug(`[handleNativeBalanceIncrement] (event.log): ${JSON.stringify(event.log, null, 2)}`);
Expand Down
6 changes: 5 additions & 1 deletion src/mappings/bank/transfer.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import {CosmosEvent, CosmosMessage} from "@subql/types-cosmos";
import {NativeTransferMsg} from "../types";
import {messageId} from "../utils";
import {attemptHandling, messageId, unprocessedEventHandler} from "../utils";
import {NativeTransfer} from "../../types";

export async function handleNativeTransfer(event: CosmosEvent): Promise<void> {
await attemptHandling(event, _handleNativeTransfer, unprocessedEventHandler);
}

async function _handleNativeTransfer(event: CosmosEvent): Promise<void> {
const msg: CosmosMessage<NativeTransferMsg> = event.msg;
logger.info(`[handleNativeTransfer] (tx ${msg.tx.hash}): indexing message ${msg.idx + 1} / ${msg.tx.decodedTx.body.messages.length}`);
logger.debug(`[handleNativeTransfer] (msg.msg): ${JSON.stringify(msg.msg, null, 2)}`);
Expand Down
16 changes: 14 additions & 2 deletions src/mappings/dist/rewards.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,22 @@
import {CosmosEvent, CosmosMessage} from "@subql/types-cosmos";
import {DistDelegatorClaimMsg} from "../types";
import {messageId} from "../utils";
import {attemptHandling, messageId, unprocessedEventHandler} from "../utils";
import {DistDelegatorClaim} from "../../types";
import {parseCoins} from "../../cosmjs/utils";

export async function handleDistDelegatorClaim(event: CosmosEvent): Promise<void> {
await attemptHandling(event,
_handleDistDelegatorClaim,
unprocessedEventHandler);
}

export async function handleDelegatorWithdrawRewardEvent(event: CosmosEvent): Promise<void> {
await attemptHandling(event,
_handleDelegatorWithdrawRewardEvent,
unprocessedEventHandler);
}

async function _handleDistDelegatorClaim(event: CosmosEvent): Promise<void> {
const msg: CosmosMessage<DistDelegatorClaimMsg> = event.msg;
logger.info(`[handleDistDelegatorClaim] (tx ${msg.tx.hash}): indexing DistDelegatorClaim ${messageId(msg)}`);
logger.debug(`[handleDistDelegatorClaim] (event.msg.msg): ${JSON.stringify(msg.msg, null, 2)}`);
Expand Down Expand Up @@ -36,7 +48,7 @@ export async function handleDistDelegatorClaim(event: CosmosEvent): Promise<void
await claim.save();
}

export async function handleDelegatorWithdrawRewardEvent(event: CosmosEvent): Promise<void> {
async function _handleDelegatorWithdrawRewardEvent(event: CosmosEvent): Promise<void> {
logger.debug(`[handleDelegateWithdrawRewardEvent] (event.event): ${JSON.stringify(event.event, null, 2)}`);
logger.debug(`[handleDelegateWithdrawRewardEvent] (event.log): ${JSON.stringify(event.log, null, 2)}`);

Expand Down
6 changes: 5 additions & 1 deletion src/mappings/gov/votes.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import {CosmosEvent, CosmosMessage} from "@subql/types-cosmos";
import {GovProposalVoteMsg} from "../types";
import {messageId} from "../utils";
import {attemptHandling, messageId, unprocessedEventHandler} from "../utils";
import {GovProposalVote, GovProposalVoteOption} from "../../types";

export async function handleGovProposalVote(event: CosmosEvent): Promise<void> {
await attemptHandling(event, _handleGovProposalVote, unprocessedEventHandler);
}

async function _handleGovProposalVote(event: CosmosEvent): Promise<void> {
const msg: CosmosMessage<GovProposalVoteMsg> = event.msg;
logger.info(`[handleGovProposalVote] (tx ${msg.tx.hash}): indexing GovProposalVote ${messageId(msg)}`);
logger.debug(`[handleGovProposalVote] (event.msg.msg): ${JSON.stringify(msg.msg, null, 2)}`);
Expand Down
6 changes: 5 additions & 1 deletion src/mappings/ibc/transfer.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import {CosmosEvent} from "@subql/types-cosmos";
import {IbcTransfer} from "../../types";
import {messageId} from "../utils";
import {attemptHandling, messageId, unprocessedEventHandler} from "../utils";

export async function handleIBCTransfer(event: CosmosEvent): Promise<void> {
await attemptHandling(event, _handleIBCTransfer, unprocessedEventHandler);
}

async function _handleIBCTransfer(event: CosmosEvent): Promise<void> {
const msg = event.msg;
logger.info(`[handleIBCTransfer] (tx ${msg.tx.hash}): indexing message ${msg.idx + 1} / ${msg.tx.decodedTx.body.messages.length}`);
logger.debug(`[handleIBCTransfer] (msg.msg): ${JSON.stringify(msg.msg, null, 2)}`);
Expand Down
44 changes: 40 additions & 4 deletions src/mappings/primitives.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,33 @@
import {CosmosBlock, CosmosEvent, CosmosMessage, CosmosTransaction} from "@subql/types-cosmos";
import {Block, Event, Message, Transaction, TxStatus} from "../types";
import {messageId} from "./utils";
import {
attemptHandling,
messageId,
primitivesFromMsg,
primitivesFromTx,
trackUnprocessed,
unprocessedEventHandler
} from "./utils";
import {createHash} from "crypto";
import {toBech32} from "@cosmjs/encoding";

export async function handleBlock(block: CosmosBlock): Promise<void> {
await attemptHandling(block, _handleBlock, _handleBlockError);
}

export async function handleTransaction(tx: CosmosTransaction): Promise<void> {
await attemptHandling(tx, _handleTransaction, _handleTransactionError);
}

export async function handleMessage(msg: CosmosMessage): Promise<void> {
await attemptHandling(msg, _handleMessage, _handleMessageError);
}

export async function handleEvent(event: CosmosEvent): Promise<void> {
await attemptHandling(event, _handleEvent, unprocessedEventHandler);
}

async function _handleBlock(block: CosmosBlock): Promise<void> {
logger.info(`[handleBlock] (block.header.height): indexing block ${block.block.header.height}`);

const {id, header: {chainId, height, time}} = block.block;
Expand All @@ -19,7 +42,7 @@ export async function handleBlock(block: CosmosBlock): Promise<void> {
await blockEntity.save();
}

export async function handleTransaction(tx: CosmosTransaction): Promise<void> {
async function _handleTransaction(tx: CosmosTransaction): Promise<void> {
logger.info(`[handleTransaction] (block ${tx.block.block.header.height}): indexing transaction ${tx.idx + 1} / ${tx.block.txs.length}`);
logger.debug(`[handleTransaction] (tx.decodedTx): ${JSON.stringify(tx.decodedTx, null, 2)}`);
logger.debug(`[handleTransaction] (tx.tx.log): ${tx.tx.log}`);
Expand Down Expand Up @@ -67,7 +90,7 @@ export async function handleTransaction(tx: CosmosTransaction): Promise<void> {
await txEntity.save();
}

export async function handleMessage(msg: CosmosMessage): Promise<void> {
async function _handleMessage(msg: CosmosMessage): Promise<void> {
logger.info(`[handleMessage] (tx ${msg.tx.hash}): indexing message ${msg.idx + 1} / ${msg.tx.decodedTx.body.messages.length}`);
logger.debug(`[handleMessage] (msg.msg): ${JSON.stringify(msg.msg, null, 2)}`);
delete msg.msg?.decodedMsg?.wasmByteCode;
Expand All @@ -83,7 +106,7 @@ export async function handleMessage(msg: CosmosMessage): Promise<void> {
await msgEntity.save();
}

export async function handleEvent(event: CosmosEvent): Promise<void> {
async function _handleEvent(event: CosmosEvent): Promise<void> {
logger.info(`[handleEvent] (tx ${event.tx.hash}): indexing event ${event.idx + 1} / ${event.tx.tx.events.length}`);
logger.debug(`[handleEvent] (event.event): ${JSON.stringify(event.event, null, 2)}`);
logger.debug(`[handleEvent] (event.log): ${JSON.stringify(event.log, null, 2)}`);
Expand All @@ -105,3 +128,16 @@ export async function handleEvent(event: CosmosEvent): Promise<void> {

await eventEntity.save();
}

async function _handleBlockError(err: Error, _: CosmosBlock): Promise<void> {
// NB: we won't have persisted any related entities yet.
await trackUnprocessed(err, {});
}

async function _handleTransactionError(err: Error, tx: CosmosTransaction): Promise<void> {
await trackUnprocessed(err, primitivesFromTx(tx));
}

async function _handleMessageError(err: Error, msg: CosmosMessage): Promise<void> {
await trackUnprocessed(err, primitivesFromMsg(msg));
}
77 changes: 74 additions & 3 deletions src/mappings/utils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {CosmosEvent, CosmosMessage} from "@subql/types-cosmos";
import {Account, Interface} from "../types";
import {CosmosBlock, CosmosEvent, CosmosMessage, CosmosTransaction} from "@subql/types-cosmos";
import {Account, Interface, UnprocessedEntity} from "../types";
import { createHash } from "crypto";

// messageId returns the id of the message passed or
// that of the message which generated the event passed.
Expand Down Expand Up @@ -42,6 +43,76 @@ export function getJaccardResult(payload: object): Interface {
return prediction.getInterface(); // return best matched Interface to contract
}

export type Primitive = CosmosEvent | CosmosMessage | CosmosTransaction | CosmosBlock;

export interface Primitives {
event?: CosmosEvent;
msg?: CosmosMessage;
tx?: CosmosTransaction;
block?: CosmosBlock;
}

export async function attemptHandling(input: Primitive,
handlerFn: (primitive) => Promise<void>,
errorFn: (Error, Primitive) => void): Promise<void> {
try {
await handlerFn(input);
} catch (error) {
errorFn(error, input);
}
}

export async function unprocessedEventHandler(err: Error, event: CosmosEvent): Promise<void> {
await trackUnprocessed(err, primitivesFromEvent(event));
}

export function primitivesFromTx(tx: CosmosTransaction): Primitives {
return {block: tx.block, tx: tx};
}

export function primitivesFromMsg(msg: CosmosMessage): Primitives {
return {block: msg.block, tx: msg.tx};
}

export function primitivesFromEvent(event: CosmosEvent): Primitives {
return {block: event.block, tx: event.tx};
}

export async function trackUnprocessed(error: Error, primitives: Primitives): Promise<void> {
logger.warn(`[trackUnprocessable] (error.message): ${error.message}`);
// NB: failsafe try/catch
try {
const {event, msg, tx, block} = primitives;
const sha256 = createHash("sha256");
// NB: use error stack if no primitives available (i.e. block handler).
const hashInput = event ?
messageId(event) : msg ?
messageId(msg) : tx ?
tx.hash : block ?
block.block.id : error.stack;
sha256.write(hashInput);
sha256.end();
// NB: ID is base64 encoded representation of the sha256 of `raw`.
const id = sha256.read().toString("base64");
const eventId = event ? messageId(event) : undefined;
const _messageId = event ? messageId(event) : undefined;
const transactionId = tx ? tx.hash : undefined;

const unprocessedEntity = UnprocessedEntity.create({
id,
error: error.stack,
eventId,
messageId: _messageId,
transactionId,
blockId: block.block.id,
});
return await unprocessedEntity.save();
} catch {
logger.error("[trackUnprocessable] (ERROR): unable to persist unprocessable entity");
logger.error(`[trackUnprocessable] (ERROR | stack): ${error.stack}`);
}
}

class Structure {
static getInterface() {
return Interface.Uncertain;
Expand Down Expand Up @@ -94,4 +165,4 @@ class LegacyBridgeSwapStructure extends Structure {
static getInterface() {
return Interface.LegacyBridgeSwap;
}
}
}
10 changes: 9 additions & 1 deletion src/mappings/wasm/bridge.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import {CosmosEvent, CosmosMessage} from "@subql/types-cosmos";
import {LegacyBridgeSwapMsg} from "../types";
import {messageId} from "../utils";
import {attemptHandling, messageId, primitivesFromEvent, trackUnprocessed} from "../utils";
import {LegacyBridgeSwap} from "../../types";

export async function handleLegacyBridgeSwap(event: CosmosEvent): Promise<void> {
await attemptHandling(event, _handleLegacyBridgeSwap, _handleLegacyBridgeSwapError);
}

async function _handleLegacyBridgeSwap(event: CosmosEvent): Promise<void> {
const msg: CosmosMessage<LegacyBridgeSwapMsg> = event.msg;
const id = messageId(msg);
logger.info(`[handleLegacyBridgeSwap] (tx ${msg.tx.hash}): indexing LegacyBridgeSwap ${id}`);
Expand Down Expand Up @@ -38,3 +42,7 @@ export async function handleLegacyBridgeSwap(event: CosmosEvent): Promise<void>

await legacySwap.save();
}

async function _handleLegacyBridgeSwapError(err: Error, event: CosmosEvent): Promise<void> {
await trackUnprocessed(err, primitivesFromEvent(event));
}
29 changes: 26 additions & 3 deletions src/mappings/wasm/contracts.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import {CosmosEvent, CosmosMessage} from "@subql/types-cosmos";
import {ExecuteContractMsg} from "../types";
import {getJaccardResult, messageId} from "../utils";
import {
attemptHandling,
getJaccardResult,
messageId,
unprocessedEventHandler
} from "../utils";
import {
Contract,
ExecuteContractMessage,
Expand All @@ -9,6 +14,24 @@ import {
} from "../../types/";

export async function handleExecuteContractEvent(event: CosmosEvent): Promise<void> {
await attemptHandling(event,
_handleExecuteContractEvent,
unprocessedEventHandler);
}

export async function handleContractStoreEvent(event: CosmosEvent): Promise<void> {
await attemptHandling(event,
_handleContractStoreEvent,
unprocessedEventHandler);
}

export async function handleContractInstantiateEvent(event: CosmosEvent): Promise<void> {
await attemptHandling(event,
_handleContractInstantiateEvent,
unprocessedEventHandler);
}

async function _handleExecuteContractEvent(event: CosmosEvent): Promise<void> {
const msg: CosmosMessage<ExecuteContractMsg> = event.msg;
logger.info(`[handleExecuteContractMessage] (tx ${msg.tx.hash}): indexing ExecuteContractMessage ${messageId(msg)}`);
logger.debug(`[handleExecuteContractMessage] (event.msg.msg): ${JSON.stringify(msg.msg, null, 2)}`);
Expand Down Expand Up @@ -37,7 +60,7 @@ export async function handleExecuteContractEvent(event: CosmosEvent): Promise<vo
await msgEntity.save();
}

export async function handleContractStoreEvent(event: CosmosEvent): Promise<void> {
async function _handleContractStoreEvent(event: CosmosEvent): Promise<void> {
logger.info(`[handleContractStoreEvent] (tx ${event.msg.tx.hash}): indexing event ${messageId(event.msg)}`);
logger.debug(`[handleContractStoreEvent] (event.event): ${JSON.stringify(event.event, null, 2)}`);
logger.debug(`[handleContractStoreEvent] (event.log): ${JSON.stringify(event.log, null, 2)}`);
Expand Down Expand Up @@ -67,7 +90,7 @@ export async function handleContractStoreEvent(event: CosmosEvent): Promise<void
await storeMsg.save();
}

export async function handleContractInstantiateEvent(event: CosmosEvent): Promise<void> {
async function _handleContractInstantiateEvent(event: CosmosEvent): Promise<void> {
logger.info(`[handleContractInstantiateEvent] (tx ${event.msg.tx.hash}): indexing event ${messageId(event.msg)}`);
logger.debug(`[handleContractInstantiateEvent] (event.event): ${JSON.stringify(event.event, null, 2)}`);
logger.debug(`[handleContractInstantiateEvent] (event.log): ${JSON.stringify(event.log, null, 2)}`);
Expand Down
Loading