From 80623409ab5c1493393b67181dd0046dc3fee563 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Fri, 9 Aug 2024 14:55:03 +0530 Subject: [PATCH 01/12] Add timestamp pinning service and scheduler to update in-memory state Signed-off-by: Sachin Kale --- .../common/settings/ClusterSettings.java | 3 + .../remote/model/RemotePinnedTimestamps.java | 132 +++++++++ .../RemoteStorePinnedTimestampsBlobStore.java | 41 +++ .../main/java/org/opensearch/node/Node.java | 20 +- .../RemoteStorePinnedTimestampService.java | 279 ++++++++++++++++++ .../snapshots/SnapshotsService.java | 15 + .../model/RemotePinnedTimestampsTests.java | 105 +++++++ 7 files changed, 594 insertions(+), 1 deletion(-) create mode 100644 server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/model/RemoteStorePinnedTimestampsBlobStore.java create mode 100644 server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java create mode 100644 server/src/test/java/org/opensearch/gateway/remote/model/RemotePinnedTimestampsTests.java diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index a73e5d44b7e02..7baae17dd77cd 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -142,6 +142,7 @@ import org.opensearch.node.Node.DiscoverySettings; import org.opensearch.node.NodeRoleSettings; import org.opensearch.node.remotestore.RemoteStoreNodeService; +import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; import org.opensearch.node.resource.tracker.ResourceTrackerSettings; import org.opensearch.persistent.PersistentTasksClusterService; import org.opensearch.persistent.decider.EnableAssignmentDecider; @@ -760,6 +761,8 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA, SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING, + RemoteStorePinnedTimestampService.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL, + // Composite index settings CompositeIndexSettings.STAR_TREE_INDEX_ENABLED_SETTING, diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java new file mode 100644 index 0000000000000..d751aad995c16 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java @@ -0,0 +1,132 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.AbstractDiffable; +import org.opensearch.cluster.block.ClusterBlocks; +import org.opensearch.common.io.Streams; +import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.repositories.blobstore.ChecksumWritableBlobStoreFormat; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; + +/** + * Wrapper class for uploading/downloading {@link RemotePinnedTimestamps} to/from remote blob store + * + * @opensearch.internal + */ +public class RemotePinnedTimestamps extends AbstractRemoteWritableBlobEntity { + private static final Logger logger = LogManager.getLogger(RemotePinnedTimestamps.class); + + public static class PinnedTimestamps extends AbstractDiffable { + private final Map> pinnedTimestampPinningEntityMap; + + public PinnedTimestamps(Map> pinnedTimestampPinningEntityMap) { + this.pinnedTimestampPinningEntityMap = new ConcurrentHashMap<>(pinnedTimestampPinningEntityMap); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(pinnedTimestampPinningEntityMap, StreamOutput::writeLong, StreamOutput::writeStringCollection); + } + + public static PinnedTimestamps readFrom(StreamInput in) throws IOException { + return new PinnedTimestamps(in.readMap(StreamInput::readLong, StreamInput::readStringList)); + } + + public void pin(Long timestamp, String pinningEntity) { + logger.debug("Pinning timestamp = {} against entity = {}", timestamp, pinningEntity); + pinnedTimestampPinningEntityMap.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(pinningEntity); + } + + public void unpin(Long timestamp, String pinningEntity) { + logger.debug("Unpinning timestamp = {} against entity = {}", timestamp, pinningEntity); + pinnedTimestampPinningEntityMap.computeIfPresent(timestamp, (k, v) -> { + v.remove(pinningEntity); + return v.isEmpty() ? null : v; + }); + } + + public Map> getPinnedTimestampPinningEntityMap() { + return new HashMap<>(pinnedTimestampPinningEntityMap); + } + } + + public static final String PINNED_TIMESTAMPS = "pinned_timestamps"; + public static final ChecksumWritableBlobStoreFormat PINNED_TIMESTAMPS_FORMAT = new ChecksumWritableBlobStoreFormat<>( + PINNED_TIMESTAMPS, + PinnedTimestamps::readFrom + ); + + private PinnedTimestamps pinnedTimestamps; + + public RemotePinnedTimestamps(String clusterUUID, Compressor compressor, NamedXContentRegistry namedXContentRegistry) { + super(clusterUUID, compressor, namedXContentRegistry); + pinnedTimestamps = new PinnedTimestamps(new HashMap<>()); + } + + @Override + public BlobPathParameters getBlobPathParameters() { + return new BlobPathParameters(List.of(PINNED_TIMESTAMPS), PINNED_TIMESTAMPS); + } + + @Override + public String getType() { + return PINNED_TIMESTAMPS; + } + + @Override + public String generateBlobFileName() { + return this.blobFileName = String.join(DELIMITER, PINNED_TIMESTAMPS, RemoteStoreUtils.invertLong(System.currentTimeMillis())); + } + + @Override + public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { + return null; + } + + @Override + public InputStream serialize() throws IOException { + return PINNED_TIMESTAMPS_FORMAT.serialize(pinnedTimestamps, generateBlobFileName(), getCompressor()).streamInput(); + } + + @Override + public PinnedTimestamps deserialize(InputStream inputStream) throws IOException { + return PINNED_TIMESTAMPS_FORMAT.deserialize(blobName, Streams.readFully(inputStream)); + } + + public void setBlobFileName(String blobFileName) { + this.blobFileName = blobFileName; + } + + public void setPinnedTimestamps(PinnedTimestamps pinnedTimestamps) { + this.pinnedTimestamps = pinnedTimestamps; + } + + public PinnedTimestamps getPinnedTimestamps() { + return pinnedTimestamps; + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteStorePinnedTimestampsBlobStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteStorePinnedTimestampsBlobStore.java new file mode 100644 index 0000000000000..76d2eaca3e795 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteStorePinnedTimestampsBlobStore.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; + +/** + * Extends the RemoteClusterStateBlobStore to support {@link RemotePinnedTimestamps} + */ +public class RemoteStorePinnedTimestampsBlobStore extends RemoteClusterStateBlobStore< + RemotePinnedTimestamps.PinnedTimestamps, + RemotePinnedTimestamps> { + + private final BlobStoreRepository blobStoreRepository; + + public RemoteStorePinnedTimestampsBlobStore( + BlobStoreTransferService blobStoreTransferService, + BlobStoreRepository blobStoreRepository, + String clusterName, + ThreadPool threadPool, + String executor + ) { + super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool, executor); + this.blobStoreRepository = blobStoreRepository; + } + + @Override + public BlobPath getBlobPathForUpload(final AbstractRemoteWritableBlobEntity obj) { + return blobStoreRepository.basePath(); + } +} diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index cbed8dfea8cc4..9c4627d5bef38 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -185,6 +185,7 @@ import org.opensearch.monitor.fs.FsProbe; import org.opensearch.monitor.jvm.JvmInfo; import org.opensearch.node.remotestore.RemoteStoreNodeService; +import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; import org.opensearch.node.resource.tracker.NodeResourceUsageTracker; import org.opensearch.persistent.PersistentTasksClusterService; import org.opensearch.persistent.PersistentTasksExecutor; @@ -784,6 +785,7 @@ protected Node( final RemoteClusterStateService remoteClusterStateService; final RemoteClusterStateCleanupManager remoteClusterStateCleanupManager; final RemoteIndexPathUploader remoteIndexPathUploader; + final RemoteStorePinnedTimestampService remoteStorePinnedTimestampService; if (isRemoteStoreClusterStateEnabled(settings)) { remoteIndexPathUploader = new RemoteIndexPathUploader( threadPool, @@ -801,11 +803,19 @@ protected Node( List.of(remoteIndexPathUploader), namedWriteableRegistry ); + remoteStorePinnedTimestampService = new RemoteStorePinnedTimestampService( + repositoriesServiceReference::get, + settings, + threadPool, + clusterService + ); + resourcesToClose.add(remoteStorePinnedTimestampService); remoteClusterStateCleanupManager = remoteClusterStateService.getCleanupManager(); } else { remoteClusterStateService = null; remoteIndexPathUploader = null; remoteClusterStateCleanupManager = null; + remoteStorePinnedTimestampService = null; } // collect engine factory providers from plugins @@ -1170,7 +1180,8 @@ protected Node( clusterModule.getIndexNameExpressionResolver(), repositoryService, transportService, - actionModule.getActionFilters() + actionModule.getActionFilters(), + remoteStorePinnedTimestampService ); SnapshotShardsService snapshotShardsService = new SnapshotShardsService( settings, @@ -1416,6 +1427,7 @@ protected Node( b.bind(MetricsRegistry.class).toInstance(metricsRegistry); b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService); b.bind(RemoteIndexPathUploader.class).toProvider(() -> remoteIndexPathUploader); + b.bind(RemoteStorePinnedTimestampService.class).toProvider(() -> remoteStorePinnedTimestampService); b.bind(RemoteClusterStateCleanupManager.class).toProvider(() -> remoteClusterStateCleanupManager); b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry); b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker); @@ -1569,6 +1581,12 @@ public Node start() throws NodeValidationException { if (remoteIndexPathUploader != null) { remoteIndexPathUploader.start(); } + final RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = injector.getInstance( + RemoteStorePinnedTimestampService.class + ); + if (remoteStorePinnedTimestampService != null) { + remoteStorePinnedTimestampService.start(); + } // Load (and maybe upgrade) the metadata stored on disk final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class); gatewayMetaState.start( diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java new file mode 100644 index 0000000000000..03a7597aeb581 --- /dev/null +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java @@ -0,0 +1,279 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.node.remotestore; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.blobstore.BlobMetadata; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.AbstractAsyncTask; +import org.opensearch.core.action.ActionListener; +import org.opensearch.gateway.remote.model.RemotePinnedTimestamps; +import org.opensearch.gateway.remote.model.RemoteStorePinnedTimestampsBlobStore; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.node.Node; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; + +/** + * Service for managing pinned timestamps in a remote store. + * This service handles pinning and unpinning of timestamps, as well as periodic updates of the pinned timestamps set. + * + * @opensearch.internal + */ +public class RemoteStorePinnedTimestampService implements Closeable { + private static final Logger logger = LogManager.getLogger(RemoteStorePinnedTimestampService.class); + private Tuple> pinnedTimestampsSet = new Tuple<>(-1L, Set.of()); + public static final int PINNED_TIMESTAMP_FILES_TO_KEEP = 5; + + private final Supplier repositoriesService; + private final Settings settings; + private final ThreadPool threadPool; + private final ClusterService clusterService; + private BlobStoreRepository blobStoreRepository; + private BlobStoreTransferService blobStoreTransferService; + private RemoteStorePinnedTimestampsBlobStore pinnedTimestampsBlobStore; + private AsyncUpdatePinnedTimestampTask asyncUpdatePinnedTimestampTask; + private volatile TimeValue pinnedTimestampsSchedulerInterval; + + /** + * Controls pinned timestamp scheduler interval + */ + public static final Setting CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL = Setting.timeSetting( + "cluster.remote_store.pinned_timestamps.scheduler_interval", + TimeValue.timeValueMinutes(3), + TimeValue.timeValueMinutes(1), + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + public RemoteStorePinnedTimestampService( + Supplier repositoriesService, + Settings settings, + ThreadPool threadPool, + ClusterService clusterService + ) { + this.repositoriesService = repositoriesService; + this.settings = settings; + this.threadPool = threadPool; + this.clusterService = clusterService; + + pinnedTimestampsSchedulerInterval = CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL.get(settings); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL, + this::setPinnedTimestampsSchedulerInterval + ); + } + + /** + * Starts the RemoteStorePinnedTimestampService. + * This method validates the remote store configuration, initializes components, + * and starts the asynchronous update task. + */ + public void start() { + validateRemoteStoreConfiguration(); + initializeComponents(); + startAsyncUpdateTask(); + } + + private void validateRemoteStoreConfiguration() { + assert isRemoteStoreClusterStateEnabled(settings) : "Remote cluster state is not enabled"; + final String remoteStoreRepo = settings.get( + Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY + ); + assert remoteStoreRepo != null : "Remote Cluster State repository is not configured"; + final Repository repository = repositoriesService.get().repository(remoteStoreRepo); + assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; + blobStoreRepository = (BlobStoreRepository) repository; + } + + private void initializeComponents() { + String clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings).value(); + blobStoreTransferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), this.threadPool); + pinnedTimestampsBlobStore = new RemoteStorePinnedTimestampsBlobStore( + blobStoreTransferService, + blobStoreRepository, + clusterName, + this.threadPool, + ThreadPool.Names.REMOTE_STATE_READ + ); + } + + private void startAsyncUpdateTask() { + asyncUpdatePinnedTimestampTask = new AsyncUpdatePinnedTimestampTask(logger, threadPool, pinnedTimestampsSchedulerInterval, true); + } + + /** + * Pins a timestamp in the remote store. + * + * @param timestamp The timestamp to be pinned + * @param pinningEntity The entity responsible for pinning the timestamp + * @param listener A listener to be notified when the pinning operation completes + * @throws IOException If an I/O error occurs during the pinning process + * @throws IllegalArgumentException If the timestamp is less than the current time minus one second + */ + public void pinTimestamp(long timestamp, String pinningEntity, ActionListener listener) throws IOException { + if (timestamp < System.currentTimeMillis() - 1000) { + throw new IllegalArgumentException("Timestamp to be pinned is less than current timestamp"); + } + updatePinning(pinnedTimestamps -> pinnedTimestamps.pin(timestamp, pinningEntity), listener); + } + + /** + * Unpins a timestamp from the remote store. + * + * @param timestamp The timestamp to be unpinned + * @param pinningEntity The entity responsible for unpinning the timestamp + * @param listener A listener to be notified when the unpinning operation completes + */ + public void unpinTimestamp(long timestamp, String pinningEntity, ActionListener listener) { + updatePinning(pinnedTimestamps -> pinnedTimestamps.unpin(timestamp, pinningEntity), listener); + } + + private void updatePinning(Consumer updateConsumer, ActionListener listener) { + RemotePinnedTimestamps remotePinnedTimestamps = new RemotePinnedTimestamps( + clusterService.state().metadata().clusterUUID(), + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ); + BlobPath path = pinnedTimestampsBlobStore.getBlobPathForUpload(remotePinnedTimestamps); + blobStoreTransferService.listAllInSortedOrder(path, remotePinnedTimestamps.getType(), Integer.MAX_VALUE, new ActionListener<>() { + @Override + public void onResponse(List blobMetadata) { + RemotePinnedTimestamps.PinnedTimestamps pinnedTimestamps = remotePinnedTimestamps.getPinnedTimestamps(); + if (blobMetadata.isEmpty() == false) { + pinnedTimestamps = readExistingPinnedTimestamps(blobMetadata.get(0).name(), remotePinnedTimestamps); + } + updateConsumer.accept(pinnedTimestamps); + remotePinnedTimestamps.setPinnedTimestamps(pinnedTimestamps); + pinnedTimestampsBlobStore.writeAsync(remotePinnedTimestamps, listener); + + // Delete older pinnedTimestamp files + if (blobMetadata.size() > PINNED_TIMESTAMP_FILES_TO_KEEP) { + List oldFilesToBeDeleted = blobMetadata.subList(PINNED_TIMESTAMP_FILES_TO_KEEP, blobMetadata.size()) + .stream() + .map(BlobMetadata::name) + .collect(Collectors.toList()); + try { + blobStoreTransferService.deleteBlobs( + pinnedTimestampsBlobStore.getBlobPathForUpload(remotePinnedTimestamps), + oldFilesToBeDeleted + ); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } + + private RemotePinnedTimestamps.PinnedTimestamps readExistingPinnedTimestamps( + String blobFilename, + RemotePinnedTimestamps remotePinnedTimestamps + ) { + remotePinnedTimestamps.setBlobFileName(blobFilename); + remotePinnedTimestamps.setFullBlobName(pinnedTimestampsBlobStore.getBlobPathForUpload(remotePinnedTimestamps)); + try { + return pinnedTimestampsBlobStore.read(remotePinnedTimestamps); + } catch (IOException e) { + throw new RuntimeException("Failed to read existing pinned timestamps", e); + } + } + + @Override + public void close() throws IOException { + asyncUpdatePinnedTimestampTask.close(); + } + + public void setPinnedTimestampsSchedulerInterval(TimeValue pinnedTimestampsSchedulerInterval) { + this.pinnedTimestampsSchedulerInterval = pinnedTimestampsSchedulerInterval; + rescheduleAsyncUpdatePinnedTimestampTask(); + } + + private void rescheduleAsyncUpdatePinnedTimestampTask() { + if (pinnedTimestampsSchedulerInterval != null) { + pinnedTimestampsSet = new Tuple<>(-1L, Set.of()); + asyncUpdatePinnedTimestampTask.close(); + startAsyncUpdateTask(); + } + } + + /** + * Inner class for asynchronously updating the pinned timestamp set. + */ + private final class AsyncUpdatePinnedTimestampTask extends AbstractAsyncTask { + private AsyncUpdatePinnedTimestampTask(Logger logger, ThreadPool threadPool, TimeValue interval, boolean autoReschedule) { + super(logger, threadPool, interval, autoReschedule); + rescheduleIfNecessary(); + } + + @Override + protected boolean mustReschedule() { + return true; + } + + @Override + protected void runInternal() { + long triggerTimestamp = System.currentTimeMillis(); + RemotePinnedTimestamps remotePinnedTimestamps = new RemotePinnedTimestamps( + clusterService.state().metadata().clusterUUID(), + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ); + BlobPath path = pinnedTimestampsBlobStore.getBlobPathForUpload(remotePinnedTimestamps); + blobStoreTransferService.listAllInSortedOrder(path, remotePinnedTimestamps.getType(), 1, new ActionListener<>() { + @Override + public void onResponse(List blobMetadata) { + if (blobMetadata.isEmpty()) { + return; + } + RemotePinnedTimestamps.PinnedTimestamps pinnedTimestamps = readExistingPinnedTimestamps( + blobMetadata.get(0).name(), + remotePinnedTimestamps + ); + logger.debug( + "Fetched pinned timestamps from remote store: {} - {}", + triggerTimestamp, + pinnedTimestamps.getPinnedTimestampPinningEntityMap().keySet() + ); + pinnedTimestampsSet = new Tuple<>(triggerTimestamp, pinnedTimestamps.getPinnedTimestampPinningEntityMap().keySet()); + } + + @Override + public void onFailure(Exception e) { + logger.error("Exception while listing pinned timestamp files", e); + } + }); + } + } +} diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index acc2dc83749cd..55ae2f9336ed6 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -92,6 +92,7 @@ import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; +import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; @@ -180,6 +181,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus private final UpdateSnapshotStatusAction updateSnapshotStatusHandler; private final TransportService transportService; + private final RemoteStorePinnedTimestampService remoteStorePinnedTimestampService; private final OngoingRepositoryOperations repositoryOperations = new OngoingRepositoryOperations(); @@ -209,6 +211,18 @@ public SnapshotsService( RepositoriesService repositoriesService, TransportService transportService, ActionFilters actionFilters + ) { + this(settings, clusterService, indexNameExpressionResolver, repositoriesService, transportService, actionFilters, null); + } + + public SnapshotsService( + Settings settings, + ClusterService clusterService, + IndexNameExpressionResolver indexNameExpressionResolver, + RepositoriesService repositoriesService, + TransportService transportService, + ActionFilters actionFilters, + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService ) { this.clusterService = clusterService; this.indexNameExpressionResolver = indexNameExpressionResolver; @@ -216,6 +230,7 @@ public SnapshotsService( this.remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(() -> repositoriesService); this.threadPool = transportService.getThreadPool(); this.transportService = transportService; + this.remoteStorePinnedTimestampService = remoteStorePinnedTimestampService; // The constructor of UpdateSnapshotStatusAction will register itself to the TransportService. this.updateSnapshotStatusHandler = new UpdateSnapshotStatusAction( diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemotePinnedTimestampsTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemotePinnedTimestampsTests.java new file mode 100644 index 0000000000000..b63b07be7fe81 --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemotePinnedTimestampsTests.java @@ -0,0 +1,105 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.BytesStreamInput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.mockito.Mockito.mock; + +public class RemotePinnedTimestampsTests extends OpenSearchTestCase { + + private RemotePinnedTimestamps remotePinnedTimestamps; + + @Before + public void setup() { + Compressor compressor = new DeflateCompressor(); + NamedXContentRegistry mockNamedXContentRegistry = mock(NamedXContentRegistry.class); + remotePinnedTimestamps = new RemotePinnedTimestamps("testClusterUUID", compressor, mockNamedXContentRegistry); + } + + public void testGenerateBlobFileName() { + String fileName = remotePinnedTimestamps.generateBlobFileName(); + assertTrue(fileName.startsWith(RemotePinnedTimestamps.PINNED_TIMESTAMPS)); + assertEquals(fileName, remotePinnedTimestamps.getBlobFileName()); + } + + public void testSerializeAndDeserialize() throws IOException { + RemotePinnedTimestamps.PinnedTimestamps pinnedTimestamps = new RemotePinnedTimestamps.PinnedTimestamps(new HashMap<>()); + pinnedTimestamps.pin(1000L, "entity1"); + pinnedTimestamps.pin(2000L, "entity2"); + remotePinnedTimestamps.setPinnedTimestamps(pinnedTimestamps); + + InputStream serialized = remotePinnedTimestamps.serialize(); + RemotePinnedTimestamps.PinnedTimestamps deserialized = remotePinnedTimestamps.deserialize(serialized); + + assertEquals(pinnedTimestamps.getPinnedTimestampPinningEntityMap(), deserialized.getPinnedTimestampPinningEntityMap()); + } + + public void testSetAndGetPinnedTimestamps() { + RemotePinnedTimestamps.PinnedTimestamps pinnedTimestamps = new RemotePinnedTimestamps.PinnedTimestamps(new HashMap<>()); + remotePinnedTimestamps.setPinnedTimestamps(pinnedTimestamps); + assertEquals(pinnedTimestamps, remotePinnedTimestamps.getPinnedTimestamps()); + } + + public void testPinnedTimestampsPin() { + RemotePinnedTimestamps.PinnedTimestamps pinnedTimestamps = new RemotePinnedTimestamps.PinnedTimestamps(new HashMap<>()); + pinnedTimestamps.pin(1000L, "entity1"); + pinnedTimestamps.pin(1000L, "entity2"); + pinnedTimestamps.pin(2000L, "entity3"); + + Map> expected = new HashMap<>(); + expected.put(1000L, Arrays.asList("entity1", "entity2")); + expected.put(2000L, List.of("entity3")); + + assertEquals(expected, pinnedTimestamps.getPinnedTimestampPinningEntityMap()); + } + + public void testPinnedTimestampsUnpin() { + RemotePinnedTimestamps.PinnedTimestamps pinnedTimestamps = new RemotePinnedTimestamps.PinnedTimestamps(new HashMap<>()); + pinnedTimestamps.pin(1000L, "entity1"); + pinnedTimestamps.pin(1000L, "entity2"); + pinnedTimestamps.pin(2000L, "entity3"); + + pinnedTimestamps.unpin(1000L, "entity1"); + pinnedTimestamps.unpin(2000L, "entity3"); + + Map> expected = new HashMap<>(); + expected.put(1000L, List.of("entity2")); + + assertEquals(expected, pinnedTimestamps.getPinnedTimestampPinningEntityMap()); + } + + public void testPinnedTimestampsReadFromAndWriteTo() throws IOException { + RemotePinnedTimestamps.PinnedTimestamps original = new RemotePinnedTimestamps.PinnedTimestamps(new HashMap<>()); + original.pin(1000L, "entity1"); + original.pin(2000L, "entity2"); + + BytesStreamOutput out = new BytesStreamOutput(); + original.writeTo(out); + + StreamInput in = new BytesStreamInput(out.bytes().toBytesRef().bytes); + RemotePinnedTimestamps.PinnedTimestamps deserialized = RemotePinnedTimestamps.PinnedTimestamps.readFrom(in); + + assertEquals(original.getPinnedTimestampPinningEntityMap(), deserialized.getPinnedTimestampPinningEntityMap()); + } +} From 5815d58905c559602993a5e5bc6c2f772428ecf6 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Fri, 9 Aug 2024 19:13:54 +0530 Subject: [PATCH 02/12] Add IT Signed-off-by: Sachin Kale --- .../RemoteStorePinnedTimestampsIT.java | 88 +++++++++++++++++++ .../RemoteStorePinnedTimestampsBlobStore.java | 2 +- .../RemoteStorePinnedTimestampService.java | 7 +- 3 files changed, 95 insertions(+), 2 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java new file mode 100644 index 0000000000000..003c49f25c723 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java @@ -0,0 +1,88 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; +import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.Set; +import org.opensearch.common.collect.Tuple; + +import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemoteStorePinnedTimestampsIT extends RemoteStoreBaseIntegTestCase { + static final String INDEX_NAME = "remote-store-test-idx-1"; + + ActionListener noOpActionListener = new ActionListener<>() { + @Override + public void onResponse(Void unused) {} + + @Override + public void onFailure(Exception e) {} + }; + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build(); + } + + public void testTimestampPinUnpin() throws Exception { + prepareCluster(1, 1, INDEX_NAME, 0, 2); + ensureGreen(INDEX_NAME); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(RemoteStorePinnedTimestampService.class, primaryNodeName(INDEX_NAME)); + + Tuple> pinnedTimestampWithFetchTimestamp = RemoteStorePinnedTimestampService.getPinnedTimestamps(); + long lastFetchTimestamp = pinnedTimestampWithFetchTimestamp.v1(); + assertEquals(-1L, lastFetchTimestamp); + assertEquals(Set.of(), pinnedTimestampWithFetchTimestamp.v2()); + + assertThrows(IllegalArgumentException.class, () -> remoteStorePinnedTimestampService.pinTimestamp(1234L, "ss1", noOpActionListener)); + + long timestamp1 = System.currentTimeMillis() + 30000L; + long timestamp2 = System.currentTimeMillis() + 60000L; + long timestamp3 = System.currentTimeMillis() + 900000L; + remoteStorePinnedTimestampService.pinTimestamp(timestamp1, "ss2", noOpActionListener); + remoteStorePinnedTimestampService.pinTimestamp(timestamp2, "ss3", noOpActionListener); + remoteStorePinnedTimestampService.pinTimestamp(timestamp3, "ss4", noOpActionListener); + + remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueSeconds(1)); + + assertBusy(() -> { + Tuple> pinnedTimestampWithFetchTimestamp_2 = RemoteStorePinnedTimestampService.getPinnedTimestamps(); + long lastFetchTimestamp_2 = pinnedTimestampWithFetchTimestamp_2.v1(); + assertTrue(lastFetchTimestamp_2 != -1); + assertEquals(Set.of(timestamp1, timestamp2, timestamp3), pinnedTimestampWithFetchTimestamp_2.v2()); + }); + + remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueMinutes(3)); + + // This should be a no-op as pinning entity is different + remoteStorePinnedTimestampService.unpinTimestamp(timestamp1, "no-snapshot", noOpActionListener); + // Unpinning already pinned entity + remoteStorePinnedTimestampService.unpinTimestamp(timestamp2, "ss3", noOpActionListener); + // Adding different entity to already pinned timestamp + remoteStorePinnedTimestampService.pinTimestamp(timestamp3, "ss5", noOpActionListener); + + remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueSeconds(1)); + + assertBusy(() -> { + Tuple> pinnedTimestampWithFetchTimestamp_3 = RemoteStorePinnedTimestampService.getPinnedTimestamps(); + long lastFetchTimestamp_3 = pinnedTimestampWithFetchTimestamp_3.v1(); + assertTrue(lastFetchTimestamp_3 != -1); + assertEquals(Set.of(timestamp1, timestamp3), pinnedTimestampWithFetchTimestamp_3.v2()); + }); + + remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueMinutes(3)); + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteStorePinnedTimestampsBlobStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteStorePinnedTimestampsBlobStore.java index 76d2eaca3e795..8bff4c958a9f8 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteStorePinnedTimestampsBlobStore.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteStorePinnedTimestampsBlobStore.java @@ -36,6 +36,6 @@ public RemoteStorePinnedTimestampsBlobStore( @Override public BlobPath getBlobPathForUpload(final AbstractRemoteWritableBlobEntity obj) { - return blobStoreRepository.basePath(); + return blobStoreRepository.basePath().add("pinned_timestamps"); } } diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java index 03a7597aeb581..ffa12c3ffc682 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java @@ -47,7 +47,7 @@ */ public class RemoteStorePinnedTimestampService implements Closeable { private static final Logger logger = LogManager.getLogger(RemoteStorePinnedTimestampService.class); - private Tuple> pinnedTimestampsSet = new Tuple<>(-1L, Set.of()); + private static Tuple> pinnedTimestampsSet = new Tuple<>(-1L, Set.of()); public static final int PINNED_TIMESTAMP_FILES_TO_KEEP = 5; private final Supplier repositoriesService; @@ -215,6 +215,7 @@ public void close() throws IOException { asyncUpdatePinnedTimestampTask.close(); } + // Visible for testing public void setPinnedTimestampsSchedulerInterval(TimeValue pinnedTimestampsSchedulerInterval) { this.pinnedTimestampsSchedulerInterval = pinnedTimestampsSchedulerInterval; rescheduleAsyncUpdatePinnedTimestampTask(); @@ -228,6 +229,10 @@ private void rescheduleAsyncUpdatePinnedTimestampTask() { } } + public static Tuple> getPinnedTimestamps() { + return pinnedTimestampsSet; + } + /** * Inner class for asynchronously updating the pinned timestamp set. */ From 6af93757ba94a5bf855244e94ed8f8371c69390d Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Wed, 14 Aug 2024 10:30:46 +0530 Subject: [PATCH 03/12] Changes to use RemoteWriteableEntityBlobStore Signed-off-by: Sachin Kale --- .../RemoteStorePinnedTimestampsIT.java | 12 +++++++++--- .../remote/model/RemotePinnedTimestamps.java | 15 ++++----------- .../RemoteStorePinnedTimestampsBlobStore.java | 12 +++++++----- .../RemoteStorePinnedTimestampService.java | 6 ++---- .../remote/model/RemotePinnedTimestampsTests.java | 6 +----- 5 files changed, 23 insertions(+), 28 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java index 003c49f25c723..8dceceb82cdfb 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java @@ -8,6 +8,7 @@ package org.opensearch.remotestore; +import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; @@ -15,7 +16,6 @@ import org.opensearch.test.OpenSearchIntegTestCase; import java.util.Set; -import org.opensearch.common.collect.Tuple; import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; @@ -40,14 +40,20 @@ public void testTimestampPinUnpin() throws Exception { prepareCluster(1, 1, INDEX_NAME, 0, 2); ensureGreen(INDEX_NAME); - RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(RemoteStorePinnedTimestampService.class, primaryNodeName(INDEX_NAME)); + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + primaryNodeName(INDEX_NAME) + ); Tuple> pinnedTimestampWithFetchTimestamp = RemoteStorePinnedTimestampService.getPinnedTimestamps(); long lastFetchTimestamp = pinnedTimestampWithFetchTimestamp.v1(); assertEquals(-1L, lastFetchTimestamp); assertEquals(Set.of(), pinnedTimestampWithFetchTimestamp.v2()); - assertThrows(IllegalArgumentException.class, () -> remoteStorePinnedTimestampService.pinTimestamp(1234L, "ss1", noOpActionListener)); + assertThrows( + IllegalArgumentException.class, + () -> remoteStorePinnedTimestampService.pinTimestamp(1234L, "ss1", noOpActionListener) + ); long timestamp1 = System.currentTimeMillis() + 30000L; long timestamp2 = System.currentTimeMillis() + 60000L; diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java index d751aad995c16..6f870e8d1ef53 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java @@ -13,13 +13,11 @@ import org.opensearch.cluster.AbstractDiffable; import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.common.io.Streams; -import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.common.remote.RemoteWriteableBlobEntity; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.compress.Compressor; -import org.opensearch.core.xcontent.NamedXContentRegistry; -import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.repositories.blobstore.ChecksumWritableBlobStoreFormat; @@ -38,7 +36,7 @@ * * @opensearch.internal */ -public class RemotePinnedTimestamps extends AbstractRemoteWritableBlobEntity { +public class RemotePinnedTimestamps extends RemoteWriteableBlobEntity { private static final Logger logger = LogManager.getLogger(RemotePinnedTimestamps.class); public static class PinnedTimestamps extends AbstractDiffable { @@ -83,8 +81,8 @@ public Map> getPinnedTimestampPinningEntityMap() { private PinnedTimestamps pinnedTimestamps; - public RemotePinnedTimestamps(String clusterUUID, Compressor compressor, NamedXContentRegistry namedXContentRegistry) { - super(clusterUUID, compressor, namedXContentRegistry); + public RemotePinnedTimestamps(String clusterUUID, Compressor compressor) { + super(clusterUUID, compressor); pinnedTimestamps = new PinnedTimestamps(new HashMap<>()); } @@ -103,11 +101,6 @@ public String generateBlobFileName() { return this.blobFileName = String.join(DELIMITER, PINNED_TIMESTAMPS, RemoteStoreUtils.invertLong(System.currentTimeMillis())); } - @Override - public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { - return null; - } - @Override public InputStream serialize() throws IOException { return PINNED_TIMESTAMPS_FORMAT.serialize(pinnedTimestamps, generateBlobFileName(), getCompressor()).streamInput(); diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteStorePinnedTimestampsBlobStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteStorePinnedTimestampsBlobStore.java index 8bff4c958a9f8..2a65dd993d0af 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteStorePinnedTimestampsBlobStore.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteStorePinnedTimestampsBlobStore.java @@ -9,7 +9,8 @@ package org.opensearch.gateway.remote.model; import org.opensearch.common.blobstore.BlobPath; -import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; +import org.opensearch.common.remote.RemoteWriteableBlobEntity; +import org.opensearch.common.remote.RemoteWriteableEntityBlobStore; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.threadpool.ThreadPool; @@ -17,10 +18,11 @@ /** * Extends the RemoteClusterStateBlobStore to support {@link RemotePinnedTimestamps} */ -public class RemoteStorePinnedTimestampsBlobStore extends RemoteClusterStateBlobStore< +public class RemoteStorePinnedTimestampsBlobStore extends RemoteWriteableEntityBlobStore< RemotePinnedTimestamps.PinnedTimestamps, RemotePinnedTimestamps> { + public static final String PINNED_TIMESTAMPS_PATH_TOKEN = "pinned_timestamps"; private final BlobStoreRepository blobStoreRepository; public RemoteStorePinnedTimestampsBlobStore( @@ -30,12 +32,12 @@ public RemoteStorePinnedTimestampsBlobStore( ThreadPool threadPool, String executor ) { - super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool, executor); + super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool, executor, PINNED_TIMESTAMPS_PATH_TOKEN); this.blobStoreRepository = blobStoreRepository; } @Override - public BlobPath getBlobPathForUpload(final AbstractRemoteWritableBlobEntity obj) { - return blobStoreRepository.basePath().add("pinned_timestamps"); + public BlobPath getBlobPathForUpload(final RemoteWriteableBlobEntity obj) { + return blobStoreRepository.basePath().add(PINNED_TIMESTAMPS_PATH_TOKEN); } } diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java index ffa12c3ffc682..8d065838f36b0 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java @@ -158,8 +158,7 @@ public void unpinTimestamp(long timestamp, String pinningEntity, ActionListener< private void updatePinning(Consumer updateConsumer, ActionListener listener) { RemotePinnedTimestamps remotePinnedTimestamps = new RemotePinnedTimestamps( clusterService.state().metadata().clusterUUID(), - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() + blobStoreRepository.getCompressor() ); BlobPath path = pinnedTimestampsBlobStore.getBlobPathForUpload(remotePinnedTimestamps); blobStoreTransferService.listAllInSortedOrder(path, remotePinnedTimestamps.getType(), Integer.MAX_VALUE, new ActionListener<>() { @@ -252,8 +251,7 @@ protected void runInternal() { long triggerTimestamp = System.currentTimeMillis(); RemotePinnedTimestamps remotePinnedTimestamps = new RemotePinnedTimestamps( clusterService.state().metadata().clusterUUID(), - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() + blobStoreRepository.getCompressor() ); BlobPath path = pinnedTimestampsBlobStore.getBlobPathForUpload(remotePinnedTimestamps); blobStoreTransferService.listAllInSortedOrder(path, remotePinnedTimestamps.getType(), 1, new ActionListener<>() { diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemotePinnedTimestampsTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemotePinnedTimestampsTests.java index b63b07be7fe81..309263a634265 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemotePinnedTimestampsTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemotePinnedTimestampsTests.java @@ -13,7 +13,6 @@ import org.opensearch.core.common.io.stream.BytesStreamInput; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.compress.Compressor; -import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; @@ -24,8 +23,6 @@ import java.util.List; import java.util.Map; -import static org.mockito.Mockito.mock; - public class RemotePinnedTimestampsTests extends OpenSearchTestCase { private RemotePinnedTimestamps remotePinnedTimestamps; @@ -33,8 +30,7 @@ public class RemotePinnedTimestampsTests extends OpenSearchTestCase { @Before public void setup() { Compressor compressor = new DeflateCompressor(); - NamedXContentRegistry mockNamedXContentRegistry = mock(NamedXContentRegistry.class); - remotePinnedTimestamps = new RemotePinnedTimestamps("testClusterUUID", compressor, mockNamedXContentRegistry); + remotePinnedTimestamps = new RemotePinnedTimestamps("testClusterUUID", compressor); } public void testGenerateBlobFileName() { From 74c45ba62dacd4cfc7159318a20ed816a6ba0d85 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Wed, 14 Aug 2024 10:37:00 +0530 Subject: [PATCH 04/12] Fix super class for PinnedTimestamps Signed-off-by: Sachin Kale --- .../gateway/remote/model/RemotePinnedTimestamps.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java index 6f870e8d1ef53..ede434bfa325b 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java @@ -10,13 +10,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.cluster.AbstractDiffable; -import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.common.io.Streams; import org.opensearch.common.remote.BlobPathParameters; import org.opensearch.common.remote.RemoteWriteableBlobEntity; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.compress.Compressor; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.repositories.blobstore.ChecksumWritableBlobStoreFormat; @@ -39,7 +38,7 @@ public class RemotePinnedTimestamps extends RemoteWriteableBlobEntity { private static final Logger logger = LogManager.getLogger(RemotePinnedTimestamps.class); - public static class PinnedTimestamps extends AbstractDiffable { + public static class PinnedTimestamps implements Writeable { private final Map> pinnedTimestampPinningEntityMap; public PinnedTimestamps(Map> pinnedTimestampPinningEntityMap) { From 868782c55c147006183bb9f155fc0b666ec6e6e1 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Wed, 14 Aug 2024 10:44:44 +0530 Subject: [PATCH 05/12] Address PR comments Signed-off-by: Sachin Kale --- .../org/opensearch/snapshots/SnapshotsService.java | 13 +------------ .../snapshots/SnapshotResiliencyTests.java | 3 ++- 2 files changed, 3 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index 55ae2f9336ed6..5e49208465dbb 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -204,17 +204,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus private volatile int maxConcurrentOperations; - public SnapshotsService( - Settings settings, - ClusterService clusterService, - IndexNameExpressionResolver indexNameExpressionResolver, - RepositoriesService repositoriesService, - TransportService transportService, - ActionFilters actionFilters - ) { - this(settings, clusterService, indexNameExpressionResolver, repositoriesService, transportService, actionFilters, null); - } - public SnapshotsService( Settings settings, ClusterService clusterService, @@ -222,7 +211,7 @@ public SnapshotsService( RepositoriesService repositoriesService, TransportService transportService, ActionFilters actionFilters, - RemoteStorePinnedTimestampService remoteStorePinnedTimestampService + @Nullable RemoteStorePinnedTimestampService remoteStorePinnedTimestampService ) { this.clusterService = clusterService; this.indexNameExpressionResolver = indexNameExpressionResolver; diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 9c58fc8fde084..769dfeb37ff8d 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2015,7 +2015,8 @@ public void onFailure(final Exception e) { indexNameExpressionResolver, repositoriesService, transportService, - actionFilters + actionFilters, + null ); nodeEnv = new NodeEnvironment(settings, environment); final NamedXContentRegistry namedXContentRegistry = new NamedXContentRegistry(Collections.emptyList()); From 8ae64b7846b1a3046df4aa847981c99b4f50cd86 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Wed, 14 Aug 2024 10:56:34 +0530 Subject: [PATCH 06/12] Address PR comments Signed-off-by: Sachin Kale --- .../node/remotestore/RemoteStorePinnedTimestampService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java index 8d065838f36b0..fb1fb26ce6da7 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java @@ -184,7 +184,7 @@ public void onResponse(List blobMetadata) { oldFilesToBeDeleted ); } catch (IOException e) { - throw new RuntimeException(e); + logger.error("Exception while deleting stale pinned timestamps", e); } } } From 4104939355ee28fe7c2003047e494dbb2f3162ef Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Wed, 14 Aug 2024 14:34:58 +0530 Subject: [PATCH 07/12] Address PR comments Signed-off-by: Sachin Kale --- .../remotestore/RemoteStorePinnedTimestampService.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java index fb1fb26ce6da7..a078ac53fbc56 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java @@ -67,8 +67,7 @@ public class RemoteStorePinnedTimestampService implements Closeable { "cluster.remote_store.pinned_timestamps.scheduler_interval", TimeValue.timeValueMinutes(3), TimeValue.timeValueMinutes(1), - Setting.Property.NodeScope, - Setting.Property.Dynamic + Setting.Property.NodeScope ); public RemoteStorePinnedTimestampService( @@ -83,11 +82,6 @@ public RemoteStorePinnedTimestampService( this.clusterService = clusterService; pinnedTimestampsSchedulerInterval = CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL.get(settings); - clusterService.getClusterSettings() - .addSettingsUpdateConsumer( - CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL, - this::setPinnedTimestampsSchedulerInterval - ); } /** From eeeae206c7e52e57b1b655570015e09ee1eb3cf3 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Wed, 14 Aug 2024 14:42:16 +0530 Subject: [PATCH 08/12] Address PR comments Signed-off-by: Sachin Kale --- .../remotestore/RemoteStorePinnedTimestampService.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java index a078ac53fbc56..582a0bf928718 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -49,6 +50,7 @@ public class RemoteStorePinnedTimestampService implements Closeable { private static final Logger logger = LogManager.getLogger(RemoteStorePinnedTimestampService.class); private static Tuple> pinnedTimestampsSet = new Tuple<>(-1L, Set.of()); public static final int PINNED_TIMESTAMP_FILES_TO_KEEP = 5; + public static final int TIMESTAMP_PINNING_PAST_BUFFER_IN_MILLIS = 10000; private final Supplier repositoriesService; private final Settings settings; @@ -131,8 +133,10 @@ private void startAsyncUpdateTask() { * @throws IOException If an I/O error occurs during the pinning process * @throws IllegalArgumentException If the timestamp is less than the current time minus one second */ - public void pinTimestamp(long timestamp, String pinningEntity, ActionListener listener) throws IOException { - if (timestamp < System.currentTimeMillis() - 1000) { + public void pinTimestamp(long timestamp, String pinningEntity, ActionListener listener) { + // If a caller uses current system time to pin the timestamp, following check will almost always fail. + // So, we allow pinning timestamp in the past upto some buffer + if (timestamp < TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - TIMESTAMP_PINNING_PAST_BUFFER_IN_MILLIS) { throw new IllegalArgumentException("Timestamp to be pinned is less than current timestamp"); } updatePinning(pinnedTimestamps -> pinnedTimestamps.pin(timestamp, pinningEntity), listener); From 296c86d8215566caebe5ab91dd0d0bfe174fc484 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Wed, 14 Aug 2024 14:51:04 +0530 Subject: [PATCH 09/12] Address PR comments Signed-off-by: Sachin Kale --- .../RemoteStorePinnedTimestampService.java | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java index 582a0bf928718..57cf8e5495529 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java @@ -21,6 +21,7 @@ import org.opensearch.common.util.concurrent.AbstractAsyncTask; import org.opensearch.core.action.ActionListener; import org.opensearch.gateway.remote.model.RemotePinnedTimestamps; +import org.opensearch.gateway.remote.model.RemotePinnedTimestamps.PinnedTimestamps; import org.opensearch.gateway.remote.model.RemoteStorePinnedTimestampsBlobStore; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.node.Node; @@ -31,6 +32,7 @@ import java.io.Closeable; import java.io.IOException; +import java.util.HashMap; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -153,7 +155,7 @@ public void unpinTimestamp(long timestamp, String pinningEntity, ActionListener< updatePinning(pinnedTimestamps -> pinnedTimestamps.unpin(timestamp, pinningEntity), listener); } - private void updatePinning(Consumer updateConsumer, ActionListener listener) { + private void updatePinning(Consumer updateConsumer, ActionListener listener) { RemotePinnedTimestamps remotePinnedTimestamps = new RemotePinnedTimestamps( clusterService.state().metadata().clusterUUID(), blobStoreRepository.getCompressor() @@ -162,7 +164,7 @@ private void updatePinning(Consumer upd blobStoreTransferService.listAllInSortedOrder(path, remotePinnedTimestamps.getType(), Integer.MAX_VALUE, new ActionListener<>() { @Override public void onResponse(List blobMetadata) { - RemotePinnedTimestamps.PinnedTimestamps pinnedTimestamps = remotePinnedTimestamps.getPinnedTimestamps(); + PinnedTimestamps pinnedTimestamps = new PinnedTimestamps(new HashMap<>()); if (blobMetadata.isEmpty() == false) { pinnedTimestamps = readExistingPinnedTimestamps(blobMetadata.get(0).name(), remotePinnedTimestamps); } @@ -194,10 +196,7 @@ public void onFailure(Exception e) { }); } - private RemotePinnedTimestamps.PinnedTimestamps readExistingPinnedTimestamps( - String blobFilename, - RemotePinnedTimestamps remotePinnedTimestamps - ) { + private PinnedTimestamps readExistingPinnedTimestamps(String blobFilename, RemotePinnedTimestamps remotePinnedTimestamps) { remotePinnedTimestamps.setBlobFileName(blobFilename); remotePinnedTimestamps.setFullBlobName(pinnedTimestampsBlobStore.getBlobPathForUpload(remotePinnedTimestamps)); try { @@ -258,10 +257,7 @@ public void onResponse(List blobMetadata) { if (blobMetadata.isEmpty()) { return; } - RemotePinnedTimestamps.PinnedTimestamps pinnedTimestamps = readExistingPinnedTimestamps( - blobMetadata.get(0).name(), - remotePinnedTimestamps - ); + PinnedTimestamps pinnedTimestamps = readExistingPinnedTimestamps(blobMetadata.get(0).name(), remotePinnedTimestamps); logger.debug( "Fetched pinned timestamps from remote store: {} - {}", triggerTimestamp, From bd569bebad373a7cfeba5a6a9aa16c8f89159064 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Wed, 14 Aug 2024 15:08:06 +0530 Subject: [PATCH 10/12] Address PR comments Signed-off-by: Sachin Kale --- .../remotestore/RemoteStorePinnedTimestampsIT.java | 8 -------- server/src/main/java/org/opensearch/node/Node.java | 13 ++++++++----- .../RemoteStorePinnedTimestampService.java | 6 +----- 3 files changed, 9 insertions(+), 18 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java index 8dceceb82cdfb..0bb53309f7a78 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java @@ -9,7 +9,6 @@ package org.opensearch.remotestore; import org.opensearch.common.collect.Tuple; -import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; @@ -17,8 +16,6 @@ import java.util.Set; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; - @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteStorePinnedTimestampsIT extends RemoteStoreBaseIntegTestCase { static final String INDEX_NAME = "remote-store-test-idx-1"; @@ -31,11 +28,6 @@ public void onResponse(Void unused) {} public void onFailure(Exception e) {} }; - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build(); - } - public void testTimestampPinUnpin() throws Exception { prepareCluster(1, 1, INDEX_NAME, 0, 2); ensureGreen(INDEX_NAME); diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 9c4627d5bef38..8324d3b219d3f 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -785,7 +785,6 @@ protected Node( final RemoteClusterStateService remoteClusterStateService; final RemoteClusterStateCleanupManager remoteClusterStateCleanupManager; final RemoteIndexPathUploader remoteIndexPathUploader; - final RemoteStorePinnedTimestampService remoteStorePinnedTimestampService; if (isRemoteStoreClusterStateEnabled(settings)) { remoteIndexPathUploader = new RemoteIndexPathUploader( threadPool, @@ -803,6 +802,14 @@ protected Node( List.of(remoteIndexPathUploader), namedWriteableRegistry ); + remoteClusterStateCleanupManager = remoteClusterStateService.getCleanupManager(); + } else { + remoteClusterStateService = null; + remoteIndexPathUploader = null; + remoteClusterStateCleanupManager = null; + } + final RemoteStorePinnedTimestampService remoteStorePinnedTimestampService; + if (isRemoteStoreAttributePresent(settings)) { remoteStorePinnedTimestampService = new RemoteStorePinnedTimestampService( repositoriesServiceReference::get, settings, @@ -810,11 +817,7 @@ protected Node( clusterService ); resourcesToClose.add(remoteStorePinnedTimestampService); - remoteClusterStateCleanupManager = remoteClusterStateService.getCleanupManager(); } else { - remoteClusterStateService = null; - remoteIndexPathUploader = null; - remoteClusterStateCleanupManager = null; remoteStorePinnedTimestampService = null; } diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java index 57cf8e5495529..8723e55a6e52f 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java @@ -40,8 +40,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; - /** * Service for managing pinned timestamps in a remote store. * This service handles pinning and unpinning of timestamps, as well as periodic updates of the pinned timestamps set. @@ -100,9 +98,8 @@ public void start() { } private void validateRemoteStoreConfiguration() { - assert isRemoteStoreClusterStateEnabled(settings) : "Remote cluster state is not enabled"; final String remoteStoreRepo = settings.get( - Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY + Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY ); assert remoteStoreRepo != null : "Remote Cluster State repository is not configured"; final Repository repository = repositoriesService.get().repository(remoteStoreRepo); @@ -132,7 +129,6 @@ private void startAsyncUpdateTask() { * @param timestamp The timestamp to be pinned * @param pinningEntity The entity responsible for pinning the timestamp * @param listener A listener to be notified when the pinning operation completes - * @throws IOException If an I/O error occurs during the pinning process * @throws IllegalArgumentException If the timestamp is less than the current time minus one second */ public void pinTimestamp(long timestamp, String pinningEntity, ActionListener listener) { From 3bad7905157e4b8b36322109a243c0b251182945 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Wed, 14 Aug 2024 15:18:48 +0530 Subject: [PATCH 11/12] Fix javadocs Signed-off-by: Sachin Kale --- .../remote/model/RemotePinnedTimestamps.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java index ede434bfa325b..b3187c1bd2ec2 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java @@ -38,6 +38,10 @@ public class RemotePinnedTimestamps extends RemoteWriteableBlobEntity { private static final Logger logger = LogManager.getLogger(RemotePinnedTimestamps.class); + /** + * Represents a collection of pinned timestamps and their associated pinning entities. + * This class is thread-safe and implements the Writeable interface for serialization. + */ public static class PinnedTimestamps implements Writeable { private final Map> pinnedTimestampPinningEntityMap; @@ -54,11 +58,23 @@ public static PinnedTimestamps readFrom(StreamInput in) throws IOException { return new PinnedTimestamps(in.readMap(StreamInput::readLong, StreamInput::readStringList)); } + /** + * Pins a timestamp against a pinning entity. + * + * @param timestamp The timestamp to pin. + * @param pinningEntity The entity pinning the timestamp. + */ public void pin(Long timestamp, String pinningEntity) { logger.debug("Pinning timestamp = {} against entity = {}", timestamp, pinningEntity); pinnedTimestampPinningEntityMap.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(pinningEntity); } + /** + * Unpins a timestamp for a specific pinning entity. + * + * @param timestamp The timestamp to unpin. + * @param pinningEntity The entity unpinning the timestamp. + */ public void unpin(Long timestamp, String pinningEntity) { logger.debug("Unpinning timestamp = {} against entity = {}", timestamp, pinningEntity); pinnedTimestampPinningEntityMap.computeIfPresent(timestamp, (k, v) -> { From 4212363ed57fbd77c71c6bbaa1a020b16cdbdcf3 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Wed, 14 Aug 2024 20:02:33 +0530 Subject: [PATCH 12/12] Address PR comments Signed-off-by: Sachin Kale --- .../remote/model/RemotePinnedTimestamps.java | 6 +- .../RemoteStorePinnedTimestampService.java | 115 +++++++++++++----- 2 files changed, 87 insertions(+), 34 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java index b3187c1bd2ec2..030491cf8b7b9 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java @@ -77,7 +77,11 @@ public void pin(Long timestamp, String pinningEntity) { */ public void unpin(Long timestamp, String pinningEntity) { logger.debug("Unpinning timestamp = {} against entity = {}", timestamp, pinningEntity); - pinnedTimestampPinningEntityMap.computeIfPresent(timestamp, (k, v) -> { + if (pinnedTimestampPinningEntityMap.containsKey(timestamp) == false + || pinnedTimestampPinningEntityMap.get(timestamp).contains(pinningEntity) == false) { + logger.warn("Timestamp: {} is not pinned by entity: {}", timestamp, pinningEntity); + } + pinnedTimestampPinningEntityMap.compute(timestamp, (k, v) -> { v.remove(pinningEntity); return v.isEmpty() ? null : v; }); diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java index 8723e55a6e52f..35730a75a8142 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java @@ -35,7 +35,9 @@ import java.util.HashMap; import java.util.List; import java.util.Set; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -50,7 +52,6 @@ public class RemoteStorePinnedTimestampService implements Closeable { private static final Logger logger = LogManager.getLogger(RemoteStorePinnedTimestampService.class); private static Tuple> pinnedTimestampsSet = new Tuple<>(-1L, Set.of()); public static final int PINNED_TIMESTAMP_FILES_TO_KEEP = 5; - public static final int TIMESTAMP_PINNING_PAST_BUFFER_IN_MILLIS = 10000; private final Supplier repositoriesService; private final Settings settings; @@ -61,6 +62,7 @@ public class RemoteStorePinnedTimestampService implements Closeable { private RemoteStorePinnedTimestampsBlobStore pinnedTimestampsBlobStore; private AsyncUpdatePinnedTimestampTask asyncUpdatePinnedTimestampTask; private volatile TimeValue pinnedTimestampsSchedulerInterval; + private final Semaphore updateTimetampPinningSemaphore = new Semaphore(1); /** * Controls pinned timestamp scheduler interval @@ -72,6 +74,17 @@ public class RemoteStorePinnedTimestampService implements Closeable { Setting.Property.NodeScope ); + /** + * Controls allowed timestamp values to be pinned from past + */ + public static final Setting CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_LOOKBACK_INTERVAL = Setting.timeSetting( + "cluster.remote_store.pinned_timestamps.lookback_interval", + TimeValue.timeValueMinutes(1), + TimeValue.timeValueMinutes(1), + TimeValue.timeValueMinutes(5), + Setting.Property.NodeScope + ); + public RemoteStorePinnedTimestampService( Supplier repositoriesService, Settings settings, @@ -101,7 +114,7 @@ private void validateRemoteStoreConfiguration() { final String remoteStoreRepo = settings.get( Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY ); - assert remoteStoreRepo != null : "Remote Cluster State repository is not configured"; + assert remoteStoreRepo != null : "Remote Segment Store repository is not configured"; final Repository repository = repositoriesService.get().repository(remoteStoreRepo); assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; blobStoreRepository = (BlobStoreRepository) repository; @@ -134,8 +147,11 @@ private void startAsyncUpdateTask() { public void pinTimestamp(long timestamp, String pinningEntity, ActionListener listener) { // If a caller uses current system time to pin the timestamp, following check will almost always fail. // So, we allow pinning timestamp in the past upto some buffer - if (timestamp < TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - TIMESTAMP_PINNING_PAST_BUFFER_IN_MILLIS) { - throw new IllegalArgumentException("Timestamp to be pinned is less than current timestamp"); + long lookbackIntervalInMills = CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_LOOKBACK_INTERVAL.get(settings).millis(); + if (timestamp < TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - lookbackIntervalInMills) { + throw new IllegalArgumentException( + "Timestamp to be pinned is less than current timestamp - value of cluster.remote_store.pinned_timestamps.lookback_interval" + ); } updatePinning(pinnedTimestamps -> pinnedTimestamps.pin(timestamp, pinningEntity), listener); } @@ -157,39 +173,72 @@ private void updatePinning(Consumer updateConsumer, ActionList blobStoreRepository.getCompressor() ); BlobPath path = pinnedTimestampsBlobStore.getBlobPathForUpload(remotePinnedTimestamps); - blobStoreTransferService.listAllInSortedOrder(path, remotePinnedTimestamps.getType(), Integer.MAX_VALUE, new ActionListener<>() { - @Override - public void onResponse(List blobMetadata) { - PinnedTimestamps pinnedTimestamps = new PinnedTimestamps(new HashMap<>()); - if (blobMetadata.isEmpty() == false) { - pinnedTimestamps = readExistingPinnedTimestamps(blobMetadata.get(0).name(), remotePinnedTimestamps); - } - updateConsumer.accept(pinnedTimestamps); - remotePinnedTimestamps.setPinnedTimestamps(pinnedTimestamps); - pinnedTimestampsBlobStore.writeAsync(remotePinnedTimestamps, listener); + try { + if (updateTimetampPinningSemaphore.tryAcquire(10, TimeUnit.MINUTES)) { + ActionListener semaphoreAwareListener = ActionListener.runBefore(listener, updateTimetampPinningSemaphore::release); + ActionListener> listCallResponseListener = getListenerForListCallResponse( + remotePinnedTimestamps, + updateConsumer, + semaphoreAwareListener + ); + blobStoreTransferService.listAllInSortedOrder( + path, + remotePinnedTimestamps.getType(), + Integer.MAX_VALUE, + listCallResponseListener + ); + } else { + throw new TimeoutException("Timed out while waiting to acquire lock in updatePinning"); + } + } catch (InterruptedException | TimeoutException e) { + listener.onFailure(e); + } + } - // Delete older pinnedTimestamp files - if (blobMetadata.size() > PINNED_TIMESTAMP_FILES_TO_KEEP) { - List oldFilesToBeDeleted = blobMetadata.subList(PINNED_TIMESTAMP_FILES_TO_KEEP, blobMetadata.size()) - .stream() - .map(BlobMetadata::name) - .collect(Collectors.toList()); - try { - blobStoreTransferService.deleteBlobs( - pinnedTimestampsBlobStore.getBlobPathForUpload(remotePinnedTimestamps), - oldFilesToBeDeleted - ); - } catch (IOException e) { - logger.error("Exception while deleting stale pinned timestamps", e); - } - } + private ActionListener> getListenerForListCallResponse( + RemotePinnedTimestamps remotePinnedTimestamps, + Consumer updateConsumer, + ActionListener listener + ) { + return ActionListener.wrap(blobMetadata -> { + PinnedTimestamps pinnedTimestamps = new PinnedTimestamps(new HashMap<>()); + if (blobMetadata.isEmpty() == false) { + pinnedTimestamps = readExistingPinnedTimestamps(blobMetadata.get(0).name(), remotePinnedTimestamps); } + updateConsumer.accept(pinnedTimestamps); + remotePinnedTimestamps.setPinnedTimestamps(pinnedTimestamps); + ActionListener writeCallResponseListener = getListenerForWriteCallResponse( + remotePinnedTimestamps, + blobMetadata, + listener + ); + pinnedTimestampsBlobStore.writeAsync(remotePinnedTimestamps, writeCallResponseListener); + }, listener::onFailure); + } - @Override - public void onFailure(Exception e) { - listener.onFailure(e); + private ActionListener getListenerForWriteCallResponse( + RemotePinnedTimestamps remotePinnedTimestamps, + List blobMetadata, + ActionListener listener + ) { + return ActionListener.wrap(unused -> { + // Delete older pinnedTimestamp files + if (blobMetadata.size() > PINNED_TIMESTAMP_FILES_TO_KEEP) { + List oldFilesToBeDeleted = blobMetadata.subList(PINNED_TIMESTAMP_FILES_TO_KEEP, blobMetadata.size()) + .stream() + .map(BlobMetadata::name) + .collect(Collectors.toList()); + try { + blobStoreTransferService.deleteBlobs( + pinnedTimestampsBlobStore.getBlobPathForUpload(remotePinnedTimestamps), + oldFilesToBeDeleted + ); + } catch (IOException e) { + logger.error("Exception while deleting stale pinned timestamps", e); + } } - }); + listener.onResponse(null); + }, listener::onFailure); } private PinnedTimestamps readExistingPinnedTimestamps(String blobFilename, RemotePinnedTimestamps remotePinnedTimestamps) {