Skip to content

Commit

Permalink
fix: responseSteam crash
Browse files Browse the repository at this point in the history
  • Loading branch information
Eyal Golan authored and Eyal Golan committed Aug 29, 2024
1 parent 45f80b1 commit 854a4dd
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 51 deletions.
17 changes: 17 additions & 0 deletions src/tracer/tracer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
TRANSACTION_ID_KEY,
} from '../utils';
import * as tracer from './tracer';
import { HANDLER_STREAMING, STREAM_RESPONSE } from './tracer';
jest.mock('../hooks/http');

const TOKEN = 't_10faa5e13e7844aaa1234';
Expand Down Expand Up @@ -816,8 +817,24 @@ describe('tracer', () => {
expect(Http.addStepFunctionEvent).not.toBeCalled();
});

test('responseStreamFunctionLogic - tracer disabled and decorator marked as responseStream', async () => {
const handler = jest.fn(async () => {});
handler[HANDLER_STREAMING] = STREAM_RESPONSE;

const { event, context } = new HandlerInputsBuilder().build();

const decoratedUserHandler = tracer.trace({})(handler);
await decoratedUserHandler(event, context);

expect(decoratedUserHandler[HANDLER_STREAMING]).toEqual(STREAM_RESPONSE);
expect(spies.warnClient).toHaveBeenCalledWith(
'Tracer is disabled, running on a response stream function'
);
});

test('performStepFunctionLogic - Happy flow', async () => {
const handler = jest.fn(async () => ({ hello: 'world' }));

spies.getRandomId.mockReturnValueOnce('123');

const { event, context } = new HandlerInputsBuilder().build();
Expand Down
120 changes: 69 additions & 51 deletions src/tracer/tracer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ import { TraceOptions } from './trace-options.type';
import { GenericSpan } from '../types/spans/basicSpan';

export const HANDLER_CALLBACKED = 'handler_callbacked';
export const HANDLER_STREAMING = Symbol.for('aws.lambda.runtime.handler.streaming');
export const STREAM_RESPONSE = 'response';
export const ASYNC_HANDLER_RESOLVED = 'async_handler_resolved';
export const ASYNC_HANDLER_REJECTED = 'async_handler_rejected';
export const NON_ASYNC_HANDLER_ERRORED = 'non_async_errored';
Expand All @@ -49,70 +51,86 @@ export const LEAK_MESSAGE =

export const trace =
({ token, debug, edgeHost, switchOff, stepFunction }: TraceOptions) =>
(userHandler: Handler) =>
async <Event = any>(event: Event, context?: Context, callback?: Callback): Promise<Handler> => {
if (!!switchOff || isSwitchedOff()) {
info(
`The '${SWITCH_OFF_FLAG}' environment variable is set to 'true': this invocation will not be traced by Lumigo`
);
return userHandler(event, context, callback);
}
(userHandler: Handler) => {
const isResponseStreamFunction =
userHandler[HANDLER_STREAMING] !== undefined &&
userHandler[HANDLER_STREAMING] === STREAM_RESPONSE;
const decoratedUserHandler = async <Event = any>(
event: Event,
context?: Context,
callback?: Callback
): Promise<Handler> => {
if (!!switchOff || isSwitchedOff()) {
info(
`The '${SWITCH_OFF_FLAG}' environment variable is set to 'true': this invocation will not be traced by Lumigo`
);
return userHandler(event, context, callback);
}

if (!isAwsEnvironment()) {
warnClient('Tracer is disabled, running on non-aws environment');
return userHandler(event, context, callback);
}
if (!isAwsEnvironment()) {
warnClient('Tracer is disabled, running on non-aws environment');
return userHandler(event, context, callback);
}
if (isResponseStreamFunction) {
warnClient('Tracer is disabled, running on a response stream function');
return userHandler(event, context, callback);
}
try {
TracerGlobals.setHandlerInputs({ event, context });
TracerGlobals.setTracerInputs({
token,
debug,
edgeHost,
switchOff,
stepFunction,
lambdaTimeout: context.getRemainingTimeInMillis(),
});
ExecutionTags.autoTagEvent(event);
} catch (err) {
logger.warn('Failed to start tracer', err);
}

try {
TracerGlobals.setHandlerInputs({ event, context });
TracerGlobals.setTracerInputs({
token,
debug,
edgeHost,
switchOff,
stepFunction,
lambdaTimeout: context.getRemainingTimeInMillis(),
});
ExecutionTags.autoTagEvent(event);
} catch (err) {
logger.warn('Failed to start tracer', err);
}
if (!context || !isAwsContext(context)) {
logger.warnClient(
'missing context parameter - learn more at https://docs.lumigo.io/docs/nodejs'
);
const { err, data, type } = await promisifyUserHandler(userHandler, event, context);
return performPromisifyType(err, data, type, callback);
}

if (!context || !isAwsContext(context)) {
logger.warnClient(
'missing context parameter - learn more at https://docs.lumigo.io/docs/nodejs'
);
const { err, data, type } = await promisifyUserHandler(userHandler, event, context);
return performPromisifyType(err, data, type, callback);
}
if (context.__wrappedByLumigo) {
const { err, data, type } = await promisifyUserHandler(userHandler, event, context);
return performPromisifyType(err, data, type, callback);
}
context.__wrappedByLumigo = true;

if (context.__wrappedByLumigo) {
const { err, data, type } = await promisifyUserHandler(userHandler, event, context);
return performPromisifyType(err, data, type, callback);
}
context.__wrappedByLumigo = true;
const functionSpan = getFunctionSpan(event, context);

const functionSpan = getFunctionSpan(event, context);
await hookUnhandledRejection(functionSpan);

await hookUnhandledRejection(functionSpan);
const pStartTrace = startTrace(functionSpan);
const pUserHandler = promisifyUserHandler(userHandler, event, context);

const pStartTrace = startTrace(functionSpan);
const pUserHandler = promisifyUserHandler(userHandler, event, context);
let [, handlerReturnValue] = await Promise.all([pStartTrace, pUserHandler]);

let [, handlerReturnValue] = await Promise.all([pStartTrace, pUserHandler]);
handlerReturnValue = normalizeLambdaError(handlerReturnValue);

handlerReturnValue = normalizeLambdaError(handlerReturnValue);
if (isStepFunction()) {
handlerReturnValue = performStepFunctionLogic(handlerReturnValue);
}

if (isStepFunction()) {
handlerReturnValue = performStepFunctionLogic(handlerReturnValue);
}
const cleanedHandlerReturnValue = removeLumigoFromStacktrace(handlerReturnValue);

const cleanedHandlerReturnValue = removeLumigoFromStacktrace(handlerReturnValue);
await endTrace(functionSpan, cleanedHandlerReturnValue);
const { err, data, type } = cleanedHandlerReturnValue;

await endTrace(functionSpan, cleanedHandlerReturnValue);
const { err, data, type } = cleanedHandlerReturnValue;
return performPromisifyType(err, data, type, callback);
};

return performPromisifyType(err, data, type, callback);
if (isResponseStreamFunction) {
(decoratedUserHandler as any)[HANDLER_STREAMING] = STREAM_RESPONSE;
}
return decoratedUserHandler;
};

export const startTrace = async (functionSpan: GenericSpan) => {
Expand Down

0 comments on commit 854a4dd

Please sign in to comment.