Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: #295 Create and Drop retention policies #351

Merged
merged 3 commits into from
Jul 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
- InfluxDBResultMapper now is able to process QueryResult created when a GROUP BY clause was used [PR #345](https://github.com/influxdata/influxdb-java/pull/345)
- InfluxDB will now handle the timestamp on its own if none is provided [PR#350](https://github.com/influxdata/influxdb-java/pull/350)

#### Features

- API: add InfluxDB#createRetentionPolicy and InfluxDB#dropRetentionPolicy to be able to create and drop Retention Policies [PR #351](https://github.com/influxdata/influxdb-java/pull/351)

## v2.7 [2017-06-26]

Expand Down
22 changes: 17 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ All low level REST Api calls are available.
InfluxDB influxDB = InfluxDBFactory.connect("http://172.17.0.2:8086", "root", "root");
String dbName = "aTimeSeries";
influxDB.createDatabase(dbName);
String rpName = "aRetentionPolicy";
influxDB.createRetentionPolicy(rpName, dbName, "30d", "30m", 2, true);

BatchPoints batchPoints = BatchPoints
.database(dbName)
.tag("async", "true")
.retentionPolicy("autogen")
.retentionPolicy(rpName)
.consistency(ConsistencyLevel.ALL)
.build();
Point point1 = Point.measurement("cpu")
Expand All @@ -43,16 +45,22 @@ batchPoints.point(point2);
influxDB.write(batchPoints);
Query query = new Query("SELECT idle FROM cpu", dbName);
influxDB.query(query);
influxDB.dropRetentionPolicy(rpName, dbName);
influxDB.deleteDatabase(dbName);
```
Note : If you are using influxdb < 1.0.0, you should use 'default' instead of 'autogen'
Note:
* APIs to create and drop retention policies are supported only in versions > 2.7
* If you are using influxdb < 2.8, you should use retention policy: 'autogen'
* If you are using influxdb < 1.0.0, you should use 'default' instead of 'autogen'

If your application produces only single Points, you can enable the batching functionality of influxdb-java:

```java
InfluxDB influxDB = InfluxDBFactory.connect("http://172.17.0.2:8086", "root", "root");
String dbName = "aTimeSeries";
influxDB.createDatabase(dbName);
String rpName = "aRetentionPolicy";
influxDB.createRetentionPolicy(rpName, dbName, "30d", "30m", 2, true);

// Flush every 2000 Points, at least every 100ms
influxDB.enableBatch(2000, 100, TimeUnit.MILLISECONDS);
Expand All @@ -69,10 +77,11 @@ Point point2 = Point.measurement("disk")
.addField("free", 1L)
.build();

influxDB.write(dbName, "autogen", point1);
influxDB.write(dbName, "autogen", point2);
influxDB.write(dbName, rpName, point1);
influxDB.write(dbName, rpName, point2);
Query query = new Query("SELECT idle FROM cpu", dbName);
influxDB.query(query);
influxDB.dropRetentionPolicy(rpName, dbName);
influxDB.deleteDatabase(dbName);
```
Note that the batching functionality creates an internal thread pool that needs to be shutdown explicitly as part of a graceful application shut-down, or the application will not shut down properly. To do so simply call: ```influxDB.close()```
Expand All @@ -85,7 +94,9 @@ InfluxDB influxDB = InfluxDBFactory.connect("http://172.17.0.2:8086", "root", "r
String dbName = "aTimeSeries";
influxDB.createDatabase(dbName);
influxDB.setDatabase(dbName);
influxDB.setRetentionPolicy("autogen");
String rpName = "aRetentionPolicy";
influxDB.createRetentionPolicy(rpName, dbName, "30d", "30m", 2, true);
influxDB.setRetentionPolicy(rpName);

// Flush every 2000 Points, at least every 100ms
influxDB.enableBatch(2000, 100, TimeUnit.MILLISECONDS);
Expand All @@ -105,6 +116,7 @@ influxDB.write(Point.measurement("disk")

Query query = new Query("SELECT idle FROM cpu", dbName);
influxDB.query(query);
influxDB.dropRetentionPolicy(rpName, dbName);
influxDB.deleteDatabase(dbName);
```

Expand Down
40 changes: 40 additions & 0 deletions src/main/java/org/influxdb/InfluxDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -395,4 +395,44 @@ public void write(final String database, final String retentionPolicy,
* @return the InfluxDB instance to be able to use it in a fluent manner.
*/
public InfluxDB setRetentionPolicy(final String retentionPolicy);

/**
* Creates a retentionPolicy.
* @param rpName the name of the retentionPolicy(rp)
* @param database the name of the database
* @param duration the duration of the rp
* @param shardDuration the shardDuration
* @param replicationFactor the replicationFactor of the rp
* @param isDefault if the rp is the default rp for the database or not
*/
public void createRetentionPolicy(final String rpName, final String database, final String duration,
final String shardDuration, final int replicationFactor, final boolean isDefault);

/**
* Creates a retentionPolicy. (optional shardDuration)
* @param rpName the name of the retentionPolicy(rp)
* @param database the name of the database
* @param duration the duration of the rp
* @param replicationFactor the replicationFactor of the rp
* @param isDefault if the rp is the default rp for the database or not
*/
public void createRetentionPolicy(final String rpName, final String database, final String duration,
final int replicationFactor, final boolean isDefault);

/**
* Creates a retentionPolicy. (optional shardDuration and isDefault)
* @param rpName the name of the retentionPolicy(rp)
* @param database the name of the database
* @param duration the duration of the rp
* @param replicationFactor the replicationFactor of the rp
*/
public void createRetentionPolicy(final String rpName, final String database, final String duration,
final String shardDuration, final int replicationFactor);

/**
* Drops a retentionPolicy in a database.
* @param rpName the name of the retentionPolicy
* @param database the name of the database
*/
public void dropRetentionPolicy(final String rpName, final String database);
}
69 changes: 69 additions & 0 deletions src/main/java/org/influxdb/impl/InfluxDBImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -557,4 +557,73 @@ public InfluxDB setRetentionPolicy(final String retentionPolicy) {
this.retentionPolicy = retentionPolicy;
return this;
}

/**
* {@inheritDoc}
*/
@Override
public void createRetentionPolicy(final String rpName, final String database, final String duration,
final String shardDuration, final int replicationFactor, final boolean isDefault) {
Preconditions.checkNonEmptyString(rpName, "retentionPolicyName");
Preconditions.checkNonEmptyString(database, "database");
Preconditions.checkNonEmptyString(duration, "retentionDuration");
Preconditions.checkDuration(duration, "retentionDuration");
if (shardDuration != null && !shardDuration.isEmpty()) {
Preconditions.checkDuration(shardDuration, "shardDuration");
}
Preconditions.checkPositiveNumber(replicationFactor, "replicationFactor");

StringBuilder queryBuilder = new StringBuilder("CREATE RETENTION POLICY \"");
queryBuilder.append(rpName)
.append("\" ON \"")
.append(database)
.append("\" DURATION ")
.append(duration)
.append(" REPLICATION ")
.append(replicationFactor);
if (shardDuration != null && !shardDuration.isEmpty()) {
queryBuilder.append(" SHARD DURATION ");
queryBuilder.append(shardDuration);
}
if (isDefault) {
queryBuilder.append(" DEFAULT");
}
execute(this.influxDBService.postQuery(this.username, this.password, Query.encode(queryBuilder.toString())));
}

/**
* {@inheritDoc}
*/
@Override
public void createRetentionPolicy(final String rpName, final String database, final String duration,
final int replicationFactor, final boolean isDefault) {
createRetentionPolicy(rpName, database, duration, null, replicationFactor, isDefault);
}

/**
* {@inheritDoc}
*/
@Override
public void createRetentionPolicy(final String rpName, final String database, final String duration,
final String shardDuration, final int replicationFactor) {
createRetentionPolicy(rpName, database, duration, null, replicationFactor, false);
}

/**
* {@inheritDoc}
* @param rpName the name of the retentionPolicy
* @param database the name of the database
*/
@Override
public void dropRetentionPolicy(final String rpName, final String database) {
Preconditions.checkNonEmptyString(rpName, "retentionPolicyName");
Preconditions.checkNonEmptyString(database, "database");
StringBuilder queryBuilder = new StringBuilder("DROP RETENTION POLICY \"");
queryBuilder.append(rpName)
.append("\" ON \"")
.append(database)
.append("\"");
execute(this.influxDBService.postQuery(this.username, this.password,
Query.encode(queryBuilder.toString())));
}
}
13 changes: 13 additions & 0 deletions src/main/java/org/influxdb/impl/Preconditions.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,17 @@ public static void checkPositiveNumber(final Number number, final String name) t
throw new IllegalArgumentException("Expecting a positive number for " + name);
}
}

