forked from opensearch-project/OpenSearch
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implementation of RemoteWritableEntity for objects to uploaded to rem…
…ote store (opensearch-project#13834) * Implementation of RemoteWritableEntity for objects to uploaded to remote store Signed-off-by: Sooraj Sinha <[email protected]>
- Loading branch information
Showing
17 changed files
with
2,408 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
153 changes: 153 additions & 0 deletions
153
server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.gateway.remote.model; | ||
|
||
import org.opensearch.common.io.Streams; | ||
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; | ||
import org.opensearch.common.remote.BlobPathParameters; | ||
import org.opensearch.core.compress.Compressor; | ||
import org.opensearch.core.xcontent.NamedXContentRegistry; | ||
import org.opensearch.gateway.remote.ClusterMetadataManifest; | ||
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; | ||
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; | ||
import org.opensearch.gateway.remote.RemoteClusterStateUtils; | ||
import org.opensearch.index.remote.RemoteStoreUtils; | ||
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.util.List; | ||
|
||
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; | ||
|
||
/** | ||
* Wrapper class for uploading/downloading {@link ClusterMetadataManifest} to/from remote blob store | ||
*/ | ||
public class RemoteClusterMetadataManifest extends AbstractRemoteWritableBlobEntity<ClusterMetadataManifest> { | ||
|
||
public static final String MANIFEST = "manifest"; | ||
public static final int SPLITTED_MANIFEST_FILE_LENGTH = 6; | ||
|
||
public static final String METADATA_MANIFEST_NAME_FORMAT = "%s"; | ||
public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V3; | ||
public static final String COMMITTED = "C"; | ||
public static final String PUBLISHED = "P"; | ||
|
||
/** | ||
* Manifest format compatible with older codec v0, where codec version was missing. | ||
*/ | ||
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT_V0 = | ||
new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV0); | ||
/** | ||
* Manifest format compatible with older codec v1, where global metadata was missing. | ||
*/ | ||
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT_V1 = | ||
new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV1); | ||
|
||
/** | ||
* Manifest format compatible with codec v2, where we introduced codec versions/global metadata. | ||
*/ | ||
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT = new ChecksumBlobStoreFormat<>( | ||
"cluster-metadata-manifest", | ||
METADATA_MANIFEST_NAME_FORMAT, | ||
ClusterMetadataManifest::fromXContent | ||
); | ||
|
||
private ClusterMetadataManifest clusterMetadataManifest; | ||
|
||
public RemoteClusterMetadataManifest( | ||
final ClusterMetadataManifest clusterMetadataManifest, | ||
final String clusterUUID, | ||
final Compressor compressor, | ||
final NamedXContentRegistry namedXContentRegistry | ||
) { | ||
super(clusterUUID, compressor, namedXContentRegistry); | ||
this.clusterMetadataManifest = clusterMetadataManifest; | ||
} | ||
|
||
public RemoteClusterMetadataManifest( | ||
final String blobName, | ||
final String clusterUUID, | ||
final Compressor compressor, | ||
final NamedXContentRegistry namedXContentRegistry | ||
) { | ||
super(clusterUUID, compressor, namedXContentRegistry); | ||
this.blobName = blobName; | ||
} | ||
|
||
@Override | ||
public BlobPathParameters getBlobPathParameters() { | ||
return new BlobPathParameters(List.of(MANIFEST), MANIFEST); | ||
} | ||
|
||
@Override | ||
public String generateBlobFileName() { | ||
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest__<inverted_term>__<inverted_version>__C/P__<inverted__timestamp>__ | ||
// <codec_version> | ||
String blobFileName = String.join( | ||
DELIMITER, | ||
MANIFEST, | ||
RemoteStoreUtils.invertLong(clusterMetadataManifest.getClusterTerm()), | ||
RemoteStoreUtils.invertLong(clusterMetadataManifest.getStateVersion()), | ||
(clusterMetadataManifest.isCommitted() ? COMMITTED : PUBLISHED), | ||
RemoteStoreUtils.invertLong(System.currentTimeMillis()), | ||
String.valueOf(clusterMetadataManifest.getCodecVersion()) | ||
// Keep the codec version at last place only, during we read last place to determine codec version. | ||
); | ||
this.blobFileName = blobFileName; | ||
return blobFileName; | ||
} | ||
|
||
@Override | ||
public UploadedMetadata getUploadedMetadata() { | ||
assert blobName != null; | ||
return new UploadedMetadataAttribute(MANIFEST, blobName); | ||
} | ||
|
||
@Override | ||
public InputStream serialize() throws IOException { | ||
return CLUSTER_METADATA_MANIFEST_FORMAT.serialize( | ||
clusterMetadataManifest, | ||
generateBlobFileName(), | ||
getCompressor(), | ||
RemoteClusterStateUtils.FORMAT_PARAMS | ||
).streamInput(); | ||
} | ||
|
||
@Override | ||
public ClusterMetadataManifest deserialize(final InputStream inputStream) throws IOException { | ||
ChecksumBlobStoreFormat<ClusterMetadataManifest> blobStoreFormat = getClusterMetadataManifestBlobStoreFormat(); | ||
return blobStoreFormat.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream)); | ||
} | ||
|
||
private int getManifestCodecVersion() { | ||
assert blobName != null; | ||
String[] splitName = blobName.split(DELIMITER); | ||
if (splitName.length == SPLITTED_MANIFEST_FILE_LENGTH) { | ||
return Integer.parseInt(splitName[splitName.length - 1]); // Last value would be codec version. | ||
} else if (splitName.length < SPLITTED_MANIFEST_FILE_LENGTH) { // Where codec is not part of file name, i.e. default codec version 0 | ||
// is used. | ||
return ClusterMetadataManifest.CODEC_V0; | ||
} else { | ||
throw new IllegalArgumentException("Manifest file name is corrupted"); | ||
} | ||
} | ||
|
||
private ChecksumBlobStoreFormat<ClusterMetadataManifest> getClusterMetadataManifestBlobStoreFormat() { | ||
long codecVersion = getManifestCodecVersion(); | ||
if (codecVersion == MANIFEST_CURRENT_CODEC_VERSION) { | ||
return CLUSTER_METADATA_MANIFEST_FORMAT; | ||
} else if (codecVersion == ClusterMetadataManifest.CODEC_V1) { | ||
return CLUSTER_METADATA_MANIFEST_FORMAT_V1; | ||
} else if (codecVersion == ClusterMetadataManifest.CODEC_V0) { | ||
return CLUSTER_METADATA_MANIFEST_FORMAT_V0; | ||
} | ||
throw new IllegalArgumentException("Cluster metadata manifest file is corrupted, don't have valid codec version"); | ||
} | ||
} |
107 changes: 107 additions & 0 deletions
107
server/src/main/java/org/opensearch/gateway/remote/model/RemoteCoordinationMetadata.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.gateway.remote.model; | ||
|
||
import org.opensearch.cluster.coordination.CoordinationMetadata; | ||
import org.opensearch.common.io.Streams; | ||
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; | ||
import org.opensearch.common.remote.BlobPathParameters; | ||
import org.opensearch.core.compress.Compressor; | ||
import org.opensearch.core.xcontent.NamedXContentRegistry; | ||
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; | ||
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; | ||
import org.opensearch.gateway.remote.RemoteClusterStateUtils; | ||
import org.opensearch.index.remote.RemoteStoreUtils; | ||
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.util.List; | ||
|
||
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; | ||
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_CURRENT_CODEC_VERSION; | ||
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_PLAIN_FORMAT; | ||
|
||
/** | ||
* Wrapper class for uploading/downloading {@link CoordinationMetadata} to/from remote blob store | ||
*/ | ||
public class RemoteCoordinationMetadata extends AbstractRemoteWritableBlobEntity<CoordinationMetadata> { | ||
|
||
public static final String COORDINATION_METADATA = "coordination"; | ||
public static final ChecksumBlobStoreFormat<CoordinationMetadata> COORDINATION_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( | ||
"coordination", | ||
METADATA_NAME_PLAIN_FORMAT, | ||
CoordinationMetadata::fromXContent | ||
); | ||
|
||
private CoordinationMetadata coordinationMetadata; | ||
private long metadataVersion; | ||
|
||
public RemoteCoordinationMetadata( | ||
final CoordinationMetadata coordinationMetadata, | ||
final long metadataVersion, | ||
final String clusterUUID, | ||
final Compressor compressor, | ||
final NamedXContentRegistry namedXContentRegistry | ||
) { | ||
super(clusterUUID, compressor, namedXContentRegistry); | ||
this.coordinationMetadata = coordinationMetadata; | ||
this.metadataVersion = metadataVersion; | ||
} | ||
|
||
public RemoteCoordinationMetadata( | ||
final String blobName, | ||
final String clusterUUID, | ||
final Compressor compressor, | ||
final NamedXContentRegistry namedXContentRegistry | ||
) { | ||
super(clusterUUID, compressor, namedXContentRegistry); | ||
this.blobName = blobName; | ||
} | ||
|
||
@Override | ||
public BlobPathParameters getBlobPathParameters() { | ||
return new BlobPathParameters(List.of("global-metadata"), COORDINATION_METADATA); | ||
} | ||
|
||
@Override | ||
public String generateBlobFileName() { | ||
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/global-metadata/<componentPrefix>__<inverted_metadata_version>__<inverted__timestamp>__<codec_version> | ||
String blobFileName = String.join( | ||
DELIMITER, | ||
getBlobPathParameters().getFilePrefix(), | ||
RemoteStoreUtils.invertLong(metadataVersion), | ||
RemoteStoreUtils.invertLong(System.currentTimeMillis()), | ||
String.valueOf(GLOBAL_METADATA_CURRENT_CODEC_VERSION) | ||
); | ||
this.blobFileName = blobFileName; | ||
return blobFileName; | ||
} | ||
|
||
@Override | ||
public InputStream serialize() throws IOException { | ||
return COORDINATION_METADATA_FORMAT.serialize( | ||
coordinationMetadata, | ||
generateBlobFileName(), | ||
getCompressor(), | ||
RemoteClusterStateUtils.FORMAT_PARAMS | ||
).streamInput(); | ||
} | ||
|
||
@Override | ||
public CoordinationMetadata deserialize(final InputStream inputStream) throws IOException { | ||
return COORDINATION_METADATA_FORMAT.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream)); | ||
} | ||
|
||
@Override | ||
public UploadedMetadata getUploadedMetadata() { | ||
assert blobName != null; | ||
return new UploadedMetadataAttribute(COORDINATION_METADATA, blobName); | ||
} | ||
} |
Oops, something went wrong.