Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timestamp pinning service and scheduler to update in-memory state #15180

Merged
merged 12 commits into from
Aug 15, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.common.collect.Tuple;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Set;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStorePinnedTimestampsIT extends RemoteStoreBaseIntegTestCase {
static final String INDEX_NAME = "remote-store-test-idx-1";

ActionListener<Void> noOpActionListener = new ActionListener<>() {
@Override
public void onResponse(Void unused) {}

@Override
public void onFailure(Exception e) {}
};

public void testTimestampPinUnpin() throws Exception {
prepareCluster(1, 1, INDEX_NAME, 0, 2);
ensureGreen(INDEX_NAME);

RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
primaryNodeName(INDEX_NAME)
);

Tuple<Long, Set<Long>> pinnedTimestampWithFetchTimestamp = RemoteStorePinnedTimestampService.getPinnedTimestamps();
long lastFetchTimestamp = pinnedTimestampWithFetchTimestamp.v1();
assertEquals(-1L, lastFetchTimestamp);
assertEquals(Set.of(), pinnedTimestampWithFetchTimestamp.v2());

assertThrows(
IllegalArgumentException.class,
() -> remoteStorePinnedTimestampService.pinTimestamp(1234L, "ss1", noOpActionListener)
);

long timestamp1 = System.currentTimeMillis() + 30000L;
long timestamp2 = System.currentTimeMillis() + 60000L;
long timestamp3 = System.currentTimeMillis() + 900000L;
remoteStorePinnedTimestampService.pinTimestamp(timestamp1, "ss2", noOpActionListener);
remoteStorePinnedTimestampService.pinTimestamp(timestamp2, "ss3", noOpActionListener);
remoteStorePinnedTimestampService.pinTimestamp(timestamp3, "ss4", noOpActionListener);

remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueSeconds(1));

assertBusy(() -> {
Tuple<Long, Set<Long>> pinnedTimestampWithFetchTimestamp_2 = RemoteStorePinnedTimestampService.getPinnedTimestamps();
long lastFetchTimestamp_2 = pinnedTimestampWithFetchTimestamp_2.v1();
assertTrue(lastFetchTimestamp_2 != -1);
assertEquals(Set.of(timestamp1, timestamp2, timestamp3), pinnedTimestampWithFetchTimestamp_2.v2());
});

remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueMinutes(3));

// This should be a no-op as pinning entity is different
remoteStorePinnedTimestampService.unpinTimestamp(timestamp1, "no-snapshot", noOpActionListener);
// Unpinning already pinned entity
remoteStorePinnedTimestampService.unpinTimestamp(timestamp2, "ss3", noOpActionListener);
// Adding different entity to already pinned timestamp
remoteStorePinnedTimestampService.pinTimestamp(timestamp3, "ss5", noOpActionListener);

remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueSeconds(1));

assertBusy(() -> {
Tuple<Long, Set<Long>> pinnedTimestampWithFetchTimestamp_3 = RemoteStorePinnedTimestampService.getPinnedTimestamps();
long lastFetchTimestamp_3 = pinnedTimestampWithFetchTimestamp_3.v1();
assertTrue(lastFetchTimestamp_3 != -1);
assertEquals(Set.of(timestamp1, timestamp3), pinnedTimestampWithFetchTimestamp_3.v2());
});

remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueMinutes(3));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
import org.opensearch.node.Node.DiscoverySettings;
import org.opensearch.node.NodeRoleSettings;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.node.resource.tracker.ResourceTrackerSettings;
import org.opensearch.persistent.PersistentTasksClusterService;
import org.opensearch.persistent.decider.EnableAssignmentDecider;
Expand Down Expand Up @@ -760,6 +761,8 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA,
SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING,

RemoteStorePinnedTimestampService.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL,

// Composite index settings
CompositeIndexSettings.STAR_TREE_INDEX_ENABLED_SETTING,

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote.model;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.io.Streams;
import org.opensearch.common.remote.BlobPathParameters;
import org.opensearch.common.remote.RemoteWriteableBlobEntity;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.compress.Compressor;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.repositories.blobstore.ChecksumWritableBlobStoreFormat;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;

