Skip to content

Commit

Permalink
remove bwc code related to 1.0 and earlier version
Browse files Browse the repository at this point in the history
This PR addresses AD compile failure because the core removed 1x Version constants. (check opensearch-project/OpenSearch#5021) OpenSearch does not support N-2 version compatibility. This is inherited from Elasticsearch and Lucene. So version 1x is not compatible with 3.0. Thus removal of deprecated 1x code. This PR follows suite and removes backward compatibility code on OpenSearch 1.0 and older versions. So we won't support direct upgrade from 1.x domains to 3.x.

Testing done:
1. gradle build. Note that CI workflow will fail due to missing job scheduler. We are using a job scheduler from distribution. But due to a circular dependency on -SNAPSHOT builds being published to maven that require the distribution build to be successful (check opensearch-project/opensearch-build#1463), AD compilation failure caused the job scheduler to be taken out.  Not including the latest job scheduler will cause AD build to fail. So we have a chicken and egg problem: this PR fixes AD build failure and the PR itself cannot build due to missing job scheduler. To run gradle build, I changed to use local job scheduler and verified gradle build succeeded. Once I merge this PR. job scheduler should be able to build and be published to maven. Future AD CI should be unblocked after that.
  • Loading branch information
kaituo committed Nov 4, 2022
1 parent 06e5edb commit 463bae5
Show file tree
Hide file tree
Showing 30 changed files with 364 additions and 1,673 deletions.
4 changes: 0 additions & 4 deletions src/main/java/org/opensearch/ad/cluster/ADVersionUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,4 @@ public static String normalizeVersion(String adVersion) {
}
return normalizedVersion.toString();
}

public static boolean compatibleWithVersionOnOrAfter1_1(Version adVersion) {
return adVersion != null && adVersion.onOrAfter(Version.V_1_1_0);
}
}
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/cluster/HashRing.java
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ private void buildCircles(Set<String> removedNodeIds, Set<String> addedNodeIds,
// rebuild AD version hash ring with cooldown after all new node added.
rebuildCirclesForRealtimeAD();

if (!dataMigrator.isMigrated() && circles.size() > 0 && circles.lastEntry().getKey().onOrAfter(Version.V_1_1_0)) {
if (!dataMigrator.isMigrated() && circles.size() > 0) {
// Find owning node with highest AD version to make sure the data migration logic be compatible to
// latest AD version when upgrade.
Optional<DiscoveryNode> owningNode = getOwningNodeWithHighestAdVersion(DEFAULT_HASH_RING_MODEL_ID);
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/org/opensearch/ad/model/ADTaskProfile.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.opensearch.Version;
import org.opensearch.ad.annotation.Generated;
import org.opensearch.ad.cluster.ADVersionUtil;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -181,7 +180,7 @@ public void writeTo(StreamOutput out, Version adVersion) throws IOException {
out.writeOptionalInt(thresholdModelTrainingDataSize);
out.writeOptionalLong(modelSizeInBytes);
out.writeOptionalString(nodeId);
if (ADVersionUtil.compatibleWithVersionOnOrAfter1_1(adVersion)) {
if (adVersion != null) {
out.writeOptionalString(taskId);
out.writeOptionalString(adTaskType);
out.writeOptionalInt(detectorTaskSlots);
Expand Down
33 changes: 8 additions & 25 deletions src/main/java/org/opensearch/ad/model/ModelProfile.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.util.Bwc;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
Expand All @@ -43,42 +42,26 @@ public ModelProfile(String modelId, Entity entity, long modelSizeInBytes) {

public ModelProfile(StreamInput in) throws IOException {
this.modelId = in.readString();
if (Bwc.supportMultiCategoryFields(in.getVersion())) {
if (in.readBoolean()) {
this.entity = new Entity(in);
} else {
this.entity = null;
}
if (in.readBoolean()) {
this.entity = new Entity(in);
} else {
this.entity = null;
}

this.modelSizeInBytes = in.readLong();
if (!Bwc.supportMultiCategoryFields(in.getVersion())) {
// removed nodeId since Opensearch 1.1
// read it and do no assignment
in.readString();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(modelId);
if (Bwc.supportMultiCategoryFields(out.getVersion())) {
if (entity != null) {
out.writeBoolean(true);
entity.writeTo(out);
} else {
out.writeBoolean(false);
}
if (entity != null) {
out.writeBoolean(true);
entity.writeTo(out);
} else {
out.writeBoolean(false);
}

out.writeLong(modelSizeInBytes);
// removed nodeId since Opensearch 1.1
if (!Bwc.supportMultiCategoryFields(out.getVersion())) {
// write empty string for node id as we don't have it
// otherwise, we will get EOFException
out.writeString(CommonName.EMPTY_FIELD);
}
}

public String getModelId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.ad.cluster.ADVersionUtil;
import org.opensearch.ad.model.ADTaskProfile;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -50,8 +49,7 @@ public ADTaskProfile getAdTaskProfile() {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (adTaskProfile != null
&& (ADVersionUtil.compatibleWithVersionOnOrAfter1_1(remoteAdVersion) || adTaskProfile.getNodeId() != null)) {
if (adTaskProfile != null && (remoteAdVersion != null || adTaskProfile.getNodeId() != null)) {
out.writeBoolean(true);
adTaskProfile.writeTo(out, remoteAdVersion);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.model.EntityProfileName;
import org.opensearch.ad.util.Bwc;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
Expand All @@ -41,17 +40,8 @@ public class EntityProfileRequest extends ActionRequest implements ToXContentObj
public EntityProfileRequest(StreamInput in) throws IOException {
super(in);
adID = in.readString();
if (Bwc.supportMultiCategoryFields(in.getVersion())) {
entityValue = new Entity(in);
} else {
// entity profile involving an old node won't work. Read
// EntityProfileTransportAction.doExecute for details. Read
// a string to not cause EOF exception.
// Cannot assign null to entityValue as old node has no logic to
// deal with a null entity.
String oldFormatEntityString = in.readString();
entityValue = Entity.createSingleAttributeEntity(CommonName.EMPTY_FIELD, oldFormatEntityString);
}
entityValue = new Entity(in);

int size = in.readVInt();
profilesToCollect = new HashSet<EntityProfileName>();
if (size != 0) {
Expand Down Expand Up @@ -84,14 +74,8 @@ public Set<EntityProfileName> getProfilesToCollect() {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(adID);
if (Bwc.supportMultiCategoryFields(out.getVersion())) {
entityValue.writeTo(out);
} else {
// entity profile involving an old node won't work. Read
// EntityProfileTransportAction.doExecute for details. Write
// a string to not cause EOF exception.
out.writeString(entityValue.toString());
}
entityValue.writeTo(out);

out.writeVInt(profilesToCollect.size());
for (EntityProfileName profile : profilesToCollect) {
out.writeEnum(profile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
import org.apache.commons.lang.builder.ToStringBuilder;
import org.opensearch.action.ActionResponse;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.model.ModelProfile;
import org.opensearch.ad.model.ModelProfileOnNode;
import org.opensearch.ad.util.Bwc;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.ToXContentObject;
Expand Down Expand Up @@ -82,14 +80,7 @@ public EntityProfileResponse(StreamInput in) throws IOException {
lastActiveMs = in.readLong();
totalUpdates = in.readLong();
if (in.readBoolean()) {
if (Bwc.supportMultiCategoryFields(in.getVersion())) {
modelProfile = new ModelProfileOnNode(in);
} else {
// we don't have model information from old node
ModelProfile profile = new ModelProfile(in);
modelProfile = new ModelProfileOnNode("", profile);
}

modelProfile = new ModelProfileOnNode(in);
} else {
modelProfile = null;
}
Expand Down Expand Up @@ -118,12 +109,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(totalUpdates);
if (modelProfile != null) {
out.writeBoolean(true);
if (Bwc.supportMultiCategoryFields(out.getVersion())) {
modelProfile.writeTo(out);
} else {
ModelProfile oldFormatModelProfile = modelProfile.getModelProfile();
oldFormatModelProfile.writeTo(out);
}
modelProfile.writeTo(out);
} else {
out.writeBoolean(false);
}
Expand Down
38 changes: 2 additions & 36 deletions src/main/java/org/opensearch/ad/transport/EntityResultRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import static org.opensearch.action.ValidateActions.addValidationError;

import java.io.IOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

Expand All @@ -25,7 +24,6 @@
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.util.Bwc;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
Expand All @@ -46,21 +44,7 @@ public EntityResultRequest(StreamInput in) throws IOException {

// guarded with version check. Just in case we receive requests from older node where we use String
// to represent an entity
if (Bwc.supportMultiCategoryFields(in.getVersion())) {
this.entities = in.readMap(Entity::new, StreamInput::readDoubleArray);
} else {
// receive a request from a version before OpenSearch 1.1
// the old request uses Map<String, double[]> instead of Map<Entity, double[]> to represent entities
// since it only supports one categorical field.
Map<String, double[]> oldFormatEntities = in.readMap(StreamInput::readString, StreamInput::readDoubleArray);
entities = new HashMap<>();
for (Map.Entry<String, double[]> entry : oldFormatEntities.entrySet()) {
// we don't know the category field name as we don't have access to detector config object
// so we put empty string as the category field name for now. Will handle the case
// in EntityResultTransportAciton.
entities.put(Entity.createSingleAttributeEntity(CommonName.EMPTY_FIELD, entry.getKey()), entry.getValue());
}
}
this.entities = in.readMap(Entity::new, StreamInput::readDoubleArray);

this.start = in.readLong();
this.end = in.readLong();
Expand Down Expand Up @@ -96,25 +80,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(this.detectorId);
// guarded with version check. Just in case we send requests to older node where we use String
// to represent an entity
if (Bwc.supportMultiCategoryFields(out.getVersion())) {
out.writeMap(entities, (s, e) -> e.writeTo(s), StreamOutput::writeDoubleArray);
} else {
Map<String, double[]> oldFormatEntities = new HashMap<>();
for (Map.Entry<Entity, double[]> entry : entities.entrySet()) {
Map<String, String> attributes = entry.getKey().getAttributes();
if (attributes.size() != 1) {
// cannot send a multi-category field entity to old node since it will
// cause EOF exception and stop the detector. The issue
// is temporary and will be gone after upgrade completes.
// Since one EntityResultRequest is sent to one node, we can safely
// ignore the rest of the requests.
LOG.info("Skip sending multi-category entities to an incompatible node. Attributes: ", attributes);
break;
}
oldFormatEntities.put(entry.getKey().getAttributes().entrySet().iterator().next().getValue(), entry.getValue());
}
out.writeMap(oldFormatEntities, StreamOutput::writeString, StreamOutput::writeDoubleArray);
}
out.writeMap(entities, (s, e) -> e.writeTo(s), StreamOutput::writeDoubleArray);

out.writeLong(this.start);
out.writeLong(this.end);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.opensearch.Version;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.ad.cluster.ADVersionUtil;
import org.opensearch.ad.common.exception.ADVersionException;
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.model.ADTask;
Expand Down Expand Up @@ -64,11 +63,8 @@ public ForwardADTaskRequest(
Integer availableTaskSlots,
Version remoteAdVersion
) {
if (!ADVersionUtil.compatibleWithVersionOnOrAfter1_1(remoteAdVersion)) {
throw new ADVersionException(
detector.getDetectorId(),
"Can't forward AD task request to node running AD version " + remoteAdVersion
);
if (remoteAdVersion == null) {
throw new ADVersionException(detector.getDetectorId(), "Can't forward AD task request to node running null AD version ");
}
this.detector = detector;
this.detectionDateRange = detectionDateRange;
Expand Down
19 changes: 8 additions & 11 deletions src/main/java/org/opensearch/ad/transport/ProfileNodeResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.model.ModelProfile;
import org.opensearch.ad.util.Bwc;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -51,7 +50,7 @@ public ProfileNodeResponse(StreamInput in) throws IOException {
shingleSize = in.readInt();
activeEntities = in.readVLong();
totalUpdates = in.readVLong();
if (Bwc.supportMultiCategoryFields(in.getVersion()) && in.readBoolean()) {
if (in.readBoolean()) {
// added after OpenSearch 1.0
modelProfiles = in.readList(ModelProfile::new);
modelCount = in.readVLong();
Expand Down Expand Up @@ -111,15 +110,13 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeInt(shingleSize);
out.writeVLong(activeEntities);
out.writeVLong(totalUpdates);
if (Bwc.supportMultiCategoryFields(out.getVersion())) {
// added after OpenSearch 1.0
if (modelProfiles != null) {
out.writeBoolean(true);
out.writeList(modelProfiles);
out.writeVLong(modelCount);
} else {
out.writeBoolean(false);
}
// added after OpenSearch 1.0
if (modelProfiles != null) {
out.writeBoolean(true);
out.writeList(modelProfiles);
out.writeVLong(modelCount);
} else {
out.writeBoolean(false);
}
}

Expand Down
28 changes: 5 additions & 23 deletions src/main/java/org/opensearch/ad/transport/ProfileResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.model.ModelProfile;
import org.opensearch.ad.model.ModelProfileOnNode;
import org.opensearch.ad.util.Bwc;
import org.opensearch.cluster.ClusterName;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -65,23 +64,15 @@ public ProfileResponse(StreamInput in) throws IOException {
int size = in.readVInt();
modelProfile = new ModelProfileOnNode[size];
for (int i = 0; i < size; i++) {
if (Bwc.supportMultiCategoryFields(in.getVersion())) {
modelProfile[i] = new ModelProfileOnNode(in);
} else {
// we don't have model information from old node
ModelProfile profile = new ModelProfile(in);
modelProfile[i] = new ModelProfileOnNode(CommonName.EMPTY_FIELD, profile);
}
modelProfile[i] = new ModelProfileOnNode(in);
}

shingleSize = in.readInt();
coordinatingNode = in.readString();
totalSizeInBytes = in.readVLong();
activeEntities = in.readVLong();
totalUpdates = in.readVLong();
if (Bwc.supportMultiCategoryFields(in.getVersion())) {
modelCount = in.readVLong();
}
modelCount = in.readVLong();
}

/**
Expand Down Expand Up @@ -140,25 +131,16 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(modelProfile.length);

if (Bwc.supportMultiCategoryFields(out.getVersion())) {
for (ModelProfileOnNode profile : modelProfile) {
profile.writeTo(out);
}
} else {
for (ModelProfileOnNode profile : modelProfile) {
ModelProfile oldFormatModelProfile = profile.getModelProfile();
oldFormatModelProfile.writeTo(out);
}
for (ModelProfileOnNode profile : modelProfile) {
profile.writeTo(out);
}

out.writeInt(shingleSize);
out.writeString(coordinatingNode);
out.writeVLong(totalSizeInBytes);
out.writeVLong(activeEntities);
out.writeVLong(totalUpdates);
if (Bwc.supportMultiCategoryFields(out.getVersion())) {
out.writeVLong(modelCount);
}
out.writeVLong(modelCount);
}

@Override
Expand Down
Loading

0 comments on commit 463bae5

Please sign in to comment.