Skip to content

Commit

Permalink
Added event class MskFirehoseEvent.java for Firehose Lambda transform…
Browse files Browse the repository at this point in the history
…ation when MSK is the source (#490)

* Create MskFirehoseEvent.java
* Create MSKFirehoseResponse.java
  • Loading branch information
ShashankAWS authored Jul 10, 2024
1 parent 9a5450a commit c0b4f60
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 0 deletions.
2 changes: 2 additions & 0 deletions aws-lambda-java-events/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
* `KinesisFirehoseEvent`
* `LambdaDestinationEvent`
* `LexEvent`
* `MSKFirehoseEvent`
* `MSKFirehoseResponse`
* `RabbitMQEvent`
* `S3BatchEvent`
* `S3BatchResponse`
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package com.amazonaws.services.lambda.runtime.events;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder(setterPrefix = "with")
@NoArgsConstructor
@AllArgsConstructor

public class MSKFirehoseEvent {

private String invocationId;

private String deliveryStreamArn;

private String sourceMSKArn;

private String region;

private List<Record> records;

@Data
@Builder(setterPrefix = "with")
@NoArgsConstructor
@AllArgsConstructor
public static class Record {

private ByteBuffer kafkaRecordValue;

private String recordId;

private Long approximateArrivalEpoch;

private Long approximateArrivalTimestamp;

private Map<String, String> mskRecordMetadata;

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package com.amazonaws.services.lambda.runtime.events;

import java.nio.ByteBuffer;
import java.util.List;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* Response model for Amazon Data Firehose Lambda transformation with MSK as a source.
* [+] Amazon Data Firehose Data Transformation - Data Transformation and Status Model - <a href="https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html#data-transformation-status-model">...</a>
* OK : Indicates that processing of this item succeeded.
* ProcessingFailed : Indicate that the processing of this item failed.
* Dropped : Indicates that this item should be silently dropped
*/

@Data
@Builder(setterPrefix = "with")
@NoArgsConstructor
@AllArgsConstructor

public class MSKFirehoseResponse {

public enum Result {

/**
* Indicates that processing of this item succeeded.
*/
Ok,

/**
* Indicate that the processing of this item failed
*/
ProcessingFailed,

/**
* Indicates that this item should be silently dropped
*/
Dropped
}
public List<Record> records;

@Data
@NoArgsConstructor
@Builder(setterPrefix = "with")
@AllArgsConstructor

public static class Record {
public String recordId;
public Result result;
public ByteBuffer kafkaRecordValue;

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ public static LexEvent loadLexEvent(String filename) {
return loadEvent(filename, LexEvent.class);
}

public static MSKFirehoseEvent loadMSKFirehoseEvent(String filename) {
return loadEvent(filename, MSKFirehoseEvent.class);
}

public static S3Event loadS3Event(String filename) {
return loadEvent(filename, S3Event.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,19 @@ public void testLoadKinesisFirehoseEvent() {
assertThat(event.getRecords().get(0).getData().array()).asString().isEqualTo("Hello, this is a test 123.");
}

@Test
public void testLoadMSKFirehoseEvent() {
MSKFirehoseEvent event = EventLoader.loadMSKFirehoseEvent("msk_firehose_event.json");

assertThat(event).isNotNull();
assertThat(event.getSourceMSKArn()).isEqualTo("arn:aws:kafka:EXAMPLE");
assertThat(event.getDeliveryStreamArn()).isEqualTo("arn:aws:firehose:EXAMPLE");
assertThat(event.getRecords()).hasSize(1);
assertThat(event.getRecords().get(0).getKafkaRecordValue().array()).asString().isEqualTo("{\"Name\":\"Hello World\"}");
assertThat(event.getRecords().get(0).getApproximateArrivalTimestamp()).asString().isEqualTo("1716369573887");
assertThat(event.getRecords().get(0).getMskRecordMetadata()).asString().isEqualTo("{offset=0, partitionId=1, approximateArrivalTimestamp=1716369573887}");
}

@Test
public void testLoadS3Event() {
S3Event event = EventLoader.loadS3Event("s3_event.json");
Expand Down
18 changes: 18 additions & 0 deletions aws-lambda-java-tests/src/test/resources/msk_firehose_event.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"invocationId": "12345621-4787-0000-a418-36e56Example",
"sourceMSKArn": "arn:aws:kafka:EXAMPLE",
"deliveryStreamArn": "arn:aws:firehose:EXAMPLE",
"region": "us-east-1",
"records": [
{
"recordId": "00000000000000000000000000000000000000000000000000000000000000",
"approximateArrivalTimestamp": 1716369573887,
"mskRecordMetadata": {
"offset": "0",
"partitionId": "1",
"approximateArrivalTimestamp": 1716369573887
},
"kafkaRecordValue": "eyJOYW1lIjoiSGVsbG8gV29ybGQifQ=="
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package example;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.MSKFirehoseResponse;
import com.amazonaws.services.lambda.runtime.events.MSKFirehoseEvent;
import org.json.JSONObject;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/**
* A sample MSKFirehoseEvent handler
* For more information see the developer guide - <a href="https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html">...</a>
*/
public class MSKFirehoseEventHandler implements RequestHandler<MSKFirehoseEvent, MSKFirehoseResponse> {

@Override
public MSKFirehoseResponse handleRequest(MSKFirehoseEvent MSKFirehoseEvent, Context context) {
List<MSKFirehoseResponse.Record> records = new ArrayList<>();

for (MSKFirehoseEvent.Record record : MSKFirehoseEvent.getRecords()) {
String recordData = new String(record.getKafkaRecordValue().array());
// Your business logic
JSONObject jsonObject = new JSONObject(recordData);
records.add(new MSKFirehoseResponse.Record(record.getRecordId(), MSKFirehoseResponse.Result.Ok, encode(jsonObject.toString())));
}
return new MSKFirehoseResponse(records);
}
private ByteBuffer encode(String content) {
return ByteBuffer.wrap(content.getBytes());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package example;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.tests.annotations.Event;
import com.amazonaws.services.lambda.runtime.events.MSKFirehoseEvent;
import com.amazonaws.services.lambda.runtime.events.MSKFirehoseResponse;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;

import static java.nio.charset.StandardCharsets.UTF_8;

public class MSKFirehoseEventHandlerTest {

private Context context; // intentionally null as it's not used in the test

@ParameterizedTest
@Event(value = "event.json", type = MSKFirehoseEvent.class)
public void testEventHandler(MSKFirehoseEvent event) {
MSKFirehoseEventHandler Sample = new MSKFirehoseEventHandler();
MSKFirehoseResponse response = Sample.handleRequest(event, context);

String expectedString = "{\"Name\":\"Hello World\"}";
MSKFirehoseResponse.Record firstRecord = response.getRecords().get(0);
Assertions.assertEquals(expectedString, UTF_8.decode(firstRecord.getKafkaRecordValue()).toString());
Assertions.assertEquals(MSKFirehoseResponse.Result.Ok, firstRecord.getResult());
}
}
18 changes: 18 additions & 0 deletions samples/msk-firehose-event-handler/src/test/resources/event.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"invocationId": "12345621-4787-0000-a418-36e56Example",
"sourceMSKArn": "",
"deliveryStreamArn": "",
"region": "us-east-1",
"records": [
{
"recordId": "00000000000000000000000000000000000000000000000000000000000000",
"approximateArrivalTimestamp": 1716369573887,
"mskRecordMetadata": {
"offset": "0",
"partitionId": "1",
"approximateArrivalTimestamp": 1716369573887
},
"kafkaRecordValue": "eyJOYW1lIjoiSGVsbG8gV29ybGQifQ=="
}
]
}

0 comments on commit c0b4f60

Please sign in to comment.