Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Code Refactoring for CommonName #867

Merged
merged 1 commit into from
Apr 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,8 @@ List<String> jacocoExclusions = [

// Class containing just constants. Don't need to test
'org.opensearch.ad.constant.*',
'org.opensearch.forecast.constant.*',
'org.opensearch.timeseries.constant.*',

//'org.opensearch.ad.common.exception.AnomalyDetectionException',
'org.opensearch.ad.util.ClientUtil',
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.opensearch.jobscheduler.spi.utils.LockService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.constant.CommonName;

import com.google.common.base.Throwables;

Expand Down Expand Up @@ -514,7 +515,7 @@ private void stopAdJobForEndRunException(
}

private void stopAdJob(String detectorId, AnomalyDetectorFunction function) {
GetRequest getRequest = new GetRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX).id(detectorId);
GetRequest getRequest = new GetRequest(CommonName.JOB_INDEX).id(detectorId);
ActionListener<GetResponse> listener = ActionListener.wrap(response -> {
if (response.isExists()) {
try (
Expand All @@ -537,7 +538,7 @@ private void stopAdJob(String detectorId, AnomalyDetectorFunction function) {
job.getUser(),
job.getResultIndex()
);
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX)
IndexRequest indexRequest = new IndexRequest(CommonName.JOB_INDEX)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(newJob.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), XCONTENT_WITH_TYPE))
.id(detectorId);
Expand Down
25 changes: 13 additions & 12 deletions src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.opensearch.ad.cluster.ADDataMigrator;
import org.opensearch.ad.cluster.ClusterManagerEventListener;
import org.opensearch.ad.cluster.HashRing;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.dataprocessor.IntegerSensitiveSingleFeatureLinearUniformInterpolator;
import org.opensearch.ad.dataprocessor.Interpolator;
import org.opensearch.ad.dataprocessor.LinearUniformInterpolator;
Expand Down Expand Up @@ -85,7 +85,6 @@
import org.opensearch.ad.settings.NumericSetting;
import org.opensearch.ad.stats.ADStat;
import org.opensearch.ad.stats.ADStats;
import org.opensearch.ad.stats.StatNames;
import org.opensearch.ad.stats.suppliers.CounterSupplier;
import org.opensearch.ad.stats.suppliers.IndexStatusSupplier;
import org.opensearch.ad.stats.suppliers.ModelsOnNodeCountSupplier;
Expand Down Expand Up @@ -195,6 +194,8 @@
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.stats.StatNames;
import org.opensearch.watcher.ResourceWatcherService;

import com.amazon.randomcutforest.parkservices.state.ThresholdedRandomCutForestMapper;
Expand Down Expand Up @@ -429,7 +430,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
CheckpointDao checkpoint = new CheckpointDao(
client,
clientUtil,
CommonName.CHECKPOINT_INDEX_NAME,
ADCommonName.CHECKPOINT_INDEX_NAME,
gson,
mapper,
converter,
Expand All @@ -454,7 +455,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
CheckPointMaintainRequestAdapter adapter = new CheckPointMaintainRequestAdapter(
cacheProvider,
checkpoint,
CommonName.CHECKPOINT_INDEX_NAME,
ADCommonName.CHECKPOINT_INDEX_NAME,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small nit: AnomalyDetectorCommonName ?? The line below is AnomalyDetectorSettings so ... but yes an acronym would make the lines shorter and readable.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, trying to use acronym to make the lines shorter and readable.

AnomalyDetectorSettings.CHECKPOINT_SAVING_FREQ,
getClock(),
clusterService,
Expand All @@ -477,7 +478,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
AnomalyDetectorSettings.MAINTENANCE_FREQ_CONSTANT,
AnomalyDetectorSettings.QUEUE_MAINTENANCE,
checkpoint,
CommonName.CHECKPOINT_INDEX_NAME,
ADCommonName.CHECKPOINT_INDEX_NAME,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as previous.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, trying to use acronym to make the lines shorter and readable.

AnomalyDetectorSettings.HOURLY_MAINTENANCE,
stateManager,
AnomalyDetectorSettings.HOURLY_MAINTENANCE
Expand Down Expand Up @@ -625,23 +626,23 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
)
.put(
StatNames.ANOMALY_DETECTORS_INDEX_STATUS.getName(),
new ADStat<>(true, new IndexStatusSupplier(indexUtils, AnomalyDetector.ANOMALY_DETECTORS_INDEX))
new ADStat<>(true, new IndexStatusSupplier(indexUtils, CommonName.CONFIG_INDEX))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "ADStat" should also get updated to "Stat/PluginStat"? But that can/should be a different PR.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, will do that in a different PR.

)
.put(
StatNames.ANOMALY_RESULTS_INDEX_STATUS.getName(),
new ADStat<>(true, new IndexStatusSupplier(indexUtils, CommonName.ANOMALY_RESULT_INDEX_ALIAS))
new ADStat<>(true, new IndexStatusSupplier(indexUtils, ADCommonName.ANOMALY_RESULT_INDEX_ALIAS))
)
.put(
StatNames.MODELS_CHECKPOINT_INDEX_STATUS.getName(),
new ADStat<>(true, new IndexStatusSupplier(indexUtils, CommonName.CHECKPOINT_INDEX_NAME))
new ADStat<>(true, new IndexStatusSupplier(indexUtils, ADCommonName.CHECKPOINT_INDEX_NAME))
)
.put(
StatNames.ANOMALY_DETECTION_JOB_INDEX_STATUS.getName(),
new ADStat<>(true, new IndexStatusSupplier(indexUtils, AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX))
new ADStat<>(true, new IndexStatusSupplier(indexUtils, CommonName.JOB_INDEX))
)
.put(
StatNames.ANOMALY_DETECTION_STATE_STATUS.getName(),
new ADStat<>(true, new IndexStatusSupplier(indexUtils, CommonName.DETECTION_STATE_INDEX))
new ADStat<>(true, new IndexStatusSupplier(indexUtils, ADCommonName.DETECTION_STATE_INDEX))
)
.put(StatNames.DETECTOR_COUNT.getName(), new ADStat<>(true, new SettableSupplier()))
.put(StatNames.SINGLE_ENTITY_DETECTOR_COUNT.getName(), new ADStat<>(true, new SettableSupplier()))
Expand Down Expand Up @@ -752,7 +753,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
client,
settings,
threadPool,
CommonName.ANOMALY_RESULT_INDEX_ALIAS,
ADCommonName.ANOMALY_RESULT_INDEX_ALIAS,
anomalyDetectionIndices,
this.clientUtil,
this.indexUtils,
Expand Down Expand Up @@ -1010,7 +1011,7 @@ public String getJobType() {

@Override
public String getJobIndex() {
return AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX;
return CommonName.JOB_INDEX;
}

@Override
Expand Down
19 changes: 9 additions & 10 deletions src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@

import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG;
import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_PARSE_DETECTOR_MSG;
import static org.opensearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX;
import static org.opensearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.rest.RestStatus.BAD_REQUEST;
import static org.opensearch.rest.RestStatus.INTERNAL_SERVER_ERROR;
Expand All @@ -35,8 +33,8 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.ad.common.exception.NotSerializedADExceptionName;
import org.opensearch.ad.common.exception.ResourceNotFoundException;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.model.ADTaskType;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
Expand Down Expand Up @@ -74,6 +72,7 @@
import org.opensearch.search.aggregations.metrics.CardinalityAggregationBuilder;
import org.opensearch.search.aggregations.metrics.InternalCardinality;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.transport.TransportService;

public class AnomalyDetectorProfileRunner extends AbstractProfileRunner {
Expand Down Expand Up @@ -121,7 +120,7 @@ private void calculateTotalResponsesToWait(
Set<DetectorProfileName> profilesToCollect,
ActionListener<DetectorProfile> listener
) {
GetRequest getDetectorRequest = new GetRequest(ANOMALY_DETECTORS_INDEX, detectorId);
GetRequest getDetectorRequest = new GetRequest(CommonName.CONFIG_INDEX, detectorId);
client.get(getDetectorRequest, ActionListener.wrap(getDetectorResponse -> {
if (getDetectorResponse != null && getDetectorResponse.isExists()) {
try (
Expand Down Expand Up @@ -151,7 +150,7 @@ private void prepareProfile(
Set<DetectorProfileName> profilesToCollect
) {
String detectorId = detector.getDetectorId();
GetRequest getRequest = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX, detectorId);
GetRequest getRequest = new GetRequest(CommonName.JOB_INDEX, detectorId);
client.get(getRequest, ActionListener.wrap(getResponse -> {
if (getResponse != null && getResponse.isExists()) {
try (
Expand Down Expand Up @@ -292,14 +291,14 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
if (categoryField.size() == 1) {
// Run a cardinality aggregation to count the cardinality of single category fields
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
CardinalityAggregationBuilder aggBuilder = new CardinalityAggregationBuilder(CommonName.TOTAL_ENTITIES);
CardinalityAggregationBuilder aggBuilder = new CardinalityAggregationBuilder(ADCommonName.TOTAL_ENTITIES);
aggBuilder.field(categoryField.get(0));
searchSourceBuilder.aggregation(aggBuilder);

SearchRequest request = new SearchRequest(detector.getIndices().toArray(new String[0]), searchSourceBuilder);
final ActionListener<SearchResponse> searchResponseListener = ActionListener.wrap(searchResponse -> {
Map<String, Aggregation> aggMap = searchResponse.getAggregations().asMap();
InternalCardinality totalEntities = (InternalCardinality) aggMap.get(CommonName.TOTAL_ENTITIES);
InternalCardinality totalEntities = (InternalCardinality) aggMap.get(ADCommonName.TOTAL_ENTITIES);
long value = totalEntities.getValue();
DetectorProfile.Builder profileBuilder = new DetectorProfile.Builder();
DetectorProfile profile = profileBuilder.totalEntities(value).build();
Expand All @@ -322,7 +321,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
// Run a composite query and count the number of buckets to decide cardinality of multiple category fields
AggregationBuilder bucketAggs = AggregationBuilders
.composite(
CommonName.TOTAL_ENTITIES,
ADCommonName.TOTAL_ENTITIES,
detector.getCategoryField().stream().map(f -> new TermsValuesSourceBuilder(f).field(f)).collect(Collectors.toList())
)
.size(maxTotalEntitiesToTrack);
Expand All @@ -344,7 +343,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
return;
}

Aggregation aggrResult = aggs.get(CommonName.TOTAL_ENTITIES);
Aggregation aggrResult = aggs.get(ADCommonName.TOTAL_ENTITIES);
if (aggrResult == null) {
listener.onFailure(new IllegalArgumentException("Fail to find valid aggregation result"));
return;
Expand Down Expand Up @@ -558,7 +557,7 @@ private ActionListener<RCFPollingResponse> onPollRCFUpdates(
NotSerializedADExceptionName.RESOURCE_NOT_FOUND_EXCEPTION_NAME_UNDERSCORE.getName()
)
|| (ExceptionUtil.isIndexNotAvailable(causeException)
&& causeException.getMessage().contains(CommonName.CHECKPOINT_INDEX_NAME))) {
&& causeException.getMessage().contains(ADCommonName.CHECKPOINT_INDEX_NAME))) {
// cannot find checkpoint
// We don't want to show the estimated time remaining to initialize
// a detector before cold start finishes, where the actual
Expand Down
15 changes: 7 additions & 8 deletions src/main/java/org/opensearch/ad/EntityProfileRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@

package org.opensearch.ad;

import static org.opensearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX;
import static org.opensearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

import java.util.List;
Expand All @@ -27,8 +25,8 @@
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.AnomalyResult;
Expand Down Expand Up @@ -58,6 +56,7 @@
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.timeseries.constant.CommonName;

public class EntityProfileRunner extends AbstractProfileRunner {
private final Logger logger = LogManager.getLogger(EntityProfileRunner.class);
Expand Down Expand Up @@ -94,7 +93,7 @@ public void profile(
listener.onFailure(new IllegalArgumentException(CommonErrorMessages.EMPTY_PROFILES_COLLECT));
return;
}
GetRequest getDetectorRequest = new GetRequest(ANOMALY_DETECTORS_INDEX, detectorId);
GetRequest getDetectorRequest = new GetRequest(CommonName.CONFIG_INDEX, detectorId);

client.get(getDetectorRequest, ActionListener.wrap(getResponse -> {
if (getResponse != null && getResponse.isExists()) {
Expand Down Expand Up @@ -220,7 +219,7 @@ private void getJob(
EntityProfileResponse entityProfileResponse,
ActionListener<EntityProfile> listener
) {
GetRequest getRequest = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX, detectorId);
GetRequest getRequest = new GetRequest(CommonName.JOB_INDEX, detectorId);
client.get(getRequest, ActionListener.wrap(getResponse -> {
if (getResponse != null && getResponse.isExists()) {
try (
Expand Down Expand Up @@ -457,15 +456,15 @@ private SearchRequest createLastSampleTimeRequest(String detectorId, long enable

boolQueryBuilder.filter(QueryBuilders.termQuery(AnomalyResult.DETECTOR_ID_FIELD, detectorId));

boolQueryBuilder.filter(QueryBuilders.rangeQuery(AnomalyResult.EXECUTION_END_TIME_FIELD).gte(enabledTime));
boolQueryBuilder.filter(QueryBuilders.rangeQuery(CommonName.EXECUTION_END_TIME_FIELD).gte(enabledTime));

SearchSourceBuilder source = new SearchSourceBuilder()
.query(boolQueryBuilder)
.aggregation(AggregationBuilders.max(CommonName.AGG_NAME_MAX_TIME).field(AnomalyResult.EXECUTION_END_TIME_FIELD))
.aggregation(AggregationBuilders.max(ADCommonName.AGG_NAME_MAX_TIME).field(CommonName.EXECUTION_END_TIME_FIELD))
.trackTotalHits(false)
.size(0);

SearchRequest request = new SearchRequest(CommonName.ANOMALY_RESULT_INDEX_ALIAS);
SearchRequest request = new SearchRequest(ADCommonName.ANOMALY_RESULT_INDEX_ALIAS);
request.source(source);
if (resultIndex != null) {
request.indices(resultIndex);
Expand Down
9 changes: 5 additions & 4 deletions src/main/java/org/opensearch/ad/NodeStateManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.ad.common.exception.EndRunException;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.ml.SingleStreamModelIdMapper;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
Expand All @@ -48,6 +48,7 @@
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.timeseries.constant.CommonName;

/**
* NodeStateManager is used to manage states shared by transport and ml components
Expand Down Expand Up @@ -130,7 +131,7 @@ public void getAnomalyDetector(String adID, ActionListener<Optional<AnomalyDetec
if (state != null && state.getDetectorDef() != null) {
listener.onResponse(Optional.of(state.getDetectorDef()));
} else {
GetRequest request = new GetRequest(AnomalyDetector.ANOMALY_DETECTORS_INDEX, adID);
GetRequest request = new GetRequest(CommonName.CONFIG_INDEX, adID);
clientUtil.<GetRequest, GetResponse>asyncRequest(request, client::get, onGetDetectorResponse(adID, listener));
}
}
Expand Down Expand Up @@ -182,7 +183,7 @@ public void getDetectorCheckpoint(String adID, ActionListener<Boolean> listener)
return;
}

GetRequest request = new GetRequest(CommonName.CHECKPOINT_INDEX_NAME, SingleStreamModelIdMapper.getRcfModelId(adID, 0));
GetRequest request = new GetRequest(ADCommonName.CHECKPOINT_INDEX_NAME, SingleStreamModelIdMapper.getRcfModelId(adID, 0));

clientUtil.<GetRequest, GetResponse>asyncRequest(request, client::get, onGetCheckpointResponse(adID, listener));
}
Expand Down Expand Up @@ -375,7 +376,7 @@ public void getAnomalyDetectorJob(String adID, ActionListener<Optional<AnomalyDe
if (state != null && state.getDetectorJob() != null) {
listener.onResponse(Optional.of(state.getDetectorJob()));
} else {
GetRequest request = new GetRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX, adID);
GetRequest request = new GetRequest(CommonName.JOB_INDEX, adID);
clientUtil.<GetRequest, GetResponse>asyncRequest(request, client::get, onGetDetectorJobResponse(adID, listener));
}
}
Expand Down
Loading