diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 0c93a5b642cf6..6ee9c2f949243 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -170,6 +170,7 @@ public int getMinimumRolloverTimeMs() { * the time unit */ public void setMinimumRolloverTime(int minimumRolloverTime, TimeUnit unit) { + checkArgument(minimumRolloverTime >= 0); this.minimumRolloverTimeMs = (int) unit.toMillis(minimumRolloverTime); checkArgument(maximumRolloverTimeMs >= minimumRolloverTimeMs, "Minimum rollover time needs to be less than maximum rollover time"); @@ -195,6 +196,7 @@ public long getMaximumRolloverTimeMs() { * the time unit */ public void setMaximumRolloverTime(int maximumRolloverTime, TimeUnit unit) { + checkArgument(maximumRolloverTime >= 0); this.maximumRolloverTimeMs = unit.toMillis(maximumRolloverTime); checkArgument(maximumRolloverTimeMs >= minimumRolloverTimeMs, "Maximum rollover time needs to be greater than minimum rollover time"); @@ -411,7 +413,8 @@ public ManagedLedgerConfig setThrottleMarkDelete(double throttleMarkDelete) { * time unit for retention time */ public ManagedLedgerConfig setRetentionTime(int retentionTime, TimeUnit unit) { - this.retentionTimeMs = unit.toMillis(retentionTime); + checkArgument(retentionTime >= -1, "The retention time should be -1, 0 or value > 0"); + this.retentionTimeMs = retentionTime != -1 ? unit.toMillis(retentionTime) : -1; return this; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java index cf3d7142d617e..5a6bc8017b7e0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.mledger.impl; +import static com.google.common.base.Preconditions.checkArgument; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean; @@ -41,6 +42,7 @@ public ManagedLedgerFactoryMBeanImpl(ManagedLedgerFactoryImpl factory) throws Ex } public void refreshStats(long period, TimeUnit unit) { + checkArgument(period >= 0); double seconds = unit.toMillis(period) / 1000.0; if (seconds <= 0.0) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java index 3935828ff3d80..bb4ad621211b1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.mledger.impl; +import static com.google.common.base.Preconditions.checkArgument; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -64,6 +65,7 @@ public ManagedLedgerMBeanImpl(ManagedLedgerImpl managedLedger) { } public void refreshStats(long period, TimeUnit unit) { + checkArgument(period >= 0); double seconds = unit.toMillis(period) / 1000.0; if (seconds <= 0.0) { // skip refreshing stats diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/DimensionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/DimensionStats.java index 1b6f981ca4e21..54965e4c783d8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/DimensionStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/DimensionStats.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.stats; +import static com.google.common.base.Preconditions.checkArgument; import static io.prometheus.client.CollectorRegistry.defaultRegistry; import io.prometheus.client.Collector; import io.prometheus.client.Summary; @@ -70,6 +71,7 @@ public DimensionStats(String name, long updateDurationInSec) { } public void recordDimensionTimeValue(long latency, TimeUnit unit) { + checkArgument(latency >= 0); summary.observe(unit.toMillis(latency)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/LongAdderCounter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/LongAdderCounter.java index 8ade2bc883f9a..c2816f5a2a013 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/LongAdderCounter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/LongAdderCounter.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.stats.prometheus.metrics; +import static com.google.common.base.Preconditions.checkArgument; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; import org.apache.bookkeeper.stats.Counter; @@ -57,6 +58,7 @@ public void addCount(long delta) { @Override public void addLatency(long eventLatency, TimeUnit unit) { + checkArgument(eventLatency >= 0); long valueMillis = unit.toMillis(eventLatency); counter.add(valueMillis); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java index 98bf2b819c2ba..88d923f74e196 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java @@ -257,7 +257,9 @@ private void checkTopicRetentionPolicy(String topicName, RetentionPolicies reten ManagedLedgerConfig config = pulsar.getBrokerService() .getManagedLedgerConfig(TopicName.get(topicName)).get(); Assert.assertEquals(config.getRetentionSizeInMB(), retentionPolicies.getRetentionSizeInMB()); - Assert.assertEquals(config.getRetentionTimeMillis(),retentionPolicies.getRetentionTimeInMinutes() * 60000L); + Assert.assertEquals(config.getRetentionTimeMillis(), retentionPolicies.getRetentionTimeInMinutes() < 0 + ? retentionPolicies.getRetentionTimeInMinutes() + : retentionPolicies.getRetentionTimeInMinutes() * 60000L); } private void testCompactionCursorRetention(String topic) throws Exception { diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java index 3b0efe64cf588..ea2bba166e6c5 100644 --- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java +++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java @@ -368,6 +368,7 @@ public ClientConfiguration setServiceUrl(String serviceUrl) { * @param unit the time unit in which the duration is defined */ public void setConnectionTimeout(int duration, TimeUnit unit) { + checkArgument(duration >= 0); confData.setConnectionTimeoutMs((int) unit.toMillis(duration)); } diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java index 81956db56f774..f2101b287043c 100644 --- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java +++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java @@ -69,6 +69,7 @@ public long getAckTimeoutMillis() { * @return {@link ConsumerConfiguration} */ public ConsumerConfiguration setAckTimeout(long ackTimeout, TimeUnit timeUnit) { + checkArgument(ackTimeout >= 0); long ackTimeoutMillis = timeUnit.toMillis(ackTimeout); checkArgument(ackTimeoutMillis >= minAckTimeoutMillis, "Ack timeout should be should be greater than " + minAckTimeoutMillis + " ms"); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java index 009fa67fbaa29..a9d913c016490 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.admin.internal; +import static com.google.common.base.Preconditions.checkArgument; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -200,18 +201,21 @@ public PulsarAdminBuilder tlsProtocols(Set tlsProtocols) { @Override public PulsarAdminBuilder connectionTimeout(int connectionTimeout, TimeUnit connectionTimeoutUnit) { + checkArgument(connectionTimeout >= 0); this.conf.setConnectionTimeoutMs((int) connectionTimeoutUnit.toMillis(connectionTimeout)); return this; } @Override public PulsarAdminBuilder readTimeout(int readTimeout, TimeUnit readTimeoutUnit) { + checkArgument(readTimeout >= 0); this.conf.setReadTimeoutMs((int) readTimeoutUnit.toMillis(readTimeout)); return this; } @Override public PulsarAdminBuilder requestTimeout(int requestTimeout, TimeUnit requestTimeoutUnit) { + checkArgument(requestTimeout >= 0); this.conf.setRequestTimeoutMs((int) requestTimeoutUnit.toMillis(requestTimeout)); return this; } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java index 460478787eb10..dcc23163f11d1 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java @@ -168,6 +168,7 @@ public TransactionPendingAckStats getPendingAckStats(String topic, String subNam @Override public CompletableFuture> getSlowTransactionsByCoordinatorIdAsync( Integer coordinatorId, long timeout, TimeUnit timeUnit) { + checkArgument(timeout >= 0); WebTarget path = adminV3Transactions.path("slowTransactions"); path = path.path(timeUnit.toMillis(timeout) + ""); if (coordinatorId != null) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java index 68b781e67d29c..a1017e66760a5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java @@ -329,6 +329,7 @@ public AutoClusterFailoverBuilder switchBackDelay(long switchBackDelay, TimeUnit @Override public AutoClusterFailoverBuilder checkInterval(long interval, TimeUnit timeUnit) { + checkArgument(interval >= 0L, "check interval time must not be negative."); this.checkIntervalMs = timeUnit.toMillis(interval); return this; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java index 9a86d81c93fab..48b3b2a6bda62 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java @@ -168,6 +168,7 @@ public ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit) { @Override public ClientBuilder lookupTimeout(int lookupTimeout, TimeUnit unit) { + checkArgument(lookupTimeout >= 0, "lookupTimeout must not be negative"); conf.setLookupTimeoutMs(unit.toMillis(lookupTimeout)); return this; } @@ -333,6 +334,7 @@ public ClientBuilder keepAliveInterval(int keepAliveInterval, TimeUnit unit) { @Override public ClientBuilder connectionTimeout(int duration, TimeUnit unit) { + checkArgument(duration >= 0, "connectionTimeout needs to be >= 0"); conf.setConnectionTimeoutMs((int) unit.toMillis(duration)); return this; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 4f29c0aa76c94..64097a14725d0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -448,6 +448,7 @@ public void acknowledge(Messages messages) throws PulsarClientException { @Override public void reconsumeLater(Message message, long delayTime, TimeUnit unit) throws PulsarClientException { + checkArgument(delayTime >= 0, "The delay time must not be negative."); reconsumeLater(message, null, delayTime, unit); } @@ -562,6 +563,7 @@ public CompletableFuture reconsumeLaterAsync(Message message, long dela @Override public CompletableFuture reconsumeLaterAsync( Message message, Map customProperties, long delayTime, TimeUnit unit) { + checkArgument(delayTime >= 0, "The delay time must not be negative."); if (!conf.isRetryEnable()) { return FutureUtil.failedFuture(new PulsarClientException(RECONSUME_LATER_ERROR_MSG)); } @@ -598,12 +600,14 @@ public CompletableFuture acknowledgeCumulativeAsync(Message message) { @Override public CompletableFuture reconsumeLaterCumulativeAsync(Message message, long delayTime, TimeUnit unit) { + checkArgument(delayTime >= 0, "The delay time must not be negative."); return reconsumeLaterCumulativeAsync(message, null, delayTime, unit); } @Override public CompletableFuture reconsumeLaterCumulativeAsync( Message message, Map customProperties, long delayTime, TimeUnit unit) { + checkArgument(delayTime >= 0, "The delay time must not be negative."); if (!conf.isRetryEnable()) { return FutureUtil.failedFuture(new PulsarClientException(RECONSUME_LATER_ERROR_MSG)); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index f644c6a18398f..895273a90c050 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -497,6 +497,7 @@ public ConsumerBuilder enableBatchIndexAcknowledgment(boolean batchIndexAckno @Override public ConsumerBuilder expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit) { + checkArgument(duration >= 0, "expired time of incomplete chunk message must not be negative"); conf.setExpireTimeOfIncompleteChunkedMessageMillis(unit.toMillis(duration)); return this; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java index 080d328e3f02c..9d30108ec7a1d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java @@ -236,6 +236,7 @@ public ControlledClusterFailoverBuilder urlProviderHeader(Map he @Override public ControlledClusterFailoverBuilder checkInterval(long interval, @NonNull TimeUnit timeUnit) { + checkArgument(interval >= 0, "The check interval time must not be negative."); this.interval = timeUnit.toMillis(interval); return this; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java index 2860cda0ceef1..fd01cef9a216f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java @@ -274,6 +274,7 @@ public ReaderBuilder autoAckOldestChunkedMessageOnQueueFull(boolean autoAckOl @Override public ReaderBuilder expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit) { + checkArgument(duration >= 0, "The expired time must not be negative."); conf.setExpireTimeOfIncompleteChunkedMessageMillis(unit.toMillis(duration)); return this; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java index 0b5174a015118..4ea6472b9d8b2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import static com.google.common.base.Preconditions.checkArgument; import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; import io.netty.util.Recycler; @@ -205,6 +206,7 @@ private void failPendingRequest() { } public CompletableFuture newTransactionAsync(long timeout, TimeUnit unit) { + checkArgument(timeout >= 0, "The timeout must not be negative."); if (LOG.isDebugEnabled()) { LOG.debug("New transaction with timeout in ms {}", unit.toMillis(timeout)); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java index 026f8a1e69e0b..895949fdf32cc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java @@ -212,6 +212,7 @@ public TypedMessageBuilder disableReplication() { @Override public TypedMessageBuilder deliverAfter(long delay, TimeUnit unit) { + checkArgument(delay >= 0, "The delay time must not be negative."); return deliverAt(System.currentTimeMillis() + unit.toMillis(delay)); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java index 0ebfb91e62da7..bb0c4968fc7d4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl.transaction; +import static com.google.common.base.Preconditions.checkArgument; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; @@ -45,6 +46,7 @@ public TransactionBuilderImpl(PulsarClientImpl client, TransactionCoordinatorCli @Override public TransactionBuilder withTransactionTimeout(long txnTimeout, TimeUnit timeoutUnit) { + checkArgument(txnTimeout >= 0, "The txn timeout must not be negative."); this.txnTimeout = txnTimeout; this.timeUnit = timeoutUnit; return this; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ObjectCache.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ObjectCache.java index dc057ffe32daf..cf0620edf98df 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ObjectCache.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ObjectCache.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.util; +import static com.google.common.base.Preconditions.checkArgument; import java.time.Clock; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -33,6 +34,7 @@ public class ObjectCache implements Supplier { public ObjectCache(Supplier supplier, long cacheDuration, TimeUnit unit) { this(supplier, cacheDuration, unit, Clock.systemUTC()); + checkArgument(cacheDuration >= 0, "The cache duration must not be negative."); } ObjectCache(Supplier supplier, long cacheDuration, TimeUnit unit, Clock clock) { diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java index f0d279ef25050..998ef73fbd3e9 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.client; +import static com.google.common.base.Preconditions.checkArgument; import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.Arrays; @@ -364,6 +365,7 @@ public synchronized CompletableFuture promiseAfter(int steps, List= 0, "The delay time must not be negative."); addEntryDelaysMillis.add(unit.toMillis(delay)); } diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/TestStatsProvider.java b/testmocks/src/main/java/org/apache/bookkeeper/client/TestStatsProvider.java index 4d08a7f80df5b..cf08cc635106e 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/TestStatsProvider.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/TestStatsProvider.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.client; +import static com.google.common.base.Preconditions.checkArgument; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -65,6 +66,7 @@ public void addCount(long delta) { @Override public void addLatency(long eventLatency, TimeUnit unit) { + checkArgument(eventLatency >= 0, "The event latency must not be negative."); long valueMillis = unit.toMillis(eventLatency); updateMax(val.addAndGet(valueMillis)); }