Skip to content

Commit

Permalink
Merge pull request #215 from dimaa6/172-async-execution
Browse files Browse the repository at this point in the history
Asynchronous execution
  • Loading branch information
TVolden authored Sep 30, 2022
2 parents cfff07b + b6c0b59 commit 9fe3b79
Show file tree
Hide file tree
Showing 71 changed files with 293 additions and 93 deletions.
14 changes: 14 additions & 0 deletions OCPP-J/src/main/java/eu/chargetime/ocpp/JSONCommunicator.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import eu.chargetime.ocpp.model.CallMessage;
import eu.chargetime.ocpp.model.CallResultMessage;
import eu.chargetime.ocpp.model.Message;
import eu.chargetime.ocpp.model.Exclude;

import java.lang.reflect.Type;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
Expand Down Expand Up @@ -99,6 +101,18 @@ public ZonedDateTime deserialize(
static {
GsonBuilder builder = new GsonBuilder();
builder.registerTypeAdapter(ZonedDateTime.class, new ZonedDateTimeSerializer());
builder.addSerializationExclusionStrategy(new ExclusionStrategy() {
@Override
public boolean shouldSkipClass(Class<?> clazz) {
return false;
}

@Override
public boolean shouldSkipField(FieldAttributes field) {
return field.getAnnotation(Exclude.class) != null;
}
});

gson = builder.disableHtmlEscaping().create();
}

Expand Down
9 changes: 9 additions & 0 deletions ocpp-common/src/main/java/eu/chargetime/ocpp/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ public Confirmation handleRequest(Request request) throws UnsupportedFeatureExce
}
}

@Override
public boolean asyncCompleteRequest(String uniqueId, Confirmation confirmation) throws UnsupportedFeatureException, OccurenceConstraintException {
return session.completePendingPromise(uniqueId, confirmation);
}

@Override
public void handleError(
String uniqueId, String errorCode, String errorDescription, Object payload) {
Expand Down Expand Up @@ -165,4 +170,8 @@ public CompletableFuture<Confirmation> send(Request request)
public UUID getSessionId() {
return this.session.getSessionId();
}

public boolean asyncCompleteRequest(String uniqueId, Confirmation confirmation) throws UnsupportedFeatureException, OccurenceConstraintException {
return session.completePendingPromise(uniqueId, confirmation);
}
}
3 changes: 3 additions & 0 deletions ocpp-common/src/main/java/eu/chargetime/ocpp/ISession.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package eu.chargetime.ocpp;

import eu.chargetime.ocpp.model.Confirmation;
import eu.chargetime.ocpp.model.Request;
import java.util.UUID;

