Skip to content

Commit

Permalink
feat: add exponential backoff strategy for retry (#156)
Browse files Browse the repository at this point in the history
  • Loading branch information
bednar authored Oct 1, 2020
1 parent 2b82245 commit e967762
Show file tree
Hide file tree
Showing 9 changed files with 520 additions and 51 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### Features
1. [#150](https://github.com/influxdata/influxdb-client-java/pull/150): flux-dsl: added support for an offset parameter to limit operator, aggregates accept only a 'column' parameter
1. [#156](https://github.com/influxdata/influxdb-client-java/pull/156): Added exponential backoff strategy for batching writes. Default value for `retryInterval` is 5_000 milliseconds.

### API
1. [#139](https://github.com/influxdata/influxdb-client-java/pull/148): Changed default port from 9999 to 8086
Expand Down
5 changes: 4 additions & 1 deletion client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,10 @@ The writes are processed in batches which are configurable by `WriteOptions`:
| **batchSize** | the number of data points to collect in a batch | 1000 |
| **flushInterval** | the number of milliseconds before the batch is written | 1000 |
| **jitterInterval** | the number of milliseconds to increase the batch flush interval by a random amount | 0 |
| **retryInterval** | the number of milliseconds to wait before retrying an unsuccessful write. The retry interval is used when the InfluxDB server does not specify a "Retry-After" header. | 1000 |
| **retryInterval** | the number of milliseconds to wait before retrying an unsuccessful write. The retry interval is used when the InfluxDB server does not specify a "Retry-After" header. | 5000 |
| **maxRetries** | the maximum number of retries when a write fails | 3 |
| **maxRetryDelay** | the maximum delay between each retry attempt in milliseconds | 180_000 |
| **exponentialBase** | the base for the exponential retry delay, the next delay is computed as `retryInterval * exponentialBase^(attempts-1) + random(jitterInterval)` | 5 |
| **bufferLimit** | the maximum number of unwritten stored points | 10000 |
| **backpressureStrategy** | the strategy to deal with buffer overflow | DROP_OLDEST |

Expand Down
88 changes: 85 additions & 3 deletions client/src/main/java/com/influxdb/client/WriteOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
* <ul>
* <li>batchSize = 1000</li>
* <li>flushInterval = 1000 ms</li>
* <li>retryInterval = 1000 ms</li>
* <li>retryInterval = 5000 ms</li>
* <li>jitterInterval = 0</li>
* <li>maxRetries = 3</li>
* <li>maxRetryDelay = 180_000 ms</li>
* <li>exponentialBase = 5</li>
* <li>bufferLimit = 10_000</li>
* </ul>
Expand All @@ -55,7 +55,10 @@ public final class WriteOptions {
private static final int DEFAULT_BATCH_SIZE = 1000;
private static final int DEFAULT_FLUSH_INTERVAL = 1000;
private static final int DEFAULT_JITTER_INTERVAL = 0;
private static final int DEFAULT_RETRY_INTERVAL = 1000;
private static final int DEFAULT_RETRY_INTERVAL = 5000;
private static final int DEFAULT_MAX_RETRIES = 3;
private static final int DEFAULT_MAX_RETRY_DELAY = 180_000;
private static final int DEFAULT_EXPONENTIAL_BASE = 5;
private static final int DEFAULT_BUFFER_LIMIT = 10000;

/**
Expand All @@ -67,6 +70,9 @@ public final class WriteOptions {
private final int flushInterval;
private final int jitterInterval;
private final int retryInterval;
private final int maxRetries;
private final int maxRetryDelay;
private final int exponentialBase;
private final int bufferLimit;
private final Scheduler writeScheduler;
private final BackpressureOverflowStrategy backpressureStrategy;
Expand Down Expand Up @@ -95,7 +101,6 @@ public int getJitterInterval() {
return jitterInterval;
}


/**
* The retry interval is used when the InfluxDB server does not specify "Retry-After" header.
* <br>
Expand All @@ -108,6 +113,38 @@ public int getRetryInterval() {
return retryInterval;
}

/**
* The maximum number of retries performed when a write fails.
* <br>
* Defaults to 3 unless a different value is supplied through the builder.
*
* @return the configured maximum number of retries
* @see WriteOptions.Builder#maxRetries(int)
*/
public int getMaxRetries() {
return maxRetries;
}

/**
* The maximum delay between each retry attempt, in milliseconds.
* <br>
* Defaults to 180_000 ms unless a different value is supplied through the builder.
*
* @return the configured maximum retry delay in milliseconds
* @see WriteOptions.Builder#maxRetryDelay(int)
*/
public int getMaxRetryDelay() {
return maxRetryDelay;
}

/**
* The base for the exponential retry delay.
* <p>
* The next delay is computed as:
* {@code retryInterval * exponentialBase^(attempts-1) + random(jitterInterval)}.
* <br>
* Defaults to 5 unless a different value is supplied through the builder.
*
* @return the configured exponential base
* @see WriteOptions.Builder#exponentialBase(int)
*/
public int getExponentialBase() {
return exponentialBase;
}

/**
* @return Maximum number of points stored in the retry buffer.
* @see WriteOptions.Builder#bufferLimit(int)
Expand Down Expand Up @@ -142,6 +179,9 @@ private WriteOptions(@Nonnull final Builder builder) {
flushInterval = builder.flushInterval;
jitterInterval = builder.jitterInterval;
retryInterval = builder.retryInterval;
maxRetries = builder.maxRetries;
maxRetryDelay = builder.maxRetryDelay;
exponentialBase = builder.exponentialBase;
bufferLimit = builder.bufferLimit;
writeScheduler = builder.writeScheduler;
backpressureStrategy = builder.backpressureStrategy;
Expand All @@ -167,6 +207,9 @@ public static class Builder {
private int flushInterval = DEFAULT_FLUSH_INTERVAL;
private int jitterInterval = DEFAULT_JITTER_INTERVAL;
private int retryInterval = DEFAULT_RETRY_INTERVAL;
private int maxRetries = DEFAULT_MAX_RETRIES;
private int maxRetryDelay = DEFAULT_MAX_RETRY_DELAY;
private int exponentialBase = DEFAULT_EXPONENTIAL_BASE;
private int bufferLimit = DEFAULT_BUFFER_LIMIT;
private Scheduler writeScheduler = Schedulers.newThread();
private BackpressureOverflowStrategy backpressureStrategy = BackpressureOverflowStrategy.DROP_OLDEST;
Expand Down Expand Up @@ -229,6 +272,45 @@ public Builder retryInterval(final int retryInterval) {
return this;
}

/**
* Sets the maximum number of retries performed when a write fails.
* <br>
* The value is passed to {@code Arguments.checkPositiveNumber} before it is stored
* (presumably rejecting zero and negative values; confirm against {@code Arguments}).
*
* @param maxRetries the maximum number of retries; 3 is used when this method is not called
* @return {@code this}
*/
@Nonnull
public Builder maxRetries(final int maxRetries) {
Arguments.checkPositiveNumber(maxRetries, "maxRetries");
this.maxRetries = maxRetries;
return this;
}

/**
* Sets the maximum delay between each retry attempt, in milliseconds.
* <br>
* The value is passed to {@code Arguments.checkPositiveNumber} before it is stored
* (presumably rejecting zero and negative values; confirm against {@code Arguments}).
*
* @param maxRetryDelay the maximum delay in milliseconds; 180_000 is used when this method is not called
* @return {@code this}
*/
@Nonnull
public Builder maxRetryDelay(final int maxRetryDelay) {
Arguments.checkPositiveNumber(maxRetryDelay, "maxRetryDelay");
this.maxRetryDelay = maxRetryDelay;
return this;
}

/**
* Sets the base for the exponential retry delay.
* <p>
* The next delay is computed as:
* {@code retryInterval * exponentialBase^(attempts-1) + random(jitterInterval)}.
* <br>
* The value is passed to {@code Arguments.checkPositiveNumber} before it is stored
* (presumably rejecting zero and negative values; confirm against {@code Arguments}).
*
* @param exponentialBase the exponential base; 5 is used when this method is not called
* @return {@code this}
*/
@Nonnull
public Builder exponentialBase(final int exponentialBase) {
Arguments.checkPositiveNumber(exponentialBase, "exponentialBase");
this.exponentialBase = exponentialBase;
return this;
}

/**
* The client maintains a buffer for failed writes so that the writes will be retried later on. This may
* help to overcome temporary network problems or InfluxDB load spikes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ public AbstractInfluxDBClient(@Nonnull final InfluxDBClientOptions options, @Non
this.gzipInterceptor = new GzipInterceptor();

this.okHttpClient = options.getOkHttpClient()
// Connection errors are handled by RetryAttempt in AbstractWriteClient.
.retryOnConnectionFailure(false)
.addInterceptor(new UserAgentInterceptor(clientType))
.addInterceptor(this.loggingInterceptor)
.addInterceptor(this.authenticateInterceptor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
*/
package com.influxdb.client.internal;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -67,7 +65,6 @@
public abstract class AbstractWriteClient extends AbstractRestClient implements AutoCloseable {

private static final Logger LOG = Logger.getLogger(AbstractWriteClient.class.getName());
private static final List<Integer> ABLE_TO_RETRY_ERRORS = Arrays.asList(429, 503);
private static final String CLOSED_EXCEPTION = "WriteApi is closed. "
+ "Data should be written before calling InfluxDBClient.close or WriteApi.close.";
private static final int DEFAULT_WAIT = 30_000;
Expand Down Expand Up @@ -264,7 +261,7 @@ private FlowableTransformer<BatchWriteItem, BatchWriteItem> jitter(@Nonnull fina
//
return source.delay((Function<BatchWriteItem, Flowable<Long>>) pointFlowable -> {

int delay = jitterDelay();
int delay = RetryAttempt.jitterDelay(writeOptions.getJitterInterval());

LOG.log(Level.FINEST, "Generated Jitter dynamic delay: {0}", delay);

Expand All @@ -273,11 +270,6 @@ private FlowableTransformer<BatchWriteItem, BatchWriteItem> jitter(@Nonnull fina
};
}

/**
 * Generates a random jitter delay bounded by the configured jitter interval.
 *
 * @return a pseudo-random value obtained by scaling {@link Math#random()} with
 *         {@code writeOptions.getJitterInterval()} and truncating the result to an {@code int}
 */
private int jitterDelay() {
    final int jitterInterval = writeOptions.getJitterInterval();
    final double scaled = Math.random() * jitterInterval;
    return (int) scaled;
}

private <T extends AbstractWriteEvent> void publish(@Nonnull final T event) {

Arguments.checkNotNull(event, "event");
Expand Down Expand Up @@ -518,49 +510,26 @@ private Function<Flowable<Throwable>, Publisher<?>> retryHandler(@Nonnull final
Objects.requireNonNull(writeOptions, "WriteOptions are required");
Objects.requireNonNull(retryScheduler, "RetryScheduler is required");

return errors -> errors.flatMap(throwable -> {

if (throwable instanceof HttpException) {

HttpException ie = (HttpException) throwable;

//
// The type of error is not able to retry
//
if (!ABLE_TO_RETRY_ERRORS.contains(ie.code())) {

return Flowable.error(throwable);
}
return errors -> errors
.zipWith(Flowable.range(1, writeOptions.getMaxRetries() + 1),
(throwable, count) -> new RetryAttempt(throwable, count, writeOptions))
.flatMap(attempt -> {

//
// Retry request
//
long retryInterval;
Throwable throwable = attempt.getThrowable();
if (attempt.isRetry()) {

String retryAfter = ((HttpException) throwable).response().headers().get("Retry-After");
if (retryAfter != null) {
long retryInterval = attempt.getRetryInterval();

retryInterval = TimeUnit.MILLISECONDS.convert(Integer.parseInt(retryAfter), TimeUnit.SECONDS);
} else {
publish(new WriteRetriableErrorEvent(throwable, retryInterval));

retryInterval = writeOptions.getRetryInterval();

String msg = "The InfluxDB does not specify \"Retry-After\". Use the default retryInterval: {0}";
LOG.log(Level.FINEST, msg, retryInterval);
}

retryInterval = retryInterval + jitterDelay();

publish(new WriteRetriableErrorEvent(throwable, retryInterval));

return Flowable.just("notify").delay(retryInterval, TimeUnit.MILLISECONDS, retryScheduler);
}
return Flowable.just("notify").delay(retryInterval, TimeUnit.MILLISECONDS, retryScheduler);
}

//
// This type of throwable is not able to retry
//
return Flowable.error(throwable);
});
//
// This type of throwable is not able to retry
//
return Flowable.error(throwable);
});
}

@Nonnull
Expand Down
Loading

0 comments on commit e967762

Please sign in to comment.