Skip to content

Commit

Permalink
feat(lambda-event-sources): starting position timestamp for kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
nikovirtala committed Oct 18, 2024
1 parent fccb006 commit ab944c3
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 0 deletions.
27 changes: 27 additions & 0 deletions packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ export interface KafkaEventSourceProps extends BaseStreamEventSourceProps {
* @default - discarded records are ignored
*/
readonly onFailure?: lambda.IEventSourceDlq;

/**
* The time from which to start reading, in Unix time seconds.
*
* @default - no timestamp
*/
readonly startingPositionTimestamp?: number;
}

/**
Expand Down Expand Up @@ -148,6 +155,15 @@ export class ManagedKafkaEventSource extends StreamEventSource {

constructor(props: ManagedKafkaEventSourceProps) {
super(props);

if (props.startingPosition === lambda.StartingPosition.AT_TIMESTAMP && !props.startingPositionTimestamp) {
throw new Error('startingPositionTimestamp must be provided when startingPosition is AT_TIMESTAMP');
}

if (props.startingPosition !== lambda.StartingPosition.AT_TIMESTAMP && props.startingPositionTimestamp) {
throw new Error('startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP');
}

this.innerProps = props;
}

Expand All @@ -159,6 +175,7 @@ export class ManagedKafkaEventSource extends StreamEventSource {
filters: this.innerProps.filters,
filterEncryption: this.innerProps.filterEncryption,
startingPosition: this.innerProps.startingPosition,
startingPositionTimestamp: this.innerProps.startingPositionTimestamp,
sourceAccessConfigurations: this.sourceAccessConfigurations(),
kafkaTopic: this.innerProps.topic,
kafkaConsumerGroupId: this.innerProps.consumerGroupId,
Expand Down Expand Up @@ -239,6 +256,15 @@ export class SelfManagedKafkaEventSource extends StreamEventSource {
} else if (!props.secret) {
throw new Error('secret must be set if Kafka brokers accessed over Internet');
}

if (props.startingPosition === lambda.StartingPosition.AT_TIMESTAMP && !props.startingPositionTimestamp) {
throw new Error('startingPositionTimestamp must be provided when startingPosition is AT_TIMESTAMP');
}

if (props.startingPosition !== lambda.StartingPosition.AT_TIMESTAMP && props.startingPositionTimestamp) {
throw new Error('startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP');
}

this.innerProps = props;
}

Expand All @@ -253,6 +279,7 @@ export class SelfManagedKafkaEventSource extends StreamEventSource {
kafkaTopic: this.innerProps.topic,
kafkaConsumerGroupId: this.innerProps.consumerGroupId,
startingPosition: this.innerProps.startingPosition,
startingPositionTimestamp: this.innerProps.startingPositionTimestamp,
sourceAccessConfigurations: this.sourceAccessConfigurations(),
onFailure: this.innerProps.onFailure,
supportS3OnFailureDestination: true,
Expand Down
93 changes: 93 additions & 0 deletions packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,48 @@ describe('KafkaEventSource', () => {
});
});

test('AT_TIMESTAMP starting position', () => {
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const clusterArn = 'some-arn';
const kafkaTopic = 'some-topic';

fn.addEventSource(new sources.ManagedKafkaEventSource({
clusterArn,
topic: kafkaTopic,
startingPosition: lambda.StartingPosition.AT_TIMESTAMP,
startingPositionTimestamp: 1640995200,
}),
);

Template.fromStack(stack).hasResourceProperties('AWS::Lambda::EventSourceMapping', {
StartingPosition: 'AT_TIMESTAMP',
StartingPositionTimestamp: 1640995200,
});
});

test('startingPositionTimestamp missing throws error', () => {
const clusterArn = 'some-arn';
const kafkaTopic = 'some-topic';

expect(() => new sources.ManagedKafkaEventSource({
clusterArn,
topic: kafkaTopic,
startingPosition: lambda.StartingPosition.AT_TIMESTAMP,
})).toThrow(/startingPositionTimestamp must be provided when startingPosition is AT_TIMESTAMP/);
});

test('startingPositionTimestamp without AT_TIMESTAMP throws error', () => {
const clusterArn = 'some-arn';
const kafkaTopic = 'some-topic';

expect(() => new sources.ManagedKafkaEventSource({
clusterArn,
topic: kafkaTopic,
startingPosition: lambda.StartingPosition.LATEST,
startingPositionTimestamp: 1640995200,
})).toThrow(/startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP/);
});
});

describe('self-managed kafka', () => {
Expand Down Expand Up @@ -998,5 +1040,56 @@ describe('KafkaEventSource', () => {
expect(mskEventMapping.eventSourceMappingId).toBeDefined();
expect(mskEventMapping.eventSourceMappingArn).toBeDefined();
});

test('AT_TIMESTAMP starting position', () => {
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const bootstrapServers = ['kafka-broker:9092'];
const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' });
const kafkaTopic = 'some-topic';

fn.addEventSource(new sources.SelfManagedKafkaEventSource({
bootstrapServers,
secret: secret,
topic: kafkaTopic,
startingPosition: lambda.StartingPosition.AT_TIMESTAMP,
startingPositionTimestamp: 1640995200,
}),
);

Template.fromStack(stack).hasResourceProperties('AWS::Lambda::EventSourceMapping', {
StartingPosition: 'AT_TIMESTAMP',
StartingPositionTimestamp: 1640995200,
});
});

test('startingPositionTimestamp missing throws error', () => {
const stack = new cdk.Stack();
const bootstrapServers = ['kafka-broker:9092'];
const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' });
const kafkaTopic = 'some-topic';

expect(() => new sources.SelfManagedKafkaEventSource({
bootstrapServers,
secret: secret,
topic: kafkaTopic,
startingPosition: lambda.StartingPosition.AT_TIMESTAMP,
})).toThrow(/startingPositionTimestamp must be provided when startingPosition is AT_TIMESTAMP/);
});

test('startingPositionTimestamp without AT_TIMESTAMP throws error', () => {
const stack = new cdk.Stack();
const bootstrapServers = ['kafka-broker:9092'];
const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' });
const kafkaTopic = 'some-topic';

expect(() => new sources.SelfManagedKafkaEventSource({
bootstrapServers,
secret: secret,
topic: kafkaTopic,
startingPosition: lambda.StartingPosition.LATEST,
startingPositionTimestamp: 1640995200,
})).toThrow(/startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP/);
});
});
});

0 comments on commit ab944c3

Please sign in to comment.