Skip to content

Commit

Permalink
Improve Controller.Backend.Api (#1403)
Browse files Browse the repository at this point in the history
* simplify worker
* collect in sync with Cycle; send async
* add JUnit tests
  • Loading branch information
sfeilmeier authored Mar 9, 2021
1 parent f8c50a9 commit 6156ab0
Show file tree
Hide file tree
Showing 16 changed files with 862 additions and 241 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ public interface Timedata extends CommonTimedataService {
* Sends the data points to the Timedata service.
*
* @param edgeId The unique Edge-ID
* @param data Table of timestamp (epoch in seconds), Channel-Address and the
* Channel value as JsonElement. Sorted by timestamp.
* @param data Table of timestamp (epoch in milliseconds), Channel-Address and
* the Channel value as JsonElement. Sorted by timestamp.
* @throws OpenemsException
*/
public void write(String edgeId, TreeBasedTable<Long, ChannelAddress, JsonElement> data) throws OpenemsException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
* "jsonrpc": "2.0",
* "method": "timestampedData",
* "params": {
* [timestamp: epoch in seconds]: {
* [timestamp: epoch in milliseconds]: {
* [channelAddress]: String | Number
* }
* }
Expand Down Expand Up @@ -77,6 +77,6 @@ public JsonObject getParams() {
}

public TreeBasedTable<Long, ChannelAddress, JsonElement> getData() {
return data;
return this.data;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ public void onClose(int code, String reason, boolean remote) {

// Initialize reconnector
this.reconnectorWorker = new ClientReconnectorWorker(this);
this.reconnectorWorker.activate(this.getName());

if (proxy != null) {
this.ws.setProxy(proxy);
Expand All @@ -131,6 +130,7 @@ public void onClose(int code, String reason, boolean remote) {
public void start() {
this.log.info("Opening connection [" + this.getName() + "] to websocket server [" + this.serverUri + "]");
this.ws.connect();
this.reconnectorWorker.activate(this.getName());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,15 @@ public void broadcastMessage(JsonrpcMessage message) {
}
}

/**
* Gets the port number that this server listens on.
*
* @return The port number.
*/
public int getPort() {
return this.ws.getPort();
}

/**
* Starts the websocket server
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.openems.common.websocket;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.Instant;
import java.util.Random;

import org.java_websocket.client.WebSocketClient;
Expand All @@ -13,15 +13,15 @@

public class ClientReconnectorWorker extends AbstractWorker {

private final static int MAX_WAIT_TIME_SECONDS = 120;
private final static int MIN_WAIT_TIME_SECONDS = 10;
private final static int MAX_WAIT_SECONDS = 120;
private final static int MIN_WAIT_SECONDS = 10;

private final static Duration MIN_WAIT_TIME_BETWEEN_RETRIES = Duration
.ofSeconds(new Random().nextInt(MAX_WAIT_TIME_SECONDS) + MIN_WAIT_TIME_SECONDS);
private final static long MIN_WAIT_SEONDCS_BETWEEN_RETRIES = new Random().nextInt(MAX_WAIT_SECONDS)
+ MIN_WAIT_SECONDS;

private final Logger log = LoggerFactory.getLogger(ClientReconnectorWorker.class);
private final AbstractWebsocketClient<?> parent;
private LocalDateTime lastTry = LocalDateTime.MIN;
private Instant lastTry = null;;

public ClientReconnectorWorker(AbstractWebsocketClient<?> parent) {
this.parent = parent;
Expand All @@ -38,14 +38,20 @@ protected void forever() throws InterruptedException {
return;
}

Duration notWaitedEnough = Duration.between(LocalDateTime.now().minus(MIN_WAIT_TIME_BETWEEN_RETRIES),
this.lastTry);
if (!notWaitedEnough.isNegative()) {
this.parent.logInfo(this.log,
"Waiting till next WebSocket reconnect [" + notWaitedEnough.getSeconds() + "s]");
Instant now = Instant.now();

if (this.lastTry == null) {
this.lastTry = now;
return;
}

long waitedSeconds = Duration.between(this.lastTry, now).getSeconds();
if (waitedSeconds < MIN_WAIT_SEONDCS_BETWEEN_RETRIES) {
this.parent.logInfo(this.log, "Waiting till next WebSocket reconnect ["
+ (MIN_WAIT_SEONDCS_BETWEEN_RETRIES - waitedSeconds) + "s]");
return;
}
this.lastTry = LocalDateTime.now();
this.lastTry = now;

this.parent.logInfo(this.log, "Reconnecting WebSocket...");
ws.reconnectBlocking();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package io.openems.common.websocket;

import org.java_websocket.server.WebSocketServer;
import org.slf4j.Logger;

import io.openems.common.exceptions.NotImplementedException;

public class DummyWebsocketServer extends AbstractWebsocketServer<WsData> implements AutoCloseable {

private static class DummyWsData extends WsData {

@Override
public String toString() {
return "DummyWsData[]";
}

}

public static class Builder {
private OnOpen onOpen = (ws, handshake) -> {
};
private OnRequest onRequest = (ws, request) -> {
throw new NotImplementedException("On-Request handler is not implemented");
};
private OnNotification onNotification = (ws, notification) -> {
};
private OnError onError = (ws, ex) -> {
};
private OnClose onClose = (ws, code, reason, remote) -> {
};

private Builder() {
}

public DummyWebsocketServer.Builder onOpen(OnOpen onOpen) {
this.onOpen = onOpen;
return this;
}

public DummyWebsocketServer.Builder onRequest(OnRequest onRequest) {
this.onRequest = onRequest;
return this;
}

public DummyWebsocketServer.Builder onNotification(OnNotification onNotification) {
this.onNotification = onNotification;
return this;
}

public DummyWebsocketServer.Builder onError(OnError onError) {
this.onError = onError;
return this;
}

public DummyWebsocketServer.Builder onClose(OnClose onClose) {
this.onClose = onClose;
return this;
}

public DummyWebsocketServer build() {
return new DummyWebsocketServer(this);
}
}

/**
* Create a Config builder.
*
* @return a {@link Builder}
*/
public static DummyWebsocketServer.Builder create() {
return new Builder();
}

private final DummyWebsocketServer.Builder builder;

private DummyWebsocketServer(DummyWebsocketServer.Builder builder) {
super("DummyWebsocketServer", 0 /* auto-select port */);
this.builder = builder;
}

@Override
protected WsData createWsData() {
return new DummyWsData();
}

@Override
protected OnOpen getOnOpen() {
return this.builder.onOpen;
}

public void withOnOpen(OnOpen onOpen) {
this.builder.onOpen = onOpen;
}

@Override
protected OnRequest getOnRequest() {
return this.builder.onRequest;
}

public void withOnRequest(OnRequest onRequest) {
this.builder.onRequest = onRequest;
}

@Override
protected OnNotification getOnNotification() {
return this.builder.onNotification;
}

public void withOnNotification(OnNotification onNotification) {
this.builder.onNotification = onNotification;
}

@Override
protected OnError getOnError() {
return this.builder.onError;
}

public void withOnError(OnError onError) {
this.builder.onError = onError;
}

@Override
protected OnClose getOnClose() {
return this.builder.onClose;
}

public void withOnClose(OnClose onClose) {
this.builder.onClose = onClose;
}

@Override
protected void logInfo(Logger log, String message) {
log.info(message);
}

@Override
protected void logWarn(Logger log, String message) {
log.info(message);
}

/**
* Starts the {@link WebSocketServer} and waits.
*
* @return the dynamically assigned Port.
* @throws InterruptedException on error
*/
public int startBlocking() throws InterruptedException {
this.start();

// block until Port is not anymore zero
int port;
do {
Thread.sleep(500);
port = this.getPort();
} while (port == 0);
return port;
}

@Override
public void close() throws Exception {
this.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import java.time.Clock;

import io.openems.edge.common.test.TimeLeapClock;

/**
* {@link ClockProvider} provides a Clock - real or mocked like
* {@link TimeLeapClock}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
import org.ops4j.pax.logging.spi.PaxAppender;
import org.osgi.service.event.EventHandler;

import io.openems.common.channel.Level;
import io.openems.common.channel.PersistencePriority;
import io.openems.common.types.OpenemsType;
import io.openems.edge.common.channel.Doc;
import io.openems.edge.common.channel.StateChannel;
import io.openems.edge.common.channel.StringReadChannel;
import io.openems.edge.common.component.OpenemsComponent;
import io.openems.edge.controller.api.Controller;
Expand All @@ -13,7 +16,16 @@ public interface BackendApi extends Controller, OpenemsComponent, PaxAppender, E

public enum ChannelId implements io.openems.edge.common.channel.ChannelId {
API_WORKER_LOG(Doc.of(OpenemsType.STRING) //
.text("Logs Write-Commands via ApiWorker")); //
.text("Logs Write-Commands via ApiWorker")), //
UNABLE_TO_SEND(Doc.of(Level.WARNING)
// Make sure this is always persisted, as it is required for resending
.persistencePriority(PersistencePriority.VERY_HIGH)), //
LAST_SUCCESSFUL_RESEND(Doc.of(OpenemsType.LONG) //
// Make sure this is always persisted, as it is required for resending
.persistencePriority(PersistencePriority.VERY_HIGH) //
.text("Latest timestamp of successfully resent data")) //
// TODO: resend algorithm still needs to be implemented
;

private final Doc doc;

Expand All @@ -35,4 +47,13 @@ public Doc doc() {
public default StringReadChannel getApiWorkerLogChannel() {
return this.channel(ChannelId.API_WORKER_LOG);
}

/**
* Gets the Channel for {@link ChannelId#UNABLE_TO_SEND}.
*
* @return the Channel
*/
public default StateChannel getUnableToSendChannel() {
return this.channel(ChannelId.UNABLE_TO_SEND);
}
}
Loading

0 comments on commit 6156ab0

Please sign in to comment.