Skip to content

Commit

Permalink
Add remote routing table read
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <[email protected]>
  • Loading branch information
Arpit-Bandejiya committed Jun 8, 2024
1 parent 4671704 commit 3467ca7
Show file tree
Hide file tree
Showing 5 changed files with 339 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
// index to IndexRoutingTable map
private final Map<String, IndexRoutingTable> indicesRouting;

private RoutingTable(long version, final Map<String, IndexRoutingTable> indicesRouting) {
public RoutingTable(long version, final Map<String, IndexRoutingTable> indicesRouting) {
this.version = version;
this.indicesRouting = Collections.unmodifiableMap(indicesRouting);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,37 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.index.Index;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;

Expand All @@ -41,6 +55,7 @@ public class RemoteRoutingTableService extends AbstractLifecycleComponent {
private final Settings settings;
private final Supplier<RepositoriesService> repositoriesService;
private BlobStoreRepository blobStoreRepository;
private final ThreadPool threadPool;

private static final DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER =
new DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable>() {
Expand All @@ -55,10 +70,55 @@ public IndexRoutingTable read(StreamInput in, String key) throws IOException {
}
};

public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesService, Settings settings) {
public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesService, Settings settings, ThreadPool threadPool) {
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
this.repositoriesService = repositoriesService;
this.settings = settings;
this.threadPool = threadPool;
}

public CheckedRunnable<IOException> getAsyncIndexMetadataReadAction(
String uploadedFilename,
Index index,
LatchedActionListener<IndexRoutingTable> latchedActionListener) {
int idx = uploadedFilename.lastIndexOf("/");
String blobFileName = uploadedFilename.substring(idx+1);
BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(BlobPath.cleanPath().add(uploadedFilename.substring(0,idx)));

return () -> readAsync(
blobContainer,
blobFileName,
index,
threadPool.executor(ThreadPool.Names.GENERIC),
ActionListener.wrap(response -> latchedActionListener.onResponse(response.getIndexRoutingTable()), latchedActionListener::onFailure)
);
}

public void readAsync(BlobContainer blobContainer, String name, Index index, ExecutorService executorService, ActionListener<RemoteIndexRoutingTable> listener) throws IOException {
executorService.execute(() -> {
try {
listener.onResponse(read(blobContainer, name, index));
} catch (Exception e) {
listener.onFailure(e);
}
});
}

public RemoteIndexRoutingTable read(BlobContainer blobContainer, String path, Index index) {
try {
return new RemoteIndexRoutingTable(blobContainer.readBlob(path), index);
} catch (IOException | AssertionError e) {
logger.info("RoutingTable read failed with error: {}", e.toString());
throw new RemoteClusterStateService.RemoteStateTransferException("Failed to read RemoteRoutingTable from Manifest with error ", e);
}
}

public List<ClusterMetadataManifest.UploadedIndexMetadata> getUpdatedIndexRoutingTableMetadata(List<String> updatedIndicesRouting, List<ClusterMetadataManifest.UploadedIndexMetadata> allIndicesRouting) {
return updatedIndicesRouting.stream().map(idx -> {
Optional<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndexMetadataOptional = allIndicesRouting.stream().filter(idx2 -> idx2.getIndexName().equals(idx)).findFirst();
assert uploadedIndexMetadataOptional.isPresent() == true;
return uploadedIndexMetadataOptional.get();
}).collect(Collectors.toList());
}

public static DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> getIndicesRoutingMapDiff(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import static org.opensearch.gateway.remote.model.RemoteHashesOfConsistentSettings.HASHES_OF_CONSISTENT_SETTINGS;
import static org.opensearch.gateway.remote.model.RemoteIndexMetadata.INDEX_PATH_TOKEN;
import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemotePublicationEnabled;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

import java.io.Closeable;
Expand Down Expand Up @@ -193,6 +193,9 @@ public RemoteClusterStateService(
this.namedWriteableRegistry = namedWriteableRegistry;
this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService);
this.indexMetadataUploadListeners = indexMetadataUploadListeners;
this.remoteRoutingTableService = isRemoteRoutingTableEnabled(settings)
? Optional.of(new RemoteRoutingTableService(repositoriesService, settings, threadPool))
: Optional.empty();
}

/**
Expand All @@ -203,7 +206,6 @@ public RemoteClusterStateService(
*/
@Nullable
public RemoteUploadDetails writeFullMetadata(ClusterState clusterState, String previousClusterUUID) throws IOException {
logger.info("WRITING FULL STATE");
final long startTimeNanos = relativeTimeNanosSupplier.getAsLong();
if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) {
logger.error("Local node is not elected cluster manager. Exiting");
Expand Down Expand Up @@ -971,11 +973,12 @@ private ClusterState readClusterStateInParallel(
boolean readTemplatesMetadata,
boolean readDiscoveryNodes,
boolean readClusterBlocks,
List<UploadedIndexMetadata> indicesRoutingToRead,
boolean readHashesOfConsistentSettings,
Map<String, UploadedMetadataAttribute> clusterStateCustomToRead
) throws IOException {
int totalReadTasks =
indicesToRead.size() + customToRead.size() + (readCoordinationMetadata ? 1 : 0) + (readSettingsMetadata ? 1 : 0) + (
indicesToRead.size() + customToRead.size() + (remoteRoutingTableService.isPresent() ? indicesRoutingToRead.size() : 0) + (readCoordinationMetadata ? 1 : 0) + (readSettingsMetadata ? 1 : 0) + (
readTemplatesMetadata ? 1 : 0) + (readDiscoveryNodes ? 1 : 0) + (readClusterBlocks ? 1 : 0) + (readTransientSettingsMetadata ? 1 : 0) + (readHashesOfConsistentSettings ? 1 : 0)
+ clusterStateCustomToRead.size();
CountDownLatch latch = new CountDownLatch(totalReadTasks);
Expand Down Expand Up @@ -1022,6 +1025,18 @@ private ClusterState readClusterStateInParallel(
latch
);

if(remoteRoutingTableService.isPresent()) {
for (UploadedIndexMetadata indexRouting : indicesRoutingToRead) {
asyncMetadataReadActions.add(
remoteRoutingTableService.get().getAsyncIndexMetadataReadAction(
indexRouting.getUploadedFilename(),
new Index(indexRouting.getIndexName(), indexRouting.getIndexUUID()),
routingTableLatchedActionListener
)
);
}
}

for (Map.Entry<String, UploadedMetadataAttribute> entry : customToRead.entrySet()) {
asyncMetadataReadActions.add(
remoteGlobalMetadataManager.getAsyncMetadataReadAction(
Expand Down Expand Up @@ -1204,18 +1219,20 @@ private ClusterState readClusterStateInParallel(
}
});

readIndexRoutingTableResults.forEach(indexRoutingTable -> {
indicesRouting.put(indexRoutingTable.getIndex().getName(), indexRoutingTable);
});

metadataBuilder.indices(indexMetadataMap);
if (readDiscoveryNodes) {
clusterStateBuilder.nodes(discoveryNodesBuilder.get().localNodeId(localNodeId));
}
return clusterStateBuilder.metadata(metadataBuilder)

clusterStateBuilder = clusterStateBuilder.metadata(metadataBuilder)
.version(manifest.getStateVersion())
.stateUUID(manifest.getStateUUID())
.build();
.stateUUID(manifest.getStateUUID());

if(remoteRoutingTableService.isPresent()) {
readIndexRoutingTableResults.forEach(indexRoutingTable -> indicesRouting.put(indexRoutingTable.getIndex().getName(), indexRoutingTable));
clusterStateBuilder = clusterStateBuilder.routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indicesRouting));
}
return clusterStateBuilder.build();
}

public ClusterState getClusterStateForManifest(String clusterName, ClusterMetadataManifest manifest, String localNodeId, boolean includeEphemeral)
Expand All @@ -1233,6 +1250,7 @@ public ClusterState getClusterStateForManifest(String clusterName, ClusterMetada
manifest.getTemplatesMetadata() != null,
includeEphemeral && manifest.getDiscoveryNodesMetadata() != null,
includeEphemeral && manifest.getClusterBlocksMetadata() != null,
includeEphemeral ? manifest.getIndicesRouting() : Collections.emptyList(),
includeEphemeral && manifest.getHashesOfConsistentSettings() != null,
includeEphemeral ? manifest.getClusterStateCustomMap() : Collections.emptyMap()
);
Expand Down Expand Up @@ -1261,6 +1279,13 @@ public ClusterState getClusterStateUsingDiff(String clusterName, ClusterMetadata
updatedClusterStateCustom.put(customType, manifest.getClusterStateCustomMap().get(customType));
}
}

List<UploadedIndexMetadata> updatedIndexRouting = new ArrayList<>();
remoteRoutingTableService.ifPresent(routingTableService ->
updatedIndexRouting.addAll(routingTableService.getUpdatedIndexRoutingTableMetadata(diff.getIndicesRoutingUpdated(),
manifest.getIndicesRouting()))
);

ClusterState updatedClusterState = readClusterStateInParallel(
previousState,
manifest,
Expand All @@ -1274,6 +1299,7 @@ public ClusterState getClusterStateUsingDiff(String clusterName, ClusterMetadata
diff.isTemplatesMetadataUpdated(),
diff.isDiscoveryNodesUpdated(),
diff.isClusterBlocksUpdated(),
updatedIndexRouting,
diff.isHashesOfConsistentSettingsUpdated(),
updatedClusterStateCustom
);
Expand Down Expand Up @@ -1307,6 +1333,7 @@ public ClusterState getClusterStateUsingDiff(String clusterName, ClusterMetadata
stateUUID(manifest.getStateUUID()).
version(manifest.getStateVersion()).
metadata(metadataBuilder).
routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indexRoutingTables)).
build();
}

Expand Down
Loading

0 comments on commit 3467ca7

Please sign in to comment.