Commit 879671f: Add metrics

Signed-off-by: Srikanth Govindarajan <[email protected]>
srikanthjg committed Jul 21, 2024
1 parent 5dafca8 commit 879671f
Showing 6 changed files with 117 additions and 63 deletions.
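For context, the metrics this commit wires into LambdaProcessor are ordinary Micrometer counters, timers, and gauges registered through Data Prepper's PluginMetrics. Below is a minimal, runnable sketch of that registration-and-recording pattern against a plain SimpleMeterRegistry; the two counter name strings are placeholders (only the timer and gauge names appear in this diff), so treat it as an illustration rather than the plugin's exact wiring.

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class LambdaMetricsSketch {
    public static void main(String[] args) {
        MeterRegistry registry = new SimpleMeterRegistry();

        // Success/failure counters for records flushed to Lambda (metric names are placeholders).
        Counter recordsSuccess = registry.counter("recordsFlushedToLambdaSuccess");
        Counter recordsFailed = registry.counter("recordsFlushedToLambdaFailed");

        // Names below match the constants added in LambdaProcessor:
        // LAMBDA_LATENCY_METRIC, REQUEST_PAYLOAD_SIZE, RESPONSE_PAYLOAD_SIZE.
        Timer lambdaLatency = registry.timer("lambdaLatency");
        AtomicLong requestPayloadSize = registry.gauge("requestPayloadSize", new AtomicLong());
        AtomicLong responsePayloadSize = registry.gauge("responsePayloadSize", new AtomicLong());

        // After a successful flush, the processor records roughly this way.
        recordsSuccess.increment(25);                    // events in the flushed buffer
        lambdaLatency.record(Duration.ofMillis(180));    // sync invocation latency
        requestPayloadSize.set(4_096);                   // request payload bytes
        responsePayloadSize.set(1_024);                  // response payload bytes

        System.out.println("success=" + recordsSuccess.count()
                + ", failed=" + recordsFailed.count()
                + ", meanLatencyMs=" + lambdaLatency.mean(TimeUnit.MILLISECONDS));
    }
}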
[Changed file 1 of 6]
@@ -5,7 +5,7 @@
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
import io.micrometer.core.instrument.Counter;
import static junit.framework.TestCase.assertEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
@@ -15,6 +15,7 @@
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
@@ -66,6 +67,9 @@ public class LambdaProcessorServiceIT {
private Counter numberOfRecordsSuccessCounter;
@Mock
private Counter numberOfRecordsFailedCounter;
@Mock
private ExpressionEvaluator expressionEvaluator;

private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS));


@@ -88,8 +92,6 @@ public void setUp() throws Exception {
thenReturn(numberOfRecordsSuccessCounter);
when(pluginMetrics.counter(LambdaProcessor.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED)).
thenReturn(numberOfRecordsFailedCounter);
when(pluginSetting.getName()).thenReturn("test");
when(pluginSetting.getPipelineName()).thenReturn("test-pipeline");
}


@@ -101,11 +103,11 @@ private static Record<Event> createRecord() {
public LambdaProcessor createObjectUnderTest(final String config) throws JsonProcessingException {

final LambdaProcessorConfig lambdaProcessorConfig = objectMapper.readValue(config, LambdaProcessorConfig.class);
return new LambdaProcessor(pluginSetting,lambdaProcessorConfig,awsCredentialsSupplier);
return new LambdaProcessor(pluginMetrics,lambdaProcessorConfig,awsCredentialsSupplier,expressionEvaluator);
}

public LambdaProcessor createObjectUnderTest(LambdaProcessorConfig lambdaSinkConfig) throws JsonProcessingException {
return new LambdaProcessor(pluginSetting,lambdaSinkConfig,awsCredentialsSupplier);
return new LambdaProcessor(pluginMetrics,lambdaSinkConfig,awsCredentialsSupplier,expressionEvaluator);
}


[Changed file 2 of 6]
@@ -12,6 +12,7 @@
import software.amazon.awssdk.services.lambda.LambdaClient;
import software.amazon.awssdk.services.lambda.model.InvokeRequest;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;
import software.amazon.awssdk.services.lambda.model.LambdaException;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -94,7 +95,7 @@ public void flushToLambdaAsync() {

@Override
public InvokeResponse flushToLambdaSync() {
InvokeResponse resp;
InvokeResponse resp = null;
SdkBytes payload = getPayload();
payloadRequestSyncSize = payload.asByteArray().length;

@@ -105,11 +106,18 @@ public InvokeResponse flushToLambdaSync() {
.invocationType(invocationType)
.build();

//TODO
lambdaSyncLatencyWatch.start();
resp = lambdaClient.invoke(request);
lambdaSyncLatencyWatch.stop();
payloadResponseSyncSize = resp.payload().asByteArray().length;
return resp;
try {
resp = lambdaClient.invoke(request);
payloadResponseSyncSize = resp.payload().asByteArray().length;
lambdaSyncLatencyWatch.stop();
return resp;
} catch (LambdaException e){
lambdaSyncLatencyWatch.stop();

}
return null;
}

private SdkBytes validatePayload(String payload_string) {
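The reworked flushToLambdaSync above starts a stopwatch, invokes the function, and makes sure the watch is also stopped on the LambdaException path. A compact sketch of that bookkeeping pattern follows; it substitutes System.nanoTime() and a Supplier for the plugin's stopwatch and LambdaClient so it runs without AWS access, and the try/finally form is one way to express the same "stop on both paths" intent.

import java.time.Duration;
import java.util.function.Supplier;

public class SyncInvokeTimingSketch {

    static Duration lastSyncLatency = Duration.ZERO;

    // Stands in for the timing around lambdaClient.invoke(request) in flushToLambdaSync.
    static <T> T timedInvoke(Supplier<T> invocation) {
        long start = System.nanoTime();
        try {
            return invocation.get();
        } finally {
            // Record the latency whether the call returned or threw, mirroring the stop()
            // calls in both the try and the catch block above.
            lastSyncLatency = Duration.ofNanos(System.nanoTime() - start);
        }
    }

    public static void main(String[] args) {
        String response = timedInvoke(() -> "{\"statusCode\":200}");
        System.out.println("response=" + response + ", latencyMs=" + lastSyncLatency.toMillis());
    }
}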
[Changed file 3 of 6]
@@ -17,7 +17,7 @@

public class ThresholdOptions {

private static final String DEFAULT_BYTE_CAPACITY = "6mb";
private static final String DEFAULT_BYTE_CAPACITY = "3mb";

@JsonProperty("event_count")
@Size(min = 0, max = 10000000, message = "event_count size should be between 0 and 10000000")
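ThresholdOptions above lowers the default byte capacity to 3mb; those thresholds feed the ThresholdCheck.checkThresholdExceed call used by the processor in the next file. The real helper's signature and its behaviour when batching is disabled are not shown in this diff, so the sketch below is only an assumed illustration of an event-count / byte-size / collect-timeout flush decision.

import java.time.Duration;

public class ThresholdCheckSketch {

    // Assumed shape of the flush decision: flush once any configured limit is reached.
    static boolean checkThresholdExceed(int eventCount, long sizeBytes, Duration elapsed,
                                        int maxEvents, long maxBytes, Duration maxCollectTime,
                                        boolean isBatchEnabled) {
        if (!isBatchEnabled) {
            return true;   // assumption: without batching, every buffer is flushed immediately
        }
        return (maxEvents > 0 && eventCount >= maxEvents)
                || (maxBytes > 0 && sizeBytes >= maxBytes)
                || elapsed.compareTo(maxCollectTime) >= 0;
    }

    public static void main(String[] args) {
        long threeMb = 3L * 1024 * 1024;   // roughly the new 3mb default, expressed in bytes
        boolean shouldFlush = checkThresholdExceed(
                120, 2_500_000, Duration.ofSeconds(5),   // current buffer state
                100, threeMb, Duration.ofSeconds(30),    // configured thresholds
                true);
        System.out.println("flush=" + shouldFlush);      // true: event count limit reached
    }
}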
[Changed file 4 of 6]
@@ -11,11 +11,12 @@
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.JacksonEvent;
@@ -55,53 +56,46 @@ public class LambdaProcessor extends AbstractProcessor<Record<Event>, Record<Eve
public static final String LAMBDA_LATENCY_METRIC = "lambdaLatency";
public static final String REQUEST_PAYLOAD_SIZE = "requestPayloadSize";
public static final String RESPONSE_PAYLOAD_SIZE = "responsePayloadSize";

private static final String SYNC_INVOCATION_TYPE = "RequestResponse";
private static final String ASYNC_INVOCATION_TYPE = "Event";
private static final Logger LOG = LoggerFactory.getLogger(LambdaProcessor.class);
private final PluginSetting pluginSetting;

private final String functionName;
private final String whenCondition;
private final ExpressionEvaluator expressionEvaluator;
private final Counter numberOfRecordsSuccessCounter;
private final Counter numberOfRecordsFailedCounter;
private final Timer lambdaLatencyMetric;
//TODO
// private final Counter recordsToLambda;
// private final Counter recordsFromLambda;
private AtomicLong requestPayload;
private AtomicLong responsePayload;
private final String invocationType;
private final Collection<EventHandle> bufferedEventHandles;
private final List<Event> events;
private final BatchOptions batchOptions;
private final ObjectMapper objectMapper = new ObjectMapper();
Buffer currentBuffer;
private final BufferFactory bufferFactory;
private final LambdaClient lambdaClient;
private final Boolean isBatchEnabled;
private final String batchKey;
Buffer currentBuffer;
private final AtomicLong requestPayload;
private final AtomicLong responsePayload;
private int maxEvents = 0;
private ByteCount maxBytes = null;
private Duration maxCollectionDuration = null;
private int maxRetries = 0;
private String mode = null;
private OutputCodec codec = null;
private final Boolean isBatchEnabled;
private final String batchKey;

@DataPrepperPluginConstructor
public LambdaProcessor(final PluginSetting pluginSetting, final LambdaProcessorConfig lambdaProcessorConfig, final AwsCredentialsSupplier awsCredentialsSupplier) {
super(pluginSetting);
this.pluginSetting = pluginSetting;
numberOfRecordsSuccessCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS);
numberOfRecordsFailedCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED);
lambdaLatencyMetric = pluginMetrics.timer(LAMBDA_LATENCY_METRIC);
requestPayload = pluginMetrics.gauge(REQUEST_PAYLOAD_SIZE, new AtomicLong());
responsePayload = pluginMetrics.gauge(RESPONSE_PAYLOAD_SIZE, new AtomicLong());

//TODO
// recordsToLambda = pluginMetrics.counter();
// recordsFromLambda = pluginMetrics.counter();

public LambdaProcessor(final PluginMetrics pluginMetrics, final LambdaProcessorConfig lambdaProcessorConfig, final AwsCredentialsSupplier awsCredentialsSupplier, final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.expressionEvaluator = expressionEvaluator;
this.numberOfRecordsSuccessCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS);
this.numberOfRecordsFailedCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED);
this.lambdaLatencyMetric = pluginMetrics.timer(LAMBDA_LATENCY_METRIC);
this.requestPayload = pluginMetrics.gauge(REQUEST_PAYLOAD_SIZE, new AtomicLong());
this.responsePayload = pluginMetrics.gauge(RESPONSE_PAYLOAD_SIZE, new AtomicLong());
functionName = lambdaProcessorConfig.getFunctionName();

whenCondition = lambdaProcessorConfig.getWhenCondition();
maxRetries = lambdaProcessorConfig.getMaxConnectionRetries();
batchOptions = lambdaProcessorConfig.getBatchOptions();
if (batchOptions != null) {
@@ -110,6 +104,7 @@ public LambdaProcessor(final PluginSetting pluginSetting, final LambdaProcessorC
maxCollectionDuration = batchOptions.getThresholdOptions().getEventCollectTimeOut();
batchKey = batchOptions.getBatchKey();
isBatchEnabled = true;
LOG.info("maxEvents:" + maxEvents + " maxbytes:" + maxBytes + " maxDuration:" + maxCollectionDuration);
} else {
batchKey = null;
isBatchEnabled = false;
@@ -149,6 +144,10 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
for (Record<Event> record : records) {
final Event event = record.getData();

if (whenCondition != null && !expressionEvaluator.evaluateConditional(whenCondition, event)) {
continue;
}

try {
if (currentBuffer.getEventCount() == 0) {
codec.start(currentBuffer.getOutputStream(), event, null);
@@ -160,9 +159,14 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
// flush to lambda and update result record
flushToLambdaIfNeeded(resultRecords);
} catch (Exception e) {
//on exception, same event is added back.
resultRecords.add(new Record<>(event));
LOG.error(EVENT, "There was an exception while processing Event [{}]", event, e);
numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount());
LOG.error(EVENT, "There was an exception while processing Event [{}]" + ", number of events dropped={}", event, e, numberOfRecordsFailedCounter);
//reset buffer
try {
currentBuffer = bufferFactory.getBuffer(lambdaClient, functionName, invocationType);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}
return resultRecords;
@@ -185,12 +189,12 @@ public void shutdown() {

void flushToLambdaIfNeeded(List<Record<Event>> resultRecords) throws InterruptedException, IOException {

LOG.trace("Flush to Lambda check: currentBuffer.size={}, currentBuffer.events={}, currentBuffer.duration={}", currentBuffer.getSize(), currentBuffer.getEventCount(), currentBuffer.getDuration());
LOG.info("Flush to Lambda check: currentBuffer.size={}, currentBuffer.events={}, currentBuffer.duration={}", currentBuffer.getSize(), currentBuffer.getEventCount(), currentBuffer.getDuration());
final AtomicReference<String> errorMsgObj = new AtomicReference<>();

if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration, isBatchEnabled)) {
codec.complete(currentBuffer.getOutputStream());
// LOG.info("Writing {} to Lambda with {} events and size of {} bytes.", functionName, currentBuffer.getEventCount(), currentBuffer.getSize());
LOG.info("Writing {} to Lambda with {} events and size of {} bytes.", functionName, currentBuffer.getEventCount(), currentBuffer.getSize());
LambdaResult lambdaResult = retryFlushToLambda(currentBuffer, errorMsgObj);

if (lambdaResult.getIsUploadedToLambda()) {
@@ -201,20 +205,15 @@ void flushToLambdaIfNeeded(List<Record<Event>> resultRecords) throws Interrupted
requestPayload.set(currentBuffer.getPayloadRequestSyncSize());
responsePayload.set(currentBuffer.getPayloadResponseSyncSize());

LOG.info(String.valueOf(currentBuffer.getFlushLambdaSyncLatencyMetric()));
LOG.info(String.valueOf(currentBuffer.getPayloadRequestSyncSize()));
LOG.info(String.valueOf(currentBuffer.getPayloadResponseSyncSize()));
InvokeResponse lambdaResponse = lambdaResult.getLambdaResponse();
Event lambdaEvent = convertLambdaResponseToEvent(lambdaResponse);
resultRecords.add(new Record<>(lambdaEvent));
//reset buffer after flush
currentBuffer = bufferFactory.getBuffer(lambdaClient, functionName, invocationType);
} else {
LOG.error("Failed to save to Lambda {}", functionName);
numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount());
}
//reset buffer after flush
currentBuffer = bufferFactory.getBuffer(lambdaClient, functionName, invocationType);

InvokeResponse lambdaResponse = lambdaResult.getLambdaResponse();
Event lambdaEvent = convertLambdaResponseToEvent(lambdaResponse);

resultRecords.add(new Record<>(lambdaEvent));
}
}

@@ -224,7 +223,6 @@ LambdaResult retryFlushToLambda(Buffer currentBuffer, final AtomicReference<Stri
do {

try {
//TODO ADD Latency METRIC
InvokeResponse resp = currentBuffer.flushToLambdaSync();
isUploadedToLambda = Boolean.TRUE;
LambdaResult lambdaResult = LambdaResult.builder().withIsUploadedToLambda(isUploadedToLambda).withLambdaResponse(resp).build();
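The doExecute change above is where the new lambda_when support takes effect: before an event is written to the buffer, the processor asks the ExpressionEvaluator whether the configured condition holds and skips the event otherwise. A self-contained sketch of that gating pattern follows; the Event/record stand-ins and the key-presence "evaluator" are assumptions made so it runs outside Data Prepper.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

public class WhenConditionSketch {

    // Stand-ins for Data Prepper's Event and Record<Event> types (assumed, for a runnable sketch).
    record Event(Map<String, Object> data) {}
    record EventRecord(Event getData) {}

    public static void main(String[] args) {
        // Stand-in for ExpressionEvaluator.evaluateConditional(whenCondition, event):
        // here the "condition" is simply a key that must exist in the event data.
        BiPredicate<String, Event> evaluateConditional =
                (condition, event) -> event.data().containsKey(condition);

        String whenCondition = "status";  // would come from LambdaProcessorConfig.getWhenCondition()

        List<EventRecord> records = List.of(
                new EventRecord(new Event(Map.of("status", 200, "body", "ok"))),
                new EventRecord(new Event(Map.of("body", "no status field"))));

        List<EventRecord> bufferedForLambda = new ArrayList<>();
        int skipped = 0;

        for (EventRecord rec : records) {
            Event event = rec.getData();
            // Mirrors the new check in doExecute: skip events that fail the condition.
            if (whenCondition != null && !evaluateConditional.test(whenCondition, event)) {
                skipped++;
                continue;
            }
            bufferedForLambda.add(rec);  // in the processor these are written to the codec/buffer
        }

        System.out.println("buffered for Lambda: " + bufferedForLambda.size() + ", skipped: " + skipped);
    }
}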
[Changed file 5 of 6]
@@ -38,6 +38,9 @@ public class LambdaProcessorConfig {
@JsonProperty("batch")
private BatchOptions batchOptions;

@JsonProperty("lambda_when")
private String whenCondition;

public AwsAuthenticationOptions getAwsAuthenticationOptions() {
return awsAuthenticationOptions;
}
@@ -54,4 +57,7 @@ public int getMaxConnectionRetries() {

public String getMode(){return mode;}

public String getWhenCondition() {
return whenCondition;
}
}
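The lambda_when option added above is bound by Jackson in the same way the integration test builds its config (an ObjectMapper over a YAMLFactory). The sketch below shows that binding against a trimmed stand-in class rather than the real LambdaProcessorConfig, and the condition string itself is only illustrative.

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

public class LambdaWhenConfigSketch {

    // Trimmed stand-in for LambdaProcessorConfig; only the new option is modeled here.
    static class ConfigStandIn {
        @JsonProperty("lambda_when")
        private String whenCondition;

        public String getWhenCondition() {
            return whenCondition;
        }
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
        String yaml = "lambda_when: \"/status == 200\"\n";   // condition syntax is illustrative
        ConfigStandIn config = yamlMapper.readValue(yaml, ConfigStandIn.class);
        System.out.println("whenCondition = " + config.getWhenCondition());
    }
}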
[Changed file 6 of 6: diff not loaded]
