Skip to content

Commit

Permalink
feat(lambda-event-sources): starting position timestamp for kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
nikovirtala committed Oct 18, 2024
1 parent fccb006 commit 81e290b
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 2 deletions.
11 changes: 9 additions & 2 deletions packages/aws-cdk-lib/aws-lambda-event-sources/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ behavior:
* __onFailure__: In the event a record fails and consumes all retries, the record will be sent to SQS queue or SNS topic that is specified here
* __parallelizationFactor__: The number of batches to concurrently process on each shard.
* __retryAttempts__: The maximum number of times a record should be retried in the event of failure.
* __startingPosition__: Will determine where to begin consumption. 'LATEST' will start at the most recent record and ignore all records that arrived prior to attaching the event source, 'TRIM_HORIZON' will start at the oldest record and ensure you process all available data, while 'AT_TIMESTAMP' will start reading records from a specified time stamp. Note that 'AT_TIMESTAMP' is only supported for Amazon Kinesis streams.
* __startingPosition__: Will determine where to begin consumption. 'LATEST' will start at the most recent record and ignore all records that arrived prior to attaching the event source, 'TRIM_HORIZON' will start at the oldest record and ensure you process all available data, while 'AT_TIMESTAMP' will start reading records from a specified time stamp.
* __startingPositionTimestamp__: The time stamp from which to start reading. Used in conjunction with __startingPosition__ when set to 'AT_TIMESTAMP'.
* __tumblingWindow__: The duration in seconds of a processing window when using streams.
* __enabled__: If the DynamoDB Streams event source mapping should be enabled. The default is true.
Expand All @@ -252,7 +252,14 @@ myFunction.addEventSource(new KinesisEventSource(stream, {

## Kafka

You can write Lambda functions to process data either from [Amazon MSK](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html) or a [self managed Kafka](https://docs.aws.amazon.com/lambda/latest/dg/kafka-smaa.html) cluster.
You can write Lambda functions to process data either from [Amazon MSK](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html) or a [self-managed Kafka](https://docs.aws.amazon.com/lambda/latest/dg/kafka-smaa.html) cluster. The following parameters will impact to the polling behavior:

* __startingPosition__: Will determine where to begin consumption. 'LATEST' will start at the most recent record and ignore all records that arrived prior to attaching the event source, 'TRIM_HORIZON' will start at the oldest record and ensure you process all available data, while 'AT_TIMESTAMP' will start reading records from a specified time stamp.
* __startingPositionTimestamp__: The time stamp from which to start reading. Used in conjunction with __startingPosition__ when set to 'AT_TIMESTAMP'.
* __batchSize__: Determines how many records are buffered before invoking your lambda function - could impact your function's memory usage (if too high) and ability to keep up with incoming data velocity (if too low).
* __maxBatchingWindow__: The maximum amount of time to gather records before invoking the lambda. This increases the likelihood of a full batch at the cost of possibly delaying processing.
* __onFailure__: In the event a record fails and consumes all retries, the record will be sent to SQS queue or SNS topic that is specified here
* __enabled__: If the Kafka event source mapping should be enabled. The default is true.

The following code sets up Amazon MSK as an event source for a lambda function. Credentials will need to be configured to access the
MSK cluster, as described in [Username/Password authentication](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html).
Expand Down
27 changes: 27 additions & 0 deletions packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ export interface KafkaEventSourceProps extends BaseStreamEventSourceProps {
* @default - discarded records are ignored
*/
readonly onFailure?: lambda.IEventSourceDlq;

/**
* The time from which to start reading, in Unix time seconds.
*
* @default - no timestamp
*/
readonly startingPositionTimestamp?: number;
}

/**
Expand Down Expand Up @@ -148,6 +155,15 @@ export class ManagedKafkaEventSource extends StreamEventSource {

constructor(props: ManagedKafkaEventSourceProps) {
super(props);

if (props.startingPosition === lambda.StartingPosition.AT_TIMESTAMP && !props.startingPositionTimestamp) {
throw new Error('startingPositionTimestamp must be provided when startingPosition is AT_TIMESTAMP');
}

if (props.startingPosition !== lambda.StartingPosition.AT_TIMESTAMP && props.startingPositionTimestamp) {
throw new Error('startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP');
}

this.innerProps = props;
}

Expand All @@ -159,6 +175,7 @@ export class ManagedKafkaEventSource extends StreamEventSource {
filters: this.innerProps.filters,
filterEncryption: this.innerProps.filterEncryption,
startingPosition: this.innerProps.startingPosition,
startingPositionTimestamp: this.innerProps.startingPositionTimestamp,
sourceAccessConfigurations: this.sourceAccessConfigurations(),
kafkaTopic: this.innerProps.topic,
kafkaConsumerGroupId: this.innerProps.consumerGroupId,
Expand Down Expand Up @@ -239,6 +256,15 @@ export class SelfManagedKafkaEventSource extends StreamEventSource {
} else if (!props.secret) {
throw new Error('secret must be set if Kafka brokers accessed over Internet');
}

if (props.startingPosition === lambda.StartingPosition.AT_TIMESTAMP && !props.startingPositionTimestamp) {
throw new Error('startingPositionTimestamp must be provided when startingPosition is AT_TIMESTAMP');
}

if (props.startingPosition !== lambda.StartingPosition.AT_TIMESTAMP && props.startingPositionTimestamp) {
throw new Error('startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP');
}

this.innerProps = props;
}

Expand All @@ -253,6 +279,7 @@ export class SelfManagedKafkaEventSource extends StreamEventSource {
kafkaTopic: this.innerProps.topic,
kafkaConsumerGroupId: this.innerProps.consumerGroupId,
startingPosition: this.innerProps.startingPosition,
startingPositionTimestamp: this.innerProps.startingPositionTimestamp,
sourceAccessConfigurations: this.sourceAccessConfigurations(),
onFailure: this.innerProps.onFailure,
supportS3OnFailureDestination: true,
Expand Down
93 changes: 93 additions & 0 deletions packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,48 @@ describe('KafkaEventSource', () => {
});
});

test('AT_TIMESTAMP starting position', () => {
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const clusterArn = 'some-arn';
const kafkaTopic = 'some-topic';

fn.addEventSource(new sources.ManagedKafkaEventSource({
clusterArn,
topic: kafkaTopic,
startingPosition: lambda.StartingPosition.AT_TIMESTAMP,
startingPositionTimestamp: 1640995200,
}),
);

Template.fromStack(stack).hasResourceProperties('AWS::Lambda::EventSourceMapping', {
StartingPosition: 'AT_TIMESTAMP',
StartingPositionTimestamp: 1640995200,
});
});

test('startingPositionTimestamp missing throws error', () => {
const clusterArn = 'some-arn';
const kafkaTopic = 'some-topic';

expect(() => new sources.ManagedKafkaEventSource({
clusterArn,
topic: kafkaTopic,
startingPosition: lambda.StartingPosition.AT_TIMESTAMP,
})).toThrow(/startingPositionTimestamp must be provided when startingPosition is AT_TIMESTAMP/);
});

test('startingPositionTimestamp without AT_TIMESTAMP throws error', () => {
const clusterArn = 'some-arn';
const kafkaTopic = 'some-topic';

expect(() => new sources.ManagedKafkaEventSource({
clusterArn,
topic: kafkaTopic,
startingPosition: lambda.StartingPosition.LATEST,
startingPositionTimestamp: 1640995200,
})).toThrow(/startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP/);
});
});

describe('self-managed kafka', () => {
Expand Down Expand Up @@ -998,5 +1040,56 @@ describe('KafkaEventSource', () => {
expect(mskEventMapping.eventSourceMappingId).toBeDefined();
expect(mskEventMapping.eventSourceMappingArn).toBeDefined();
});

test('AT_TIMESTAMP starting position', () => {
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const bootstrapServers = ['kafka-broker:9092'];
const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' });
const kafkaTopic = 'some-topic';

fn.addEventSource(new sources.SelfManagedKafkaEventSource({
bootstrapServers,
secret: secret,
topic: kafkaTopic,
startingPosition: lambda.StartingPosition.AT_TIMESTAMP,
startingPositionTimestamp: 1640995200,
}),
);

Template.fromStack(stack).hasResourceProperties('AWS::Lambda::EventSourceMapping', {
StartingPosition: 'AT_TIMESTAMP',
StartingPositionTimestamp: 1640995200,
});
});

test('startingPositionTimestamp missing throws error', () => {
const stack = new cdk.Stack();
const bootstrapServers = ['kafka-broker:9092'];
const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' });
const kafkaTopic = 'some-topic';

expect(() => new sources.SelfManagedKafkaEventSource({
bootstrapServers,
secret: secret,
topic: kafkaTopic,
startingPosition: lambda.StartingPosition.AT_TIMESTAMP,
})).toThrow(/startingPositionTimestamp must be provided when startingPosition is AT_TIMESTAMP/);
});

test('startingPositionTimestamp without AT_TIMESTAMP throws error', () => {
const stack = new cdk.Stack();
const bootstrapServers = ['kafka-broker:9092'];
const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' });
const kafkaTopic = 'some-topic';

expect(() => new sources.SelfManagedKafkaEventSource({
bootstrapServers,
secret: secret,
topic: kafkaTopic,
startingPosition: lambda.StartingPosition.LATEST,
startingPositionTimestamp: 1640995200,
})).toThrow(/startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP/);
});
});
});

0 comments on commit 81e290b

Please sign in to comment.