Add Lambda Synchronous processor support #4700

Merged 7 commits on Sep 24, 2024
53 changes: 52 additions & 1 deletion data-prepper-plugins/aws-lambda/README.md
@@ -1,4 +1,55 @@

# Lambda Processor

This plugin enables you to send data from your Data Prepper pipeline directly to AWS Lambda functions for further processing.

## Usage
```yaml
lambda-pipeline:
...
  processor:
    - aws_lambda:
        aws:
          region: "us-east-1"
          sts_role_arn: "<arn>"
        function_name: "uploadToS3Lambda"
        max_retries: 3
        invocation_type: "RequestResponse"
        payload_model: "batch_event"
        batch:
          key_name: "osi_key"
          threshold:
            event_count: 10
            event_collect_timeout: 15s
            maximum_size: 3mb
```

`invocation_type` set to `RequestResponse` is used when the response from AWS Lambda is returned to Data Prepper.

In the batch options, an implicit threshold applies: once the accumulated events reach 3mb, the batch is flushed.

`payload_model` defines how the payload is constructed from a Data Prepper event by converting it to the corresponding JSON:
* `batch_event`: the output is formed as a batch of multiple events, and a key (`key_name`) is associated with the set of events (see the handler sketch below).
* `single_event`: each event is sent to Lambda individually.

If the `batch` option is not specified along with `payload_model: batch_event`, the following defaults are assumed:
* `key_name`: `events`
* `threshold.event_count`: 10
* `threshold.maximum_size`: 3mb
* `threshold.event_collect_timeout`: 15s
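
For illustration, below is a minimal sketch of a Lambda handler that could consume the `batch_event` payload, assuming the default `key_name` of `events` so that the request body is a JSON object such as `{"events": [...]}`. The handler class name, its return shape, and the `aws-lambda-java-core` wiring are illustrative assumptions, not part of this plugin.

```java
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import java.util.List;
import java.util.Map;

// Hypothetical handler: with payload_model "batch_event" and the default key_name,
// the incoming payload is assumed to be {"events": [ {...}, {...}, ... ]}.
public class BatchEventHandler implements RequestHandler<Map<String, Object>, Map<String, Object>> {

    @Override
    @SuppressWarnings("unchecked")
    public Map<String, Object> handleRequest(final Map<String, Object> input, final Context context) {
        final List<Map<String, Object>> events = (List<Map<String, Object>>) input.get("events");
        context.getLogger().log("Received " + events.size() + " events");
        // Echo the payload back; with invocation_type "RequestResponse",
        // this response is returned to the Data Prepper pipeline.
        return input;
    }
}
```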


## Developer Guide

The integration tests for this plugin do not run as part of the Data Prepper build.
The following command runs the integration tests:

```
./gradlew :data-prepper-plugins:aws-lambda:integrationTest -Dtests.lambda.processor.region="us-east-1" -Dtests.lambda.processor.functionName="lambda_test_function" -Dtests.lambda.processor.sts_role_arn="arn:aws:iam::123456789012:role/dataprepper-role"
```


# Lambda Sink

This plugin enables you to send data from your Data Prepper pipeline directly to AWS Lambda functions for further processing.
@@ -15,7 +66,7 @@ lambda-pipeline:
function_name: "uploadToS3Lambda"
max_retries: 3
batch:
batch_key: "osi_key"
key_name: "osi_key"
threshold:
event_count: 3
maximum_size: 6mb
5 changes: 5 additions & 0 deletions data-prepper-plugins/aws-lambda/build.gradle
@@ -8,6 +8,7 @@ dependencies {
implementation project(path: ':data-prepper-plugins:common')
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation project(':data-prepper-plugins:failures-common')
implementation project(':data-prepper-plugins:parse-json-processor')
implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
@@ -65,6 +66,10 @@ task integrationTest(type: Test) {
systemProperty 'tests.lambda.sink.functionName', System.getProperty('tests.lambda.sink.functionName')
systemProperty 'tests.lambda.sink.sts_role_arn', System.getProperty('tests.lambda.sink.sts_role_arn')

systemProperty 'tests.lambda.processor.region', System.getProperty('tests.lambda.processor.region')
systemProperty 'tests.lambda.processor.functionName', System.getProperty('tests.lambda.processor.functionName')
systemProperty 'tests.lambda.processor.sts_role_arn', System.getProperty('tests.lambda.processor.sts_role_arn')

filter {
includeTestsMatching '*IT'
}
@@ -0,0 +1,165 @@
package org.opensearch.dataprepper.plugins.lambda.processor;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
import io.micrometer.core.instrument.Counter;
import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import static org.mockito.Mockito.when;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.BufferFactory;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBufferFactory;
import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.ThresholdOptions;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.lambda.LambdaClient;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;

@ExtendWith(MockitoExtension.class)

public class LambdaProcessorServiceIT {

private LambdaClient lambdaClient;
private String functionName;
private String lambdaRegion;
private String role;
private BufferFactory bufferFactory;
@Mock
private LambdaProcessorConfig lambdaProcessorConfig;
@Mock
private BatchOptions batchOptions;
@Mock
private ThresholdOptions thresholdOptions;
@Mock
private AwsAuthenticationOptions awsAuthenticationOptions;
@Mock
private AwsCredentialsSupplier awsCredentialsSupplier;
@Mock
private PluginMetrics pluginMetrics;
@Mock
private PluginFactory pluginFactory;
@Mock
private PluginSetting pluginSetting;
@Mock
private Counter numberOfRecordsSuccessCounter;
@Mock
private Counter numberOfRecordsFailedCounter;
@Mock
private ExpressionEvaluator expressionEvaluator;

private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS));


@BeforeEach
public void setUp() throws Exception {
MockitoAnnotations.openMocks(this);
lambdaRegion = System.getProperty("tests.lambda.processor.region");
functionName = System.getProperty("tests.lambda.processor.functionName");
role = System.getProperty("tests.lambda.processor.sts_role_arn");

final Region region = Region.of(lambdaRegion);

lambdaClient = LambdaClient.builder()
.region(Region.of(lambdaRegion))
.build();

bufferFactory = new InMemoryBufferFactory();

when(pluginMetrics.counter(LambdaProcessor.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS)).
thenReturn(numberOfRecordsSuccessCounter);
when(pluginMetrics.counter(LambdaProcessor.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED)).
thenReturn(numberOfRecordsFailedCounter);
}


private static Record<Event> createRecord() {
final JacksonEvent event = JacksonLog.builder().withData("[{\"name\":\"test\"}]").build();
return new Record<>(event);
}

public LambdaProcessor createObjectUnderTest(final String config) throws JsonProcessingException {

final LambdaProcessorConfig lambdaProcessorConfig = objectMapper.readValue(config, LambdaProcessorConfig.class);
return new LambdaProcessor(pluginMetrics,lambdaProcessorConfig,awsCredentialsSupplier,expressionEvaluator);
}

public LambdaProcessor createObjectUnderTest(LambdaProcessorConfig lambdaSinkConfig) throws JsonProcessingException {
return new LambdaProcessor(pluginMetrics,lambdaSinkConfig,awsCredentialsSupplier,expressionEvaluator);
}


private static Collection<Record<Event>> generateRecords(int numberOfRecords) {
List<Record<Event>> recordList = new ArrayList<>();

for (int rows = 1; rows <= numberOfRecords; rows++) {
HashMap<String, String> eventData = new HashMap<>();
eventData.put("name", "Person" + rows);
eventData.put("age", Integer.toString(rows));

Record<Event> eventRecord = new Record<>(JacksonEvent.builder().withData(eventData).withEventType("event").build());
recordList.add(eventRecord);
}
return recordList;
}

@ParameterizedTest
@ValueSource(ints = {1,3})
void verify_records_to_lambda_success(final int recordCount) throws Exception {
Member: consider adding a test for InvocationType `Event`.

Contributor Author: Invocation type `Event` will be disabled for now; we will release the Event type with asynchronous support, which requires additional infra changes. I have disabled it in the verification for now and will fix the README.


when(lambdaProcessorConfig.getFunctionName()).thenReturn(functionName);
when(lambdaProcessorConfig.getMaxConnectionRetries()).thenReturn(3);
when(lambdaProcessorConfig.getInvocationType()).thenReturn("RequestResponse");

LambdaProcessor objectUnderTest = createObjectUnderTest(lambdaProcessorConfig);

Collection<Record<Event>> recordsData = generateRecords(recordCount);
List<Record<Event>> recordsResult = (List<Record<Event>>) objectUnderTest.doExecute(recordsData);
Thread.sleep(Duration.ofSeconds(10).toMillis());

assertEquals(recordsResult.size(),recordCount);
}

@ParameterizedTest
@ValueSource(ints = {1,3})
void verify_records_with_batching_to_lambda(final int recordCount) throws JsonProcessingException, InterruptedException {

when(lambdaProcessorConfig.getFunctionName()).thenReturn(functionName);
when(lambdaProcessorConfig.getMaxConnectionRetries()).thenReturn(3);
when(lambdaProcessorConfig.getInvocationType()).thenReturn("RequestResponse");
when(thresholdOptions.getEventCount()).thenReturn(1);
when(thresholdOptions.getMaximumSize()).thenReturn(ByteCount.parse("2mb"));
when(thresholdOptions.getEventCollectTimeOut()).thenReturn(Duration.parse("PT10s"));
when(batchOptions.getKeyName()).thenReturn("lambda_batch_key");
when(batchOptions.getThresholdOptions()).thenReturn(thresholdOptions);
when(lambdaProcessorConfig.getBatchOptions()).thenReturn(batchOptions);

LambdaProcessor objectUnderTest = createObjectUnderTest(lambdaProcessorConfig);
Collection<Record<Event>> records = generateRecords(recordCount);
Collection<Record<Event>> recordsResult = objectUnderTest.doExecute(records);
Thread.sleep(Duration.ofSeconds(10).toMillis());
assertEquals(recordsResult.size(),recordCount);
}
}
@@ -10,17 +10,21 @@
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import org.mockito.Mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
@@ -35,6 +39,9 @@
import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.ThresholdOptions;
import static org.opensearch.dataprepper.plugins.lambda.processor.LambdaProcessor.LAMBDA_LATENCY_METRIC;
import static org.opensearch.dataprepper.plugins.lambda.processor.LambdaProcessor.REQUEST_PAYLOAD_SIZE;
import static org.opensearch.dataprepper.plugins.lambda.processor.LambdaProcessor.RESPONSE_PAYLOAD_SIZE;
import org.opensearch.dataprepper.plugins.lambda.sink.dlq.DlqPushHandler;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.lambda.LambdaClient;
@@ -45,6 +52,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

@ExtendWith(MockitoExtension.class)
class LambdaSinkServiceIT {
@@ -76,6 +84,14 @@ class LambdaSinkServiceIT {
private Counter numberOfRecordsSuccessCounter;
@Mock
private Counter numberOfRecordsFailedCounter;
@Mock
private ExpressionEvaluator expressionEvaluator;
@Mock
private Timer lambdaLatencyMetric;
@Mock
private AtomicLong requestPayload;
@Mock
private AtomicLong responsePayload;
private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS));


@@ -98,6 +114,9 @@ public void setUp() throws Exception {
thenReturn(numberOfRecordsSuccessCounter);
when(pluginMetrics.counter(LambdaSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED)).
thenReturn(numberOfRecordsFailedCounter);
when(pluginMetrics.timer(LAMBDA_LATENCY_METRIC)).thenReturn(lambdaLatencyMetric);
when(pluginMetrics.gauge(eq(REQUEST_PAYLOAD_SIZE), any(AtomicLong.class))).thenReturn(requestPayload);
when(pluginMetrics.gauge(eq(RESPONSE_PAYLOAD_SIZE), any(AtomicLong.class))).thenReturn(responsePayload);
}


@@ -119,7 +138,8 @@ public LambdaSinkService createObjectUnderTest(final String config) throws JsonP
codecContext,
awsCredentialsSupplier,
dlqPushHandler,
bufferFactory);
bufferFactory,
expressionEvaluator);
}

public LambdaSinkService createObjectUnderTest(LambdaSinkConfig lambdaSinkConfig) throws JsonProcessingException {
@@ -134,7 +154,8 @@ public LambdaSinkService createObjectUnderTest(LambdaSinkConfig lambdaSinkConfig
codecContext,
awsCredentialsSupplier,
dlqPushHandler,
bufferFactory);
bufferFactory,
expressionEvaluator);
}


@@ -203,7 +224,7 @@ void verify_flushed_records_with_batching_to_lambda(final int recordCount) throw
when(thresholdOptions.getEventCount()).thenReturn(event_count);
when(thresholdOptions.getMaximumSize()).thenReturn(ByteCount.parse("2mb"));
when(thresholdOptions.getEventCollectTimeOut()).thenReturn(Duration.parse("PT10s"));
when(batchOptions.getBatchKey()).thenReturn("lambda_batch_key");
when(batchOptions.getKeyName()).thenReturn("lambda_batch_key");
when(batchOptions.getThresholdOptions()).thenReturn(thresholdOptions);
when(lambdaSinkConfig.getBatchOptions()).thenReturn(batchOptions);

@@ -22,7 +22,7 @@ public interface Buffer {

Duration getDuration();

void flushToLambdaAsync();
InvokeResponse flushToLambdaAsync();

InvokeResponse flushToLambdaSync();

@@ -75,7 +75,7 @@ public Duration getDuration() {


@Override
public void flushToLambdaAsync() {
public InvokeResponse flushToLambdaAsync() {
InvokeResponse resp;
SdkBytes payload = getPayload();
payloadRequestAsyncSize = payload.asByteArray().length;
@@ -91,6 +91,7 @@ public void flushToLambdaAsync() {
resp = lambdaClient.invoke(request);
lambdaAsyncLatencyWatch.stop();
payloadResponseAsyncSize = resp.payload().asByteArray().length;
return resp;
}

@Override
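
Since `flushToLambdaAsync()` now returns the `InvokeResponse` (as `flushToLambdaSync()` already does), callers can inspect the Lambda result for either flush path. Below is a minimal usage sketch; the helper class and the `Buffer` package path are assumptions based on the imports above, not code from this PR.

```java
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;

// Hypothetical helper showing how a caller might use the response that
// flushToLambdaAsync() now returns instead of discarding it.
final class FlushResultChecker {
    static boolean flushedSuccessfully(final Buffer buffer) {
        final InvokeResponse response = buffer.flushToLambdaAsync();
        return response.statusCode() >= 200
                && response.statusCode() < 300
                && response.functionError() == null;
    }
}
```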