Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

InfluxDB Edge Timedata: handle persistence priority #2663

Merged
merged 7 commits into from
Jun 16, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.osgi.service.metatype.annotations.AttributeDefinition;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;

import io.openems.common.channel.PersistencePriority;
import io.openems.shared.influxdb.QueryLanguageConfig;

@ObjectClassDefinition(//
Expand Down Expand Up @@ -46,5 +47,8 @@
@AttributeDefinition(name = "Read-Only mode", description = "Activates the read-only mode. Then no data is written to InfluxDB.")
boolean isReadOnly() default false;

@AttributeDefinition(name = "Persistence Priority", description = "Store only Channels with a Persistence Priority above this. Be aware that too many writes can wear-out your flash storage.")
PersistencePriority persistencePriority() default PersistencePriority.MEDIUM;

String webconsole_configurationFactory_nameHint() default "Timedata InfluxDB [{id}]";
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;

import io.openems.common.channel.AccessMode;
import io.openems.common.exceptions.OpenemsError.OpenemsNamedException;
import io.openems.common.oem.OpenemsEdgeOem;
import io.openems.common.timedata.Resolution;
Expand Down Expand Up @@ -127,56 +128,37 @@ protected synchronized void collectAndWriteChannelValues() {
final var point = Point.measurement(this.config.measurement()).time(timestamp, WritePrecision.MS);
final var addedAtLeastOneChannelValue = new AtomicBoolean(false);

this.componentManager.getEnabledComponents().stream().filter(OpenemsComponent::isEnabled)
.forEach(component -> {
component.channels().forEach(channel -> {
switch (channel.channelDoc().getAccessMode()) {
case WRITE_ONLY:
// ignore Write-Only-Channels
return;
case READ_ONLY:
case READ_WRITE:
break;
}

Optional<?> valueOpt = channel.value().asOptional();
if (!valueOpt.isPresent()) {
// ignore not available channels
return;
}
Object value = valueOpt.get();
var address = channel.address().toString();
try {
switch (channel.getType()) {
case BOOLEAN:
point.addField(address, (Boolean) value ? 1 : 0);
break;
case SHORT:
point.addField(address, (Short) value);
break;
case INTEGER:
point.addField(address, (Integer) value);
break;
case LONG:
point.addField(address, (Long) value);
break;
case FLOAT:
point.addField(address, (Float) value);
break;
case DOUBLE:
point.addField(address, (Double) value);
break;
case STRING:
point.addField(address, (String) value);
break;
}
} catch (IllegalArgumentException e) {
this.log.warn("Unable to add Channel [" + address + "] value [" + value + "]: "
+ e.getMessage());
return;
this.componentManager.getEnabledComponents().stream() //
.flatMap(component -> component.channels().stream()) //
.filter(channel -> {
final var doc = channel.channelDoc();
return doc.getPersistencePriority().isAtLeast(this.config.persistencePriority())
&& doc.getAccessMode() != AccessMode.WRITE_ONLY; //
}) //
.forEach(channel -> {
Optional<?> valueOpt = channel.value().asOptional();
if (!valueOpt.isPresent()) {
// ignore not available channels
return;
}
Object value = valueOpt.get();
var address = channel.address().toString();
try {
switch (channel.getType()) {
case BOOLEAN -> point.addField(address, (Boolean) value ? 1 : 0);
case SHORT -> point.addField(address, (Short) value);
case INTEGER -> point.addField(address, (Integer) value);
case LONG -> point.addField(address, (Long) value);
case FLOAT -> point.addField(address, (Float) value);
case DOUBLE -> point.addField(address, (Double) value);
case STRING -> point.addField(address, (String) value);
}
addedAtLeastOneChannelValue.set(true);
});
} catch (IllegalArgumentException e) {
this.log.warn(
"Unable to add Channel [" + address + "] value [" + value + "]: " + e.getMessage());
return;
}
addedAtLeastOneChannelValue.set(true);
});

if (addedAtLeastOneChannelValue.get()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.openems.edge.timedata.influxdb;

import io.openems.common.channel.PersistencePriority;
import io.openems.common.test.AbstractComponentConfig;
import io.openems.shared.influxdb.QueryLanguageConfig;

Expand All @@ -17,6 +18,7 @@ protected static class Builder {
private String url;
private QueryLanguageConfig queryLanguage;
private String measurement;
private PersistencePriority persistencePriority;

private Builder() {
}
Expand Down Expand Up @@ -71,6 +73,11 @@ public Builder setMeasurement(String measurement) {
return this;
}

public Builder setPersistencePriority(PersistencePriority persistencePriority) {
this.persistencePriority = persistencePriority;
return this;
}

public MyConfig build() {
return new MyConfig(this);
}
Expand Down Expand Up @@ -132,6 +139,11 @@ public boolean isReadOnly() {
return this.builder.isReadOnly;
}

@Override
public PersistencePriority persistencePriority() {
return this.builder.persistencePriority;
}

@Override
public String measurement() {
return this.builder.measurement;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.junit.Test;

import io.openems.common.channel.PersistencePriority;
import io.openems.common.oem.DummyOpenemsEdgeOem;
import io.openems.edge.common.test.AbstractComponentTest.TestCase;
import io.openems.edge.common.test.ComponentTest;
Expand Down Expand Up @@ -30,6 +31,7 @@ public void test() throws Exception {
.setNoOfCycles(1) //
.setMaxQueueSize(5000) //
.setReadOnly(false) //
.setPersistencePriority(PersistencePriority.MEDIUM)
.build()) //
.next(new TestCase()) //
;
Expand Down