Expand Down Expand Up @@ -39,5 +40,7 @@ public interface ISession {

void sendRequest(String action, Request payload, String uuid);

boolean completePendingPromise(String id, Confirmation confirmation) throws UnsupportedFeatureException, OccurenceConstraintException;

void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@ public RequestDispatcher(PromiseFulfiller fulfiller) {
this.fulfiller = fulfiller;
}

public CompletableFuture<Confirmation> handleRequest(Request request) {
CompletableFuture<Confirmation> promise = new CompletableFuture<>();
public void handleRequest(CompletableFuture<Confirmation> promise, Request request) {
fulfiller.fulfill(promise, eventHandler, request);
return promise;
}

public void setEventHandler(SessionEvents eventHandler) {
Expand Down
28 changes: 28 additions & 0 deletions ocpp-common/src/main/java/eu/chargetime/ocpp/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ public Confirmation handleRequest(Request request)
}
}

@Override
public boolean asyncCompleteRequest(String uniqueId, Confirmation confirmation) throws UnsupportedFeatureException, OccurenceConstraintException {
return session.completePendingPromise(uniqueId, confirmation);
}

@Override
public void handleError(
String uniqueId, String errorCode, String errorDescription, Object payload) {
Expand Down Expand Up @@ -219,6 +224,29 @@ public CompletableFuture<Confirmation> send(UUID sessionIndex, Request request)
return promise;
}

/**
* Indicate completion of a pending request.
*
* @param sessionIndex Session index of the client.
* @param uniqueId the unique id used for the original {@link Request}.
* @param confirmation the {@link Confirmation} to the original {@link Request}.
* @return a boolean indicating if pending request was found.
* @throws NotConnectedException Thrown if session with passed sessionIndex is not found
*/
public boolean asyncCompleteRequest(UUID sessionIndex, String uniqueId, Confirmation confirmation) throws NotConnectedException, UnsupportedFeatureException, OccurenceConstraintException {
ISession session = sessions.get(sessionIndex);

if (session == null) {
logger.warn("Session not found by index: {}", sessionIndex);

// No session found means client disconnected and request should be cancelled
throw new NotConnectedException();
}

return session.completePendingPromise(uniqueId, confirmation);
}


public boolean isSessionOpen(UUID sessionIndex) {
return sessions.containsKey(sessionIndex);
}
Expand Down
39 changes: 38 additions & 1 deletion ocpp-common/src/main/java/eu/chargetime/ocpp/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ of this software and associated documentation files (the "Software"), to deal
import eu.chargetime.ocpp.utilities.MoreObjects;
import java.util.Optional;
import java.util.UUID;
import java.util.Map;
import java.util.HashMap;
import java.util.AbstractMap;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -50,6 +53,7 @@ public class Session implements ISession {
private final RequestDispatcher dispatcher;
private final IFeatureRepository featureRepository;
private SessionEvents events;
private final Map<String, AbstractMap.SimpleImmutableEntry<String, CompletableFuture<Confirmation>>> pendingPromises = new HashMap<>();

/**
* Handles required injections.
Expand Down Expand Up @@ -198,9 +202,12 @@ public synchronized void onCall(String id, String action, Object payload) {
try {
Request request =
communicator.unpackPayload(payload, featureOptional.get().getRequestType());
request.setOcppMessageId(id);
if (request.validate()) {
CompletableFuture<Confirmation> promise = dispatcher.handleRequest(request);
CompletableFuture<Confirmation> promise = new CompletableFuture<>();
promise.whenComplete(new ConfirmationHandler(id, action, communicator));
addPendingPromise(id, action, promise);
dispatcher.handleRequest(promise, request);
} else {
communicator.sendCallError(
id, action, "OccurenceConstraintViolation", OCCURENCE_CONSTRAINT_VIOLATION);
Expand Down Expand Up @@ -231,6 +238,36 @@ public void onConnected() {
}
}

private void addPendingPromise(String id, String action, CompletableFuture<Confirmation> promise) {
synchronized (pendingPromises) {
pendingPromises.put(id, new AbstractMap.SimpleImmutableEntry<>(action, promise));
}
}

@Override
public boolean completePendingPromise(String id, Confirmation confirmation) throws UnsupportedFeatureException, OccurenceConstraintException {
AbstractMap.SimpleImmutableEntry<String, CompletableFuture<Confirmation>> promiseAction = null;
// synchronization prevents from confirming one promise more than once, as we remove found promise
synchronized (pendingPromises) {
promiseAction = pendingPromises.get(id);
if (promiseAction == null) return false;
// remove promise from store
pendingPromises.remove(id);
}
// check confirmation type, it has to correspond to original request type
Optional<Feature> featureOptional = featureRepository.findFeature(promiseAction.getKey());
if (featureOptional.isPresent()) {
if (!featureOptional.get().getConfirmationType().isInstance(confirmation)) {
throw new OccurenceConstraintException();
}
} else {
logger.debug("Feature for confirmation with id: {} not found in session: {}", id, this);
throw new UnsupportedFeatureException("Error with getting confirmation type by request id = " + id);
}
promiseAction.getValue().complete(confirmation);
return true;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ public interface SessionEvents {
*/
Confirmation handleRequest(Request request) throws UnsupportedFeatureException;

/**
* Completes a pending request {@link Request}.
*
* @param uniqueId the unique id used for the {@link Request}.
* @param confirmation the {@link Confirmation} to the {@link Request}.
* @return a boolean indicating if pending request was found.
*/
boolean asyncCompleteRequest(String uniqueId, Confirmation confirmation) throws UnsupportedFeatureException, OccurenceConstraintException;

/**
* Handle a error to a {@link Request}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ public void fulfill(
CompletableFuture<Confirmation> promise, SessionEvents eventHandler, Request request) {
try {
Confirmation conf = eventHandler.handleRequest(request);
promise.complete(conf);
// Confirmation may be null, in this case asynchronous execution is assumed
if (conf != null) {
eventHandler.asyncCompleteRequest(request.getOcppMessageId(), conf);
}
} catch (Exception ex) {
logger.warn("fulfillPromis() failed", ex);
promise.completeExceptionally(ex);
Expand Down
10 changes: 10 additions & 0 deletions ocpp-common/src/main/java/eu/chargetime/ocpp/model/Exclude.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package eu.chargetime.ocpp.model;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface Exclude {}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ of this software and associated documentation files (the "Software"), to deal
/** Interface used to flag a model as Request payload. */
public interface Request extends Validatable {
boolean transactionRelated();
String getOcppMessageId();
void setOcppMessageId(String id);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package eu.chargetime.ocpp.model;

public abstract class RequestWithId implements Request {
@Override
public String getOcppMessageId() {
return ocppMessageId;
}

@Override
public void setOcppMessageId(String requestId) {
this.ocppMessageId = requestId;
}

@Exclude
private String ocppMessageId;
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
*/

/** Test implementation of the Request interface. Used for tests. */
public class TestRequest implements Request {
public class TestRequest extends RequestWithId {
@Override
public boolean validate() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import eu.chargetime.ocpp.feature.Feature;
import eu.chargetime.ocpp.model.Confirmation;
import eu.chargetime.ocpp.model.Request;
import eu.chargetime.ocpp.model.RequestWithId;
import eu.chargetime.ocpp.model.TestRequest;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -86,7 +87,7 @@ public void sendRequest_sendRequestToCommunicator() {
// Given
String someAction = "Test action";
Request someRequest =
new Request() {
new RequestWithId() {
@Override
public boolean transactionRelated() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public CompletionStage<Confirmation> send(Request request)
return client.send(request);
}

@Override
public boolean asyncCompleteRequest(String uniqueId, Confirmation confirmation) throws UnsupportedFeatureException, OccurenceConstraintException {
return client.asyncCompleteRequest(uniqueId, confirmation);
}

@Override
public void disconnect() {
client.disconnect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,9 @@ public CompletionStage<Confirmation> send(UUID session, Request request)
throws OccurenceConstraintException, UnsupportedFeatureException, NotConnectedException {
return server.send(session, request);
}

@Override
public boolean asyncCompleteRequest(UUID sessionIndex, String uniqueId, Confirmation confirmation) throws NotConnectedException, UnsupportedFeatureException, OccurenceConstraintException {
return server.asyncCompleteRequest(sessionIndex, uniqueId, confirmation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ public CompletionStage<Confirmation> send(Request request)
return client.send(request);
}

@Override
public boolean asyncCompleteRequest(String uniqueId, Confirmation confirmation) throws UnsupportedFeatureException, OccurenceConstraintException {
return client.asyncCompleteRequest(uniqueId, confirmation);
}

/** Disconnect from server Closes down local callback service. */
public void disconnect() {
this.client.disconnect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,9 @@ public CompletionStage<Confirmation> send(UUID session, Request request)
throws OccurenceConstraintException, UnsupportedFeatureException, NotConnectedException {
return server.send(session, request);
}

@Override
public boolean asyncCompleteRequest(UUID sessionIndex, String uniqueId, Confirmation confirmation) throws NotConnectedException, UnsupportedFeatureException, OccurenceConstraintException {
return server.asyncCompleteRequest(sessionIndex, uniqueId, confirmation);
}
}
2 changes: 2 additions & 0 deletions ocpp-v1_6/src/main/java/eu/chargetime/ocpp/IClientAPI.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public interface IClientAPI {
CompletionStage<Confirmation> send(Request request)
throws OccurenceConstraintException, UnsupportedFeatureException;

boolean asyncCompleteRequest(String uniqueId, Confirmation confirmation) throws UnsupportedFeatureException, OccurenceConstraintException;

void disconnect();

boolean isClosed();
Expand Down
3 changes: 3 additions & 0 deletions ocpp-v1_6/src/main/java/eu/chargetime/ocpp/IServerAPI.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,7 @@ public interface IServerAPI {

CompletionStage<Confirmation> send(UUID sessionIndex, Request request)
throws OccurenceConstraintException, UnsupportedFeatureException, NotConnectedException;

boolean asyncCompleteRequest(UUID sessionIndex, String uniqueId, Confirmation confirmation)
throws NotConnectedException, UnsupportedFeatureException, OccurenceConstraintException;
}
5 changes: 5 additions & 0 deletions ocpp-v1_6/src/main/java/eu/chargetime/ocpp/JSONClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ public CompletionStage<Confirmation> send(Request request)
return client.send(request);
}

@Override
public boolean asyncCompleteRequest(String uniqueId, Confirmation confirmation) throws UnsupportedFeatureException, OccurenceConstraintException {
return client.asyncCompleteRequest(uniqueId, confirmation);
}

@Override
public void disconnect() {
client.disconnect();
Expand Down
5 changes: 5 additions & 0 deletions ocpp-v1_6/src/main/java/eu/chargetime/ocpp/JSONServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,9 @@ public CompletionStage<Confirmation> send(UUID session, Request request)
throws OccurenceConstraintException, UnsupportedFeatureException, NotConnectedException {
return server.send(session, request);
}

@Override
public boolean asyncCompleteRequest(UUID sessionIndex, String uniqueId, Confirmation confirmation) throws NotConnectedException, UnsupportedFeatureException, OccurenceConstraintException {
return server.asyncCompleteRequest(sessionIndex, uniqueId, confirmation);
}
}
5 changes: 5 additions & 0 deletions ocpp-v1_6/src/main/java/eu/chargetime/ocpp/SOAPClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ public CompletionStage<Confirmation> send(Request request)
return client.send(request);
}

@Override
public boolean asyncCompleteRequest(String uniqueId, Confirmation confirmation) throws UnsupportedFeatureException, OccurenceConstraintException {
return client.asyncCompleteRequest(uniqueId, confirmation);
}

/** Disconnect from server Closes down local callback service. */
public void disconnect() {
this.client.disconnect();
Expand Down
5 changes: 5 additions & 0 deletions ocpp-v1_6/src/main/java/eu/chargetime/ocpp/SOAPServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,9 @@ public CompletionStage<Confirmation> send(UUID session, Request request)
throws OccurenceConstraintException, UnsupportedFeatureException, NotConnectedException {
return server.send(session, request);
}

@Override
public boolean asyncCompleteRequest(UUID sessionIndex, String uniqueId, Confirmation confirmation) throws NotConnectedException, UnsupportedFeatureException, OccurenceConstraintException {
return server.asyncCompleteRequest(sessionIndex, uniqueId, confirmation);
}
}
Loading

0 comments on commit 9fe3b79

Please sign in to comment.