diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java index 0c2e6507bd9..ef7e4f67894 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java @@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; @@ -70,8 +71,8 @@ public class ScheduleMessageService extends ConfigManager { private static final long WAIT_FOR_SHUTDOWN = 5000L; private static final long DELAY_FOR_A_SLEEP = 10L; - private final ConcurrentMap delayLevelTable = - new ConcurrentHashMap<>(32); + private final ConcurrentSkipListMap delayLevelTable = + new ConcurrentSkipListMap<>(); private final ConcurrentMap offsetTable = new ConcurrentHashMap<>(32); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java index c0d00d86409..e907a1ccc3f 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java @@ -24,7 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; @@ -199,7 +199,7 @@ public class ProxyConfig implements ConfigFile { private boolean useDelayLevel = false; private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; - private transient Map delayLevelTable = new ConcurrentHashMap<>(); + private transient ConcurrentSkipListMap delayLevelTable = new ConcurrentSkipListMap<>(); private String metricCollectorMode = MetricCollectorMode.OFF.getModeString(); // Example address: 127.0.0.1:1234 @@ -291,7 +291,7 @@ public int computeDelayLevel(long timeMillis) { } public void parseDelayLevel() { - this.delayLevelTable = new ConcurrentHashMap<>(); + this.delayLevelTable = new ConcurrentSkipListMap<>(); Map timeUnitTable = new HashMap<>(); timeUnitTable.put("s", 1000L); timeUnitTable.put("m", 1000L * 60); @@ -1124,7 +1124,7 @@ public void setMessageDelayLevel(String messageDelayLevel) { this.messageDelayLevel = messageDelayLevel; } - public Map getDelayLevelTable() { + public ConcurrentSkipListMap getDelayLevelTable() { return delayLevelTable; } diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index dc5f312e5a6..aa72b1617d1 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -46,6 +46,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -190,8 +191,8 @@ public class DefaultMessageStore implements MessageStore { private SendMessageBackHook sendMessageBackHook; - private final ConcurrentMap delayLevelTable = - new ConcurrentHashMap<>(32); + private final ConcurrentSkipListMap delayLevelTable = + new ConcurrentSkipListMap<>(); private int maxDelayLevel;