From 71e752cf2b6052a8adc5c79f2563421edc72edad Mon Sep 17 00:00:00 2001 From: afmika Date: Tue, 12 Nov 2024 20:10:37 +0300 Subject: [PATCH 1/5] fix: save/input kwargs affected by outside references issue + SUBSTANTIAL_TRACE=0 or 1 --- .../docs/reference/typegate/index.mdx | 1 + src/typegate/src/config.ts | 1 + src/typegate/src/config/types.ts | 2 + src/typegate/src/errors.ts | 8 +- src/typegate/src/log.ts | 3 +- src/typegate/src/runtimes/substantial.ts | 1 + .../src/runtimes/substantial/agent.ts | 69 ++++++----- .../src/runtimes/substantial/deno_context.ts | 36 ++++-- .../src/runtimes/substantial/worker.ts | 5 +- .../substantial/workflow_worker_manager.ts | 4 - src/typegate/src/system_typegraphs.ts | 13 +- tests/runtimes/substantial/common.ts | 112 ++++++++++++++++++ tests/runtimes/substantial/substantial.py | 12 ++ .../runtimes/substantial/substantial_test.ts | 4 +- tests/runtimes/substantial/workflow.ts | 43 ++++++- tools/tasks/dev.ts | 1 + 16 files changed, 247 insertions(+), 68 deletions(-) diff --git a/docs/metatype.dev/docs/reference/typegate/index.mdx b/docs/metatype.dev/docs/reference/typegate/index.mdx index 4ab6b1f1be..01f8d74d4b 100644 --- a/docs/metatype.dev/docs/reference/typegate/index.mdx +++ b/docs/metatype.dev/docs/reference/typegate/index.mdx @@ -82,3 +82,4 @@ The following environment variables can be used to configure the typegate. `SYNC | SUBSTANTIAL_POLL_INTERVAL_SEC | Rate at which new schedules are read. | 1.0 | 0.6 | | SUBSTANTIAL_LEASE_LIFESPAN_SEC | Lease duration associated to a workflow run | 2.0 | 6 | | SUBSTANTIAL_MAX_ACQUIRE_PER_TICK | Max amount of new acquired replay requests per tick | 3 | 5 | +| SUBSTANTIAL_TRACE | Enable/Disable debug traces for Substantial | `0` | `0`, `1` | diff --git a/src/typegate/src/config.ts b/src/typegate/src/config.ts index 4d3fd33c87..e618e79778 100644 --- a/src/typegate/src/config.ts +++ b/src/typegate/src/config.ts @@ -64,6 +64,7 @@ export const defaultTypegateConfigBase = { substantial_poll_interval_sec: 1, substantial_lease_lifespan_sec: 2, substantial_max_acquire_per_tick: 3, + substantial_trace: 0, }; const SYNC_PREFIX = "sync_"; diff --git a/src/typegate/src/config/types.ts b/src/typegate/src/config/types.ts index cf8cbd4225..7274330e6e 100644 --- a/src/typegate/src/config/types.ts +++ b/src/typegate/src/config/types.ts @@ -100,6 +100,8 @@ export const typegateConfigBaseSchema = z.object({ substantial_lease_lifespan_sec: z.coerce.number().positive().min(1), /** Maximum amount of new acquired replay requests per tick */ substantial_max_acquire_per_tick: z.coerce.number().positive().min(1), + /** Enable/Disable debug traces for Substantial */ + substantial_trace: z.coerce.number().min(0).max(1), }); export type TypegateConfigBase = z.infer; diff --git a/src/typegate/src/errors.ts b/src/typegate/src/errors.ts index e460ac5621..16f51f6870 100644 --- a/src/typegate/src/errors.ts +++ b/src/typegate/src/errors.ts @@ -1,8 +1,7 @@ // Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0. // SPDX-License-Identifier: MPL-2.0 -import { basename, dirname } from "@std/url"; -import { extname } from "@std/path"; +import { basename, dirname, extname } from "@std/path"; import { getLogger } from "./log.ts"; import { globalConfig } from "./config.ts"; @@ -55,10 +54,7 @@ export class BaseError extends Error { return this; } - toResponse( - headers: Headers = new Headers(), - graphqlFormat = true, - ): Response { + toResponse(headers: Headers = new Headers(), graphqlFormat = true): Response { const type = this.#type ?? this.constructor.name; logger.error( "{}[{}:{}]: {}", diff --git a/src/typegate/src/log.ts b/src/typegate/src/log.ts index 42879e7571..487743cccb 100644 --- a/src/typegate/src/log.ts +++ b/src/typegate/src/log.ts @@ -2,8 +2,7 @@ // SPDX-License-Identifier: MPL-2.0 import { ConsoleHandler, type LevelName, Logger } from "@std/log"; -import { basename, dirname } from "@std/url"; -import { extname } from "@std/path"; +import { basename, dirname, extname } from "@std/path"; import { sharedConfig } from "./config/shared.ts"; // set rust log level is not explicit set diff --git a/src/typegate/src/runtimes/substantial.ts b/src/typegate/src/runtimes/substantial.ts index 3f4ac726f2..55d0959bae 100644 --- a/src/typegate/src/runtimes/substantial.ts +++ b/src/typegate/src/runtimes/substantial.ts @@ -110,6 +110,7 @@ export class SubstantialRuntime extends Runtime { pollIntervalSec: typegate.config.base.substantial_poll_interval_sec, leaseLifespanSec: typegate.config.base.substantial_lease_lifespan_sec, maxAcquirePerTick: typegate.config.base.substantial_max_acquire_per_tick, + debug: typegate.config.base.substantial_trace > 0, } satisfies AgentConfig; const agent = new Agent(backend, queue, agentConfig); diff --git a/src/typegate/src/runtimes/substantial/agent.ts b/src/typegate/src/runtimes/substantial/agent.ts index 78439d637c..634f1acfd8 100644 --- a/src/typegate/src/runtimes/substantial/agent.ts +++ b/src/typegate/src/runtimes/substantial/agent.ts @@ -5,7 +5,7 @@ import { ReadOrCloseScheduleInput, Run, } from "../../../engine/runtime.js"; -import { getLogger } from "../../log.ts"; +import { getLogger, Logger } from "../../log.ts"; import { TaskContext } from "../deno/shared_types.ts"; import { appendIfOngoing, @@ -16,8 +16,6 @@ import { } from "./types.ts"; import { RunId, WorkerManager } from "./workflow_worker_manager.ts"; -const logger = getLogger(); - export interface StdKwargs { taskContext: TaskContext; kwargs: Record; @@ -33,18 +31,22 @@ export interface AgentConfig { pollIntervalSec: number; leaseLifespanSec: number; maxAcquirePerTick: number; + debug?: boolean; } export class Agent { workerManager = new WorkerManager(); workflows: Array = []; pollIntervalHandle?: number; + logger: Logger; constructor( private backend: Backend, private queue: string, private config: AgentConfig, - ) {} + ) { + this.logger = getLogger(import.meta, config.debug ? "DEBUG" : "ERROR"); + } async schedule(input: AddScheduleInput) { await Meta.substantial.storeAddSchedule(input); @@ -59,7 +61,7 @@ export class Agent { content, }); } catch (err) { - logger.warn( + this.logger.warn( `Failed writing log metadata for schedule "${schedule}" (${runId}), skipping it: ${err}`, ); } @@ -92,7 +94,7 @@ export class Agent { start(workflows: Array) { this.workflows = workflows; - logger.warn( + this.logger.warn( `Initializing agent to handle ${ workflows .map(({ name }) => name) @@ -104,7 +106,7 @@ export class Agent { try { await this.#nextIteration(); } catch (err) { - logger.error(err); + this.logger.error(err); } }, 1000 * this.config.pollIntervalSec); } @@ -117,7 +119,7 @@ export class Agent { } async #nextIteration() { - logger.warn("POLL"); + this.logger.warn("POLL"); // Note: in multiple agents/typegate scenario, a single node may acquire all runs for itself within a tick span // To account for that, keep this reasonable @@ -142,16 +144,16 @@ export class Agent { ); while (requests.length > 0) { - // logger.warn(`Run workflow ${JSON.stringify(next)}`); + // this.logger.warn(`Run workflow ${JSON.stringify(next)}`); const next = requests.shift(); if (next) { try { await this.#replay(next, workflow); } catch (err) { - logger.error( + this.logger.error( `Replay failed for ${workflow.name} => ${JSON.stringify(next)}`, ); - logger.error(err); + this.logger.error(err); } finally { await this.log(next.run_id, next.schedule_date, { message: "Replaying workflow", @@ -169,7 +171,7 @@ export class Agent { lease_seconds: this.config.leaseLifespanSec, }); - logger.info(`Active leases: ${activeRunIds.join(", ")}`); + this.logger.info(`Active leases: ${activeRunIds.join(", ")}`); const next = await Meta.substantial.agentNextRun({ backend: this.backend, @@ -194,7 +196,7 @@ export class Agent { // Leases are for abstracting ongoing runs, and at a given tick, does not // necessarily represent the state of what is actually running on the current typegate node if (this.workerManager.isOngoing(next.run_id)) { - logger.warn( + this.logger.warn( `skip triggering ${next.run_id} for the current tick as it is still ongoing`, ); @@ -221,7 +223,9 @@ export class Agent { if (checkIfRunHasStopped(run)) { // This may occur if an event is sent but the underlying run already completed // Or does not exist. - logger.warn(`Run ${next.run_id} has already stopped, closing schedule`); + this.logger.warn( + `Run ${next.run_id} has already stopped, closing schedule`, + ); await Meta.substantial.storeCloseSchedule(schedDef); return; } @@ -239,7 +243,7 @@ export class Agent { if (first.event.type != "Start") { // A consequence of the above, a workflow is always triggered by gql { start(..) } // This can also occur if an event is sent from gql under a runId that is not valid (e.g. due to typo) - logger.warn( + this.logger.warn( `First item in the operation list is not a Start, got "${ JSON.stringify( first, @@ -251,7 +255,7 @@ export class Agent { return; } - const { taskContext, kwargs } = first.event.kwargs as unknown as StdKwargs; + const { taskContext } = first.event.kwargs as unknown as StdKwargs; try { this.workerManager.triggerStart( workflow.name, @@ -259,7 +263,6 @@ export class Agent { workflow.path, run, next.schedule_date, - kwargs, taskContext, ); @@ -273,7 +276,7 @@ export class Agent { if (run.operations.length == 1) { // Make sure it is already visible on the ongoing list // without waiting for interrupts to store the first run - logger.info(`Persist first run "${next.run_id}"`); + this.logger.info(`Persist first run "${next.run_id}"`); await Meta.substantial.storePersistRun({ backend: this.backend, run, @@ -287,14 +290,16 @@ export class Agent { if (result.error) { // All Worker/Runner non-user issue should fall here // Note: Should never throw (typegate will panic), this will run in a worker - logger.error( + this.logger.error( `result error for "${runId}": ${JSON.stringify(result.payload)}`, ); return; } const answer = result.payload as WorkerData; - logger.info(`"${runId}" answered: type ${JSON.stringify(answer.type)}`); + this.logger.info( + `"${runId}" answered: type ${JSON.stringify(answer.type)}`, + ); const startedAt = this.workerManager.getInitialTimeStartedAt(runId); @@ -326,7 +331,7 @@ export class Agent { break; } default: - logger.error( + this.logger.error( `Fatal: invalid type ${answer.type} sent by "${runId}": ${ JSON.stringify(answer.data) }`, @@ -342,18 +347,18 @@ export class Agent { ) { this.workerManager.destroyWorker(workflowName, runId); // ! - logger.warn(`Interrupt "${workflowName}": ${result}"`); + this.logger.warn(`Interrupt "${workflowName}": ${result}"`); // TODO: make all of these transactional - logger.info(`Persist records for "${workflowName}": ${result}"`); + this.logger.info(`Persist records for "${workflowName}": ${result}"`); const _run = await Meta.substantial.storePersistRun({ backend: this.backend, run, }); try { - logger.info(`Trying to close old schedule ${runId} at ${schedule}`); + this.logger.info(`Trying to close old schedule ${runId} at ${schedule}`); await Meta.substantial.storeCloseSchedule({ backend: this.backend, queue: this.queue, @@ -363,11 +368,11 @@ export class Agent { } catch (err) { // Note: underlying schedule may have been closed already // This could occur if multiple events were sent at the exact same time - logger.warn(err); + this.logger.warn(err); } const newSchedule = new Date().toJSON(); - logger.info(`Add back to schedule ${runId} at ${newSchedule}`); + this.logger.info(`Add back to schedule ${runId} at ${newSchedule}`); await Meta.substantial.storeAddSchedule({ backend: this.backend, queue: this.queue, @@ -375,7 +380,7 @@ export class Agent { schedule: newSchedule, }); - logger.info(`Renew lease ${runId}`); + this.logger.info(`Renew lease ${runId}`); await Meta.substantial.agentRenewLease({ backend: this.backend, lease_seconds: this.config.leaseLifespanSec, @@ -391,7 +396,7 @@ export class Agent { ) { this.workerManager.destroyWorker(workflowName, runId); - logger.info( + this.logger.info( `gracefull completion of "${runId}" (${kind}): ${ JSON.stringify( result, @@ -399,7 +404,7 @@ export class Agent { } started at "${startedAt}"`, ); - logger.info(`Append Stop ${runId}`); + this.logger.info(`Append Stop ${runId}`); const rustResult = kind == "FAIL" ? "Err" : "Ok"; // Note: run is a one-time value, thus can be mutated @@ -414,7 +419,7 @@ export class Agent { }, }); - logger.info( + this.logger.info( `Persist finalized records for "${workflowName}": ${result}" and closing everything..`, ); @@ -461,7 +466,7 @@ function checkIfRunHasStopped(run: Run) { for (const op of run.operations) { if (op.event.type == "Start") { if (life >= 1) { - logger.error( + this.logger.error( `bad logs: ${ JSON.stringify( run.operations.map(({ event }) => event.type), @@ -478,7 +483,7 @@ function checkIfRunHasStopped(run: Run) { hasStopped = false; } else if (op.event.type == "Stop") { if (life <= 0) { - logger.error( + this.logger.error( `bad logs: ${ JSON.stringify( run.operations.map(({ event }) => event.type), diff --git a/src/typegate/src/runtimes/substantial/deno_context.ts b/src/typegate/src/runtimes/substantial/deno_context.ts index 32b85f66b3..94a0c1d1d4 100644 --- a/src/typegate/src/runtimes/substantial/deno_context.ts +++ b/src/typegate/src/runtimes/substantial/deno_context.ts @@ -12,14 +12,12 @@ const additionalHeaders = { connection: "keep-alive" }; export class Context { private id = 0; + public kwargs = {}; gql: ReturnType; - constructor( - private run: Run, - private kwargs: Record, - private internal: TaskContext, - ) { + constructor(private run: Run, private internal: TaskContext) { this.gql = createGQLClient(internal); + this.kwargs = getKwargsCopy(run); } #nextId() { @@ -64,17 +62,18 @@ export class Context { result = await Promise.resolve(fn()); } + const clonedResult = deepClone(result ?? null); this.#appendOp({ type: "Save", id, value: { type: "Resolved", - payload: result ?? null, + payload: clonedResult, }, }); - return result; - } catch (err) { + return clonedResult; + } catch (err: any) { if ( option?.retry?.maxRetries && currRetryCount < option.retry.maxRetries @@ -427,3 +426,24 @@ function createGQLClient(internal: TaskContext) { const meta = { ...internal.meta, url: tgLocal.toString() }; return make_internal({ ...internal, meta }, additionalHeaders).gql; } + +function getKwargsCopy(run: Run): Record { + const first = run.operations.at(0); + if (!first) { + throw new Error( + `Bad run "${run.run_id}": cannot retrieve kwargs on a run that has not yet started`, + ); + } + + if (first.event.type != "Start") { + throw new Error( + `Corrupted run "${run.run_id}": first operation is not a run, got ${first.event.type} instead`, + ); + } + + return deepClone(first.event.kwargs.kwargs) as Record; +} + +function deepClone(clonableObject: T): T { + return JSON.parse(JSON.stringify(clonableObject)); +} diff --git a/src/typegate/src/runtimes/substantial/worker.ts b/src/typegate/src/runtimes/substantial/worker.ts index 61f7a5dec5..501f17485d 100644 --- a/src/typegate/src/runtimes/substantial/worker.ts +++ b/src/typegate/src/runtimes/substantial/worker.ts @@ -11,8 +11,7 @@ self.onmessage = async function (event) { const { type, data } = event.data as WorkerData; switch (type) { case "START": { - const { modulePath, functionName, run, schedule, kwargs, internal } = - data; + const { modulePath, functionName, run, schedule, internal } = data; // FIXME: handle case when script is missing and notify WorkerManager so it cleans up // its registry. const module = await import(modulePath); @@ -26,7 +25,7 @@ self.onmessage = async function (event) { return; } - runCtx = new Context(run, kwargs, internal); + runCtx = new Context(run, internal); workflowFn(runCtx, internal) .then((wfResult: unknown) => { diff --git a/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts b/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts index 62ff176d53..9650ecda96 100644 --- a/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts +++ b/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts @@ -118,8 +118,6 @@ export class WorkerManager { type: "module", deno: { permissions: { - // overrideable default permissions - hrtime: false, net: true, // on request permissions read: "inherit", // default read permission @@ -223,7 +221,6 @@ export class WorkerManager { workflowModPath: string, storedRun: Run, schedule: string, - kwargs: Record, internalTCtx: TaskContext, ) { this.#createWorker(name, workflowModPath, runId); @@ -231,7 +228,6 @@ export class WorkerManager { modulePath: workflowModPath, functionName: name, run: storedRun, - kwargs, schedule, internal: internalTCtx, }); diff --git a/src/typegate/src/system_typegraphs.ts b/src/typegate/src/system_typegraphs.ts index 3c4e934e8d..fadbc93308 100644 --- a/src/typegate/src/system_typegraphs.ts +++ b/src/typegate/src/system_typegraphs.ts @@ -1,7 +1,7 @@ // Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0. // SPDX-License-Identifier: MPL-2.0 -import { basename } from "@std/url"; +import { basename } from "@std/path"; import { fromFileUrl, toFileUrl } from "@std/path"; import type { Register } from "./typegate/register.ts"; @@ -35,17 +35,18 @@ export class SystemTypegraph { } static check(name: string) { - return SystemTypegraph.all.find((sg) => sg.name == name) != - null; + return SystemTypegraph.all.find((sg) => sg.name == name) != null; } static async loadAll(typegate: Typegate, watch = false) { const reload = async (urls: string[]) => { for await (const url of urls) { logger.info(`reloading system graph ${basename(url)}`); - const json = (await import(url, { - with: { type: "json" }, - })).default; + const json = ( + await import(url, { + with: { type: "json" }, + }) + ).default; const tgString = JSON.stringify(json); const tgJson = await TypeGraph.parseJson(tgString); await typegate.pushTypegraph( diff --git a/tests/runtimes/substantial/common.ts b/tests/runtimes/substantial/common.ts index 5aca8d3dda..f56efa6845 100644 --- a/tests/runtimes/substantial/common.ts +++ b/tests/runtimes/substantial/common.ts @@ -623,6 +623,118 @@ export function childWorkflowTestTemplate( ); } +export function inputMutationTemplate( + backendName: BackendName, + { + secrets, + }: { + secrets?: Record; + }, + cleanup?: MetaTestCleanupFn +) { + Meta.test( + { + name: `kwargs input mutation after interrupts (${backendName})`, + }, + async (t) => { + Deno.env.set("SUB_BACKEND", backendName); + Deno.env.set("TEST_OVERRIDE_GQL_ORIGIN", `http://localhost:${t.port}`); + + cleanup && t.addCleanup(cleanup); + + const e = await t.engine("runtimes/substantial/substantial.py", { + secrets: { + MY_SECRET: "Hello", + ...secrets, + }, + }); + + const testItems = [ + { pos: 1, innerField: "A" }, + { pos: 2, innerField: "B" }, + { pos: 3, innerField: "C" }, + ]; + + let currentRunId: string | null = null; + await t.should( + `start accidentialInputMutation workflow and return its run id (${backendName})`, + async () => { + await gql` + mutation { + start_mut(kwargs: { items: $items }) + } + ` + .withVars({ + items: testItems, + }) + .expectBody((body) => { + currentRunId = body.data?.start_mut! as string; + assertExists( + currentRunId, + "Run id was not returned when workflow was started" + ); + }) + .on(e); + } + ); + + await sleep(15 * 1000); + + await t.should( + `complete without overwriting original kwargs + save should not depend on external references mutation (${backendName})`, + async () => { + await gql` + query { + results_raw(name: "accidentialInputMutation") { + ongoing { + count + runs { + run_id + } + } + completed { + count + runs { + run_id + result { + status + value + } + } + } + } + } + ` + .expectData({ + results_raw: { + ongoing: { + count: 0, + runs: [], + }, + completed: { + count: 1, + runs: [ + { + run_id: currentRunId, + result: { + status: "COMPLETED", + value: JSON.stringify({ + copy: testItems, + items: [], + }), + }, + }, + ], + }, + }, + }) + .on(e); + } + ); + } + ); +} + // TODO: // mock a very basic http server in another process that counts the number of request made by a workflow // This will allow.. diff --git a/tests/runtimes/substantial/substantial.py b/tests/runtimes/substantial/substantial.py index 55c2b82a9c..d3454e8f27 100644 --- a/tests/runtimes/substantial/substantial.py +++ b/tests/runtimes/substantial/substantial.py @@ -25,6 +25,7 @@ def substantial(g: Graph): "eventsAndExceptionExample", "retryExample", "secretsExample", + "accidentialInputMutation", ] ) .build() @@ -46,6 +47,7 @@ def substantial(g: Graph): results=sub.query_results( t.either([t.integer(), t.string()]).rename("ResultOrError") ), + results_raw=sub.query_results_raw(), workers=sub.query_resources(), # sleep start_sleep=sub.start(t.struct({"a": t.integer(), "b": t.integer()})).reduce( @@ -66,5 +68,15 @@ def substantial(g: Graph): start_secret=sub.start(t.struct({}), secrets=["MY_SECRET"]).reduce( {"name": "secretsExample"} ), + # input mutation + start_mut=sub.start( + t.struct( + { + "items": t.list( + t.struct({"pos": t.integer(), "innerField": t.string()}) + ) + } + ) + ).reduce({"name": "accidentialInputMutation"}), **sub.internals(), ) diff --git a/tests/runtimes/substantial/substantial_test.ts b/tests/runtimes/substantial/substantial_test.ts index f071401015..576da6af16 100644 --- a/tests/runtimes/substantial/substantial_test.ts +++ b/tests/runtimes/substantial/substantial_test.ts @@ -1,6 +1,8 @@ -import { basicTestTemplate } from "./common.ts"; +import { basicTestTemplate, inputMutationTemplate } from "./common.ts"; basicTestTemplate("memory", { secrets: { MY_SECRET: "Hello" }, delays: { awaitSleepCompleteSec: 10 }, }); + +inputMutationTemplate("memory", {}); diff --git a/tests/runtimes/substantial/workflow.ts b/tests/runtimes/substantial/workflow.ts index 59d7a70737..e8d814c70e 100644 --- a/tests/runtimes/substantial/workflow.ts +++ b/tests/runtimes/substantial/workflow.ts @@ -7,7 +7,7 @@ import { } from "./imports/common_types.ts"; export const eventsAndExceptionExample: Workflow = async ( - ctx: Context, + ctx: Context ) => { const { to } = ctx.kwargs; const messageDialog = await ctx.save(() => sendSubscriptionEmail(to)); @@ -31,18 +31,18 @@ export async function saveAndSleepExample(ctx: Context) { const sum = await ctx.save(async () => { const remoteAdd = new Date().getTime(); - const { data } = await ctx.gql /**/`query { remote_add(a: $a, b: $b) }`.run( + const { data } = await ctx.gql/**/ `query { remote_add(a: $a, b: $b) }`.run( { a: newA, b: newB, - }, + } ); const remoteAddEnd = new Date().getTime(); console.log( "Remote add:", (remoteAddEnd - remoteAdd) / 1000, ", Response:", - data, + data ); return (data as any)?.remote_add as number; @@ -70,7 +70,7 @@ export async function retryExample(ctx: Context) { maxBackoffMs: 5000, maxRetries: 4, }, - }, + } ); const timeoutRet = await ctx.save( @@ -89,7 +89,7 @@ export async function retryExample(ctx: Context) { maxBackoffMs: 3000, maxRetries: 5, }, - }, + } ); return [timeoutRet, retryRet].join(", "); @@ -105,3 +105,34 @@ export const secretsExample: Workflow = (_, { secrets }) => { } return Promise.resolve(); }; + +export async function accidentialInputMutation(ctx: Context) { + const { items } = ctx.kwargs; + + const copy = []; + + const mutValue = "MODIFIED"; + + while (items.length >= 1) { + const front = items.shift(); + + if (front.innerField == mutValue) { + // Should throw on shallow clones + throw new Error( + `actual kwargs was mutated after interrupts: copy ${JSON.stringify( + copy + )}, ${mutValue}` + ); + } + + copy.push(await ctx.save(() => front)); + console.log("PUSHED", front); + + front!.innerField = mutValue; + + ctx.sleep(10); // force replay + } + + console.log("FINAL copy", copy); + return { copy, items }; +} diff --git a/tools/tasks/dev.ts b/tools/tasks/dev.ts index 3d4c43296f..d9e2566c59 100644 --- a/tools/tasks/dev.ts +++ b/tools/tasks/dev.ts @@ -82,6 +82,7 @@ export default { vars: { PACKAGED: "false", LOG_LEVEL: "DEBUG", + SUBSTANTIAL_TRACE: "1", DEBUG: "true", REDIS_URL: "redis://:password@localhost:6379/0", TG_SECRET: From f974e1bbf2768f51cd8666780ce43e4bab07de30 Mon Sep 17 00:00:00 2001 From: afmika Date: Wed, 13 Nov 2024 19:56:22 +0300 Subject: [PATCH 2/5] feat: allow multiple log level setting --- .../docs/reference/typegate/index.mdx | 1 - src/typegate/src/config.ts | 1 - src/typegate/src/config/shared.ts | 6 ++- src/typegate/src/config/types.ts | 37 +++++++++++++-- src/typegate/src/log.ts | 46 +++++++++++++------ src/typegate/src/runtimes/substantial.ts | 5 +- .../src/runtimes/substantial/agent.ts | 12 +++-- tools/tasks/dev.ts | 3 +- 8 files changed, 79 insertions(+), 32 deletions(-) diff --git a/docs/metatype.dev/docs/reference/typegate/index.mdx b/docs/metatype.dev/docs/reference/typegate/index.mdx index 01f8d74d4b..4ab6b1f1be 100644 --- a/docs/metatype.dev/docs/reference/typegate/index.mdx +++ b/docs/metatype.dev/docs/reference/typegate/index.mdx @@ -82,4 +82,3 @@ The following environment variables can be used to configure the typegate. `SYNC | SUBSTANTIAL_POLL_INTERVAL_SEC | Rate at which new schedules are read. | 1.0 | 0.6 | | SUBSTANTIAL_LEASE_LIFESPAN_SEC | Lease duration associated to a workflow run | 2.0 | 6 | | SUBSTANTIAL_MAX_ACQUIRE_PER_TICK | Max amount of new acquired replay requests per tick | 3 | 5 | -| SUBSTANTIAL_TRACE | Enable/Disable debug traces for Substantial | `0` | `0`, `1` | diff --git a/src/typegate/src/config.ts b/src/typegate/src/config.ts index e618e79778..4d3fd33c87 100644 --- a/src/typegate/src/config.ts +++ b/src/typegate/src/config.ts @@ -64,7 +64,6 @@ export const defaultTypegateConfigBase = { substantial_poll_interval_sec: 1, substantial_lease_lifespan_sec: 2, substantial_max_acquire_per_tick: 3, - substantial_trace: 0, }; const SYNC_PREFIX = "sync_"; diff --git a/src/typegate/src/config/shared.ts b/src/typegate/src/config/shared.ts index 0865788fb0..ae2322aa60 100644 --- a/src/typegate/src/config/shared.ts +++ b/src/typegate/src/config/shared.ts @@ -3,6 +3,7 @@ import { sharedConfigSchema } from "./types.ts"; import { configOrExit } from "./loader.ts"; +import type { LevelName } from "@std/log/base-handler"; if (!Deno.env.has("VERSION")) { // set version for config and workers, only running in main engine @@ -10,6 +11,9 @@ if (!Deno.env.has("VERSION")) { Deno.env.set("VERSION", get_version()); } +export const MAIN_DEFAULT_LEVEL = "INFO" satisfies LevelName; +export const ADDRESSED_DEFAULT_LEVEL = "ERROR" satisfies LevelName; + export const envSharedWithWorkers = [ "TEST_OVERRIDE_GQL_ORIGIN", ...Object.keys(sharedConfigSchema.shape).map((k) => k.toUpperCase()), @@ -18,7 +22,7 @@ export const envSharedWithWorkers = [ export const sharedConfig = await configOrExit( sharedConfigSchema, { - log_level: "INFO", + log_level: MAIN_DEFAULT_LEVEL, }, [ Object.fromEntries( diff --git a/src/typegate/src/config/types.ts b/src/typegate/src/config/types.ts index 7274330e6e..e43d2dc421 100644 --- a/src/typegate/src/config/types.ts +++ b/src/typegate/src/config/types.ts @@ -5,6 +5,7 @@ import { RefinementCtx, z } from "zod"; import { decodeBase64 } from "@std/encoding/base64"; import type { RedisConnectOptions } from "redis"; import type { S3ClientConfig } from "aws-sdk/client-s3"; +import type { LevelName } from "@std/log"; const zBooleanString = z.preprocess( (a: unknown) => z.coerce.string().parse(a) === "true", @@ -100,8 +101,6 @@ export const typegateConfigBaseSchema = z.object({ substantial_lease_lifespan_sec: z.coerce.number().positive().min(1), /** Maximum amount of new acquired replay requests per tick */ substantial_max_acquire_per_tick: z.coerce.number().positive().min(1), - /** Enable/Disable debug traces for Substantial */ - substantial_trace: z.coerce.number().min(0).max(1), }); export type TypegateConfigBase = z.infer; @@ -129,12 +128,40 @@ export type TypegateConfig = { sync: SyncConfigX | null; }; +export const logLevelItemSchema = z.enum( + ["NOTSET", "DEBUG", "INFO", "WARN", "ERROR", "CRITICAL"] as readonly [ + LevelName, + ...LevelName[], + ], + { + description: "log_level item", + }, +); + +export const logLevelSchema = z.string().transform((value) => { + let defaultLevel: LevelName = "ERROR"; + const loggerConfs: Record = {}; + const confs = value.toUpperCase().split(","); + + for (const confSection of confs) { + const [left, ...right] = confSection.split("="); + if (right.length == 0) { + defaultLevel = logLevelItemSchema.parse(left.trim()); + } else { + loggerConfs[left.toLocaleLowerCase()] = logLevelItemSchema.parse( + right.join("=").trim(), + ); + } + } + + loggerConfs["default"] = defaultLevel; + return loggerConfs; +}); + // Those envs are split from the config as only a subset of them are shared with the workers export const sharedConfigSchema = z.object({ debug: zBooleanString, - log_level: z - .enum(["NOTSET", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]) - .optional(), + log_level: logLevelSchema.optional(), rust_log: z.string().optional(), version: z.string(), deno_testing: zBooleanString, diff --git a/src/typegate/src/log.ts b/src/typegate/src/log.ts index 487743cccb..e806df7fa4 100644 --- a/src/typegate/src/log.ts +++ b/src/typegate/src/log.ts @@ -3,12 +3,16 @@ import { ConsoleHandler, type LevelName, Logger } from "@std/log"; import { basename, dirname, extname } from "@std/path"; -import { sharedConfig } from "./config/shared.ts"; +import { + ADDRESSED_DEFAULT_LEVEL, + MAIN_DEFAULT_LEVEL, + sharedConfig, +} from "./config/shared.ts"; // set rust log level is not explicit set if (!sharedConfig.rust_log) { const set = (level: string) => Deno.env.set("RUST_LOG", level); - switch (sharedConfig.log_level) { + switch (sharedConfig.log_level?.default) { case "NOTSET": set("off"); break; @@ -17,7 +21,7 @@ if (!sharedConfig.rust_log) { "info,native=trace,sql_schema_connector=warn,tracing=warn,schema_core=warn,quaint=warn", ); break; - case "WARNING": + case "WARN": set("warn"); break; case "ERROR": @@ -31,18 +35,21 @@ if (!sharedConfig.rust_log) { } } -const consoleHandler = new ConsoleHandler(sharedConfig.log_level as LevelName, { - formatter: (log) => { - let msg = log.msg; - for (const arg of log.args) { - msg = msg.replace( - "{}", - typeof arg === "string" ? arg : JSON.stringify(arg), - ); - } - return `${log.datetime.toISOString()} [${log.levelName} ${log.loggerName}] ${msg}`; +const consoleHandler = new ConsoleHandler( + sharedConfig.log_level?.default ?? MAIN_DEFAULT_LEVEL, + { + formatter: (log) => { + let msg = log.msg; + for (const arg of log.args) { + msg = msg.replace( + "{}", + typeof arg === "string" ? arg : JSON.stringify(arg), + ); + } + return `${log.datetime.toISOString()} [${log.levelName} ${log.loggerName}] ${msg}`; + }, }, -}); +); const loggers = new Map(); const defaultLogger = new Logger("default", "NOTSET", { @@ -69,4 +76,15 @@ export function getLogger( return logger; } +export function getLoggerByAddress( + name: ImportMeta | string | null = null, + address: string, +) { + const levelForAddress = sharedConfig?.log_level?.[address]; + + return levelForAddress + ? getLogger(name, levelForAddress) + : getLogger(name, ADDRESSED_DEFAULT_LEVEL); +} + export { Logger }; diff --git a/src/typegate/src/runtimes/substantial.ts b/src/typegate/src/runtimes/substantial.ts index 55d0959bae..35c366b0d2 100644 --- a/src/typegate/src/runtimes/substantial.ts +++ b/src/typegate/src/runtimes/substantial.ts @@ -6,7 +6,7 @@ import { Resolver, RuntimeInitParams } from "../types.ts"; import { ComputeStage } from "../engine/query_engine.ts"; import { TypeGraph, TypeGraphDS, TypeMaterializer } from "../typegraph/mod.ts"; import { registerRuntime } from "./mod.ts"; -import { getLogger, Logger } from "../log.ts"; +import { getLogger, getLoggerByAddress, Logger } from "../log.ts"; import * as ast from "graphql/ast"; import { path } from "compress/deps.ts"; import { Artifact } from "../typegraph/types.ts"; @@ -70,7 +70,7 @@ export class SubstantialRuntime extends Runtime { private secrets: Record, ) { super(typegraphName); - this.logger = getLogger(`substantial:'${typegraphName}'`); + this.logger = getLoggerByAddress(import.meta, "substantial"); this.backend = backend; this.queue = queue; this.agent = agent; @@ -110,7 +110,6 @@ export class SubstantialRuntime extends Runtime { pollIntervalSec: typegate.config.base.substantial_poll_interval_sec, leaseLifespanSec: typegate.config.base.substantial_lease_lifespan_sec, maxAcquirePerTick: typegate.config.base.substantial_max_acquire_per_tick, - debug: typegate.config.base.substantial_trace > 0, } satisfies AgentConfig; const agent = new Agent(backend, queue, agentConfig); diff --git a/src/typegate/src/runtimes/substantial/agent.ts b/src/typegate/src/runtimes/substantial/agent.ts index 634f1acfd8..0482a9856a 100644 --- a/src/typegate/src/runtimes/substantial/agent.ts +++ b/src/typegate/src/runtimes/substantial/agent.ts @@ -5,7 +5,7 @@ import { ReadOrCloseScheduleInput, Run, } from "../../../engine/runtime.js"; -import { getLogger, Logger } from "../../log.ts"; +import { getLoggerByAddress, Logger } from "../../log.ts"; import { TaskContext } from "../deno/shared_types.ts"; import { appendIfOngoing, @@ -31,7 +31,6 @@ export interface AgentConfig { pollIntervalSec: number; leaseLifespanSec: number; maxAcquirePerTick: number; - debug?: boolean; } export class Agent { @@ -45,7 +44,7 @@ export class Agent { private queue: string, private config: AgentConfig, ) { - this.logger = getLogger(import.meta, config.debug ? "DEBUG" : "ERROR"); + this.logger = getLoggerByAddress(import.meta, "substantial"); } async schedule(input: AddScheduleInput) { @@ -461,12 +460,15 @@ export class Agent { } function checkIfRunHasStopped(run: Run) { + const logger = getLoggerByAddress(import.meta, "substantial"); + let life = 0; let hasStopped = false; + for (const op of run.operations) { if (op.event.type == "Start") { if (life >= 1) { - this.logger.error( + logger.error( `bad logs: ${ JSON.stringify( run.operations.map(({ event }) => event.type), @@ -483,7 +485,7 @@ function checkIfRunHasStopped(run: Run) { hasStopped = false; } else if (op.event.type == "Stop") { if (life <= 0) { - this.logger.error( + logger.error( `bad logs: ${ JSON.stringify( run.operations.map(({ event }) => event.type), diff --git a/tools/tasks/dev.ts b/tools/tasks/dev.ts index d9e2566c59..04afea9cc4 100644 --- a/tools/tasks/dev.ts +++ b/tools/tasks/dev.ts @@ -81,8 +81,7 @@ export default { inherit: "_rust", vars: { PACKAGED: "false", - LOG_LEVEL: "DEBUG", - SUBSTANTIAL_TRACE: "1", + LOG_LEVEL: "DEBUG,substantial=ERROR", DEBUG: "true", REDIS_URL: "redis://:password@localhost:6379/0", TG_SECRET: From 042175b07ed5a8a3388e51be4093caf906ab2176 Mon Sep 17 00:00:00 2001 From: afmika Date: Wed, 13 Nov 2024 20:26:21 +0300 Subject: [PATCH 3/5] chore: set substantial=DEBUG on test runner --- src/typegate/src/config/shared.ts | 2 +- tests/runtimes/substantial/common.ts | 2 +- tools/test.ts | 11 +++++------ 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/typegate/src/config/shared.ts b/src/typegate/src/config/shared.ts index ae2322aa60..ae7341fe7b 100644 --- a/src/typegate/src/config/shared.ts +++ b/src/typegate/src/config/shared.ts @@ -3,7 +3,7 @@ import { sharedConfigSchema } from "./types.ts"; import { configOrExit } from "./loader.ts"; -import type { LevelName } from "@std/log/base-handler"; +import type { LevelName } from "@std/log"; if (!Deno.env.has("VERSION")) { // set version for config and workers, only running in main engine diff --git a/tests/runtimes/substantial/common.ts b/tests/runtimes/substantial/common.ts index f56efa6845..7c3ea90ec6 100644 --- a/tests/runtimes/substantial/common.ts +++ b/tests/runtimes/substantial/common.ts @@ -681,7 +681,7 @@ export function inputMutationTemplate( await sleep(15 * 1000); await t.should( - `complete without overwriting original kwargs + save should not depend on external references mutation (${backendName})`, + `complete without overwriting original kwargs or saved value (${backendName})`, async () => { await gql` query { diff --git a/tools/test.ts b/tools/test.ts index 014ca1c32d..26671a18bf 100755 --- a/tools/test.ts +++ b/tools/test.ts @@ -81,9 +81,7 @@ async function listTestFiles(filesArg: string[]): Promise { // run all the tests return ( await Array.fromAsync( - wd - .join("tests") - .expandGlob("**/*_test.ts", { globstar: true }), + wd.join("tests").expandGlob("**/*_test.ts", { globstar: true }), ) ).map((ent) => ent.path.toString()); } @@ -138,7 +136,7 @@ export async function testE2e(args: { RUST_SPANTRACE: "1", // "RUST_BACKTRACE": "short", RUST_MIN_STACK: "8388608", - LOG_LEVEL: "DEBUG", + LOG_LEVEL: "DEBUG,substantial=DEBUG", // "NO_COLOR": "1", DEBUG: "true", PACKAGED: "false", @@ -215,8 +213,9 @@ export async function testE2e(args: { $.logStep(`${prefix} Building xtask and meta-cli...`); await $`cargo build -p meta-cli -F typegate --profile ${buildProfile} && mv target/${profile}/meta target/${profile}/meta-full - && cargo build -p xtask -p meta-cli --profile ${buildProfile}` - .cwd(wd); + && cargo build -p xtask -p meta-cli --profile ${buildProfile}`.cwd( + wd, + ); $.logStep(`Discovered ${queue.length} test files to run`); From 20ee781c5ab259adfd205a054c7877392ddfb9fb Mon Sep 17 00:00:00 2001 From: michael-0acf4 Date: Wed, 13 Nov 2024 22:19:45 +0300 Subject: [PATCH 4/5] fix: published_test.ts Signed-off-by: michael-0acf4 --- tests/e2e/published/published_test.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/e2e/published/published_test.ts b/tests/e2e/published/published_test.ts index e7f6092cb1..f92e9ca96b 100644 --- a/tests/e2e/published/published_test.ts +++ b/tests/e2e/published/published_test.ts @@ -163,6 +163,7 @@ Meta.test( TG_ADMIN_PASSWORD: "password", TMP_DIR: typegateTempDir, TG_PORT: port, + LOG_LEVEL: "DEBUG", // TODO should not be necessary VERSION: previousVersion, ...syncEnvs, @@ -253,6 +254,7 @@ Meta.test( TG_ADMIN_PASSWORD: "password", TMP_DIR: typegateTempDir, TG_PORT: `${port}`, + LOG_LEVEL: "DEBUG", // TODO should not be necessary VERSION: previousVersion, ...syncEnvs, @@ -320,6 +322,7 @@ Meta.test( TG_ADMIN_PASSWORD: "password", TMP_DIR: typegateTempDir.toString(), TG_PORT: `${port}`, + LOG_LEVEL: "DEBUG", // TODO should not be necessary VERSION: previousVersion, DEBUG: "true", From e3a4bd12fca1b6d859a75c43c93d9af448ffe439 Mon Sep 17 00:00:00 2001 From: afmika Date: Thu, 14 Nov 2024 13:22:29 +0300 Subject: [PATCH 5/5] fix: small fixes --- src/typegate/src/config/types.ts | 2 +- src/typegate/src/runtimes/substantial/agent.ts | 4 +--- src/typegate/src/runtimes/substantial/deno_context.ts | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/typegate/src/config/types.ts b/src/typegate/src/config/types.ts index e43d2dc421..5397e05ac9 100644 --- a/src/typegate/src/config/types.ts +++ b/src/typegate/src/config/types.ts @@ -148,7 +148,7 @@ export const logLevelSchema = z.string().transform((value) => { if (right.length == 0) { defaultLevel = logLevelItemSchema.parse(left.trim()); } else { - loggerConfs[left.toLocaleLowerCase()] = logLevelItemSchema.parse( + loggerConfs[left.toLowerCase()] = logLevelItemSchema.parse( right.join("=").trim(), ); } diff --git a/src/typegate/src/runtimes/substantial/agent.ts b/src/typegate/src/runtimes/substantial/agent.ts index 0482a9856a..b98b6de11d 100644 --- a/src/typegate/src/runtimes/substantial/agent.ts +++ b/src/typegate/src/runtimes/substantial/agent.ts @@ -118,8 +118,6 @@ export class Agent { } async #nextIteration() { - this.logger.warn("POLL"); - // Note: in multiple agents/typegate scenario, a single node may acquire all runs for itself within a tick span // To account for that, keep this reasonable const acquireMaxForThisAgent = this.config.maxAcquirePerTick; @@ -170,7 +168,7 @@ export class Agent { lease_seconds: this.config.leaseLifespanSec, }); - this.logger.info(`Active leases: ${activeRunIds.join(", ")}`); + this.logger.debug(`Active leases: ${activeRunIds.join(", ")}`); const next = await Meta.substantial.agentNextRun({ backend: this.backend, diff --git a/src/typegate/src/runtimes/substantial/deno_context.ts b/src/typegate/src/runtimes/substantial/deno_context.ts index 94a0c1d1d4..a9a871db86 100644 --- a/src/typegate/src/runtimes/substantial/deno_context.ts +++ b/src/typegate/src/runtimes/substantial/deno_context.ts @@ -437,7 +437,7 @@ function getKwargsCopy(run: Run): Record { if (first.event.type != "Start") { throw new Error( - `Corrupted run "${run.run_id}": first operation is not a run, got ${first.event.type} instead`, + `Corrupted run "${run.run_id}": first operation is not a Start, got ${first.event.type} instead`, ); }