diff --git a/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/BridgeHttpImpl.java b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/BridgeHttpImpl.java index da4583ed13e..92da80fdd4e 100644 --- a/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/BridgeHttpImpl.java +++ b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/BridgeHttpImpl.java @@ -1,9 +1,12 @@ package io.openems.edge.bridge.http; +import java.util.HashSet; import java.util.PriorityQueue; +import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; @@ -23,19 +26,23 @@ ) public class BridgeHttpImpl implements BridgeHttp { - private static class EndpointCountdown { + public static class CycleEndpointCountdown { private volatile int cycleCount; - public final Endpoint endpoint; + public final CycleEndpoint cycleEndpoint; private volatile boolean running = false; - public EndpointCountdown(Endpoint endpoint) { - super(); + public CycleEndpointCountdown(CycleEndpoint endpoint) { this.cycleCount = endpoint.cycle(); - this.endpoint = endpoint; + this.cycleEndpoint = endpoint; } - public EndpointCountdown reset() { - this.cycleCount = this.endpoint.cycle(); + /** + * Resets the current cycle count to the initial cycle count. + * + * @return this + */ + public CycleEndpointCountdown reset() { + this.cycleCount = this.cycleEndpoint.cycle(); return this; } @@ -43,6 +50,9 @@ public int getCycleCount() { return this.cycleCount; } + /** + * Decreases the current cycle count by one. + */ public void decreaseCycleCount() { this.cycleCount--; } @@ -57,6 +67,53 @@ public void setRunning(boolean running) { } + public static class TimeEndpointCountdown { + private final TimeEndpoint timeEndpoint; + private volatile boolean running = false; + private volatile boolean shutdown = false; + private Runnable shutdownCurrentTask; + + public TimeEndpointCountdown(TimeEndpoint timeEndpoint) { + this.timeEndpoint = timeEndpoint; + } + + public TimeEndpoint getTimeEndpoint() { + return this.timeEndpoint; + } + + public boolean isRunning() { + return this.running; + } + + public void setRunning(boolean running) { + this.running = running; + } + + public boolean isShutdown() { + return this.shutdown; + } + + public void setShutdown(boolean shutdown) { + this.shutdown = shutdown; + } + + public void setShutdownCurrentTask(Runnable shutdownCurrentTask) { + this.shutdownCurrentTask = shutdownCurrentTask; + } + + /** + * Shuts down the current execution of the active task. + */ + public void shutdown() { + this.setShutdown(true); + final var shutdownTask = this.shutdownCurrentTask; + if (shutdownTask != null) { + shutdownTask.run(); + } + } + + } + private final Logger log = LoggerFactory.getLogger(BridgeHttpImpl.class); @Reference @@ -67,47 +124,62 @@ public void setRunning(boolean running) { // TODO change to java 21 virtual threads // TODO: Single pool for every http worker & avoid same endpoint in that pool - private final ExecutorService pool = Executors.newCachedThreadPool(); + private final ScheduledExecutorService pool = Executors.newScheduledThreadPool(0); - private final PriorityQueue endpoints = new PriorityQueue<>( + private final PriorityQueue cycleEndpoints = new PriorityQueue<>( (e1, e2) -> e1.getCycleCount() - e2.getCycleCount()); - /* - * Default timeout values in ms - */ - private int connectTimeout = 5000; - private int readTimeout = 5000; + private final Set timeEndpoints = new HashSet<>(); + /** + * Activate method. + */ @Activate - protected void activate() { + public void activate() { this.cycleSubscriber.subscribe(this::handleEvent); } + /** + * Deactivate method. + */ @Deactivate - protected void deactivate() { + public void deactivate() { this.cycleSubscriber.unsubscribe(this::handleEvent); - this.endpoints.clear(); + this.cycleEndpoints.clear(); + this.timeEndpoints.forEach(TimeEndpointCountdown::shutdown); + this.timeEndpoints.clear(); ThreadPoolUtils.shutdownAndAwaitTermination(this.pool, 0); } @Override - public void subscribe(Endpoint endpoint) { - if (!this.endpoints.offer(new EndpointCountdown(endpoint))) { + public void subscribeCycle(CycleEndpoint endpoint) { + if (!this.cycleEndpoints.offer(new CycleEndpointCountdown(endpoint))) { this.log.warn("Unable to add " + endpoint + "!"); } } @Override - public CompletableFuture request(String url) { - final var future = new CompletableFuture(); - this.pool.execute(this.urlFetcher.createTask(url, this.connectTimeout, this.readTimeout, future)); - return future; + public void subscribeTime(TimeEndpoint endpoint) { + final var endpointCountdown = new TimeEndpointCountdown(endpoint); + this.timeEndpoints.add(endpointCountdown); + final var delay = endpoint.delayTimeProvider().nextRun(true, true); + final var future = this.pool.schedule(this.createTask(endpointCountdown), delay.toMillis(), + TimeUnit.MILLISECONDS); + endpointCountdown.setShutdownCurrentTask(() -> future.cancel(false)); } @Override - public void setTimeout(int connectTimeout, int readTimeout) { - this.connectTimeout = connectTimeout; - this.readTimeout = readTimeout; + public CompletableFuture request(Endpoint endpoint) { + final var future = new CompletableFuture(); + this.pool.execute(() -> { + try { + final var result = this.urlFetcher.fetchEndpoint(endpoint); + future.complete(result); + } catch (Exception e) { + future.completeExceptionally(e); + } + }); + return future; } private void handleEvent(Event event) { @@ -115,18 +187,19 @@ private void handleEvent(Event event) { // TODO: Execute before TOPIC_CYCLE_BEFORE_PROCESS_IMAGE, like modbus bridge case EdgeEventConstants.TOPIC_CYCLE_BEFORE_PROCESS_IMAGE -> { - if (this.endpoints.isEmpty()) { + if (this.cycleEndpoints.isEmpty()) { return; } - this.endpoints.forEach(EndpointCountdown::decreaseCycleCount); + this.cycleEndpoints.forEach(CycleEndpointCountdown::decreaseCycleCount); - while (this.endpoints.peek().getCycleCount() == 0) { - final var item = this.endpoints.poll(); + while (this.cycleEndpoints.peek().getCycleCount() == 0) { + final var item = this.cycleEndpoints.poll(); synchronized (item) { if (item.isRunning()) { - this.log.info("Process for " + item.endpoint + " is still running. Task is not queued twice"); - this.endpoints.add(item.reset()); + this.log.info( + "Process for " + item.cycleEndpoint + " is still running. Task is not queued twice"); + this.cycleEndpoints.add(item.reset()); continue; } @@ -134,27 +207,74 @@ private void handleEvent(Event event) { } this.pool.execute(this.createTask(item)); - this.endpoints.add(item.reset()); + if (!this.cycleEndpoints.offer(item.reset())) { + this.log.warn("Unable to add " + item.cycleEndpoint + "!"); + } } } } } - private Runnable createTask(EndpointCountdown endpointItem) { - final var future = new CompletableFuture(); - future.whenComplete((t, e) -> { + private Runnable createTask(CycleEndpointCountdown endpointItem) { + return () -> { try { - if (e != null) { - endpointItem.endpoint.onError().accept(e); - return; - } - endpointItem.endpoint.result().accept(t); + final var result = this.urlFetcher.fetchEndpoint(endpointItem.cycleEndpoint.endpoint()); + endpointItem.cycleEndpoint.result().accept(result); + } catch (Exception e) { + endpointItem.cycleEndpoint.onError().accept(e); } finally { synchronized (endpointItem) { endpointItem.setRunning(false); } } - }); - return this.urlFetcher.createTask(endpointItem.endpoint.url(), this.connectTimeout, this.readTimeout, future); + }; + } + + private Runnable createTask(TimeEndpointCountdown endpointCountdown) { + return () -> { + synchronized (endpointCountdown) { + if (endpointCountdown.isShutdown()) { + return; + } + endpointCountdown.setRunning(true); + } + boolean currentRunSuccessful; + try { + final var result = this.urlFetcher.fetchEndpoint(endpointCountdown.getTimeEndpoint().endpoint()); + endpointCountdown.getTimeEndpoint().onResult().accept(result); + currentRunSuccessful = true; + } catch (Exception e) { + endpointCountdown.getTimeEndpoint().onError().accept(e); + currentRunSuccessful = false; + } + synchronized (endpointCountdown) { + if (endpointCountdown.isShutdown()) { + return; + } + } + + try { + final var nextDelay = endpointCountdown.getTimeEndpoint().delayTimeProvider().nextRun(false, + currentRunSuccessful); + + final var future = this.pool.schedule(this.createTask(endpointCountdown), nextDelay.toMillis(), + TimeUnit.MILLISECONDS); + endpointCountdown.setShutdownCurrentTask(() -> future.cancel(false)); + } catch (Exception e) { + if (this.pool.isShutdown()) { + return; + } + this.log.error("Unexpected exception during Task", e); + } + }; + } + + public PriorityQueue getCycleEndpoints() { + return this.cycleEndpoints; } + + public Set getTimeEndpoints() { + return this.timeEndpoints; + } + } diff --git a/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/CycleSubscriber.java b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/CycleSubscriber.java index 99d62290c71..af8b6122f1d 100644 --- a/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/CycleSubscriber.java +++ b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/CycleSubscriber.java @@ -1,7 +1,7 @@ package io.openems.edge.bridge.http; -import java.util.LinkedList; -import java.util.List; +import java.util.HashSet; +import java.util.Set; import java.util.function.Consumer; import org.osgi.service.component.annotations.Component; @@ -21,13 +21,15 @@ }) public class CycleSubscriber implements EventHandler { - private final List> eventHandler = new LinkedList<>(); + private final Set> eventHandler = new HashSet<>(); @Override public void handleEvent(Event event) { switch (event.getTopic()) { case EdgeEventConstants.TOPIC_CYCLE_BEFORE_PROCESS_IMAGE -> { - this.eventHandler.forEach(t -> t.accept(event)); + synchronized (this.eventHandler) { + this.eventHandler.forEach(t -> t.accept(event)); + } } } } @@ -38,7 +40,9 @@ public void handleEvent(Event event) { * @param eventHandler the handler to execute on every event */ public void subscribe(Consumer eventHandler) { - this.eventHandler.add(eventHandler); + synchronized (this.eventHandler) { + this.eventHandler.add(eventHandler); + } } /** @@ -49,7 +53,9 @@ public void subscribe(Consumer eventHandler) { * found returs false */ public boolean unsubscribe(Consumer eventHandler) { - return this.eventHandler.remove(eventHandler); + synchronized (this.eventHandler) { + return this.eventHandler.remove(eventHandler); + } } } diff --git a/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/UrlFetcher.java b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/UrlFetcher.java index 0a78bb57f76..fdb4ebc01e6 100644 --- a/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/UrlFetcher.java +++ b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/UrlFetcher.java @@ -1,24 +1,18 @@ package io.openems.edge.bridge.http; -import java.util.concurrent.CompletableFuture; +import io.openems.common.exceptions.OpenemsError.OpenemsNamedException; +import io.openems.edge.bridge.http.api.BridgeHttp.Endpoint; public interface UrlFetcher { /** * Creates a {@link Runnable} to execute a request with the given parameters. * - * @param urlString the url to fetch - * @param connectTimeout the connection timeout - * @param readTimeout the read timeout - * @param future the {@link CompletableFuture} to fulfill after the - * fetch - * @return the {@link Runnable} to run to execute the fetch + * @param endpoint the {@link Endpoint} to fetch + * + * @return the result of the {@link Endpoint} + * @throws OpenemsNamedException on error */ - public Runnable createTask(// - String urlString, // - int connectTimeout, // - int readTimeout, // - CompletableFuture future // - ); + public String fetchEndpoint(Endpoint endpoint) throws OpenemsNamedException; } diff --git a/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/UrlFetcherImpl.java b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/UrlFetcherImpl.java index 1f8a1b17fc0..c4ac5a52672 100644 --- a/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/UrlFetcherImpl.java +++ b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/UrlFetcherImpl.java @@ -1,61 +1,61 @@ package io.openems.edge.bridge.http; +import static java.util.stream.Collectors.joining; + import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import java.io.OutputStreamWriter; import java.net.HttpURLConnection; import java.net.URL; -import java.util.concurrent.CompletableFuture; import org.osgi.service.component.annotations.Component; import io.openems.common.exceptions.OpenemsError.OpenemsNamedException; import io.openems.common.exceptions.OpenemsException; +import io.openems.edge.bridge.http.api.BridgeHttp.Endpoint; @Component public class UrlFetcherImpl implements UrlFetcher { @Override - public Runnable createTask(// - final String urlString, // - final int connectTimeout, // - final int readTimeout, // - final CompletableFuture future // - ) { - return () -> { - try { - var url = new URL(urlString); - var con = (HttpURLConnection) url.openConnection(); - con.setRequestMethod("GET"); - - // config setting / method param ? - con.setConnectTimeout(connectTimeout); - con.setReadTimeout(readTimeout); - - var status = con.getResponseCode(); - String body; - try (var in = new BufferedReader(new InputStreamReader(con.getInputStream()))) { - // Read HTTP response - var content = new StringBuilder(); - String line; - while ((line = in.readLine()) != null) { - content.append(line); - content.append(System.lineSeparator()); - } - body = content.toString(); + public String fetchEndpoint(final Endpoint endpoint) throws OpenemsNamedException { + try { + var url = new URL(endpoint.url()); + var con = (HttpURLConnection) url.openConnection(); + con.setRequestMethod(endpoint.method().name()); + con.setConnectTimeout(endpoint.connectTimeout()); + con.setReadTimeout(endpoint.readTimeout()); + + endpoint.properties().forEach(con::setRequestProperty); + + if (endpoint.method().isBodyAllowed() && endpoint.body() != null) { + con.setDoOutput(true); + try (var os = con.getOutputStream(); // + var osw = new OutputStreamWriter(os, "UTF-8")) { + osw.write(endpoint.body()); + osw.flush(); } + } - // Check valid for all? - if (status < 300) { - future.complete(body); - } else { - throw new OpenemsException( - "Error while reading Endpoint " + urlString + ". Response code: " + status + ". " + body); - } - } catch (OpenemsNamedException | IOException e) { - future.completeExceptionally(e); + final var status = con.getResponseCode(); + String body; + try (var in = new BufferedReader(new InputStreamReader(con.getInputStream()))) { + // Read HTTP response + body = in.lines().collect(joining(System.lineSeparator())); } - }; + + // Check valid for all? + if (status < 300) { + return body; + } else { + throw new OpenemsException( + "Error while reading Endpoint " + endpoint.url() + ". Response code: " + status + ". " + body); + } + + } catch (IOException e) { + throw new OpenemsException(e); + } } } diff --git a/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/api/BridgeHttp.java b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/api/BridgeHttp.java index 2e8e6cfb2da..3a01db4f3f0 100644 --- a/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/api/BridgeHttp.java +++ b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/api/BridgeHttp.java @@ -1,28 +1,37 @@ package io.openems.edge.bridge.http.api; +import static java.util.Collections.emptyMap; + +import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.function.BiConsumer; import java.util.function.Consumer; import com.google.gson.JsonElement; -import io.openems.common.function.ThrowingConsumer; +import io.openems.common.function.ThrowingFunction; import io.openems.common.utils.JsonUtils; /** - * HttpBridge to handle request to a endpoint. - * - *

- * If a request is scheduled every cycle and the request does take longer than - * one cycle it is not executed multiple times instead it waits until the last - * request is finished and will be executed with the next cycle. + * HttpBridge to handle requests to a {@link Endpoint}. * *

* To get a reference to a bridge object include this in your component: * *

-   @Reference(scope = ReferenceScope.PROTOTYPE_REQUIRED)
+   @Reference
+   private BridgeHttpFactory httpBridgeFactory;
    private BridgeHttp httpBridge;
+   
+   @Activate
+   private void activate() {
+       this.httpBridge = this.httpBridgeFactory.get();
+   }
+   
+   @Deactivate
+   private void deactivate() {
+       this.httpBridgeFactory.unget(this.httpBridge);
+       this.httpBridge = null;
+   }
  * 
* *

@@ -36,13 +45,15 @@ * }); * * - *

- * If an enpoint does not require to be called every cycle it can also be - * configured with e. g. {@link BridgeHttp#subscribe(int, String, Consumer)} - * where the first value could be 5 then the request gets triggered every 5th - * cycle. + * @see BridgeHttpCycle for more detailed explanation for requests based on + * cycle + * @see BridgeHttpTime for more detailed explanation for requests based on time + * e. g. every hour */ -public interface BridgeHttp { +public interface BridgeHttp extends BridgeHttpCycle, BridgeHttpTime { + + public static int DEFAULT_CONNECT_TIMEOUT = 5000; // 5s + public static int DEFAULT_READ_TIMEOUT = 5000; // 5s /** * Default empty error handler. @@ -51,241 +62,169 @@ public interface BridgeHttp { }; public record Endpoint(// - /** - * Configures how often the url should be fetched. - * - *

- * e. g. if the cycle is 3 the url gets fetched every 3th cycle and also only if - * the last request was finished either successfully or with a error. - */ - int cycle, // - /** - * The url which should be fetched. - */ String url, // - /** - * The callback to execute on every successful result. - */ - Consumer result, // - /** - * The callback to execute on every error. - */ - Consumer onError // + HttpMethod method, // + int connectTimeout, // + int readTimeout, // + String body, // nullable + Map properties // ) { - @Override - public String toString() { - return "Endpoint [cycle=" + this.cycle() + ", url=" + this.url() + "]"; - } - - } - - /** - * Subscribes to one http endpoint. - * - * @param endpoint the {@link Endpoint} configuration - */ - public void subscribe(Endpoint endpoint); - - /** - * Subscribes to one http endpoint. - * - *

- * Tries to fetch data every n-cycle. If receiving data takes more than n-cycle - * the next get request to the url gets send when the last was finished either - * successfully or with a timeout. - * - * @param cycle the number of cycles to wait between requests - * @param url the url of the enpoint - * @param result the consumer to call on every successful result - */ - public default void subscribe(int cycle, String url, Consumer result) { - this.subscribe(new Endpoint(cycle, url, result, EMPTY_ERROR_HANDLER)); } /** - * Subscribes to one http endpoint. - * - *

- * Tries to fetch data every n-cycle. If receiving data takes more than n-cycle - * the next get request to the url gets send when the last was finished either - * successfully or with an error. + * Fetches the url once with {@link HttpMethod#GET}. * - * @param cycle the number of cycles to wait between requests - * @param url the url of the enpoint - * @param result the consumer to call on every successful result - * @param onError the consumer to call on a error + * @param url the url to fetch + * @return the result response future */ - public default void subscribe(// - final int cycle, // - final String url, // - final ThrowingConsumer result, // - final Consumer onError // - ) { - this.subscribe(new Endpoint(cycle, url, t -> { - try { - result.accept(t); - } catch (Exception e) { - onError.accept(e); - } - }, onError)); + public default CompletableFuture get(String url) { + final var endpoint = new Endpoint(// + url, // + HttpMethod.GET, // + DEFAULT_CONNECT_TIMEOUT, // + DEFAULT_READ_TIMEOUT, // + null, // + emptyMap() // + ); + return this.request(endpoint); } /** - * Subscribes to one http endpoint. + * Fetches the url once with {@link HttpMethod#GET} and expects the result to be + * in json format. * - *

- * Tries to fetch data every n-cycle. If receiving data takes more than n-cycle - * the next get request to the url gets send when the last was finished either - * successfully or with an error. - * - * @param cycle the number of cycles to wait between requests - * @param url the url of the enpoint - * @param action the action to perform; the first is the result of the endpoint - * if existing and the second argument is passed if an error - * happend. One of the params is always null and one not + * @param url the url to fetch + * @return the result response future */ - public default void subscribe(// - final int cycle, // - final String url, // - final BiConsumer action // - ) { - this.subscribe(cycle, url, r -> action.accept(r, null), t -> action.accept(null, t)); + public default CompletableFuture getJson(String url) { + return mapFuture(this.get(url), JsonUtils::parse); } /** - * Subscribes to one http endpoint. + * Fetches the url once with {@link HttpMethod#PUT}. * - *

- * Tries to fetch data every cycle. If receiving data takes more than one cycle - * the next get request to the url gets send when the last was finished either - * successfully or with an error. - * - * @param url the url of the enpoint - * @param result the consumer to call on every successful result - * @param onError the consumer to call on a error + * @param url the url to fetch + * @return the result response future */ - public default void subscribeEveryCycle(// - final String url, // - final ThrowingConsumer result, // - final Consumer onError // - ) { - this.subscribe(1, url, result, onError); + public default CompletableFuture put(String url) { + final var endpoint = new Endpoint(// + url, // + HttpMethod.PUT, // + DEFAULT_CONNECT_TIMEOUT, // + DEFAULT_READ_TIMEOUT, // + null, // + emptyMap() // + ); + return this.request(endpoint); } /** - * Subscribes to one http endpoint. - * - *

- * Tries to fetch data every cycle. If receiving data takes more than one cycle - * the next get request to the url gets send when the last was finished either - * successfully or with an error. + * Fetches the url once with {@link HttpMethod#PUT} and expects the result to be + * in json format. * - * @param url the url of the enpoint - * @param action the action to perform; the first is the result of the endpoint - * if existing and the second argument is passed if an error - * happend. One of the params is always null and one not + * @param url the url to fetch + * @return the result response future */ - public default void subscribeEveryCycle(// - final String url, // - final BiConsumer action // - ) { - this.subscribe(1, url, r -> action.accept(r, null), t -> action.accept(null, t)); + public default CompletableFuture putJson(String url) { + return mapFuture(this.put(url), JsonUtils::parse); } /** - * Subscribes to one http endpoint. - * - *

- * Tries to fetch data every cycle. If receiving data takes more than one cycle - * the next get request to the url gets send when the last was finished either - * successfully or with an error. + * Fetches the url once with {@link HttpMethod#POST}. * - * @param url the url of the enpoint - * @param result the consumer to call on every successful result + * @param url the url to fetch + * @param body the request body to send + * @return the result response future */ - public default void subscribeEveryCycle(// - final String url, // - final Consumer result // - ) { - this.subscribe(1, url, result); + public default CompletableFuture post(String url, String body) { + final var endpoint = new Endpoint(// + url, // + HttpMethod.POST, // + DEFAULT_CONNECT_TIMEOUT, // + DEFAULT_READ_TIMEOUT, // + body, // + emptyMap() // + ); + return this.request(endpoint); } /** - * Subscribes to one http endpoint. + * Fetches the url once with {@link HttpMethod#POST} and expects the result to + * be in json format. * - *

- * Tries to fetch data every n-cycle. If receiving data takes more than n-cycle - * the next get request to the url gets send when the last was finished either - * successfully or with an error. - * - * @param cycle the number of cycles to wait between requests - * @param url the url of the enpoint - * @param result the consumer to call on every successful result - * @param onError the consumer to call on a error + * @param url the url to fetch + * @param body the request body to send + * @return the result response future */ - public default void subscribeJson(// - final int cycle, // - final String url, // - final ThrowingConsumer result, // - final Consumer onError // - ) { - this.subscribe(cycle, url, t -> result.accept(JsonUtils.parse(t)), onError); + public default CompletableFuture postJson(String url, JsonElement body) { + return mapFuture(this.post(url, body.toString()), JsonUtils::parse); } /** - * Subscribes to one http endpoint. - * - *

- * Tries to fetch data every cycle. If receiving data takes more than one cycle - * the next get request to the url gets send when the last was finished either - * successfully or with an error. + * Fetches the url once with {@link HttpMethod#DELETE}. * - * @param url the url of the enpoint - * @param result the consumer to call on every successful result - * @param onError the consumer to call on a error + * @param url the url to fetch + * @return the result response future */ - public default void subscribeJsonEveryCycle(// - final String url, // - final ThrowingConsumer result, // - final Consumer onError // - ) { - this.subscribeJson(1, url, result, onError); + public default CompletableFuture delete(String url) { + final var endpoint = new Endpoint(// + url, // + HttpMethod.DELETE, // + DEFAULT_CONNECT_TIMEOUT, // + DEFAULT_READ_TIMEOUT, // + null, // + emptyMap() // + ); + return this.request(endpoint); } /** - * Subscribes to one http endpoint. + * Fetches the url once with {@link HttpMethod#DELETE} and expects the result to + * be in json format. * - *

- * Tries to fetch data every cycle. If receiving data takes more than one cycle - * the next get request to the url gets send when the last was finished either - * successfully or with an error. - * - * @param url the url of the enpoint - * @param action the action to perform; the first is the result of the endpoint - * if existing and the second argument is passed if an error - * happend. One of the params is always null and one not + * @param url the url to fetch + * @return the result response future */ - public default void subscribeJsonEveryCycle(// - final String url, // - final BiConsumer action // - ) { - this.subscribeJson(1, url, r -> action.accept(r, null), t -> action.accept(null, t)); + public default CompletableFuture deleteJson(String url) { + return mapFuture(this.delete(url), JsonUtils::parse); } /** * Fetches the url once. * - * @param url the url to fetch + * @param endpoint the {@link Endpoint} to fetch * @return the result response future */ - public CompletableFuture request(String url); + public CompletableFuture request(Endpoint endpoint); /** - * Sets the connect and read timeout. + * Fetches the url once and expects the result to be in json format. * - * @param connectTimeout connect timeout - * @param readTimeout read timeout + * @param endpoint the {@link Endpoint} to fetch + * @return the result response future */ - public void setTimeout(int connectTimeout, int readTimeout); + public default CompletableFuture requestJson(Endpoint endpoint) { + return mapFuture(this.request(endpoint), JsonUtils::parse); + } + + private static CompletableFuture mapFuture(// + CompletableFuture origin, // + ThrowingFunction mapper // + ) { + final var future = new CompletableFuture(); + origin.whenComplete((t, u) -> { + if (u != null) { + future.completeExceptionally(u); + return; + } + try { + future.complete(mapper.apply(t)); + } catch (Exception e) { + future.completeExceptionally(e); + } + }); + return future; + } + } diff --git a/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/api/BridgeHttpCycle.java b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/api/BridgeHttpCycle.java new file mode 100644 index 00000000000..2c1f7152e9f --- /dev/null +++ b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/api/BridgeHttpCycle.java @@ -0,0 +1,302 @@ +package io.openems.edge.bridge.http.api; + +import static java.util.Collections.emptyMap; + +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import com.google.gson.JsonElement; + +import io.openems.common.function.ThrowingConsumer; +import io.openems.common.utils.JsonUtils; +import io.openems.edge.bridge.http.api.BridgeHttp.Endpoint; + +/** + * BridgeHttpCycle to handle request to a endpoint based on the cycle. + * + *

+ * If a request is scheduled every cycle and the request does take longer than + * one cycle it is not executed multiple times instead it waits until the last + * request is finished and will be executed with the next cycle. + * + *

+ * A simple example to subscribe to an endpoint every cycle would be: + * + *

+ * this.httpBridge.subscribeEveryCycle("http://127.0.0.1/status", t -> {
+ * 	// process data
+ * }, t -> {
+ * 	// handle error
+ * });
+ * 
+ * + *

+ * If an endpoint does not require to be called every cycle it can also be + * configured with e. g. + * {@link BridgeHttpCycle#subscribeCycle(int, String, Consumer)} where the first + * value could be 5 then the request gets triggered every 5th cycle. + */ +public interface BridgeHttpCycle { + + public record CycleEndpoint(// + /** + * Configures how often the url should be fetched. + * + *

+ * e. g. if the cycle is 3 the url gets fetched every 3rd cycle and also only if + * the last request was finished either successfully or with a error. + */ + int cycle, // + /** + * The url which should be fetched. + */ + Endpoint endpoint, // + /** + * The callback to execute on every successful result. + */ + Consumer result, // + /** + * The callback to execute on every error. + */ + Consumer onError // + ) { + + @Override + public String toString() { + return "Endpoint [cycle=" + this.cycle() + ", url=" + this.endpoint.url() + "]"; + } + + } + + /** + * Subscribes to one http endpoint. + * + * @param endpoint the {@link CycleEndpoint} configuration + */ + public void subscribeCycle(CycleEndpoint endpoint); + + /** + * Subscribes to one http endpoint. + * + *

+ * Tries to fetch data every n-cycle. If receiving data takes more than n-cycle + * the next get request to the url gets send when the last was finished either + * successfully or with a timeout. + * + * @param cycle the number of cycles to wait between requests + * @param url the url of the enpoint + * @param result the consumer to call on every successful result + */ + public default void subscribeCycle(int cycle, String url, Consumer result) { + final var endpoint = new Endpoint(// + url, // + HttpMethod.GET, // + BridgeHttp.DEFAULT_CONNECT_TIMEOUT, // + BridgeHttp.DEFAULT_READ_TIMEOUT, // + null, // + emptyMap() // + ); + this.subscribeCycle(new CycleEndpoint(cycle, endpoint, result, BridgeHttp.EMPTY_ERROR_HANDLER)); + } + + /** + * Subscribes to one http endpoint. + * + *

+ * Tries to fetch data every n-cycle. If receiving data takes more than n-cycle + * the next get request to the url gets send when the last was finished either + * successfully or with an error. + * + * @param cycle the number of cycles to wait between requests + * @param url the url of the enpoint + * @param result the consumer to call on every successful result + * @param onError the consumer to call on a error + */ + public default void subscribeCycle(// + final int cycle, // + final String url, // + final ThrowingConsumer result, // + final Consumer onError // + ) { + final var endpoint = new Endpoint(// + url, // + HttpMethod.GET, // + BridgeHttp.DEFAULT_CONNECT_TIMEOUT, // + BridgeHttp.DEFAULT_READ_TIMEOUT, // + null, // + emptyMap() // + ); + this.subscribeCycle(new CycleEndpoint(cycle, endpoint, t -> { + try { + result.accept(t); + } catch (Exception e) { + onError.accept(e); + } + }, onError)); + } + + /** + * Subscribes to one http endpoint. + * + *

+ * Tries to fetch data every n-cycle. If receiving data takes more than n-cycle + * the next get request to the url gets send when the last was finished either + * successfully or with an error. + * + * @param cycle the number of cycles to wait between requests + * @param url the url of the enpoint + * @param action the action to perform; the first is the result of the endpoint + * if existing and the second argument is passed if an error + * happend. One of the params is always null and one not + */ + public default void subscribeCycle(// + final int cycle, // + final String url, // + final BiConsumer action // + ) { + this.subscribeCycle(cycle, url, r -> action.accept(r, null), t -> action.accept(null, t)); + } + + /** + * Subscribes to one http endpoint. + * + *

+ * Tries to fetch data every cycle. If receiving data takes more than one cycle + * the next get request to the url gets send when the last was finished either + * successfully or with an error. + * + * @param url the url of the enpoint + * @param result the consumer to call on every successful result + * @param onError the consumer to call on a error + */ + public default void subscribeEveryCycle(// + final String url, // + final ThrowingConsumer result, // + final Consumer onError // + ) { + this.subscribeCycle(1, url, result, onError); + } + + /** + * Subscribes to one http endpoint. + * + *

+ * Tries to fetch data every cycle. If receiving data takes more than one cycle + * the next get request to the url gets send when the last was finished either + * successfully or with an error. + * + * @param url the url of the enpoint + * @param action the action to perform; the first is the result of the endpoint + * if existing and the second argument is passed if an error + * happend. One of the params is always null and one not + */ + public default void subscribeEveryCycle(// + final String url, // + final BiConsumer action // + ) { + this.subscribeCycle(1, url, r -> action.accept(r, null), t -> action.accept(null, t)); + } + + /** + * Subscribes to one http endpoint. + * + *

+ * Tries to fetch data every cycle. If receiving data takes more than one cycle + * the next get request to the url gets send when the last was finished either + * successfully or with an error. + * + * @param url the url of the enpoint + * @param result the consumer to call on every successful result + */ + public default void subscribeEveryCycle(// + final String url, // + final Consumer result // + ) { + this.subscribeCycle(1, url, result); + } + + /** + * Subscribes to one http endpoint. + * + *

+ * Tries to fetch data every n-cycle. If receiving data takes more than n-cycle + * the next get request to the url gets send when the last was finished either + * successfully or with an error. + * + * @param cycle the number of cycles to wait between requests + * @param url the url of the enpoint + * @param result the consumer to call on every successful result + * @param onError the consumer to call on a error + */ + public default void subscribeJsonCycle(// + final int cycle, // + final String url, // + final ThrowingConsumer result, // + final Consumer onError // + ) { + this.subscribeCycle(cycle, url, t -> result.accept(JsonUtils.parse(t)), onError); + } + + /** + * Subscribes to one http endpoint. + * + *

+ * Tries to fetch data every n-cycle. If receiving data takes more than n-cycle + * the next get request to the url gets send when the last was finished either + * successfully or with an error. + * + * @param cycle the number of cycles to wait between requests + * @param url the url of the enpoint + * @param action the action to perform; the first is the result of the endpoint + * if existing and the second argument is passed if an error + * happend. One of the params is always null and one not + */ + public default void subscribeJsonCycle(// + final int cycle, // + final String url, // + final BiConsumer action // + ) { + this.subscribeCycle(cycle, url, t -> action.accept(JsonUtils.parse(t), null), t -> action.accept(null, t)); + } + + /** + * Subscribes to one http endpoint. + * + *

+ * Tries to fetch data every cycle. If receiving data takes more than one cycle + * the next get request to the url gets send when the last was finished either + * successfully or with an error. + * + * @param url the url of the enpoint + * @param result the consumer to call on every successful result + * @param onError the consumer to call on a error + */ + public default void subscribeJsonEveryCycle(// + final String url, // + final ThrowingConsumer result, // + final Consumer onError // + ) { + this.subscribeJsonCycle(1, url, result, onError); + } + + /** + * Subscribes to one http endpoint. + * + *

+ * Tries to fetch data every cycle. If receiving data takes more than one cycle + * the next get request to the url gets send when the last was finished either + * successfully or with an error. + * + * @param url the url of the enpoint + * @param action the action to perform; the first is the result of the endpoint + * if existing and the second argument is passed if an error + * happend. One of the params is always null and one not + */ + public default void subscribeJsonEveryCycle(// + final String url, // + final BiConsumer action // + ) { + this.subscribeJsonCycle(1, url, r -> action.accept(r, null), t -> action.accept(null, t)); + } + +} diff --git a/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/api/BridgeHttpFactory.java b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/api/BridgeHttpFactory.java new file mode 100644 index 00000000000..39ed78988d7 --- /dev/null +++ b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/api/BridgeHttpFactory.java @@ -0,0 +1,65 @@ +package io.openems.edge.bridge.http.api; + +import org.osgi.service.component.ComponentServiceObjects; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Reference; + +/** + * Bridge factory to get an instance of an {@link BridgeHttp}. + * + *

+ * Usage: + * + *

+   @Reference
+   private BridgeHttpFactory httpBridgeFactory;
+   private BridgeHttp httpBridge;
+   
+   @Activate
+   private void activate() {
+       this.httpBridge = this.httpBridgeFactory.get();
+   }
+   
+   @Deactivate
+   private void deactivate() {
+       this.httpBridgeFactory.unget(this.httpBridge);
+       this.httpBridge = null;
+   }
+ * 
+ */ +@Component(service = BridgeHttpFactory.class) +public class BridgeHttpFactory { + + @Reference + private ComponentServiceObjects csoBridgeHttp; + + @Activate + public BridgeHttpFactory() { + + } + + /** + * Returns a new {@link BridgeHttp} service object. + * + * @return the created {@link BridgeHttp} object + * @see BridgeHttpFactory#unget(BridgeHttp) + */ + public BridgeHttp get() { + return this.csoBridgeHttp.getService(); + } + + /** + * Releases the {@link BridgeHttp} service object. + * + * @param bridge a {@link BridgeHttp} provided by this factory + * @see BridgeHttpFactory#unget(BridgeHttp) + */ + public void unget(BridgeHttp bridge) { + if (bridge == null) { + return; + } + this.csoBridgeHttp.ungetService(bridge); + } + +} diff --git a/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/api/BridgeHttpTime.java b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/api/BridgeHttpTime.java new file mode 100644 index 00000000000..53196d2092a --- /dev/null +++ b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/api/BridgeHttpTime.java @@ -0,0 +1,251 @@ +package io.openems.edge.bridge.http.api; + +import static io.openems.edge.bridge.http.time.DelayTimeProviderChain.immediate; +import static java.util.Collections.emptyMap; + +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import com.google.gson.JsonElement; + +import io.openems.common.function.ThrowingConsumer; +import io.openems.common.utils.JsonUtils; +import io.openems.edge.bridge.http.api.BridgeHttp.Endpoint; +import io.openems.edge.bridge.http.time.DefaultDelayTimeProvider; +import io.openems.edge.bridge.http.time.DelayTimeProvider; +import io.openems.edge.bridge.http.time.DelayTimeProviderChain; + +/** + * BridgeHttpTime to handle request to a endpoint based on a time delay. + * + *

+ * The calculation when an endpoint gets called is provided in the + * {@link DelayTimeProvider}. The + * {@link DelayTimeProvider#nextRun(boolean, boolean)} gets called instantly + * when the initial method to add the endpoint gets called and then every time + * after the last endpoint handle was finished. + * + *

+ * So for e. g. if a fixed delay of 1 minute gets provided the time will shift + * into the back a little bit every time an endpoint gets called because + * fetching the endpoint and handling it also takes some time. + * + *

+ * A simple example to subscribe to an endpoint with 1 minute delay in between + * would be: + * + *

+ * final var delayProvider = DelayTimeProviderChain.fixedDelay(Duration.ofMinutes(1));
+ * this.httpBridge.subscribeTime(delayProvider, "http://127.0.0.1/status", t -> {
+ * 	// process data
+ * }, t -> {
+ * 	// handle error
+ * });
+ * 
+ */ +public interface BridgeHttpTime { + + public record TimeEndpoint(// + /** + * The delay time provider. Gives the time from the current time to the next + * time when the endpoint should be fetched. + */ + DelayTimeProvider delayTimeProvider, // + /** + * The url which should be fetched. + */ + Endpoint endpoint, // + /** + * The callback to execute on every successful result. + */ + Consumer onResult, // + /** + * The callback to execute on every error. + */ + Consumer onError // + ) { + + } + + /** + * Subscribes to an {@link TimeEndpoint}. The {@link TimeEndpoint#endpoint} gets + * fetched based on the delayed time provided by the + * {@link TimeEndpoint#delayTimeProvider}. After the endpoint gets fetched + * either the {@link TimeEndpoint#onResult} or the {@link TimeEndpoint#onError} + * gets executed depending on the result. + * + * @param endpoint the {@link TimeEndpoint} to add a subscription + */ + public void subscribeTime(TimeEndpoint endpoint); + + /** + * Subscribes to an {@link Endpoint} with the delay provided by the + * {@link DelayTimeProvider} and after every endpoint fetch either the + * onResult or the onError method gets called. + * + * @param delayTimeProvider the {@link DelayTimeProvider} to provided the delay + * between the fetches + * @param endpoint the {@link Endpoint} to fetch + * @param onResult the method to call on successful fetch + * @param onError the method to call if an error happens during + * fetching or handling the result + */ + public default void subscribeTime(// + DelayTimeProvider delayTimeProvider, // + Endpoint endpoint, // + ThrowingConsumer onResult, // + Consumer onError // + ) { + this.subscribeTime(new TimeEndpoint(delayTimeProvider, endpoint, t -> { + try { + onResult.accept(t); + } catch (Exception e) { + onError.accept(e); + } + }, onError)); + } + + /** + * Subscribes to an {@link Endpoint} with the delay provided by the + * {@link DelayTimeProvider} and after every endpoint fetch the + * action gets called either with the result or the error at least + * one is not null. + * + * @param delayTimeProvider the {@link DelayTimeProvider} to provided the delay + * between the fetches + * @param endpoint the {@link Endpoint} to fetch + * @param action the action to perform; the first is the result of + * the endpoint if existing and the second argument is + * passed if an error happend. One of the params is + * always null and one not + */ + public default void subscribeTime(// + DelayTimeProvider delayTimeProvider, // + Endpoint endpoint, // + BiConsumer action // + ) { + this.subscribeTime(new TimeEndpoint(delayTimeProvider, endpoint, r -> action.accept(r, null), + t -> action.accept(null, t))); + } + + /** + * Subscribes to an {@link Endpoint} with the delay provided by the + * {@link DelayTimeProviderChain} and after every endpoint fetch either the + * onResult or the onError method gets called. + * + *

+ * Note: the first fetch gets triggered immediately + * + * @param onErrorDelay the {@link DelayTimeProviderChain} when the last fetch + * was not successful + * @param onSuccessDelay the {@link DelayTimeProviderChain} when the last fetch + * was successful + * @param url the url to fetch + * @param onResult the method to call on successful fetch + * @param onError the method to call if an error happens during fetching + * or handling the result + */ + public default void subscribeTime(// + DelayTimeProviderChain onErrorDelay, // + DelayTimeProviderChain onSuccessDelay, // + String url, // + ThrowingConsumer onResult, // + Consumer onError // + ) { + this.subscribeTime(new DefaultDelayTimeProvider(immediate(), onErrorDelay, onSuccessDelay), new Endpoint(url, // + HttpMethod.GET, // + BridgeHttp.DEFAULT_CONNECT_TIMEOUT, // + BridgeHttp.DEFAULT_READ_TIMEOUT, // + null, // + emptyMap() // + ), onResult, onError); + } + + /** + * Subscribes to an {@link Endpoint} with the delay provided by the + * {@link DelayTimeProviderChain} and after every endpoint fetch either the + * onResult or the onError method gets called. + * + *

+ * Note: the first fetch gets triggered immediately + * + * @param delay the {@link DelayTimeProviderChain} between each fetch + * @param url the url to fetch + * @param onResult the method to call on successful fetch + * @param onError the method to call if an error happens during fetching or + * handling the result + */ + public default void subscribeTime(// + DelayTimeProviderChain delay, // + String url, // + ThrowingConsumer onResult, // + Consumer onError // + ) { + this.subscribeTime(delay, delay, url, onResult, onError); + } + + /** + * Subscribes to an {@link Endpoint} with the delay provided by the + * {@link DelayTimeProviderChain} and after every endpoint fetch either the + * onResult or the onError method gets called. + * + *

+ * Note: the first fetch gets triggered immediately + * + * @param delay the {@link DelayTimeProviderChain} between each fetch + * @param url the url to fetch + * @param onResult the method to call on successful fetch + */ + public default void subscribeTime(// + DelayTimeProviderChain delay, // + String url, // + ThrowingConsumer onResult // + ) { + this.subscribeTime(delay, delay, url, onResult, BridgeHttp.EMPTY_ERROR_HANDLER); + } + + /** + * Subscribes to an {@link Endpoint} with the delay provided by the + * {@link DelayTimeProvider} and after every endpoint fetch either the + * onResult or the onError method gets called. + * + * @param delayTimeProvider the {@link DelayTimeProvider} to provided the delay + * between the fetches + * @param endpoint the {@link Endpoint} to fetch + * @param onResult the method to call on successful fetch + * @param onError the method to call if an error happens during + * fetching or handling the result + */ + public default void subscribeJsonTime(// + DelayTimeProvider delayTimeProvider, // + Endpoint endpoint, // + ThrowingConsumer onResult, // + Consumer onError // + ) { + this.subscribeTime(delayTimeProvider, endpoint, t -> onResult.accept(JsonUtils.parse(t)), onError); + } + + /** + * Subscribes to an {@link Endpoint} with the delay provided by the + * {@link DelayTimeProvider} and after every endpoint fetch the + * action gets called either with the result or the error at least + * one is not null. + * + * @param delayTimeProvider the {@link DelayTimeProvider} to provided the delay + * between the fetches + * @param endpoint the {@link Endpoint} to fetch + * @param action the action to perform; the first is the result of + * the endpoint if existing and the second argument is + * passed if an error happend. One of the params is + * always null and one not + */ + public default void subscribeJsonTime(// + DelayTimeProvider delayTimeProvider, // + Endpoint endpoint, // + BiConsumer action // + ) { + this.subscribeTime(delayTimeProvider, endpoint, t -> action.accept(JsonUtils.parse(t), null), + e -> action.accept(null, e)); + } + +} diff --git a/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/api/HttpMethod.java b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/api/HttpMethod.java new file mode 100644 index 00000000000..0ab917dbe11 --- /dev/null +++ b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/api/HttpMethod.java @@ -0,0 +1,24 @@ +package io.openems.edge.bridge.http.api; + +public enum HttpMethod { + GET(false), // + POST(true), // + PUT(true), // + DELETE(false), // + PATCH(true), // + HEAD(false), // + OPTIONS(false), // + TRACE(false) // + ; + + private final boolean bodyAllowed; + + private HttpMethod(boolean bodyAllowed) { + this.bodyAllowed = bodyAllowed; + } + + public boolean isBodyAllowed() { + return this.bodyAllowed; + } + +} diff --git a/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/dummy/DummyBridgeHttp.java b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/dummy/DummyBridgeHttp.java index 57ccce181dc..7a5e3af2b5c 100644 --- a/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/dummy/DummyBridgeHttp.java +++ b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/dummy/DummyBridgeHttp.java @@ -7,21 +7,21 @@ public class DummyBridgeHttp implements BridgeHttp { @Override - public void subscribe(Endpoint endpoint) { + public void subscribeCycle(CycleEndpoint endpoint) { // TODO Auto-generated method stub } @Override - public CompletableFuture request(String url) { + public void subscribeTime(TimeEndpoint endpoint) { // TODO Auto-generated method stub - return null; + } @Override - public void setTimeout(int connectTimeout, int readTimeout) { + public CompletableFuture request(Endpoint endpoint) { // TODO Auto-generated method stub - + return null; } } diff --git a/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/dummy/DummyBridgeHttpFactory.java b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/dummy/DummyBridgeHttpFactory.java new file mode 100644 index 00000000000..d8a4d19c59a --- /dev/null +++ b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/dummy/DummyBridgeHttpFactory.java @@ -0,0 +1,38 @@ +package io.openems.edge.bridge.http.dummy; + +import java.lang.reflect.InvocationTargetException; + +import org.osgi.framework.ServiceReference; +import org.osgi.service.component.ComponentServiceObjects; + +import io.openems.common.utils.ReflectionUtils; +import io.openems.edge.bridge.http.api.BridgeHttp; +import io.openems.edge.bridge.http.api.BridgeHttpFactory; + +public class DummyBridgeHttpFactory extends BridgeHttpFactory { + + public DummyBridgeHttpFactory() throws IllegalArgumentException, IllegalAccessException, InvocationTargetException { + super(); + ReflectionUtils.setAttribute(BridgeHttpFactory.class, this, "csoBridgeHttp", new DummyBridgeHttpCso()); + } + + private static class DummyBridgeHttpCso implements ComponentServiceObjects { + + @Override + public BridgeHttp getService() { + return new DummyBridgeHttp(); + } + + @Override + public void ungetService(BridgeHttp service) { + // empty for tests + } + + @Override + public ServiceReference getServiceReference() { + // empty for tests + return null; + } + } + +} diff --git a/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/time/DefaultDelayTimeProvider.java b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/time/DefaultDelayTimeProvider.java new file mode 100644 index 00000000000..95c883301c7 --- /dev/null +++ b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/time/DefaultDelayTimeProvider.java @@ -0,0 +1,34 @@ +package io.openems.edge.bridge.http.time; + +import java.time.Duration; + +public class DefaultDelayTimeProvider implements DelayTimeProvider { + + private final DelayTimeProviderChain firstRunDelay; + private final DelayTimeProviderChain onErrorDelay; + private final DelayTimeProviderChain onSuccessDelay; + + public DefaultDelayTimeProvider(// + DelayTimeProviderChain firstRunDelay, // + DelayTimeProviderChain onErrorDelay, // + DelayTimeProviderChain onSuccessDelay // + ) { + this.firstRunDelay = firstRunDelay == null ? onSuccessDelay : firstRunDelay; + this.onErrorDelay = onErrorDelay == null ? onSuccessDelay : onErrorDelay; + this.onSuccessDelay = onSuccessDelay; + } + + @Override + public Duration nextRun(boolean firstRun, boolean lastRunSuccessful) { + if (firstRun) { + return this.firstRunDelay.getDelay(); + } + + if (!lastRunSuccessful) { + return this.onErrorDelay.getDelay(); + } + + return this.onSuccessDelay.getDelay(); + } + +} \ No newline at end of file diff --git a/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/time/DelayTimeProvider.java b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/time/DelayTimeProvider.java new file mode 100644 index 00000000000..27561837d2f --- /dev/null +++ b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/time/DelayTimeProvider.java @@ -0,0 +1,19 @@ +package io.openems.edge.bridge.http.time; + +import java.time.Duration; + +import io.openems.edge.bridge.http.api.BridgeHttp.Endpoint; + +public interface DelayTimeProvider { + + /** + * Gives the {@link Duration} till the next run should be triggered. + * + * @param firstRun true if this method gets executed for the first time + * @param lastRunSuccessful true if the last fetch of an {@link Endpoint} was + * successful + * @return the {@link Duration} till the next run + */ + public Duration nextRun(boolean firstRun, boolean lastRunSuccessful); + +} diff --git a/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/time/DelayTimeProviderChain.java b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/time/DelayTimeProviderChain.java new file mode 100644 index 00000000000..e3b69e16e6e --- /dev/null +++ b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/time/DelayTimeProviderChain.java @@ -0,0 +1,137 @@ +package io.openems.edge.bridge.http.time; + +import java.time.Clock; +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.temporal.TemporalUnit; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import com.google.common.base.Supplier; + +import io.openems.common.timedata.DurationUnit; + +public class DelayTimeProviderChain { + + /** + * Creates a {@link DelayTimeProviderChain} which returns a the {@link Duration} + * of zero on request. + * + * @return a {@link DelayTimeProviderChain} of zero delay + */ + public static DelayTimeProviderChain immediate() { + return new DelayTimeProviderChain(() -> Duration.ZERO); + } + + /** + * Creates a {@link DelayTimeProviderChain} which returns a fixed + * {@link Duration} on request. + * + * @param delay the {@link Duration} to return when requested + * @return a {@link DelayTimeProviderChain} of the given {@link Duration} + */ + public static DelayTimeProviderChain fixedDelay(Duration delay) { + return new DelayTimeProviderChain(() -> delay); + } + + /** + * Creates a {@link DelayTimeProviderChain} which returns a {@link Duration} + * till the next truncated time of the given {@link DurationUnit}. + * + *

+ * e. g. if a DurationUnit.ofMinutes(1) gets provided the + * {@link Duration} at the time 12h 43min 23sec would be (60sec - 23sec) = 37sec + * to 12h 44min 0sec. Same would work for every hour 12h, 13h, 14h, ... with + * DurationUnit.ofHours(1) + * + * @param clock the {@link Clock} to get the current time + * @param durationUnit the {@link DurationUnit} to truncate with + * @return a {@link DelayTimeProviderChain} which returns the {@link Duration} + */ + public static DelayTimeProviderChain fixedAtEveryFull(Clock clock, DurationUnit durationUnit) { + return new DelayTimeProviderChain(() -> { + final var now = LocalDateTime.now(clock); + + return Duration.between(now, now.truncatedTo(durationUnit) // + .plus(durationUnit.getDuration())); + }); + } + + private final Supplier supplier; + + public DelayTimeProviderChain(Supplier supplier) { + this.supplier = supplier; + } + + public Duration getDelay() { + return this.supplier.get(); + } + + /** + * Creates a {@link DelayTimeProviderChain} which adds to the original provider + * the given amount. The new provided {@link Duration} gets rounded to seconds + * based on the implementation of {@link Delay#plus(Delay)}. + * + * @param origin the original {@link DelayTimeProviderChain} to get the + * initial {@link Duration} + * @param duration the {@link Duration} to add + * @return a {@link DelayTimeProviderChain} which returns the {@link Duration} + * of the original {@link DelayTimeProviderChain} with the added amount + */ + public static DelayTimeProviderChain plusFixedAmount(DelayTimeProviderChain origin, Duration duration) { + return new DelayTimeProviderChain(() -> { + return origin.getDelay().plus(duration); + }); + } + + /** + * Helper method to create a new {@link DelayTimeProviderChain} with a fixed + * amount added. + * + * @implNote delegates to + * {@link #plusFixedAmount(DelayTimeProviderChain, int, TimeUnit)} + * @param duration the {@link Duration} to add + * @return a {@link DelayTimeProviderChain} which returns the {@link Duration} + * of the original {@link DelayTimeProviderChain} with the added amount + * @see #plusFixedAmount(DelayTimeProviderChain, int, TimeUnit) + */ + public DelayTimeProviderChain plusFixedAmount(Duration duration) { + return plusFixedAmount(this, duration); + } + + /** + * Creates a {@link DelayTimeProviderChain} which adds to the original provider + * a random amount between 0 (inclusive) and bound (exclusive). The new provided + * {@link Duration} gets rounded to seconds based on the implementation of + * {@link Delay#plus(Delay)}. + * + * @param origin the original {@link DelayTimeProviderChain} to get the initial + * {@link Duration} + * @param bound the upper bound (exclusive). Must be positive. + * @param unit the {@link TemporalUnit} of the amount to add + * @return a {@link DelayTimeProviderChain} which returns the {@link Duration} + * of the original {@link DelayTimeProviderChain} with the added amount + */ + public static DelayTimeProviderChain plusRandomDelay(DelayTimeProviderChain origin, int bound, TemporalUnit unit) { + return new DelayTimeProviderChain(() -> { + return origin.getDelay().plus(Duration.of(new Random().nextInt(bound), unit)); + }); + } + + /** + * Helper method to create a new {@link DelayTimeProviderChain} with a random + * amount added between 0 (inclusive) and the bound (exclusive). + * + * @implNote delegates to + * {@link #plusRandomDelay(DelayTimeProviderChain, int, TimeUnit)} + * @param bound the upper bound (exclusive). Must be positive. + * @param unit the {@link TemporalUnit} of the amount to add + * @return a {@link DelayTimeProviderChain} which returns the {@link Duration} + * of the original {@link DelayTimeProviderChain} with the added amount + * @see #plusRandomDelay(DelayTimeProviderChain, int, TimeUnit) + */ + public DelayTimeProviderChain plusRandomDelay(int bound, TemporalUnit unit) { + return plusRandomDelay(this, bound, unit); + } + +} \ No newline at end of file diff --git a/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/time/package-info.java b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/time/package-info.java new file mode 100644 index 00000000000..316a97228c7 --- /dev/null +++ b/io.openems.edge.bridge.http/src/io/openems/edge/bridge/http/time/package-info.java @@ -0,0 +1,3 @@ +@org.osgi.annotation.versioning.Version("1.0.0") +@org.osgi.annotation.bundle.Export +package io.openems.edge.bridge.http.time; diff --git a/io.openems.edge.bridge.http/test/io/openems/edge/bridge/http/DummyUrlFetcher.java b/io.openems.edge.bridge.http/test/io/openems/edge/bridge/http/DummyUrlFetcher.java index e18400ea66d..f1da87db903 100644 --- a/io.openems.edge.bridge.http/test/io/openems/edge/bridge/http/DummyUrlFetcher.java +++ b/io.openems.edge.bridge.http/test/io/openems/edge/bridge/http/DummyUrlFetcher.java @@ -2,10 +2,11 @@ import java.util.LinkedList; import java.util.List; -import java.util.concurrent.CompletableFuture; import io.openems.common.exceptions.OpenemsError.OpenemsNamedException; +import io.openems.common.exceptions.OpenemsException; import io.openems.common.function.ThrowingFunction; +import io.openems.edge.bridge.http.api.BridgeHttp.Endpoint; public class DummyUrlFetcher implements UrlFetcher { @@ -13,31 +14,24 @@ public class DummyUrlFetcher implements UrlFetcher { // empty }; - private final List> urlHandler = new LinkedList<>(); + private final List> urlHandler = new LinkedList<>(); private Runnable onTaskFinished = EMPTY_RUNNABLE; @Override - public Runnable createTask(// - final String urlString, // - final int connectTimeout, // - final int readTimeout, // - final CompletableFuture future // - ) { - return () -> { - try { - for (var handler : this.urlHandler) { - final var result = handler.apply(urlString); - if (result != null) { - future.complete(result); - return; - } + public String fetchEndpoint(// + final Endpoint endpoint // + ) throws OpenemsNamedException { + try { + for (var handler : this.urlHandler) { + final var result = handler.apply(endpoint); + if (result != null) { + return result; } - } catch (Throwable e) { - future.completeExceptionally(e); - } finally { - this.onTaskFinished.run(); } - }; + throw new OpenemsException(""); + } finally { + this.onTaskFinished.run(); + } } /** @@ -45,7 +39,7 @@ public Runnable createTask(// * * @param handler the handler */ - public void addUrlHandler(ThrowingFunction handler) { + public void addEndpointHandler(ThrowingFunction handler) { this.urlHandler.add(handler); } diff --git a/io.openems.edge.bridge.http/test/io/openems/edge/bridge/http/BridgeHttpImplTest.java b/io.openems.edge.bridge.http/test/io/openems/edge/bridge/http/api/BridgeHttpCycleTest.java similarity index 81% rename from io.openems.edge.bridge.http/test/io/openems/edge/bridge/http/BridgeHttpImplTest.java rename to io.openems.edge.bridge.http/test/io/openems/edge/bridge/http/api/BridgeHttpCycleTest.java index a9f37cae7b7..77abb0aa35a 100644 --- a/io.openems.edge.bridge.http/test/io/openems/edge/bridge/http/BridgeHttpImplTest.java +++ b/io.openems.edge.bridge.http/test/io/openems/edge/bridge/http/api/BridgeHttpCycleTest.java @@ -1,4 +1,4 @@ -package io.openems.edge.bridge.http; +package io.openems.edge.bridge.http.api; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -15,10 +15,12 @@ import org.osgi.service.event.Event; import io.openems.common.utils.ReflectionUtils; -import io.openems.edge.bridge.http.api.BridgeHttp; +import io.openems.edge.bridge.http.BridgeHttpImpl; +import io.openems.edge.bridge.http.CycleSubscriber; +import io.openems.edge.bridge.http.DummyUrlFetcher; import io.openems.edge.common.event.EdgeEventConstants; -public class BridgeHttpImplTest { +public class BridgeHttpCycleTest { private DummyUrlFetcher fetcher; private CycleSubscriber cycleSubscriber; @@ -31,8 +33,8 @@ public void before() throws Exception { ReflectionUtils.setAttribute(BridgeHttpImpl.class, this.bridgeHttp, "cycleSubscriber", this.cycleSubscriber); this.fetcher = new DummyUrlFetcher(); - this.fetcher.addUrlHandler(url -> { - return switch (url) { + this.fetcher.addEndpointHandler(endpoint -> { + return switch (endpoint.url()) { case "dummy" -> "success"; case "error" -> throw new RuntimeException(); default -> null; @@ -52,7 +54,7 @@ public void after() throws Exception { public void test() throws Exception { final var callCount = new AtomicInteger(0); final var future = new CompletableFuture(); - this.bridgeHttp.subscribe(3, "dummy", t -> { + this.bridgeHttp.subscribeCycle(3, "dummy", t -> { assertEquals("success", t); callCount.incrementAndGet(); future.complete(null); @@ -90,8 +92,6 @@ public void testNotRunningMultipleTimes() throws Exception { } }); - final var waitForTaskFinish = new CompletableFuture(); - this.fetcher.setOnTaskFinished(() -> waitForTaskFinish.complete(null)); synchronized (lock) { assertEquals(0, callCount.get()); this.nextCycle(); @@ -105,14 +105,15 @@ public void testNotRunningMultipleTimes() throws Exception { this.nextCycle(); assertEquals(1, callCount.get()); } + waitUntilContinueHandler.complete(null); - // wait until last request is finished - waitForTaskFinish.get(); + this.waitUntilCycleRequestAreFinished(); synchronized (lock) { this.nextCycle(); lock.wait(); } + assertEquals(2, callCount.get()); } @@ -132,4 +133,19 @@ private void nextCycle() { .handleEvent(new Event(EdgeEventConstants.TOPIC_CYCLE_BEFORE_PROCESS_IMAGE, new HashMap<>())); } + private void waitUntilCycleRequestAreFinished() { + while (this.isCycleRequestRunning()) { + // wait + } + } + + private boolean isCycleRequestRunning() { + for (var endpoint : ((BridgeHttpImpl) this.bridgeHttp).getCycleEndpoints()) { + if (endpoint.isRunning()) { + return true; + } + } + return false; + } + } diff --git a/io.openems.edge.bridge.http/test/io/openems/edge/bridge/http/api/BridgeHttpTest.java b/io.openems.edge.bridge.http/test/io/openems/edge/bridge/http/api/BridgeHttpTest.java new file mode 100644 index 00000000000..a0572b212a9 --- /dev/null +++ b/io.openems.edge.bridge.http/test/io/openems/edge/bridge/http/api/BridgeHttpTest.java @@ -0,0 +1,178 @@ +package io.openems.edge.bridge.http.api; + +import static java.util.Collections.emptyMap; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.gson.JsonElement; + +import io.openems.common.exceptions.OpenemsError.OpenemsNamedException; +import io.openems.common.function.ThrowingFunction; +import io.openems.common.utils.JsonUtils; +import io.openems.common.utils.ReflectionUtils; +import io.openems.edge.bridge.http.BridgeHttpImpl; +import io.openems.edge.bridge.http.CycleSubscriber; +import io.openems.edge.bridge.http.DummyUrlFetcher; +import io.openems.edge.bridge.http.api.BridgeHttp.Endpoint; + +public class BridgeHttpTest { + + private DummyUrlFetcher fetcher; + private CycleSubscriber cycleSubscriber; + private BridgeHttpImpl bridgeHttp; + + @Before + public void before() throws Exception { + this.cycleSubscriber = new CycleSubscriber(); + this.bridgeHttp = new BridgeHttpImpl(); + ReflectionUtils.setAttribute(BridgeHttpImpl.class, this.bridgeHttp, "cycleSubscriber", this.cycleSubscriber); + + this.fetcher = new DummyUrlFetcher(); + ReflectionUtils.setAttribute(BridgeHttpImpl.class, this.bridgeHttp, "urlFetcher", this.fetcher); + + this.bridgeHttp.activate(); + } + + @After + public void after() throws Exception { + this.bridgeHttp.deactivate(); + } + + @Test + public void testGet() throws Exception { + this.fetcher.addEndpointHandler(assertExact("dummy", HttpMethod.GET)); + assertEquals("success", this.bridgeHttp.get("dummy").get()); + } + + @Test + public void testGetJson() throws Exception { + this.fetcher.addEndpointHandler(assertExactJson("dummy", HttpMethod.GET)); + assertEquals(successJson(), this.bridgeHttp.getJson("dummy").get()); + } + + @Test + public void testPut() throws Exception { + this.fetcher.addEndpointHandler(assertExact("dummy", HttpMethod.PUT)); + assertEquals("success", this.bridgeHttp.put("dummy").get()); + } + + @Test + public void testPutJson() throws Exception { + this.fetcher.addEndpointHandler(assertExactJson("dummy", HttpMethod.PUT)); + assertEquals(successJson(), this.bridgeHttp.putJson("dummy").get()); + } + + @Test + public void testPost() throws Exception { + final var body = "body"; + this.fetcher.addEndpointHandler(assertExact("dummy", HttpMethod.POST, body)); + assertEquals("success", this.bridgeHttp.post("dummy", body).get()); + } + + @Test + public void testPostJson() throws Exception { + final var body = JsonUtils.buildJsonObject() // + .addProperty("body", true) // + .build(); + this.fetcher.addEndpointHandler(assertExactJson("dummy", HttpMethod.POST, body)); + assertEquals(successJson(), this.bridgeHttp.postJson("dummy", body).get()); + } + + @Test + public void testDelete() throws Exception { + this.fetcher.addEndpointHandler(assertExact("dummy", HttpMethod.DELETE)); + assertEquals("success", this.bridgeHttp.delete("dummy").get()); + } + + @Test + public void testDeleteJson() throws Exception { + this.fetcher.addEndpointHandler(assertExactJson("dummy", HttpMethod.DELETE)); + assertEquals(successJson(), this.bridgeHttp.deleteJson("dummy").get()); + } + + @Test + public void testRequest() throws Exception { + this.fetcher.addEndpointHandler(assertExact("dummy", HttpMethod.DELETE)); + + final var response = this.bridgeHttp + .request(new Endpoint("dummy", HttpMethod.DELETE, 12345, 1245, null, emptyMap())); + + assertEquals("success", response.get()); + } + + @Test + public void testRequestJson() throws Exception { + this.fetcher.addEndpointHandler(assertExactJson("dummy", HttpMethod.DELETE)); + + final var response = this.bridgeHttp + .requestJson(new Endpoint("dummy", HttpMethod.DELETE, 12345, 1245, null, emptyMap())); + + assertEquals(successJson(), response.get()); + } + + private static ThrowingFunction assertExact(// + String url, // + HttpMethod method // + ) { + return assertExact(url, method, null); + } + + private static ThrowingFunction assertExact(// + String url, // + HttpMethod method, // + String body // + ) { + return endpoint -> { + if (!endpoint.url().equals(url)) { + return null; + } + + assertEquals(method, endpoint.method()); + assertEquals(body, endpoint.body()); + + return "success"; + }; + } + + private static ThrowingFunction assertExactJson(// + String url, // + HttpMethod method // + ) { + return assertExactJson(url, method, null); + } + + private static ThrowingFunction assertExactJson(// + String url, // + HttpMethod method, // + JsonElement body // + ) { + return endpoint -> { + if (!endpoint.url().equals(url)) { + return null; + } + + assertEquals(method, endpoint.method()); + + if (body != null) { + assertNotNull(endpoint.body()); + assertEquals(body, JsonUtils.parse(endpoint.body())); + } else { + assertNull(endpoint.body()); + } + + return successJson().toString(); + }; + } + + private static JsonElement successJson() { + return JsonUtils.buildJsonObject() // + .addProperty("success", true) // + .build(); + } + +} diff --git a/io.openems.edge.bridge.http/test/io/openems/edge/bridge/http/api/BridgeHttpTimeTest.java b/io.openems.edge.bridge.http/test/io/openems/edge/bridge/http/api/BridgeHttpTimeTest.java new file mode 100644 index 00000000000..0edf0840df0 --- /dev/null +++ b/io.openems.edge.bridge.http/test/io/openems/edge/bridge/http/api/BridgeHttpTimeTest.java @@ -0,0 +1,64 @@ +package io.openems.edge.bridge.http.api; + +import static io.openems.edge.bridge.http.time.DelayTimeProviderChain.fixedDelay; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import io.openems.common.utils.ReflectionUtils; +import io.openems.edge.bridge.http.BridgeHttpImpl; +import io.openems.edge.bridge.http.CycleSubscriber; +import io.openems.edge.bridge.http.DummyUrlFetcher; + +public class BridgeHttpTimeTest { + + private DummyUrlFetcher fetcher; + private BridgeHttp bridgeHttp; + + @Before + public void before() throws Exception { + final var cycleSubscriber = new CycleSubscriber(); + this.bridgeHttp = new BridgeHttpImpl(); + ReflectionUtils.setAttribute(BridgeHttpImpl.class, this.bridgeHttp, "cycleSubscriber", cycleSubscriber); + + this.fetcher = new DummyUrlFetcher(); + this.fetcher.addEndpointHandler(endpoint -> { + return switch (endpoint.url()) { + case "dummy" -> "success"; + case "error" -> throw new RuntimeException(); + default -> null; + }; + }); + ReflectionUtils.setAttribute(BridgeHttpImpl.class, this.bridgeHttp, "urlFetcher", this.fetcher); + + ((BridgeHttpImpl) this.bridgeHttp).activate(); + } + + @After + public void after() throws Exception { + ((BridgeHttpImpl) this.bridgeHttp).deactivate(); + } + + @Test(timeout = 1000L) + public void testSubscribeTime() throws Exception { + this.fetcher.addEndpointHandler(endpoint -> { + if (!endpoint.url().equals("dummy")) { + return null; + } + + return "success"; + }); + + final var executedOnce = new CompletableFuture(); + this.bridgeHttp.subscribeTime(fixedDelay(Duration.ofHours(99)), "dummy", result -> { + executedOnce.complete(null); + }); + + executedOnce.get(); + } + +} diff --git a/io.openems.edge.bridge.http/test/io/openems/edge/bridge/http/time/DelayTimeProviderChainTest.java b/io.openems.edge.bridge.http/test/io/openems/edge/bridge/http/time/DelayTimeProviderChainTest.java new file mode 100644 index 00000000000..b6d8cf94157 --- /dev/null +++ b/io.openems.edge.bridge.http/test/io/openems/edge/bridge/http/time/DelayTimeProviderChainTest.java @@ -0,0 +1,57 @@ +package io.openems.edge.bridge.http.time; + +import static io.openems.edge.bridge.http.time.DelayTimeProviderChain.fixedAtEveryFull; +import static io.openems.edge.bridge.http.time.DelayTimeProviderChain.fixedDelay; +import static io.openems.edge.bridge.http.time.DelayTimeProviderChain.immediate; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; + +import org.junit.Test; + +import io.openems.common.test.TimeLeapClock; +import io.openems.common.timedata.DurationUnit; + +public class DelayTimeProviderChainTest { + + @Test + public void testImmediate() { + final var delayProvider = immediate(); + assertEquals(Duration.ofSeconds(0), delayProvider.getDelay()); + } + + @Test + public void testFixedDelay() { + final var delay = Duration.ofSeconds(9); + final var delayProvider = fixedDelay(delay); + assertEquals(delay, delayProvider.getDelay()); + } + + @Test + public void testFixedAtEveryFull() { + final var clock = new TimeLeapClock(LocalDateTime.of(2000, 1, 1, 12, 30, 23).toInstant(ZoneOffset.UTC)); + final var delayProvider = fixedAtEveryFull(clock, DurationUnit.ofMinutes(1)); + assertEquals(Duration.ofSeconds(60 - 23), delayProvider.getDelay()); + } + + @Test + public void testPlusFixedAmountDelayTimeProviderChainIntTimeUnit() { + final var delayProvider = fixedDelay(Duration.ofSeconds(5)) // + .plusFixedAmount(Duration.ofSeconds(3)); + assertEquals(Duration.ofSeconds(8), delayProvider.getDelay()); + } + + @Test + public void testPlusRandomDelayDelayTimeProviderChainIntTimeUnit() { + final var delayProvider = fixedDelay(Duration.ofSeconds(5)) // + .plusRandomDelay(10, ChronoUnit.SECONDS); + final var createdDelay = delayProvider.getDelay(); + assertTrue(createdDelay.toSeconds() >= 5); + assertTrue(createdDelay.toSeconds() < 15); + } + +} diff --git a/io.openems.edge.io.shelly/src/io/openems/edge/io/shelly/shellyplug/IoShellyPlugImpl.java b/io.openems.edge.io.shelly/src/io/openems/edge/io/shelly/shellyplug/IoShellyPlugImpl.java index 890ce6be1c6..03bbb6a9982 100644 --- a/io.openems.edge.io.shelly/src/io/openems/edge/io/shelly/shellyplug/IoShellyPlugImpl.java +++ b/io.openems.edge.io.shelly/src/io/openems/edge/io/shelly/shellyplug/IoShellyPlugImpl.java @@ -8,7 +8,6 @@ import org.osgi.service.component.annotations.ConfigurationPolicy; import org.osgi.service.component.annotations.Deactivate; import org.osgi.service.component.annotations.Reference; -import org.osgi.service.component.annotations.ReferenceScope; import org.osgi.service.event.Event; import org.osgi.service.event.EventHandler; import org.osgi.service.event.propertytypes.EventTopics; @@ -21,6 +20,7 @@ import io.openems.common.exceptions.OpenemsError.OpenemsNamedException; import io.openems.common.utils.JsonUtils; import io.openems.edge.bridge.http.api.BridgeHttp; +import io.openems.edge.bridge.http.api.BridgeHttpFactory; import io.openems.edge.common.channel.BooleanWriteChannel; import io.openems.edge.common.component.AbstractOpenemsComponent; import io.openems.edge.common.component.OpenemsComponent; @@ -50,9 +50,11 @@ public class IoShellyPlugImpl extends AbstractOpenemsComponent private SinglePhase phase = null; private String baseUrl; - @Reference(scope = ReferenceScope.PROTOTYPE_REQUIRED) private BridgeHttp httpBridge; + @Reference + private BridgeHttpFactory httpBridgeFactory; + public IoShellyPlugImpl() { super(// OpenemsComponent.ChannelId.values(), // @@ -73,6 +75,7 @@ private void activate(ComponentContext context, Config config) { this.meterType = config.type(); this.phase = config.phase(); this.baseUrl = "http://" + config.ip(); + this.httpBridge = this.httpBridgeFactory.get(); if (!this.isEnabled()) { return; @@ -85,6 +88,8 @@ private void activate(ComponentContext context, Config config) { @Deactivate protected void deactivate() { super.deactivate(); + this.httpBridgeFactory.unget(this.httpBridge); + this.httpBridge = null; } @Override @@ -166,7 +171,7 @@ private void executeWrite(BooleanWriteChannel channel, int index) { return; } final var url = this.baseUrl + "/relay/" + index + "?turn=" + (writeValue.get() ? "on" : "off"); - this.httpBridge.request(url).whenComplete((t, e) -> { + this.httpBridge.get(url).whenComplete((t, e) -> { this._setSlaveCommunicationFailed(e != null); }); } diff --git a/io.openems.edge.io.shelly/test/io/openems/edge/io/shelly/shellyplug/IoShellyPlugImplTest.java b/io.openems.edge.io.shelly/test/io/openems/edge/io/shelly/shellyplug/IoShellyPlugImplTest.java index f95da125c34..fd99bec6db1 100644 --- a/io.openems.edge.io.shelly/test/io/openems/edge/io/shelly/shellyplug/IoShellyPlugImplTest.java +++ b/io.openems.edge.io.shelly/test/io/openems/edge/io/shelly/shellyplug/IoShellyPlugImplTest.java @@ -2,7 +2,7 @@ import org.junit.Test; -import io.openems.edge.bridge.http.dummy.DummyBridgeHttp; +import io.openems.edge.bridge.http.dummy.DummyBridgeHttpFactory; import io.openems.edge.common.test.ComponentTest; import io.openems.edge.meter.api.MeterType; import io.openems.edge.meter.api.SinglePhase; @@ -14,7 +14,7 @@ public class IoShellyPlugImplTest { @Test public void test() throws Exception { new ComponentTest(new IoShellyPlugImpl()) // - .addReference("httpBridge", new DummyBridgeHttp()) // + .addReference("httpBridgeFactory", new DummyBridgeHttpFactory()) // .activate(MyConfig.create() // .setId(COMPONENT_ID) // .setPhase(SinglePhase.L1) //