Skip to content

Commit

Permalink
Address Comments
Browse files Browse the repository at this point in the history
Signed-off-by: Srikanth Govindarajan <[email protected]>
  • Loading branch information
srikanthjg committed Aug 15, 2024
1 parent d62ddd2 commit 426a023
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ void verify_records_to_lambda_success(final int recordCount) throws Exception {

when(lambdaProcessorConfig.getFunctionName()).thenReturn(functionName);
when(lambdaProcessorConfig.getMaxConnectionRetries()).thenReturn(3);
when(lambdaProcessorConfig.getMode()).thenReturn("synchronous");
when(lambdaProcessorConfig.getInvocationType()).thenReturn("RequestResponse");

LambdaProcessor objectUnderTest = createObjectUnderTest(lambdaProcessorConfig);

Expand All @@ -148,7 +148,7 @@ void verify_records_with_batching_to_lambda(final int recordCount) throws JsonPr

when(lambdaProcessorConfig.getFunctionName()).thenReturn(functionName);
when(lambdaProcessorConfig.getMaxConnectionRetries()).thenReturn(3);
when(lambdaProcessorConfig.getMode()).thenReturn("synchronous");
when(lambdaProcessorConfig.getInvocationType()).thenReturn("RequestResponse");
when(thresholdOptions.getEventCount()).thenReturn(1);
when(thresholdOptions.getMaximumSize()).thenReturn(ByteCount.parse("2mb"));
when(thresholdOptions.getEventCollectTimeOut()).thenReturn(Duration.parse("PT10s"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ public class LambdaProcessor extends AbstractProcessor<Record<Event>, Record<Eve
public static final String LAMBDA_LATENCY_METRIC = "lambdaLatency";
public static final String REQUEST_PAYLOAD_SIZE = "requestPayloadSize";
public static final String RESPONSE_PAYLOAD_SIZE = "responsePayloadSize";
private static final String SYNC_INVOCATION_TYPE = "RequestResponse";
private static final String ASYNC_INVOCATION_TYPE = "Event";
private static final Logger LOG = LoggerFactory.getLogger(LambdaProcessor.class);

private final String functionName;
Expand All @@ -83,7 +81,6 @@ public class LambdaProcessor extends AbstractProcessor<Record<Event>, Record<Eve
private ByteCount maxBytes = null;
private Duration maxCollectionDuration = null;
private int maxRetries = 0;
private String mode = null;
private OutputCodec codec = null;

@DataPrepperPluginConstructor
Expand All @@ -110,12 +107,11 @@ public LambdaProcessor(final PluginMetrics pluginMetrics, final LambdaProcessorC
batchKey = null;
isBatchEnabled = false;
}
mode = lambdaProcessorConfig.getMode();
// TODO - Support for Async mode to be added.
if (mode != null && mode.equalsIgnoreCase(LambdaProcessorConfig.SYNCHRONOUS_MODE)) {
invocationType = SYNC_INVOCATION_TYPE;
} else {
throw new RuntimeException("Unsupported mode " + mode);
invocationType = lambdaProcessorConfig.getInvocationType();
// TODO - Support for Event invocation type to be added.
if(invocationType.equals(LambdaProcessorConfig.EVENT) ||
invocationType.equals(LambdaProcessorConfig.REQUEST_RESPONSE)){
throw new RuntimeException("Unsupported invocation type " + invocationType);
}

codec = new LambdaJsonCodec(batchKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

public class LambdaProcessorConfig {

public static final String SYNCHRONOUS_MODE = "RequestResponse";
public static final String ASYNCHRONOUS_MODE = "Event";
public static final String REQUEST_RESPONSE = "RequestResponse";
public static final String EVENT = "Event";
private static final int DEFAULT_CONNECTION_RETRIES = 3;

@JsonProperty("aws")
Expand All @@ -32,8 +32,8 @@ public class LambdaProcessorConfig {
@JsonProperty("max_retries")
private int maxConnectionRetries = DEFAULT_CONNECTION_RETRIES;

@JsonProperty("mode")
private String mode = SYNCHRONOUS_MODE;
@JsonProperty("invocation_type")
private String invocationType = REQUEST_RESPONSE;

@JsonProperty("batch")
private BatchOptions batchOptions;
Expand All @@ -55,7 +55,7 @@ public int getMaxConnectionRetries() {
return maxConnectionRetries;
}

public String getMode(){return mode;}
public String getInvocationType(){return invocationType;}

public String getWhenCondition() {
return whenCondition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void setUp() throws IOException {

lenient().when(lambdaProcessorConfig.getFunctionName()).thenReturn("test-function1");
lenient().when(lambdaProcessorConfig.getMaxConnectionRetries()).thenReturn(3);
lenient().when(lambdaProcessorConfig.getMode()).thenReturn("requestresponse");
lenient().when(lambdaProcessorConfig.getInvocationType()).thenReturn("requestresponse");

lenient().when(thresholdOptions.getEventCount()).thenReturn(10);
lenient().when(thresholdOptions.getMaximumSize()).thenReturn(ByteCount.ofBytes(6));
Expand Down Expand Up @@ -264,7 +264,7 @@ public void testConvertLambdaResponseToEvent() throws JsonProcessingException {

@Test
public void testDoExecute_WithConfig() throws JsonProcessingException {
final String config = " function_name: test_function\n" + " mode: requestresponse\n" + " aws:\n" + " region: us-east-1\n" + " sts_role_arn: arn:aws:iam::524239988912:role/app-test\n" + " sts_header_overrides: {\"test\":\"test\"}\n" + " max_retries: 3\n";
final String config = " function_name: test_function\n" + " invocation_type: requestresponse\n" + " aws:\n" + " region: us-east-1\n" + " sts_role_arn: arn:aws:iam::524239988912:role/app-test\n" + " sts_header_overrides: {\"test\":\"test\"}\n" + " max_retries: 3\n";

this.lambdaProcessorConfig = objectMapper.readValue(config, LambdaProcessorConfig.class);

Expand Down

0 comments on commit 426a023

Please sign in to comment.