Skip to content

Commit

Permalink
Add more metrics and handle emr exception message (#2422)
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vamsi-amazon authored Nov 8, 2023
1 parent b8aa7ef commit 53fd85e
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient

private RestChannelConsumer executePostRequest(RestRequest restRequest, NodeClient nodeClient)
throws IOException {
MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_CREATION_REQ_COUNT);
DataSourceMetadata dataSourceMetadata =
XContentParserUtils.toDataSourceMetadata(restRequest.contentParser());
return restChannel ->
Expand Down Expand Up @@ -163,6 +164,7 @@ public void onFailure(Exception e) {
}

private RestChannelConsumer executeGetRequest(RestRequest restRequest, NodeClient nodeClient) {
MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_GET_REQ_COUNT);
String dataSourceName = restRequest.param("dataSourceName");
return restChannel ->
Scheduler.schedule(
Expand Down Expand Up @@ -191,6 +193,7 @@ public void onFailure(Exception e) {

private RestChannelConsumer executeUpdateRequest(RestRequest restRequest, NodeClient nodeClient)
throws IOException {
MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_PUT_REQ_COUNT);
DataSourceMetadata dataSourceMetadata =
XContentParserUtils.toDataSourceMetadata(restRequest.contentParser());
return restChannel ->
Expand Down Expand Up @@ -220,6 +223,7 @@ public void onFailure(Exception e) {

private RestChannelConsumer executePatchRequest(RestRequest restRequest, NodeClient nodeClient)
throws IOException {
MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_PATCH_REQ_COUNT);
Map<String, Object> dataSourceData = XContentParserUtils.toMap(restRequest.contentParser());
return restChannel ->
Scheduler.schedule(
Expand Down Expand Up @@ -247,7 +251,7 @@ public void onFailure(Exception e) {
}

private RestChannelConsumer executeDeleteRequest(RestRequest restRequest, NodeClient nodeClient) {

MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_DELETE_REQ_COUNT);
String dataSourceName = restRequest.param("dataSourceName");
return restChannel ->
Scheduler.schedule(
Expand Down Expand Up @@ -276,8 +280,10 @@ public void onFailure(Exception e) {

private void handleException(Exception e, RestChannel restChannel) {
if (e instanceof DataSourceNotFoundException) {
MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_FAILED_REQ_COUNT_CUS);
reportError(restChannel, e, NOT_FOUND);
} else if (e instanceof OpenSearchException) {
MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_FAILED_REQ_COUNT_SYS);
OpenSearchException exception = (OpenSearchException) e;
reportError(restChannel, exception, exception.status());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionResponse;
import org.opensearch.sql.datasources.service.DataSourceServiceImpl;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.utils.MetricUtils;
import org.opensearch.sql.protocol.response.format.JsonResponseFormatter;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -62,7 +60,6 @@ protected void doExecute(
Task task,
CreateDataSourceActionRequest request,
ActionListener<CreateDataSourceActionResponse> actionListener) {
MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_CREATION_REQ_COUNT);
int dataSourceLimit = settings.getSettingValue(DATASOURCES_LIMIT);
if (dataSourceService.getDataSourceMetadata(false).size() >= dataSourceLimit) {
actionListener.onFailure(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionResponse;
import org.opensearch.sql.datasources.service.DataSourceServiceImpl;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.utils.MetricUtils;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

Expand Down Expand Up @@ -55,7 +53,6 @@ protected void doExecute(
Task task,
DeleteDataSourceActionRequest request,
ActionListener<DeleteDataSourceActionResponse> actionListener) {
MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_DELETE_REQ_COUNT);
try {
dataSourceService.deleteDataSource(request.getDataSourceName());
actionListener.onResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import org.opensearch.sql.datasources.model.transport.GetDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.GetDataSourceActionResponse;
import org.opensearch.sql.datasources.service.DataSourceServiceImpl;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.utils.MetricUtils;
import org.opensearch.sql.protocol.response.format.JsonResponseFormatter;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -58,7 +56,6 @@ protected void doExecute(
Task task,
GetDataSourceActionRequest request,
ActionListener<GetDataSourceActionResponse> actionListener) {
MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_GET_REQ_COUNT);
try {
String responseContent;
if (request.getDataSourceName() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import org.opensearch.sql.datasources.model.transport.PatchDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.PatchDataSourceActionResponse;
import org.opensearch.sql.datasources.service.DataSourceServiceImpl;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.utils.MetricUtils;
import org.opensearch.sql.protocol.response.format.JsonResponseFormatter;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -59,7 +57,6 @@ protected void doExecute(
Task task,
PatchDataSourceActionRequest request,
ActionListener<PatchDataSourceActionResponse> actionListener) {
MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_PATCH_REQ_COUNT);
try {
dataSourceService.patchDataSource(request.getDataSourceData());
String responseContent =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionResponse;
import org.opensearch.sql.datasources.service.DataSourceServiceImpl;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.utils.MetricUtils;
import org.opensearch.sql.protocol.response.format.JsonResponseFormatter;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -58,7 +56,6 @@ protected void doExecute(
Task task,
UpdateDataSourceActionRequest request,
ActionListener<UpdateDataSourceActionResponse> actionListener) {
MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_PUT_REQ_COUNT);
try {
dataSourceService.updateDataSource(request.getDataSourceMetadata());
String responseContent =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ public enum MetricName {
DATASOURCE_DELETE_REQ_COUNT("datasource_delete_request_count"),
DATASOURCE_FAILED_REQ_COUNT_SYS("datasource_failed_request_count_syserr"),
DATASOURCE_FAILED_REQ_COUNT_CUS("datasource_failed_request_count_cuserr"),
ASYNC_QUERY_CREATE_API_REQUEST_COUNT("async_query_create_api_request_count"),
ASYNC_QUERY_GET_API_REQUEST_COUNT("async_query_get_api_request_count"),
ASYNC_QUERY_CANCEL_API_REQUEST_COUNT("async_query_cancel_api_request_count"),
ASYNC_QUERY_GET_API_FAILED_REQ_COUNT_SYS("async_query_get_api_failed_request_count_syserr"),
ASYNC_QUERY_GET_API_FAILED_REQ_COUNT_CUS("async_query_get_api_failed_request_count_cuserr"),
ASYNC_QUERY_CREATE_API_FAILED_REQ_COUNT_SYS("async_query_create_api_failed_request_count_syserr"),
ASYNC_QUERY_CREATE_API_FAILED_REQ_COUNT_CUS("async_query_create_api_failed_request_count_cuserr"),
ASYNC_QUERY_CANCEL_API_FAILED_REQ_COUNT_SYS("async_query_cancel_api_failed_request_count_syserr"),
ASYNC_QUERY_CANCEL_API_FAILED_REQ_COUNT_CUS("async_query_cancel_api_failed_request_count_cuserr"),
EMR_START_JOB_REQUEST_FAILURE_COUNT("emr_start_job_request_failure_count"),
EMR_GET_JOB_RESULT_FAILURE_COUNT("emr_get_job_request_failure_count"),
EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT("emr_cancel_job_request_failure_count"),
Expand Down Expand Up @@ -73,6 +82,15 @@ public static List<String> getNames() {
.add(EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT)
.add(EMR_STREAMING_QUERY_JOBS_CREATION_COUNT)
.add(EMR_BATCH_QUERY_JOBS_CREATION_COUNT)
.add(ASYNC_QUERY_CREATE_API_FAILED_REQ_COUNT_CUS)
.add(ASYNC_QUERY_CREATE_API_FAILED_REQ_COUNT_SYS)
.add(ASYNC_QUERY_CANCEL_API_FAILED_REQ_COUNT_CUS)
.add(ASYNC_QUERY_CANCEL_API_FAILED_REQ_COUNT_SYS)
.add(ASYNC_QUERY_GET_API_FAILED_REQ_COUNT_CUS)
.add(ASYNC_QUERY_GET_API_FAILED_REQ_COUNT_SYS)
.add(ASYNC_QUERY_CREATE_API_REQUEST_COUNT)
.add(ASYNC_QUERY_GET_API_REQUEST_COUNT)
.add(ASYNC_QUERY_CANCEL_API_REQUEST_COUNT)
.build();

public boolean isNumerical() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.amazonaws.services.emrserverless.model.SparkSubmit;
import com.amazonaws.services.emrserverless.model.StartJobRunRequest;
import com.amazonaws.services.emrserverless.model.StartJobRunResult;
import com.amazonaws.services.emrserverless.model.ValidationException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import org.apache.logging.log4j.LogManager;
Expand All @@ -30,6 +29,8 @@ public class EmrServerlessClientImpl implements EMRServerlessClient {
private final AWSEMRServerless emrServerless;
private static final Logger logger = LogManager.getLogger(EmrServerlessClientImpl.class);

private static final String GENERIC_INTERNAL_SERVER_ERROR_MESSAGE = "Internal Server Error.";

public EmrServerlessClientImpl(AWSEMRServerless emrServerless) {
this.emrServerless = emrServerless;
}
Expand Down Expand Up @@ -62,9 +63,10 @@ public String startJobRun(StartJobRequest startJobRequest) {
try {
return emrServerless.startJobRun(request);
} catch (Throwable t) {
logger.error("Error while making start job request to emr:", t);
MetricUtils.incrementNumericalMetric(
MetricName.EMR_START_JOB_REQUEST_FAILURE_COUNT);
throw t;
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
}
});
logger.info("Job Run ID: " + startJobRunResult.getJobRunId());
Expand All @@ -82,9 +84,10 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
try {
return emrServerless.getJobRun(request);
} catch (Throwable t) {
logger.error("Error while making get job run request to emr:", t);
MetricUtils.incrementNumericalMetric(
MetricName.EMR_GET_JOB_RESULT_FAILURE_COUNT);
throw t;
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
}
});
logger.info("Job Run state: " + getJobRunResult.getJobRun().getState());
Expand All @@ -95,24 +98,20 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
CancelJobRunRequest cancelJobRunRequest =
new CancelJobRunRequest().withJobRunId(jobId).withApplicationId(applicationId);
try {
CancelJobRunResult cancelJobRunResult =
AccessController.doPrivileged(
(PrivilegedAction<CancelJobRunResult>)
() -> {
try {
return emrServerless.cancelJobRun(cancelJobRunRequest);
} catch (Throwable t) {
MetricUtils.incrementNumericalMetric(
MetricName.EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT);
throw t;
}
});
logger.info(String.format("Job : %s cancelled", cancelJobRunResult.getJobRunId()));
return cancelJobRunResult;
} catch (ValidationException e) {
throw new IllegalArgumentException(
String.format("Couldn't cancel the queryId: %s due to %s", jobId, e.getMessage()));
}
CancelJobRunResult cancelJobRunResult =
AccessController.doPrivileged(
(PrivilegedAction<CancelJobRunResult>)
() -> {
try {
return emrServerless.cancelJobRun(cancelJobRunRequest);
} catch (Throwable t) {
logger.error("Error while making cancel job request to emr:", t);
MetricUtils.incrementNumericalMetric(
MetricName.EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT);
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
}
});
logger.info(String.format("Job : %s cancelled", cancelJobRunResult.getJobRunId()));
return cancelJobRunResult;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.opensearch.rest.RestRequest;
import org.opensearch.sql.datasources.exceptions.ErrorMessage;
import org.opensearch.sql.datasources.utils.Scheduler;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.utils.MetricUtils;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction;
import org.opensearch.sql.spark.transport.TransportCreateAsyncQueryRequestAction;
Expand Down Expand Up @@ -110,6 +112,7 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient

private RestChannelConsumer executePostRequest(RestRequest restRequest, NodeClient nodeClient)
throws IOException {
MetricUtils.incrementNumericalMetric(MetricName.ASYNC_QUERY_CREATE_API_REQUEST_COUNT);
CreateAsyncQueryRequest submitJobRequest =
CreateAsyncQueryRequest.fromXContentParser(restRequest.contentParser());
return restChannel ->
Expand All @@ -132,13 +135,14 @@ public void onResponse(

@Override
public void onFailure(Exception e) {
handleException(e, restChannel);
handleException(e, restChannel, restRequest.method());
}
}));
}

private RestChannelConsumer executeGetAsyncQueryResultRequest(
RestRequest restRequest, NodeClient nodeClient) {
MetricUtils.incrementNumericalMetric(MetricName.ASYNC_QUERY_GET_API_REQUEST_COUNT);
String queryId = restRequest.param("queryId");
return restChannel ->
Scheduler.schedule(
Expand All @@ -160,26 +164,31 @@ public void onResponse(

@Override
public void onFailure(Exception e) {
handleException(e, restChannel);
handleException(e, restChannel, restRequest.method());
}
}));
}

private void handleException(Exception e, RestChannel restChannel) {
private void handleException(
Exception e, RestChannel restChannel, RestRequest.Method requestMethod) {
if (e instanceof OpenSearchException) {
OpenSearchException exception = (OpenSearchException) e;
reportError(restChannel, exception, exception.status());
addCustomerErrorMetric(requestMethod);
} else {
LOG.error("Error happened during request handling", e);
if (isClientError(e)) {
reportError(restChannel, e, BAD_REQUEST);
addCustomerErrorMetric(requestMethod);
} else {
reportError(restChannel, e, SERVICE_UNAVAILABLE);
addSystemErrorMetric(requestMethod);
}
}
}

private RestChannelConsumer executeDeleteRequest(RestRequest restRequest, NodeClient nodeClient) {
MetricUtils.incrementNumericalMetric(MetricName.ASYNC_QUERY_CANCEL_API_REQUEST_COUNT);
String queryId = restRequest.param("queryId");
return restChannel ->
Scheduler.schedule(
Expand All @@ -201,7 +210,7 @@ public void onResponse(

@Override
public void onFailure(Exception e) {
handleException(e, restChannel);
handleException(e, restChannel, restRequest.method());
}
}));
}
Expand All @@ -214,4 +223,36 @@ private void reportError(final RestChannel channel, final Exception e, final Res
private static boolean isClientError(Exception e) {
return e instanceof IllegalArgumentException || e instanceof IllegalStateException;
}

private void addSystemErrorMetric(RestRequest.Method requestMethod) {
switch (requestMethod) {
case POST:
MetricUtils.incrementNumericalMetric(
MetricName.ASYNC_QUERY_CREATE_API_FAILED_REQ_COUNT_SYS);
break;
case GET:
MetricUtils.incrementNumericalMetric(MetricName.ASYNC_QUERY_GET_API_FAILED_REQ_COUNT_SYS);
break;
case DELETE:
MetricUtils.incrementNumericalMetric(
MetricName.ASYNC_QUERY_CANCEL_API_FAILED_REQ_COUNT_SYS);
break;
}
}

private void addCustomerErrorMetric(RestRequest.Method requestMethod) {
switch (requestMethod) {
case POST:
MetricUtils.incrementNumericalMetric(
MetricName.ASYNC_QUERY_CREATE_API_FAILED_REQ_COUNT_CUS);
break;
case GET:
MetricUtils.incrementNumericalMetric(MetricName.ASYNC_QUERY_GET_API_FAILED_REQ_COUNT_CUS);
break;
case DELETE:
MetricUtils.incrementNumericalMetric(
MetricName.ASYNC_QUERY_CANCEL_API_FAILED_REQ_COUNT_CUS);
break;
}
}
}
Loading

0 comments on commit 53fd85e

Please sign in to comment.