Skip to content

Commit

Permalink
use express and add UI status page for the relayer jobs (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
wjrjerome authored Dec 15, 2023
1 parent defc363 commit 42ccad3
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 75 deletions.
1 change: 0 additions & 1 deletion .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
},
"rules": {
"semi": ["error", "always"],
"quotes": ["error", "double"],
"@typescript-eslint/explicit-function-return-type": "off",
"@typescript-eslint/no-explicit-any": 1,
"@typescript-eslint/no-inferrable-types": [
Expand Down
21 changes: 9 additions & 12 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,18 @@
},
"engines": {
"node": "20.x.x",
"npm": "9.x.x"
"npm": "10.x.x"
},
"author": "Hashlabs",
"license": "GNU",
"homepage": "https://github.com/XinFinOrg/XDC-Relayer#readme",
"repository": "https://github.com/XinFinOrg/XDC-Relayer",
"devDependencies": {
"@types/bull": "^4.10.0",
"@types/body-parser": "^1.19.5",
"@types/bunyan": "^1.8.8",
"@types/cron": "^1.7.2",
"@types/cors": "^2.8.17",
"@types/express": "^4.17.21",
"@types/jest": "^29.4.0",
"@types/koa": "2.13.1",
"@types/koa__cors": "^3.0.2",
"@types/koa__router": "^8.0.4",
"@types/koa-bodyparser": "^4.3.0",
"@types/koa-helmet": "^6.0.2",
"@types/lodash": "^4.14.202",
"@types/node": "^18.14.1",
"@types/node-fetch": "^2.6.9",
Expand All @@ -46,19 +42,20 @@
"typescript": "^5.0.4"
},
"dependencies": {
"@bull-board/api": "^5.10.2",
"@bull-board/express": "^5.10.2",
"@ethereumjs/block": "^4.1.0",
"@ethereumjs/rlp": "^4.0.0",
"@koa/cors": "^3.1.0",
"@koa/router": "^10.0.0",
"agentkeepalive": "^4.3.0",
"axios": "^1.3.4",
"body-parser": "^1.20.2",
"bull": "^4.11.5",
"bunyan": "^1.8.15",
"cron": "^1.8.2",
"cors": "^2.8.5",
"dotenv": "^8.2.0",
"koa": "^2.13.1",
"koa-bodyparser": "^4.3.0",
"koa-helmet": "^6.1.0",
"express": "^4.18.2",
"lodash": "^4.17.21",
"node-cache": "^5.1.2",
"node-fetch": "2",
Expand Down
2 changes: 1 addition & 1 deletion src/processors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

1. Read our `lite.ts`(simple version) or the `standard.ts`(more complex version) as examples
2. Assume you plan to add a new processor called `XXX`. First create the file `XXX.ts` under current directory.
3. Add `export class XXX implements ProcessorInterface` where all our processors has some common methods such as `init` and `reset`. Implement those methods.
3. Add `export class XXX extends BaseProcessor` where all our processors has some common methods such as `init` and `reset`. Implement those methods.
4. Go to `index.ts` in this directory, register your processors with `enum Mode`, `private processors` (class property), `reset` method and add your custom start up condition in `getRunningModes` method
5. Done
28 changes: 28 additions & 0 deletions src/processors/base.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import Bull from "bull";

export abstract class BaseProcessor {
queue: Bull.Queue;
constructor(name: string) {
this.queue = new Bull(name);
}

/**
* Initialise the processor, but this method won't trigger the event processing
* @returns The processor
*/
abstract init(): this;

/**
* Reset everything(cache) if processor is already running, otherwise start the event processing.
* @returns Promise<void>
*/
abstract reset(): Promise<void>;

getQueue() {
return this.queue;
}

async clean(): Promise<void> {
await this.queue.obliterate({ force: true });
}
}
25 changes: 20 additions & 5 deletions src/processors/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { Zero } from "./zero";
import { config } from "./../config";
import bunyan from "bunyan";
import * as _ from "lodash";
import { ProcessorInterface } from "./type";
import { createBullBoard } from '@bull-board/api';
import { BullAdapter } from '@bull-board/api/bullAdapter';
import { ExpressAdapter } from '@bull-board/express';

import { Zero } from "./zero";
import { config } from "./../config";
import { Lite } from "./lite";
import { Standard } from "./standard";
import { MainnetService } from "../service/mainnet";
Expand All @@ -13,7 +16,7 @@ enum Mode {
ZERO = "ZERO"
}

export class Processors implements ProcessorInterface {
export class Processors {
logger: bunyan;
private processors: {
lite: Lite;
Expand All @@ -34,10 +37,22 @@ export class Processors implements ProcessorInterface {
}

// Register the event process. NOTE: this won't actually start the job processing until you call the reset
init() {
init(serverAdapter: ExpressAdapter) {
const adapters: BullAdapter[] = [];
_.forIn(this.processors, (p, _) => {
p.init();
adapters.push(new BullAdapter(p.getQueue(), { readOnlyMode: true }));
});
createBullBoard({
queues: adapters,
serverAdapter: serverAdapter,
options: {
uiConfig: {
boardTitle: "Relayer Status"
}
}
});

return this;
}

Expand Down
16 changes: 5 additions & 11 deletions src/processors/lite.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
import bunyan from "bunyan";
import { ProcessorInterface } from "./type";
import Bull from "bull";
import { LiteMainnetService, SmartContractData } from "../service/mainnet";
import { config } from "../config";
import { SubnetService } from "../service/subnet";
import { ForkingError } from "../errors/forkingError";
import { BaseProcessor } from "./base";

const NAME = "LITE";

export class Lite implements ProcessorInterface {
export const NAME = "LITE";

export class Lite extends BaseProcessor {
logger: bunyan;
private queue: Bull.Queue;
liteMainnetService: LiteMainnetService;
subnetService: SubnetService;

constructor(logger: bunyan) {
super(NAME);
this.logger = logger;
this.queue = new Bull(NAME);
this.liteMainnetService = new LiteMainnetService(config.mainnet, logger);
this.subnetService = new SubnetService(config.subnet, logger);
}
Expand All @@ -37,10 +36,6 @@ export class Lite implements ProcessorInterface {
return this;
};

async clean(): Promise<void> {
await this.queue.obliterate({ force: true });
}

// In lite mode, the reset does nothing other than just trigger the jobs. We can trigger it multiple time, it has no effect
async reset(): Promise<void> {
await this.queue.add({}, { jobId: NAME, repeat: { cron: config.cronJob.liteJobExpression}});
Expand All @@ -61,7 +56,6 @@ export class Lite implements ProcessorInterface {
);
}


private async liteSubmitTxs(
gapAndEpoch: { gap: number; epoch: number },
latestBlock: SmartContractData,
Expand Down
20 changes: 8 additions & 12 deletions src/processors/standard.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,33 @@
import Bull from "bull";
import bunyan from "bunyan";
import { config } from "../config";
import { MainnetService, SmartContractData } from "../service/mainnet";
import { SubnetBlockInfo, SubnetService } from "../service/subnet";
import { Cache } from "../service/cache";
import { chunkBy, sleep } from "../utils";
import { ForkingError } from "../errors/forkingError";
import { ProcessorInterface } from "./type";

export const NAME = "STANDARD";
import { BaseProcessor } from "./base";

const chunkByMaxFetchSize = chunkBy(config.chunkSize);
export const NAME = "STANDARD";
const REPEAT_JOB_OPT = { jobId: NAME, repeat: { cron: config.cronJob.jobExpression}};

export class Standard implements ProcessorInterface {
private queue: Bull.Queue;
export class Standard extends BaseProcessor {
private mainnetService: MainnetService;
private subnetService: SubnetService;
cache: Cache;
logger: bunyan;

constructor(logger: bunyan) {
super(NAME);
this.logger = logger;
this.queue = new Bull(NAME);
this.mainnetService = new MainnetService(config.mainnet, logger);
this.subnetService = new SubnetService(config.subnet, logger);
this.cache = this.cache = new Cache(logger);
}

getQueue() {
return this.queue;
}

init() {
this.logger.info("Initialising XDC relayer");
this.queue.process(async (_, done) => {
Expand All @@ -46,10 +46,6 @@ export class Standard implements ProcessorInterface {
return this;
}

async clean(): Promise<void> {
await this.queue.obliterate({ force: true });
}

// Reset and start the state sync until success
async reset() {
try {
Expand Down
13 changes: 0 additions & 13 deletions src/processors/type.ts

This file was deleted.

16 changes: 5 additions & 11 deletions src/processors/zero.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
import Bull from "bull";
import bunyan from "bunyan";
import { ProcessorInterface } from "./type";
import { ZeroService } from "../service/zero";
import { config } from "../config";
import { BaseProcessor } from "./base";

const NAME = "ZERO";
export const NAME = "ZERO";
const REPEAT_JOB_OPT = {
jobId: NAME,
repeat: { cron: config.cronJob.zeroJobExpression },
};

export class Zero implements ProcessorInterface {
private queue: Bull.Queue;
export class Zero extends BaseProcessor {
private logger: bunyan;
private zeroService: ZeroService;

constructor(logger: bunyan) {
super(NAME);
this.logger = logger;
this.queue = new Bull(NAME);
this.zeroService = new ZeroService(logger);
}
init() {
Expand All @@ -38,15 +36,11 @@ export class Zero implements ProcessorInterface {
});
return this;
}

async reset(): Promise<void> {
await this.queue.add({}, REPEAT_JOB_OPT);
}

async clean(): Promise<void> {
await this.queue.obliterate({ force: true });
}

async processEvent() {
const payloads = await this.zeroService.getPayloads();
if (payloads.length == 0) {
Expand Down
27 changes: 18 additions & 9 deletions src/server.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,34 @@
import Koa from "koa";
import bodyParser from "koa-bodyparser";
import cors from "@koa/cors";
import bunyan from "bunyan";
import express from "express";
import cors from "cors";
import bodyParser from "body-parser";
import { ExpressAdapter } from '@bull-board/express';

import { config } from "./config";
import { Processors } from "./processors";
// import { sync } from "./service/zero";

const app = new Koa();
const app = express();

// Enable cors with default options
app.use(cors());
const logger = bunyan.createLogger({ name: "xdc-relayer" });

const processors = new Processors(logger);

// Enable bodyParser with default options
app.use(bodyParser());
// Body Parser Middleware
// To parse URL-encoded data
app.use(bodyParser.urlencoded({ extended: true }));
// To parse json data
app.use(bodyParser.json());

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/status');

// "/status" route show the relayer job status
app.use('/status', serverAdapter.getRouter());

app.listen(config.port, async () => {
logger.info(`Relayer running on port ${config.port}`);
await processors.init().reset();
logger.info(`Relayer running on port ${config.port}, check its status at "/stats"`);
await processors.init(serverAdapter).reset();
});

0 comments on commit 42ccad3

Please sign in to comment.