Skip to content

Commit

Permalink
Added web socket support
Browse files Browse the repository at this point in the history
  • Loading branch information
isKONSTANTIN committed Jul 18, 2024
1 parent 8eb8919 commit 8d0baf1
Show file tree
Hide file tree
Showing 20 changed files with 628 additions and 2 deletions.
4 changes: 3 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {
}

group = "app.finwave.api"
version = "1.0.4"
version = "1.2.0"

repositories {
mavenCentral()
Expand All @@ -13,6 +13,8 @@ repositories {
dependencies {
implementation 'com.google.code.gson:gson:2.11.0'

implementation 'org.java-websocket:Java-WebSocket:1.5.2'

testImplementation platform("org.junit:junit-bom:5.9.1")
testImplementation "org.junit.jupiter:junit-jupiter"
}
Expand Down
32 changes: 31 additions & 1 deletion src/main/java/app/finwave/api/FinWaveClient.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
package app.finwave.api;

import app.finwave.api.tools.*;
import app.finwave.api.websocket.FinWaveWebSocketClient;
import app.finwave.api.websocket.messages.handler.AbstractWebSocketHandler;
import app.finwave.api.websocket.messages.requests.AuthMessageRequest;
import com.google.gson.JsonElement;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FinWaveClient {
protected static ExecutorService threadPool = Executors.newFixedThreadPool(4,
protected static ExecutorService threadPool = Executors.newCachedThreadPool(
r -> {
Thread thread = new Thread(r);
thread.setDaemon(true);
Expand Down Expand Up @@ -51,6 +56,31 @@ public void setToken(String token) {
this.token = token;
}

public FinWaveWebSocketClient connectToWebsocket(String url, String path, AbstractWebSocketHandler handler) throws URISyntaxException, InterruptedException {
FinWaveWebSocketClient client = new FinWaveWebSocketClient(new URI(url + path));
client.setHandler(handler);

if (!client.connectBlocking())
return null;

if (token != null && !token.isBlank())
client.send(new AuthMessageRequest(token));

return client;
}

public FinWaveWebSocketClient connectToWebsocket(String path, AbstractWebSocketHandler handler) throws URISyntaxException, InterruptedException {
return connectToWebsocket(
baseURL.replaceFirst("http://", "ws://")
.replaceFirst("https://", "wss://"),
path, handler
);
}

public FinWaveWebSocketClient connectToWebsocket(AbstractWebSocketHandler handler) throws URISyntaxException, InterruptedException {
return connectToWebsocket("websockets/events", handler);
}

public <R extends IResponse, T extends IRequest<R>> CompletableFuture<R> runRequest(T request) {
CompletableFuture<R> future = new CompletableFuture<>();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package app.finwave.api.websocket;

import app.finwave.api.websocket.messages.handler.AbstractWebSocketHandler;
import app.finwave.api.websocket.messages.RequestMessage;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;

import java.net.URI;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static app.finwave.api.tools.Misc.GSON;

public class FinWaveWebSocketClient extends WebSocketClient {
protected AbstractWebSocketHandler handler;

protected long lastPing = 0;

protected static ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
r -> {
Thread thread = new Thread(r);
thread.setDaemon(true);

return thread;
});

public FinWaveWebSocketClient(URI uri) {
super(uri);

scheduledExecutorService.scheduleAtFixedRate(() -> {
if (isOpen() && System.currentTimeMillis() - lastPing > 15000) {
send("ping");
}
}, 5, 5, TimeUnit.SECONDS);
}

public void setHandler(AbstractWebSocketHandler handler) {
handler.init(this);

this.handler = handler;
}

@Override
public void onOpen(ServerHandshake serverHandshake) {
if (handler != null)
handler.opened(serverHandshake);
}

@Override
public void onMessage(String rawMessage) {
if (rawMessage.equals("pong")) {
lastPing = System.currentTimeMillis();

return;
}

if (handler != null)
handler.onMessage(rawMessage);
}

@Override
public void onClose(int code, String reason, boolean remote) {
if (handler != null)
handler.closed(code, reason, remote);
}

@Override
public void onError(Exception e) {
if (handler != null)
handler.onError(e);
}

public void send(RequestMessage<?> message) {
lastPing = System.currentTimeMillis();

send(GSON.toJson(message));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package app.finwave.api.websocket.messages;

public class MessageBody {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package app.finwave.api.websocket.messages;

import app.finwave.api.tools.Misc;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;

public class RequestMessage<T extends MessageBody> {
public final String type;
protected final T body;

public RequestMessage(String type, T body) {
this.type = type;
this.body = body;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package app.finwave.api.websocket.messages;

import app.finwave.api.tools.Misc;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;

public class ResponseMessage {
public final String type;
protected final JsonElement body;

public ResponseMessage(String type, JsonObject body) {
this.type = type;
this.body = body;
}

public <T extends MessageBody> T getBody(Class<T> clazz) {
return Misc.GSON.fromJson(body, clazz);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package app.finwave.api.websocket.messages.handler;

import app.finwave.api.websocket.FinWaveWebSocketClient;
import app.finwave.api.websocket.messages.ResponseMessage;
import com.google.gson.JsonSyntaxException;
import org.java_websocket.handshake.ServerHandshake;

import static app.finwave.api.tools.Misc.GSON;

public abstract class AbstractWebSocketHandler {
protected FinWaveWebSocketClient client;

public void init(FinWaveWebSocketClient client) {
this.client = client;
}

public void onMessage(String rawMessage) {
ResponseMessage message;

try {
message = GSON.fromJson(rawMessage, ResponseMessage.class);
} catch (JsonSyntaxException e) {
unparsedMessage(rawMessage);

return;
}

onMessage(message);
}

public abstract void onMessage(ResponseMessage message);

public void unparsedMessage(String rawMessage) {}

public abstract void opened(ServerHandshake serverHandshake);
public abstract void onError(Exception exception);
public abstract void closed(int code, String reason, boolean remote);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package app.finwave.api.websocket.messages.handler;

import app.finwave.api.websocket.messages.ResponseMessage;
import org.java_websocket.handshake.ServerHandshake;

import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;

public abstract class QueueWebSocketHandler extends AbstractWebSocketHandler {
protected ArrayDeque<CompletableFuture<ResponseMessage>> deque = new ArrayDeque<>();
protected ReentrantLock lock = new ReentrantLock();

@Override
public void onMessage(ResponseMessage message) {
lock.lock();

try {
CompletableFuture<ResponseMessage> future = deque.peek();

if (future == null || future.isDone()) {
deque.add(CompletableFuture.completedFuture(message));

return;
}

deque.remove(future);
future.complete(message);
}finally {
lock.unlock();
}
}

public CompletableFuture<ResponseMessage> messageExpected() {
lock.lock();

try {
CompletableFuture<ResponseMessage> future = deque.peek();

if (future != null && future.isDone())
deque.remove(future);

if (future != null)
return future;

future = new CompletableFuture<>();
deque.add(future);

return future;
}finally {
lock.unlock();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package app.finwave.api.websocket.messages.handler;

import app.finwave.api.websocket.messages.ResponseMessage;
import app.finwave.api.websocket.messages.response.GenericMessageBody;
import app.finwave.api.websocket.messages.response.NotifyUpdateBody;
import app.finwave.api.websocket.messages.response.auth.AuthStatusBody;
import app.finwave.api.websocket.messages.response.notifications.Notification;
import app.finwave.api.websocket.messages.response.notifications.NotificationPointRegisteredBody;
import app.finwave.api.websocket.messages.response.notifications.NotificationSubscribeBody;

import java.util.UUID;

public abstract class RoutedWebSocketHandler extends AbstractWebSocketHandler {
@Override
public void onMessage(ResponseMessage message) {
switch (message.type) {
case "generic" -> {
GenericMessageBody body = message.getBody(GenericMessageBody.class);
genericMessage(body.message, body.code);
}
case "update" -> notifyUpdate(message.getBody(NotifyUpdateBody.class).updated);
case "auth" -> authStatus(message.getBody(AuthStatusBody.class).status);
case "notification" -> notification(message.getBody(Notification.class));
case "newNotificationRegistered" -> {
NotificationPointRegisteredBody body = message.getBody(NotificationPointRegisteredBody.class);
notificationPointRegistered(body.id, body.uuid);
}
case "subscribeNotification" -> notificationPointSubscribe(message.getBody(NotificationSubscribeBody.class).status);
default -> unknownMessage(message);
}
}

public void unknownMessage(ResponseMessage message) {}

public abstract void notifyUpdate(String updated);
public abstract void genericMessage(String message, int code);
public abstract void notification(Notification notification);
public abstract void notificationPointRegistered(long pointId, UUID uuid);
public abstract void notificationPointSubscribe(String status);
public abstract void authStatus(String status);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package app.finwave.api.websocket.messages.requests;

import app.finwave.api.websocket.messages.MessageBody;
import app.finwave.api.websocket.messages.RequestMessage;

public class AuthMessageRequest extends RequestMessage<AuthMessageRequest.AuthMessageBody> {
public AuthMessageRequest(String token) {
super("auth", new AuthMessageBody(token));
}

protected static class AuthMessageBody extends MessageBody {
public final String token;

public AuthMessageBody(String token) {
this.token = token;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package app.finwave.api.websocket.messages.requests;

import app.finwave.api.websocket.messages.MessageBody;
import app.finwave.api.websocket.messages.RequestMessage;

public class NewNotificationPointRequest extends RequestMessage<NewNotificationPointRequest.NewNotificationPointBody> {
public NewNotificationPointRequest(String description, boolean isPrimary) {
super("newNotification", new NewNotificationPointBody(description, isPrimary));
}

protected static class NewNotificationPointBody extends MessageBody {
public final String description;
public final boolean isPrimary;

public NewNotificationPointBody(String description, boolean isPrimary) {
this.description = description;
this.isPrimary = isPrimary;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package app.finwave.api.websocket.messages.requests;

import app.finwave.api.websocket.messages.MessageBody;
import app.finwave.api.websocket.messages.RequestMessage;

import java.util.UUID;

public class SubscribeNotificationsRequest extends RequestMessage<SubscribeNotificationsRequest.SubscribeNotificationsBody> {
public SubscribeNotificationsRequest(UUID pointUUID) {
super("subscribeNotification", new SubscribeNotificationsBody(pointUUID));
}

protected static class SubscribeNotificationsBody extends MessageBody {
public final UUID pointUUID;

public SubscribeNotificationsBody(UUID pointUUID) {
this.pointUUID = pointUUID;
}
}
}
Loading

0 comments on commit 8d0baf1

Please sign in to comment.