Skip to content

Commit

Permalink
Run system index handling code with stashed thread context (opensearc…
Browse files Browse the repository at this point in the history
…h-project#297)

Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 committed Jul 13, 2023
1 parent a77dfa5 commit 8bcc523
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension;
import org.opensearch.geospatial.shared.StashedThreadContext;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHit;
Expand Down Expand Up @@ -89,7 +90,7 @@ public void createIndexIfNotExists(final StepListener<Void> stepListener) {
indexSettings.put(INDEX_SETTING_HIDDEN.v1(), INDEX_SETTING_HIDDEN.v2());
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(DatasourceExtension.JOB_INDEX_NAME).mapping(getIndexMapping())
.settings(indexSettings);
client.admin().indices().create(createIndexRequest, new ActionListener<>() {
StashedThreadContext.run(client, () -> client.admin().indices().create(createIndexRequest, new ActionListener<>() {
@Override
public void onResponse(final CreateIndexResponse createIndexResponse) {
stepListener.onResponse(null);
Expand All @@ -104,7 +105,7 @@ public void onFailure(final Exception e) {
}
stepListener.onFailure(e);
}
});
}));
}

private String getIndexMapping() {
Expand All @@ -123,34 +124,44 @@ private String getIndexMapping() {
* Update datasource in an index {@code DatasourceExtension.JOB_INDEX_NAME}
* @param datasource the datasource
* @return index response
* @throws IOException exception
*/
public IndexResponse updateDatasource(final Datasource datasource) throws IOException {
public IndexResponse updateDatasource(final Datasource datasource) {
datasource.setLastUpdateTime(Instant.now());
return client.prepareIndex(DatasourceExtension.JOB_INDEX_NAME)
.setId(datasource.getName())
.setOpType(DocWriteRequest.OpType.INDEX)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.setSource(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.execute()
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
return StashedThreadContext.run(client, () -> {
try {
return client.prepareIndex(DatasourceExtension.JOB_INDEX_NAME)
.setId(datasource.getName())
.setOpType(DocWriteRequest.OpType.INDEX)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.setSource(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.execute()
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}

/**
* Put datasource in an index {@code DatasourceExtension.JOB_INDEX_NAME}
*
* @param datasource the datasource
* @param listener the listener
* @throws IOException exception
*/
public void putDatasource(final Datasource datasource, final ActionListener listener) throws IOException {
public void putDatasource(final Datasource datasource, final ActionListener listener) {
datasource.setLastUpdateTime(Instant.now());
client.prepareIndex(DatasourceExtension.JOB_INDEX_NAME)
.setId(datasource.getName())
.setOpType(DocWriteRequest.OpType.CREATE)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.setSource(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.execute(listener);
StashedThreadContext.run(client, () -> {
try {
client.prepareIndex(DatasourceExtension.JOB_INDEX_NAME)
.setId(datasource.getName())
.setOpType(DocWriteRequest.OpType.CREATE)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.setSource(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.execute(listener);
} catch (IOException e) {
new RuntimeException(e);
}
});
}

/**
Expand All @@ -163,7 +174,7 @@ public Datasource getDatasource(final String name) throws IOException {
GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name);
GetResponse response;
try {
response = client.get(request).actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
response = StashedThreadContext.run(client, () -> client.get(request).actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT)));
if (response.isExists() == false) {
log.error("Datasource[{}] does not exist in an index[{}]", name, DatasourceExtension.JOB_INDEX_NAME);
return null;
Expand All @@ -188,7 +199,7 @@ public Datasource getDatasource(final String name) throws IOException {
*/
public void getDatasource(final String name, final ActionListener<Datasource> actionListener) {
GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name);
client.get(request, new ActionListener<GetResponse>() {
StashedThreadContext.run(client, () -> client.get(request, new ActionListener<>() {
@Override
public void onResponse(final GetResponse response) {
if (response.isExists() == false) {
Expand All @@ -212,7 +223,7 @@ public void onResponse(final GetResponse response) {
public void onFailure(final Exception e) {
actionListener.onFailure(e);
}
});
}));
}

/**
Expand All @@ -221,20 +232,26 @@ public void onFailure(final Exception e) {
* @param actionListener the action listener
*/
public void getDatasources(final String[] names, final ActionListener<List<Datasource>> actionListener) {
client.prepareMultiGet()
.add(DatasourceExtension.JOB_INDEX_NAME, names)
.execute(createGetDataSourceQueryActionLister(MultiGetResponse.class, actionListener));
StashedThreadContext.run(
client,
() -> client.prepareMultiGet()
.add(DatasourceExtension.JOB_INDEX_NAME, names)
.execute(createGetDataSourceQueryActionLister(MultiGetResponse.class, actionListener))
);
}

/**
* Get all datasources up to {@code MAX_SIZE} from an index {@code DatasourceExtension.JOB_INDEX_NAME}
* @param actionListener the action listener
*/
public void getAllDatasources(final ActionListener<List<Datasource>> actionListener) {
client.prepareSearch(DatasourceExtension.JOB_INDEX_NAME)
.setQuery(QueryBuilders.matchAllQuery())
.setSize(MAX_SIZE)
.execute(createGetDataSourceQueryActionLister(SearchResponse.class, actionListener));
StashedThreadContext.run(
client,
() -> client.prepareSearch(DatasourceExtension.JOB_INDEX_NAME)
.setQuery(QueryBuilders.matchAllQuery())
.setSize(MAX_SIZE)
.execute(createGetDataSourceQueryActionLister(SearchResponse.class, actionListener))
);
}

private <T> ActionListener<T> createGetDataSourceQueryActionLister(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.geospatial.shared.StashedThreadContext;
import org.opensearch.index.query.QueryBuilders;

/**
Expand Down Expand Up @@ -92,7 +93,10 @@ public void createIndexIfNotExists(final String indexName) {
indexSettings.put(INDEX_SETTING_AUTO_EXPAND_REPLICAS.v1(), INDEX_SETTING_AUTO_EXPAND_REPLICAS.v2());
indexSettings.put(INDEX_SETTING_HIDDEN.v1(), INDEX_SETTING_HIDDEN.v2());
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(indexSettings).mapping(getIndexMapping());
client.admin().indices().create(createIndexRequest).actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
StashedThreadContext.run(
client,
() -> client.admin().indices().create(createIndexRequest).actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT))
);
}

/**
Expand Down Expand Up @@ -207,31 +211,34 @@ public String createDocument(final String[] fields, final String[] values) {
* @param actionListener action listener
*/
public void getGeoIpData(final String indexName, final String ip, final ActionListener<Map<String, Object>> actionListener) {
client.prepareSearch(indexName)
.setSize(1)
.setQuery(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip))
.setPreference("_local")
.setRequestCache(true)
.execute(new ActionListener<>() {
@Override
public void onResponse(final SearchResponse searchResponse) {
if (searchResponse.getHits().getHits().length == 0) {
actionListener.onResponse(Collections.emptyMap());
} else {
Map<String, Object> geoIpData = (Map<String, Object>) XContentHelper.convertToMap(
searchResponse.getHits().getAt(0).getSourceRef(),
false,
XContentType.JSON
).v2().get(DATA_FIELD_NAME);
actionListener.onResponse(geoIpData);
StashedThreadContext.run(
client,
() -> client.prepareSearch(indexName)
.setSize(1)
.setQuery(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip))
.setPreference("_local")
.setRequestCache(true)
.execute(new ActionListener<>() {
@Override
public void onResponse(final SearchResponse searchResponse) {
if (searchResponse.getHits().getHits().length == 0) {
actionListener.onResponse(Collections.emptyMap());
} else {
Map<String, Object> geoIpData = (Map<String, Object>) XContentHelper.convertToMap(
searchResponse.getHits().getAt(0).getSourceRef(),
false,
XContentType.JSON
).v2().get(DATA_FIELD_NAME);
actionListener.onResponse(geoIpData);
}
}
}

@Override
public void onFailure(final Exception e) {
actionListener.onFailure(e);
}
});
@Override
public void onFailure(final Exception e) {
actionListener.onFailure(e);
}
})
);
}

/**
Expand Down Expand Up @@ -281,7 +288,7 @@ public void getGeoIpData(
return;
}

mRequestBuilder.execute(new ActionListener<>() {
StashedThreadContext.run(client, () -> mRequestBuilder.execute(new ActionListener<>() {
@Override
public void onResponse(final MultiSearchResponse items) {
for (int i = 0; i < ipsToSearch.size(); i++) {
Expand Down Expand Up @@ -315,7 +322,7 @@ public void onResponse(final MultiSearchResponse items) {
public void onFailure(final Exception e) {
actionListener.onFailure(e);
}
});
}));
}

/**
Expand All @@ -328,32 +335,34 @@ public void onFailure(final Exception e) {
*/
public void putGeoIpData(final String indexName, final String[] fields, final Iterator<CSVRecord> iterator, final int bulkSize) {
TimeValue timeout = clusterSettings.get(Ip2GeoSettings.TIMEOUT);
BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
final BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
while (iterator.hasNext()) {
CSVRecord record = iterator.next();
String document = createDocument(fields, record.values());
IndexRequest request = Requests.indexRequest(indexName).source(document, XContentType.JSON);
bulkRequest.add(request);
if (iterator.hasNext() == false || bulkRequest.requests().size() == bulkSize) {
BulkResponse response = client.bulk(bulkRequest).actionGet(timeout);
BulkResponse response = StashedThreadContext.run(client, () -> client.bulk(bulkRequest).actionGet(timeout));
if (response.hasFailures()) {
throw new OpenSearchException(
"error occurred while ingesting GeoIP data in {} with an error {}",
indexName,
response.buildFailureMessage()
);
}
bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
bulkRequest.requests().clear();
}
}
client.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout);
client.admin().indices().prepareForceMerge(indexName).setMaxNumSegments(1).execute().actionGet(timeout);
client.admin()
.indices()
.prepareUpdateSettings(indexName)
.setSettings(Map.of(INDEX_SETTING_READ_ONLY_ALLOW_DELETE.v1(), INDEX_SETTING_READ_ONLY_ALLOW_DELETE.v2()))
.execute()
.actionGet(timeout);
StashedThreadContext.run(client, () -> {
client.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout);
client.admin().indices().prepareForceMerge(indexName).setMaxNumSegments(1).execute().actionGet(timeout);
client.admin()
.indices()
.prepareUpdateSettings(indexName)
.setSettings(Map.of(INDEX_SETTING_READ_ONLY_ALLOW_DELETE.v1(), INDEX_SETTING_READ_ONLY_ALLOW_DELETE.v2()))
.execute()
.actionGet(timeout);
});
}

public AcknowledgedResponse deleteIp2GeoDataIndex(final String index) {
Expand All @@ -364,11 +373,14 @@ public AcknowledgedResponse deleteIp2GeoDataIndex(final String index) {
IP2GEO_DATA_INDEX_NAME_PREFIX
);
}
return client.admin()
.indices()
.prepareDelete(index)
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
.execute()
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
return StashedThreadContext.run(
client,
() -> client.admin()
.indices()
.prepareDelete(index)
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
.execute()
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT))
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.shared;

import java.util.function.Supplier;

import org.opensearch.client.Client;
import org.opensearch.common.util.concurrent.ThreadContext;

/**
* Helper class to run code with stashed thread context
*
* Code need to be run with stashed thread context if it interacts with system index
* when security plugin is enabled.
*/
public class StashedThreadContext {
/**
* Set the thread context to default, this is needed to allow actions on model system index
* when security plugin is enabled
* @param function runnable that needs to be executed after thread context has been stashed, accepts and returns nothing
*/
public static void run(final Client client, final Runnable function) {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
function.run();
}
}

/**
* Set the thread context to default, this is needed to allow actions on model system index
* when security plugin is enabled
* @param function supplier function that needs to be executed after thread context has been stashed, return object
*/
public static <T> T run(final Client client, final Supplier<T> function) {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
return function.get();
}
}
}

0 comments on commit 8bcc523

Please sign in to comment.