diff --git a/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorServiceIT.java b/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorServiceIT.java index 859cbb81d9..686312cfe7 100644 --- a/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorServiceIT.java +++ b/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorServiceIT.java @@ -5,7 +5,7 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; import io.micrometer.core.instrument.Counter; -import static junit.framework.TestCase.assertEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; @@ -15,6 +15,7 @@ import org.mockito.MockitoAnnotations; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; @@ -66,6 +67,9 @@ public class LambdaProcessorServiceIT { private Counter numberOfRecordsSuccessCounter; @Mock private Counter numberOfRecordsFailedCounter; + @Mock + private ExpressionEvaluator expressionEvaluator; + private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); @@ -88,8 +92,6 @@ public void setUp() throws Exception { thenReturn(numberOfRecordsSuccessCounter); when(pluginMetrics.counter(LambdaProcessor.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED)). thenReturn(numberOfRecordsFailedCounter); - when(pluginSetting.getName()).thenReturn("test"); - when(pluginSetting.getPipelineName()).thenReturn("test-pipeline"); } @@ -101,11 +103,11 @@ private static Record createRecord() { public LambdaProcessor createObjectUnderTest(final String config) throws JsonProcessingException { final LambdaProcessorConfig lambdaProcessorConfig = objectMapper.readValue(config, LambdaProcessorConfig.class); - return new LambdaProcessor(pluginSetting,lambdaProcessorConfig,awsCredentialsSupplier); + return new LambdaProcessor(pluginMetrics,lambdaProcessorConfig,awsCredentialsSupplier,expressionEvaluator); } public LambdaProcessor createObjectUnderTest(LambdaProcessorConfig lambdaSinkConfig) throws JsonProcessingException { - return new LambdaProcessor(pluginSetting,lambdaSinkConfig,awsCredentialsSupplier); + return new LambdaProcessor(pluginMetrics,lambdaSinkConfig,awsCredentialsSupplier,expressionEvaluator); } diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBuffer.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBuffer.java index f46df217ca..f944ef110f 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBuffer.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBuffer.java @@ -12,6 +12,7 @@ import software.amazon.awssdk.services.lambda.LambdaClient; import software.amazon.awssdk.services.lambda.model.InvokeRequest; import software.amazon.awssdk.services.lambda.model.InvokeResponse; +import software.amazon.awssdk.services.lambda.model.LambdaException; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -94,7 +95,7 @@ public void flushToLambdaAsync() { @Override public InvokeResponse flushToLambdaSync() { - InvokeResponse resp; + InvokeResponse resp = null; SdkBytes payload = getPayload(); payloadRequestSyncSize = payload.asByteArray().length; @@ -105,11 +106,18 @@ public InvokeResponse flushToLambdaSync() { .invocationType(invocationType) .build(); + //TODO lambdaSyncLatencyWatch.start(); - resp = lambdaClient.invoke(request); - lambdaSyncLatencyWatch.stop(); - payloadResponseSyncSize = resp.payload().asByteArray().length; - return resp; + try { + resp = lambdaClient.invoke(request); + payloadResponseSyncSize = resp.payload().asByteArray().length; + lambdaSyncLatencyWatch.stop(); + return resp; + } catch (LambdaException e){ + lambdaSyncLatencyWatch.stop(); + + } + return null; } private SdkBytes validatePayload(String payload_string) { diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptions.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptions.java index 1f92b90b48..ca8ed6e574 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptions.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptions.java @@ -17,7 +17,7 @@ public class ThresholdOptions { - private static final String DEFAULT_BYTE_CAPACITY = "6mb"; + private static final String DEFAULT_BYTE_CAPACITY = "3mb"; @JsonProperty("event_count") @Size(min = 0, max = 10000000, message = "event_count size should be between 0 and 10000000") diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java index 73915dd3b1..01be1a40bc 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java @@ -11,11 +11,12 @@ import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Timer; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.codec.OutputCodec; -import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.event.JacksonEvent; @@ -55,53 +56,46 @@ public class LambdaProcessor extends AbstractProcessor, Record bufferedEventHandles; private final List events; private final BatchOptions batchOptions; private final ObjectMapper objectMapper = new ObjectMapper(); - Buffer currentBuffer; private final BufferFactory bufferFactory; private final LambdaClient lambdaClient; + private final Boolean isBatchEnabled; + private final String batchKey; + Buffer currentBuffer; + private final AtomicLong requestPayload; + private final AtomicLong responsePayload; private int maxEvents = 0; private ByteCount maxBytes = null; private Duration maxCollectionDuration = null; private int maxRetries = 0; private String mode = null; private OutputCodec codec = null; - private final Boolean isBatchEnabled; - private final String batchKey; @DataPrepperPluginConstructor - public LambdaProcessor(final PluginSetting pluginSetting, final LambdaProcessorConfig lambdaProcessorConfig, final AwsCredentialsSupplier awsCredentialsSupplier) { - super(pluginSetting); - this.pluginSetting = pluginSetting; - numberOfRecordsSuccessCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS); - numberOfRecordsFailedCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED); - lambdaLatencyMetric = pluginMetrics.timer(LAMBDA_LATENCY_METRIC); - requestPayload = pluginMetrics.gauge(REQUEST_PAYLOAD_SIZE, new AtomicLong()); - responsePayload = pluginMetrics.gauge(RESPONSE_PAYLOAD_SIZE, new AtomicLong()); - - //TODO -// recordsToLambda = pluginMetrics.counter(); -// recordsFromLambda = pluginMetrics.counter(); - + public LambdaProcessor(final PluginMetrics pluginMetrics, final LambdaProcessorConfig lambdaProcessorConfig, final AwsCredentialsSupplier awsCredentialsSupplier, final ExpressionEvaluator expressionEvaluator) { + super(pluginMetrics); + this.expressionEvaluator = expressionEvaluator; + this.numberOfRecordsSuccessCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS); + this.numberOfRecordsFailedCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED); + this.lambdaLatencyMetric = pluginMetrics.timer(LAMBDA_LATENCY_METRIC); + this.requestPayload = pluginMetrics.gauge(REQUEST_PAYLOAD_SIZE, new AtomicLong()); + this.responsePayload = pluginMetrics.gauge(RESPONSE_PAYLOAD_SIZE, new AtomicLong()); functionName = lambdaProcessorConfig.getFunctionName(); - + whenCondition = lambdaProcessorConfig.getWhenCondition(); maxRetries = lambdaProcessorConfig.getMaxConnectionRetries(); batchOptions = lambdaProcessorConfig.getBatchOptions(); if (batchOptions != null) { @@ -110,6 +104,7 @@ public LambdaProcessor(final PluginSetting pluginSetting, final LambdaProcessorC maxCollectionDuration = batchOptions.getThresholdOptions().getEventCollectTimeOut(); batchKey = batchOptions.getBatchKey(); isBatchEnabled = true; + LOG.info("maxEvents:" + maxEvents + " maxbytes:" + maxBytes + " maxDuration:" + maxCollectionDuration); } else { batchKey = null; isBatchEnabled = false; @@ -149,6 +144,10 @@ public Collection> doExecute(Collection> records) { for (Record record : records) { final Event event = record.getData(); + if (whenCondition != null && !expressionEvaluator.evaluateConditional(whenCondition, event)) { + continue; + } + try { if (currentBuffer.getEventCount() == 0) { codec.start(currentBuffer.getOutputStream(), event, null); @@ -160,9 +159,14 @@ public Collection> doExecute(Collection> records) { // flush to lambda and update result record flushToLambdaIfNeeded(resultRecords); } catch (Exception e) { - //on exception, same event is added back. - resultRecords.add(new Record<>(event)); - LOG.error(EVENT, "There was an exception while processing Event [{}]", event, e); + numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount()); + LOG.error(EVENT, "There was an exception while processing Event [{}]" + ", number of events dropped={}", event, e, numberOfRecordsFailedCounter); + //reset buffer + try { + currentBuffer = bufferFactory.getBuffer(lambdaClient, functionName, invocationType); + } catch (IOException ex) { + throw new RuntimeException(ex); + } } } return resultRecords; @@ -185,12 +189,12 @@ public void shutdown() { void flushToLambdaIfNeeded(List> resultRecords) throws InterruptedException, IOException { - LOG.trace("Flush to Lambda check: currentBuffer.size={}, currentBuffer.events={}, currentBuffer.duration={}", currentBuffer.getSize(), currentBuffer.getEventCount(), currentBuffer.getDuration()); + LOG.info("Flush to Lambda check: currentBuffer.size={}, currentBuffer.events={}, currentBuffer.duration={}", currentBuffer.getSize(), currentBuffer.getEventCount(), currentBuffer.getDuration()); final AtomicReference errorMsgObj = new AtomicReference<>(); if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration, isBatchEnabled)) { codec.complete(currentBuffer.getOutputStream()); -// LOG.info("Writing {} to Lambda with {} events and size of {} bytes.", functionName, currentBuffer.getEventCount(), currentBuffer.getSize()); + LOG.info("Writing {} to Lambda with {} events and size of {} bytes.", functionName, currentBuffer.getEventCount(), currentBuffer.getSize()); LambdaResult lambdaResult = retryFlushToLambda(currentBuffer, errorMsgObj); if (lambdaResult.getIsUploadedToLambda()) { @@ -201,20 +205,15 @@ void flushToLambdaIfNeeded(List> resultRecords) throws Interrupted requestPayload.set(currentBuffer.getPayloadRequestSyncSize()); responsePayload.set(currentBuffer.getPayloadResponseSyncSize()); - LOG.info(String.valueOf(currentBuffer.getFlushLambdaSyncLatencyMetric())); - LOG.info(String.valueOf(currentBuffer.getPayloadRequestSyncSize())); - LOG.info(String.valueOf(currentBuffer.getPayloadResponseSyncSize())); + InvokeResponse lambdaResponse = lambdaResult.getLambdaResponse(); + Event lambdaEvent = convertLambdaResponseToEvent(lambdaResponse); + resultRecords.add(new Record<>(lambdaEvent)); + //reset buffer after flush + currentBuffer = bufferFactory.getBuffer(lambdaClient, functionName, invocationType); } else { LOG.error("Failed to save to Lambda {}", functionName); numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount()); } - //reset buffer after flush - currentBuffer = bufferFactory.getBuffer(lambdaClient, functionName, invocationType); - - InvokeResponse lambdaResponse = lambdaResult.getLambdaResponse(); - Event lambdaEvent = convertLambdaResponseToEvent(lambdaResponse); - - resultRecords.add(new Record<>(lambdaEvent)); } } @@ -224,7 +223,6 @@ LambdaResult retryFlushToLambda(Buffer currentBuffer, final AtomicReference lambdaClientFactoryMockedStatic; private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); +// @Mock +// private PluginSetting pluginSetting; + + @Mock + private PluginMetrics pluginMetrics; + @Mock - private PluginSetting pluginSetting; + private ExpressionEvaluator expressionEvaluator; @Mock private LambdaProcessorConfig lambdaProcessorConfig; @@ -74,8 +90,26 @@ public class LambdaProcessorTest { @Mock private Buffer buffer; + @Mock + private Counter numberOfRecordsSuccessCounter; + + @Mock + private Counter numberOfRecordsFailedCounter; + + @Mock + private Counter numberOfRecordsDroppedCounter; + + @Mock + private Timer lambdaLatencyMetric; + + @Mock + private AtomicLong requestPayload; + + @Mock + private AtomicLong responsePayload; + private LambdaProcessor createObjectUnderTest() { - return new LambdaProcessor(pluginSetting, lambdaProcessorConfig, awsCredentialsSupplier); + return new LambdaProcessor(pluginMetrics, lambdaProcessorConfig, awsCredentialsSupplier, expressionEvaluator); } @BeforeEach @@ -88,7 +122,7 @@ public void setUp() throws IOException { lenient().when(lambdaProcessorConfig.getFunctionName()).thenReturn("test-function1"); lenient().when(lambdaProcessorConfig.getMaxConnectionRetries()).thenReturn(3); - lenient().when(lambdaProcessorConfig.getMode()).thenReturn("synchronous"); + lenient().when(lambdaProcessorConfig.getMode()).thenReturn("requestresponse"); lenient().when(thresholdOptions.getEventCount()).thenReturn(10); lenient().when(thresholdOptions.getMaximumSize()).thenReturn(ByteCount.ofBytes(6)); @@ -98,12 +132,15 @@ public void setUp() throws IOException { lenient().when(batchOptions.getBatchKey()).thenReturn("key"); lenient().when(lambdaProcessorConfig.getBatchOptions()).thenReturn(batchOptions); - when(pluginSetting.getName()).thenReturn(PROCESSOR_PLUGIN_NAME); - when(pluginSetting.getPipelineName()).thenReturn(PROCESSOR_PIPELINE_NAME); - lenient().when(lambdaProcessorConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); lenient().when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.of("test-region")); + lenient().when(pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS)).thenReturn(numberOfRecordsDroppedCounter); + lenient().when(pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED)).thenReturn(numberOfRecordsFailedCounter); + lenient().when(pluginMetrics.timer(LAMBDA_LATENCY_METRIC)).thenReturn(lambdaLatencyMetric); + lenient().when(pluginMetrics.gauge(eq(REQUEST_PAYLOAD_SIZE), any(AtomicLong.class))).thenReturn(requestPayload); + lenient().when(pluginMetrics.gauge(eq(RESPONSE_PAYLOAD_SIZE), any(AtomicLong.class))).thenReturn(responsePayload); + InvokeResponse resp = InvokeResponse.builder().statusCode(200).payload(SdkBytes.fromUtf8String(RESPONSE_PAYLOAD)).build(); lambdaClientFactoryMockedStatic = Mockito.mockStatic(LambdaClientFactory.class); when(LambdaClientFactory.createLambdaClient(any(LambdaProcessorConfig.class), any(AwsCredentialsSupplier.class))).thenReturn(lambdaClient); @@ -190,7 +227,7 @@ public void testConvertLambdaResponseToEvent_withNon200StatusCode() { } @Test - public void testDoExecute_withNon200StatusCode() { + public void testDoExecute_withNonSuccessfulStatusCode() { InvokeResponse response = InvokeResponse.builder().statusCode(500).payload(SdkBytes.fromUtf8String(RESPONSE_PAYLOAD)).build(); lenient().when(lambdaClient.invoke(any(InvokeRequest.class))).thenReturn(response); @@ -203,8 +240,11 @@ public void testDoExecute_withNon200StatusCode() { verify(lambdaClient, times(1)).invoke(any(InvokeRequest.class)); - //event should remain unmodified - assertEquals(records.get(0).getData(), resultRecords.get(0).getData()); + //event should be dropped on failure + assertEquals(resultRecords.size(), 0); + verify(numberOfRecordsFailedCounter, times(1)).increment(1); + //check if buffer is reset + assertEquals(buffer.getSize(), 0); } @Test @@ -223,7 +263,7 @@ public void testConvertLambdaResponseToEvent() throws JsonProcessingException { @Test public void testDoExecute_WithConfig() throws JsonProcessingException { - final String config = " function_name: test_function\n" + " mode: synchronous\n" + " aws:\n" + " region: us-east-1\n" + " sts_role_arn: arn:aws:iam::524239988912:role/app-test\n" + " sts_header_overrides: {\"test\":\"test\"}\n" + " max_retries: 3\n"; + final String config = " function_name: test_function\n" + " mode: requestresponse\n" + " aws:\n" + " region: us-east-1\n" + " sts_role_arn: arn:aws:iam::524239988912:role/app-test\n" + " sts_header_overrides: {\"test\":\"test\"}\n" + " max_retries: 3\n"; this.lambdaProcessorConfig = objectMapper.readValue(config, LambdaProcessorConfig.class);