Skip to content

Commit

Permalink
Add Lambda Synchronous processor support (#4700)
Browse files Browse the repository at this point in the history
Add Lambda Processor Synchronous Mode support
Make LambdaClientFactory common to sink and processor

Signed-off-by: Srikanth Govindarajan <[email protected]>
  • Loading branch information
srikanthjg authored Sep 24, 2024
1 parent a1c11e5 commit a236804
Show file tree
Hide file tree
Showing 25 changed files with 1,267 additions and 280 deletions.
53 changes: 52 additions & 1 deletion data-prepper-plugins/aws-lambda/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,55 @@

# Lambda Processor

This plugin enables you to send data from your Data Prepper pipeline directly to AWS Lambda functions for further processing.

## Usage
```aidl
lambda-pipeline:
...
processor:
- aws_lambda:
aws:
region: "us-east-1"
sts_role_arn: "<arn>"
function_name: "uploadToS3Lambda"
max_retries: 3
invocation_type: "RequestResponse"
payload_model: "batch_event"
batch:
key_name: "osi_key"
threshold:
event_count: 10
event_collect_timeout: 15s
maximum_size: 3mb
```

`invocation_type` as request-response is used when the response from aws lambda comes back to dataprepper.

In batch options, an implicit batch threshold option is that if events size is 3mb, we flush it.
`payload_model` this is used to define how the payload should be constructed from a dataprepper event by converting it to corresponding json.
`payload_model` as batch_event is used when the output needs to be formed as a batch of multiple events, and a key(key_name) will be associated with the set of events.
`payload_model` as single_event is used when the output each event is sent to lambda.
if batch option is not mentioned along with payload_model: batch_event , then batch will assume default options as follows.
default batch options:
batch_key: "events"
threshold:
event_count: 10
maximum_size: 3mb
event_collect_timeout: 15s


## Developer Guide

The integration tests for this plugin do not run as part of the Data Prepper build.
The following command runs the integration tests:

```
./gradlew :data-prepper-plugins:aws-lambda:integrationTest -Dtests.processor.lambda.region="us-east-1" -Dtests.processor.lambda.functionName="lambda_test_function" -Dtests.processor.lambda.sts_role_arn="arn:aws:iam::123456789012:role/dataprepper-role
```


# Lambda Sink

This plugin enables you to send data from your Data Prepper pipeline directly to AWS Lambda functions for further processing.
Expand All @@ -15,7 +66,7 @@ lambda-pipeline:
function_name: "uploadToS3Lambda"
max_retries: 3
batch:
batch_key: "osi_key"
key_name: "osi_key"
threshold:
event_count: 3
maximum_size: 6mb
Expand Down
5 changes: 5 additions & 0 deletions data-prepper-plugins/aws-lambda/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ dependencies {
implementation project(path: ':data-prepper-plugins:common')
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation project(':data-prepper-plugins:failures-common')
implementation project(':data-prepper-plugins:parse-json-processor')
implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
Expand Down Expand Up @@ -65,6 +66,10 @@ task integrationTest(type: Test) {
systemProperty 'tests.lambda.sink.functionName', System.getProperty('tests.lambda.sink.functionName')
systemProperty 'tests.lambda.sink.sts_role_arn', System.getProperty('tests.lambda.sink.sts_role_arn')

systemProperty 'tests.lambda.processor.region', System.getProperty('tests.lambda.processor.region')
systemProperty 'tests.lambda.processor.functionName', System.getProperty('tests.lambda.processor.functionName')
systemProperty 'tests.lambda.processor.sts_role_arn', System.getProperty('tests.lambda.processor.sts_role_arn')

filter {
includeTestsMatching '*IT'
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package org.opensearch.dataprepper.plugins.lambda.processor;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
import io.micrometer.core.instrument.Counter;
import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import static org.mockito.Mockito.when;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.BufferFactory;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBufferFactory;
import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.ThresholdOptions;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.lambda.LambdaClient;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;

@ExtendWith(MockitoExtension.class)

public class LambdaProcessorServiceIT {

private LambdaClient lambdaClient;
private String functionName;
private String lambdaRegion;
private String role;
private BufferFactory bufferFactory;
@Mock
private LambdaProcessorConfig lambdaProcessorConfig;
@Mock
private BatchOptions batchOptions;
@Mock
private ThresholdOptions thresholdOptions;
@Mock
private AwsAuthenticationOptions awsAuthenticationOptions;
@Mock
private AwsCredentialsSupplier awsCredentialsSupplier;
@Mock
private PluginMetrics pluginMetrics;
@Mock
private PluginFactory pluginFactory;
@Mock
private PluginSetting pluginSetting;
@Mock
private Counter numberOfRecordsSuccessCounter;
@Mock
private Counter numberOfRecordsFailedCounter;
@Mock
private ExpressionEvaluator expressionEvaluator;

private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS));


@BeforeEach
public void setUp() throws Exception {
MockitoAnnotations.openMocks(this);
lambdaRegion = System.getProperty("tests.lambda.processor.region");
functionName = System.getProperty("tests.lambda.processor.functionName");
role = System.getProperty("tests.lambda.processor.sts_role_arn");

final Region region = Region.of(lambdaRegion);

lambdaClient = LambdaClient.builder()
.region(Region.of(lambdaRegion))
.build();

bufferFactory = new InMemoryBufferFactory();

when(pluginMetrics.counter(LambdaProcessor.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS)).
thenReturn(numberOfRecordsSuccessCounter);
when(pluginMetrics.counter(LambdaProcessor.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED)).
thenReturn(numberOfRecordsFailedCounter);
}


private static Record<Event> createRecord() {
final JacksonEvent event = JacksonLog.builder().withData("[{\"name\":\"test\"}]").build();
return new Record<>(event);
}

public LambdaProcessor createObjectUnderTest(final String config) throws JsonProcessingException {

final LambdaProcessorConfig lambdaProcessorConfig = objectMapper.readValue(config, LambdaProcessorConfig.class);
return new LambdaProcessor(pluginMetrics,lambdaProcessorConfig,awsCredentialsSupplier,expressionEvaluator);
}

public LambdaProcessor createObjectUnderTest(LambdaProcessorConfig lambdaSinkConfig) throws JsonProcessingException {
return new LambdaProcessor(pluginMetrics,lambdaSinkConfig,awsCredentialsSupplier,expressionEvaluator);
}


private static Collection<Record<Event>> generateRecords(int numberOfRecords) {
List<Record<Event>> recordList = new ArrayList<>();

for (int rows = 1; rows <= numberOfRecords; rows++) {
HashMap<String, String> eventData = new HashMap<>();
eventData.put("name", "Person" + rows);
eventData.put("age", Integer.toString(rows));

Record<Event> eventRecord = new Record<>(JacksonEvent.builder().withData(eventData).withEventType("event").build());
recordList.add(eventRecord);
}
return recordList;
}

@ParameterizedTest
@ValueSource(ints = {1,3})
void verify_records_to_lambda_success(final int recordCount) throws Exception {

when(lambdaProcessorConfig.getFunctionName()).thenReturn(functionName);
when(lambdaProcessorConfig.getMaxConnectionRetries()).thenReturn(3);
when(lambdaProcessorConfig.getInvocationType()).thenReturn("RequestResponse");

LambdaProcessor objectUnderTest = createObjectUnderTest(lambdaProcessorConfig);

Collection<Record<Event>> recordsData = generateRecords(recordCount);
List<Record<Event>> recordsResult = (List<Record<Event>>) objectUnderTest.doExecute(recordsData);
Thread.sleep(Duration.ofSeconds(10).toMillis());

assertEquals(recordsResult.size(),recordCount);
}

@ParameterizedTest
@ValueSource(ints = {1,3})
void verify_records_with_batching_to_lambda(final int recordCount) throws JsonProcessingException, InterruptedException {

when(lambdaProcessorConfig.getFunctionName()).thenReturn(functionName);
when(lambdaProcessorConfig.getMaxConnectionRetries()).thenReturn(3);
when(lambdaProcessorConfig.getInvocationType()).thenReturn("RequestResponse");
when(thresholdOptions.getEventCount()).thenReturn(1);
when(thresholdOptions.getMaximumSize()).thenReturn(ByteCount.parse("2mb"));
when(thresholdOptions.getEventCollectTimeOut()).thenReturn(Duration.parse("PT10s"));
when(batchOptions.getKeyName()).thenReturn("lambda_batch_key");
when(batchOptions.getThresholdOptions()).thenReturn(thresholdOptions);
when(lambdaProcessorConfig.getBatchOptions()).thenReturn(batchOptions);

LambdaProcessor objectUnderTest = createObjectUnderTest(lambdaProcessorConfig);
Collection<Record<Event>> records = generateRecords(recordCount);
Collection<Record<Event>> recordsResult = objectUnderTest.doExecute(records);
Thread.sleep(Duration.ofSeconds(10).toMillis());
assertEquals(recordsResult.size(),recordCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,21 @@
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import org.mockito.Mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
Expand All @@ -35,6 +39,9 @@
import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.ThresholdOptions;
import static org.opensearch.dataprepper.plugins.lambda.processor.LambdaProcessor.LAMBDA_LATENCY_METRIC;
import static org.opensearch.dataprepper.plugins.lambda.processor.LambdaProcessor.REQUEST_PAYLOAD_SIZE;
import static org.opensearch.dataprepper.plugins.lambda.processor.LambdaProcessor.RESPONSE_PAYLOAD_SIZE;
import org.opensearch.dataprepper.plugins.lambda.sink.dlq.DlqPushHandler;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.lambda.LambdaClient;
Expand All @@ -45,6 +52,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

@ExtendWith(MockitoExtension.class)
class LambdaSinkServiceIT {
Expand Down Expand Up @@ -76,6 +84,14 @@ class LambdaSinkServiceIT {
private Counter numberOfRecordsSuccessCounter;
@Mock
private Counter numberOfRecordsFailedCounter;
@Mock
private ExpressionEvaluator expressionEvaluator;
@Mock
private Timer lambdaLatencyMetric;
@Mock
private AtomicLong requestPayload;
@Mock
private AtomicLong responsePayload;
private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS));


Expand All @@ -98,6 +114,9 @@ public void setUp() throws Exception {
thenReturn(numberOfRecordsSuccessCounter);
when(pluginMetrics.counter(LambdaSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED)).
thenReturn(numberOfRecordsFailedCounter);
when(pluginMetrics.timer(LAMBDA_LATENCY_METRIC)).thenReturn(lambdaLatencyMetric);
when(pluginMetrics.gauge(eq(REQUEST_PAYLOAD_SIZE), any(AtomicLong.class))).thenReturn(requestPayload);
when(pluginMetrics.gauge(eq(RESPONSE_PAYLOAD_SIZE), any(AtomicLong.class))).thenReturn(responsePayload);
}


Expand All @@ -119,7 +138,8 @@ public LambdaSinkService createObjectUnderTest(final String config) throws JsonP
codecContext,
awsCredentialsSupplier,
dlqPushHandler,
bufferFactory);
bufferFactory,
expressionEvaluator);
}

public LambdaSinkService createObjectUnderTest(LambdaSinkConfig lambdaSinkConfig) throws JsonProcessingException {
Expand All @@ -134,7 +154,8 @@ public LambdaSinkService createObjectUnderTest(LambdaSinkConfig lambdaSinkConfig
codecContext,
awsCredentialsSupplier,
dlqPushHandler,
bufferFactory);
bufferFactory,
expressionEvaluator);
}


Expand Down Expand Up @@ -203,7 +224,7 @@ void verify_flushed_records_with_batching_to_lambda(final int recordCount) throw
when(thresholdOptions.getEventCount()).thenReturn(event_count);
when(thresholdOptions.getMaximumSize()).thenReturn(ByteCount.parse("2mb"));
when(thresholdOptions.getEventCollectTimeOut()).thenReturn(Duration.parse("PT10s"));
when(batchOptions.getBatchKey()).thenReturn("lambda_batch_key");
when(batchOptions.getKeyName()).thenReturn("lambda_batch_key");
when(batchOptions.getThresholdOptions()).thenReturn(thresholdOptions);
when(lambdaSinkConfig.getBatchOptions()).thenReturn(batchOptions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public interface Buffer {

Duration getDuration();

void flushToLambdaAsync();
InvokeResponse flushToLambdaAsync();

InvokeResponse flushToLambdaSync();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public Duration getDuration() {


@Override
public void flushToLambdaAsync() {
public InvokeResponse flushToLambdaAsync() {
InvokeResponse resp;
SdkBytes payload = getPayload();
payloadRequestAsyncSize = payload.asByteArray().length;
Expand All @@ -91,6 +91,7 @@ public void flushToLambdaAsync() {
resp = lambdaClient.invoke(request);
lambdaAsyncLatencyWatch.stop();
payloadResponseAsyncSize = resp.payload().asByteArray().length;
return resp;
}

@Override
Expand Down
Loading

0 comments on commit a236804

Please sign in to comment.