Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Update actions/events communication format to JSON-RPC - Closes #5918 #5948

Merged
merged 6 commits into from
Nov 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 31 additions & 22 deletions framework/src/controller/action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import { strict as assert } from 'assert';
import { actionWithModuleNameReg, moduleNameReg } from '../constants';
import { RequestObject, VERSION } from './jsonrpc';

export interface ActionInfoObject {
readonly module: string;
Expand All @@ -32,56 +33,64 @@ export interface ActionsObject {
[key: string]: Action;
}

type ID = string | number | null;

export class Action {
public jsonrpc = VERSION;
public id: ID;
public method: string;
public params: object;
public module: string;
public name: string;
public handler?: (action: ActionInfoObject) => unknown;
public source?: string;
public params: object;
public handler?: (action: ActionInfoObject) => unknown;

public constructor(
name: string,
id: ID,
method: string,
ManuGowda marked this conversation as resolved.
Show resolved Hide resolved
params?: object,
source?: string,
handler?: (action: ActionInfoObject) => unknown,
) {
assert(
actionWithModuleNameReg.test(name),
`Action name "${name}" must be a valid name with module name.`,
actionWithModuleNameReg.test(method),
`Action method "${method}" must be a valid method with module name and action name.`,
);
[this.module, this.name] = name.split(':');
this.params = params ?? {};

if (source) {
assert(moduleNameReg.test(source), `Source name "${source}" must be a valid module name.`);
this.source = source;
}

this.id = id;
ManuGowda marked this conversation as resolved.
Show resolved Hide resolved
this.method = method;
[this.module, this.name] = this.method.split(':');
this.params = params ?? {};
this.handler = handler;
}

public static deserialize(data: ActionInfoObject | string): Action {
const parsedAction: ActionInfoObject =
typeof data === 'string' ? (JSON.parse(data) as ActionInfoObject) : data;
public static fromJSONRPC(data: RequestObject | string): Action {
ManuGowda marked this conversation as resolved.
Show resolved Hide resolved
const { id, method, params } =
typeof data === 'string' ? (JSON.parse(data) as RequestObject) : data;

return new Action(
`${parsedAction.module}:${parsedAction.name}`,
parsedAction.params,
parsedAction.source,
);
return new Action(id, method, params);
}

public serialize(): ActionInfoObject {
public toJSONRPC(): RequestObject {
return {
name: this.name,
module: this.module,
source: this.source,
jsonrpc: this.jsonrpc,
id: this.id,
method: this.method,
params: this.params,
};
}

public toString(): string {
return `${this.source ?? 'undefined'} -> ${this.module}:${this.name}`;
public toObject(): ActionInfoObject {
return {
module: this.module,
name: this.name,
source: this.source,
params: this.params,
};
}

public key(): string {
Expand Down
34 changes: 22 additions & 12 deletions framework/src/controller/bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import * as axon from 'pm2-axon';
import { PubSocket, PullSocket, PushSocket, ReqSocket, SubSocket } from 'pm2-axon';
import { Client as RPCClient, Server as RPCServer } from 'pm2-axon-rpc';
import { EventEmitter2, Listener } from 'eventemitter2';
import { Action, ActionInfoObject, ActionsObject } from './action';
import { Action, ActionsObject } from './action';
import * as JSONRPC from './jsonrpc';
import { Logger } from '../logger';
import { BaseChannel } from './channels/base_channel';
import { EventInfoObject, EventsArray } from './event';
Expand All @@ -42,7 +43,7 @@ interface RegisterChannelOptions {
readonly rpcSocketPath?: string;
}

type NodeCallback = (error: Error | null, result?: unknown) => void;
type NodeCallback = (error: JSONRPC.ErrorObject | Error | null, result?: unknown) => void;

enum ChannelType {
InMemory,
Expand Down Expand Up @@ -124,12 +125,20 @@ export class Bus {
);

this._rpcServer.expose('invoke', (action, cb: NodeCallback) => {
// Parse and validate incoming jsonrpc request
const parsedAction = Action.fromJSONRPC(action);
try {
JSONRPC.validateJSONRPC(parsedAction);
} catch (error) {
cb(JSONRPC.errorObject(parsedAction.id, JSONRPC.invalidRequest()));
}

this.invoke(action)
.then(data => {
cb(null, data);
cb(null, JSONRPC.successObject(parsedAction.id, data as JSONRPC.Result));
})
.catch(error => {
cb(error);
cb(JSONRPC.errorObject(parsedAction.id, JSONRPC.internalError(error)));
});
});

Expand Down Expand Up @@ -186,15 +195,15 @@ export class Bus {
}
}

public async invoke<T>(actionData: string | ActionInfoObject): Promise<T> {
const action = Action.deserialize(actionData);
public async invoke<T>(actionData: string | JSONRPC.RequestObject): Promise<T> {
const action = Action.fromJSONRPC(actionData);
const actionFullName = action.key();
const actionParams = action.params;

if (this.actions[actionFullName] === undefined) {
throw new Error(`Action '${action.key()}' is not registered to bus.`);
throw new Error(`Action '${actionFullName}' is not registered to bus.`);
}

const actionParams = action.params;
const channelInfo = this.channels[action.module];
if (channelInfo.type === ChannelType.InMemory) {
return (channelInfo.channel as BaseChannel).invoke<T>(actionFullName, actionParams);
Expand All @@ -204,7 +213,7 @@ export class Bus {
return new Promise((resolve, reject) => {
(channelInfo.rpcClient as RPCClient).call(
'invoke',
action.serialize(),
action.toJSONRPC(),
(err: Error | undefined, data: T) => {
if (err) {
return reject(err);
Expand All @@ -216,17 +225,18 @@ export class Bus {
});
}

public publish(eventName: string, eventValue: object): void {
public publish(eventName: string, eventValue?: object): void {
if (!this.getEvents().includes(eventName)) {
throw new Error(`Event ${eventName} is not registered to bus.`);
}
// Communicate through event emitter
this._emitter.emit(eventName, eventValue);
const response = JSONRPC.notificationObject(eventName, eventValue);
this._emitter.emit(eventName, response);

// Communicate through unix socket
if (this.config.ipc.enabled) {
try {
this._pubSocket.send(eventName, eventValue);
this._pubSocket.send(eventName, response);
} catch (error) {
this.logger.debug(
{ err: error as Error },
Expand Down
9 changes: 2 additions & 7 deletions framework/src/controller/channels/base_channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,8 @@ export abstract class BaseChannel {
const actionData = actions[actionName];

const handler = typeof actionData === 'object' ? actionData.handler : actionData;

this.actions[actionName] = new Action(
`${this.moduleAlias}:${actionName}`,
undefined,
undefined,
handler,
);
const method = `${this.moduleAlias}:${actionName}`;
this.actions[actionName] = new Action(null, method, undefined, undefined, handler);
}
this.actionsList = Object.keys(this.actions);
}
Expand Down
19 changes: 11 additions & 8 deletions framework/src/controller/channels/in_memory_channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { Event, EventCallback } from '../event';
import { Action } from '../action';
import { BaseChannel } from './base_channel';
import { Bus } from '../bus';
import * as JSONRPC from '../jsonrpc';

export class InMemoryChannel extends BaseChannel {
private bus!: Bus;
Expand All @@ -30,15 +31,17 @@ export class InMemoryChannel extends BaseChannel {
}

public subscribe(eventName: string, cb: EventCallback): void {
this.bus.subscribe(eventName, data =>
this.bus.subscribe(eventName, (notificationObject: JSONRPC.NotificationObject) =>
// eslint-disable-next-line @typescript-eslint/no-misused-promises
setImmediate(cb, Event.deserialize(data)),
setImmediate(cb, { data: notificationObject.result }),
);
}

public once(eventName: string, cb: EventCallback): void {
// eslint-disable-next-line @typescript-eslint/no-misused-promises
this.bus.once(eventName, data => setImmediate(cb, Event.deserialize(data)));
this.bus.once(eventName, (notificationObject: JSONRPC.NotificationObject) =>
// eslint-disable-next-line @typescript-eslint/no-misused-promises
setImmediate(cb, { data: notificationObject.result }),
);
}

public publish(eventName: string, data?: object): void {
Expand All @@ -47,11 +50,11 @@ export class InMemoryChannel extends BaseChannel {
if (event.module !== this.moduleAlias) {
throw new Error(`Event "${eventName}" not registered in "${this.moduleAlias}" module.`);
}
this.bus.publish(event.key(), event.serialize());
this.bus.publish(event.key(), data);
}

public async invoke<T>(actionName: string, params?: object): Promise<T> {
const action = new Action(actionName, params, this.moduleAlias);
const action = new Action(null, actionName, params);

if (action.module === this.moduleAlias) {
if (this.actions[action.name] === undefined) {
Expand All @@ -65,9 +68,9 @@ export class InMemoryChannel extends BaseChannel {
throw new Error('Handler does not exist.');
}

return handler(action.serialize()) as T;
return handler(action.toObject()) as T;
}

return this.bus.invoke(action.serialize());
return this.bus.invoke(action.toJSONRPC());
}
}
32 changes: 20 additions & 12 deletions framework/src/controller/channels/ipc_channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { Event, EventInfoObject } from '../event';
import { BaseChannel, BaseChannelOptions } from './base_channel';
import { IPCClient } from '../ipc/ipc_client';
import { ActionInfoForBus, SocketPaths } from '../../types';
import * as JSONRPC from '../jsonrpc';

type NodeCallback = (error: Error | null, result?: unknown) => void;

Expand Down Expand Up @@ -61,7 +62,8 @@ export class IPCChannel extends BaseChannel {
await this._ipcClient.start();
// Listen to messages
this._subSocket.on('message', (eventName: string, eventData: EventInfoObject) => {
if (eventData.module !== this.moduleAlias) {
const event = new Event(eventName, eventData);
if (event.module !== this.moduleAlias) {
this._emitter.emit(eventName, eventData);
}
});
Expand Down Expand Up @@ -102,7 +104,7 @@ export class IPCChannel extends BaseChannel {
// Channel RPC Server is only required if the module has actions
if (this.actionsList.length > 0) {
this._rpcServer.expose('invoke', (action, cb: NodeCallback) => {
const actionObject = Action.deserialize(action);
const actionObject = Action.fromJSONRPC(action);
this.invoke(`${actionObject.module}:${actionObject.name}`, actionObject.params)
.then(data => cb(null, data))
.catch(error => cb(error));
Expand All @@ -112,45 +114,51 @@ export class IPCChannel extends BaseChannel {

public subscribe(eventName: string, cb: Listener): void {
const event = new Event(eventName);
this._emitter.on(event.key(), cb);
this._emitter.on(event.key(), (notificationObject: JSONRPC.NotificationObject) =>
// When IPC channel used without bus the data will not contain result
setImmediate(cb, { data: notificationObject.result ?? notificationObject }),
);
}

public once(eventName: string, cb: Listener): void {
const event = new Event(eventName);
this._emitter.once(event.key(), cb);
this._emitter.once(event.key(), (notificationObject: JSONRPC.NotificationObject) => {
// When IPC channel used without bus the data will not contain result
setImmediate(cb, { data: notificationObject.result ?? notificationObject });
});
}

public publish(eventName: string, data?: object): void {
const event = new Event(eventName, data);

if (event.module !== this.moduleAlias || !this.eventsList.includes(event.name)) {
throw new Error(`Event "${eventName}" not registered in "${this.moduleAlias}" module.`);
}

this._pubSocket.send(event.key(), event.serialize());
this._pubSocket.send(event.key(), data);
}

public async invoke<T>(actionName: string, params?: object): Promise<T> {
const action = new Action(actionName, params, this.moduleAlias);
const action = new Action(null, actionName, params);

if (action.module === this.moduleAlias) {
const handler = this.actions[action.name]?.handler;
if (!handler) {
throw new Error('Handler does not exist.');
}
return handler(action.serialize()) as T;
// change this to lisk format
return handler(action.toObject()) as T;
}

return new Promise((resolve, reject) => {
this._rpcClient.call(
'invoke',
action.serialize(),
(err: Error | undefined, data: T | PromiseLike<T>) => {
action.toJSONRPC(),
(err: JSONRPC.ErrorObject, data: JSONRPC.SuccessObject) => {
if (err) {
return reject(err);
return reject(err.error.data);
}

return resolve(data);
return resolve((data.result as unknown) as T);
},
);
});
Expand Down
13 changes: 5 additions & 8 deletions framework/src/controller/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import { PluginOptions, PluginsOptions, SocketPaths } from '../types';
import { Bus } from './bus';
import { BaseChannel } from './channels';
import { InMemoryChannel } from './channels/in_memory_channel';
import { EventInfoObject } from './event';
import * as JSONRPC from './jsonrpc';

export interface ControllerOptions {
readonly appLabel: string;
Expand Down Expand Up @@ -191,12 +191,9 @@ export class Controller {

await this.channel.registerToBus(this.bus);

// If log level is greater than info
if (this.logger.level !== undefined && this.logger.level() < 30) {
this.bus.subscribe('*', (event: EventInfoObject) => {
this.logger.trace(`eventName: ${event.name},`, 'Monitor Bus Channel');
});
}
this.bus.subscribe('*', (event: JSONRPC.NotificationObject) => {
this.logger.error(`eventName: ${event.method},`, 'Monitor Bus Channel');
});
}

private async _loadInMemoryPlugin(
Expand Down Expand Up @@ -327,7 +324,7 @@ export class Controller {
new Promise((_, reject) => {
this.channel.once(`${alias}:unloading:error`, event => {
this.logger.info(`Child process plugin "${alias}" unloaded with error`);
this.logger.error(event.data || {}, 'Unloading plugin error.');
this.logger.error(event.data ?? {}, 'Unloading plugin error.');
delete this._childProcesses[alias];
reject(event.data);
});
Expand Down
Loading