Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][sec] Add a check for the input time value #22023

Merged
merged 4 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ public int getMinimumRolloverTimeMs() {
* the time unit
*/
public void setMinimumRolloverTime(int minimumRolloverTime, TimeUnit unit) {
checkArgument(minimumRolloverTime >= 0);
this.minimumRolloverTimeMs = (int) unit.toMillis(minimumRolloverTime);
checkArgument(maximumRolloverTimeMs >= minimumRolloverTimeMs,
"Minimum rollover time needs to be less than maximum rollover time");
Expand All @@ -195,6 +196,7 @@ public long getMaximumRolloverTimeMs() {
* the time unit
*/
public void setMaximumRolloverTime(int maximumRolloverTime, TimeUnit unit) {
checkArgument(maximumRolloverTime >= 0);
this.maximumRolloverTimeMs = unit.toMillis(maximumRolloverTime);
checkArgument(maximumRolloverTimeMs >= minimumRolloverTimeMs,
"Maximum rollover time needs to be greater than minimum rollover time");
Expand Down Expand Up @@ -411,7 +413,8 @@ public ManagedLedgerConfig setThrottleMarkDelete(double throttleMarkDelete) {
* time unit for retention time
*/
public ManagedLedgerConfig setRetentionTime(int retentionTime, TimeUnit unit) {
this.retentionTimeMs = unit.toMillis(retentionTime);
checkArgument(retentionTime >= -1, "The retention time should be -1, 0 or value > 0");
this.retentionTimeMs = retentionTime != -1 ? unit.toMillis(retentionTime) : -1;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static com.google.common.base.Preconditions.checkArgument;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean;
Expand All @@ -41,6 +42,7 @@ public ManagedLedgerFactoryMBeanImpl(ManagedLedgerFactoryImpl factory) throws Ex
}

public void refreshStats(long period, TimeUnit unit) {
checkArgument(period >= 0);
double seconds = unit.toMillis(period) / 1000.0;

if (seconds <= 0.0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static com.google.common.base.Preconditions.checkArgument;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.ManagedCursor;
Expand Down Expand Up @@ -64,6 +65,7 @@ public ManagedLedgerMBeanImpl(ManagedLedgerImpl managedLedger) {
}

public void refreshStats(long period, TimeUnit unit) {
checkArgument(period >= 0);
double seconds = unit.toMillis(period) / 1000.0;
if (seconds <= 0.0) {
// skip refreshing stats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.stats;

import static com.google.common.base.Preconditions.checkArgument;
import static io.prometheus.client.CollectorRegistry.defaultRegistry;
import io.prometheus.client.Collector;
import io.prometheus.client.Summary;
Expand Down Expand Up @@ -70,6 +71,7 @@ public DimensionStats(String name, long updateDurationInSec) {
}

public void recordDimensionTimeValue(long latency, TimeUnit unit) {
checkArgument(latency >= 0);
summary.observe(unit.toMillis(latency));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.stats.prometheus.metrics;

import static com.google.common.base.Preconditions.checkArgument;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.stats.Counter;
Expand Down Expand Up @@ -57,6 +58,7 @@ public void addCount(long delta) {

@Override
public void addLatency(long eventLatency, TimeUnit unit) {
checkArgument(eventLatency >= 0);
long valueMillis = unit.toMillis(eventLatency);
counter.add(valueMillis);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,9 @@ private void checkTopicRetentionPolicy(String topicName, RetentionPolicies reten
ManagedLedgerConfig config = pulsar.getBrokerService()
.getManagedLedgerConfig(TopicName.get(topicName)).get();
Assert.assertEquals(config.getRetentionSizeInMB(), retentionPolicies.getRetentionSizeInMB());
Assert.assertEquals(config.getRetentionTimeMillis(),retentionPolicies.getRetentionTimeInMinutes() * 60000L);
Assert.assertEquals(config.getRetentionTimeMillis(), retentionPolicies.getRetentionTimeInMinutes() < 0
? retentionPolicies.getRetentionTimeInMinutes()
: retentionPolicies.getRetentionTimeInMinutes() * 60000L);
}

private void testCompactionCursorRetention(String topic) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ public ClientConfiguration setServiceUrl(String serviceUrl) {
* @param unit the time unit in which the duration is defined
*/
public void setConnectionTimeout(int duration, TimeUnit unit) {
checkArgument(duration >= 0);
confData.setConnectionTimeoutMs((int) unit.toMillis(duration));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public long getAckTimeoutMillis() {
* @return {@link ConsumerConfiguration}
*/
public ConsumerConfiguration setAckTimeout(long ackTimeout, TimeUnit timeUnit) {
checkArgument(ackTimeout >= 0);
long ackTimeoutMillis = timeUnit.toMillis(ackTimeout);
checkArgument(ackTimeoutMillis >= minAckTimeoutMillis,
"Ack timeout should be should be greater than " + minAckTimeoutMillis + " ms");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.admin.internal;

import static com.google.common.base.Preconditions.checkArgument;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -200,18 +201,21 @@ public PulsarAdminBuilder tlsProtocols(Set<String> tlsProtocols) {

@Override
public PulsarAdminBuilder connectionTimeout(int connectionTimeout, TimeUnit connectionTimeoutUnit) {
checkArgument(connectionTimeout >= 0);
this.conf.setConnectionTimeoutMs((int) connectionTimeoutUnit.toMillis(connectionTimeout));
return this;
}

@Override
public PulsarAdminBuilder readTimeout(int readTimeout, TimeUnit readTimeoutUnit) {
checkArgument(readTimeout >= 0);
this.conf.setReadTimeoutMs((int) readTimeoutUnit.toMillis(readTimeout));
return this;
}

@Override
public PulsarAdminBuilder requestTimeout(int requestTimeout, TimeUnit requestTimeoutUnit) {
checkArgument(requestTimeout >= 0);
this.conf.setRequestTimeoutMs((int) requestTimeoutUnit.toMillis(requestTimeout));
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ public TransactionPendingAckStats getPendingAckStats(String topic, String subNam
@Override
public CompletableFuture<Map<String, TransactionMetadata>> getSlowTransactionsByCoordinatorIdAsync(
Integer coordinatorId, long timeout, TimeUnit timeUnit) {
checkArgument(timeout >= 0);
WebTarget path = adminV3Transactions.path("slowTransactions");
path = path.path(timeUnit.toMillis(timeout) + "");
if (coordinatorId != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ public AutoClusterFailoverBuilder switchBackDelay(long switchBackDelay, TimeUnit

@Override
public AutoClusterFailoverBuilder checkInterval(long interval, TimeUnit timeUnit) {
checkArgument(interval >= 0L, "check interval time must not be negative.");
this.checkIntervalMs = timeUnit.toMillis(interval);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ public ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit) {

@Override
public ClientBuilder lookupTimeout(int lookupTimeout, TimeUnit unit) {
checkArgument(lookupTimeout >= 0, "lookupTimeout must not be negative");
conf.setLookupTimeoutMs(unit.toMillis(lookupTimeout));
return this;
}
Expand Down Expand Up @@ -333,6 +334,7 @@ public ClientBuilder keepAliveInterval(int keepAliveInterval, TimeUnit unit) {

@Override
public ClientBuilder connectionTimeout(int duration, TimeUnit unit) {
checkArgument(duration >= 0, "connectionTimeout needs to be >= 0");
conf.setConnectionTimeoutMs((int) unit.toMillis(duration));
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ public void acknowledge(Messages<?> messages) throws PulsarClientException {

@Override
public void reconsumeLater(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException {
checkArgument(delayTime >= 0, "The delay time must not be negative.");
reconsumeLater(message, null, delayTime, unit);
}

Expand Down Expand Up @@ -562,6 +563,7 @@ public CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long dela
@Override
public CompletableFuture<Void> reconsumeLaterAsync(
Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit) {
checkArgument(delayTime >= 0, "The delay time must not be negative.");
if (!conf.isRetryEnable()) {
return FutureUtil.failedFuture(new PulsarClientException(RECONSUME_LATER_ERROR_MSG));
}
Expand Down Expand Up @@ -598,12 +600,14 @@ public CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message) {

@Override
public CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, long delayTime, TimeUnit unit) {
checkArgument(delayTime >= 0, "The delay time must not be negative.");
return reconsumeLaterCumulativeAsync(message, null, delayTime, unit);
}

@Override
public CompletableFuture<Void> reconsumeLaterCumulativeAsync(
Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit) {
checkArgument(delayTime >= 0, "The delay time must not be negative.");
if (!conf.isRetryEnable()) {
return FutureUtil.failedFuture(new PulsarClientException(RECONSUME_LATER_ERROR_MSG));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ public ConsumerBuilder<T> enableBatchIndexAcknowledgment(boolean batchIndexAckno

@Override
public ConsumerBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit) {
checkArgument(duration >= 0, "expired time of incomplete chunk message must not be negative");
conf.setExpireTimeOfIncompleteChunkedMessageMillis(unit.toMillis(duration));
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ public ControlledClusterFailoverBuilder urlProviderHeader(Map<String, String> he

@Override
public ControlledClusterFailoverBuilder checkInterval(long interval, @NonNull TimeUnit timeUnit) {
checkArgument(interval >= 0, "The check interval time must not be negative.");
this.interval = timeUnit.toMillis(interval);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ public ReaderBuilder<T> autoAckOldestChunkedMessageOnQueueFull(boolean autoAckOl

@Override
public ReaderBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit) {
checkArgument(duration >= 0, "The expired time must not be negative.");
conf.setExpireTimeOfIncompleteChunkedMessageMillis(unit.toMillis(duration));
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;

import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
Expand Down Expand Up @@ -205,6 +206,7 @@ private void failPendingRequest() {
}

public CompletableFuture<TxnID> newTransactionAsync(long timeout, TimeUnit unit) {
checkArgument(timeout >= 0, "The timeout must not be negative.");
if (LOG.isDebugEnabled()) {
LOG.debug("New transaction with timeout in ms {}", unit.toMillis(timeout));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ public TypedMessageBuilder<T> disableReplication() {

@Override
public TypedMessageBuilder<T> deliverAfter(long delay, TimeUnit unit) {
checkArgument(delay >= 0, "The delay time must not be negative.");
return deliverAt(System.currentTimeMillis() + unit.toMillis(delay));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl.transaction;

import static com.google.common.base.Preconditions.checkArgument;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -45,6 +46,7 @@ public TransactionBuilderImpl(PulsarClientImpl client, TransactionCoordinatorCli

@Override
public TransactionBuilder withTransactionTimeout(long txnTimeout, TimeUnit timeoutUnit) {
checkArgument(txnTimeout >= 0, "The txn timeout must not be negative.");
this.txnTimeout = txnTimeout;
this.timeUnit = timeoutUnit;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.util;

import static com.google.common.base.Preconditions.checkArgument;
import java.time.Clock;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
Expand All @@ -33,6 +34,7 @@ public class ObjectCache<T> implements Supplier<T> {

public ObjectCache(Supplier<T> supplier, long cacheDuration, TimeUnit unit) {
this(supplier, cacheDuration, unit, Clock.systemUTC());
checkArgument(cacheDuration >= 0, "The cache duration must not be negative.");
}

ObjectCache(Supplier<T> supplier, long cacheDuration, TimeUnit unit, Clock clock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.client;

import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -364,6 +365,7 @@ public synchronized CompletableFuture<Void> promiseAfter(int steps, List<Complet
}

public synchronized void addEntryDelay(long delay, TimeUnit unit) {
checkArgument(delay >= 0, "The delay time must not be negative.");
addEntryDelaysMillis.add(unit.toMillis(delay));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.client;

import static com.google.common.base.Preconditions.checkArgument;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -65,6 +66,7 @@ public void addCount(long delta) {

@Override
public void addLatency(long eventLatency, TimeUnit unit) {
checkArgument(eventLatency >= 0, "The event latency must not be negative.");
long valueMillis = unit.toMillis(eventLatency);
updateMax(val.addAndGet(valueMillis));
}
Expand Down
Loading