Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

imlement issue #439 : Retry capability for writing of BatchPoints #503

Merged
merged 1 commit into from
Sep 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/main/java/org/influxdb/InfluxDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,23 @@ public InfluxDB enableBatch(final int actions, final int flushDuration, final Ti
*/
public void write(final BatchPoints batchPoints);

/**
* Write a set of Points to the influxdb database with the new (>= 0.9.0rc32) lineprotocol.
*
* If batching is enabled with appropriate {@code BatchOptions} settings
* ({@code BatchOptions.bufferLimit} greater than {@code BatchOptions.actions})
* This method will try to retry in case of some recoverable errors.
* Otherwise it just works as {@link #write(BatchPoints)}
*
* @see <a href="https://github.com/influxdb/influxdb/pull/2696">2696</a>
* @see <a href="https://github.com/influxdata/influxdb-java/wiki/Handling-errors-of-InfluxDB-under-high-load">
* Retry worth errors</a>
*
* @param batchPoints
* the points to write in BatchPoints.
*/
public void writeWithRetry(final BatchPoints batchPoints);

/**
* Write a set of Points to the influxdb database with the string records.
*
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/influxdb/impl/BatchProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -376,4 +376,8 @@ public ConsistencyLevel getConsistencyLevel() {
return consistencyLevel;
}

BatchWriter getBatchWriter() {
return batchWriter;
}

}
9 changes: 9 additions & 0 deletions src/main/java/org/influxdb/impl/InfluxDBImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -425,6 +426,14 @@ public void write(final BatchPoints batchPoints) {
lineProtocol));
}

@Override
public void writeWithRetry(final BatchPoints batchPoints) {
if (isBatchEnabled()) {
batchProcessor.getBatchWriter().write(Collections.singleton(batchPoints));
} else {
write(batchPoints);
}
}

@Override
public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency,
Expand Down
111 changes: 109 additions & 2 deletions src/test/java/org/influxdb/BatchOptionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,100 @@ protected void check(InvocationOnMock invocation) {
spy.deleteDatabase(dbName);
}
}



@Test
public void testWriteWithRetryOnRecoverableError() throws InterruptedException {
String dbName = "write_unittest_" + System.currentTimeMillis();
InfluxDB spy = spy(influxDB);
doAnswer(new Answer() {
boolean firstCall = true;

@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
if (firstCall) {
firstCall = false;
throw new InfluxDBException("error");
} else {
return invocation.callRealMethod();
}
}
}).when(spy).write(any(BatchPoints.class));
try {
BiConsumer<Iterable<Point>, Throwable> mockHandler = mock(BiConsumer.class);
BatchOptions options = BatchOptions.DEFAULTS.exceptionHandler(mockHandler).flushDuration(100);

spy.createDatabase(dbName);
spy.setDatabase(dbName);
spy.enableBatch(options);

BatchPoints batchPoints = createBatchPoints(dbName, "m0", 200);
spy.writeWithRetry(batchPoints);
Thread.sleep(500);
verify(mockHandler, never()).accept(any(), any());

verify(spy, times(2)).write(any(BatchPoints.class));

QueryResult result = influxDB.query(new Query("select * from m0", dbName));
Assertions.assertNotNull(result.getResults().get(0).getSeries());
Assertions.assertEquals(200, result.getResults().get(0).getSeries().get(0).getValues().size());

} finally {
spy.disableBatch();
spy.deleteDatabase(dbName);
}
}

@Test
public void testWriteWithRetryOnUnrecoverableError() throws InterruptedException {

String dbName = "write_unittest_" + System.currentTimeMillis();
InfluxDB spy = spy((InfluxDB) influxDB);
doThrow(DatabaseNotFoundException.class).when(spy).write(any(BatchPoints.class));

try {
BiConsumer<Iterable<Point>, Throwable> mockHandler = mock(BiConsumer.class);
BatchOptions options = BatchOptions.DEFAULTS.exceptionHandler(mockHandler).flushDuration(100);

spy.createDatabase(dbName);
spy.setDatabase(dbName);
spy.enableBatch(options);

BatchPoints batchPoints = createBatchPoints(dbName, "m0", 200);
spy.writeWithRetry(batchPoints);
Thread.sleep(500);

verify(mockHandler, times(1)).accept(any(), any());

QueryResult result = influxDB.query(new Query("select * from m0", dbName));
Assertions.assertNull(result.getResults().get(0).getSeries());
Assertions.assertNull(result.getResults().get(0).getError());
} finally {
spy.disableBatch();
spy.deleteDatabase(dbName);
}

}

@Test
public void testWriteWithRetryOnBatchingNotEnabled() {
String dbName = "write_unittest_" + System.currentTimeMillis();
try {

influxDB.createDatabase(dbName);
influxDB.setDatabase(dbName);

BatchPoints batchPoints = createBatchPoints(dbName, "m0", 200);
influxDB.writeWithRetry(batchPoints);

QueryResult result = influxDB.query(new Query("select * from m0", dbName));
Assertions.assertNotNull(result.getResults().get(0).getSeries());
Assertions.assertEquals(200, result.getResults().get(0).getSeries().get(0).getValues().size());
} finally {
influxDB.deleteDatabase(dbName);
}

}
void writeSomePoints(InfluxDB influxDB, String measurement, int firstIndex, int lastIndex) {
for (int i = firstIndex; i <= lastIndex; i++) {
Point point = Point.measurement(measurement)
Expand Down Expand Up @@ -514,7 +607,21 @@ void write20Points(InfluxDB influxDB) {
void writeSomePoints(InfluxDB influxDB, int n) {
writeSomePoints(influxDB, 0, n - 1);
}


private BatchPoints createBatchPoints(String dbName, String measurement, int n) {
BatchPoints batchPoints = BatchPoints.database(dbName).build();
for (int i = 1; i <= n; i++) {
Point point = Point.measurement(measurement)
.time(i,TimeUnit.MILLISECONDS)
.addField("f1", (double) i)
.addField("f2", (double) (i) * 1.1)
.addField("f3", "f_v3").build();
batchPoints.point(point);
}

return batchPoints;
}

static String createErrorBody(String errorMessage) {
return MessageFormat.format("'{' \"error\": \"{0}\" '}'", errorMessage);
}
Expand Down