/**
* Enforces that the duration is a valid influxDB duration.
* @param duration the duration to test
* @param name variable name for reporting
* @throws IllegalArgumentException
*/
public static void checkDuration(final String duration, final String name) throws IllegalArgumentException {
if (!duration.matches("(\\d+[wdmhs])+")) {
throw new IllegalArgumentException("Invalid InfluxDB duration: " + duration
+ "for " + name);
}
}
}
30 changes: 30 additions & 0 deletions src/test/java/org/influxdb/InfluxDBTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -715,4 +715,34 @@ public void testFlushThrowsIfBatchingIsNotEnabled() {
this.influxDB.flush();
}

/**
* Test creation and deletion of retention policies
*/
@Test
public void testCreateDropRetentionPolicies() {
String dbName = "rpTest_" + System.currentTimeMillis();
this.influxDB.createDatabase(dbName);

this.influxDB.createRetentionPolicy("testRP1", dbName, "30h", 2, false);
this.influxDB.createRetentionPolicy("testRP2", dbName, "10d", "20m", 2, false);
this.influxDB.createRetentionPolicy("testRP3", dbName, "2d4w", "20m", 2);

Query query = new Query("SHOW RETENTION POLICIES", dbName);
QueryResult result = this.influxDB.query(query);
Assert.assertNull(result.getError());
List<List<Object>> retentionPolicies = result.getResults().get(0).getSeries().get(0).getValues();
Assert.assertTrue(retentionPolicies.get(1).contains("testRP1"));
Assert.assertTrue(retentionPolicies.get(2).contains("testRP2"));
Assert.assertTrue(retentionPolicies.get(3).contains("testRP3"));

this.influxDB.dropRetentionPolicy("testRP1", dbName);
this.influxDB.dropRetentionPolicy("testRP2", dbName);
this.influxDB.dropRetentionPolicy("testRP3", dbName);

result = this.influxDB.query(query);
Assert.assertNull(result.getError());
retentionPolicies = result.getResults().get(0).getSeries().get(0).getValues();
Assert.assertTrue(retentionPolicies.size() == 1);
}

}