Skip to content

Commit

Permalink
Test7
Browse files Browse the repository at this point in the history
Signed-off-by: Jackie Han <[email protected]>
  • Loading branch information
jackiehanyang committed Jun 9, 2024
1 parent 2eb8a82 commit 2730df5
Showing 1 changed file with 18 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParser.Token;
import org.opensearch.forecast.constant.ForecastCommonName;
import org.opensearch.forecast.indices.ForecastIndex;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.BoolQueryBuilder;
Expand Down Expand Up @@ -289,7 +290,7 @@ protected void choosePrimaryShards(CreateIndexRequest request, boolean hiddenInd
);
}

protected void deleteOldHistoryIndices(String indexPattern, TimeValue historyRetentionPeriod, Integer customResultIndexTtl) {
protected void deleteOldHistoryIndices(String indexPattern, TimeValue historyRetentionPeriod) {
Set<String> candidates = new HashSet<String>();

ClusterStateRequest clusterStateRequest = new ClusterStateRequest()
Expand All @@ -302,12 +303,12 @@ protected void deleteOldHistoryIndices(String indexPattern, TimeValue historyRet
adminClient.cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
String latestToDelete = null;
long latest = Long.MIN_VALUE;
long customTtlMillis = (customResultIndexTtl != null) ? customResultIndexTtl * 24 * 60 * 60 * 1000L : Long.MAX_VALUE;
for (IndexMetadata indexMetaData : clusterStateResponse.getState().metadata().indices().values()) {
long creationTime = indexMetaData.getCreationDate();
long indexAgeMillis = Instant.now().toEpochMilli() - creationTime;
if (indexAgeMillis > historyRetentionPeriod.millis() || indexAgeMillis > customTtlMillis) {
if (indexAgeMillis > historyRetentionPeriod.millis()) {
String indexName = indexMetaData.getIndex().getName();
System.out.println("indexName: " + indexName);
candidates.add(indexName);
if (latest < creationTime) {
latest = creationTime;
Expand Down Expand Up @@ -1085,7 +1086,7 @@ public void onClusterManager() {

// schedule the next rollover for approx MAX_AGE later
scheduledRollover = threadPool
.scheduleWithFixedDelay(() -> rolloverAndDeleteHistoryIndex(), TimeValue.timeValueMinutes(5), executorName());
.scheduleWithFixedDelay(() -> rolloverAndDeleteHistoryIndex(), TimeValue.timeValueMinutes(1), executorName());
} catch (Exception e) {
// This should be run on cluster startup
logger.error("Error rollover result indices. " + "Can't rollover result until clusterManager node is restarted.", e);
Expand All @@ -1108,7 +1109,6 @@ protected void rescheduleRollover() {
if (scheduledRollover != null) {
scheduledRollover.cancel();
}
System.out.println(5);

scheduledRollover = threadPool
.scheduleWithFixedDelay(() -> rolloverAndDeleteHistoryIndex(), historyRolloverPeriod, executorName());
Expand Down Expand Up @@ -1244,13 +1244,9 @@ protected void rolloverAndDeleteHistoryIndex(
String rolloverIndexPattern,
IndexType resultIndex
) {
System.out.println("resultIndexAlias: " + resultIndexAlias);
System.out.println("allResultIndicesPattern: " + allResultIndicesPattern);
System.out.println("rolloverIndexPattern: " + rolloverIndexPattern);
System.out.println("resultIndex: " + resultIndex.getIndexName());

// build rollover request for default result index
RolloverRequest defaultResultIndexRolloverRequest = buildRolloverRequest(resultIndexAlias, rolloverIndexPattern);
defaultResultIndexRolloverRequest.addMaxIndexDocsCondition(historyMaxDocs * getNumberOfPrimaryShards());

// get config files that have custom result index alias to perform rollover on
getConfigsWithCustomResultIndexAlias(ActionListener.wrap(candidateResultAliases -> {
Expand All @@ -1271,8 +1267,6 @@ protected void rolloverAndDeleteHistoryIndex(
return;
}

System.out.println("size: " + candidateResultAliases.size());

// perform rollover and delete on found custom result index alias
candidateResultAliases.forEach(config -> handleCustomResultIndex(config, resultIndex));

Expand All @@ -1284,16 +1278,14 @@ protected void rolloverAndDeleteHistoryIndex(
}

private void handleCustomResultIndex(Config config, IndexType resultIndex) {
System.out.println("detector name: " + config.getName());
System.out.println("custom index name: " + config.getCustomResultIndexOrAlias());
RolloverRequest rolloverRequest = buildRolloverRequest(
config.getCustomResultIndexOrAlias(),
getCustomResultIndexPattern(config.getCustomResultIndexOrAlias())
);

// add rollover conditions if found in config
if (config.getCustomResultIndexMinAge() != null) {
rolloverRequest.addMaxIndexAgeCondition(TimeValue.timeValueMinutes(10));
rolloverRequest.addMaxIndexAgeCondition(TimeValue.timeValueMinutes(1));

// rolloverRequest.addMaxIndexAgeCondition(TimeValue.timeValueDays(config.getCustomResultIndexMinAge()));
}
Expand Down Expand Up @@ -1326,7 +1318,6 @@ private RolloverRequest buildRolloverRequest(String resultIndexAlias, String rol

createRequest.index(rolloverIndexPattern).mapping(resultMapping, XContentType.JSON);
choosePrimaryShards(createRequest, true);
rollOverRequest.addMaxIndexDocsCondition(historyMaxDocs * getNumberOfPrimaryShards());

return rollOverRequest;
}
Expand All @@ -1345,7 +1336,17 @@ private void proceedWithRolloverAndDelete(
IndexState indexState = indexStates.computeIfAbsent(resultIndex, k -> new IndexState(k.getMapping()));
indexState.mappingUpToDate = true;
logger.info("{} rolled over. Conditions were: {}", resultIndexAlias, response.getConditionStatus());
deleteOldHistoryIndices(allResultIndicesPattern, historyRetentionPeriod, customResultIndexTtl);
if (resultIndexAlias.startsWith(ADCommonName.CUSTOM_RESULT_INDEX_PREFIX) || resultIndexAlias.startsWith(CUSTOM_RESULT_INDEX_PREFIX)) {
// handle custom result index deletion
if (customResultIndexTtl != null) {
// deleteOldHistoryIndices(allResultIndicesPattern, TimeValue.timeValueHours(customResultIndexTtl * 24));
deleteOldHistoryIndices(allResultIndicesPattern, TimeValue.timeValueMinutes(1));

}
} else {
// handle default result index deletion
deleteOldHistoryIndices(allResultIndicesPattern, historyRetentionPeriod);
}
}
}, exception -> { logger.error("Fail to roll over result index", exception); }));
}
Expand Down

0 comments on commit 2730df5

Please sign in to comment.