Skip to content

Commit

Permalink
Merge pull request #752 from OpenFn/worker-timestamp-everything
Browse files Browse the repository at this point in the history
Worker: timestamp events
  • Loading branch information
taylordowns2000 authored Sep 5, 2024
2 parents 6274e96 + 619edb6 commit 30aaa8e
Show file tree
Hide file tree
Showing 30 changed files with 268 additions and 23 deletions.
1 change: 1 addition & 0 deletions .github/pull_request_template.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ List any considerations/cases/advice for testing/QA here.

- [ ] I have performed a self-review of my code
- [ ] I have added unit tests
- [ ] If this is a change to the Worker, does the API_VERSION need bumping?
- [ ] Changesets have been added (if there are production code changes)

## Release branch checklist
Expand Down
10 changes: 10 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# @openfn/integration-tests-worker

## 1.0.55

### Patch Changes

- Updated dependencies [870a836]
- Updated dependencies [eaa3859]
- @openfn/engine-multi@1.2.2
- @openfn/ws-worker@1.6.0
- @openfn/lightning-mock@2.0.16

## 1.0.54

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.54",
"version": "1.0.55",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
41 changes: 41 additions & 0 deletions integration-tests/worker/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,47 @@ test.serial("Don't send job logs to stdout", (t) => {
});
});

test.serial('Include timestamps on basically everything', (t) => {
return new Promise(async (done) => {
const attempt = {
id: crypto.randomUUID(),
jobs: [
{
adaptor: '@openfn/language-common@latest',
body: 'fn((s) => s)',
},
],
};

const timestamps = {};

const assertAllTimestamps = () => {
t.is(timestamps['run-start'].length, 16);
t.is(timestamps['run-complete'].length, 16);
t.is(timestamps['step-start'].length, 16);
t.is(timestamps['step-complete'].length, 16);
};

lightning.once('run:start', ({ payload }) => {
timestamps['run-start'] = payload.timestamp;
});
lightning.once('step:start', ({ payload }) => {
timestamps['step-start'] = payload.timestamp;
});
lightning.once('step:complete', ({ payload }) => {
timestamps['step-complete'] = payload.timestamp;
});
lightning.once('run:complete', ({ payload }) => {
timestamps['run-complete'] = payload.timestamp;
assertAllTimestamps();

done();
});

lightning.enqueueRun(attempt);
});
});

test.serial("Don't send adaptor logs to stdout", (t) => {
return new Promise(async (done) => {
// We have to create a new worker with a different repo for this one
Expand Down
9 changes: 9 additions & 0 deletions packages/engine-multi/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# engine-multi

## 1.2.2

### Patch Changes

- 870a836: Add high resolution timestamps to key events
- Updated dependencies [44f7f57]
- @openfn/lexicon@1.1.0
- @openfn/runtime@1.4.1

## 1.2.1

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/engine-multi",
"version": "1.2.1",
"version": "1.2.2",
"description": "Multi-process runtime engine",
"main": "dist/index.js",
"type": "module",
Expand Down
5 changes: 5 additions & 0 deletions packages/engine-multi/src/api/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import * as externalEvents from '../events';
import * as internalEvents from '../worker/events';
import type ExecutionContext from '../classes/ExecutionContext';
import { timestamp } from '@openfn/logger';

// Log events from the inner thread will be logged to stdout
// EXCEPT the keys listed here
Expand Down Expand Up @@ -39,6 +40,7 @@ export const workflowStart = (
context.emit(externalEvents.WORKFLOW_START, {
threadId,
versions: context.versions,
time: timestamp(),
});
};

Expand Down Expand Up @@ -70,6 +72,7 @@ export const workflowComplete = (
threadId,
duration: state.duration,
state: result,
time: timestamp(),
});
};

Expand All @@ -82,6 +85,7 @@ export const jobStart = (
context.emit(externalEvents.JOB_START, {
jobId,
threadId,
time: timestamp(),
});
};

Expand All @@ -98,6 +102,7 @@ export const jobComplete = (
jobId,
next,
mem,
time: timestamp(),
});
};

