Skip to content

Commit

Permalink
fix: handle WebSocket connection failure (#2805)
Browse files Browse the repository at this point in the history
* Remove routed sessions

Signed-off-by: at670475 <[email protected]>

* small fix

Signed-off-by: at670475 <[email protected]>

* Add test

Signed-off-by: at670475 <[email protected]>

* put back code deleted by mistake

Signed-off-by: at670475 <[email protected]>

* call the handle close method in order to also close the session.

Signed-off-by: at670475 <[email protected]>

* refactoring to address pr review

Signed-off-by: at670475 <[email protected]>

* add test

Signed-off-by: at670475 <[email protected]>

* refactor

Signed-off-by: at670475 <[email protected]>

* add test

Signed-off-by: at670475 <[email protected]>

* add test

Signed-off-by: at670475 <[email protected]>

---------

Signed-off-by: at670475 <[email protected]>
  • Loading branch information
taban03 authored Feb 22, 2023
1 parent 7058290 commit 232bade
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,28 +184,47 @@ private void openWebSocketConnection(RoutedService service, ServiceInstance serv

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
log.debug("afterConnectionClosed(session={},status={})", session, status);
try {
session.close(status);
routedSessions.remove(session.getId());
}

WebSocketRoutedSession webSocketRoutedSession = getRoutedSession(session);
if (webSocketRoutedSession != null) {
webSocketRoutedSession.close(status);
}
private void close(WebSocketRoutedSession webSocketRoutedSession, CloseStatus status) {
log.debug("close(webSocketRoutedSession={},status={})", webSocketRoutedSession, status);
if (webSocketRoutedSession == null) return;

routedSessions.remove(session.getId());
} catch (NullPointerException | IOException e) {
try {
webSocketRoutedSession.close(status);
} catch (IOException e) {
log.debug("Error closing WebSocket connection: {}", e.getMessage(), e);
}
}

private void close(WebSocketSession session, CloseStatus status) {
log.debug("close(session={},status={})", session, status);
try {
session.close(status);
} catch (IOException e) {
log.debug("Error closing WebSocket connection: {}", e.getMessage(), e);
} finally {
routedSessions.remove(session.getId());
close(getRoutedSession(session), status);
}
}

@Override
public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage)
throws Exception {
public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage) {
log.debug("handleMessage(session={},message={})", webSocketSession, webSocketMessage);
WebSocketRoutedSession session = getRoutedSession(webSocketSession);
if (session != null) {

if (session == null) {
close(webSocketSession, CloseStatus.SESSION_NOT_RELIABLE);
return;
}

try {
session.sendMessageToServer(webSocketMessage);
} catch (Exception ex) {
log.debug("Error sending WebSocket message. Closing session due to exception:", ex);
close(webSocketSession, CloseStatus.SESSION_NOT_RELIABLE);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public interface WebSocketRoutedSessionFactory {
* Create valid client websocket session based on the existing session, target Url and SSL Context.
* @param webSocketSession Valid Server side WebSocket Session.
* @param targetUrl Full websocket URL towards the server
* @param sslContextFactory Factory producing the current SSL Context.
* @param webSocketClientFactory Factory producing the current SSL Context.
* @return Valid routed session handling the client session
*/
WebSocketRoutedSession session(WebSocketSession webSocketSession, String targetUrl, WebSocketClientFactory webSocketClientFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

package org.zowe.apiml.gateway.ws;

import org.eclipse.jetty.websocket.api.WebSocketException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
Expand All @@ -22,14 +23,18 @@
import org.zowe.apiml.product.routing.RoutedService;
import org.zowe.apiml.product.routing.RoutedServices;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

Expand Down Expand Up @@ -181,42 +186,107 @@ void givenNoInstanceOfTheServiceIsInTheRepository() throws Exception {

verify(establishedSession).close(new CloseStatus(CloseStatus.SERVICE_RESTARTED.getCode(), "Requested service service-without-instance does not have available instance"));
}

@Test
void givenNullService_thenCloseWebSocket() throws Exception {
when(establishedSession.getUri()).thenReturn(new URI("wss://gatewayHost:1443/service-without-instance/ws/v1/valid-path"));
when(lbClient.choose(any())).thenReturn(null);
RoutedServices routesForSpecificValidService = mock(RoutedServices.class);
when(routesForSpecificValidService.findServiceByGatewayUrl("ws/v1"))
.thenReturn(null);
underTest.addRoutedServices("service-without-instance", routesForSpecificValidService);

underTest.afterConnectionEstablished(establishedSession);

verify(establishedSession).close(new CloseStatus(CloseStatus.NOT_ACCEPTABLE.getCode(), "Requested ws/v1 url is not known by the gateway"));
}
}
}

@Nested
class GivenValidExistingSession {
WebSocketSession establishedSession;
WebSocketRoutedSession internallyStoredSession;
WebSocketMessage<String> passedMessage;

@BeforeEach
void prepareSessionMock() {
establishedSession = mock(WebSocketSession.class);
String validSessionId = "123";
when(establishedSession.getId()).thenReturn(validSessionId);

passedMessage = mock(WebSocketMessage.class);
internallyStoredSession = mock(WebSocketRoutedSession.class);
routedSessions.put(validSessionId, internallyStoredSession);
}

@Test
void whenTheConnectionIsClosed_thenTheSessionIsClosedAndRemovedFromRepository() throws Exception {
void whenTheConnectionIsClosed_thenTheSessionIsClosedAndRemovedFromRepository() {
CloseStatus normalClose = CloseStatus.NORMAL;

underTest.afterConnectionClosed(establishedSession, normalClose);

verify(establishedSession).close(normalClose);
assertThat(routedSessions.entrySet(), hasSize(0));
}

@Test
void whenTheMessageIsReceived_thenTheMessageIsPassedToTheSession() throws Exception {
WebSocketMessage<String> passedMessage = mock(WebSocketMessage.class);

underTest.handleMessage(establishedSession, passedMessage);

verify(internallyStoredSession).sendMessageToServer(passedMessage);
}

@Test
void whenExceptionIsThrown_thenRemoveRoutedSession() throws Exception {
doThrow(new WebSocketException("error")).when(routedSessions.get("123")).sendMessageToServer(passedMessage);
underTest.handleMessage(establishedSession, passedMessage);
assertTrue(routedSessions.isEmpty());
}

@Test
void whenSessionIsNull_thenCloseAndReturn() throws IOException {
routedSessions.replace("123", null);

underTest.handleMessage(establishedSession, passedMessage);
assertTrue(routedSessions.isEmpty());
verify(establishedSession, times(1)).close(CloseStatus.SESSION_NOT_RELIABLE);
}

@Test
void whenClosingSessionThrowException_thenCatchIt() throws IOException {
CloseStatus status = CloseStatus.SESSION_NOT_RELIABLE;
doThrow(new IOException()).when(establishedSession).close(status);
underTest.afterConnectionClosed(establishedSession, status);
assertTrue(routedSessions.isEmpty());
}

@Test
void whenClosingRoutedSessionThrowException_thenCatchIt() throws IOException {
CloseStatus status = CloseStatus.SESSION_NOT_RELIABLE;
doThrow(new IOException()).when(routedSessions.get("123")).close(status);
underTest.afterConnectionClosed(establishedSession, status);
assertTrue(routedSessions.isEmpty());
}

}

@Nested
class WhenGettingRoutedSessions {
@Test
void thenReturnThem() {
Map<String, WebSocketRoutedSession> expectedRoutedSessions = underTest.getRoutedSessions();
assertThat(expectedRoutedSessions, is(routedSessions));
}
}

@Nested
class WhenGettingSubProtocols {
@Test
void thenReturnThem() {
ArrayList protocol = new ArrayList();
protocol.add("protocol");
ReflectionTestUtils.setField(underTest, "subProtocols", protocol);
List<String> subProtocols = underTest.getSubProtocols();
assertThat(subProtocols, is(protocol));
}
}
}

0 comments on commit 232bade

Please sign in to comment.