Skip to content

Commit

Permalink
Serialize using codec version
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <[email protected]>
  • Loading branch information
soosinha committed Oct 21, 2024
1 parent ad7f9e7 commit 2349d48
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix inefficient Stream API call chains ending with count() ([#15386](https://github.com/opensearch-project/OpenSearch/pull/15386))
- Fix array hashCode calculation in ResyncReplicationRequest ([#16378](https://github.com/opensearch-project/OpenSearch/pull/16378))
- Fix typo super->sb in method toString() of RemoteStoreNodeAttribute ([#15362](https://github.com/opensearch-project/OpenSearch/pull/15362))
- Fix version upgrade for remote state enabled cluster ([#16403](https://github.com/opensearch-project/OpenSearch/pull/16403))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1216,12 +1216,18 @@ private static String componentPrefix(Object[] fields) {

private static final ConstructingObjectParser<UploadedIndexMetadata, Void> PARSER_V0 = new ConstructingObjectParser<>(
"uploaded_index_metadata",
fields -> new UploadedIndexMetadata(indexName(fields), indexUUID(fields), uploadedFilename(fields))
fields -> new UploadedIndexMetadata(indexName(fields), indexUUID(fields), uploadedFilename(fields), CODEC_V0)
);

private static final ConstructingObjectParser<UploadedIndexMetadata, Void> PARSER_V2 = new ConstructingObjectParser<>(
"uploaded_index_metadata",
fields -> new UploadedIndexMetadata(indexName(fields), indexUUID(fields), uploadedFilename(fields), componentPrefix(fields))
fields -> new UploadedIndexMetadata(
indexName(fields),
indexUUID(fields),
uploadedFilename(fields),
componentPrefix(fields),
CODEC_V2
)
);

private static final ConstructingObjectParser<UploadedIndexMetadata, Void> CURRENT_PARSER = PARSER_V2;
Expand Down Expand Up @@ -1306,6 +1312,10 @@ public String getComponentPrefix() {
return componentPrefix;
}

protected void setCodecVersion(long codecVersion) {
this.codecVersion = codecVersion;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(INDEX_NAME_FIELD.getPreferredName(), getIndexName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ RemoteClusterStateManifestInfo uploadManifest(
) {
synchronized (this) {
ClusterMetadataManifest.Builder manifestBuilder = ClusterMetadataManifest.builder();
uploadedMetadataResult.uploadedIndexMetadata.forEach(md -> md.setCodecVersion(codecVersion));
manifestBuilder.clusterTerm(clusterState.term())
.stateVersion(clusterState.getVersion())
.clusterUUID(clusterState.metadata().clusterUUID())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ public UploadedMetadata getUploadedMetadata() {

@Override
public InputStream serialize() throws IOException {
return CLUSTER_METADATA_MANIFEST_FORMAT.serialize(
ChecksumBlobStoreFormat<ClusterMetadataManifest> blobStoreFormat = getClusterMetadataManifestBlobStoreFormatForUpload();
return blobStoreFormat.serialize(
clusterMetadataManifest,
generateBlobFileName(),
getCompressor(),
Expand All @@ -133,7 +134,7 @@ public InputStream serialize() throws IOException {

@Override
public ClusterMetadataManifest deserialize(final InputStream inputStream) throws IOException {
ChecksumBlobStoreFormat<ClusterMetadataManifest> blobStoreFormat = getClusterMetadataManifestBlobStoreFormat();
ChecksumBlobStoreFormat<ClusterMetadataManifest> blobStoreFormat = getClusterMetadataManifestBlobStoreFormatForDownload();
return blobStoreFormat.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream));
}

Expand All @@ -151,8 +152,17 @@ int getManifestCodecVersion() {
}
}

private ChecksumBlobStoreFormat<ClusterMetadataManifest> getClusterMetadataManifestBlobStoreFormat() {
private ChecksumBlobStoreFormat<ClusterMetadataManifest> getClusterMetadataManifestBlobStoreFormatForDownload() {
long codecVersion = getManifestCodecVersion();
return getClusterMetadataManifestBlobStoreFormat(codecVersion);
}

private ChecksumBlobStoreFormat<ClusterMetadataManifest> getClusterMetadataManifestBlobStoreFormatForUpload() {
long codecVersion = clusterMetadataManifest.getCodecVersion();
return getClusterMetadataManifestBlobStoreFormat(codecVersion);
}

private ChecksumBlobStoreFormat<ClusterMetadataManifest> getClusterMetadataManifestBlobStoreFormat(long codecVersion) {
if (codecVersion == ClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION) {
return CLUSTER_METADATA_MANIFEST_FORMAT;
} else if (codecVersion == ClusterMetadataManifest.CODEC_V3) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.core.compress.NoneCompressor;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute;
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
Expand All @@ -37,11 +38,13 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toList;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V0;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION;
import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST;
Expand Down Expand Up @@ -214,18 +217,20 @@ public void testGetUploadedMetadata() throws IOException {
}

public void testSerDe() throws IOException {
ClusterMetadataManifest manifest = getClusterMetadataManifest();
RemoteClusterMetadataManifest remoteObjectForUpload = new RemoteClusterMetadataManifest(
manifest,
clusterUUID,
compressor,
namedXContentRegistry
);
try (InputStream inputStream = remoteObjectForUpload.serialize()) {
remoteObjectForUpload.setFullBlobName(BlobPath.cleanPath());
assertThat(inputStream.available(), greaterThan(0));
ClusterMetadataManifest readManifest = remoteObjectForUpload.deserialize(inputStream);
assertThat(readManifest, is(manifest));
for (int codecVersion : ClusterMetadataManifest.CODEC_VERSIONS) {
ClusterMetadataManifest manifest = getClusterMetadataManifestForCodecVersion(codecVersion);
RemoteClusterMetadataManifest remoteObjectForUpload = new RemoteClusterMetadataManifest(
manifest,
clusterUUID,
compressor,
namedXContentRegistry
);
try (InputStream inputStream = remoteObjectForUpload.serialize()) {
remoteObjectForUpload.setFullBlobName(BlobPath.cleanPath());
assertThat(inputStream.available(), greaterThan(0));
ClusterMetadataManifest readManifest = remoteObjectForUpload.deserialize(inputStream);
validateManifest(manifest, readManifest);
}
}

String blobName = "/usr/local/manifest__1__2__3__4__5__6";
Expand Down Expand Up @@ -261,9 +266,13 @@ public void testGetManifestCodecVersion() {
}

private ClusterMetadataManifest getClusterMetadataManifest() {
return ClusterMetadataManifest.builder()
.opensearchVersion(Version.CURRENT)
.codecVersion(MANIFEST_CURRENT_CODEC_VERSION)
return getClusterMetadataManifestForCodecVersion(MANIFEST_CURRENT_CODEC_VERSION);
}

private ClusterMetadataManifest getClusterMetadataManifestForCodecVersion(int codecVersion) {
ClusterMetadataManifest.Builder builder = ClusterMetadataManifest.builder();
builder.opensearchVersion(Version.CURRENT)
.codecVersion(codecVersion)
.nodeId("test-node")
.clusterUUID("test-uuid")
.previousClusterUUID("_NA_")
Expand All @@ -272,6 +281,64 @@ private ClusterMetadataManifest getClusterMetadataManifest() {
.stateVersion(VERSION)
.committed(true)
.coordinationMetadata(new UploadedMetadataAttribute("test-attr", "uploaded-file"))
.build();
.indices(List.of(new UploadedIndexMetadata("test-index", "tst-idx", "uploaded-index-file", codecVersion)))
.clusterUUIDCommitted(true);

if (codecVersion == CODEC_V1) {
builder.globalMetadataFileName("global-metadata-file-name");
}
if (codecVersion >= CODEC_V2) {
builder.coordinationMetadata(new UploadedMetadataAttribute("uploaded_coordination", "coordination-metadata-file"));
builder.settingMetadata(new UploadedMetadataAttribute("uploaded_settings", "settings-metadata-file"));
builder.templatesMetadata(new UploadedMetadataAttribute("uploaded_templates", "templates-metadata-file"));
builder.customMetadataMap(Map.of("uploaded_custom", new UploadedMetadataAttribute("uploaded_custom", "custom-metadata-file")));
builder.routingTableVersion(1L);
builder.indicesRouting(List.of(new UploadedIndexMetadata("test-index", "tst-idx", "uploaded_routing", "routing--")));
builder.discoveryNodesMetadata(new UploadedMetadataAttribute("uploaded_discovery", "discovery-metadata-file"));
builder.clusterBlocksMetadata(new UploadedMetadataAttribute("uploaded_blocks", "blocks-metadata-file"));
builder.metadataVersion(1L);
builder.transientSettingsMetadata(
new UploadedMetadataAttribute("uploaded_transient_settings", "transient-settings-metadata-file")
);
builder.hashesOfConsistentSettings(new UploadedMetadataAttribute("uploaded_hashes_settings", "hashes-settings-metadata-file"));
builder.clusterStateCustomMetadataMap(
Map.of("uploaded_custom", new UploadedMetadataAttribute("uploaded_custom", "custom-metadata-file"))
);
}

return builder.build();
}

private void validateManifest(ClusterMetadataManifest writeManifest, ClusterMetadataManifest readManifest) {
assertThat(readManifest.getOpensearchVersion(), is(writeManifest.getOpensearchVersion()));
assertThat(readManifest.getCodecVersion(), is(writeManifest.getCodecVersion()));
assertThat(readManifest.getNodeId(), is(writeManifest.getNodeId()));
assertThat(readManifest.getClusterUUID(), is(writeManifest.getClusterUUID()));
assertThat(readManifest.getPreviousClusterUUID(), is(writeManifest.getPreviousClusterUUID()));
assertThat(readManifest.getStateUUID(), is(writeManifest.getStateUUID()));
assertThat(readManifest.getClusterTerm(), is(writeManifest.getClusterTerm()));
assertThat(readManifest.getStateVersion(), is(writeManifest.getStateVersion()));
assertThat(readManifest.isCommitted(), is(writeManifest.isCommitted()));
assertThat(readManifest.getPreviousClusterUUID(), is(writeManifest.getPreviousClusterUUID()));
assertThat(readManifest.isClusterUUIDCommitted(), is(writeManifest.isClusterUUIDCommitted()));
assertThat(readManifest.getIndices(), is(writeManifest.getIndices()));
if (writeManifest.getCodecVersion() == CODEC_V1) {
assertThat(readManifest.getGlobalMetadataFileName(), is(writeManifest.getGlobalMetadataFileName()));
}
if (writeManifest.getCodecVersion() >= CODEC_V2) {
assertThat(readManifest.getCoordinationMetadata(), is(writeManifest.getCoordinationMetadata()));
assertThat(readManifest.getSettingsMetadata(), is(writeManifest.getSettingsMetadata()));
assertThat(readManifest.getTemplatesMetadata(), is(writeManifest.getTemplatesMetadata()));
assertThat(readManifest.getCustomMetadataMap(), is(writeManifest.getCustomMetadataMap()));
assertThat(readManifest.getRoutingTableVersion(), is(writeManifest.getRoutingTableVersion()));
assertThat(readManifest.getIndicesRouting(), is(writeManifest.getIndicesRouting()));
assertThat(readManifest.getDiscoveryNodesMetadata(), is(writeManifest.getDiscoveryNodesMetadata()));
assertThat(readManifest.getClusterBlocksMetadata(), is(writeManifest.getClusterBlocksMetadata()));
assertThat(readManifest.getMetadataVersion(), is(writeManifest.getMetadataVersion()));
assertThat(readManifest.getDiffManifest(), is(writeManifest.getDiffManifest()));
assertThat(readManifest.getTransientSettingsMetadata(), is(writeManifest.getTransientSettingsMetadata()));
assertThat(readManifest.getHashesOfConsistentSettings(), is(writeManifest.getHashesOfConsistentSettings()));
assertThat(readManifest.getClusterStateCustomMap(), is(writeManifest.getClusterStateCustomMap()));
}
}
}

0 comments on commit 2349d48

Please sign in to comment.