Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remote routing input stream read #13603

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* GitHub history for details.
*/

package org.opensearch.index.translog;
package org.opensearch.common.io.stream;

import org.apache.lucene.store.BufferedChecksum;
import org.apache.lucene.util.BitUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* GitHub history for details.
*/

package org.opensearch.index.translog;
package org.opensearch.common.io.stream;

import org.apache.lucene.store.BufferedChecksum;
import org.opensearch.common.annotation.PublicApi;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment {

public static final int CODEC_V0 = 0; // Older codec version, where we haven't introduced codec versions for manifest.
public static final int CODEC_V1 = 1; // In Codec V1 we have introduced global-metadata and codec version in Manifest file.
public static final int CODEC_V2 = 2; // In Codec V2 we introduce index routing-metadata in manifest file.

private static final ParseField CLUSTER_TERM_FIELD = new ParseField("cluster_term");
private static final ParseField STATE_VERSION_FIELD = new ParseField("state_version");
Expand All @@ -48,6 +49,7 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment {
private static final ParseField INDICES_FIELD = new ParseField("indices");
private static final ParseField PREVIOUS_CLUSTER_UUID = new ParseField("previous_cluster_uuid");
private static final ParseField CLUSTER_UUID_COMMITTED = new ParseField("cluster_uuid_committed");
private static final ParseField INDICES_ROUTING_FIELD = new ParseField("indices_routing");

private static long term(Object[] fields) {
return (long) fields[0];
Expand Down Expand Up @@ -97,6 +99,10 @@ private static String globalMetadataFileName(Object[] fields) {
return (String) fields[11];
}

private static List<UploadedIndexMetadata> indicesRouting(Object[] fields) {
return (List<UploadedIndexMetadata>) fields[12];
}

private static final ConstructingObjectParser<ClusterMetadataManifest, Void> PARSER_V0 = new ConstructingObjectParser<>(
"cluster_metadata_manifest",
fields -> new ClusterMetadataManifest(
Expand Down Expand Up @@ -133,11 +139,31 @@ private static String globalMetadataFileName(Object[] fields) {
)
);

private static final ConstructingObjectParser<ClusterMetadataManifest, Void> CURRENT_PARSER = PARSER_V1;
private static final ConstructingObjectParser<ClusterMetadataManifest, Void> PARSER_V2 = new ConstructingObjectParser<>(
"cluster_metadata_manifest",
fields -> new ClusterMetadataManifest(
term(fields),
version(fields),
clusterUUID(fields),
stateUUID(fields),
opensearchVersion(fields),
nodeId(fields),
committed(fields),
codecVersion(fields),
globalMetadataFileName(fields),
indices(fields),
previousClusterUUID(fields),
clusterUUIDCommitted(fields),
indicesRouting(fields)
)
);

private static final ConstructingObjectParser<ClusterMetadataManifest, Void> CURRENT_PARSER = PARSER_V2;

static {
declareParser(PARSER_V0, CODEC_V0);
declareParser(PARSER_V1, CODEC_V1);
declareParser(PARSER_V2, CODEC_V2);
}

private static void declareParser(ConstructingObjectParser<ClusterMetadataManifest, Void> parser, long codec_version) {
Expand All @@ -160,6 +186,13 @@ private static void declareParser(ConstructingObjectParser<ClusterMetadataManife
parser.declareInt(ConstructingObjectParser.constructorArg(), CODEC_VERSION_FIELD);
parser.declareString(ConstructingObjectParser.constructorArg(), GLOBAL_METADATA_FIELD);
}
if (codec_version >= CODEC_V2) {
parser.declareObjectArray(
ConstructingObjectParser.constructorArg(),
(p, c) -> UploadedIndexMetadata.fromXContent(p),
INDICES_ROUTING_FIELD
);
}
}

private final int codecVersion;
Expand All @@ -174,6 +207,7 @@ private static void declareParser(ConstructingObjectParser<ClusterMetadataManife
private final boolean committed;
private final String previousClusterUUID;
private final boolean clusterUUIDCommitted;
private final List<UploadedIndexMetadata> indicesRouting;

public List<UploadedIndexMetadata> getIndices() {
return indices;
Expand Down Expand Up @@ -223,6 +257,10 @@ public String getGlobalMetadataFileName() {
return globalMetadataFileName;
}

public List<UploadedIndexMetadata> getIndicesRouting() {
return indicesRouting;
}

public ClusterMetadataManifest(
long clusterTerm,
long version,
Expand All @@ -237,6 +275,25 @@ public ClusterMetadataManifest(
String previousClusterUUID,
boolean clusterUUIDCommitted
) {
this(clusterTerm, version, clusterUUID, stateUUID, opensearchVersion, nodeId, committed, codecVersion,
globalMetadataFileName, indices, previousClusterUUID, clusterUUIDCommitted, new ArrayList<>());
}

public ClusterMetadataManifest(
long clusterTerm,
long version,
String clusterUUID,
String stateUUID,
Version opensearchVersion,
String nodeId,
boolean committed,
int codecVersion,
String globalMetadataFileName,
List<UploadedIndexMetadata> indices,
String previousClusterUUID,
boolean clusterUUIDCommitted,
List<UploadedIndexMetadata> indicesRouting
) {
this.clusterTerm = clusterTerm;
this.stateVersion = version;
this.clusterUUID = clusterUUID;
Expand All @@ -249,6 +306,7 @@ public ClusterMetadataManifest(
this.indices = Collections.unmodifiableList(indices);
this.previousClusterUUID = previousClusterUUID;
this.clusterUUIDCommitted = clusterUUIDCommitted;
this.indicesRouting = Collections.unmodifiableList(indicesRouting);
}

public ClusterMetadataManifest(StreamInput in) throws IOException {
Expand All @@ -262,12 +320,18 @@ public ClusterMetadataManifest(StreamInput in) throws IOException {
this.indices = Collections.unmodifiableList(in.readList(UploadedIndexMetadata::new));
this.previousClusterUUID = in.readString();
this.clusterUUIDCommitted = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_2_12_0)) {
if (in.getVersion().onOrAfter(Version.V_2_14_0)) {
this.codecVersion = in.readInt();
this.globalMetadataFileName = in.readString();
this.indicesRouting = Collections.unmodifiableList(in.readList(UploadedIndexMetadata::new));
} else if (in.getVersion().onOrAfter(Version.V_2_12_0)) {
this.codecVersion = in.readInt();
this.globalMetadataFileName = in.readString();
this.indicesRouting = null;
} else {
this.codecVersion = CODEC_V0; // Default codec
this.globalMetadataFileName = null;
this.indicesRouting = null;
}
}

Expand All @@ -291,7 +355,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startArray(INDICES_FIELD.getPreferredName());
{
for (UploadedIndexMetadata uploadedIndexMetadata : indices) {
builder.startObject();
uploadedIndexMetadata.toXContent(builder, params);
builder.endObject();
}
}
builder.endArray();
Expand All @@ -301,6 +367,17 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(CODEC_VERSION_FIELD.getPreferredName(), getCodecVersion());
builder.field(GLOBAL_METADATA_FIELD.getPreferredName(), getGlobalMetadataFileName());
}
if (onOrAfterCodecVersion(CODEC_V2)) {
builder.startArray(INDICES_ROUTING_FIELD.getPreferredName());
{
for (UploadedIndexMetadata uploadedIndexMetadata : indicesRouting) {
builder.startObject();
uploadedIndexMetadata.toXContent(builder, params);
builder.endObject();
}
}
builder.endArray();
}
return builder;
}

Expand All @@ -320,6 +397,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeInt(codecVersion);
out.writeString(globalMetadataFileName);
}
if (out.getVersion().onOrAfter(Version.V_2_14_0)) {
out.writeCollection(indicesRouting);
}
}

@Override
Expand All @@ -342,7 +422,8 @@ public boolean equals(Object o) {
&& Objects.equals(previousClusterUUID, that.previousClusterUUID)
&& Objects.equals(clusterUUIDCommitted, that.clusterUUIDCommitted)
&& Objects.equals(globalMetadataFileName, that.globalMetadataFileName)
&& Objects.equals(codecVersion, that.codecVersion);
&& Objects.equals(codecVersion, that.codecVersion)
&& Objects.equals(indicesRouting, that.indicesRouting);
}

@Override
Expand All @@ -359,7 +440,8 @@ public int hashCode() {
nodeId,
committed,
previousClusterUUID,
clusterUUIDCommitted
clusterUUIDCommitted,
indicesRouting
);
}

Expand Down Expand Up @@ -399,12 +481,18 @@ public static class Builder {
private String previousClusterUUID;
private boolean committed;
private boolean clusterUUIDCommitted;
private List<UploadedIndexMetadata> indicesRouting;

public Builder indices(List<UploadedIndexMetadata> indices) {
this.indices = indices;
return this;
}

public Builder indicesRouting(List<UploadedIndexMetadata> indicesRouting) {
this.indicesRouting = indicesRouting;
return this;
}

public Builder codecVersion(int codecVersion) {
this.codecVersion = codecVersion;
return this;
Expand Down Expand Up @@ -454,6 +542,10 @@ public List<UploadedIndexMetadata> getIndices() {
return indices;
}

public List<UploadedIndexMetadata> getIndicesRouting() {
return indicesRouting;
}

public Builder previousClusterUUID(String previousClusterUUID) {
this.previousClusterUUID = previousClusterUUID;
return this;
Expand Down Expand Up @@ -481,6 +573,7 @@ public Builder(ClusterMetadataManifest manifest) {
this.indices = new ArrayList<>(manifest.indices);
this.previousClusterUUID = manifest.previousClusterUUID;
this.clusterUUIDCommitted = manifest.clusterUUIDCommitted;
this.indicesRouting = new ArrayList<>(manifest.indicesRouting);
}

public ClusterMetadataManifest build() {
Expand All @@ -496,7 +589,8 @@ public ClusterMetadataManifest build() {
globalMetadataFileName,
indices,
previousClusterUUID,
clusterUUIDCommitted
clusterUUIDCommitted,
indicesRouting
);
}

Expand Down Expand Up @@ -571,11 +665,10 @@ public String getIndexUUID() {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject()
return builder
.field(INDEX_NAME_FIELD.getPreferredName(), getIndexName())
.field(INDEX_UUID_FIELD.getPreferredName(), getIndexUUID())
.field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath())
.endObject();
.field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote.routingtable;

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.InputStreamDataInput;
import org.apache.lucene.store.OutputStreamDataOutput;
import org.opensearch.Version;
import org.opensearch.common.io.stream.BufferedChecksumStreamInput;
import org.opensearch.common.io.stream.BufferedChecksumStreamOutput;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.BytesStreamInput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.EOFException;
import java.io.IOException;

/**
* The stored header information for the individual index routing table
*/
public class IndexRoutingTableHeader {

private final long routingTableVersion;

private final String indexName;

private final Version nodeVersion;

public static final String INDEX_ROUTING_HEADER_CODEC = "index_routing_header_codec";

public static final int INITIAL_VERSION = 1;

public static final int CURRENT_VERSION = INITIAL_VERSION;

public IndexRoutingTableHeader(long routingTableVersion, String indexName, Version nodeVersion) {
this.routingTableVersion = routingTableVersion;
this.indexName = indexName;
this.nodeVersion = nodeVersion;
}

/**
* Returns the bytes reference for the {@link IndexRoutingTableHeader}
* @throws IOException
*/
public void write(StreamOutput out) throws IOException {
CodecUtil.writeHeader(new OutputStreamDataOutput(out), INDEX_ROUTING_HEADER_CODEC, CURRENT_VERSION);
// Write version
out.writeLong(routingTableVersion);
out.writeInt(nodeVersion.id);
out.writeString(indexName);

out.flush();
}

/**
* Reads the contents on the byte array into the corresponding {@link IndexRoutingTableHeader}
* @param in
* @return IndexRoutingTableHeader
* @throws IOException
*/
public static IndexRoutingTableHeader read(BufferedChecksumStreamInput in) throws IOException {
try {
readHeaderVersion(in);
final long version = in.readLong();
final int nodeVersion = in.readInt();
final String name = in.readString();
assert version >= 0 : "Version must be non-negative [" + version + "]";
return new IndexRoutingTableHeader(version, name, Version.fromId(nodeVersion));
} catch (EOFException e) {
throw new IOException("index routing header truncated", e);
}
}

static int readHeaderVersion(final StreamInput in) throws IOException {
final int version;
try {
version = CodecUtil.checkHeader(new InputStreamDataInput(in), INDEX_ROUTING_HEADER_CODEC, INITIAL_VERSION, CURRENT_VERSION);
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) {
throw new IOException("index routing table header corrupted", e);
}
return version;
}

public long getRoutingTableVersion() {
return routingTableVersion;
}

public String getIndexName() {
return indexName;
}

public Version getNodeVersion() {
return nodeVersion;
}
}
Loading
Loading