/**
* Wrapper class for uploading/downloading {@link RemotePinnedTimestamps} to/from remote blob store
*
* @opensearch.internal
*/
public class RemotePinnedTimestamps extends RemoteWriteableBlobEntity<RemotePinnedTimestamps.PinnedTimestamps> {
private static final Logger logger = LogManager.getLogger(RemotePinnedTimestamps.class);

/**
* Represents a collection of pinned timestamps and their associated pinning entities.
* This class is thread-safe and implements the Writeable interface for serialization.
*/
public static class PinnedTimestamps implements Writeable {
private final Map<Long, List<String>> pinnedTimestampPinningEntityMap;

public PinnedTimestamps(Map<Long, List<String>> pinnedTimestampPinningEntityMap) {
this.pinnedTimestampPinningEntityMap = new ConcurrentHashMap<>(pinnedTimestampPinningEntityMap);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(pinnedTimestampPinningEntityMap, StreamOutput::writeLong, StreamOutput::writeStringCollection);
}

public static PinnedTimestamps readFrom(StreamInput in) throws IOException {
return new PinnedTimestamps(in.readMap(StreamInput::readLong, StreamInput::readStringList));
}

/**
* Pins a timestamp against a pinning entity.
*
* @param timestamp The timestamp to pin.
* @param pinningEntity The entity pinning the timestamp.
*/
public void pin(Long timestamp, String pinningEntity) {
logger.debug("Pinning timestamp = {} against entity = {}", timestamp, pinningEntity);
pinnedTimestampPinningEntityMap.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(pinningEntity);
}

/**
* Unpins a timestamp for a specific pinning entity.
*
* @param timestamp The timestamp to unpin.
* @param pinningEntity The entity unpinning the timestamp.
*/
public void unpin(Long timestamp, String pinningEntity) {
logger.debug("Unpinning timestamp = {} against entity = {}", timestamp, pinningEntity);
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved
if (pinnedTimestampPinningEntityMap.containsKey(timestamp) == false
|| pinnedTimestampPinningEntityMap.get(timestamp).contains(pinningEntity) == false) {
logger.warn("Timestamp: {} is not pinned by entity: {}", timestamp, pinningEntity);

Check warning on line 82 in server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java#L82

Added line #L82 was not covered by tests
}
pinnedTimestampPinningEntityMap.compute(timestamp, (k, v) -> {
v.remove(pinningEntity);
return v.isEmpty() ? null : v;
});
}

public Map<Long, List<String>> getPinnedTimestampPinningEntityMap() {
return new HashMap<>(pinnedTimestampPinningEntityMap);
}
}

public static final String PINNED_TIMESTAMPS = "pinned_timestamps";
public static final ChecksumWritableBlobStoreFormat<PinnedTimestamps> PINNED_TIMESTAMPS_FORMAT = new ChecksumWritableBlobStoreFormat<>(
PINNED_TIMESTAMPS,
PinnedTimestamps::readFrom
);

private PinnedTimestamps pinnedTimestamps;

public RemotePinnedTimestamps(String clusterUUID, Compressor compressor) {
super(clusterUUID, compressor);
pinnedTimestamps = new PinnedTimestamps(new HashMap<>());
}

@Override
public BlobPathParameters getBlobPathParameters() {
return new BlobPathParameters(List.of(PINNED_TIMESTAMPS), PINNED_TIMESTAMPS);

Check warning on line 110 in server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java#L110

Added line #L110 was not covered by tests
}

@Override
public String getType() {
return PINNED_TIMESTAMPS;

Check warning on line 115 in server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java#L115

Added line #L115 was not covered by tests
}

@Override
public String generateBlobFileName() {
return this.blobFileName = String.join(DELIMITER, PINNED_TIMESTAMPS, RemoteStoreUtils.invertLong(System.currentTimeMillis()));
}

@Override
public InputStream serialize() throws IOException {
return PINNED_TIMESTAMPS_FORMAT.serialize(pinnedTimestamps, generateBlobFileName(), getCompressor()).streamInput();
}

@Override
public PinnedTimestamps deserialize(InputStream inputStream) throws IOException {
return PINNED_TIMESTAMPS_FORMAT.deserialize(blobName, Streams.readFully(inputStream));
}

public void setBlobFileName(String blobFileName) {
this.blobFileName = blobFileName;
}

Check warning on line 135 in server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java#L134-L135

Added lines #L134 - L135 were not covered by tests

public void setPinnedTimestamps(PinnedTimestamps pinnedTimestamps) {
this.pinnedTimestamps = pinnedTimestamps;
}

public PinnedTimestamps getPinnedTimestamps() {
return pinnedTimestamps;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote.model;

import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.remote.RemoteWriteableBlobEntity;
import org.opensearch.common.remote.RemoteWriteableEntityBlobStore;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

/**
* Extends the RemoteClusterStateBlobStore to support {@link RemotePinnedTimestamps}
*/
public class RemoteStorePinnedTimestampsBlobStore extends RemoteWriteableEntityBlobStore<
RemotePinnedTimestamps.PinnedTimestamps,
RemotePinnedTimestamps> {

public static final String PINNED_TIMESTAMPS_PATH_TOKEN = "pinned_timestamps";
private final BlobStoreRepository blobStoreRepository;

public RemoteStorePinnedTimestampsBlobStore(
BlobStoreTransferService blobStoreTransferService,
BlobStoreRepository blobStoreRepository,
String clusterName,
ThreadPool threadPool,
String executor
) {
super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool, executor, PINNED_TIMESTAMPS_PATH_TOKEN);
this.blobStoreRepository = blobStoreRepository;
}

@Override
public BlobPath getBlobPathForUpload(final RemoteWriteableBlobEntity<RemotePinnedTimestamps.PinnedTimestamps> obj) {
return blobStoreRepository.basePath().add(PINNED_TIMESTAMPS_PATH_TOKEN);

Check warning on line 41 in server/src/main/java/org/opensearch/gateway/remote/model/RemoteStorePinnedTimestampsBlobStore.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/model/RemoteStorePinnedTimestampsBlobStore.java#L41

Added line #L41 was not covered by tests
}
}
23 changes: 22 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@
import org.opensearch.monitor.fs.FsProbe;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.node.resource.tracker.NodeResourceUsageTracker;
import org.opensearch.persistent.PersistentTasksClusterService;
import org.opensearch.persistent.PersistentTasksExecutor;
Expand Down Expand Up @@ -807,6 +808,18 @@ protected Node(
remoteIndexPathUploader = null;
remoteClusterStateCleanupManager = null;
}
final RemoteStorePinnedTimestampService remoteStorePinnedTimestampService;
if (isRemoteStoreAttributePresent(settings)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should use isRemoteDataAttributePresent here instead since this is required for remote data store.

remoteStorePinnedTimestampService = new RemoteStorePinnedTimestampService(
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved
repositoriesServiceReference::get,
settings,
threadPool,
clusterService
);
resourcesToClose.add(remoteStorePinnedTimestampService);
} else {
remoteStorePinnedTimestampService = null;
}

// collect engine factory providers from plugins
final Collection<EnginePlugin> enginePlugins = pluginsService.filterPlugins(EnginePlugin.class);
Expand Down Expand Up @@ -1170,7 +1183,8 @@ protected Node(
clusterModule.getIndexNameExpressionResolver(),
repositoryService,
transportService,
actionModule.getActionFilters()
actionModule.getActionFilters(),
remoteStorePinnedTimestampService
);
SnapshotShardsService snapshotShardsService = new SnapshotShardsService(
settings,
Expand Down Expand Up @@ -1416,6 +1430,7 @@ protected Node(
b.bind(MetricsRegistry.class).toInstance(metricsRegistry);
b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService);
b.bind(RemoteIndexPathUploader.class).toProvider(() -> remoteIndexPathUploader);
b.bind(RemoteStorePinnedTimestampService.class).toProvider(() -> remoteStorePinnedTimestampService);
b.bind(RemoteClusterStateCleanupManager.class).toProvider(() -> remoteClusterStateCleanupManager);
b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry);
b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker);
Expand Down Expand Up @@ -1569,6 +1584,12 @@ public Node start() throws NodeValidationException {
if (remoteIndexPathUploader != null) {
remoteIndexPathUploader.start();
}
final RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = injector.getInstance(
RemoteStorePinnedTimestampService.class
);
if (remoteStorePinnedTimestampService != null) {
remoteStorePinnedTimestampService.start();
}
// Load (and maybe upgrade) the metadata stored on disk
final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
gatewayMetaState.start(
Expand Down
Loading
Loading