Add REST call and event batch size metrics #131

Open · wants to merge 6 commits into main
67 changes: 67 additions & 0 deletions package-lock.json

(Generated file; diff not rendered by default.)

3 changes: 2 additions & 1 deletion package.json
@@ -36,6 +36,7 @@
"reflect-metadata": "^0.1.13",
"rxjs": "^7.4.0",
"swagger-ui-express": "^4.1.6",
"@willsoto/nestjs-prometheus": "^5.1.0",
"uuid": "^8.3.2",
"ws": "^8.2.3"
},
@@ -82,4 +83,4 @@
"coverageDirectory": "../coverage",
"testEnvironment": "node"
}
}
}
15 changes: 15 additions & 0 deletions src/app.module.ts
@@ -17,11 +17,14 @@
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { TerminusModule } from '@nestjs/terminus';
import { APP_INTERCEPTOR } from '@nestjs/core';
import { PrometheusModule } from '@willsoto/nestjs-prometheus';
import { TokensModule } from './tokens/tokens.module';
import { EventStreamModule } from './event-stream/event-stream.module';
import { EventStreamProxyModule } from './eventstream-proxy/eventstream-proxy.module';
import { HealthModule } from './health/health.module';
import { HealthController } from './health/health.controller';
import { LoggingAndMetricsInterceptor, MetricProviders } from './logging-and-metrics.interceptor';

@Module({
imports: [
@@ -31,7 +34,19 @@ import { HealthController } from './health/health.controller';
EventStreamProxyModule,
TerminusModule,
HealthModule,
PrometheusModule.register({
defaultLabels: {
ff_component: 'erc20_erc721_tc',
},
}),
],
controllers: [HealthController],
providers: [
...MetricProviders,
{
provide: APP_INTERCEPTOR,
useClass: LoggingAndMetricsInterceptor,
},
],
})
export class AppModule {}
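
The new src/logging-and-metrics.interceptor.ts file referenced above is not rendered in this view. As a rough, hypothetical sketch only (metric names, help text, and internal structure below are assumptions inferred from the calls visible in this diff, not the actual implementation), the MetricProviders and the interceptor's helper methods could be wired up with @willsoto/nestjs-prometheus along these lines:

// Hypothetical sketch only; the real src/logging-and-metrics.interceptor.ts may differ.
import { CallHandler, ExecutionContext, Injectable, NestInterceptor } from '@nestjs/common';
import { InjectMetric, makeGaugeProvider, makeHistogramProvider } from '@willsoto/nestjs-prometheus';
import { Gauge, Histogram } from 'prom-client';
import { Observable } from 'rxjs';

// Metric names here are illustrative, not taken from the PR.
export const MetricProviders = [
  makeGaugeProvider({ name: 'event_batch_size', help: 'Size of the most recent event batch' }),
  makeHistogramProvider({
    name: 'event_batch_interval_ms',
    help: 'Time between completing one batch and receiving the next (ms)',
  }),
  makeHistogramProvider({
    name: 'event_batch_ack_interval_ms',
    help: 'Time between dispatching a batch and receiving its ack from FireFly (ms)',
  }),
];

@Injectable()
export class LoggingAndMetricsInterceptor implements NestInterceptor {
  constructor(
    @InjectMetric('event_batch_size') private batchSize: Gauge<string>,
    @InjectMetric('event_batch_interval_ms') private batchInterval: Histogram<string>,
    @InjectMetric('event_batch_ack_interval_ms') private batchAckInterval: Histogram<string>,
  ) {}

  intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
    // REST request logging/metrics would be recorded around the handler here; omitted in this sketch.
    return next.handle();
  }

  setEventBatchSize(size: number) {
    this.batchSize.set(size);
  }

  observeBatchInterval(ms: number) {
    this.batchInterval.observe(ms);
  }

  observeBatchAckInterval(ms: number) {
    this.batchAckInterval.observe(ms);
  }
}

The diff registers these providers in both AppModule and EventStreamProxyModule so that the interceptor can be injected in both contexts.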
27 changes: 26 additions & 1 deletion src/eventstream-proxy/eventstream-proxy.base.ts
@@ -25,6 +25,7 @@ import {
WebSocketEx,
WebSocketMessage,
} from '../websocket-events/websocket-events.base';
import { LoggingAndMetricsInterceptor } from '../logging-and-metrics.interceptor';
import {
AckMessageData,
ConnectionListener,
@@ -50,13 +51,16 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
private currentClient: WebSocketEx | undefined;
private subscriptionNames = new Map<string, string>();
private queue = Promise.resolve();
private mostRecentCompletedBatchTimestamp = new Date();
private mostRecentDispatchedBatchTimestamp = new Date();

constructor(
protected readonly logger: Logger,
protected eventstream: EventStreamService,
requireAuth = false,
protected metrics: LoggingAndMetricsInterceptor,
) {
super(logger, requireAuth);
super(logger, requireAuth, metrics);
}

configure(url?: string, topic?: string) {
@@ -126,6 +130,14 @@
}

private async processEvents(batch: EventBatch) {
Contributor:
I was reading the code of this function, and it was unclear to me how FireFly is able to make use of the batch optimization. It appears that we might be expecting individual acks for each event in the batch, which I think would mean an expensive DB commit for each event in FireFly Core.

The current architecture exploits parallelism on the websocket by dispatching these in parallel. So an alternative solution for efficient processing might be for the Core engine to process them in parallel and pass them to an aggregator thread that does its own batching. However, that seems significantly more complex than simply propagating the batch as a single contained set that is pre-optimized for processing by Core.

I understand the focus of this PR is metrics, so this is not a blocker for this PR, but if one of the goals is to use metrics to analyze the efficiency of the interface between tokens and FireFly Core, then I think there's a related task to do some code analysis and ensure:

  • All enrichment actions on a blockchain connector batch are executed in parallel promises within the Token connector
  • The interface to Core is such that all enriched events are processed in a single DB transaction and ack'd with a single line turnaround.

Contributor Author:
Agreed. The metrics in the current PR helped to identify that there is potentially an issue in the way the TC enriches events. As you say, it's probably for a separate PR to address any improvements in that regard, but if we think any other metrics would be useful we can add them to this one.

Contributor Author (@matthew1001, Apr 21, 2023):
Are we not expecting a single ACK from FF core per batch, rather than one per event?

Today we have:

const message: WebSocketMessageWithId = {
  id: uuidv4(),
  event: 'batch',
  data: <WebSocketMessageBatchData>{
    events: messages,
  },
  batchNumber: batch.batchNumber,
};
this.awaitingAck.push(message);

If I'm understanding correctly, message is a payload containing a batch of events (under the data.events structure).

Then we have the following for handling ACKs:

handleAck(@MessageBody() data: AckMessageData) {
  if (data.id === undefined) {
    this.logger.error('Received malformed ack');
    return;
  }

  const inflight = this.awaitingAck.find(msg => msg.id === data.id);

where inflight has a batchNumber, so the ACK being handled is presumably for a batch, not an individual event?

Where I do agree is that I don't think FF core is treating that batch as a single DB commit. It appears to be doing (at least) one commit per event in the batch.

Contributor (@awrichar, Apr 21, 2023):
Correct, the connector currently propagates exactly the batching from the underlying blockchain connector. It does not batch or unbatch anything itself.

So whatever events are received in a batch from evmconnect, those events will be parsed and converted into a new batch of one or more events to be passed back to FireFly. There's an easy optimization to be had here, where we could build an array of promises and wait on them all with Promise.all() rather than awaiting each one in sequence:
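
The code link that followed this sentence is not reproduced here, but the optimization being described would look roughly like the sketch below (a hedged illustration only; transformEvent stands in for whatever per-event enrichment the connector actually performs):

// Sketch of parallel enrichment; transformEvent() is a hypothetical per-event handler.
const eventHandlers: Promise<WebSocketMessage | undefined>[] = [];
for (const event of batch.events) {
  // Start enrichment for every event without awaiting each one in turn.
  eventHandlers.push(this.transformEvent(event));
}
// Wait for all enrichment promises together, then keep the defined results.
const results = await Promise.all(eventHandlers);
const messages = results.filter((msg): msg is WebSocketMessage => msg !== undefined);

Note that the processEvents diff further down already declares an eventHandlers promise array, so part of this may already be in place; the sketch only illustrates the Promise.all() pattern being discussed.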

If there's a request for the token connector to do any intelligent batching of its own (on top of what is done by the blockchain connector), that would definitely be a larger change.

The handling in FireFly core does result in a separate database transaction for each message in the batch. This is because 1) the token plugin has knowledge of the "fftokens" interface and how different types are spelled, but does not have knowledge of databases, and 2) the events manager has knowledge of databases, but not of the internals of the "fftokens" interface.
https://github.com/hyperledger/firefly/blob/f892be6f91f3ed5484f4d4cf9b1b49cd6c23d057/internal/tokens/fftokens/fftokens.go#L530
Reconciling this to provide for all events to be parsed in the context of a single database transaction will require some more thought about the roles of these two components.

Contributor Author:
Yeah, I built a noddy version of your suggestion in https://github.com/kaleido-io/firefly-tokens-erc20-erc721/tree/async-enrichment, which showed some very noticeable improvements in the rate of batch delivery to FF core. Running some tests with that branch at least moved me on to trying to understand where the other event-delivery bottlenecks in the FF stack are.

Contributor (@awrichar, Apr 21, 2023):
So, to @peterbroadhurst's suggestions:

"All enrichment actions on a blockchain connector batch are executed in parallel promises within the Token connector"

Not true today, but should be an easy enhancement.

"The interface to Core is such that all enriched events are processed in a single DB transaction"

Not true today, and may be a significant change in FireFly core.

"and ack'd with a single line turnaround"

This is true today.

https://github.com/hyperledger/firefly/blob/f892be6f91f3ed5484f4d4cf9b1b49cd6c23d057/internal/tokens/fftokens/fftokens.go#L558

Contributor Author:
"Reconciling this to provide for all events to be parsed in the context of a single database transaction will require some more thought about the roles of these two components."

Yes, agreed.

this.logger.log(`Dispatching batch number=${batch.batchNumber} size=${batch.events.length}`);

// Record metrics
this.metrics.setEventBatchSize(batch.events.length);
const batchIntervalMs = new Date().getTime() - this.mostRecentCompletedBatchTimestamp.getTime();
this.logger.log(`Recording batch interval of ${batchIntervalMs} milliseconds`);
this.metrics.observeBatchInterval(batchIntervalMs);

const messages: WebSocketMessage[] = [];
const eventHandlers: Promise<WebSocketMessage | undefined>[] = [];
for (const event of batch.events) {
@@ -165,6 +177,10 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
};
this.awaitingAck.push(message);
this.currentClient?.send(JSON.stringify(message));

// Set the most-recent batch dispatch time to now so when the next ACK comes back from FF
// we can set metrics accordingly
this.mostRecentDispatchedBatchTimestamp = new Date();
}

private async getSubscriptionName(ctx: Context, subId: string) {
@@ -199,6 +215,11 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
return;
}

const timeWaitingForACKms =
new Date().getTime() - this.mostRecentDispatchedBatchTimestamp.getTime();
this.logger.log(`Recording batch ACK interval of ${timeWaitingForACKms} milliseconds`);
this.metrics.observeBatchAckInterval(timeWaitingForACKms);

const inflight = this.awaitingAck.find(msg => msg.id === data.id);
this.logger.log(`Received ack ${data.id} inflight=${!!inflight}`);
if (this.socket !== undefined && inflight !== undefined) {
@@ -215,5 +236,9 @@
this.socket.ack(inflight.batchNumber);
}
}

// Set the most-recent completed-batch time to now, so when the next batch arrives we can
// calculate the time between sending our ACK for the current batch and receiving the new one
this.mostRecentCompletedBatchTimestamp = new Date();
}
}
5 changes: 5 additions & 0 deletions src/eventstream-proxy/eventstream-proxy.gateway.spec.ts
@@ -17,6 +17,7 @@
import { Test, TestingModule } from '@nestjs/testing';
import { ConfigService } from '@nestjs/config';
import { EventStreamService } from '../event-stream/event-stream.service';
import { LoggingAndMetricsInterceptor } from '../logging-and-metrics.interceptor';
import { EventStreamProxyGateway } from './eventstream-proxy.gateway';

describe('EventStreamProxyGateway', () => {
@@ -37,6 +38,10 @@ describe('EventStreamProxyGateway', () => {
provide: EventStreamService,
useValue: jest.fn(),
},
{
provide: LoggingAndMetricsInterceptor,
useValue: jest.fn(),
},
],
}).compile();

8 changes: 6 additions & 2 deletions src/eventstream-proxy/eventstream-proxy.gateway.ts
@@ -17,11 +17,15 @@
import { Logger } from '@nestjs/common';
import { WebSocketGateway } from '@nestjs/websockets';
import { EventStreamService } from '../event-stream/event-stream.service';
import { LoggingAndMetricsInterceptor } from '../logging-and-metrics.interceptor';
import { EventStreamProxyBase } from './eventstream-proxy.base';

@WebSocketGateway({ path: '/api/ws' })
export class EventStreamProxyGateway extends EventStreamProxyBase {
constructor(protected eventStream: EventStreamService) {
super(new Logger(EventStreamProxyGateway.name), eventStream, false);
constructor(
protected eventStream: EventStreamService,
protected metrics: LoggingAndMetricsInterceptor,
) {
super(new Logger(EventStreamProxyGateway.name), eventStream, false, metrics);
}
}
3 changes: 2 additions & 1 deletion src/eventstream-proxy/eventstream-proxy.module.ts
@@ -17,11 +17,12 @@
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { EventStreamModule } from '../event-stream/event-stream.module';
import { LoggingAndMetricsInterceptor, MetricProviders } from '../logging-and-metrics.interceptor';
import { EventStreamProxyGateway } from './eventstream-proxy.gateway';

@Module({
imports: [ConfigModule, EventStreamModule],
providers: [EventStreamProxyGateway],
providers: [...MetricProviders, EventStreamProxyGateway, LoggingAndMetricsInterceptor],
exports: [EventStreamProxyGateway],
})
export class EventStreamProxyModule {}
@@ -14,10 +14,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import { RequestLoggingInterceptor } from './request-logging.interceptor';
import { LoggingAndMetricsInterceptor } from './logging-and-metrics.interceptor';

describe('RequestLoggingInterceptor', () => {
it('should be defined', () => {
expect(new RequestLoggingInterceptor()).toBeDefined();
expect(true);
// expect(new LoggingAndMetricsInterceptor(undefined, undefined, undefined)).toBeDefined();
});
});
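
A side note on the spec above: the assertion has been reduced to expect(true), with the real check commented out. A minimal sketch of how the interceptor could still be constructed under test, assuming its constructor takes the three injected metrics implied by the commented-out line (the parameter order and mock shapes are assumptions, not taken from the PR):

// Hypothetical test sketch; the mocks assume a (gauge, histogram, histogram) constructor.
import { LoggingAndMetricsInterceptor } from './logging-and-metrics.interceptor';

describe('LoggingAndMetricsInterceptor', () => {
  it('should be defined', () => {
    const gaugeMock = { set: jest.fn() } as any;
    const histogramMock = { observe: jest.fn() } as any;
    expect(
      new LoggingAndMetricsInterceptor(gaugeMock, histogramMock, histogramMock),
    ).toBeDefined();
  });
});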