Skip to content

Commit

Permalink
FEMS-Backports for 2022.04 (#1773)
Browse files Browse the repository at this point in the history
- Bugfix for InfluxDB v2 implementation:
  - add missing runtime dependencies
  - use ThreadPool for InfluxDB batch writes
  - improve exception handling
- Add continuous debug output to Ui.Websocket and Edge.Websocket
- Add fallback for Resolution and Timezone for older UI versions
- Improvement on Generic.Ess ErrorHandler
  • Loading branch information
sfeilmeier authored Mar 28, 2022
1 parent 9df274e commit a9581e2
Show file tree
Hide file tree
Showing 14 changed files with 257 additions and 121 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package io.openems.backend.edgewebsocket;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.java_websocket.WebSocket;
import org.osgi.service.component.annotations.Activate;
Expand Down Expand Up @@ -29,6 +32,7 @@
import io.openems.common.jsonrpc.request.AuthenticatedRpcRequest;
import io.openems.common.jsonrpc.request.SubscribeSystemLogRequest;
import io.openems.common.jsonrpc.response.AuthenticatedRpcResponse;
import io.openems.common.utils.ThreadPoolUtils;

@Designate(ocd = Config.class, factory = false)
@Component(//
Expand All @@ -43,6 +47,7 @@ public class EdgeWebsocketImpl extends AbstractOpenemsBackendComponent implement
private WebsocketServer server = null;

private final SystemLogHandler systemLogHandler;
private final ScheduledExecutorService debugLogExecutor = Executors.newSingleThreadScheduledExecutor();

@Reference
protected volatile Metadata metadata;
Expand All @@ -68,10 +73,17 @@ public EdgeWebsocketImpl() {
private void activate(Config config) {
this.config = config;
this.metadata.addOnIsInitializedListener(this.startServerWhenMetadataIsInitialized);
this.debugLogExecutor.scheduleWithFixedDelay(() -> {
this.log.info(new StringBuilder("[monitor] ") //
.append("Edge-Connections: ")
.append(this.server != null ? this.server.getConnections().size() : "initializing") //
.toString());
}, 10, 10, TimeUnit.SECONDS);
}

@Deactivate
private void deactivate() {
ThreadPoolUtils.shutdownAndAwaitTermination(this.debugLogExecutor, 0);
this.metadata.removeOnIsInitializedListener(this.startServerWhenMetadataIsInitialized);
this.stopServer();
}
Expand Down Expand Up @@ -104,6 +116,9 @@ private synchronized void stopServer() {
* @return true if it is online
*/
protected boolean isOnline(String edgeId) {
if (this.server == null) {
return false;
}
return this.server.isOnline(edgeId);
}

Expand Down Expand Up @@ -157,6 +172,9 @@ public void send(String edgeId, JsonrpcNotification notification) throws Openems
* @return the WebSocket connection
*/
private final WebSocket getWebSocketForEdgeId(String edgeId) {
if (this.server == null) {
return null;
}
for (WebSocket ws : this.server.getConnections()) {
WsData wsData = ws.getAttachment();
var wsEdgeIdOpt = wsData.getEdgeId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,35 +42,62 @@ public synchronized void handleException(InfluxException e) {
}
var field = matcher.group("channel");
var thisType = matcher.group("thisType");
var requiredType = matcher.group("requiredType");
var requiredType = RequiredType.valueOf(matcher.group("requiredType").toUpperCase());

if (this.specialCaseFieldHandlers.containsKey(field)) {
// Special handling had already been added.
return;
}

BiConsumer<Point, JsonElement> handler = null;
var handler = this.createAndAddHandler(field, requiredType);

if (handler == null) {
this.parent.logWarn(this.log, "Unable to add special field handler for [" + field + "] from [" + thisType
+ "] to [" + requiredType + "]");
}
this.parent.logInfo(this.log,
"Add special field handler for [" + field + "] from [" + thisType + "] to [" + requiredType + "]");
}

private static enum RequiredType {
STRING, INTEGER, FLOAT;
}

private BiConsumer<Point, JsonElement> createAndAddHandler(String field, RequiredType requiredType) {
var handler = this.createHandler(field, requiredType);
this.specialCaseFieldHandlers.put(field, handler);
return handler;
}

/**
* Creates a Handler for the given field, to convert a Point to a
* 'requiredType'.
*
* @param field the field name, i.e. the Channel-Address
* @param requiredType the {@link RequiredType
* @return
*/
private BiConsumer<Point, JsonElement> createHandler(String field, RequiredType requiredType) {
switch (requiredType) {
case "string":
handler = (builder, jValue) -> {
var value = this.getAsFieldTypeString(jValue);
case STRING:
return (builder, jValue) -> {
var value = getAsFieldTypeString(jValue);
if (value != null) {
builder.addField(field, value);
}
};
break;

case "integer":
handler = (builder, jValue) -> {
case INTEGER:
return (builder, jValue) -> {
try {
var value = this.getAsFieldTypeNumber(jValue);
var value = getAsFieldTypeNumber(jValue);
if (value != null) {
builder.addField(field, value);
}
} catch (NumberFormatException e1) {
try {
// Failed -> try conversion to float and then to int
var value = this.getAsFieldTypeFloat(jValue);
var value = getAsFieldTypeFloat(jValue);
if (value != null) {
builder.addField(field, Math.round(value));
}
Expand All @@ -80,12 +107,11 @@ public synchronized void handleException(InfluxException e) {
}
}
};
break;

case "float":
handler = (builder, jValue) -> {
case FLOAT:
return (builder, jValue) -> {
try {
var value = this.getAsFieldTypeFloat(jValue);
var value = getAsFieldTypeFloat(jValue);
if (value != null) {
builder.addField(field, value);
}
Expand All @@ -94,16 +120,8 @@ public synchronized void handleException(InfluxException e) {
+ "] to float: " + e1.getMessage());
}
};
break;
}

if (handler == null) {
this.parent.logWarn(this.log, "Unable to add special field handler for [" + field + "] from [" + thisType
+ "] to [" + requiredType + "]");
}
this.parent.logInfo(this.log,
"Add special field handler for [" + field + "] from [" + thisType + "] to [" + requiredType + "]");
this.specialCaseFieldHandlers.put(field, handler);
return null; // can never happen
}

/**
Expand All @@ -112,7 +130,7 @@ public synchronized void handleException(InfluxException e) {
* @param jValue the value
* @return the value as String; null if value represents null
*/
private String getAsFieldTypeString(JsonElement jValue) {
private static String getAsFieldTypeString(JsonElement jValue) {
if (jValue.isJsonNull()) {
return null;
}
Expand All @@ -126,7 +144,7 @@ private String getAsFieldTypeString(JsonElement jValue) {
* @return the value as Number; null if value represents null
* @throws NumberFormatException on error
*/
private Number getAsFieldTypeNumber(JsonElement jValue) throws NumberFormatException {
private static Number getAsFieldTypeNumber(JsonElement jValue) throws NumberFormatException {
if (jValue.isJsonNull()) {
return null;
}
Expand Down Expand Up @@ -154,7 +172,7 @@ private Number getAsFieldTypeNumber(JsonElement jValue) throws NumberFormatExcep
* @return the value as Float; null if value represents null
* @throws NumberFormatException on error
*/
private Float getAsFieldTypeFloat(JsonElement jValue) throws NumberFormatException {
private static Float getAsFieldTypeFloat(JsonElement jValue) throws NumberFormatException {
if (jValue.isJsonNull()) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,7 @@ public void write(String edgeId, TreeBasedTable<Long, ChannelAddress, JsonElemen
* @param data the data
* @throws OpenemsException on error
*/
private void writeData(int influxEdgeId, TreeBasedTable<Long, ChannelAddress, JsonElement> data)
throws OpenemsException {
private void writeData(int influxEdgeId, TreeBasedTable<Long, ChannelAddress, JsonElement> data) {
var dataEntries = data.rowMap().entrySet();
if (dataEntries.isEmpty()) {
// no data to write
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.java_websocket.WebSocket;
import org.osgi.service.component.annotations.Activate;
Expand All @@ -13,6 +16,7 @@
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.metatype.annotations.Designate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.openems.backend.common.component.AbstractOpenemsBackendComponent;
import io.openems.backend.common.edgewebsocket.EdgeWebsocket;
Expand All @@ -25,6 +29,7 @@
import io.openems.common.jsonrpc.base.JsonrpcNotification;
import io.openems.common.jsonrpc.base.JsonrpcRequest;
import io.openems.common.jsonrpc.base.JsonrpcResponseSuccess;
import io.openems.common.utils.ThreadPoolUtils;

@Designate(ocd = Config.class, factory = false)
@Component(//
Expand All @@ -34,7 +39,8 @@
)
public class UiWebsocketImpl extends AbstractOpenemsBackendComponent implements UiWebsocket {

// private final Logger log = LoggerFactory.getLogger(UiWebsocket.class);
private final Logger log = LoggerFactory.getLogger(UiWebsocket.class);
private final ScheduledExecutorService debugLogExecutor = Executors.newSingleThreadScheduledExecutor();

protected WebsocketServer server = null;

Expand Down Expand Up @@ -64,10 +70,17 @@ public UiWebsocketImpl() {
private void activate(Config config) {
this.config = config;
this.metadata.addOnIsInitializedListener(this.startServerWhenMetadataIsInitialized);
this.debugLogExecutor.scheduleWithFixedDelay(() -> {
this.log.info(new StringBuilder("[monitor] ") //
.append("UI-Connections: ") //
.append(this.server != null ? this.server.getConnections().size() : "initializing") //
.toString());
}, 10, 10, TimeUnit.SECONDS);
}

@Deactivate
private void deactivate() {
ThreadPoolUtils.shutdownAndAwaitTermination(this.debugLogExecutor, 0);
this.metadata.removeOnIsInitializedListener(this.startServerWhenMetadataIsInitialized);
this.stopServer();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.openems.common.jsonrpc.request;

import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
Expand Down Expand Up @@ -52,16 +54,31 @@ public class QueryHistoricTimeseriesDataRequest extends JsonrpcRequest {
*/
public static QueryHistoricTimeseriesDataRequest from(JsonrpcRequest r) throws OpenemsNamedException {
var p = r.getParams();
var timezone = TimeZone.getTimeZone(JsonUtils.getAsString(p, "timezone")).toZoneId();

var jTimezone = JsonUtils.getAsPrimitive(p, "timezone");
final ZoneId timezone;
if (jTimezone.isNumber()) {
// For UI version before 2022.4.0
timezone = ZoneId.ofOffset("", ZoneOffset.ofTotalSeconds(JsonUtils.getAsInt(jTimezone) * -1));
} else {
timezone = TimeZone.getTimeZone(JsonUtils.getAsString(p, "timezone")).toZoneId();
}

var fromDate = JsonUtils.getAsZonedDateTime(p, "fromDate", timezone);
var toDate = JsonUtils.getAsZonedDateTime(p, "toDate", timezone).plusDays(1);

var jResolutionOpt = JsonUtils.getOptionalSubElement(p, "resolution");
final Optional<Resolution> resolution;
var resolutionObj = JsonUtils.getAsOptionalJsonObject(p, "resolution");
if (resolutionObj.isPresent()) {
var value = JsonUtils.getAsInt(resolutionObj.get(), "value");
var unit = JsonUtils.getAsString(resolutionObj.get(), "unit");
resolution = Optional.of(new Resolution(value, unit));
if (jResolutionOpt.isPresent()) {
var jResolution = jResolutionOpt.get();
if (jResolution.isJsonPrimitive()) {
// For UI version before 2022.4.0
resolution = Optional.of(new Resolution(JsonUtils.getAsInt(jResolution), ChronoUnit.SECONDS));
} else {
var value = JsonUtils.getAsInt(jResolution, "value");
var unit = JsonUtils.getAsString(jResolution, "unit");
resolution = Optional.of(new Resolution(value, unit));
}
} else {
resolution = Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.openems.common.jsonrpc.request;

import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
Expand Down Expand Up @@ -57,14 +59,29 @@ public class QueryHistoricTimeseriesEnergyPerPeriodRequest extends JsonrpcReques
*/
public static QueryHistoricTimeseriesEnergyPerPeriodRequest from(JsonrpcRequest r) throws OpenemsNamedException {
var p = r.getParams();
var timezone = TimeZone.getTimeZone(JsonUtils.getAsString(p, "timezone")).toZoneId();

var jTimezone = JsonUtils.getAsPrimitive(p, "timezone");
final ZoneId timezone;
if (jTimezone.isNumber()) {
// For UI version before 2022.4.0
timezone = ZoneId.ofOffset("", ZoneOffset.ofTotalSeconds(JsonUtils.getAsInt(jTimezone) * -1));
} else {
timezone = TimeZone.getTimeZone(JsonUtils.getAsString(p, "timezone")).toZoneId();
}

var fromDate = JsonUtils.getAsZonedDateTime(p, "fromDate", timezone);
var toDate = JsonUtils.getAsZonedDateTime(p, "toDate", timezone).plusDays(1);

var resolutionObj = JsonUtils.getAsJsonObject(p, "resolution");
var resolution = new Resolution(//
JsonUtils.getAsInt(resolutionObj, "value"), //
JsonUtils.getAsString(resolutionObj, "unit"));
var jResolution = JsonUtils.getSubElement(p, "resolution");
final Resolution resolution;
if (jResolution.isJsonPrimitive()) {
// For UI version before 2022.4.0
resolution = new Resolution(JsonUtils.getAsInt(jResolution), ChronoUnit.SECONDS);
} else {
var value = JsonUtils.getAsInt(jResolution, "value");
var unit = JsonUtils.getAsString(jResolution, "unit");
resolution = new Resolution(value, unit);
}

var result = new QueryHistoricTimeseriesEnergyPerPeriodRequest(r, fromDate, toDate, resolution);
var channels = JsonUtils.getAsJsonArray(p, "channels");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.openems.common.jsonrpc.request;

import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
Expand Down Expand Up @@ -57,7 +59,16 @@ public class QueryHistoricTimeseriesEnergyRequest extends JsonrpcRequest {
*/
public static QueryHistoricTimeseriesEnergyRequest from(JsonrpcRequest r) throws OpenemsNamedException {
var p = r.getParams();
var timezone = TimeZone.getTimeZone(JsonUtils.getAsString(p, "timezone")).toZoneId();

var jTimezone = JsonUtils.getAsPrimitive(p, "timezone");
final ZoneId timezone;
if (jTimezone.isNumber()) {
// For UI version before 2022.4.0
timezone = ZoneId.ofOffset("", ZoneOffset.ofTotalSeconds(JsonUtils.getAsInt(jTimezone) * -1));
} else {
timezone = TimeZone.getTimeZone(JsonUtils.getAsString(p, "timezone")).toZoneId();
}

var fromDate = JsonUtils.getAsZonedDateTime(p, "fromDate", timezone);
var toDate = JsonUtils.getAsZonedDateTime(p, "toDate", timezone).plusDays(1);
var result = new QueryHistoricTimeseriesEnergyRequest(r, fromDate, toDate);
Expand Down
Loading

0 comments on commit a9581e2

Please sign in to comment.