Skip to content

Commit

Permalink
InfluxDB Edge Timedata: handle persistence priority (#2663)
Browse files Browse the repository at this point in the history
* Ignore channels with a peristence priority less than a configured level
* Copy logic from  io.openems.edge.timedata.rrd4j.RecordWorker
* Modernize & cleanup code
  • Loading branch information
DerWahreKlinki authored Jun 16, 2024
1 parent ab73098 commit d7cf502
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.osgi.service.metatype.annotations.AttributeDefinition;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;

import io.openems.common.channel.PersistencePriority;
import io.openems.shared.influxdb.QueryLanguageConfig;

@ObjectClassDefinition(//
Expand Down Expand Up @@ -46,5 +47,8 @@
@AttributeDefinition(name = "Read-Only mode", description = "Activates the read-only mode. Then no data is written to InfluxDB.")
boolean isReadOnly() default false;

@AttributeDefinition(name = "Persistence Priority", description = "Store only Channels with a Persistence Priority above this. Be aware that too many writes can wear-out your flash storage.")
PersistencePriority persistencePriority() default PersistencePriority.MEDIUM;

String webconsole_configurationFactory_nameHint() default "Timedata InfluxDB [{id}]";
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;

import io.openems.common.channel.AccessMode;
import io.openems.common.exceptions.OpenemsError.OpenemsNamedException;
import io.openems.common.oem.OpenemsEdgeOem;
import io.openems.common.timedata.Resolution;
Expand Down Expand Up @@ -127,56 +128,37 @@ protected synchronized void collectAndWriteChannelValues() {
final var point = Point.measurement(this.config.measurement()).time(timestamp, WritePrecision.MS);
final var addedAtLeastOneChannelValue = new AtomicBoolean(false);

this.componentManager.getEnabledComponents().stream().filter(OpenemsComponent::isEnabled)
.forEach(component -> {
component.channels().forEach(channel -> {
switch (channel.channelDoc().getAccessMode()) {
case WRITE_ONLY:
// ignore Write-Only-Channels
return;
case READ_ONLY:
case READ_WRITE:
break;
}

Optional<?> valueOpt = channel.value().asOptional();
if (!valueOpt.isPresent()) {
// ignore not available channels
return;
}
Object value = valueOpt.get();
var address = channel.address().toString();
try {
switch (channel.getType()) {
case BOOLEAN:
point.addField(address, (Boolean) value ? 1 : 0);
break;
case SHORT:
point.addField(address, (Short) value);
break;
case INTEGER:
point.addField(address, (Integer) value);
break;
case LONG:
point.addField(address, (Long) value);
break;
case FLOAT:
point.addField(address, (Float) value);
break;
case DOUBLE:
point.addField(address, (Double) value);
break;
case STRING:
point.addField(address, (String) value);
break;
}
} catch (IllegalArgumentException e) {
this.log.warn("Unable to add Channel [" + address + "] value [" + value + "]: "
+ e.getMessage());
return;
this.componentManager.getEnabledComponents().stream() //
.flatMap(component -> component.channels().stream()) //
.filter(channel -> {
final var doc = channel.channelDoc();
return doc.getPersistencePriority().isAtLeast(this.config.persistencePriority())
&& doc.getAccessMode() != AccessMode.WRITE_ONLY; //
}) //
.forEach(channel -> {
Optional<?> valueOpt = channel.value().asOptional();
if (!valueOpt.isPresent()) {
// ignore not available channels
return;
}
Object value = valueOpt.get();
var address = channel.address().toString();
try {
switch (channel.getType()) {
case BOOLEAN -> point.addField(address, (Boolean) value ? 1 : 0);
case SHORT -> point.addField(address, (Short) value);
case INTEGER -> point.addField(address, (Integer) value);
case LONG -> point.addField(address, (Long) value);
case FLOAT -> point.addField(address, (Float) value);
case DOUBLE -> point.addField(address, (Double) value);
case STRING -> point.addField(address, (String) value);
}
addedAtLeastOneChannelValue.set(true);
});
} catch (IllegalArgumentException e) {
this.log.warn(
"Unable to add Channel [" + address + "] value [" + value + "]: " + e.getMessage());
return;
}
addedAtLeastOneChannelValue.set(true);
});

if (addedAtLeastOneChannelValue.get()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.openems.edge.timedata.influxdb;

import io.openems.common.channel.PersistencePriority;
import io.openems.common.test.AbstractComponentConfig;
import io.openems.shared.influxdb.QueryLanguageConfig;

Expand All @@ -17,6 +18,7 @@ protected static class Builder {
private String url;
private QueryLanguageConfig queryLanguage;
private String measurement;
private PersistencePriority persistencePriority;

private Builder() {
}
Expand Down Expand Up @@ -71,6 +73,11 @@ public Builder setMeasurement(String measurement) {
return this;
}

public Builder setPersistencePriority(PersistencePriority persistencePriority) {
this.persistencePriority = persistencePriority;
return this;
}

public MyConfig build() {
return new MyConfig(this);
}
Expand Down Expand Up @@ -132,6 +139,11 @@ public boolean isReadOnly() {
return this.builder.isReadOnly;
}

@Override
public PersistencePriority persistencePriority() {
return this.builder.persistencePriority;
}

@Override
public String measurement() {
return this.builder.measurement;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.junit.Test;

import io.openems.common.channel.PersistencePriority;
import io.openems.common.oem.DummyOpenemsEdgeOem;
import io.openems.edge.common.test.AbstractComponentTest.TestCase;
import io.openems.edge.common.test.ComponentTest;
Expand Down Expand Up @@ -30,6 +31,7 @@ public void test() throws Exception {
.setNoOfCycles(1) //
.setMaxQueueSize(5000) //
.setReadOnly(false) //
.setPersistencePriority(PersistencePriority.MEDIUM)
.build()) //
.next(new TestCase()) //
;
Expand Down

0 comments on commit d7cf502

Please sign in to comment.