From 75a6a0fdc0618a673aa982158744e25da2cd16c7 Mon Sep 17 00:00:00 2001
From: xichen01
Date: Tue, 20 Jun 2023 19:52:14 +0800
Subject: [PATCH 1/7] HDDS-8888. SCMBlockDeletingService should not send
 delete transactions to Datanodes whose queue is full

---
 .../apache/hadoop/ozone/OzoneConfigKeys.java  |  12 +-
 .../scm/block/SCMBlockDeletingService.java    |  42 ++++-
 .../block/TestSCMBlockDeletingService.java    | 159 ++++++++++++++++++
 3 files changed, 204 insertions(+), 9 deletions(-)
 create mode 100644 hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 85bc566902b..adbdcacd94b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -257,7 +257,17 @@ public final class OzoneConfigKeys {
   public static final String OZONE_BLOCK_DELETING_SERVICE_TIMEOUT =
       "ozone.block.deleting.service.timeout";
   public static final String OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT
-      = "300s"; // 300s for default
+      = "300s"; // 300s for default deleteBlocksCommandLimit
+
+  /**
+   * A limit to restrict the total number of delete block commands queued on a
+   * datanode. Note this is intended to be a temporary config until we have a
+   * more dynamic way of limiting load.
+   */
+  public static final String OZONE_BLOCK_DELETING_PENDING_COMMAND_LIMIT =
+      "ozone.block.deleting.pending.command.limit";
+  public static final int OZONE_BLOCK_DELETING_PENDING_COMMAND_LIMIT_DEFAULT
+      = 5; // same with hdds.datanode.block.delete.queue.limit
 
   public static final String OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT =
       "ozone.sst.filtering.service.timeout";
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index 0a1222e3800..8a221e078ad 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.utils.BackgroundService;
 import org.apache.hadoop.hdds.utils.BackgroundTask;
@@ -57,6 +58,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_PENDING_COMMAND_LIMIT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_PENDING_COMMAND_LIMIT_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
 
@@ -90,6 +93,7 @@ public class SCMBlockDeletingService extends BackgroundService
 
   private long safemodeExitMillis = 0;
   private final long safemodeExitRunDelayMillis;
+  private final long deleteBlocksPendingCommandLimit;
   private final Clock clock;
 
   @SuppressWarnings("parameternumber")
@@ -110,6 +114,9 @@ public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog,
         HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
         HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
         TimeUnit.MILLISECONDS);
+    this.deleteBlocksPendingCommandLimit = conf.getInt(
+        OZONE_BLOCK_DELETING_PENDING_COMMAND_LIMIT,
+        OZONE_BLOCK_DELETING_PENDING_COMMAND_LIMIT_DEFAULT);
     this.clock = clock;
     this.deletedBlockLog = deletedBlockLog;
     this.nodeManager = nodeManager;
@@ -156,13 +163,12 @@ public EmptyTaskResult call() throws Exception {
       List<DatanodeDetails> datanodes =
           nodeManager.getNodes(NodeStatus.inServiceHealthy());
       if (datanodes != null) {
-        // When DN node is healthy and in-service, and previous commands
-        // are handled for deleteBlocks Type, then it will be considered
-        // in this iteration
-        final Set<DatanodeDetails> included = datanodes.stream().filter(
-            dn -> nodeManager.getCommandQueueCount(dn.getUuid(),
-                Type.deleteBlocksCommand) == 0).collect(Collectors.toSet());
         try {
+          // Only datanodes that are healthy and in-service, and whose number
+          // of pending 'deleteBlocks' commands is still below the limit, are
+          // considered in this iteration.
+          final Set<DatanodeDetails> included =
+              getDatanodesWithinCommandLimit(datanodes);
           DatanodeDeletedBlockTransactions transactions =
               deletedBlockLog.getTransactions(blockDeleteLimitSize, included);
 
@@ -203,7 +209,8 @@ public EmptyTaskResult call() throws Exception {
           deletedBlockLog.incrementCount(new ArrayList<>(processedTxIDs));
         } catch (NotLeaderException nle) {
           LOG.warn("Skip current run, since not leader any more.", nle);
-          return EmptyTaskResult.newResult();
+        } catch (NodeNotFoundException e) {
+          LOG.error("Datanode not found in NodeManager. Should not happen", e);
         } catch (IOException e) {
           // We may tolerate a number of failures for sometime
           // but if it continues to fail, at some point we need to raise
@@ -211,7 +218,6 @@ public EmptyTaskResult call() throws Exception {
           // continues to retry the scanning.
           LOG.error("Failed to get block deletion transactions from delTX log",
               e);
-          return EmptyTaskResult.newResult();
         }
       }
 
@@ -273,4 +279,24 @@ public void stop() {
   public ScmBlockDeletingServiceMetrics getMetrics() {
     return this.metrics;
   }
+
+  /**
+   * Filters and returns a set of healthy datanodes that have not exceeded
+   * the deleteBlocksPendingCommandLimit.
+   *
+   * @param datanodes a list of DatanodeDetails
+   * @return a set of filtered DatanodeDetails
+   */
+  @VisibleForTesting
+  protected Set<DatanodeDetails> getDatanodesWithinCommandLimit(
+      List<DatanodeDetails> datanodes) throws NodeNotFoundException {
+    final Set<DatanodeDetails> included = new HashSet<>();
+    for (DatanodeDetails dn : datanodes) {
+      if (nodeManager.getTotalDatanodeCommandCount(dn,
+          Type.deleteBlocksCommand) < deleteBlocksPendingCommandLimit) {
+        included.add(dn);
+      }
+    }
+    return included;
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java
new file mode 100644
index 00000000000..d5c712887ef
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java
@@ -0,0 +1,159 @@
+package org.apache.hadoop.hdds.scm.block;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.time.Clock;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_PENDING_COMMAND_LIMIT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_PENDING_COMMAND_LIMIT_DEFAULT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anySet;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test SCMBlockDeletingService.
+ */ +public class TestSCMBlockDeletingService { + private SCMBlockDeletingService service; + private EventPublisher eventPublisher; + private List datanodeDetails; + private OzoneConfiguration conf; + private NodeManager nodeManager; + private ScmBlockDeletingServiceMetrics metrics; + + @BeforeEach + public void setup() throws Exception { + nodeManager = mock(NodeManager.class); + eventPublisher = mock(EventPublisher.class); + conf = new OzoneConfiguration(); + metrics = ScmBlockDeletingServiceMetrics.create(); + when(nodeManager.getTotalDatanodeCommandCount(any(), + any())).thenReturn(0); + SCMServiceManager scmServiceManager = mock(SCMServiceManager.class); + SCMContext scmContext = mock(SCMContext.class); + + DatanodeDeletedBlockTransactions ddbt = + new DatanodeDeletedBlockTransactions(); + DatanodeDetails datanode1 = MockDatanodeDetails.randomDatanodeDetails(); + DatanodeDetails datanode2 = MockDatanodeDetails.randomDatanodeDetails(); + DatanodeDetails datanode3 = MockDatanodeDetails.randomDatanodeDetails(); + datanodeDetails = Arrays.asList(datanode1, datanode2, datanode3); + when(nodeManager.getNodes(NodeStatus.inServiceHealthy())).thenReturn( + datanodeDetails); + DeletedBlocksTransaction tx1 = createTestDeleteTxn(1, Arrays.asList(1L), 1); + ddbt.addTransactionToDN(datanode1.getUuid(), tx1); + ddbt.addTransactionToDN(datanode2.getUuid(), tx1); + ddbt.addTransactionToDN(datanode3.getUuid(), tx1); + DeletedBlockLog mockDeletedBlockLog = mock(DeletedBlockLog.class); + when(mockDeletedBlockLog.getTransactions( + anyInt(), anySet())).thenReturn(ddbt); + + service = spy(new SCMBlockDeletingService( + mockDeletedBlockLog, nodeManager, eventPublisher, scmContext, + scmServiceManager, conf, metrics, Clock.system( + ZoneOffset.UTC))); + when(service.shouldRun()).thenReturn(true); + } + + @AfterEach + public void stop() { + service.stop(); + ScmBlockDeletingServiceMetrics.unRegister(); + } + + @Test + public void testCall() throws Exception { + callDeletedBlockTransactionScanner(); + + ArgumentCaptor argumentCaptor = + ArgumentCaptor.forClass(CommandForDatanode.class); + + // Three Datanode is healthy and in-service, and the task queue is empty, + // so the transaction will send to all three Datanode + verify(eventPublisher, times(3)).fireEvent( + eq(SCMEvents.DATANODE_COMMAND), argumentCaptor.capture()); + List actualCommands = argumentCaptor.getAllValues(); + List actualDnIds = actualCommands.stream() + .map(CommandForDatanode::getDatanodeId) + .collect(Collectors.toList()); + Set expectedDnIdsSet = datanodeDetails.stream() + .map(DatanodeDetails::getUuid).collect(Collectors.toSet()); + + assertEquals(expectedDnIdsSet, new HashSet<>(actualDnIds)); + assertEquals(datanodeDetails.size(), + metrics.getNumBlockDeletionCommandSent()); + // Echo Command has one Transaction + assertEquals(datanodeDetails.size() * 1, + metrics.getNumBlockDeletionTransactionSent()); + } + + private void callDeletedBlockTransactionScanner() throws Exception { + service.getTasks().poll().call(); + } + + @Test + public void testLimitCommandSending() throws Exception { + int pendingCommandLimit = conf.getInt( + OZONE_BLOCK_DELETING_PENDING_COMMAND_LIMIT, + OZONE_BLOCK_DELETING_PENDING_COMMAND_LIMIT_DEFAULT); + + + // The number of commands pending on all Datanodes has reached the limit. 
From 4578b3cb09416260a8850a8ac61d3833a361331d Mon Sep 17 00:00:00 2001
From: xichen01
Date: Wed, 28 Jun 2023 17:09:54 +0800
Subject: [PATCH 2/7] Add license header to the new file

---
 .../scm/block/TestSCMBlockDeletingService.java | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java
index d5c712887ef..26ba9a18fa8 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hdds.scm.block;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;

From 84e601af3e5c3f623a01c1a9b6cf849066fc8849 Mon Sep 17 00:00:00 2001
From: xichen01
Date: Wed, 28 Jun 2023 17:24:16 +0800
Subject: [PATCH 3/7] Add the new configuration to ozone-default.xml

---
 hadoop-hdds/common/src/main/resources/ozone-default.xml | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 9849ae88bdd..77d54e976a2 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -427,6 +427,15 @@
       threads spawned for block deletions.
     </description>
   </property>
+  <property>
+    <name>ozone.block.deleting.pending.command.limit</name>
+    <value>5</value>
+    <tag>OZONE, PERFORMANCE, SCM</tag>
+    <description>A limit to restrict the total number of delete block commands
+      queued on a datanode. Note this is intended to be a temporary config
+      until we have a more dynamic way of limiting load.
+    </description>
+  </property>
   <property>
     <name>ozone.block.deleting.limit.per.task</name>
     <value>1000</value>
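
If a reviewer wants to exercise the new knob, it can be overridden like any
other Ozone setting; a small sketch (the key and its default of 5 come from
this series, the value 20 below is only an illustration):

    // assumes the constants introduced in PATCH 1
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.setInt(OZONE_BLOCK_DELETING_PENDING_COMMAND_LIMIT, 20);
    // the SCM will then keep sending deleteBlocksCommands to a DN until it
    // has 20 pending commands, instead of the default 5
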
- */ - public static final String OZONE_BLOCK_DELETING_PENDING_COMMAND_LIMIT = - "ozone.block.deleting.pending.command.limit"; - public static final int OZONE_BLOCK_DELETING_PENDING_COMMAND_LIMIT_DEFAULT - = 5; // same with hdds.datanode.block.delete.queue.limit + = "300s"; // 300s for default public static final String OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT = "ozone.sst.filtering.service.timeout"; diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 77d54e976a2..9849ae88bdd 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -427,15 +427,6 @@ threads spawned for block deletions. - - ozone.block.deleting.pending.command.limit - 5 - OZONE, PERFORMANCE, SCM - A limit to restrict the total number of delete block commands - queued on a datanode. Note this is intended to be a temporary config - until we have a more dynamic way of limiting load. - - ozone.block.deleting.limit.per.task 1000 diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java index 8bae562bbf2..036e6e48b6f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java @@ -149,7 +149,11 @@ public class DatanodeConfiguration { defaultValue = "5", tags = {DATANODE}, description = "The maximum number of block delete commands queued on " + - " a datanode" + " a datanode, This configuration is also used by the SCM to " + + "control whether to send delete commands to the DN. If the DN" + + " has more commands waiting in the queue than this value, " + + "the SCM will not send any new block delete commands. until the " + + "DN has processed some commands and the queue length is reduced." 
) private int blockDeleteQueueLimit = 5; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java index 8a221e078ad..6b3497e617e 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hdds.utils.BackgroundTask; import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; @@ -58,8 +59,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_PENDING_COMMAND_LIMIT; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_PENDING_COMMAND_LIMIT_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT; @@ -114,9 +113,9 @@ public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog, HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT, TimeUnit.MILLISECONDS); - this.deleteBlocksPendingCommandLimit = conf.getInt( - OZONE_BLOCK_DELETING_PENDING_COMMAND_LIMIT, - OZONE_BLOCK_DELETING_PENDING_COMMAND_LIMIT_DEFAULT); + DatanodeConfiguration dnConf = + conf.getObject(DatanodeConfiguration.class); + this.deleteBlocksPendingCommandLimit = dnConf.getBlockDeleteQueueLimit(); this.clock = clock; this.deletedBlockLog = deletedBlockLog; this.nodeManager = nodeManager; From a3a41ddf21431f50a81db0248bdd53920c8775a5 Mon Sep 17 00:00:00 2001 From: xichen01 Date: Thu, 29 Jun 2023 00:31:09 +0800 Subject: [PATCH 5/7] Fix unit test --- .../hdds/scm/block/TestSCMBlockDeletingService.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java index 26ba9a18fa8..a5c16501401 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -44,8 +45,6 @@ import java.util.UUID; import java.util.stream.Collectors; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_PENDING_COMMAND_LIMIT; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_PENDING_COMMAND_LIMIT_DEFAULT; import 
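
After this patch the SCM no longer has a knob of its own; the threshold is
taken from the datanode-side setting. A sketch of the lookup it now performs
(nothing here beyond what the hunks above show; 5 is simply the default of
hdds.datanode.block.delete.queue.limit):

    DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
    long limit = dnConf.getBlockDeleteQueueLimit();  // default 5
    // the same value now bounds the DN-side delete queue and the SCM-side
    // decision about whether to send more deleteBlocksCommands to that DN
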
From a3a41ddf21431f50a81db0248bdd53920c8775a5 Mon Sep 17 00:00:00 2001
From: xichen01
Date: Thu, 29 Jun 2023 00:31:09 +0800
Subject: [PATCH 5/7] Fix unit test

---
 .../hdds/scm/block/TestSCMBlockDeletingService.java | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java
index 26ba9a18fa8..a5c16501401 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
@@ -44,8 +45,6 @@
 import java.util.UUID;
 import java.util.stream.Collectors;
 
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_PENDING_COMMAND_LIMIT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_PENDING_COMMAND_LIMIT_DEFAULT;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.mockito.ArgumentMatchers.any;
@@ -141,10 +140,9 @@ private void callDeletedBlockTransactionScanner() throws Exception {
 
   @Test
   public void testLimitCommandSending() throws Exception {
-    int pendingCommandLimit = conf.getInt(
-        OZONE_BLOCK_DELETING_PENDING_COMMAND_LIMIT,
-        OZONE_BLOCK_DELETING_PENDING_COMMAND_LIMIT_DEFAULT);
-
+    DatanodeConfiguration dnConf =
+        conf.getObject(DatanodeConfiguration.class);
+    int pendingCommandLimit = dnConf.getBlockDeleteQueueLimit();
 
     // The number of commands pending on all Datanodes has reached the limit.
     when(nodeManager.getTotalDatanodeCommandCount(any(),

From 092c8526d20c94530dc33dbc2c23a2291585d236 Mon Sep 17 00:00:00 2001
From: xichen01
Date: Fri, 5 Jan 2024 05:09:32 +0800
Subject: [PATCH 6/7] Add a further limit on deleteBlocksCommand sending

---
 .../apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index 2df1d7e4b56..7271d9dcba6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -301,8 +301,8 @@ protected Set<DatanodeDetails> getDatanodesWithinCommandLimit(
       List<DatanodeDetails> datanodes) throws NodeNotFoundException {
     final Set<DatanodeDetails> included = new HashSet<>();
     for (DatanodeDetails dn : datanodes) {
-      if (nodeManager.getTotalDatanodeCommandCount(dn,
-          Type.deleteBlocksCommand) < deleteBlocksPendingCommandLimit) {
+      if (nodeManager.getTotalDatanodeCommandCount(dn, Type.deleteBlocksCommand) < deleteBlocksPendingCommandLimit
+          && nodeManager.getCommandQueueCount(dn.getUuid(), Type.deleteBlocksCommand) < 2) {
         included.add(dn);
       }
     }

From 7580f2f7f2336e425c70dae5ce25113f16172f00 Mon Sep 17 00:00:00 2001
From: "pony.chen"
Date: Sat, 6 Jan 2024 04:49:51 +0800
Subject: [PATCH 7/7] Fix test

---
 .../hadoop/hdds/scm/block/TestSCMBlockDeletingService.java | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java
index a5c16501401..3bd7ad00f6a 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java
@@ -19,10 +19,12 @@
 package org.apache.hadoop.hdds.scm.block;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
@@ -97,8 +99,8 @@ public void setup() throws Exception {
     service = spy(new SCMBlockDeletingService(
         mockDeletedBlockLog, nodeManager, eventPublisher, scmContext,
-        scmServiceManager, conf, metrics, Clock.system(
-        ZoneOffset.UTC)));
+        scmServiceManager, conf, conf.getObject(ScmConfig.class), metrics, Clock.system(
+        ZoneOffset.UTC), mock(ReconfigurationHandler.class)));
     when(service.shouldRun()).thenReturn(true);
   }
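
Taken together, the series leaves getDatanodesWithinCommandLimit gating each
datanode on two counters; as the NodeManager API is read here (the exact
accounting lives in NodeManager), getTotalDatanodeCommandCount also covers
deleteBlocksCommands the DN has reported as still queued on its side, while
getCommandQueueCount only covers commands still waiting inside SCM. In sketch
form:

    // eligible only while the DN's total backlog is under the queue limit and
    // SCM itself holds fewer than 2 unsent deleteBlocksCommands for that DN
    boolean eligible =
        nodeManager.getTotalDatanodeCommandCount(dn, Type.deleteBlocksCommand)
            < deleteBlocksPendingCommandLimit
            && nodeManager.getCommandQueueCount(dn.getUuid(),
                Type.deleteBlocksCommand) < 2;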