Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP-Bridge add support for time based subscribes #2539

Merged
merged 14 commits into from
Feb 21, 2024
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package io.openems.edge.bridge.http;

import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
Expand All @@ -23,26 +26,33 @@
)
public class BridgeHttpImpl implements BridgeHttp {

private static class EndpointCountdown {
public static class CycleEndpointCountdown {
private volatile int cycleCount;
public final Endpoint endpoint;
public final CycleEndpoint cycleEndpoint;
private volatile boolean running = false;

public EndpointCountdown(Endpoint endpoint) {
super();
public CycleEndpointCountdown(CycleEndpoint endpoint) {
this.cycleCount = endpoint.cycle();
this.endpoint = endpoint;
this.cycleEndpoint = endpoint;
}

public EndpointCountdown reset() {
this.cycleCount = this.endpoint.cycle();
/**
* Resets the current cycle count to the initial cycle count.
*
* @return this
*/
public CycleEndpointCountdown reset() {
this.cycleCount = this.cycleEndpoint.cycle();
return this;
}

public int getCycleCount() {
return this.cycleCount;
}

/**
* Decreases the current cycle count by one.
*/
public void decreaseCycleCount() {
this.cycleCount--;
}
Expand All @@ -57,6 +67,53 @@ public void setRunning(boolean running) {

}

public static class TimeEndpointCountdown {
private final TimeEndpoint timeEndpoint;
private volatile boolean running = false;
private volatile boolean shutdown = false;
private Runnable shutdownCurrentTask;

public TimeEndpointCountdown(TimeEndpoint timeEndpoint) {
this.timeEndpoint = timeEndpoint;
}

public TimeEndpoint getTimeEndpoint() {
return this.timeEndpoint;
}

public boolean isRunning() {
return this.running;
}

public void setRunning(boolean running) {
this.running = running;
}

public boolean isShutdown() {
return this.shutdown;
}

public void setShutdown(boolean shutdown) {
this.shutdown = shutdown;
}

public void setShutdownCurrentTask(Runnable shutdownCurrentTask) {
this.shutdownCurrentTask = shutdownCurrentTask;
}

/**
* Shuts down the current execution of the active task.
*/
public void shutdown() {
this.setShutdown(true);
final var shutdownTask = this.shutdownCurrentTask;
if (shutdownTask != null) {
shutdownTask.run();
}
}

}

private final Logger log = LoggerFactory.getLogger(BridgeHttpImpl.class);

@Reference
Expand All @@ -67,94 +124,157 @@ public void setRunning(boolean running) {

// TODO change to java 21 virtual threads
// TODO: Single pool for every http worker & avoid same endpoint in that pool
private final ExecutorService pool = Executors.newCachedThreadPool();
private final ScheduledExecutorService pool = Executors.newScheduledThreadPool(0);

private final PriorityQueue<EndpointCountdown> endpoints = new PriorityQueue<>(
private final PriorityQueue<CycleEndpointCountdown> cycleEndpoints = new PriorityQueue<>(
(e1, e2) -> e1.getCycleCount() - e2.getCycleCount());

/*
* Default timeout values in ms
*/
private int connectTimeout = 5000;
private int readTimeout = 5000;
private final Set<TimeEndpointCountdown> timeEndpoints = new HashSet<>();

/**
* Activate method.
*/
@Activate
protected void activate() {
public void activate() {
this.cycleSubscriber.subscribe(this::handleEvent);
}

/**
* Deactivate method.
*/
@Deactivate
protected void deactivate() {
public void deactivate() {
this.cycleSubscriber.unsubscribe(this::handleEvent);
this.endpoints.clear();
this.cycleEndpoints.clear();
this.timeEndpoints.forEach(TimeEndpointCountdown::shutdown);
this.timeEndpoints.clear();
ThreadPoolUtils.shutdownAndAwaitTermination(this.pool, 0);
}

@Override
public void subscribe(Endpoint endpoint) {
if (!this.endpoints.offer(new EndpointCountdown(endpoint))) {
public void subscribeCycle(CycleEndpoint endpoint) {
if (!this.cycleEndpoints.offer(new CycleEndpointCountdown(endpoint))) {
this.log.warn("Unable to add " + endpoint + "!");
}
}

@Override
public CompletableFuture<String> request(String url) {
final var future = new CompletableFuture<String>();
this.pool.execute(this.urlFetcher.createTask(url, this.connectTimeout, this.readTimeout, future));
return future;
public void subscribeTime(TimeEndpoint endpoint) {
final var endpointCountdown = new TimeEndpointCountdown(endpoint);
this.timeEndpoints.add(endpointCountdown);
final var delay = endpoint.delayTimeProvider().nextRun(true, true);
final var future = this.pool.schedule(this.createTask(endpointCountdown), delay.toMillis(),
TimeUnit.MILLISECONDS);
endpointCountdown.setShutdownCurrentTask(() -> future.cancel(false));
}

@Override
public void setTimeout(int connectTimeout, int readTimeout) {
this.connectTimeout = connectTimeout;
this.readTimeout = readTimeout;
public CompletableFuture<String> request(Endpoint endpoint) {
final var future = new CompletableFuture<String>();
this.pool.execute(() -> {
try {
final var result = this.urlFetcher.fetchEndpoint(endpoint);
future.complete(result);
} catch (Exception e) {
future.completeExceptionally(e);
}
});
return future;
}

private void handleEvent(Event event) {
switch (event.getTopic()) {
// TODO: Execute before TOPIC_CYCLE_BEFORE_PROCESS_IMAGE, like modbus bridge
case EdgeEventConstants.TOPIC_CYCLE_BEFORE_PROCESS_IMAGE -> {

if (this.endpoints.isEmpty()) {
if (this.cycleEndpoints.isEmpty()) {
return;
}

this.endpoints.forEach(EndpointCountdown::decreaseCycleCount);
this.cycleEndpoints.forEach(CycleEndpointCountdown::decreaseCycleCount);

while (this.endpoints.peek().getCycleCount() == 0) {
final var item = this.endpoints.poll();
while (this.cycleEndpoints.peek().getCycleCount() == 0) {
final var item = this.cycleEndpoints.poll();
synchronized (item) {
if (item.isRunning()) {
this.log.info("Process for " + item.endpoint + " is still running. Task is not queued twice");
this.endpoints.add(item.reset());
this.log.info(
"Process for " + item.cycleEndpoint + " is still running. Task is not queued twice");
this.cycleEndpoints.add(item.reset());
continue;
}

item.setRunning(true);
}
this.pool.execute(this.createTask(item));

this.endpoints.add(item.reset());
if (!this.cycleEndpoints.offer(item.reset())) {
this.log.warn("Unable to add " + item.cycleEndpoint + "!");
}
}
}
}
}

private Runnable createTask(EndpointCountdown endpointItem) {
final var future = new CompletableFuture<String>();
future.whenComplete((t, e) -> {
private Runnable createTask(CycleEndpointCountdown endpointItem) {
return () -> {
try {
if (e != null) {
endpointItem.endpoint.onError().accept(e);
return;
}
endpointItem.endpoint.result().accept(t);
final var result = this.urlFetcher.fetchEndpoint(endpointItem.cycleEndpoint.endpoint());
endpointItem.cycleEndpoint.result().accept(result);
} catch (Exception e) {
endpointItem.cycleEndpoint.onError().accept(e);
} finally {
synchronized (endpointItem) {
endpointItem.setRunning(false);
}
}
});
return this.urlFetcher.createTask(endpointItem.endpoint.url(), this.connectTimeout, this.readTimeout, future);
};
}

private Runnable createTask(TimeEndpointCountdown endpointCountdown) {
return () -> {
synchronized (endpointCountdown) {
if (endpointCountdown.isShutdown()) {
return;
}
endpointCountdown.setRunning(true);
}
boolean currentRunSuccessful;
try {
final var result = this.urlFetcher.fetchEndpoint(endpointCountdown.getTimeEndpoint().endpoint());
endpointCountdown.getTimeEndpoint().onResult().accept(result);
currentRunSuccessful = true;
} catch (Exception e) {
endpointCountdown.getTimeEndpoint().onError().accept(e);
currentRunSuccessful = false;
}
synchronized (endpointCountdown) {
if (endpointCountdown.isShutdown()) {
return;
}
}

try {
final var nextDelay = endpointCountdown.getTimeEndpoint().delayTimeProvider().nextRun(false,
currentRunSuccessful);

final var future = this.pool.schedule(this.createTask(endpointCountdown), nextDelay.toMillis(),
TimeUnit.MILLISECONDS);
endpointCountdown.setShutdownCurrentTask(() -> future.cancel(false));
} catch (Exception e) {
if (this.pool.isShutdown()) {
return;
}
this.log.error("Unexpected exception during Task", e);
}
};
}

public PriorityQueue<CycleEndpointCountdown> getCycleEndpoints() {
return this.cycleEndpoints;
}

public Set<TimeEndpointCountdown> getTimeEndpoints() {
return this.timeEndpoints;
}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.openems.edge.bridge.http;

import java.util.LinkedList;
import java.util.List;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

import org.osgi.service.component.annotations.Component;
Expand All @@ -21,13 +21,15 @@
})
public class CycleSubscriber implements EventHandler {

private final List<Consumer<Event>> eventHandler = new LinkedList<>();
private final Set<Consumer<Event>> eventHandler = new HashSet<>();

@Override
public void handleEvent(Event event) {
switch (event.getTopic()) {
case EdgeEventConstants.TOPIC_CYCLE_BEFORE_PROCESS_IMAGE -> {
this.eventHandler.forEach(t -> t.accept(event));
synchronized (this.eventHandler) {
this.eventHandler.forEach(t -> t.accept(event));
}
}
}
}
Expand All @@ -38,7 +40,9 @@ public void handleEvent(Event event) {
* @param eventHandler the handler to execute on every event
*/
public void subscribe(Consumer<Event> eventHandler) {
this.eventHandler.add(eventHandler);
synchronized (this.eventHandler) {
this.eventHandler.add(eventHandler);
}
}

/**
Expand All @@ -49,7 +53,9 @@ public void subscribe(Consumer<Event> eventHandler) {
* found returs false
*/
public boolean unsubscribe(Consumer<Event> eventHandler) {
return this.eventHandler.remove(eventHandler);
synchronized (this.eventHandler) {
return this.eventHandler.remove(eventHandler);
}
}

}
Original file line number Diff line number Diff line change
@@ -1,24 +1,18 @@
package io.openems.edge.bridge.http;

import java.util.concurrent.CompletableFuture;
import io.openems.common.exceptions.OpenemsError.OpenemsNamedException;
import io.openems.edge.bridge.http.api.BridgeHttp.Endpoint;

public interface UrlFetcher {

/**
* Creates a {@link Runnable} to execute a request with the given parameters.
*
* @param urlString the url to fetch
* @param connectTimeout the connection timeout
* @param readTimeout the read timeout
* @param future the {@link CompletableFuture} to fulfill after the
* fetch
* @return the {@link Runnable} to run to execute the fetch
* @param endpoint the {@link Endpoint} to fetch
*
* @return the result of the {@link Endpoint}
* @throws OpenemsNamedException on error
*/
public Runnable createTask(//
String urlString, //
int connectTimeout, //
int readTimeout, //
CompletableFuture<String> future //
);
public String fetchEndpoint(Endpoint endpoint) throws OpenemsNamedException;

}
Loading
Loading