From 154866a3683e94e887d798db98ed6b19320d80b0 Mon Sep 17 00:00:00 2001 From: Eric Date: Wed, 8 Nov 2023 21:24:30 +0000 Subject: [PATCH] Revert "Refactoring for tags usage in test files and also added explicit denly list setting. (#2383) (#2385)" This reverts commit 37e010f2bbf213fb0a5bf50fc7e1389d1e7933fe. Signed-off-by: Eric --- .../dispatcher/SparkQueryDispatcher.java | 2 +- ...AsyncQueryExecutorServiceImplSpecTest.java | 28 +++----- .../dispatcher/SparkQueryDispatcherTest.java | 67 +++++++++---------- 3 files changed, 43 insertions(+), 54 deletions(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index b603ee6909..5e80259e09 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -55,7 +55,7 @@ public class SparkQueryDispatcher { private static final Logger LOG = LogManager.getLogger(); public static final String INDEX_TAG_KEY = "index"; public static final String DATASOURCE_TAG_KEY = "datasource"; - public static final String CLUSTER_NAME_TAG_KEY = "domain_ident"; + public static final String CLUSTER_NAME_TAG_KEY = "cluster"; public static final String JOB_TYPE_TAG_KEY = "type"; private EMRServerlessClient emrServerlessClient; diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java index 39ec132442..4bc894c1b2 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java @@ -5,7 +5,8 @@ package org.opensearch.sql.spark.asyncquery; -import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.*; +import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.SPARK_EXECUTION_SESSION_ENABLED_SETTING; +import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.SPARK_EXECUTION_SESSION_LIMIT_SETTING; import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_CLASS_NAME; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_REQUEST_INDEX; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_SESSION_ID; @@ -27,7 +28,12 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import java.util.*; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; import lombok.Getter; import org.junit.After; import org.junit.Before; @@ -99,18 +105,9 @@ public List> getSettings() { @Before public void setup() { clusterService = clusterService(); - client = (NodeClient) cluster().client(); - client - .admin() - .cluster() - .prepareUpdateSettings() - .setTransientSettings( - Settings.builder() - .putList(DATASOURCE_URI_HOSTS_DENY_LIST.getKey(), Collections.emptyList()) - .build()) - .get(); clusterSettings = clusterService.getClusterSettings(); pluginSettings = new OpenSearchSettings(clusterSettings); + client = (NodeClient) cluster().client(); dataSourceService = createDataSourceService(); dataSourceService.createDataSource( new DataSourceMetadata( @@ -147,13 +144,6 @@ public void clean() { .setTransientSettings( Settings.builder().putNull(SPARK_EXECUTION_SESSION_LIMIT_SETTING.getKey()).build()) .get(); - client - .admin() - .cluster() - .prepareUpdateSettings() - .setTransientSettings( - Settings.builder().putNull(DATASOURCE_URI_HOSTS_DENY_LIST.getKey()).build()) - .get(); } @Test diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index aaef4db6b8..95b6033d12 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -33,7 +33,6 @@ import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_USERNAME; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AWSREGION_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD; -import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.*; import com.amazonaws.services.emrserverless.model.CancelJobRunResult; import com.amazonaws.services.emrserverless.model.GetJobRunResult; @@ -121,9 +120,9 @@ void setUp() { @Test void testDispatchSelectQuery() { HashMap tags = new HashMap<>(); - tags.put(DATASOURCE_TAG_KEY, "my_glue"); - tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); - tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); + tags.put("datasource", "my_glue"); + tags.put("cluster", TEST_CLUSTER_NAME); + tags.put("type", JobType.BATCH.getText()); String query = "select * from my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -176,9 +175,9 @@ void testDispatchSelectQuery() { @Test void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() { HashMap tags = new HashMap<>(); - tags.put(DATASOURCE_TAG_KEY, "my_glue"); - tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); - tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); + tags.put("datasource", "my_glue"); + tags.put("cluster", TEST_CLUSTER_NAME); + tags.put("type", JobType.BATCH.getText()); String query = "select * from my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -232,9 +231,9 @@ void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() { @Test void testDispatchSelectQueryWithNoAuthIndexStoreDatasource() { HashMap tags = new HashMap<>(); - tags.put(DATASOURCE_TAG_KEY, "my_glue"); - tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); - tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); + tags.put("datasource", "my_glue"); + tags.put("cluster", TEST_CLUSTER_NAME); + tags.put("type", JobType.BATCH.getText()); String query = "select * from my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -347,10 +346,10 @@ void testDispatchSelectQueryFailedCreateSession() { @Test void testDispatchIndexQuery() { HashMap tags = new HashMap<>(); - tags.put(DATASOURCE_TAG_KEY, "my_glue"); - tags.put(INDEX_TAG_KEY, "flint_my_glue_default_http_logs_elb_and_requesturi_index"); - tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); - tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText()); + tags.put("datasource", "my_glue"); + tags.put("index", "flint_my_glue_default_http_logs_elb_and_requesturi_index"); + tags.put("cluster", TEST_CLUSTER_NAME); + tags.put("type", JobType.STREAMING.getText()); String query = "CREATE INDEX elb_and_requestUri ON my_glue.default.http_logs(l_orderkey, l_quantity) WITH" + " (auto_refresh = true)"; @@ -406,9 +405,9 @@ void testDispatchIndexQuery() { @Test void testDispatchWithPPLQuery() { HashMap tags = new HashMap<>(); - tags.put(DATASOURCE_TAG_KEY, "my_glue"); - tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); - tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); + tags.put("datasource", "my_glue"); + tags.put("cluster", TEST_CLUSTER_NAME); + tags.put("type", JobType.BATCH.getText()); String query = "source = my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -461,9 +460,9 @@ void testDispatchWithPPLQuery() { @Test void testDispatchQueryWithoutATableAndDataSourceName() { HashMap tags = new HashMap<>(); - tags.put(DATASOURCE_TAG_KEY, "my_glue"); - tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); - tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); + tags.put("datasource", "my_glue"); + tags.put("cluster", TEST_CLUSTER_NAME); + tags.put("type", JobType.BATCH.getText()); String query = "show tables"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -516,10 +515,10 @@ void testDispatchQueryWithoutATableAndDataSourceName() { @Test void testDispatchIndexQueryWithoutADatasourceName() { HashMap tags = new HashMap<>(); - tags.put(DATASOURCE_TAG_KEY, "my_glue"); - tags.put(INDEX_TAG_KEY, "flint_my_glue_default_http_logs_elb_and_requesturi_index"); - tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); - tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText()); + tags.put("datasource", "my_glue"); + tags.put("index", "flint_my_glue_default_http_logs_elb_and_requesturi_index"); + tags.put("cluster", TEST_CLUSTER_NAME); + tags.put("type", JobType.STREAMING.getText()); String query = "CREATE INDEX elb_and_requestUri ON default.http_logs(l_orderkey, l_quantity) WITH" + " (auto_refresh = true)"; @@ -575,10 +574,10 @@ void testDispatchIndexQueryWithoutADatasourceName() { @Test void testDispatchMaterializedViewQuery() { HashMap tags = new HashMap<>(); - tags.put(DATASOURCE_TAG_KEY, "my_glue"); - tags.put(INDEX_TAG_KEY, "flint_mv_1"); - tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); - tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText()); + tags.put("datasource", "my_glue"); + tags.put("index", "flint_mv_1"); + tags.put("cluster", TEST_CLUSTER_NAME); + tags.put("type", JobType.STREAMING.getText()); String query = "CREATE MATERIALIZED VIEW mv_1 AS query=select * from my_glue.default.logs WITH" + " (auto_refresh = true)"; @@ -634,8 +633,8 @@ void testDispatchMaterializedViewQuery() { @Test void testDispatchShowMVQuery() { HashMap tags = new HashMap<>(); - tags.put(DATASOURCE_TAG_KEY, "my_glue"); - tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); + tags.put("datasource", "my_glue"); + tags.put("cluster", TEST_CLUSTER_NAME); String query = "SHOW MATERIALIZED VIEW IN mys3.default"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -688,8 +687,8 @@ void testDispatchShowMVQuery() { @Test void testRefreshIndexQuery() { HashMap tags = new HashMap<>(); - tags.put(DATASOURCE_TAG_KEY, "my_glue"); - tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); + tags.put("datasource", "my_glue"); + tags.put("cluster", TEST_CLUSTER_NAME); String query = "REFRESH SKIPPING INDEX ON my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -742,8 +741,8 @@ void testRefreshIndexQuery() { @Test void testDispatchDescribeIndexQuery() { HashMap tags = new HashMap<>(); - tags.put(DATASOURCE_TAG_KEY, "my_glue"); - tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); + tags.put("datasource", "my_glue"); + tags.put("cluster", TEST_CLUSTER_NAME); String query = "DESCRIBE SKIPPING INDEX ON mys3.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString(