Skip to content

Commit

Permalink
Handle Describe,Refresh and Show Queries Properly (#2357) (#2362)
Browse files Browse the repository at this point in the history
(cherry picked from commit 886c2fc)

Signed-off-by: Vamsi Manohar <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 0f334f8 commit 16e2f30
Show file tree
Hide file tree
Showing 13 changed files with 519 additions and 162 deletions.
5 changes: 5 additions & 0 deletions spark/src/main/antlr/FlintSparkSqlExtensions.g4
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ dropCoveringIndexStatement

materializedViewStatement
: createMaterializedViewStatement
| refreshMaterializedViewStatement
| showMaterializedViewStatement
| describeMaterializedViewStatement
| dropMaterializedViewStatement
Expand All @@ -90,6 +91,10 @@ createMaterializedViewStatement
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

refreshMaterializedViewStatement
: REFRESH MATERIALIZED VIEW mvName=multipartIdentifier
;

showMaterializedViewStatement
: SHOW MATERIALIZED (VIEW | VIEWS) IN catalogDb=multipartIdentifier
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,7 @@
import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.StartJobRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.IndexDetails;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.dispatcher.model.*;
import org.opensearch.sql.spark.execution.session.CreateSessionRequest;
import org.opensearch.sql.spark.execution.session.Session;
import org.opensearch.sql.spark.execution.session.SessionId;
Expand All @@ -56,11 +53,10 @@
public class SparkQueryDispatcher {

private static final Logger LOG = LogManager.getLogger();

public static final String INDEX_TAG_KEY = "index";
public static final String DATASOURCE_TAG_KEY = "datasource";
public static final String CLUSTER_NAME_TAG_KEY = "cluster";
public static final String JOB_TYPE_TAG_KEY = "job_type";
public static final String JOB_TYPE_TAG_KEY = "type";

private EMRServerlessClient emrServerlessClient;

Expand Down Expand Up @@ -107,15 +103,18 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
}

private DispatchQueryResponse handleSQLQuery(DispatchQueryRequest dispatchQueryRequest) {
if (SQLQueryUtils.isIndexQuery(dispatchQueryRequest.getQuery())) {
IndexDetails indexDetails =
if (SQLQueryUtils.isFlintExtensionQuery(dispatchQueryRequest.getQuery())) {
IndexQueryDetails indexQueryDetails =
SQLQueryUtils.extractIndexDetails(dispatchQueryRequest.getQuery());
fillMissingDetails(dispatchQueryRequest, indexDetails);
fillMissingDetails(dispatchQueryRequest, indexQueryDetails);

if (indexDetails.isDropIndex()) {
return handleDropIndexQuery(dispatchQueryRequest, indexDetails);
// TODO: refactor this code properly.
if (IndexQueryActionType.DROP.equals(indexQueryDetails.getIndexQueryActionType())) {
return handleDropIndexQuery(dispatchQueryRequest, indexQueryDetails);
} else if (IndexQueryActionType.CREATE.equals(indexQueryDetails.getIndexQueryActionType())) {
return handleStreamingQueries(dispatchQueryRequest, indexQueryDetails);
} else {
return handleIndexQuery(dispatchQueryRequest, indexDetails);
return handleFlintNonStreamingQueries(dispatchQueryRequest, indexQueryDetails);
}
} else {
return handleNonIndexQuery(dispatchQueryRequest);
Expand All @@ -127,24 +126,59 @@ private DispatchQueryResponse handleSQLQuery(DispatchQueryRequest dispatchQueryR
// Spark Assumes the datasource to be catalog.
// This is required to handle drop index case properly when datasource name is not provided.
private static void fillMissingDetails(
DispatchQueryRequest dispatchQueryRequest, IndexDetails indexDetails) {
if (indexDetails.getFullyQualifiedTableName() != null
&& indexDetails.getFullyQualifiedTableName().getDatasourceName() == null) {
indexDetails
DispatchQueryRequest dispatchQueryRequest, IndexQueryDetails indexQueryDetails) {
if (indexQueryDetails.getFullyQualifiedTableName() != null
&& indexQueryDetails.getFullyQualifiedTableName().getDatasourceName() == null) {
indexQueryDetails
.getFullyQualifiedTableName()
.setDatasourceName(dispatchQueryRequest.getDatasource());
}
}

private DispatchQueryResponse handleIndexQuery(
DispatchQueryRequest dispatchQueryRequest, IndexDetails indexDetails) {
private DispatchQueryResponse handleStreamingQueries(
DispatchQueryRequest dispatchQueryRequest, IndexQueryDetails indexQueryDetails) {
DataSourceMetadata dataSourceMetadata =
this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource());
dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
String jobName = dispatchQueryRequest.getClusterName() + ":" + "index-query";
Map<String, String> tags = getDefaultTagsForJobSubmission(dispatchQueryRequest);
tags.put(INDEX_TAG_KEY, indexQueryDetails.openSearchIndexName());
if (indexQueryDetails.isAutoRefresh()) {
tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText());
}
StartJobRequest startJobRequest =
new StartJobRequest(
dispatchQueryRequest.getQuery(),
jobName,
dispatchQueryRequest.getApplicationId(),
dispatchQueryRequest.getExecutionRoleARN(),
SparkSubmitParameters.Builder.builder()
.dataSource(
dataSourceService.getRawDataSourceMetadata(
dispatchQueryRequest.getDatasource()))
.structuredStreaming(indexQueryDetails.isAutoRefresh())
.extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams())
.build()
.toString(),
tags,
indexQueryDetails.isAutoRefresh(),
dataSourceMetadata.getResultIndex());
String jobId = emrServerlessClient.startJobRun(startJobRequest);
return new DispatchQueryResponse(
AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()),
jobId,
false,
dataSourceMetadata.getResultIndex(),
null);
}

private DispatchQueryResponse handleFlintNonStreamingQueries(
DispatchQueryRequest dispatchQueryRequest, IndexQueryDetails indexQueryDetails) {
DataSourceMetadata dataSourceMetadata =
this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource());
dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
String jobName = dispatchQueryRequest.getClusterName() + ":" + "index-query";
Map<String, String> tags = getDefaultTagsForJobSubmission(dispatchQueryRequest);
tags.put(INDEX_TAG_KEY, indexDetails.openSearchIndexName());
tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText());
StartJobRequest startJobRequest =
new StartJobRequest(
dispatchQueryRequest.getQuery(),
Expand All @@ -155,12 +189,11 @@ private DispatchQueryResponse handleIndexQuery(
.dataSource(
dataSourceService.getRawDataSourceMetadata(
dispatchQueryRequest.getDatasource()))
.structuredStreaming(indexDetails.isAutoRefresh())
.extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams())
.build()
.toString(),
tags,
indexDetails.isAutoRefresh(),
indexQueryDetails.isAutoRefresh(),
dataSourceMetadata.getResultIndex());
String jobId = emrServerlessClient.startJobRun(startJobRequest);
return new DispatchQueryResponse(
Expand Down Expand Up @@ -242,11 +275,12 @@ private DispatchQueryResponse handleNonIndexQuery(DispatchQueryRequest dispatchQ
}

private DispatchQueryResponse handleDropIndexQuery(
DispatchQueryRequest dispatchQueryRequest, IndexDetails indexDetails) {
DispatchQueryRequest dispatchQueryRequest, IndexQueryDetails indexQueryDetails) {
DataSourceMetadata dataSourceMetadata =
this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource());
dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
FlintIndexMetadata indexMetadata = flintIndexMetadataReader.getFlintIndexMetadata(indexDetails);
FlintIndexMetadata indexMetadata =
flintIndexMetadataReader.getFlintIndexMetadata(indexQueryDetails);
// if index is created without auto refresh. there is no job to cancel.
String status = JobRunState.FAILED.toString();
try {
Expand All @@ -255,7 +289,7 @@ private DispatchQueryResponse handleDropIndexQuery(
dispatchQueryRequest.getApplicationId(), indexMetadata.getJobId());
}
} finally {
String indexName = indexDetails.openSearchIndexName();
String indexName = indexQueryDetails.openSearchIndexName();
try {
AcknowledgedResponse response =
client.admin().indices().delete(new DeleteIndexRequest().indices(indexName)).get();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.dispatcher.model;

/** Enum for Index Action in the given query.* */
public enum IndexQueryActionType {
CREATE,
REFRESH,
DESCRIBE,
SHOW,
DROP
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package org.opensearch.sql.spark.dispatcher.model;

import com.google.common.base.Preconditions;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -14,83 +13,67 @@
/** Index details in an async query. */
@Getter
@EqualsAndHashCode
public class IndexDetails {
public class IndexQueryDetails {

public static final String STRIP_CHARS = "`";

private String indexName;
private FullyQualifiedTableName fullyQualifiedTableName;
// by default, auto_refresh = false;
private boolean autoRefresh;
private boolean isDropIndex;
private IndexQueryActionType indexQueryActionType;
// materialized view special case where
// table name and mv name are combined.
private String mvName;
private FlintIndexType indexType;

private IndexDetails() {}
private IndexQueryDetails() {}

public static IndexDetailsBuilder builder() {
return new IndexDetailsBuilder();
public static IndexQueryDetailsBuilder builder() {
return new IndexQueryDetailsBuilder();
}

// Builder class
public static class IndexDetailsBuilder {
private final IndexDetails indexDetails;
public static class IndexQueryDetailsBuilder {
private final IndexQueryDetails indexQueryDetails;

public IndexDetailsBuilder() {
indexDetails = new IndexDetails();
public IndexQueryDetailsBuilder() {
indexQueryDetails = new IndexQueryDetails();
}

public IndexDetailsBuilder indexName(String indexName) {
indexDetails.indexName = indexName;
public IndexQueryDetailsBuilder indexName(String indexName) {
indexQueryDetails.indexName = indexName;
return this;
}

public IndexDetailsBuilder fullyQualifiedTableName(FullyQualifiedTableName tableName) {
indexDetails.fullyQualifiedTableName = tableName;
public IndexQueryDetailsBuilder fullyQualifiedTableName(FullyQualifiedTableName tableName) {
indexQueryDetails.fullyQualifiedTableName = tableName;
return this;
}

public IndexDetailsBuilder autoRefresh(Boolean autoRefresh) {
indexDetails.autoRefresh = autoRefresh;
public IndexQueryDetailsBuilder autoRefresh(Boolean autoRefresh) {
indexQueryDetails.autoRefresh = autoRefresh;
return this;
}

public IndexDetailsBuilder isDropIndex(boolean isDropIndex) {
indexDetails.isDropIndex = isDropIndex;
public IndexQueryDetailsBuilder indexQueryActionType(
IndexQueryActionType indexQueryActionType) {
indexQueryDetails.indexQueryActionType = indexQueryActionType;
return this;
}

public IndexDetailsBuilder mvName(String mvName) {
indexDetails.mvName = mvName;
public IndexQueryDetailsBuilder mvName(String mvName) {
indexQueryDetails.mvName = mvName;
return this;
}

public IndexDetailsBuilder indexType(FlintIndexType indexType) {
indexDetails.indexType = indexType;
public IndexQueryDetailsBuilder indexType(FlintIndexType indexType) {
indexQueryDetails.indexType = indexType;
return this;
}

public IndexDetails build() {
Preconditions.checkNotNull(indexDetails.indexType, "Index Type can't be null");
switch (indexDetails.indexType) {
case COVERING:
Preconditions.checkNotNull(
indexDetails.indexName, "IndexName can't be null for Covering Index.");
Preconditions.checkNotNull(
indexDetails.fullyQualifiedTableName, "TableName can't be null for Covering Index.");
break;
case SKIPPING:
Preconditions.checkNotNull(
indexDetails.fullyQualifiedTableName, "TableName can't be null for Skipping Index.");
break;
case MATERIALIZED_VIEW:
Preconditions.checkNotNull(indexDetails.mvName, "Materialized view name can't be null");
break;
}

return indexDetails;
public IndexQueryDetails build() {
return indexQueryDetails;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
public class InteractiveSession implements Session {
private static final Logger LOG = LogManager.getLogger();

public static final String SESSION_ID_TAG_KEY = "sid";

private final SessionId sessionId;
private final StateStore stateStore;
private final EMRServerlessClient serverlessClient;
Expand All @@ -46,6 +48,7 @@ public void open(CreateSessionRequest createSessionRequest) {
createSessionRequest
.getSparkSubmitParametersBuilder()
.sessionExecution(sessionId.getSessionId(), createSessionRequest.getDatasourceName());
createSessionRequest.getTags().put(SESSION_ID_TAG_KEY, sessionId.getSessionId());
String jobID = serverlessClient.startJobRun(createSessionRequest.getStartJobRequest());
String applicationId = createSessionRequest.getStartJobRequest().getApplicationId();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package org.opensearch.sql.spark.flint;

import org.opensearch.sql.spark.dispatcher.model.IndexDetails;
import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails;

/** Interface for FlintIndexMetadataReader */
public interface FlintIndexMetadataReader {

/**
* Given Index details, get the streaming job Id.
*
* @param indexDetails indexDetails.
* @param indexQueryDetails indexDetails.
* @return FlintIndexMetadata.
*/
FlintIndexMetadata getFlintIndexMetadata(IndexDetails indexDetails);
FlintIndexMetadata getFlintIndexMetadata(IndexQueryDetails indexQueryDetails);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.sql.spark.dispatcher.model.IndexDetails;
import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails;

/** Implementation of {@link FlintIndexMetadataReader} */
@AllArgsConstructor
Expand All @@ -14,8 +14,8 @@ public class FlintIndexMetadataReaderImpl implements FlintIndexMetadataReader {
private final Client client;

@Override
public FlintIndexMetadata getFlintIndexMetadata(IndexDetails indexDetails) {
String indexName = indexDetails.openSearchIndexName();
public FlintIndexMetadata getFlintIndexMetadata(IndexQueryDetails indexQueryDetails) {
String indexName = indexQueryDetails.openSearchIndexName();
GetMappingsResponse mappingsResponse =
client.admin().indices().prepareGetMappings(indexName).get();
try {
Expand Down
Loading

0 comments on commit 16e2f30

Please sign in to comment.