Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rs impl #14

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.common.remote;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.core.action.ActionListener;

import java.io.IOException;
Expand All @@ -18,6 +19,7 @@
* @param <T> The object type which can be uploaded to or downloaded from remote storage.
* @param <U> The wrapper entity which provides methods for serializing/deserializing entity T.
*/
@ExperimentalApi
public interface RemoteWritableEntityStore<T, U extends RemoteWriteableEntity<T>> {

public void writeAsync(U entity, ActionListener<Void> listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.common.remote;

import org.opensearch.common.annotation.ExperimentalApi;

import java.io.IOException;
import java.io.InputStream;

Expand All @@ -17,6 +19,7 @@
*
* @param <T> The object type which can be uploaded to or downloaded from remote storage.
*/
@ExperimentalApi
public interface RemoteWriteableEntity<T> {
/**
* @return An InputStream created by serializing the entity T
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,30 @@

package org.opensearch.gateway.remote;

import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.core.xcontent.ToXContent;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;

/**
* Utility class for Remote Cluster State
*/
public class RemoteClusterStateUtils {

public static final String DELIMITER = "__";
public static final String PATH_DELIMITER = "/";
public static final String GLOBAL_METADATA_PATH_TOKEN = "global-metadata";
public static final String METADATA_NAME_FORMAT = "%s.dat";
public static final String METADATA_NAME_PLAIN_FORMAT = "%s";
public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 1;

// ToXContent Params with gateway mode.
// We are using gateway context mode to persist all custom metadata.
public static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(
Map.of(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_GATEWAY)
);

public static String encodeString(String content) {
return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote.model;

import org.opensearch.common.io.Streams;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.BlobPathParameters;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute;
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;

import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;

/**
* Wrapper class for uploading/downloading {@link ClusterMetadataManifest} to/from remote blob store
*/
public class RemoteClusterMetadataManifest extends AbstractRemoteWritableBlobEntity<ClusterMetadataManifest> {

public static final String MANIFEST = "manifest";
public static final int SPLITTED_MANIFEST_FILE_LENGTH = 6;

public static final String METADATA_MANIFEST_NAME_FORMAT = "%s";
public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V3;
public static final String COMMITTED = "C";
public static final String PUBLISHED = "P";

/**
* Manifest format compatible with older codec v0, where codec version was missing.
*/
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT_V0 =
new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV0);
/**
* Manifest format compatible with older codec v1, where global metadata was missing.
*/
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT_V1 =
new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV1);

/**
* Manifest format compatible with codec v2, where we introduced codec versions/global metadata.
*/
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT = new ChecksumBlobStoreFormat<>(
"cluster-metadata-manifest",
METADATA_MANIFEST_NAME_FORMAT,
ClusterMetadataManifest::fromXContent
);

private ClusterMetadataManifest clusterMetadataManifest;

public RemoteClusterMetadataManifest(
final ClusterMetadataManifest clusterMetadataManifest,
final String clusterUUID,
final Compressor compressor,
final NamedXContentRegistry namedXContentRegistry
) {
super(clusterUUID, compressor, namedXContentRegistry);
this.clusterMetadataManifest = clusterMetadataManifest;
}

public RemoteClusterMetadataManifest(
final String blobName,
final String clusterUUID,
final Compressor compressor,
final NamedXContentRegistry namedXContentRegistry
) {
super(clusterUUID, compressor, namedXContentRegistry);
this.blobName = blobName;
}

@Override
public BlobPathParameters getBlobPathParameters() {
return new BlobPathParameters(List.of(MANIFEST), MANIFEST);
}

@Override
public String generateBlobFileName() {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest__<inverted_term>__<inverted_version>__C/P__<inverted__timestamp>__
// <codec_version>
String blobFileName = String.join(
DELIMITER,
MANIFEST,
RemoteStoreUtils.invertLong(clusterMetadataManifest.getClusterTerm()),
RemoteStoreUtils.invertLong(clusterMetadataManifest.getStateVersion()),
(clusterMetadataManifest.isCommitted() ? COMMITTED : PUBLISHED),
RemoteStoreUtils.invertLong(System.currentTimeMillis()),
String.valueOf(clusterMetadataManifest.getCodecVersion())
// Keep the codec version at last place only, during we read last place to determine codec version.
);
this.blobFileName = blobFileName;
return blobFileName;
}

@Override
public UploadedMetadata getUploadedMetadata() {
assert blobName != null;
return new UploadedMetadataAttribute(MANIFEST, blobName);
}

@Override
public InputStream serialize() throws IOException {
return CLUSTER_METADATA_MANIFEST_FORMAT.serialize(
clusterMetadataManifest,
generateBlobFileName(),
getCompressor(),
RemoteClusterStateUtils.FORMAT_PARAMS
).streamInput();
}

@Override
public ClusterMetadataManifest deserialize(final InputStream inputStream) throws IOException {
ChecksumBlobStoreFormat<ClusterMetadataManifest> blobStoreFormat = getClusterMetadataManifestBlobStoreFormat();
return blobStoreFormat.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream));
}

private int getManifestCodecVersion() {
assert blobName != null;
String[] splitName = blobName.split(DELIMITER);
if (splitName.length == SPLITTED_MANIFEST_FILE_LENGTH) {
return Integer.parseInt(splitName[splitName.length - 1]); // Last value would be codec version.
} else if (splitName.length < SPLITTED_MANIFEST_FILE_LENGTH) { // Where codec is not part of file name, i.e. default codec version 0
// is used.
return ClusterMetadataManifest.CODEC_V0;
} else {
throw new IllegalArgumentException("Manifest file name is corrupted");
}
}

private ChecksumBlobStoreFormat<ClusterMetadataManifest> getClusterMetadataManifestBlobStoreFormat() {
long codecVersion = getManifestCodecVersion();
if (codecVersion == MANIFEST_CURRENT_CODEC_VERSION) {
return CLUSTER_METADATA_MANIFEST_FORMAT;
} else if (codecVersion == ClusterMetadataManifest.CODEC_V1) {
return CLUSTER_METADATA_MANIFEST_FORMAT_V1;
} else if (codecVersion == ClusterMetadataManifest.CODEC_V0) {
return CLUSTER_METADATA_MANIFEST_FORMAT_V0;
}
throw new IllegalArgumentException("Cluster metadata manifest file is corrupted, don't have valid codec version");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote.model;

import org.opensearch.cluster.coordination.CoordinationMetadata;
import org.opensearch.common.io.Streams;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.BlobPathParameters;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute;
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;

import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_CURRENT_CODEC_VERSION;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_PLAIN_FORMAT;

/**
* Wrapper class for uploading/downloading {@link CoordinationMetadata} to/from remote blob store
*/
public class RemoteCoordinationMetadata extends AbstractRemoteWritableBlobEntity<CoordinationMetadata> {

public static final String COORDINATION_METADATA = "coordination";
public static final ChecksumBlobStoreFormat<CoordinationMetadata> COORDINATION_METADATA_FORMAT = new ChecksumBlobStoreFormat<>(
"coordination",
METADATA_NAME_PLAIN_FORMAT,
CoordinationMetadata::fromXContent
);

private CoordinationMetadata coordinationMetadata;
private long metadataVersion;

public RemoteCoordinationMetadata(
final CoordinationMetadata coordinationMetadata,
final long metadataVersion,
final String clusterUUID,
final Compressor compressor,
final NamedXContentRegistry namedXContentRegistry
) {
super(clusterUUID, compressor, namedXContentRegistry);
this.coordinationMetadata = coordinationMetadata;
this.metadataVersion = metadataVersion;
}

public RemoteCoordinationMetadata(
final String blobName,
final String clusterUUID,
final Compressor compressor,
final NamedXContentRegistry namedXContentRegistry
) {
super(clusterUUID, compressor, namedXContentRegistry);
this.blobName = blobName;
}

@Override
public BlobPathParameters getBlobPathParameters() {
return new BlobPathParameters(List.of("global-metadata"), COORDINATION_METADATA);
}

@Override
public String generateBlobFileName() {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/global-metadata/<componentPrefix>__<inverted_metadata_version>__<inverted__timestamp>__<codec_version>
String blobFileName = String.join(
DELIMITER,
getBlobPathParameters().getFilePrefix(),
RemoteStoreUtils.invertLong(metadataVersion),
RemoteStoreUtils.invertLong(System.currentTimeMillis()),
String.valueOf(GLOBAL_METADATA_CURRENT_CODEC_VERSION)
);
this.blobFileName = blobFileName;
return blobFileName;
}

@Override
public InputStream serialize() throws IOException {
return COORDINATION_METADATA_FORMAT.serialize(
coordinationMetadata,
generateBlobFileName(),
getCompressor(),
RemoteClusterStateUtils.FORMAT_PARAMS
).streamInput();
}

@Override
public CoordinationMetadata deserialize(final InputStream inputStream) throws IOException {
return COORDINATION_METADATA_FORMAT.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream));
}

@Override
public UploadedMetadata getUploadedMetadata() {
assert blobName != null;
return new UploadedMetadataAttribute(COORDINATION_METADATA, blobName);
}
}
Loading