Expand Down
4 changes: 4 additions & 0 deletions packages/engine-multi/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,13 @@ interface ExternalEvent {

export interface WorkflowStartPayload extends ExternalEvent {
versions: Versions;
time: bigint;
}

export interface WorkflowCompletePayload extends ExternalEvent {
state: any;
duration: number;
time: bigint;
}

export interface WorkflowErrorPayload extends ExternalEvent {
Expand All @@ -66,13 +68,15 @@ export interface WorkflowErrorPayload extends ExternalEvent {

export interface JobStartPayload extends ExternalEvent {
jobId: string;
time: bigint;
}

export interface JobCompletePayload extends ExternalEvent {
jobId: string;
duration: number;
state: any; // the result state
next: string[]; // downstream jobs
time: bigint;
mem: {
job: number;
system: number;
Expand Down
16 changes: 12 additions & 4 deletions packages/engine-multi/test/api/lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const createContext = (workflowId: string, state?: any) =>
options: {},
});

test(`workflowStart: emits ${e.WORKFLOW_START}`, (t) => {
test(`workflowStart: emits ${e.WORKFLOW_START} with key fields`, (t) => {
return new Promise((done) => {
const workflowId = 'a';

Expand All @@ -39,6 +39,8 @@ test(`workflowStart: emits ${e.WORKFLOW_START}`, (t) => {
t.truthy(evt.versions);
t.is(evt.workflowId, workflowId);
t.is(evt.threadId, '123');
t.assert(evt.time > 0);
t.assert(typeof evt.time === 'bigint');
done();
});

Expand Down Expand Up @@ -68,7 +70,7 @@ test('onWorkflowStart: updates state', (t) => {
test.todo('onWorkflowStart: logs');
test.todo('onWorkflowStart: throws if the workflow is already started');

test(`workflowComplete: emits ${e.WORKFLOW_COMPLETE}`, (t) => {
test(`workflowComplete: emits ${e.WORKFLOW_COMPLETE} with key fields`, (t) => {
return new Promise((done) => {
const workflowId = 'a';
const result = { a: 777 };
Expand All @@ -89,6 +91,8 @@ test(`workflowComplete: emits ${e.WORKFLOW_COMPLETE}`, (t) => {
context.on(e.WORKFLOW_COMPLETE, (evt) => {
t.is(evt.workflowId, workflowId);
t.deepEqual(evt.state, result);
t.assert(evt.time > 0);
t.assert(typeof evt.time === 'bigint');
t.assert(evt.duration > 0);
done();
});
Expand Down Expand Up @@ -120,7 +124,7 @@ test('workflowComplete: updates state', (t) => {
t.deepEqual(state.result, result);
});

test(`job-start: emits ${e.JOB_START}`, (t) => {
test(`job-start: emits ${e.JOB_START} with key fields`, (t) => {
return new Promise((done) => {
const workflowId = 'a';

Expand All @@ -142,14 +146,16 @@ test(`job-start: emits ${e.JOB_START}`, (t) => {
t.is(evt.workflowId, workflowId);
t.is(evt.threadId, '1');
t.is(evt.jobId, 'j');
t.assert(evt.time > 0);
t.assert(typeof evt.time === 'bigint');
done();
});

jobStart(context, event);
});
});

test(`job-complete: emits ${e.JOB_COMPLETE}`, (t) => {
test(`job-complete: emits ${e.JOB_COMPLETE} with key fields`, (t) => {
return new Promise((done) => {
const workflowId = 'a';

Expand Down Expand Up @@ -179,6 +185,8 @@ test(`job-complete: emits ${e.JOB_COMPLETE}`, (t) => {
t.is(evt.duration, 200);
t.deepEqual(evt.next, []);
t.deepEqual(evt.mem, event.mem);
t.assert(evt.time > 0);
t.assert(typeof evt.time === 'bigint');
done();
});

Expand Down
6 changes: 6 additions & 0 deletions packages/lexicon/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# lexicon

## 1.1.0

### Minor Changes

- 44f7f57: Bump API_VERSION to 1.2 (timestamps on events)

## 1.0.2

### Patch Changes
Expand Down
11 changes: 9 additions & 2 deletions packages/lexicon/lightning.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ export const API_VERSION: number;

type StepId = string;

type TimeInMicroSeconds = string;

/**
* Type definitions for Lightning and Worker interfaces
*
Expand Down Expand Up @@ -145,17 +147,20 @@ export type GetCredentialReply = {};
export type GetDataclipPayload = { id: string };
export type GetDataClipReply = Uint8Array; // represents a json string Run

export type RunStartPayload = void; // no payload
export type RunStartPayload = {
timestamp: TimeInMicroSeconds;
}; // no payload
export type RunStartReply = {}; // no payload

export type RunCompletePayload = ExitReason & {
timestamp: TimeInMicroSeconds;
final_dataclip_id?: string; // TODO this will be removed soon
};
export type RunCompleteReply = undefined;

export type RunLogPayload = {
message: Array<string | object>;
timestamp: string;
timestamp: TimeInMicroSeconds;
run_id: string;
level?: string;
source?: string; // namespace
Expand All @@ -169,6 +174,7 @@ export type StepStartPayload = {
step_id: string;
run_id?: string;
input_dataclip_id?: string;
timestamp: TimeInMicroSeconds;
};
export type StepStartReply = void;

Expand All @@ -185,5 +191,6 @@ export type StepCompletePayload = ExitReason & {
system: number;
};
duration: number;
timestamp: TimeInMicroSeconds;
};
export type StepCompleteReply = void;
2 changes: 1 addition & 1 deletion packages/lexicon/lightning.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
* Note that the major version represents the API spec version, while the minor version
* represents the lexicon implementation of it
*/
export const API_VERSION = 1.1;
export const API_VERSION = 1.2;
2 changes: 1 addition & 1 deletion packages/lexicon/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/lexicon",
"version": "1.0.2",
"version": "1.1.0",
"description": "Central repo of names and type definitions",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
10 changes: 10 additions & 0 deletions packages/lightning-mock/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# @openfn/lightning-mock

## 2.0.16

### Patch Changes

- Updated dependencies [870a836]
- Updated dependencies [44f7f57]
- @openfn/engine-multi@1.2.2
- @openfn/lexicon@1.1.0
- @openfn/runtime@1.4.1

## 2.0.15

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/lightning-mock/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/lightning-mock",
"version": "2.0.15",
"version": "2.0.16",
"private": true,
"description": "A mock Lightning server",
"main": "dist/index.js",
Expand Down
14 changes: 14 additions & 0 deletions packages/ws-worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
# ws-worker

## 1.6.0

### Minor Changes

- eaa3859: Include timestamps in key events

### Patch Changes

- Updated dependencies [870a836]
- Updated dependencies [44f7f57]
- @openfn/engine-multi@1.2.2
- @openfn/lexicon@1.1.0
- @openfn/runtime@1.4.1

## 1.5.1

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/ws-worker/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/ws-worker",
"version": "1.5.1",
"version": "1.6.0",
"description": "A Websocket Worker to connect Lightning to a Runtime Engine",
"main": "dist/index.js",
"type": "module",
Expand Down
5 changes: 2 additions & 3 deletions packages/ws-worker/src/api/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
createRunState,
throttle as createThrottle,
stringify,
timeInMicroseconds,
} from '../util';
import {
RUN_COMPLETE,
Expand Down Expand Up @@ -213,8 +214,6 @@ export function onJobError(context: Context, event: any) {
}

export function onJobLog({ channel, state, options }: Context, event: JSONLog) {
const timeInMicroseconds = BigInt(event.time) / BigInt(1e3);

let message = event.message;
try {
// The message body, the actual thing that is logged,
Expand All @@ -240,7 +239,7 @@ export function onJobLog({ channel, state, options }: Context, event: JSONLog) {
message: message,
source: event.name,
level: event.level,
timestamp: timeInMicroseconds.toString(),
timestamp: timeInMicroseconds(event.time) as string,
};

if (state.activeStep) {
Expand Down
Loading

0 comments on commit 30aaa8e

Please sign in to comment.