Skip to content

Commit

Permalink
HDDS-10767. Reducing DatanodeDetails in the ContainerLocationCache
Browse files Browse the repository at this point in the history
  • Loading branch information
guohao-rosicky committed Apr 28, 2024
1 parent 0984893 commit 827be5f
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,20 @@
import com.google.common.cache.CacheLoader.InvalidCacheLoadException;
import com.google.common.cache.LoadingCache;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.util.CacheMetrics;
import jakarta.annotation.Nonnull;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -72,6 +75,9 @@ static LoadingCache<Long, Pipeline> createContainerLocationCache(
long ttl = configuration.getTimeDuration(
OZONE_OM_CONTAINER_LOCATION_CACHE_TTL,
OZONE_OM_CONTAINER_LOCATION_CACHE_TTL_DEFAULT.getDuration(), unit);

final Map<UUID, DatanodeDetails>
datanodeDetailsCache = new ConcurrentHashMap<>();
return CacheBuilder.newBuilder()
.maximumSize(maxSize)
.expireAfterWrite(ttl, unit)
Expand All @@ -80,7 +86,9 @@ static LoadingCache<Long, Pipeline> createContainerLocationCache(
@Nonnull
@Override
public Pipeline load(@Nonnull Long key) throws Exception {
return containerClient.getContainerWithPipeline(key).getPipeline();
Pipeline pipeline =
containerClient.getContainerWithPipeline(key).getPipeline();
return newPipelineWithDNCache(pipeline, datanodeDetailsCache);
}

@Nonnull
Expand All @@ -91,12 +99,31 @@ public Map<Long, Pipeline> loadAll(
.stream()
.collect(Collectors.toMap(
x -> x.getContainerInfo().getContainerID(),
ContainerWithPipeline::getPipeline
x -> newPipelineWithDNCache(x.getPipeline(),
datanodeDetailsCache)
));
}
});
}

static Pipeline newPipelineWithDNCache(Pipeline pipeline,
Map<UUID, DatanodeDetails> datanodeDetailsCache) {
Pipeline.Builder builder = Pipeline.newBuilder(pipeline);
List<DatanodeDetails> nodes = new ArrayList<>();
for (DatanodeDetails node : pipeline.getNodes()) {
DatanodeDetails datanodeDetails =
datanodeDetailsCache.get(node.getUuid());
if (node.equals(datanodeDetails)) {
nodes.add(datanodeDetails);
} else {
datanodeDetailsCache.put(node.getUuid(), node);
nodes.add(node);
}
}
builder.setNodes(nodes);
return builder.build();
}

public ScmBlockLocationProtocol getBlockClient() {
return this.blockClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.util.stream.Stream;

import static com.google.common.collect.Sets.newHashSet;
import static java.util.Arrays.asList;
import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic;
import static org.apache.hadoop.hdds.client.ReplicationConfig.fromTypeAndFactor;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -107,9 +106,12 @@ public void testGetContainerLocations(String testCaseName,
throws IOException {

Map<Long, ContainerWithPipeline> actualLocations = new HashMap<>();

List<DatanodeDetails> dnList = new ArrayList<>();
for (int i = 0; i < 3; i++) {
dnList.add(randomDatanode());
}
for (long containerId : prepopulatedIds) {
ContainerWithPipeline pipeline = createPipeline(containerId);
ContainerWithPipeline pipeline = createPipeline(containerId, dnList);
actualLocations.put(containerId, pipeline);
}

Expand All @@ -129,7 +131,7 @@ public void testGetContainerLocations(String testCaseName,
if (!expectedScmCallIds.isEmpty()) {
List<ContainerWithPipeline> scmLocations = new ArrayList<>();
for (long containerId : expectedScmCallIds) {
ContainerWithPipeline pipeline = createPipeline(containerId);
ContainerWithPipeline pipeline = createPipeline(containerId, dnList);
scmLocations.add(pipeline);
actualLocations.put(containerId, pipeline);
}
Expand Down Expand Up @@ -167,13 +169,14 @@ public void testGetContainerLocationsWithScmFailures() throws IOException {
assertEquals(runtimeException, actualRt.getCause());
}

ContainerWithPipeline createPipeline(long containerId) {
ContainerWithPipeline createPipeline(long containerId,
List<DatanodeDetails> dnList) {
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setContainerID(containerId)
.build();
Pipeline pipeline = Pipeline.newBuilder()
.setId(PipelineID.randomId())
.setNodes(asList(randomDatanode(), randomDatanode()))
.setNodes(dnList)
.setReplicationConfig(fromTypeAndFactor(
ReplicationType.RATIS, ReplicationFactor.THREE))
.setState(Pipeline.PipelineState.OPEN)
Expand Down

0 comments on commit 827be5f

Please sign in to comment.