HDDS-10985. EC Reconstruction failed because the size of currentChunks was not equal to checksumBlockDataChunks. (#7009)

(cherry picked from commit 0915f0b)
slfan1989 authored and xichen01 committed Sep 18, 2024
1 parent 5fef53b commit 78883a5
Showing 3 changed files with 127 additions and 23 deletions.
@@ -38,9 +38,13 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
@@ -141,8 +145,34 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
}

if (checksumBlockData != null) {
List<ChunkInfo> currentChunks = getContainerBlockData().getChunksList();

// For the same BlockGroupLength, we need to find the largest BlockData size.
// This is because we do not send empty chunks to the DataNode, so the larger value is more accurate.
Map<Long, Optional<BlockData>> maxDataSizeByGroup = Arrays.stream(blockData)
.filter(Objects::nonNull)
.collect(Collectors.groupingBy(BlockData::getBlockGroupLength,
Collectors.maxBy(Comparator.comparingLong(BlockData::getSize))));
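// Assumes at least one non-null replica reported this blockGroupLength;
// otherwise Optional.get() would fail fast here.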
BlockData maxBlockData = maxDataSizeByGroup.get(blockGroupLength).get();

// When calculating the checksum size, we need to consider both
// blockGroupLength and the actual size of blockData, and we use the
// smaller of the two to determine the size of the chunk list:
//
// 1. In most cases, blockGroupLength is equal to the size of blockData.
// 2. Occasionally, blockData is not fully filled; if a chunk is empty,
// it is not sent to the DN, so the blockData size is smaller than blockGroupLength.
// 3. In cases with 'dirty data', if an error occurs while writing to the
// EC stripe (e.g., a DN reports Container Closed) and the length confirmed
// with the OM is smaller, blockGroupLength may be smaller than the blockData size.
long blockDataSize = Math.min(maxBlockData.getSize(), blockGroupLength);
int chunkSize = (int) Math.ceil(((double) blockDataSize / repConfig.getEcChunkSize()));
List<ChunkInfo> checksumBlockDataChunks = checksumBlockData.getChunks();
if (chunkSize > 0) {
checksumBlockDataChunks = checksumBlockData.getChunks().subList(0, chunkSize);
}

List<ChunkInfo> currentChunks = getContainerBlockData().getChunksList();

Preconditions.checkArgument(
currentChunks.size() == checksumBlockDataChunks.size(),
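For intuition, a minimal self-contained sketch of the sizing rule above, with made-up numbers (a 3 MB block group, 1 MB EC chunks, and one replica reporting a larger 'dirty' size); the class name and values are illustrative only, not part of the patch:

public class ChunkCountSketch {
  public static void main(String[] args) {
    long blockGroupLength = 3L * 1024 * 1024;  // length confirmed with the OM
    long maxBlockDataSize = 4L * 1024 * 1024;  // largest size reported by any replica ("dirty data")
    int ecChunkSize = 1024 * 1024;             // chunk size from the EC replication config

    // Case 3 above: the smaller, OM-confirmed length wins over stale replica data.
    long blockDataSize = Math.min(maxBlockDataSize, blockGroupLength);
    int chunkCount = (int) Math.ceil((double) blockDataSize / ecChunkSize);

    System.out.println(chunkCount);  // prints 3 -> compare only the first 3 checksum chunks
  }
}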
@@ -268,7 +298,7 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
throw ce;
});
} catch (IOException | ExecutionException e) {
throw new IOException(EXCEPTION_MSG + e.toString(), e);
throw new IOException(EXCEPTION_MSG + e, e);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
handleInterruptedException(ex, false);
@@ -23,6 +23,7 @@
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
import org.apache.hadoop.hdds.utils.db.Proto3Codec;
import org.apache.hadoop.ozone.OzoneConsts;

import java.io.IOException;
import java.util.Collections;
@@ -296,4 +297,14 @@ public void appendTo(StringBuilder sb) {
sb.append(", size=").append(size);
sb.append("]");
}

public long getBlockGroupLength() {
String lenStr = getMetadata()
.get(OzoneConsts.BLOCK_GROUP_LEN_KEY_IN_PUT_BLOCK);
// If we don't have the length, then it indicates a problem with the stripe.
// All replicas should carry the length, so if it is not there, we return 0,
// which will cause us to set the length of the block to zero and not
// attempt to reconstruct it.
return (lenStr == null) ? 0 : Long.parseLong(lenStr);
}
}
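A hedged usage sketch of the new accessor's fallback path; the metadata map and key string below are stand-ins for the BlockData metadata and OzoneConsts.BLOCK_GROUP_LEN_KEY_IN_PUT_BLOCK, so the names are assumptions for illustration:

import java.util.HashMap;
import java.util.Map;

public class BlockGroupLengthSketch {
  // Stand-in for OzoneConsts.BLOCK_GROUP_LEN_KEY_IN_PUT_BLOCK.
  static final String BLOCK_GROUP_LEN_KEY = "blockGroupLen";

  static long getBlockGroupLength(Map<String, String> metadata) {
    String lenStr = metadata.get(BLOCK_GROUP_LEN_KEY);
    // Missing length -> 0: the block is treated as empty and reconstruction is skipped.
    return (lenStr == null) ? 0 : Long.parseLong(lenStr);
  }

  public static void main(String[] args) {
    Map<String, String> metadata = new HashMap<>();
    metadata.put(BLOCK_GROUP_LEN_KEY, "3145728");
    System.out.println(getBlockGroupLength(metadata));        // 3145728
    System.out.println(getBlockGroupLength(new HashMap<>())); // 0
  }
}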
@@ -61,6 +61,7 @@
import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.SecretKeyTestClient;
import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
import org.apache.hadoop.ozone.client.io.InsufficientLocationsException;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
@@ -85,6 +86,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.IOException;
@@ -101,6 +103,7 @@
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -114,6 +117,8 @@
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.READ;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.WRITE;
import static org.apache.hadoop.ozone.container.ContainerTestHelper.newWriteChunkRequestBuilder;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.params.provider.Arguments.arguments;

/**
* This class tests container commands on EC containers.
@@ -612,30 +617,33 @@ private static byte[] getBytesWith(int singleDigitNumber, int total) {

@ParameterizedTest
@MethodSource("recoverableMissingIndexes")
void testECReconstructionCoordinatorWith(List<Integer> missingIndexes)
void testECReconstructionCoordinatorWith(List<Integer> missingIndexes, boolean triggerRetry)
throws Exception {
testECReconstructionCoordinator(missingIndexes, 3);
testECReconstructionCoordinator(missingIndexes, 3, triggerRetry);
}

@ParameterizedTest
@MethodSource("recoverableMissingIndexes")
void testECReconstructionCoordinatorWithPartialStripe(List<Integer> missingIndexes)
throws Exception {
testECReconstructionCoordinator(missingIndexes, 1);
void testECReconstructionCoordinatorWithPartialStripe(List<Integer> missingIndexes,
boolean triggerRetry) throws Exception {
testECReconstructionCoordinator(missingIndexes, 1, triggerRetry);
}

@ParameterizedTest
@MethodSource("recoverableMissingIndexes")
void testECReconstructionCoordinatorWithFullAndPartialStripe(List<Integer> missingIndexes)
throws Exception {
testECReconstructionCoordinator(missingIndexes, 4);
void testECReconstructionCoordinatorWithFullAndPartialStripe(List<Integer> missingIndexes,
boolean triggerRetry) throws Exception {
testECReconstructionCoordinator(missingIndexes, 4, triggerRetry);
}

static Stream<List<Integer>> recoverableMissingIndexes() {
return Stream
.concat(IntStream.rangeClosed(1, 5).mapToObj(ImmutableList::of), Stream
.of(ImmutableList.of(2, 3), ImmutableList.of(2, 4),
ImmutableList.of(3, 5), ImmutableList.of(4, 5)));
static Stream<Arguments> recoverableMissingIndexes() {
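// Every recoverable index set is exercised twice: once with triggerRetry = true and once with false.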
Stream<Arguments> args = IntStream.rangeClosed(1, 5).mapToObj(i -> arguments(ImmutableList.of(i), true));
Stream<Arguments> args1 = IntStream.rangeClosed(1, 5).mapToObj(i -> arguments(ImmutableList.of(i), false));
Stream<Arguments> args2 = Stream.of(arguments(ImmutableList.of(2, 3), true),
arguments(ImmutableList.of(2, 4), true), arguments(ImmutableList.of(3, 5), true));
Stream<Arguments> args3 = Stream.of(arguments(ImmutableList.of(2, 3), false),
arguments(ImmutableList.of(2, 4), false), arguments(ImmutableList.of(3, 5), false));
return Stream.concat(Stream.concat(args, args1), Stream.concat(args2, args3));
}

/**
@@ -646,7 +654,7 @@ static Stream<List<Integer>> recoverableMissingIndexes() {
public void testECReconstructionCoordinatorWithMissingIndexes135() {
InsufficientLocationsException exception =
Assert.assertThrows(InsufficientLocationsException.class, () -> {
testECReconstructionCoordinator(ImmutableList.of(1, 3, 5), 3);
testECReconstructionCoordinator(ImmutableList.of(1, 3, 5), 3, false);
});

String expectedMessage =
@@ -657,7 +665,7 @@ public void testECReconstructionCoordinatorWithMissingIndexes135() {
}

private void testECReconstructionCoordinator(List<Integer> missingIndexes,
int numInputChunks) throws Exception {
int numInputChunks, boolean triggerRetry) throws Exception {
ObjectStore objectStore = rpcClient.getObjectStore();
String keyString = UUID.randomUUID().toString();
String volumeName = UUID.randomUUID().toString();
@@ -666,7 +674,7 @@ private void testECReconstructionCoordinator(List<Integer> missingIndexes,
objectStore.getVolume(volumeName).createBucket(bucketName);
OzoneVolume volume = objectStore.getVolume(volumeName);
OzoneBucket bucket = volume.getBucket(bucketName);
createKeyAndWriteData(keyString, bucket, numInputChunks);
createKeyAndWriteData(keyString, bucket, numInputChunks, triggerRetry);

try (
XceiverClientManager xceiverClientManager =
@@ -778,7 +786,7 @@ private void testECReconstructionCoordinator(List<Integer> missingIndexes,
.getReplicationConfig(), cToken);
Assert.assertEquals(blockDataArrList.get(i).length,
reconstructedBlockData.length);
checkBlockData(blockDataArrList.get(i), reconstructedBlockData);
checkBlockDataWithRetry(blockDataArrList.get(i), reconstructedBlockData, triggerRetry);
XceiverClientSpi client = xceiverClientManager.acquireClient(
newTargetPipeline);
try {
Expand All @@ -799,7 +807,7 @@ private void testECReconstructionCoordinator(List<Integer> missingIndexes,
}

private void createKeyAndWriteData(String keyString, OzoneBucket bucket,
int numChunks) throws IOException {
int numChunks, boolean triggerRetry) throws IOException {
for (int i = 0; i < numChunks; i++) {
inputChunks[i] = getBytesWith(i + 1, EC_CHUNK_SIZE);
}
@@ -808,11 +816,48 @@ private void createKeyAndWriteData(String keyString, OzoneBucket bucket,
new HashMap<>())) {
Assert.assertTrue(out.getOutputStream() instanceof KeyOutputStream);
for (int i = 0; i < numChunks; i++) {
// We wait until the data is about to be written to the last chunk
// before triggering CloseContainer. The trigger runs asynchronously
// so that closing the container does not interfere with the write
// operation; in practice it often has to be issued multiple times
// before it takes effect.
if (i == numChunks - 1 && triggerRetry) {
triggerRetryByCloseContainer(out);
}
out.write(inputChunks[i]);
}
}
}

private void triggerRetryByCloseContainer(OzoneOutputStream out) {
CompletableFuture.runAsync(() -> {
BlockOutputStreamEntry blockOutputStreamEntry = out.getKeyOutputStream().getStreamEntries().get(0);
BlockID entryBlockID = blockOutputStreamEntry.getBlockID();
long entryContainerID = entryBlockID.getContainerID();
Pipeline entryPipeline = blockOutputStreamEntry.getPipeline();
Map<DatanodeDetails, Integer> replicaIndexes = entryPipeline.getReplicaIndexes();
try {
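// Closing the container on a single replica is enough for the DN to report
// Container Closed back to the writer and force a retry, hence the break
// after the first datanode.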
for (Map.Entry<DatanodeDetails, Integer> entry : replicaIndexes.entrySet()) {
DatanodeDetails key = entry.getKey();
Integer value = entry.getValue();
XceiverClientManager xceiverClientManager = new XceiverClientManager(config);
Token<ContainerTokenIdentifier> cToken = containerTokenGenerator
.generateToken(ANY_USER, ContainerID.valueOf(entryContainerID));
XceiverClientSpi client = xceiverClientManager.acquireClient(
createSingleNodePipeline(entryPipeline, key, value));
try {
ContainerProtocolCalls.closeContainer(client, entryContainerID, cToken.encodeToUrlString());
} finally {
xceiverClientManager.releaseClient(client, false);
}
break;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

@Test
public void testECReconstructionCoordinatorShouldCleanupContainersOnFailure()
throws Exception {
@@ -825,7 +870,7 @@ public void testECReconstructionCoordinatorShouldCleanupContainersOnFailure()
objectStore.getVolume(volumeName).createBucket(bucketName);
OzoneVolume volume = objectStore.getVolume(volumeName);
OzoneBucket bucket = volume.getBucket(bucketName);
createKeyAndWriteData(keyString, bucket, 3);
createKeyAndWriteData(keyString, bucket, 3, false);

OzoneKeyDetails key = bucket.getKey(keyString);
long conID = key.getOzoneKeyLocations().get(0).getContainerID();
@@ -899,6 +944,25 @@ private void closeContainer(long conID)
HddsProtos.LifeCycleEvent.CLOSE);
}

private void checkBlockDataWithRetry(
org.apache.hadoop.ozone.container.common.helpers.BlockData[] blockData,
org.apache.hadoop.ozone.container.common.helpers.BlockData[]
reconstructedBlockData, boolean triggerRetry) {
if (triggerRetry) {
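// A close-triggered retry can leave extra chunks on the original replicas,
// so compare only the chunks that were actually reconstructed.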
for (int i = 0; i < reconstructedBlockData.length; i++) {
assertEquals(blockData[i].getBlockID(), reconstructedBlockData[i].getBlockID());
List<ContainerProtos.ChunkInfo> oldBlockDataChunks = blockData[i].getChunks();
List<ContainerProtos.ChunkInfo> newBlockDataChunks = reconstructedBlockData[i].getChunks();
for (int j = 0; j < newBlockDataChunks.size(); j++) {
ContainerProtos.ChunkInfo chunkInfo = oldBlockDataChunks.get(j);
assertEquals(chunkInfo, newBlockDataChunks.get(j));
}
}
return;
}
checkBlockData(blockData, reconstructedBlockData);
}

private void checkBlockData(
org.apache.hadoop.ozone.container.common.helpers.BlockData[] blockData,
org.apache.hadoop.ozone.container.common.helpers.BlockData[]
@@ -967,8 +1031,7 @@ public static void prepareData(int[][] ranges) throws Exception {
out.write(values[i]);
}
}
// List<ContainerID> containerIDs =
// new ArrayList<>(scm.getContainerManager().getContainerIDs());

List<ContainerID> containerIDs =
scm.getContainerManager().getContainers()
.stream()
