Add Lambda Synchronous processor support #4700
testImplementation project(':data-prepper-plugins:parse-json-processor')
testImplementation 'org.powermock:powermock-module-junit4:2.0.9'
testImplementation 'org.powermock:powermock-api-mockito2:2.0.9'
testImplementation 'junit:junit:4.13.2'
You shouldn't need any of these four lines. They are provided by the root project.
879671f to 96615fc
67f4a1d to 98b27af
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.services.lambda.LambdaClient;
public final class LambdaClientFactory {
Did you explore the possibility of using one LambdaClientFactory class? I see one class with that name in the lambda sink directory.
Sure, I can merge the two and move it to common.
if (mode != null && mode.equalsIgnoreCase(LambdaProcessorConfig.SYNCHRONOUS_MODE)) {
invocationType = SYNC_INVOCATION_TYPE;
} else {
throw new RuntimeException("mode has to be synchronous or asynchronous");
Something like "unsupported mode {}", mode is a better message here.
98b27af to 2fe00e8
@srikanthjg, the WhiteSource check is failing. I am ready to approve this.
@JsonProperty("max_retries")
private int maxConnectionRetries = DEFAULT_CONNECTION_RETRIES;
@JsonProperty("mode")
The term "mode" is quite ambiguous. I think we can borrow the term "invocation_type" from AWS Lambda itself.
https://docs.aws.amazon.com/lambda/latest/api/API_Invoke.html#API_Invoke_RequestSyntax
throw new RuntimeException("Unsupported mode " + mode);
}
codec = new LambdaJsonCodec(batchKey);
This approach is very restrictive. It assumes that the body fits the format we ask. We should follow the same pattern used elsewhere in Data Prepper by allowing for a pluggable output codec.
LambdaJsonCodec is an implementation of OutputCodec (link). I needed this specifically for batch processing, when more than one event needs to be mapped to JSON. JsonOutputCodec handles only one event at a time. Maybe I can change the name to something more generic, like BulkJsonOutputCodec or BatchJsonOutputCodec? It will be used for all dial-out processors. In the S3 sink we use BufferedCodec, but the only implementation is currently Parquet, and the way we want to implement it for Lambda is different.
All OutputCodecs are made for batches.
Customers can use either JsonOutputCodec or NdjsonOutputCodec.
The json codec already supports a configurable key name. This would replace the batch_key, which you don't need.
codec:
  json:
    key_name: myKey
So the customer can have two options:
- JSON
codec:
  json:
    key_name: myKey
Yields:
{
  "myKey" : [
    { ...event1... },
    { ...event2... },
    { ...event3... }
  ]
}
- The customer can use ndjson
codec:
  ndjson:
Yields:
{ ...event1... }
{ ...event2... }
{ ...event3... }
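To make the difference between the two shapes concrete, here is a minimal plain-Java sketch. `PayloadShapes` and its methods are hypothetical helpers, not Data Prepper classes; they only mimic the output layout described above, and the real codecs may differ in whitespace, escaping, and trailing newlines.

```java
import java.util.List;

// Hypothetical helper, not part of Data Prepper: it only mimics the two payload shapes.
final class PayloadShapes {
    private PayloadShapes() {}

    // "json" shape: a single JSON object whose key holds an array of events.
    // Assumes keyName needs no escaping and each element is already valid JSON.
    static String asKeyedJsonArray(final String keyName, final List<String> eventJsons) {
        return "{\"" + keyName + "\":[" + String.join(",", eventJsons) + "]}";
    }

    // "ndjson" shape: one JSON document per line.
    // With exactly one event the result is that event's JSON and nothing else.
    static String asNdjson(final List<String> eventJsons) {
        return String.join("\n", eventJsons);
    }
}
```

With more than one event, only the keyed form is a single JSON document. With exactly one event, the ndjson form is just that event's JSON.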
Bulk will always need a key, as it will be considered one payload, so I guess ndjson cannot be used.
There is also a difference when it comes to handling a single event without batching. In this case, I still want to convert the Data Prepper event to JSON, but I don't want to have a key; I want to pass the user's data as-is to Lambda as the payload. The current output codec forces me to have a key. To address that, I either need to add new behavior to the json writeEvent method to convert the event directly to JSON, or write a new codec (which is what I did).
The behavior I want seems to be a combination of the two codecs, ndjson and json: json behavior for bulk and ndjson behavior for a single event.
> Bulk will always need a key, as it will be considered one payload, so I guess ndjson cannot be used.
Yes, this makes sense. The payload needs to be JSON, and ND-JSON with multiple events becomes non-JSON.
> There is also a difference when it comes to handling a single event without batching.
Actually, an ndjson codec which writes a single event gives you exactly what you want in this case. It is exactly the same output.
I also see that you are trying to support the concept of calling a Lambda for each event. Improving the configuration can help make this clearer. Right now there are multiple configurations which the user needs to carefully set to get the desired output.
This is a simpler way to configure it.
- To have a single invocation per event, add a boolean flag. The user need not make any more decisions.
aws_lambda:
  function_name: MyFunction
  invocation_per_event: true
- The default should be to batch, and this can have the existing defaults. You can probably keep this configuration the same. Though, rename batch_key to key_name for consistency with the other APIs. Also, disallow setting an event size of 1, as this is not the goal of this approach.
aws_lambda:
  function_name: MyFunction
Second, you can still use the existing codecs.
When invocation_per_event is set to true, you can use the NdJsonInputCodec internally. Otherwise, use the JsonCodec and provide the batch.key_name as the keyName in the codec configuration.
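The selection logic proposed here could look roughly like the sketch below. `LambdaBatchSettings`, its nested `Codec` enum, and the constructor parameters are hypothetical names chosen for illustration; the real processor would instantiate the existing codec classes rather than return an enum. Reading "event size of 1" as a batch threshold of one event is also an assumption.

```java
// Hypothetical settings holder; class, enum, and field names are illustrative only.
final class LambdaBatchSettings {
    enum Codec { NDJSON, JSON }

    private final boolean invocationPerEvent;
    private final String keyName;
    private final int eventCountThreshold;

    LambdaBatchSettings(final boolean invocationPerEvent, final String keyName, final int eventCountThreshold) {
        // Assumption: a batch threshold of one event is rejected in batch mode,
        // because invocation_per_event already covers that case.
        if (!invocationPerEvent && eventCountThreshold <= 1) {
            throw new IllegalArgumentException(
                    "Batch mode needs an event count threshold greater than 1; use invocation_per_event instead");
        }
        this.invocationPerEvent = invocationPerEvent;
        this.keyName = keyName;
        this.eventCountThreshold = eventCountThreshold;
    }

    // One event per invocation: ndjson of a single event is the event itself.
    // Batch mode: json codec, wrapping the events in an array under keyName.
    Codec codec() {
        return invocationPerEvent ? Codec.NDJSON : Codec.JSON;
    }

    // The key name only matters for the batched json payload.
    String keyName() {
        return invocationPerEvent ? null : keyName;
    }

    int eventCountThreshold() {
        return eventCountThreshold;
    }
}
```

The point of the single flag is that the user makes one decision and the codec choice follows from it, instead of having to set several options consistently.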
Regarding the configuration, I already have "invocation_type" as a configuration; this allows setting per-event invocation or batch invocation (RequestResponse or Event).
If we are implementing this internally, I cannot use parse-json-processor as a plugin but will have to take a dependency on it. Is it OK for one processor to take a dependency on another?
I wanted to avoid this, hence I went with implementing a custom codec. But I think this codec can eventually be used by other dial-out processors as well; I can make it generic.
public class LambdaProcessorConfig {
public static final String SYNCHRONOUS_MODE = "RequestResponse";
We should stick with Data Prepper naming conventions: request_response.
public class LambdaProcessorConfig {
public static final String SYNCHRONOUS_MODE = "RequestResponse";
public static final String ASYNCHRONOUS_MODE = "Event";
Change to event.
2fe00e8 to 426a023
426a023 to e7ff721
public class LambdaProcessorConfig {
public static final String REQUEST_RESPONSE = "RequestResponse";
Let's use request-response to match our existing naming conventions.
public class LambdaProcessorConfig {
public static final String REQUEST_RESPONSE = "RequestResponse";
public static final String EVENT = "Event";
Let's use event to match our existing naming conventions.
@@ -18,10 +18,12 @@
public class LambdaSinkConfig {
public static final String REQUEST_RESPONSE = "RequestResponse";
Let's consolidate these constant values with the LambdaProcessorConfig so that they don't diverge.
public static final String EVENT = "Event";
public static final String BATCH_EVENT = "batch_event";
public static final String SINGLE_EVENT = "single_event";
public static final String REQUEST_RESPONSE = "request-response";
Perhaps make a CommonLambdaConfig class that has these constants. We should avoid duplicating these or we may have future mismatches.
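A shared constants holder along these lines might look like the sketch below. The class name follows the suggestion above and the lowercase values follow the naming requested earlier in this review; `ProcessorDefaults` and `SinkDefaults` are hypothetical stand-ins for the two config classes, shown only to illustrate that both read from one place.

```java
// Sketch of a shared constants holder; the values follow the naming requested in this review.
final class CommonLambdaConfig {
    static final String REQUEST_RESPONSE = "request-response";
    static final String EVENT = "event";
    static final String BATCH_EVENT = "batch_event";
    static final String SINGLE_EVENT = "single_event";

    private CommonLambdaConfig() {}
}

// Hypothetical stand-ins for the processor and sink configs: both reference the
// same constants, so the accepted option values cannot drift apart.
final class ProcessorDefaults {
    static final String INVOCATION_TYPE = CommonLambdaConfig.REQUEST_RESPONSE;
    private ProcessorDefaults() {}
}

final class SinkDefaults {
    static final String INVOCATION_TYPE = CommonLambdaConfig.EVENT;
    private SinkDefaults() {}
}
```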
88a6549 to 8d5633d
Thank you @srikanthjg for this contribution!
maximum_size: 3mb
```
`invocation_type` as RequestResponse will be used when the response from aws lambda comes back to dataprepper.
nit:
invocation_type as RequestResponse is used when DataPrepper needs to process the response from AWS Lambda.
invocation_type as Event is used when the response from AWS Lambda is sent to an S3 bucket.
In batch options, an implicit batch threshold option is that if events size is 3mb, we flush it.
`payload_model` this is used to define how the payload should be constructed from a dataprepper event.
`payload_model` as batch_event is used when the output needs to be formed as a batch of multiple events,
Are there other values for payload_model?
fixed
@ParameterizedTest
@ValueSource(ints = {1,3})
void verify_records_to_lambda_success(final int recordCount) throws Exception {
Consider adding a test for InvocationType Event.
Invocation type Event will be disabled for now. I will release the Event type with asynchronous support, which requires additional infra changes. I have disabled it in the verification for now and will fix the README.
final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(awsCredentialsOptions);
return LambdaClient.builder()
.region(lambdaSinkConfig.getAwsAuthenticationOptions().getAwsRegion())
.region(awsAuthenticationOptions.getAwsRegion())
Consider enabling SDK metrics to track the number of requests, timeouts, throttles, etc.
...mbda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/BatchOptions.java
codec = new NdjsonOutputCodec(ndjsonOutputCodecConfig);
isBatchEnabled = false;
} else{
throw new RuntimeException("invalid payload_model option");
Can this validation be part of lambdaProcessorConfig?
if(!lambdaProcessorConfig.getInvocationType().equals(LambdaCommonConfig.EVENT) &&
!lambdaProcessorConfig.getInvocationType().equals(LambdaCommonConfig.REQUEST_RESPONSE)){
throw new RuntimeException("Unsupported invocation type " + lambdaProcessorConfig.getInvocationType());
Same as above.
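One way to move both checks into the configuration class is sketched below. `LambdaProcessorConfigSketch` is a hypothetical, dependency-free stand-in; in a real Data Prepper config the boolean methods would more likely be wired to a bean-validation annotation than to an explicit `validate()` call, and the accepted values shown are assumptions based on this review.

```java
import java.util.Set;

// Hypothetical stand-in for the processor config; it owns its own validation so the
// plugin constructor no longer needs if/else chains that throw RuntimeException.
final class LambdaProcessorConfigSketch {
    // Assumed option values, taken from the constants discussed in this review.
    private static final Set<String> SUPPORTED_INVOCATION_TYPES = Set.of("request-response", "event");
    private static final Set<String> SUPPORTED_PAYLOAD_MODELS = Set.of("batch_event", "single_event");

    private final String invocationType;
    private final String payloadModel;

    LambdaProcessorConfigSketch(final String invocationType, final String payloadModel) {
        this.invocationType = invocationType;
        this.payloadModel = payloadModel;
    }

    // The null check comes first because Set.of(...).contains(null) throws NullPointerException.
    boolean isInvocationTypeValid() {
        return invocationType != null && SUPPORTED_INVOCATION_TYPES.contains(invocationType);
    }

    boolean isPayloadModelValid() {
        return payloadModel != null && SUPPORTED_PAYLOAD_MODELS.contains(payloadModel);
    }

    // Fails fast with a message that names the offending value.
    void validate() {
        if (!isInvocationTypeValid()) {
            throw new IllegalArgumentException("Unsupported invocation type " + invocationType);
        }
        if (!isPayloadModelValid()) {
            throw new IllegalArgumentException("Unsupported payload_model " + payloadModel);
        }
    }
}
```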
if (currentBuffer.getEventCount() == 0) {
codec.start(currentBuffer.getOutputStream(), event, codecContext);
}
codec.writeEvent(event, currentBuffer.getOutputStream());
Is currentBuffer thread safe?
Yes, this is running in the context of a processor.
}
void flushToLambdaIfNeeded(List<Record<Event>> resultRecords) throws InterruptedException, IOException {
This can be a private function.
void flushToLambdaIfNeeded(List<Record<Event>> resultRecords) throws InterruptedException, IOException {
LOG.info("Flush to Lambda check: currentBuffer.size={}, currentBuffer.events={}, currentBuffer.duration={}", currentBuffer.getSize(), currentBuffer.getEventCount(), currentBuffer.getDuration());
Is there excessive logging in this method?
Sure, I will reduce them.
}
}
LambdaResult retryFlushToLambda(Buffer currentBuffer, final AtomicReference<String> errorMsgObj) throws InterruptedException {
This can be a private function.
return lambdaResult;
}
Event convertLambdaResponseToEvent(InvokeResponse lambdaResponse) {
This can be a private function.
Map<String, String> invocationTypeMap = Map.of(
LambdaCommonConfig.EVENT, EVENT_LAMBDA
);
Move this to a static constant.
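As a rough illustration of the static-constant form, the sketch below hoists the map to a class-level field and adds a small lookup. `InvocationTypeMapping` and `toLambdaInvocationType` are hypothetical names, the configured keys are assumed from the naming discussed in this review, and the second entry is an addition for illustration (the diff only maps the event type). `Event` and `RequestResponse` are the values the Lambda Invoke API accepts.

```java
import java.util.Map;

// Hypothetical holder class; in the PR this would live in the processor or a common config class.
final class InvocationTypeMapping {
    // Built once per class load instead of once per processor instance.
    // Keys are the assumed user-facing option values; values are Lambda InvocationType strings.
    static final Map<String, String> INVOCATION_TYPE_MAP = Map.of(
            "event", "Event",
            "request-response", "RequestResponse");

    private InvocationTypeMapping() {}

    static String toLambdaInvocationType(final String configuredValue) {
        // Map.of(...).get(null) throws NullPointerException, so guard against null first.
        final String lambdaValue = configuredValue == null ? null : INVOCATION_TYPE_MAP.get(configuredValue);
        if (lambdaValue == null) {
            throw new IllegalArgumentException("Unsupported invocation type " + configuredValue);
        }
        return lambdaValue;
    }
}
```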
this.bufferFactory = new InMemoryBufferFactory();
try {
currentBuffer = this.bufferFactory.getBuffer(lambdaClient, functionName, invocationType);
The term "buffer" is overloaded in Data Prepper. This looks like it is not just a buffer but is tightly coupled with Lambda. We should consider renaming this class and interface to be clear.
I am handling this the same way we do in the sink. I can address the refactor in another PR.
} catch (AwsServiceException | SdkClientException e) {
errorMsgObj.set(e.getMessage());
LOG.error("Exception occurred while uploading records to lambda. Retry countdown : {} | exception:", retryCount, e);
--retryCount;
Is this retry on top of the Lambda client retry? Is there any reason we need this?
Signed-off-by: Srikanth Govindarajan <[email protected]>
57603ff to 21ee56e
private String invocationType = REQUEST_RESPONSE;
@JsonProperty("payload_model")
private String payloadModel = BATCH_EVENT;
We should make a Java enum for this.
Here is an example:
I'm ok with doing this in a follow-on PR.
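For reference, an enum for `payload_model` could be shaped like the following sketch. This is not the example linked in the comment above; `PayloadModel`, `optionValue`, and `fromOptionValue` are illustrative names. In a Jackson-bound config the factory method would typically carry a `@JsonCreator` annotation, which is left out here to keep the sketch dependency-free.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Illustrative enum for the payload_model option; names are assumptions, not the PR's code.
enum PayloadModel {
    BATCH_EVENT("batch_event"),
    SINGLE_EVENT("single_event");

    // Lookup table from the user-facing option string to the enum constant.
    private static final Map<String, PayloadModel> BY_OPTION_VALUE = Arrays.stream(values())
            .collect(Collectors.toMap(PayloadModel::optionValue, Function.identity()));

    private final String optionValue;

    PayloadModel(final String optionValue) {
        this.optionValue = optionValue;
    }

    String optionValue() {
        return optionValue;
    }

    // Assumption: a Jackson-bound config would annotate this factory with @JsonCreator.
    static PayloadModel fromOptionValue(final String option) {
        final PayloadModel model = BY_OPTION_VALUE.get(option);
        if (model == null) {
            throw new IllegalArgumentException("Unsupported payload_model " + option);
        }
        return model;
    }
}
```

With an enum, the set of accepted values is defined in one place and an unknown value fails while the configuration is being read, rather than in the processor constructor.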
private int maxConnectionRetries = DEFAULT_CONNECTION_RETRIES;
@JsonProperty("invocation_type")
private String invocationType = REQUEST_RESPONSE;
We should make this an enum as well.
@@ -38,12 +38,21 @@ public class LambdaSinkConfig {
@JsonProperty("max_retries")
private int maxConnectionRetries = DEFAULT_CONNECTION_RETRIES;
@JsonProperty("invocation_type")
private String invocationType = EVENT;
Should be an enum as well.
private String invocationType = EVENT;
@JsonProperty("payload_model")
private String payloadModel = BATCH_EVENT;
Should be an enum as well.
Did you look into moving some of the null-check validations from the plugin into the config? Other than this, the changes look good to me.
@srikanthjg, thank you for this contribution. I have a few other changes we should try to get in to improve it. But let's follow up in another PR.
public static final String NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS = "lambdaProcessorObjectsEventsSucceeded";
public static final String NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED = "lambdaProcessorObjectsEventsFailed";
public static final String LAMBDA_LATENCY_METRIC = "lambdaProcessorLatency";
We can rename this to simply requestLatency. Or you could call it lambdaRequestLatency, but that seems unnecessary.
As I read the code, this is the time to make the request to Lambda regardless of it being request/response or event.
Add Lambda Processor Synchronous Mode support
Make LambdaClientFactory common to sink and processor
Signed-off-by: Srikanth Govindarajan <[email protected]>
Description
Adds AWS Lambda as a remote processor for Data Prepper.
Further details mentioned in #4699
Issues Resolved
Resolves #4699
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.