Skip to content

Commit

Permalink
Introduce HTTP-bridge & migrate Shelly Plug S (#2381)
Browse files Browse the repository at this point in the history
Co-authored-by: Michael Grill <[email protected]>
Co-authored-by: Sebastian Asen <[email protected]>
  • Loading branch information
sebastianasen and michaelgrill authored Jan 16, 2024
1 parent 96e452c commit 4aa1268
Show file tree
Hide file tree
Showing 20 changed files with 929 additions and 47 deletions.
2 changes: 2 additions & 0 deletions io.openems.edge.application/EdgeApp.bndrun
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
bnd.identity;id='io.openems.edge.batteryinverter.sunspec',\
bnd.identity;id='io.openems.edge.bosch.bpts5hybrid',\
bnd.identity;id='io.openems.edge.bridge.mbus',\
bnd.identity;id='io.openems.edge.bridge.http',\
bnd.identity;id='io.openems.edge.bridge.modbus',\
bnd.identity;id='io.openems.edge.bridge.onewire',\
bnd.identity;id='io.openems.edge.common',\
Expand Down Expand Up @@ -206,6 +207,7 @@
io.openems.edge.batteryinverter.sunspec;version=snapshot,\
io.openems.edge.bosch.bpts5hybrid;version=snapshot,\
io.openems.edge.bridge.mbus;version=snapshot,\
io.openems.edge.bridge.http;version=snapshot,\
io.openems.edge.bridge.modbus;version=snapshot,\
io.openems.edge.bridge.onewire;version=snapshot,\
io.openems.edge.common;version=snapshot,\
Expand Down
12 changes: 12 additions & 0 deletions io.openems.edge.bridge.http/.classpath
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="con" path="aQute.bnd.classpath.container"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-17"/>
<classpathentry kind="src" output="bin" path="src"/>
<classpathentry kind="src" output="bin_test" path="test">
<attributes>
<attribute name="test" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="bin"/>
</classpath>
2 changes: 2 additions & 0 deletions io.openems.edge.bridge.http/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/bin_test/
/generated/
23 changes: 23 additions & 0 deletions io.openems.edge.bridge.http/.project
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>io.openems.edge.bridge.http</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>bndtools.core.bndbuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
<nature>bndtools.core.bndnature</nature>
</natures>
</projectDescription>
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
eclipse.preferences.version=1
encoding/<project>=UTF-8
12 changes: 12 additions & 0 deletions io.openems.edge.bridge.http/bnd.bnd
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
Bundle-Name: OpenEMS Edge Bridge Http
Bundle-Vendor: FENECON GmbH
Bundle-License: https://opensource.org/licenses/EPL-2.0
Bundle-Version: 1.0.0.${tstamp}

-buildpath: \
${buildpath},\
io.openems.common,\
io.openems.edge.common

-testpath: \
${testpath}
6 changes: 6 additions & 0 deletions io.openems.edge.bridge.http/readme.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
= Http

Http is a widely used standard for web communication. It is used for some hardware devices like electric meters, relays and so on.
The standard architecture used by the devices are RESTful api's.

https://github.com/OpenEMS/openems/tree/develop/io.openems.edge.bridge.http[Source Code icon:github[]]
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package io.openems.edge.bridge.http;

import java.util.PriorityQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ServiceScope;
import org.osgi.service.event.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.openems.common.utils.ThreadPoolUtils;
import io.openems.edge.bridge.http.api.BridgeHttp;
import io.openems.edge.common.event.EdgeEventConstants;

@Component(//
scope = ServiceScope.PROTOTYPE //
)
public class BridgeHttpImpl implements BridgeHttp {

private static class EndpointCountdown {
private volatile int cycleCount;
public final Endpoint endpoint;
private volatile boolean running = false;

public EndpointCountdown(Endpoint endpoint) {
super();
this.cycleCount = endpoint.cycle();
this.endpoint = endpoint;
}

public EndpointCountdown reset() {
this.cycleCount = this.endpoint.cycle();
return this;
}

public int getCycleCount() {
return this.cycleCount;
}

public void decreaseCycleCount() {
this.cycleCount--;
}

public boolean isRunning() {
return this.running;
}

public void setRunning(boolean running) {
this.running = running;
}

}

private final Logger log = LoggerFactory.getLogger(BridgeHttpImpl.class);

@Reference
private CycleSubscriber cycleSubscriber;

@Reference
private UrlFetcher urlFetcher;

// TODO change to java 21 virtual threads
// TODO: Single pool for every http worker & avoid same endpoint in that pool
private final ExecutorService pool = Executors.newCachedThreadPool();

private final PriorityQueue<EndpointCountdown> endpoints = new PriorityQueue<>(
(e1, e2) -> e1.getCycleCount() - e2.getCycleCount());

/*
* Default timeout values in ms
*/
private int connectTimeout = 5000;
private int readTimeout = 5000;

@Activate
protected void activate() {
this.cycleSubscriber.subscribe(this::handleEvent);
}

@Deactivate
protected void deactivate() {
this.cycleSubscriber.unsubscribe(this::handleEvent);
this.endpoints.clear();
ThreadPoolUtils.shutdownAndAwaitTermination(this.pool, 0);
}

@Override
public void subscribe(Endpoint endpoint) {
if (!this.endpoints.offer(new EndpointCountdown(endpoint))) {
this.log.warn("Unable to add " + endpoint + "!");
}
}

@Override
public CompletableFuture<String> request(String url) {
final var future = new CompletableFuture<String>();
this.pool.execute(this.urlFetcher.createTask(url, this.connectTimeout, this.readTimeout, future));
return future;
}

@Override
public void setTimeout(int connectTimeout, int readTimeout) {
this.connectTimeout = connectTimeout;
this.readTimeout = readTimeout;
}

private void handleEvent(Event event) {
switch (event.getTopic()) {
// TODO: Execute before TOPIC_CYCLE_BEFORE_PROCESS_IMAGE, like modbus bridge
case EdgeEventConstants.TOPIC_CYCLE_BEFORE_PROCESS_IMAGE -> {

if (this.endpoints.isEmpty()) {
return;
}

this.endpoints.forEach(EndpointCountdown::decreaseCycleCount);

while (this.endpoints.peek().getCycleCount() == 0) {
final var item = this.endpoints.poll();
synchronized (item) {
if (item.isRunning()) {
this.log.info("Process for " + item.endpoint + " is still running. Task is not queued twice");
this.endpoints.add(item.reset());
continue;
}

item.setRunning(true);
}
this.pool.execute(this.createTask(item));

this.endpoints.add(item.reset());
}
}
}
}

private Runnable createTask(EndpointCountdown endpointItem) {
final var future = new CompletableFuture<String>();
future.whenComplete((t, e) -> {
try {
if (e != null) {
endpointItem.endpoint.onError().accept(e);
return;
}
endpointItem.endpoint.result().accept(t);
} finally {
synchronized (endpointItem) {
endpointItem.setRunning(false);
}
}
});
return this.urlFetcher.createTask(endpointItem.endpoint.url(), this.connectTimeout, this.readTimeout, future);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.openems.edge.bridge.http;

import java.util.LinkedList;
import java.util.List;
import java.util.function.Consumer;

import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ServiceScope;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;
import org.osgi.service.event.propertytypes.EventTopics;

import io.openems.edge.common.event.EdgeEventConstants;

@Component(//
scope = ServiceScope.SINGLETON, //
service = { CycleSubscriber.class, EventHandler.class } //
)
@EventTopics({ //
EdgeEventConstants.TOPIC_CYCLE_BEFORE_PROCESS_IMAGE //
})
public class CycleSubscriber implements EventHandler {

private final List<Consumer<Event>> eventHandler = new LinkedList<>();

@Override
public void handleEvent(Event event) {
switch (event.getTopic()) {
case EdgeEventConstants.TOPIC_CYCLE_BEFORE_PROCESS_IMAGE -> {
this.eventHandler.forEach(t -> t.accept(event));
}
}
}

/**
* Subscribes to the events of the topics this component is subscribed to.
*
* @param eventHandler the handler to execute on every event
*/
public void subscribe(Consumer<Event> eventHandler) {
this.eventHandler.add(eventHandler);
}

/**
* Unsubscribes a event handler.
*
* @param eventHandler the handler to remove
* @return true if the handler was successfully removed; if the handler was not
* found returs false
*/
public boolean unsubscribe(Consumer<Event> eventHandler) {
return this.eventHandler.remove(eventHandler);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.openems.edge.bridge.http;

import java.util.concurrent.CompletableFuture;

public interface UrlFetcher {

/**
* Creates a {@link Runnable} to execute a request with the given parameters.
*
* @param urlString the url to fetch
* @param connectTimeout the connection timeout
* @param readTimeout the read timeout
* @param future the {@link CompletableFuture} to fulfill after the
* fetch
* @return the {@link Runnable} to run to execute the fetch
*/
public Runnable createTask(//
String urlString, //
int connectTimeout, //
int readTimeout, //
CompletableFuture<String> future //
);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package io.openems.edge.bridge.http;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.CompletableFuture;

import org.osgi.service.component.annotations.Component;

import io.openems.common.exceptions.OpenemsError.OpenemsNamedException;
import io.openems.common.exceptions.OpenemsException;

@Component
public class UrlFetcherImpl implements UrlFetcher {

@Override
public Runnable createTask(//
final String urlString, //
final int connectTimeout, //
final int readTimeout, //
final CompletableFuture<String> future //
) {
return () -> {
try {
var url = new URL(urlString);
var con = (HttpURLConnection) url.openConnection();
con.setRequestMethod("GET");

// config setting / method param ?
con.setConnectTimeout(connectTimeout);
con.setReadTimeout(readTimeout);

var status = con.getResponseCode();
String body;
try (var in = new BufferedReader(new InputStreamReader(con.getInputStream()))) {
// Read HTTP response
var content = new StringBuilder();
String line;
while ((line = in.readLine()) != null) {
content.append(line);
content.append(System.lineSeparator());
}
body = content.toString();
}

// Check valid for all?
if (status < 300) {
future.complete(body);
} else {
throw new OpenemsException(
"Error while reading Endpoint " + urlString + ". Response code: " + status + ". " + body);
}
} catch (OpenemsNamedException | IOException e) {
future.completeExceptionally(e);
}
};
}

}
Loading

0 comments on commit 4aa1268

Please sign in to comment.