diff --git a/data-prepper-plugins/aws-lambda/README.md b/data-prepper-plugins/aws-lambda/README.md index 4c49873350..89298f7715 100644 --- a/data-prepper-plugins/aws-lambda/README.md +++ b/data-prepper-plugins/aws-lambda/README.md @@ -1,4 +1,55 @@ +# Lambda Processor + +This plugin enables you to send data from your Data Prepper pipeline directly to AWS Lambda functions for further processing. + +## Usage +```aidl +lambda-pipeline: +... + processor: + - aws_lambda: + aws: + region: "us-east-1" + sts_role_arn: "" + function_name: "uploadToS3Lambda" + max_retries: 3 + invocation_type: "RequestResponse" + payload_model: "batch_event" + batch: + key_name: "osi_key" + threshold: + event_count: 10 + event_collect_timeout: 15s + maximum_size: 3mb +``` + +`invocation_type` as request-response is used when the response from aws lambda comes back to dataprepper. + +In batch options, an implicit batch threshold option is that if events size is 3mb, we flush it. +`payload_model` this is used to define how the payload should be constructed from a dataprepper event by converting it to corresponding json. +`payload_model` as batch_event is used when the output needs to be formed as a batch of multiple events, and a key(key_name) will be associated with the set of events. +`payload_model` as single_event is used when the output each event is sent to lambda. +if batch option is not mentioned along with payload_model: batch_event , then batch will assume default options as follows. +default batch options: + batch_key: "events" + threshold: + event_count: 10 + maximum_size: 3mb + event_collect_timeout: 15s + + +## Developer Guide + +The integration tests for this plugin do not run as part of the Data Prepper build. +The following command runs the integration tests: + +``` +./gradlew :data-prepper-plugins:aws-lambda:integrationTest -Dtests.processor.lambda.region="us-east-1" -Dtests.processor.lambda.functionName="lambda_test_function" -Dtests.processor.lambda.sts_role_arn="arn:aws:iam::123456789012:role/dataprepper-role + +``` + + # Lambda Sink This plugin enables you to send data from your Data Prepper pipeline directly to AWS Lambda functions for further processing. @@ -15,7 +66,7 @@ lambda-pipeline: function_name: "uploadToS3Lambda" max_retries: 3 batch: - batch_key: "osi_key" + key_name: "osi_key" threshold: event_count: 3 maximum_size: 6mb diff --git a/data-prepper-plugins/aws-lambda/build.gradle b/data-prepper-plugins/aws-lambda/build.gradle index be9280e8c8..6186ba05a4 100644 --- a/data-prepper-plugins/aws-lambda/build.gradle +++ b/data-prepper-plugins/aws-lambda/build.gradle @@ -8,6 +8,7 @@ dependencies { implementation project(path: ':data-prepper-plugins:common') implementation project(':data-prepper-plugins:aws-plugin-api') implementation project(':data-prepper-plugins:failures-common') + implementation project(':data-prepper-plugins:parse-json-processor') implementation 'io.micrometer:micrometer-core' implementation 'com.fasterxml.jackson.core:jackson-core' implementation 'com.fasterxml.jackson.core:jackson-databind' @@ -65,6 +66,10 @@ task integrationTest(type: Test) { systemProperty 'tests.lambda.sink.functionName', System.getProperty('tests.lambda.sink.functionName') systemProperty 'tests.lambda.sink.sts_role_arn', System.getProperty('tests.lambda.sink.sts_role_arn') + systemProperty 'tests.lambda.processor.region', System.getProperty('tests.lambda.processor.region') + systemProperty 'tests.lambda.processor.functionName', System.getProperty('tests.lambda.processor.functionName') + systemProperty 'tests.lambda.processor.sts_role_arn', System.getProperty('tests.lambda.processor.sts_role_arn') + filter { includeTestsMatching '*IT' } diff --git a/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorServiceIT.java b/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorServiceIT.java new file mode 100644 index 0000000000..2676fc5f17 --- /dev/null +++ b/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorServiceIT.java @@ -0,0 +1,165 @@ +package org.opensearch.dataprepper.plugins.lambda.processor; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import io.micrometer.core.instrument.Counter; +import static org.junit.jupiter.api.Assertions.assertEquals; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import static org.mockito.Mockito.when; +import org.mockito.MockitoAnnotations; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.BufferFactory; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBufferFactory; +import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.ThresholdOptions; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.lambda.LambdaClient; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; + +@ExtendWith(MockitoExtension.class) + +public class LambdaProcessorServiceIT { + + private LambdaClient lambdaClient; + private String functionName; + private String lambdaRegion; + private String role; + private BufferFactory bufferFactory; + @Mock + private LambdaProcessorConfig lambdaProcessorConfig; + @Mock + private BatchOptions batchOptions; + @Mock + private ThresholdOptions thresholdOptions; + @Mock + private AwsAuthenticationOptions awsAuthenticationOptions; + @Mock + private AwsCredentialsSupplier awsCredentialsSupplier; + @Mock + private PluginMetrics pluginMetrics; + @Mock + private PluginFactory pluginFactory; + @Mock + private PluginSetting pluginSetting; + @Mock + private Counter numberOfRecordsSuccessCounter; + @Mock + private Counter numberOfRecordsFailedCounter; + @Mock + private ExpressionEvaluator expressionEvaluator; + + private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); + + + @BeforeEach + public void setUp() throws Exception { + MockitoAnnotations.openMocks(this); + lambdaRegion = System.getProperty("tests.lambda.processor.region"); + functionName = System.getProperty("tests.lambda.processor.functionName"); + role = System.getProperty("tests.lambda.processor.sts_role_arn"); + + final Region region = Region.of(lambdaRegion); + + lambdaClient = LambdaClient.builder() + .region(Region.of(lambdaRegion)) + .build(); + + bufferFactory = new InMemoryBufferFactory(); + + when(pluginMetrics.counter(LambdaProcessor.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS)). + thenReturn(numberOfRecordsSuccessCounter); + when(pluginMetrics.counter(LambdaProcessor.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED)). + thenReturn(numberOfRecordsFailedCounter); + } + + + private static Record createRecord() { + final JacksonEvent event = JacksonLog.builder().withData("[{\"name\":\"test\"}]").build(); + return new Record<>(event); + } + + public LambdaProcessor createObjectUnderTest(final String config) throws JsonProcessingException { + + final LambdaProcessorConfig lambdaProcessorConfig = objectMapper.readValue(config, LambdaProcessorConfig.class); + return new LambdaProcessor(pluginMetrics,lambdaProcessorConfig,awsCredentialsSupplier,expressionEvaluator); + } + + public LambdaProcessor createObjectUnderTest(LambdaProcessorConfig lambdaSinkConfig) throws JsonProcessingException { + return new LambdaProcessor(pluginMetrics,lambdaSinkConfig,awsCredentialsSupplier,expressionEvaluator); + } + + + private static Collection> generateRecords(int numberOfRecords) { + List> recordList = new ArrayList<>(); + + for (int rows = 1; rows <= numberOfRecords; rows++) { + HashMap eventData = new HashMap<>(); + eventData.put("name", "Person" + rows); + eventData.put("age", Integer.toString(rows)); + + Record eventRecord = new Record<>(JacksonEvent.builder().withData(eventData).withEventType("event").build()); + recordList.add(eventRecord); + } + return recordList; + } + + @ParameterizedTest + @ValueSource(ints = {1,3}) + void verify_records_to_lambda_success(final int recordCount) throws Exception { + + when(lambdaProcessorConfig.getFunctionName()).thenReturn(functionName); + when(lambdaProcessorConfig.getMaxConnectionRetries()).thenReturn(3); + when(lambdaProcessorConfig.getInvocationType()).thenReturn("RequestResponse"); + + LambdaProcessor objectUnderTest = createObjectUnderTest(lambdaProcessorConfig); + + Collection> recordsData = generateRecords(recordCount); + List> recordsResult = (List>) objectUnderTest.doExecute(recordsData); + Thread.sleep(Duration.ofSeconds(10).toMillis()); + + assertEquals(recordsResult.size(),recordCount); + } + + @ParameterizedTest + @ValueSource(ints = {1,3}) + void verify_records_with_batching_to_lambda(final int recordCount) throws JsonProcessingException, InterruptedException { + + when(lambdaProcessorConfig.getFunctionName()).thenReturn(functionName); + when(lambdaProcessorConfig.getMaxConnectionRetries()).thenReturn(3); + when(lambdaProcessorConfig.getInvocationType()).thenReturn("RequestResponse"); + when(thresholdOptions.getEventCount()).thenReturn(1); + when(thresholdOptions.getMaximumSize()).thenReturn(ByteCount.parse("2mb")); + when(thresholdOptions.getEventCollectTimeOut()).thenReturn(Duration.parse("PT10s")); + when(batchOptions.getKeyName()).thenReturn("lambda_batch_key"); + when(batchOptions.getThresholdOptions()).thenReturn(thresholdOptions); + when(lambdaProcessorConfig.getBatchOptions()).thenReturn(batchOptions); + + LambdaProcessor objectUnderTest = createObjectUnderTest(lambdaProcessorConfig); + Collection> records = generateRecords(recordCount); + Collection> recordsResult = objectUnderTest.doExecute(records); + Thread.sleep(Duration.ofSeconds(10).toMillis()); + assertEquals(recordsResult.size(),recordCount); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkServiceIT.java b/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkServiceIT.java index 1a7e169a47..b160c8d3d9 100644 --- a/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkServiceIT.java +++ b/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkServiceIT.java @@ -10,10 +10,13 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import org.mockito.Mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -21,6 +24,7 @@ import org.mockito.MockitoAnnotations; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; @@ -35,6 +39,9 @@ import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions; import org.opensearch.dataprepper.plugins.lambda.common.config.ThresholdOptions; +import static org.opensearch.dataprepper.plugins.lambda.processor.LambdaProcessor.LAMBDA_LATENCY_METRIC; +import static org.opensearch.dataprepper.plugins.lambda.processor.LambdaProcessor.REQUEST_PAYLOAD_SIZE; +import static org.opensearch.dataprepper.plugins.lambda.processor.LambdaProcessor.RESPONSE_PAYLOAD_SIZE; import org.opensearch.dataprepper.plugins.lambda.sink.dlq.DlqPushHandler; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.lambda.LambdaClient; @@ -45,6 +52,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; @ExtendWith(MockitoExtension.class) class LambdaSinkServiceIT { @@ -76,6 +84,14 @@ class LambdaSinkServiceIT { private Counter numberOfRecordsSuccessCounter; @Mock private Counter numberOfRecordsFailedCounter; + @Mock + private ExpressionEvaluator expressionEvaluator; + @Mock + private Timer lambdaLatencyMetric; + @Mock + private AtomicLong requestPayload; + @Mock + private AtomicLong responsePayload; private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); @@ -98,6 +114,9 @@ public void setUp() throws Exception { thenReturn(numberOfRecordsSuccessCounter); when(pluginMetrics.counter(LambdaSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED)). thenReturn(numberOfRecordsFailedCounter); + when(pluginMetrics.timer(LAMBDA_LATENCY_METRIC)).thenReturn(lambdaLatencyMetric); + when(pluginMetrics.gauge(eq(REQUEST_PAYLOAD_SIZE), any(AtomicLong.class))).thenReturn(requestPayload); + when(pluginMetrics.gauge(eq(RESPONSE_PAYLOAD_SIZE), any(AtomicLong.class))).thenReturn(responsePayload); } @@ -119,7 +138,8 @@ public LambdaSinkService createObjectUnderTest(final String config) throws JsonP codecContext, awsCredentialsSupplier, dlqPushHandler, - bufferFactory); + bufferFactory, + expressionEvaluator); } public LambdaSinkService createObjectUnderTest(LambdaSinkConfig lambdaSinkConfig) throws JsonProcessingException { @@ -134,7 +154,8 @@ public LambdaSinkService createObjectUnderTest(LambdaSinkConfig lambdaSinkConfig codecContext, awsCredentialsSupplier, dlqPushHandler, - bufferFactory); + bufferFactory, + expressionEvaluator); } @@ -203,7 +224,7 @@ void verify_flushed_records_with_batching_to_lambda(final int recordCount) throw when(thresholdOptions.getEventCount()).thenReturn(event_count); when(thresholdOptions.getMaximumSize()).thenReturn(ByteCount.parse("2mb")); when(thresholdOptions.getEventCollectTimeOut()).thenReturn(Duration.parse("PT10s")); - when(batchOptions.getBatchKey()).thenReturn("lambda_batch_key"); + when(batchOptions.getKeyName()).thenReturn("lambda_batch_key"); when(batchOptions.getThresholdOptions()).thenReturn(thresholdOptions); when(lambdaSinkConfig.getBatchOptions()).thenReturn(batchOptions); diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/Buffer.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/Buffer.java index a2c5dde4a9..8864ba4c31 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/Buffer.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/Buffer.java @@ -22,7 +22,7 @@ public interface Buffer { Duration getDuration(); - void flushToLambdaAsync(); + InvokeResponse flushToLambdaAsync(); InvokeResponse flushToLambdaSync(); diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBuffer.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBuffer.java index 095e6f47b2..606dd50f56 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBuffer.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBuffer.java @@ -75,7 +75,7 @@ public Duration getDuration() { @Override - public void flushToLambdaAsync() { + public InvokeResponse flushToLambdaAsync() { InvokeResponse resp; SdkBytes payload = getPayload(); payloadRequestAsyncSize = payload.asByteArray().length; @@ -91,6 +91,7 @@ public void flushToLambdaAsync() { resp = lambdaClient.invoke(request); lambdaAsyncLatencyWatch.stop(); payloadResponseAsyncSize = resp.payload().asByteArray().length; + return resp; } @Override diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactory.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/client/LambdaClientFactory.java similarity index 58% rename from data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactory.java rename to data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/client/LambdaClientFactory.java index 03b94340f0..c0185a8eba 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactory.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/client/LambdaClientFactory.java @@ -3,11 +3,13 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.lambda.sink; +package org.opensearch.dataprepper.plugins.lambda.common.client; import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.metricpublisher.MicrometerMetricPublisher; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.retry.RetryPolicy; @@ -16,22 +18,27 @@ public final class LambdaClientFactory { private LambdaClientFactory() { } - static LambdaClient createLambdaClient(final LambdaSinkConfig lambdaSinkConfig, - final AwsCredentialsSupplier awsCredentialsSupplier) { - final AwsCredentialsOptions awsCredentialsOptions = convertToCredentialsOptions(lambdaSinkConfig.getAwsAuthenticationOptions()); + public static LambdaClient createLambdaClient(final AwsAuthenticationOptions awsAuthenticationOptions, + final int maxConnectionRetries, + final AwsCredentialsSupplier awsCredentialsSupplier) { + final AwsCredentialsOptions awsCredentialsOptions = convertToCredentialsOptions(awsAuthenticationOptions); final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(awsCredentialsOptions); + final PluginMetrics awsSdkMetrics = PluginMetrics.fromNames("sdk", "aws"); return LambdaClient.builder() - .region(lambdaSinkConfig.getAwsAuthenticationOptions().getAwsRegion()) - .credentialsProvider(awsCredentialsProvider) - .overrideConfiguration(createOverrideConfiguration(lambdaSinkConfig)).build(); + .region(awsAuthenticationOptions.getAwsRegion()) + .credentialsProvider(awsCredentialsProvider) + .overrideConfiguration(createOverrideConfiguration(maxConnectionRetries,awsSdkMetrics)) + .build(); } - private static ClientOverrideConfiguration createOverrideConfiguration(final LambdaSinkConfig lambdaSinkConfig) { - final RetryPolicy retryPolicy = RetryPolicy.builder().numRetries(lambdaSinkConfig.getMaxConnectionRetries()).build(); + private static ClientOverrideConfiguration createOverrideConfiguration(final int maxConnectionRetries, + final PluginMetrics awsSdkMetrics) { + final RetryPolicy retryPolicy = RetryPolicy.builder().numRetries(maxConnectionRetries).build(); return ClientOverrideConfiguration.builder() .retryPolicy(retryPolicy) + .addMetricPublisher(new MicrometerMetricPublisher(awsSdkMetrics)) .build(); } diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/codec/LambdaJsonCodec.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/codec/LambdaJsonCodec.java index a1ccaa8561..29e91dde88 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/codec/LambdaJsonCodec.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/codec/LambdaJsonCodec.java @@ -17,6 +17,7 @@ import java.util.Map; import java.util.Objects; +@Deprecated public class LambdaJsonCodec implements OutputCodec { private final ObjectMapper objectMapper = new ObjectMapper(); private static final String JSON = "json"; diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/BatchOptions.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/BatchOptions.java index 099bed2b54..bd911b8220 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/BatchOptions.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/BatchOptions.java @@ -7,21 +7,24 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.NotNull; +import jakarta.validation.constraints.Size; public class BatchOptions { + static final String DEFAULT_KEY_NAME = "events"; - private static final String DEFAULT_BATCH_KEY = "events"; - - @JsonProperty("batch_key") - private String batchKey = DEFAULT_BATCH_KEY; + @JsonProperty("key_name") + @Size(min = 1, max = 2048) + private String keyName = DEFAULT_KEY_NAME; @JsonProperty("threshold") @NotNull ThresholdOptions thresholdOptions = new ThresholdOptions(); - public String getBatchKey(){return batchKey;} - public ThresholdOptions getThresholdOptions(){return thresholdOptions;} + public String getKeyName() { + return keyName; + } + } \ No newline at end of file diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/LambdaCommonConfig.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/LambdaCommonConfig.java new file mode 100644 index 0000000000..d70d205aa1 --- /dev/null +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/LambdaCommonConfig.java @@ -0,0 +1,20 @@ +package org.opensearch.dataprepper.plugins.lambda.common.config; + +import java.util.Map; + +public class LambdaCommonConfig { + public static final String REQUEST_RESPONSE = "request-response"; + public static final String EVENT = "event"; + public static final String BATCH_EVENT = "batch-event"; + public static final String SINGLE_EVENT = "single-event"; + + //AWS Lambda payload options needs this format + public static final String REQUEST_RESPONSE_LAMBDA = "RequestResponse"; + public static final String EVENT_LAMBDA = "Event"; + + //Translate dataprepper invocation type to lambda invocation type + public static final Map invocationTypeMap = Map.of( + LambdaCommonConfig.EVENT, EVENT_LAMBDA, + LambdaCommonConfig.REQUEST_RESPONSE, REQUEST_RESPONSE_LAMBDA + ); +} diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptions.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptions.java index ca8ed6e574..a8d5d08b02 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptions.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptions.java @@ -16,13 +16,14 @@ public class ThresholdOptions { - + private static final int DEFAULT_EVENT_COUNT = 10; private static final String DEFAULT_BYTE_CAPACITY = "3mb"; + private static final Duration DEFAULT_EVENT_TIMEOUT = Duration.ofSeconds(10); @JsonProperty("event_count") @Size(min = 0, max = 10000000, message = "event_count size should be between 0 and 10000000") @NotNull - private int eventCount; + private int eventCount = DEFAULT_EVENT_COUNT; @JsonProperty("maximum_size") private String maximumSize = DEFAULT_BYTE_CAPACITY; @@ -31,7 +32,7 @@ public class ThresholdOptions { @DurationMin(seconds = 1) @DurationMax(seconds = 3600) @NotNull - private Duration eventCollectTimeOut; + private Duration eventCollectTimeOut = DEFAULT_EVENT_TIMEOUT; /** * Read event collection duration configuration. diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java new file mode 100644 index 0000000000..255cbcbe23 --- /dev/null +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java @@ -0,0 +1,265 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.lambda.processor; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.codec.OutputCodec; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.processor.AbstractProcessor; +import org.opensearch.dataprepper.model.processor.Processor; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.codec.json.JsonOutputCodec; +import org.opensearch.dataprepper.plugins.codec.json.JsonOutputCodecConfig; +import org.opensearch.dataprepper.plugins.codec.json.NdjsonOutputCodec; +import org.opensearch.dataprepper.plugins.codec.json.NdjsonOutputConfig; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.BufferFactory; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBufferFactory; +import org.opensearch.dataprepper.plugins.lambda.common.client.LambdaClientFactory; +import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.LambdaCommonConfig; +import static org.opensearch.dataprepper.plugins.lambda.common.config.LambdaCommonConfig.BATCH_EVENT; +import static org.opensearch.dataprepper.plugins.lambda.common.config.LambdaCommonConfig.SINGLE_EVENT; +import static org.opensearch.dataprepper.plugins.lambda.common.config.LambdaCommonConfig.invocationTypeMap; +import org.opensearch.dataprepper.plugins.lambda.common.util.ThresholdCheck; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.services.lambda.LambdaClient; +import software.amazon.awssdk.services.lambda.model.InvokeResponse; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +@DataPrepperPlugin(name = "aws_lambda", pluginType = Processor.class, pluginConfigurationType = LambdaProcessorConfig.class) +public class LambdaProcessor extends AbstractProcessor, Record> { + + public static final String NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS = "lambdaProcessorObjectsEventsSucceeded"; + public static final String NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED = "lambdaProcessorObjectsEventsFailed"; + public static final String LAMBDA_LATENCY_METRIC = "lambdaProcessorLatency"; + public static final String REQUEST_PAYLOAD_SIZE = "requestPayloadSize"; + public static final String RESPONSE_PAYLOAD_SIZE = "responsePayloadSize"; + + private static final Logger LOG = LoggerFactory.getLogger(LambdaProcessor.class); + + private final String functionName; + private final String whenCondition; + private final ExpressionEvaluator expressionEvaluator; + private final Counter numberOfRecordsSuccessCounter; + private final Counter numberOfRecordsFailedCounter; + private final Timer lambdaLatencyMetric; + private final String invocationType; + private final Collection bufferedEventHandles; + private final List events; + private final BatchOptions batchOptions; + private final ObjectMapper objectMapper = new ObjectMapper(); + private final BufferFactory bufferFactory; + private final LambdaClient lambdaClient; + private final Boolean isBatchEnabled; + Buffer currentBuffer; + private final AtomicLong requestPayloadMetric; + private final AtomicLong responsePayload; + private String payloadModel = null; + private int maxEvents = 0; + private ByteCount maxBytes = null; + private Duration maxCollectionDuration = null; + private int maxRetries = 0; + private OutputCodec codec = null; + OutputCodecContext codecContext = null; + + @DataPrepperPluginConstructor + public LambdaProcessor(final PluginMetrics pluginMetrics, final LambdaProcessorConfig lambdaProcessorConfig, final AwsCredentialsSupplier awsCredentialsSupplier, final ExpressionEvaluator expressionEvaluator) { + super(pluginMetrics); + this.expressionEvaluator = expressionEvaluator; + this.numberOfRecordsSuccessCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS); + this.numberOfRecordsFailedCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED); + this.lambdaLatencyMetric = pluginMetrics.timer(LAMBDA_LATENCY_METRIC); + this.requestPayloadMetric = pluginMetrics.gauge(REQUEST_PAYLOAD_SIZE, new AtomicLong()); + this.responsePayload = pluginMetrics.gauge(RESPONSE_PAYLOAD_SIZE, new AtomicLong()); + + functionName = lambdaProcessorConfig.getFunctionName(); + whenCondition = lambdaProcessorConfig.getWhenCondition(); + maxRetries = lambdaProcessorConfig.getMaxConnectionRetries(); + batchOptions = lambdaProcessorConfig.getBatchOptions(); + payloadModel = lambdaProcessorConfig.getPayloadModel(); + codecContext = new OutputCodecContext(); + + if (payloadModel.equals(BATCH_EVENT)) { + JsonOutputCodecConfig jsonOutputCodecConfig = new JsonOutputCodecConfig(); + jsonOutputCodecConfig.setKeyName(batchOptions.getKeyName()); + codec = new JsonOutputCodec(jsonOutputCodecConfig); + maxEvents = batchOptions.getThresholdOptions().getEventCount(); + maxBytes = batchOptions.getThresholdOptions().getMaximumSize(); // remove + maxCollectionDuration = batchOptions.getThresholdOptions().getEventCollectTimeOut(); + isBatchEnabled = true; + LOG.info("maxEvents:" + maxEvents + " maxbytes:" + maxBytes + " maxDuration:" + maxCollectionDuration); + } else if(payloadModel.equals(SINGLE_EVENT)) { + NdjsonOutputConfig ndjsonOutputCodecConfig = new NdjsonOutputConfig(); + codec = new NdjsonOutputCodec(ndjsonOutputCodecConfig); + isBatchEnabled = false; + } else{ + throw new RuntimeException("invalid payload_model option"); + } + + //EVENT type will soon be supported. + if(lambdaProcessorConfig.getInvocationType().equals(LambdaCommonConfig.EVENT) && + !lambdaProcessorConfig.getInvocationType().equals(LambdaCommonConfig.REQUEST_RESPONSE)){ + throw new RuntimeException("Unsupported invocation type " + lambdaProcessorConfig.getInvocationType()); + } + + invocationType = invocationTypeMap.get(lambdaProcessorConfig.getInvocationType()); + + bufferedEventHandles = new LinkedList<>(); + events = new ArrayList(); + + lambdaClient = LambdaClientFactory.createLambdaClient(lambdaProcessorConfig.getAwsAuthenticationOptions(), + lambdaProcessorConfig.getMaxConnectionRetries() + , awsCredentialsSupplier); + + this.bufferFactory = new InMemoryBufferFactory(); + try { + currentBuffer = this.bufferFactory.getBuffer(lambdaClient, functionName, invocationType); + } catch (IOException e) { + throw new RuntimeException(e); + } + + } + + @Override + public Collection> doExecute(Collection> records) { + if (records.isEmpty()) { + return records; + } + + //lambda mutates event + List> resultRecords = new ArrayList<>(); + + for (Record record : records) { + final Event event = record.getData(); + + if (whenCondition != null && !expressionEvaluator.evaluateConditional(whenCondition, event)) { + continue; + } + + try { + if (currentBuffer.getEventCount() == 0) { + codec.start(currentBuffer.getOutputStream(), event, codecContext); + } + codec.writeEvent(event, currentBuffer.getOutputStream()); + int count = currentBuffer.getEventCount() + 1; + currentBuffer.setEventCount(count); + + // flush to lambda and update result record + flushToLambdaIfNeeded(resultRecords); + } catch (Exception e) { + numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount()); + LOG.error(EVENT, "There was an exception while processing Event [{}]" , event, e); + //reset buffer + try { + currentBuffer = bufferFactory.getBuffer(lambdaClient, functionName, invocationType); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + } + return resultRecords; + } + + @Override + public void prepareForShutdown() { + + } + + @Override + public boolean isReadyForShutdown() { + return false; + } + + @Override + public void shutdown() { + + } + + protected void flushToLambdaIfNeeded(List> resultRecords) throws InterruptedException, IOException { + LOG.info("Flush to Lambda check: currentBuffer.size={}, currentBuffer.events={}, currentBuffer.duration={}", currentBuffer.getSize(), currentBuffer.getEventCount(), currentBuffer.getDuration()); + + if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration, isBatchEnabled)) { + try{ + codec.complete(currentBuffer.getOutputStream()); + responsePayload.set(currentBuffer.getPayloadResponseSyncSize()); + InvokeResponse lambdaResponse = currentBuffer.flushToLambdaSync(); + handleLambdaResponse(lambdaResponse); + numberOfRecordsSuccessCounter.increment(currentBuffer.getEventCount()); + lambdaLatencyMetric.record(currentBuffer.getFlushLambdaSyncLatencyMetric()); + requestPayloadMetric.set(currentBuffer.getPayloadRequestSyncSize()); + Event lambdaEvent = convertLambdaResponseToEvent(lambdaResponse); + resultRecords.add(new Record<>(lambdaEvent)); + } catch(AwsServiceException | SdkClientException e) { + LOG.error(EVENT, "Exception occurred while uploading records to lambda. functionName: {} | exception:", functionName, e); + numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount()); + } catch (final IOException e) { + LOG.error("Exception while completing codec", e); + numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount()); + } + //Reset Buffer + try { + currentBuffer = bufferFactory.getBuffer(lambdaClient, functionName, invocationType); + } catch (IOException ex) { + throw new RuntimeException("Failed to reset buffer after exception", ex); + } + } + } + + protected Event convertLambdaResponseToEvent(InvokeResponse lambdaResponse) { + try { + SdkBytes payload = lambdaResponse.payload(); + if (payload != null) { + String payloadJsonString = payload.asString(StandardCharsets.UTF_8); + + JsonNode jsonNode = null; + try { + jsonNode = objectMapper.readTree(payloadJsonString); + } catch (JsonParseException e) { + throw new RuntimeException("payload output is not json formatted"); + } + return JacksonEvent.builder().withEventType("event").withData(jsonNode).build(); + } + } catch (Exception e) { + LOG.error("Error converting Lambda response to Event", e); + throw new RuntimeException("Error converting Lambda response to Event"); + } + return null; + } + + private void handleLambdaResponse(InvokeResponse response){ + int statusCode = response.statusCode(); + if (statusCode < 200 || statusCode >= 300) { + throw new RuntimeException("Lambda invocation failed with status code: " + statusCode); + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorConfig.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorConfig.java new file mode 100644 index 0000000000..51faabf10d --- /dev/null +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorConfig.java @@ -0,0 +1,69 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.lambda.processor; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import jakarta.validation.constraints.Size; +import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions; +import static org.opensearch.dataprepper.plugins.lambda.common.config.LambdaCommonConfig.BATCH_EVENT; +import static org.opensearch.dataprepper.plugins.lambda.common.config.LambdaCommonConfig.REQUEST_RESPONSE; + +public class LambdaProcessorConfig { + + private static final int DEFAULT_CONNECTION_RETRIES = 3; + + @JsonProperty("aws") + @NotNull + @Valid + private AwsAuthenticationOptions awsAuthenticationOptions; + + @JsonProperty("function_name") + @NotEmpty + @Size(min = 3, max = 500, message = "function name length should be at least 3 characters") + private String functionName; + + @JsonProperty("max_retries") + private int maxConnectionRetries = DEFAULT_CONNECTION_RETRIES; + + @JsonProperty("invocation_type") + private String invocationType = REQUEST_RESPONSE; + + @JsonProperty("payload_model") + private String payloadModel = BATCH_EVENT; + + @JsonProperty("batch") + private BatchOptions batchOptions; + + @JsonProperty("lambda_when") + private String whenCondition; + + public AwsAuthenticationOptions getAwsAuthenticationOptions() { + return awsAuthenticationOptions; + } + + public BatchOptions getBatchOptions(){return batchOptions;} + + public String getFunctionName() { + return functionName; + } + + public int getMaxConnectionRetries() { + return maxConnectionRetries; + } + + public String getInvocationType(){return invocationType;} + + public String getWhenCondition() { + return whenCondition; + } + + public String getPayloadModel() { + return payloadModel; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java index 715ef3295d..39559bf154 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java @@ -6,19 +6,21 @@ package org.opensearch.dataprepper.plugins.lambda.sink; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.AbstractSink; import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.opensearch.dataprepper.model.sink.Sink; -import org.opensearch.dataprepper.model.plugin.PluginFactory; -import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.plugins.lambda.common.accumlator.BufferFactory; import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBufferFactory; +import org.opensearch.dataprepper.plugins.lambda.common.client.LambdaClientFactory; import org.opensearch.dataprepper.plugins.lambda.sink.dlq.DlqPushHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,12 +44,15 @@ public LambdaSink(final PluginSetting pluginSetting, final LambdaSinkConfig lambdaSinkConfig, final PluginFactory pluginFactory, final SinkContext sinkContext, - final AwsCredentialsSupplier awsCredentialsSupplier + final AwsCredentialsSupplier awsCredentialsSupplier, + final ExpressionEvaluator expressionEvaluator ) { super(pluginSetting); sinkInitialized = Boolean.FALSE; OutputCodecContext outputCodecContext = OutputCodecContext.fromSinkContext(sinkContext); - LambdaClient lambdaClient = LambdaClientFactory.createLambdaClient(lambdaSinkConfig, awsCredentialsSupplier); + LambdaClient lambdaClient = LambdaClientFactory.createLambdaClient(lambdaSinkConfig.getAwsAuthenticationOptions(), + lambdaSinkConfig.getMaxConnectionRetries() + , awsCredentialsSupplier); if(lambdaSinkConfig.getDlqPluginSetting() != null) { this.dlqPushHandler = new DlqPushHandler(pluginFactory, String.valueOf(lambdaSinkConfig.getDlqPluginSetting().get(BUCKET)), @@ -55,6 +60,7 @@ public LambdaSink(final PluginSetting pluginSetting, , lambdaSinkConfig.getDlqStsRegion(), String.valueOf(lambdaSinkConfig.getDlqPluginSetting().get(KEY_PATH))); } + this.bufferFactory = new InMemoryBufferFactory(); lambdaSinkService = new LambdaSinkService(lambdaClient, @@ -65,7 +71,8 @@ public LambdaSink(final PluginSetting pluginSetting, outputCodecContext, awsCredentialsSupplier, dlqPushHandler, - bufferFactory); + bufferFactory, + expressionEvaluator); } diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfig.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfig.java index bb50e2510e..6601dbb26b 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfig.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfig.java @@ -12,6 +12,8 @@ import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions; +import static org.opensearch.dataprepper.plugins.lambda.common.config.LambdaCommonConfig.BATCH_EVENT; +import static org.opensearch.dataprepper.plugins.lambda.common.config.LambdaCommonConfig.EVENT; import java.util.Map; import java.util.Objects; @@ -19,9 +21,7 @@ public class LambdaSinkConfig { private static final int DEFAULT_CONNECTION_RETRIES = 3; - public static final String STS_REGION = "region"; - public static final String STS_ROLE_ARN = "sts_role_arn"; @JsonProperty("aws") @@ -38,12 +38,21 @@ public class LambdaSinkConfig { @JsonProperty("max_retries") private int maxConnectionRetries = DEFAULT_CONNECTION_RETRIES; + @JsonProperty("invocation_type") + private String invocationType = EVENT; + + @JsonProperty("payload_model") + private String payloadModel = BATCH_EVENT; + @JsonProperty("dlq") private PluginModel dlq; @JsonProperty("batch") private BatchOptions batchOptions; + @JsonProperty("lambda_when") + private String whenCondition; + public AwsAuthenticationOptions getAwsAuthenticationOptions() { return awsAuthenticationOptions; } @@ -77,4 +86,14 @@ public String getDlqStsRegion(){ public Map getDlqPluginSetting(){ return dlq != null ? dlq.getPluginSettings() : Map.of(); } + + public String getInvocationType(){return invocationType;} + + public String getWhenCondition() { + return whenCondition; + } + + public String getPayloadModel() { + return payloadModel; + } } \ No newline at end of file diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkService.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkService.java index 9a788e6816..b9f33c26cb 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkService.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkService.java @@ -6,7 +6,10 @@ package org.opensearch.dataprepper.plugins.lambda.sink; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.configuration.PluginSetting; @@ -16,10 +19,16 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.codec.json.JsonOutputCodec; +import org.opensearch.dataprepper.plugins.codec.json.JsonOutputCodecConfig; +import org.opensearch.dataprepper.plugins.codec.json.NdjsonOutputCodec; +import org.opensearch.dataprepper.plugins.codec.json.NdjsonOutputConfig; import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer; import org.opensearch.dataprepper.plugins.lambda.common.accumlator.BufferFactory; -import org.opensearch.dataprepper.plugins.lambda.common.codec.LambdaJsonCodec; import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions; +import static org.opensearch.dataprepper.plugins.lambda.common.config.LambdaCommonConfig.BATCH_EVENT; +import static org.opensearch.dataprepper.plugins.lambda.common.config.LambdaCommonConfig.SINGLE_EVENT; +import static org.opensearch.dataprepper.plugins.lambda.common.config.LambdaCommonConfig.invocationTypeMap; import org.opensearch.dataprepper.plugins.lambda.common.util.ThresholdCheck; import org.opensearch.dataprepper.plugins.lambda.sink.dlq.DlqPushHandler; import org.opensearch.dataprepper.plugins.lambda.sink.dlq.LambdaSinkFailedDlqData; @@ -29,6 +38,7 @@ import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.lambda.LambdaClient; +import software.amazon.awssdk.services.lambda.model.InvokeResponse; import java.io.IOException; import java.time.Duration; @@ -36,89 +46,94 @@ import java.util.Collection; import java.util.LinkedList; import java.util.List; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class LambdaSinkService { - private static final Logger LOG = LoggerFactory.getLogger(LambdaSinkService.class); public static final String NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS = "lambdaSinkObjectsEventsSucceeded"; public static final String NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED = "lambdaSinkObjectsEventsFailed"; + public static final String LAMBDA_LATENCY_METRIC = "lambdaSinkLatency"; + public static final String REQUEST_PAYLOAD_SIZE = "requestPayloadSize"; + public static final String RESPONSE_PAYLOAD_SIZE = "responsePayloadSize"; + private static final Logger LOG = LoggerFactory.getLogger(LambdaSinkService.class); + private final AtomicLong requestPayloadMetric; + private final AtomicLong responsePayloadMetric; private final PluginSetting pluginSetting; private final Lock reentrantLock; private final LambdaSinkConfig lambdaSinkConfig; private final LambdaClient lambdaClient; private final String functionName; - private int maxEvents = 0; - private ByteCount maxBytes = null; - private Duration maxCollectionDuration = null; - private int maxRetries = 0; + private final String whenCondition; + private final ExpressionEvaluator expressionEvaluator; private final Counter numberOfRecordsSuccessCounter; private final Counter numberOfRecordsFailedCounter; - private final String ASYNC_INVOCATION_TYPE = "Event"; + private final Timer lambdaLatencyMetric; private final String invocationType; - private Buffer currentBuffer; private final BufferFactory bufferFactory; private final DlqPushHandler dlqPushHandler; private final Collection bufferedEventHandles; private final List events; - private OutputCodec codec = null; private final BatchOptions batchOptions; private final Boolean isBatchEnabled; + private int maxEvents = 0; + private ByteCount maxBytes = null; + private Duration maxCollectionDuration = null; + private int maxRetries = 0; + private Buffer currentBuffer; + private OutputCodec codec = null; private OutputCodecContext codecContext = null; - private final String batchKey; - - public LambdaSinkService(final LambdaClient lambdaClient, - final LambdaSinkConfig lambdaSinkConfig, - final PluginMetrics pluginMetrics, - final PluginFactory pluginFactory, - final PluginSetting pluginSetting, - final OutputCodecContext codecContext, - final AwsCredentialsSupplier awsCredentialsSupplier, - final DlqPushHandler dlqPushHandler, - final BufferFactory bufferFactory) { + private String payloadModel = null; + + public LambdaSinkService(final LambdaClient lambdaClient, final LambdaSinkConfig lambdaSinkConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, final PluginSetting pluginSetting, final OutputCodecContext codecContext, final AwsCredentialsSupplier awsCredentialsSupplier, final DlqPushHandler dlqPushHandler, final BufferFactory bufferFactory, final ExpressionEvaluator expressionEvaluator) { this.lambdaSinkConfig = lambdaSinkConfig; this.pluginSetting = pluginSetting; + this.expressionEvaluator = expressionEvaluator; this.dlqPushHandler = dlqPushHandler; this.lambdaClient = lambdaClient; + this.numberOfRecordsSuccessCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS); + this.numberOfRecordsFailedCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED); + this.lambdaLatencyMetric = pluginMetrics.timer(LAMBDA_LATENCY_METRIC); + this.requestPayloadMetric = pluginMetrics.gauge(REQUEST_PAYLOAD_SIZE, new AtomicLong()); + this.responsePayloadMetric = pluginMetrics.gauge(RESPONSE_PAYLOAD_SIZE, new AtomicLong()); reentrantLock = new ReentrantLock(); - numberOfRecordsSuccessCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS); - numberOfRecordsFailedCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED); functionName = lambdaSinkConfig.getFunctionName(); - + payloadModel = lambdaSinkConfig.getPayloadModel(); maxRetries = lambdaSinkConfig.getMaxConnectionRetries(); batchOptions = lambdaSinkConfig.getBatchOptions(); + whenCondition = lambdaSinkConfig.getWhenCondition(); - if (!Objects.isNull(batchOptions)){ + if (payloadModel.equals(BATCH_EVENT)) { + JsonOutputCodecConfig jsonOutputCodecConfig = new JsonOutputCodecConfig(); + jsonOutputCodecConfig.setKeyName(batchOptions.getKeyName()); + codec = new JsonOutputCodec(jsonOutputCodecConfig); maxEvents = batchOptions.getThresholdOptions().getEventCount(); maxBytes = batchOptions.getThresholdOptions().getMaximumSize(); maxCollectionDuration = batchOptions.getThresholdOptions().getEventCollectTimeOut(); - batchKey = batchOptions.getBatchKey(); isBatchEnabled = true; - }else{ - batchKey = null; + } else if (payloadModel.equals(SINGLE_EVENT)) { + NdjsonOutputConfig ndjsonOutputCodecConfig = new NdjsonOutputConfig(); + codec = new NdjsonOutputCodec(ndjsonOutputCodecConfig); isBatchEnabled = false; + } else { + throw new RuntimeException("invalid payload_model option"); } this.codecContext = codecContext; - - codec = new LambdaJsonCodec(batchKey); bufferedEventHandles = new LinkedList<>(); events = new ArrayList(); - invocationType = ASYNC_INVOCATION_TYPE; + invocationType = invocationTypeMap.get(lambdaSinkConfig.getInvocationType()); this.bufferFactory = bufferFactory; try { - currentBuffer = bufferFactory.getBuffer(lambdaClient,functionName,invocationType); + currentBuffer = bufferFactory.getBuffer(lambdaClient, functionName, invocationType); } catch (IOException e) { throw new RuntimeException(e); } } - public void output(Collection> records){ - // Don't acquire the lock if there's no work to be done + public void output(Collection> records) { if (records.isEmpty() && currentBuffer.getEventCount() == 0) { return; } @@ -128,6 +143,11 @@ public void output(Collection> records){ try { for (Record record : records) { final Event event = record.getData(); + + if (whenCondition != null && !expressionEvaluator.evaluateConditional(whenCondition, event)) { + continue; + } + try { if (currentBuffer.getEventCount() == 0) { codec.start(currentBuffer.getOutputStream(), event, codecContext); @@ -137,24 +157,27 @@ public void output(Collection> records){ currentBuffer.setEventCount(count); bufferedEventHandles.add(event.getEventHandle()); + flushToLambdaIfNeeded(); } catch (Exception ex) { - if(sampleException == null) { + LOG.error(EVENT, "There was an exception while processing Event [{}]" , event, ex); + if (sampleException == null) { sampleException = ex; } failedEvents.add(event); + handleFailure(currentBuffer, ex); //reset buffer + try { + currentBuffer = bufferFactory.getBuffer(lambdaClient, functionName, invocationType); + } catch (IOException e) { + throw new RuntimeException(e); + } } - - flushToLambdaIfNeeded(); } } finally { reentrantLock.unlock(); } - if(!failedEvents.isEmpty()) { - failedEvents - .stream() - .map(Event::getEventHandle) - .forEach(eventHandle -> eventHandle.release(false)); + if (!failedEvents.isEmpty()) { + failedEvents.stream().map(Event::getEventHandle).forEach(eventHandle -> eventHandle.release(false)); LOG.error("Unable to add {} events to buffer. Dropping these events. Sample exception provided.", failedEvents.size(), sampleException); } } @@ -167,66 +190,49 @@ private void releaseEventHandles(final boolean result) { } private void flushToLambdaIfNeeded() { - LOG.trace("Flush to Lambda check: currentBuffer.size={}, currentBuffer.events={}, currentBuffer.duration={}", - currentBuffer.getSize(), currentBuffer.getEventCount(), currentBuffer.getDuration()); - final AtomicReference errorMsgObj = new AtomicReference<>(); - - try { - if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration, isBatchEnabled)) { - try { - codec.complete(currentBuffer.getOutputStream()); - LOG.info("Writing {} to Lambda with {} events and size of {} bytes.", - functionName, currentBuffer.getEventCount(), currentBuffer.getSize()); - final boolean isFlushToLambda = retryFlushToLambda(currentBuffer, errorMsgObj); - - if (isFlushToLambda) { - LOG.info("Successfully flushed to Lambda {}.", functionName); - numberOfRecordsSuccessCounter.increment(currentBuffer.getEventCount()); - releaseEventHandles(true); - } else { - LOG.error("Failed to save to Lambda {}", functionName); - numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount()); - SdkBytes payload = currentBuffer.getPayload(); - if(dlqPushHandler!=null) { - dlqPushHandler.perform(pluginSetting, new LambdaSinkFailedDlqData(payload, errorMsgObj.get(), 0)); - releaseEventHandles(true); - }else{ - releaseEventHandles(false); - } - } + LOG.info("Flush to Lambda check: currentBuffer.size={}, currentBuffer.events={}, currentBuffer.duration={}", currentBuffer.getSize(), currentBuffer.getEventCount(), currentBuffer.getDuration()); - //reset buffer after flush - currentBuffer = bufferFactory.getBuffer(lambdaClient,functionName,invocationType); - } catch (final IOException e) { - releaseEventHandles(false); - LOG.error("Exception while completing codec", e); - } + if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration, isBatchEnabled)) { + try { + codec.complete(currentBuffer.getOutputStream()); + responsePayloadMetric.set(currentBuffer.getPayloadResponseSyncSize()); + InvokeResponse response = currentBuffer.flushToLambdaAsync(); + handleLambdaResponse(response); + lambdaLatencyMetric.record(currentBuffer.getFlushLambdaSyncLatencyMetric()); + requestPayloadMetric.set(currentBuffer.getPayloadRequestSyncSize()); + numberOfRecordsSuccessCounter.increment(currentBuffer.getEventCount()); + releaseEventHandles(true); + } catch (AwsServiceException | SdkClientException e) { + LOG.error("Exception occurred while uploading records to lambda : {} | exception:", functionName, e); + handleFailure(currentBuffer, e); + } catch (final IOException e) { + LOG.error("Exception while completing codec", e); + handleFailure(currentBuffer, e); + } + // Reset buffer + try { + currentBuffer = bufferFactory.getBuffer(lambdaClient, functionName, invocationType); + } catch (IOException ex) { + throw new RuntimeException("Failed to reset buffer after exception", ex); } - } catch (InterruptedException e) { - throw new RuntimeException(e); } } - protected boolean retryFlushToLambda(Buffer currentBuffer, - final AtomicReference errorMsgObj) throws InterruptedException { - boolean isUploadedToLambda = Boolean.FALSE; - int retryCount = maxRetries; - do { + private void handleFailure(Buffer currentBuffer, Exception e) { + numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount()); + SdkBytes payload = currentBuffer.getPayload(); + if (dlqPushHandler != null) { + dlqPushHandler.perform(pluginSetting, new LambdaSinkFailedDlqData(payload, e.getMessage(), 0)); + releaseEventHandles(true); + } else { + releaseEventHandles(false); + } + } - try { - currentBuffer.flushToLambdaAsync(); - isUploadedToLambda = Boolean.TRUE; - } catch (AwsServiceException | SdkClientException e) { - errorMsgObj.set(e.getMessage()); - LOG.error("Exception occurred while uploading records to lambda. Retry countdown : {} | exception:", - retryCount, e); - --retryCount; - if (retryCount == 0) { - return isUploadedToLambda; - } - Thread.sleep(5000); - } - } while (!isUploadedToLambda); - return isUploadedToLambda; + private void handleLambdaResponse(InvokeResponse response) { + int statusCode = response.statusCode(); + if (statusCode < 200 || statusCode >= 300) { + LOG.warn("Lambda invocation returned with non-success status code: {}", statusCode); + } } } diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/LambdaSinkFailedDlqData.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/LambdaSinkFailedDlqData.java index 8941966b77..5ca8301be1 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/LambdaSinkFailedDlqData.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/LambdaSinkFailedDlqData.java @@ -4,7 +4,6 @@ */ package org.opensearch.dataprepper.plugins.lambda.sink.dlq; -import com.fasterxml.jackson.core.JsonProcessingException; import software.amazon.awssdk.core.SdkBytes; @@ -16,7 +15,7 @@ public class LambdaSinkFailedDlqData { private int status; - public LambdaSinkFailedDlqData(SdkBytes payload, String message, int status) throws JsonProcessingException { + public LambdaSinkFailedDlqData(SdkBytes payload, String message, int status) { this.payload = payload; this.message = message; this.status = status; diff --git a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/client/LambdaClientFactoryTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/client/LambdaClientFactoryTest.java new file mode 100644 index 0000000000..da3428dc1a --- /dev/null +++ b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/client/LambdaClientFactoryTest.java @@ -0,0 +1,103 @@ +package org.opensearch.dataprepper.plugins.lambda.common.client; + +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; +import static org.mockito.ArgumentMatchers.any; +import org.mockito.Mock; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.lambda.LambdaClient; +import software.amazon.awssdk.services.lambda.LambdaClientBuilder; + +import java.util.Map; +import java.util.UUID; + +@ExtendWith(MockitoExtension.class) +class LambdaClientFactoryTest { + @Mock + private AwsCredentialsSupplier awsCredentialsSupplier; + + @Mock + private AwsAuthenticationOptions awsAuthenticationOptions; + + @Mock + private AwsCredentialsProvider awsCredentialsProvider; + + @BeforeEach + void setUp() { + // No setup needed here as we're mocking static methods in tests + } + + @Test + void createLambdaClient_with_real_LambdaClient() { + try (var mockedStaticLambdaClient = mockStatic(LambdaClient.class)) { + LambdaClientBuilder lambdaClientBuilder = mock(LambdaClientBuilder.class); + mockedStaticLambdaClient.when(LambdaClient::builder).thenReturn(lambdaClientBuilder); + + when(lambdaClientBuilder.region(any(Region.class))).thenReturn(lambdaClientBuilder); + when(lambdaClientBuilder.credentialsProvider(any(AwsCredentialsProvider.class))).thenReturn(lambdaClientBuilder); + when(lambdaClientBuilder.overrideConfiguration(any(ClientOverrideConfiguration.class))).thenReturn(lambdaClientBuilder); + when(lambdaClientBuilder.build()).thenReturn(mock(LambdaClient.class)); + + when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.US_EAST_1); + when(awsCredentialsSupplier.getProvider(any(AwsCredentialsOptions.class))).thenReturn(awsCredentialsProvider); + + final LambdaClient lambdaClient = LambdaClientFactory.createLambdaClient(awsAuthenticationOptions, 3, awsCredentialsSupplier); + + assertThat(lambdaClient, notNullValue()); + } + } + + @ParameterizedTest + @ValueSource(strings = {"us-east-1", "us-west-2", "eu-central-1"}) + void createlambdaClient_provides_correct_inputs(final String regionString) { + try (var mockedStaticLambdaClient = mockStatic(LambdaClient.class)) { + LambdaClientBuilder lambdaClientBuilder = mock(LambdaClientBuilder.class); + mockedStaticLambdaClient.when(LambdaClient::builder).thenReturn(lambdaClientBuilder); + + final Region region = Region.of(regionString); + final String stsRoleArn = UUID.randomUUID().toString(); + final Map stsHeaderOverrides = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + when(awsAuthenticationOptions.getAwsRegion()).thenReturn(region); + when(awsAuthenticationOptions.getAwsStsRoleArn()).thenReturn(stsRoleArn); + when(awsAuthenticationOptions.getAwsStsHeaderOverrides()).thenReturn(stsHeaderOverrides); + when(awsCredentialsSupplier.getProvider(any())).thenReturn(awsCredentialsProvider); + + when(lambdaClientBuilder.region(any(Region.class))).thenReturn(lambdaClientBuilder); + when(lambdaClientBuilder.credentialsProvider(any(AwsCredentialsProvider.class))).thenReturn(lambdaClientBuilder); + when(lambdaClientBuilder.overrideConfiguration(any(ClientOverrideConfiguration.class))).thenReturn(lambdaClientBuilder); + when(lambdaClientBuilder.build()).thenReturn(mock(LambdaClient.class)); + + final LambdaClient lambdaClient = LambdaClientFactory.createLambdaClient(awsAuthenticationOptions, 3, awsCredentialsSupplier); + + final ArgumentCaptor credentialsProviderArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsProvider.class); + verify(lambdaClientBuilder).credentialsProvider(credentialsProviderArgumentCaptor.capture()); + final AwsCredentialsProvider actualCredentialsProvider = credentialsProviderArgumentCaptor.getValue(); + assertThat(actualCredentialsProvider, equalTo(awsCredentialsProvider)); + + final ArgumentCaptor optionsArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsOptions.class); + verify(awsCredentialsSupplier).getProvider(optionsArgumentCaptor.capture()); + + final AwsCredentialsOptions actualCredentialsOptions = optionsArgumentCaptor.getValue(); + assertThat(actualCredentialsOptions.getRegion(), equalTo(region)); + assertThat(actualCredentialsOptions.getStsRoleArn(), equalTo(stsRoleArn)); + assertThat(actualCredentialsOptions.getStsHeaderOverrides(), equalTo(stsHeaderOverrides)); + } + } +} diff --git a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptionsTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptionsTest.java index 98437b49fe..ee69848dbe 100644 --- a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptionsTest.java +++ b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptionsTest.java @@ -10,9 +10,12 @@ import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.model.types.ByteCount; +import java.time.Duration; + class ThresholdOptionsTest { + private static final int DEFAULT_EVENT_COUNT = 10; private static final String DEFAULT_BYTE_CAPACITY = "3mb"; - private static final int DEFAULT_EVENT_COUNT = 0; + private static final Duration DEFAULT_EVENT_TIMEOUT = Duration.ofSeconds(10); @Test void test_default_byte_capacity_test() { @@ -22,7 +25,7 @@ void test_default_byte_capacity_test() { @Test void test_get_event_collection_duration_test() { - assertThat(new ThresholdOptions().getEventCollectTimeOut(), equalTo(null)); + assertThat(new ThresholdOptions().getEventCollectTimeOut(), equalTo(DEFAULT_EVENT_TIMEOUT)); } @Test diff --git a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorConfigTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorConfigTest.java new file mode 100644 index 0000000000..3f94ccf6fe --- /dev/null +++ b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorConfigTest.java @@ -0,0 +1,32 @@ +package org.opensearch.dataprepper.plugins.lambda.processor; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.regions.Region; + +public class LambdaProcessorConfigTest { + + public static final int DEFAULT_MAX_RETRIES = 3; + private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); + + @Test + void lambda_processor_default_max_connection_retries_test() { + assertThat(new LambdaProcessorConfig().getMaxConnectionRetries(), equalTo(DEFAULT_MAX_RETRIES)); + } + + @Test + public void testAwsAuthenticationOptionsNotNull() throws JsonProcessingException { + final String config = " function_name: test_function\n" + " aws:\n" + " region: ap-south-1\n" + " sts_role_arn: arn:aws:iam::524239988912:role/app-test\n" + " sts_header_overrides: {\"test\":\"test\"}\n" + " max_retries: 10\n"; + final LambdaProcessorConfig lambdaProcessorConfig = objectMapper.readValue(config, LambdaProcessorConfig.class); + + assertThat(lambdaProcessorConfig.getMaxConnectionRetries(), equalTo(10)); + assertThat(lambdaProcessorConfig.getAwsAuthenticationOptions().getAwsRegion(), equalTo(Region.AP_SOUTH_1)); + assertThat(lambdaProcessorConfig.getAwsAuthenticationOptions().getAwsStsRoleArn(), equalTo("arn:aws:iam::524239988912:role/app-test")); + assertThat(lambdaProcessorConfig.getAwsAuthenticationOptions().getAwsStsHeaderOverrides().get("test"), equalTo("test")); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorTest.java new file mode 100644 index 0000000000..c5ca12117b --- /dev/null +++ b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorTest.java @@ -0,0 +1,290 @@ +package org.opensearch.dataprepper.plugins.lambda.processor; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; +import org.junit.jupiter.api.AfterEach; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import org.mockito.MockitoAnnotations; +import org.mockito.Spy; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.BufferFactory; +import org.opensearch.dataprepper.plugins.lambda.common.client.LambdaClientFactory; +import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.ThresholdOptions; +import static org.opensearch.dataprepper.plugins.lambda.processor.LambdaProcessor.LAMBDA_LATENCY_METRIC; +import static org.opensearch.dataprepper.plugins.lambda.processor.LambdaProcessor.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED; +import static org.opensearch.dataprepper.plugins.lambda.processor.LambdaProcessor.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS; +import static org.opensearch.dataprepper.plugins.lambda.processor.LambdaProcessor.REQUEST_PAYLOAD_SIZE; +import static org.opensearch.dataprepper.plugins.lambda.processor.LambdaProcessor.RESPONSE_PAYLOAD_SIZE; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.lambda.LambdaClient; +import software.amazon.awssdk.services.lambda.model.InvokeRequest; +import software.amazon.awssdk.services.lambda.model.InvokeResponse; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +@ExtendWith(MockitoExtension.class) +public class LambdaProcessorTest { + private static final String RESPONSE_PAYLOAD = "{\"k1\":\"v1\",\"k2\":\"v2\"}"; + private static MockedStatic lambdaClientFactoryMockedStatic; + private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private ExpressionEvaluator expressionEvaluator; + + @Mock + private LambdaProcessorConfig lambdaProcessorConfig; + + @Mock + private AwsCredentialsSupplier awsCredentialsSupplier; + + @Mock + private LambdaClient lambdaClient; + + @Spy + private BufferFactory bufferFactory; + + @Mock + private Buffer buffer; + + @Mock + private Counter numberOfRecordsSuccessCounter; + + @Mock + private Counter numberOfRecordsFailedCounter; + + @Mock + private Counter numberOfRecordsDroppedCounter; + + @Mock + private Timer lambdaLatencyMetric; + + @Mock + private AtomicLong requestPayload; + + @Mock + private AtomicLong responsePayload; + + private LambdaProcessor createObjectUnderTest() { + return new LambdaProcessor(pluginMetrics, lambdaProcessorConfig, awsCredentialsSupplier, expressionEvaluator); + } + + @BeforeEach + public void setUp() throws IOException { + MockitoAnnotations.openMocks(this); + + BatchOptions batchOptions = mock(BatchOptions.class); + ThresholdOptions thresholdOptions = mock(ThresholdOptions.class); + AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class); + + lenient().when(lambdaProcessorConfig.getFunctionName()).thenReturn("test-function1"); + lenient().when(lambdaProcessorConfig.getMaxConnectionRetries()).thenReturn(3); + lenient().when(lambdaProcessorConfig.getInvocationType()).thenReturn("request-response"); + lenient().when(lambdaProcessorConfig.getPayloadModel()).thenReturn("batch-event"); + + lenient().when(thresholdOptions.getEventCount()).thenReturn(10); + lenient().when(thresholdOptions.getMaximumSize()).thenReturn(ByteCount.ofBytes(6)); + lenient().when(thresholdOptions.getEventCollectTimeOut()).thenReturn(Duration.ofSeconds(5)); + + lenient().when(batchOptions.getKeyName()).thenReturn("events"); + lenient().when(batchOptions.getThresholdOptions()).thenReturn(thresholdOptions); + lenient().when(lambdaProcessorConfig.getBatchOptions()).thenReturn(batchOptions); + + lenient().when(lambdaProcessorConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); + lenient().when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.of("test-region")); + + lenient().when(pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS)).thenReturn(numberOfRecordsDroppedCounter); + lenient().when(pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED)).thenReturn(numberOfRecordsFailedCounter); + lenient().when(pluginMetrics.timer(LAMBDA_LATENCY_METRIC)).thenReturn(lambdaLatencyMetric); + lenient().when(pluginMetrics.gauge(eq(REQUEST_PAYLOAD_SIZE), any(AtomicLong.class))).thenReturn(requestPayload); + lenient().when(pluginMetrics.gauge(eq(RESPONSE_PAYLOAD_SIZE), any(AtomicLong.class))).thenReturn(responsePayload); + + InvokeResponse resp = InvokeResponse.builder().statusCode(200).payload(SdkBytes.fromUtf8String(RESPONSE_PAYLOAD)).build(); + lambdaClientFactoryMockedStatic = Mockito.mockStatic(LambdaClientFactory.class); + when(LambdaClientFactory.createLambdaClient(any(AwsAuthenticationOptions.class), + eq(3), + any(AwsCredentialsSupplier.class))).thenReturn(lambdaClient); + lenient().when(lambdaClient.invoke(any(InvokeRequest.class))).thenReturn(resp); + try { + lenient().when(bufferFactory.getBuffer(lambdaClient, lambdaProcessorConfig.getFunctionName(), "RequestResponse")).thenReturn(buffer); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @AfterEach + public void cleanup() { + lambdaClientFactoryMockedStatic.close(); + } + + @Test + public void testDoExecuteWithEmptyRecords() { + Collection> records = Collections.emptyList(); + LambdaProcessor lambdaProcessor = createObjectUnderTest(); + Collection> result = lambdaProcessor.doExecute(records); + + assertTrue(result.isEmpty()); + } + + @Test + public void testDoExecute() throws JsonProcessingException { + Event event = JacksonEvent.builder().withEventType("event").withData("{\"status\":true}").build(); + Record record = new Record<>(event); + Collection> records = List.of(record); + + InvokeResponse invokeResponse = InvokeResponse.builder().statusCode(200).payload(SdkBytes.fromUtf8String(RESPONSE_PAYLOAD)).build(); + + LambdaProcessor lambdaProcessor = createObjectUnderTest(); + Collection> resultRecords = lambdaProcessor.doExecute(records); + + assertEquals(1, resultRecords.size()); + Record resultRecord = resultRecords.iterator().next(); + + ObjectMapper objectMapper = new ObjectMapper(); + JsonNode responseJsonNode = objectMapper.readTree(RESPONSE_PAYLOAD); + assertEquals(responseJsonNode, resultRecord.getData().getJsonNode()); + } + + @Test + public void testDoExecute_withException() { + List> records = new ArrayList<>(); + Event event = mock(Event.class); + records.add(new Record<>(event)); + + lenient().when(buffer.getOutputStream()).thenThrow(new RuntimeException("Test exception")); + + LambdaProcessor lambdaProcessor = createObjectUnderTest(); + Collection> result = lambdaProcessor.doExecute(records); + + assertEquals(1, result.size()); + verify(buffer, times(0)).flushToLambdaSync(); + } + + @Test + public void testFlushToLambdaIfNeeded_withThresholdNotExceeded() throws Exception { + lenient().when(buffer.getSize()).thenReturn(100L); + lenient().when(buffer.getEventCount()).thenReturn(1); + lenient().when(buffer.getDuration()).thenReturn(Duration.ofSeconds(1)); + + LambdaProcessor lambdaProcessor = createObjectUnderTest(); + List> records = mock(ArrayList.class); + lambdaProcessor.flushToLambdaIfNeeded(records); + verify(buffer, times(0)).flushToLambdaSync(); + verify(records, times(0)).add(any(Record.class)); + } + + @Test + public void testConvertLambdaResponseToEvent_withInvalidJsonPayload() { + // Arrange + // Using an invalid JSON string to trigger the JSON parsing exception + String invalidJsonPayload = "{ invalid json }"; + InvokeResponse response = InvokeResponse.builder() + .statusCode(200) + .payload(SdkBytes.fromUtf8String(invalidJsonPayload)) + .build(); + + lenient().when(lambdaClient.invoke(any(InvokeRequest.class))).thenReturn(response); + + LambdaProcessor lambdaProcessor = createObjectUnderTest(); + + // Act and Assert + RuntimeException exception = assertThrows(RuntimeException.class, () -> { + lambdaProcessor.convertLambdaResponseToEvent(response); + }); + // Asserting the message thrown by the RuntimeException + assertEquals("Error converting Lambda response to Event", exception.getMessage()); + } + + + @Test + public void testDoExecute_withNonSuccessfulStatusCode() { + InvokeResponse response = InvokeResponse.builder().statusCode(500).payload(SdkBytes.fromUtf8String(RESPONSE_PAYLOAD)).build(); + lenient().when(lambdaClient.invoke(any(InvokeRequest.class))).thenReturn(response); + + LambdaProcessor lambdaProcessor = createObjectUnderTest(); + + List> records = new ArrayList<>(); + Event event = mock(Event.class); + records.add(new Record<>(event)); + List> resultRecords = (List>) lambdaProcessor.doExecute(records); + + verify(lambdaClient, times(1)).invoke(any(InvokeRequest.class)); + + //event should be dropped on failure + assertEquals(resultRecords.size(), 0); + verify(numberOfRecordsFailedCounter, times(1)).increment(1); + //check if buffer is reset + assertEquals(buffer.getSize(), 0); + } + + @Test + public void testConvertLambdaResponseToEvent() throws JsonProcessingException { + InvokeResponse response = InvokeResponse.builder().statusCode(200).payload(SdkBytes.fromUtf8String(RESPONSE_PAYLOAD)).build(); + lenient().when(lambdaClient.invoke(any(InvokeRequest.class))).thenReturn(response); + + LambdaProcessor lambdaProcessor = createObjectUnderTest(); + Event eventResponse = lambdaProcessor.convertLambdaResponseToEvent(response); + + ObjectMapper objectMapper = new ObjectMapper(); + JsonNode jsonNode = objectMapper.readTree(RESPONSE_PAYLOAD); + Event event = JacksonEvent.builder().withEventType("event").withData(jsonNode).build(); + assertEquals(event.getJsonNode(), eventResponse.getJsonNode()); + } + + @Test + public void testDoExecute_WithConfig() throws JsonProcessingException { + final String config = " function_name: test_function\n" + " invocation_type: request-response\n"+ " payload_model: single-event\n" + " aws:\n" + " region: us-east-1\n" + " sts_role_arn: arn:aws:iam::524239988912:role/app-test\n" + " sts_header_overrides: {\"test\":\"test\"}\n" + " max_retries: 3\n"; + + this.lambdaProcessorConfig = objectMapper.readValue(config, LambdaProcessorConfig.class); + + Event event = JacksonEvent.builder().withEventType("event").withData("{\"status\":true}").build(); + Record record = new Record<>(event); + Collection> records = List.of(record); + + InvokeResponse invokeResponse = InvokeResponse.builder().statusCode(200).payload(SdkBytes.fromUtf8String(RESPONSE_PAYLOAD)).build(); + + LambdaProcessor lambdaProcessor = createObjectUnderTest(); + Collection> resultRecords = lambdaProcessor.doExecute(records); + verify(lambdaClient, times(1)).invoke(any(InvokeRequest.class)); + assertEquals(resultRecords.size(), 1); + } +} diff --git a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactoryTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactoryTest.java deleted file mode 100644 index 9ed5c71fb2..0000000000 --- a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactoryTest.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ -package org.opensearch.dataprepper.plugins.lambda.sink; - -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; -import org.mockito.ArgumentCaptor; -import static org.mockito.ArgumentMatchers.any; -import org.mockito.Mock; -import org.mockito.MockedStatic; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockStatic; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; -import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; -import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.lambda.LambdaClient; -import software.amazon.awssdk.services.lambda.LambdaClientBuilder; - -import java.util.Map; -import java.util.UUID; - -@ExtendWith(MockitoExtension.class) -class LambdaClientFactoryTest { - @Mock - private LambdaSinkConfig lambdaSinkConfig; - @Mock - private AwsCredentialsSupplier awsCredentialsSupplier; - - @Mock - private AwsAuthenticationOptions awsAuthenticationOptions; - - @BeforeEach - void setUp() { - when(lambdaSinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); - } - - @Test - void createLambdaClient_with_real_LambdaClient() { - when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.US_EAST_1); - final LambdaClient lambdaClient = LambdaClientFactory.createLambdaClient(lambdaSinkConfig, awsCredentialsSupplier); - - assertThat(lambdaClient, notNullValue()); - } - - @ParameterizedTest - @ValueSource(strings = {"us-east-1", "us-west-2", "eu-central-1"}) - void createlambdaClient_provides_correct_inputs(final String regionString) { - final Region region = Region.of(regionString); - final String stsRoleArn = UUID.randomUUID().toString(); - final Map stsHeaderOverrides = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); - when(awsAuthenticationOptions.getAwsRegion()).thenReturn(region); - when(awsAuthenticationOptions.getAwsStsRoleArn()).thenReturn(stsRoleArn); - when(awsAuthenticationOptions.getAwsStsHeaderOverrides()).thenReturn(stsHeaderOverrides); - - final AwsCredentialsProvider expectedCredentialsProvider = mock(AwsCredentialsProvider.class); - when(awsCredentialsSupplier.getProvider(any())).thenReturn(expectedCredentialsProvider); - - final LambdaClientBuilder lambdaClientBuilder = mock(LambdaClientBuilder.class); - when(lambdaClientBuilder.region(region)).thenReturn(lambdaClientBuilder); - when(lambdaClientBuilder.credentialsProvider(any())).thenReturn(lambdaClientBuilder); - when(lambdaClientBuilder.overrideConfiguration(any(ClientOverrideConfiguration.class))).thenReturn(lambdaClientBuilder); - try(final MockedStatic lambdaClientMockedStatic = mockStatic(LambdaClient.class)) { - lambdaClientMockedStatic.when(LambdaClient::builder) - .thenReturn(lambdaClientBuilder); - LambdaClientFactory.createLambdaClient(lambdaSinkConfig, awsCredentialsSupplier); - } - - final ArgumentCaptor credentialsProviderArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsProvider.class); - verify(lambdaClientBuilder).credentialsProvider(credentialsProviderArgumentCaptor.capture()); - - final AwsCredentialsProvider actualCredentialsProvider = credentialsProviderArgumentCaptor.getValue(); - - assertThat(actualCredentialsProvider, equalTo(expectedCredentialsProvider)); - - final ArgumentCaptor optionsArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsOptions.class); - verify(awsCredentialsSupplier).getProvider(optionsArgumentCaptor.capture()); - - final AwsCredentialsOptions actualCredentialsOptions = optionsArgumentCaptor.getValue(); - assertThat(actualCredentialsOptions.getRegion(), equalTo(region)); - assertThat(actualCredentialsOptions.getStsRoleArn(), equalTo(stsRoleArn)); - assertThat(actualCredentialsOptions.getStsHeaderOverrides(), equalTo(stsHeaderOverrides)); - } -} \ No newline at end of file diff --git a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkServiceTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkServiceTest.java index f8ca0f11ec..d9c02d09b7 100644 --- a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkServiceTest.java +++ b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkServiceTest.java @@ -8,6 +8,7 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -15,15 +16,18 @@ import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; @@ -39,7 +43,12 @@ import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBufferFactory; import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.LambdaCommonConfig; +import static org.opensearch.dataprepper.plugins.lambda.common.config.LambdaCommonConfig.BATCH_EVENT; import org.opensearch.dataprepper.plugins.lambda.common.config.ThresholdOptions; +import static org.opensearch.dataprepper.plugins.lambda.sink.LambdaSinkService.LAMBDA_LATENCY_METRIC; +import static org.opensearch.dataprepper.plugins.lambda.sink.LambdaSinkService.REQUEST_PAYLOAD_SIZE; +import static org.opensearch.dataprepper.plugins.lambda.sink.LambdaSinkService.RESPONSE_PAYLOAD_SIZE; import org.opensearch.dataprepper.plugins.lambda.sink.dlq.DlqPushHandler; import org.opensearch.dataprepper.plugins.lambda.sink.dlq.LambdaSinkFailedDlqData; import software.amazon.awssdk.awscore.exception.AwsServiceException; @@ -54,9 +63,11 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; public class LambdaSinkServiceTest { @@ -66,13 +77,14 @@ public class LambdaSinkServiceTest { public static final String maxSize = "1kb"; public static final String functionName = "testFunction"; public static final String invocationType = "event"; - public static final String batchKey ="lambda_batch_key"; + public static final String keyName ="lambda_batch_key"; public static final String config = " function_name: testFunction\n" + " aws:\n" + " region: us-east-1\n" + " sts_role_arn: arn:aws:iam::524239988912:role/app-test\n" + " sts_header_overrides: {\"test\":\"test\"}\n" + + " payload_model: single-event\n"+ " max_retries: 10\n"; private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); @@ -81,10 +93,14 @@ public class LambdaSinkServiceTest { private PluginMetrics pluginMetrics; private Counter numberOfRecordsSuccessCounter; private Counter numberOfRecordsFailedCounter; + private Timer lambdaLatencyMetric; + private AtomicLong requestPayload; + private AtomicLong responsePayload; private DlqPushHandler dlqPushHandler; private Buffer buffer; private BufferFactory bufferFactory; - + private OutputCodecContext outputCodecContext; + private ExpressionEvaluator expressionEvaluator; private InvokeResponse invokeResponse; @@ -97,46 +113,65 @@ public void setup() throws IOException { this.lambdaClient = mock(LambdaClient.class); this.pluginMetrics = mock(PluginMetrics.class); this.buffer = mock(InMemoryBuffer.class); + this.expressionEvaluator = mock(ExpressionEvaluator.class); this.lambdaSinkConfig = mock(LambdaSinkConfig.class); this.numberOfRecordsSuccessCounter = mock(Counter.class); this.numberOfRecordsFailedCounter = mock(Counter.class); + this.lambdaLatencyMetric = mock(Timer.class); + this.requestPayload = mock(AtomicLong.class); + this.responsePayload = mock(AtomicLong.class); this.dlqPushHandler = mock(DlqPushHandler.class); this.bufferFactory = mock(BufferFactory.class); + this.outputCodecContext = mock(OutputCodecContext.class); + when(lambdaSinkConfig.getInvocationType()).thenReturn(LambdaCommonConfig.EVENT); + when(lambdaSinkConfig.getPayloadModel()).thenReturn("single-event"); when(pluginMetrics.counter(LambdaSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS)).thenReturn(numberOfRecordsSuccessCounter); when(pluginMetrics.counter(LambdaSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED)).thenReturn(numberOfRecordsFailedCounter); + lenient().when(pluginMetrics.timer(LAMBDA_LATENCY_METRIC)).thenReturn(lambdaLatencyMetric); + lenient().when(pluginMetrics.gauge(eq(REQUEST_PAYLOAD_SIZE), any(AtomicLong.class))).thenReturn(requestPayload); + lenient().when(pluginMetrics.gauge(eq(RESPONSE_PAYLOAD_SIZE), any(AtomicLong.class))).thenReturn(responsePayload); mockResponse = InvokeResponse.builder() .statusCode(200) // HTTP 200 for successful invocation .payload(SdkBytes.fromString("{\"key\": \"value\"}", java.nio.charset.StandardCharsets.UTF_8)) .build(); + } private LambdaSinkService createObjectUnderTest(LambdaSinkConfig lambdaSinkConfig) throws IOException { bufferFactory = new InMemoryBufferFactory(); buffer = bufferFactory.getBuffer(lambdaClient,functionName,invocationType); + when(outputCodecContext.getIncludeKeys()).thenReturn(Collections.emptyList()); + when(outputCodecContext.getExcludeKeys()).thenReturn(Collections.emptyList()); + when(outputCodecContext.getTagsTargetKey()).thenReturn(null); return new LambdaSinkService(lambdaClient, lambdaSinkConfig, pluginMetrics, mock(PluginFactory.class), mock(PluginSetting.class), - mock(OutputCodecContext.class), + outputCodecContext, mock(AwsCredentialsSupplier.class), dlqPushHandler, - bufferFactory); + bufferFactory, + expressionEvaluator); } private LambdaSinkService createObjectUnderTest(String config) throws IOException { this.lambdaSinkConfig = objectMapper.readValue(config, LambdaSinkConfig.class); bufferFactory = new InMemoryBufferFactory(); buffer = bufferFactory.getBuffer(lambdaClient,functionName,invocationType); + when(outputCodecContext.getIncludeKeys()).thenReturn(Collections.emptyList()); + when(outputCodecContext.getExcludeKeys()).thenReturn(Collections.emptyList()); + when(outputCodecContext.getTagsTargetKey()).thenReturn(null); return new LambdaSinkService(lambdaClient, lambdaSinkConfig, pluginMetrics, mock(PluginFactory.class), mock(PluginSetting.class), - mock(OutputCodecContext.class), + outputCodecContext, mock(AwsCredentialsSupplier.class), dlqPushHandler, - bufferFactory); + bufferFactory, + expressionEvaluator); } @Test @@ -172,44 +207,6 @@ public void lambda_sink_test_with_single_record_success_push_to_lambda() throws verify(numberOfRecordsSuccessCounter).increment(records.size()); } - @Test - public void lambda_sink_test_max_retires_works() throws IOException { - final String config = - " function_name: test_function\n" + - " aws:\n" + - " region: us-east-1\n" + - " sts_role_arn: arn:aws:iam::524239988912:role/app-test\n" + - " sts_header_overrides: {\"test\":\"test\"}\n" + - " max_retries: 3\n"; - this.buffer = mock(InMemoryBuffer.class); - when(lambdaClient.invoke(any(InvokeRequest.class))).thenThrow(AwsServiceException.class); - doNothing().when(dlqPushHandler).perform(any(PluginSetting.class), any(LambdaSinkFailedDlqData.class)); - - this.lambdaSinkConfig = objectMapper.readValue(config, LambdaSinkConfig.class); - bufferFactory = mock(BufferFactory.class); - buffer = mock(Buffer.class); - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - when(buffer.getOutputStream()).thenReturn(byteArrayOutputStream); - when(bufferFactory.getBuffer(any(LambdaClient.class),any(),any())).thenReturn(buffer); - doThrow(AwsServiceException.class).when(buffer).flushToLambdaAsync(); - - LambdaSinkService lambdaSinkService = new LambdaSinkService(lambdaClient, - lambdaSinkConfig, - pluginMetrics, - mock(PluginFactory.class), - mock(PluginSetting.class), - mock(OutputCodecContext.class), - mock(AwsCredentialsSupplier.class), - dlqPushHandler, - bufferFactory); - - final Record eventRecord = new Record<>(JacksonEvent.fromMessage("{\"message\":\"c3f847eb-333a-49c3-a4cd-54715ad1b58a\"}")); - Collection> records = List.of(eventRecord); - lambdaSinkService.output(records); - - verify(buffer, times(3)).flushToLambdaAsync(); - } - @Test public void lambda_sink_test_dlq_works() throws IOException { final String config = @@ -218,6 +215,7 @@ public void lambda_sink_test_dlq_works() throws IOException { " region: us-east-1\n" + " sts_role_arn: arn:aws:iam::524239988912:role/app-test\n" + " sts_header_overrides: {\"test\":\"test\"}\n" + + " payload_model: single-event\n"+ " max_retries: 3\n"; when(lambdaClient.invoke(any(InvokeRequest.class))).thenThrow(AwsServiceException.class); @@ -237,17 +235,18 @@ public void lambda_sink_test_dlq_works() throws IOException { pluginMetrics, mock(PluginFactory.class), mock(PluginSetting.class), - mock(OutputCodecContext.class), + outputCodecContext, mock(AwsCredentialsSupplier.class), dlqPushHandler, - bufferFactory); + bufferFactory, + expressionEvaluator); final Record eventRecord = new Record<>(JacksonEvent.fromMessage("{\"message\":\"c3f847eb-333a-49c3-a4cd-54715ad1b58a\"}")); Collection> records = List.of(eventRecord); lambdaSinkService.output(records); - verify(buffer, times(3)).flushToLambdaAsync(); + verify(buffer, times(1)).flushToLambdaAsync(); verify(dlqPushHandler,times(1)).perform(any(PluginSetting.class),any(Object.class)); } @@ -274,6 +273,13 @@ void lambda_sink_service_test_output_with_single_record_ack_release() throws IOE final Event event = mock(Event.class); given(event.toJsonString()).willReturn("{\"message\":\"c3f847eb-333a-49c3-a4cd-54715ad1b58a\"}"); given(event.getEventHandle()).willReturn(mock(EventHandle.class)); + final Event.JsonStringBuilder jsonStringBuilder = mock(Event.JsonStringBuilder.class); // Mock the JsonStringBuilder + given(event.jsonBuilder()).willReturn(jsonStringBuilder); + given(jsonStringBuilder.includeKeys(outputCodecContext.getIncludeKeys())).willReturn(jsonStringBuilder); + given(jsonStringBuilder.excludeKeys(outputCodecContext.getExcludeKeys())).willReturn(jsonStringBuilder); + given(jsonStringBuilder.includeTags(outputCodecContext.getTagsTargetKey())).willReturn(jsonStringBuilder); + given(jsonStringBuilder.toJsonString()).willReturn("{\"message\":\"c3f847eb-333a-49c3-a4cd-54715ad1b58a\"}"); + final ArgumentCaptor invokeRequestCaptor = ArgumentCaptor.forClass(InvokeRequest.class); when(lambdaClient.invoke(any(InvokeRequest.class))).thenReturn(mockResponse); @@ -291,8 +297,9 @@ void lambda_sink_service_test_output_with_single_record_ack_release() throws IOE public void lambda_sink_test_batch_enabled() throws IOException { when(lambdaSinkConfig.getFunctionName()).thenReturn(functionName); when(lambdaSinkConfig.getMaxConnectionRetries()).thenReturn(maxRetries); + when(lambdaSinkConfig.getPayloadModel()).thenReturn(BATCH_EVENT); when(lambdaSinkConfig.getBatchOptions()).thenReturn(mock(BatchOptions.class)); - when(lambdaSinkConfig.getBatchOptions().getBatchKey()).thenReturn(batchKey); + when(lambdaSinkConfig.getBatchOptions().getKeyName()).thenReturn("lambda_batch_key"); when(lambdaSinkConfig.getBatchOptions().getThresholdOptions()).thenReturn(mock(ThresholdOptions.class)); when(lambdaSinkConfig.getBatchOptions().getThresholdOptions().getEventCount()).thenReturn(1); when(lambdaSinkConfig.getBatchOptions().getThresholdOptions().getMaximumSize()).thenReturn(ByteCount.parse(maxSize)); diff --git a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkTest.java index 1842795e7c..010b7b3662 100644 --- a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkTest.java +++ b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkTest.java @@ -13,11 +13,13 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.LambdaCommonConfig; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.lambda.LambdaClient; @@ -36,6 +38,7 @@ class LambdaSinkTest { private PluginFactory pluginFactory; private AwsCredentialsSupplier awsCredentialsSupplier; private SinkContext sinkContext; + private ExpressionEvaluator expressionEvaluator; @BeforeEach void setUp() { @@ -48,9 +51,11 @@ void setUp() { awsCredentialsSupplier = mock(AwsCredentialsSupplier.class); Map dlqMap = mock(HashMap.class); LambdaClient lambdaClient = mock(LambdaClient.class); + ExpressionEvaluator expressionEvaluator = mock(ExpressionEvaluator.class); - + when(lambdaSinkConfig.getPayloadModel()).thenReturn("single-event"); when(lambdaSinkConfig.getDlq()).thenReturn(pluginModel); + when(lambdaSinkConfig.getInvocationType()).thenReturn(LambdaCommonConfig.EVENT); when(pluginModel.getPluginSettings()).thenReturn(dlqMap); when(lambdaSinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.of(S3_REGION)); @@ -60,7 +65,7 @@ void setUp() { } private LambdaSink createObjectUnderTest() { - return new LambdaSink(pluginSetting, lambdaSinkConfig, pluginFactory, sinkContext, awsCredentialsSupplier); + return new LambdaSink(pluginSetting, lambdaSinkConfig, pluginFactory, sinkContext, awsCredentialsSupplier, expressionEvaluator); } @Test diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodecConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodecConfig.java index ea99a23f4a..8d8ba865d6 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodecConfig.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodecConfig.java @@ -17,4 +17,8 @@ public class JsonOutputCodecConfig { public String getKeyName() { return keyName; } + + public void setKeyName(String keyName) { + this.keyName = keyName; + } }