Try the experimental WebSocketNetworkTransport #5862

Open
martinbonnin opened this issue Apr 29, 2024 · 20 comments

@martinbonnin
Contributor

Description

Historically, WebSockets have been one of the most complex and error-prone parts of Apollo Kotlin.

Starting with 4.0.0-beta.6, apollo-runtime has an experimental ApolloWebSocketNetworkTransport aiming at simplifying the WebSocket setup.

If you have had issues with WebSockets, try it out. More documentation and a migration guide are available at https://www.apollographql.com/docs/kotlin/v4/advanced/experimental-websockets

@dmitrytavpeko

Great job, thanks! I no longer see the kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed errors that prevented me from reinstating my subscriptions!
While migrating to the new implementation, I encountered these issues:

  1. The API doesn't allow setting an OkHttpClient. In the old implementation we had .webSocketEngine(DefaultWebSocketEngine(mySharedOkHttpClient))
  2. The documentation says that it's possible to customize the retry logic by using addRetryOnErrorInterceptor. However, such an API doesn't exist. I used to customize the backoff in the following way:
     .reopenWhen { throwable, attempt ->
         val delayBeforeNextAttemptMs =
             2.0.pow(attempt.toInt()).coerceAtMost(30.0).toLong() * 1_000L
         delay(delayBeforeNextAttemptMs)
         true
     }

Will these issues be addressed in the future?
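
For reference, the delay schedule produced by the reopenWhen snippet above can be checked in isolation. This is a minimal sketch using only the Kotlin standard library; backoffDelayMs is a hypothetical helper name and is not part of Apollo Kotlin.

```kotlin
import kotlin.math.pow

// Hypothetical helper: same arithmetic as the reopenWhen lambda above.
// 2^attempt seconds, capped at 30 seconds, converted to milliseconds.
fun backoffDelayMs(attempt: Long): Long =
    2.0.pow(attempt.toInt()).coerceAtMost(30.0).toLong() * 1_000L

fun main() {
    // Attempts 0..4 double from 1s to 16s; attempt 5 and later hit the 30s cap.
    for (attempt in 0L..6L) {
        println("attempt $attempt -> ${backoffDelayMs(attempt)} ms")
    }
}
```

Keeping the arithmetic in a plain function like this makes it easy to reuse the same schedule in whatever retry hook the transport ends up exposing.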

@martinbonnin
Contributor Author

martinbonnin commented Jul 23, 2024

Thanks for trying it out and for the details!

  1. Good catch. JvmWebSocketEngine is currently internal. I'll open a public constructor function in the next release.

  2. Good catch as well, the method is called retryOnErrorInterceptor, I'll update the docs. You can use it like so:

    val apolloClient = ApolloClient.Builder()
        .retryOnErrorInterceptor(MyRetryOnErrorInterceptor())
        .build()

    class MyRetryOnErrorInterceptor : ApolloInterceptor {
      object RetryException: Exception()

      override fun <D : Operation.Data> intercept(request: ApolloRequest<D>, chain: ApolloInterceptorChain): Flow<ApolloResponse<D>> {
        var attempt = 0
        return chain.proceed(request).onEach {
              if (request.retryOnError == true && it.exception != null && it.exception is ApolloNetworkException) {
                throw RetryException
              } else {
                attempt = 0
              }
            }.retryWhen { cause, _ ->
              if (cause is RetryException) {
                attempt++
                delay(2.0.pow(attempt).seconds)
                true
              } else {
                // Not a RetryException, probably a programming error, pass it through
                false
              }
            }
      }
    }

I'm aware this is significantly more verbose than the old way, but the old way made a bunch of assumptions that didn't always hold. At least this method should give you full control over which exceptions you want to retry, exponential backoff, etc.
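
The counter logic in the interceptor above (increment on each retryable failure, reset on each good response) can be tried out without any Apollo or coroutines dependency. The following is a rough sketch under that assumption; BackoffState and its maxDelay cap are hypothetical additions for illustration and do not appear in the interceptor.

```kotlin
import kotlin.math.pow
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

// Hypothetical helper mirroring the counter logic of MyRetryOnErrorInterceptor:
// every failure increments the attempt and doubles the delay,
// every successful response resets the counter.
// The maxDelay cap is an assumption added here; the interceptor above has no cap.
class BackoffState(private val maxDelay: Duration = 60.seconds) {
    private var attempt = 0

    // Call when a retryable failure is seen; returns how long to wait before retrying.
    fun onFailure(): Duration {
        attempt++
        return 2.0.pow(attempt).seconds.coerceAtMost(maxDelay)
    }

    // Call when a response arrives without a network error.
    fun onSuccess() {
        attempt = 0
    }
}

fun main() {
    val state = BackoffState()
    println(state.onFailure()) // first failure
    println(state.onFailure()) // second failure, longer delay
    state.onSuccess()          // a good response resets the backoff
    println(state.onFailure()) // back to the first delay
}
```

As in the interceptor, the first delay is 2^1 seconds because the counter is incremented before the delay is computed.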

martinbonnin added a commit that referenced this issue Jul 23, 2024
* Add WebSocketEngine(WebSocket.Factory) and fix documentation

See #5862

* remove ApolloExperimental from internal function and expose ensureUniqueUuid()
@martinbonnin
Contributor Author

martinbonnin commented Jul 25, 2024

@dmitrytavpeko WebSocketEngine(WebSocket.Factory) is now available in the SNAPSHOTs if you want to try it out.

PS: I also realized we can simplify renewing the uuid (see #6075)

@dmitrytavpeko

@martinbonnin Thank you for the quick response!

@jvanderwee

Hey @martinbonnin 👋 it looks like some changes are required when authenticating websockets that aren't mentioned in the migration guide, specifically setting the connection payload and refreshing the token. Would it be possible to add this to the migration doc?

@martinbonnin
Contributor Author

@jvanderwee Can you share your existing code here? I'll add the matching snippet to the doc

@jvanderwee

class WebSocketReconnectException: Exception("The WebSocket needs to be reopened")
val apolloClient = ApolloClient.Builder()
    .serverUrl("http://localhost:8080/graphql")
    .subscriptionNetworkTransport(
        WebSocketNetworkTransport.Builder()
            .serverUrl("http://localhost:8080/subscriptions")
            .protocol(
                GraphQLWsProtocol.Factory(
                    connectionPayload = {
                        mapOf("Authorization" to token)
                    },
                ),
            )
            .reopenWhen { throwable, attempt ->
                if (throwable is WebSocketReconnectException) {
                    true
                } else {
                    // Another WebSocket error happened, decide what to do with it
                    // Here we're trying to reconnect at most 3 times
                    attempt < 3
                }
            }
            .build()
    )
    .build()
apolloClient.subscriptionNetworkTransport.closeConnection(WebSocketReconnectException())

@martinbonnin
Contributor Author

martinbonnin commented Aug 6, 2024

Hi @jvanderwee thanks for sending this!

Looks like you're calling closeConnection() to force a reconnect? closeConnection() is still around so you can do something similar with:

class WebSocketReconnectException: Exception("The WebSocket needs to be reopened")
class RetryException: Exception("The WebSocket needs to be retried")
class MyInterceptor: ApolloInterceptor {
  override fun <D : Operation.Data> intercept(request: ApolloRequest<D>, chain: ApolloInterceptorChain): Flow<ApolloResponse<D>> {
    return chain.proceed(request)
        .onEach {
          if (request.retryOnError == true && it.exception != null && it.exception is ApolloNetworkException) {
            throw RetryException()
          }
        }
        .retryWhen { cause, attempt ->
          when (cause) {
            is RetryException -> {
              attempt < 3
            }
            is WebSocketReconnectException -> {
              true
            }
            else -> false
          }
        }.catch { 
          if (it !is RetryException) {
            throw it
          }
        }
  }
}
val apolloClient = ApolloClient.Builder()
    .serverUrl("http://localhost:8080/graphql")
    .subscriptionNetworkTransport(
        WebSocketNetworkTransport.Builder()
            .serverUrl("http://localhost:8080/subscriptions")
            .protocol(
                GraphQLWsProtocol(
                    connectionParams = {
                        mapOf("Authorization" to token)
                    },
                ),
            )
            .retryOnError { it.operation is Subscription }
            .retryOnErrorInterceptor(MyInterceptor())
            .build()
    )
    .build()
apolloClient.subscriptionNetworkTransport.closeConnection(WebSocketReconnectException())

While the above should be working, it's pretty verbose and I'd love to take this opportunity to do things a bit better.

Can you share what triggers the call to closeConnection()? Is that a specific GraphQL message? A WebSocket close frame? Or something else completely out of band?

@jvanderwee

When we change the import for closeConnection

-import com.apollographql.apollo.network.ws.closeConnection
+import com.apollographql.apollo.network.websocket.closeConnection

the exception is expected to be of type ApolloException, so we're forced to use something like DefaultApolloException:

apolloClient.subscriptionNetworkTransport.closeConnection(DefaultApolloException("WebSocket needs to be closed"))

but this means we need to check for is DefaultApolloException in the retryWhen, which doesn't seem quite right:

.retryWhen { cause, attempt ->
    when (cause) {
        is DefaultApolloException -> {
            true
        }
        ...
    }
}

could we add a specific subclass of ApolloException to be used for closing the connection?

if we don't update the import an IllegalArgumentException is thrown because websocket.WebSocketNetworkTransport is not an instance of ws.WebSocketNetworkTransport

we close the connection whenever we need to update the authorisation header in the payload

@martinbonnin
Contributor Author

Thanks for the follow up!

could we add a specific subclass of ApolloException to be used for closing the connection?

We can make a closeConnection() that reports a WebSocketManuallyClosedException. I'd still keep the closeConnection(ApolloException) in case someone needs to tunnel data.
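
Until such an exception exists, one way to "tunnel data" through closeConnection(ApolloException) is to carry an app-defined marker as the cause and test for it in retryWhen. The sketch below only illustrates that idea: StandInApolloException is a local stand-in so the code compiles on its own, and it assumes the real exception type accepts a message and a cause. isManualReconnect is a hypothetical helper name.

```kotlin
// Stand-in for the library's exception type so the sketch compiles on its own.
// It is NOT the real class; it only assumes the real one accepts a message and a cause.
class StandInApolloException(message: String, cause: Throwable? = null) : Exception(message, cause)

// App-defined marker, as in the snippets above.
class WebSocketReconnectException : Exception("The WebSocket needs to be reopened")

// Hypothetical predicate for a retryWhen block:
// retry only when the marker was tunneled as the cause.
fun isManualReconnect(throwable: Throwable): Boolean =
    throwable is StandInApolloException && throwable.cause is WebSocketReconnectException

fun main() {
    val manual = StandInApolloException("WebSocket needs to be closed", WebSocketReconnectException())
    val other = StandInApolloException("Network error")
    println(isManualReconnect(manual)) // marker present
    println(isManualReconnect(other))  // no marker
}
```

Checking the cause rather than the wrapper type avoids treating every DefaultApolloException as a manual close.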

we close the connection whenever we need to update the authorisation header in the payload

That's the question. When do you need to update the authorization header? Ideally I would expect the server to notify its listeners directly in the websocket with some message ("token expired") or even a WebSocket close frame.

This seems like the best way and would allow handling websocket authentication in a single interceptor, just like for HTTP. That'd be a lot more symmetrical. But maybe your server doesn't signal that? In which case I'm curious how you can tell that you need to refresh the token (timeout, something else?)

@jvanderwee

jvanderwee commented Aug 7, 2024

We can make a closeConnection() that reports a WebSocketManuallyClosedException

👌 thanks!

I would expect the server to notify its listeners directly in the websocket

very good point! we've been closing the connection as a side-effect of our HTTP authorisation interceptor, but it makes way more sense for this to be handled by the websocket itself. I'll look into this! we will still need to update the authorisation header when the user logs in though

@martinbonnin
Contributor Author

martinbonnin commented Aug 7, 2024

we've been closing the connection as a side-effect of our HTTP authorisation interceptor

Interesting! What happens if you're on a screen that listens to a subscription without ever doing a query/mutation? The subscription has no way to know when to renew its token in that case?

@jvanderwee

correct! very much an oversight on our part 🙃

@martinbonnin
Contributor Author

@jvanderwee Thanks for the follow up!

I did not mention closeConnection() explicitly in there because I'd like to avoid promoting it too much. While it's technically the only way to get your setup working for now, I'd like to encourage the backend to send something when the token needs to be updated instead.

This probably will require new APIs to update the token from an ApolloInterceptor, just like for the Authorization header for HTTP.

Right now I'm thinking something like this:

class AuthorizationInterceptor : ApolloInterceptor {
  override fun <D : Operation.Data> intercept(request: ApolloRequest<D>, chain: ApolloInterceptorChain): Flow<ApolloResponse<D>> {
    return flow {
      val token = getOrRefreshToken()
      emit(
          request.newBuilder()
              // for HTTP transport
              .addHttpHeader("Authorization", "Bearer $token")
              // for WebSocket transport
              //.webSocketConnectionPayload(mapOf("token" to token))
              .build()
      )
    }.flatMapConcat {
      chain.proceed(it)
    }.onEach { 
      if (it.errors.contains("Token expired")) {
        throw TokenExpired
      }
    }.retryWhen { cause, attempt -> 
      cause is TokenExpired
    }
  }
}

I haven't tested it yet, but assuming the server can return the 401 information in a regular GraphQL response, that would allow handling authentication consistently between HTTP and WebSocket. Any thoughts? Could that work for you?
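
The refresh-and-retry flow in the interceptor idea above can be exercised without Apollo. What follows is a minimal sketch, not the proposed API: TokenProvider, withFreshToken and the fake token strings are all hypothetical, and fetchToken stands in for a call to an auth backend.

```kotlin
// Hypothetical token cache: fetchToken stands in for a call to an auth backend.
class TokenProvider(private val fetchToken: () -> String) {
    private var cached: String? = null

    // Returns the cached token, fetching a new one only when none is cached.
    @Synchronized
    fun getOrRefreshToken(): String = cached ?: fetchToken().also { cached = it }

    // Drops the cached token so the next call fetches a fresh one.
    @Synchronized
    fun invalidate() {
        cached = null
    }
}

// Signals that the server rejected the token, like TokenExpired in the sketch above.
object TokenExpired : Exception()

// Runs the request with a token; on TokenExpired, invalidates and retries
// up to maxRetries times. The final attempt lets the exception propagate.
fun <T> withFreshToken(tokens: TokenProvider, maxRetries: Int = 1, request: (String) -> T): T {
    repeat(maxRetries) {
        try {
            return request(tokens.getOrRefreshToken())
        } catch (e: TokenExpired) {
            tokens.invalidate()
        }
    }
    return request(tokens.getOrRefreshToken())
}

fun main() {
    var issued = 0
    val tokens = TokenProvider { "token-${++issued}" }
    // Simulated server: the first token is already expired, later ones are accepted.
    val result = withFreshToken(tokens) { token ->
        if (token == "token-1") throw TokenExpired
        "ok with $token"
    }
    println(result)
}
```

Bounding the retries avoids looping forever when the backend keeps rejecting freshly issued tokens.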

@jvanderwee

That would work for us! Thanks @martinbonnin

@jvanderwee

👋 we're seeing the following crash occur on Android in 4.0.1 after migrating to experimental web sockets. unsure when this happens at the moment, will update if we narrow it down

          Fatal Exception: java.lang.NullPointerException: Attempt to invoke interface method 'void w6.u.close(int, java.lang.String)' on a null object reference
       at com.apollographql.apollo.network.websocket.internal.SubscribableWebSocket.shutdown(SubscribableWebSocket.kt:119)
       at com.apollographql.apollo.network.websocket.internal.SubscribableWebSocket.onError(SubscribableWebSocket.kt:220)
       at com.apollographql.apollo.network.websocket.JvmWebSocket.onFailure(WebSocketEngine.jvm.kt:67)
       at okhttp3.internal.ws.RealWebSocket.failWebSocket(RealWebSocket.kt:592)
       at okhttp3.internal.ws.RealWebSocket$connect$1.onFailure(RealWebSocket.kt:202)

@martinbonnin
Contributor Author

@jvanderwee thanks for the feedback. This can happen if the onError callback is called while opening the WebSocket. I expected OkHttp to dispatch the callback, but it looks like this is not the case. I made a fix here.

For the record, would you have the rest of the stacktrace so we can see what triggers this error?

@jvanderwee

Thanks!

Fatal Exception: java.lang.NullPointerException: Attempt to invoke interface method 'void w6.u.close(int, java.lang.String)' on a null object reference
       at com.apollographql.apollo.network.websocket.internal.SubscribableWebSocket.shutdown(SubscribableWebSocket.kt:119)
       at com.apollographql.apollo.network.websocket.internal.SubscribableWebSocket.onError(SubscribableWebSocket.kt:220)
       at com.apollographql.apollo.network.websocket.JvmWebSocket.onFailure(WebSocketEngine.jvm.kt:67)
       at okhttp3.internal.ws.RealWebSocket.failWebSocket(RealWebSocket.kt:592)
       at okhttp3.internal.ws.RealWebSocket$connect$1.onFailure(RealWebSocket.kt:202)
       at okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:525)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:644)
       at java.lang.Thread.run(Thread.java:1012)

@martinbonnin
Contributor Author

So it dispatches from a separate thread but that thread executes concurrently with the constructor... While the fix should work, I'm not sure it's 100% correct. I'll revisit.
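
To make the race concrete: a failure callback can run before the socket reference has been stored, so shutdown() must tolerate a missing socket. The sketch below shows one generic way to guard against that on the JVM. It is not the fix that landed in Apollo Kotlin; SocketHolder, attach and shutdown are hypothetical names, and java.io.Closeable stands in for the WebSocket.

```kotlin
import java.io.Closeable

// Hypothetical holder: the socket may be attached after a failure callback already fired.
class SocketHolder {
    private val lock = Any()
    private var socket: Closeable? = null
    private var shutdownRequested = false

    // Called once the engine returns the socket (the "constructor" side of the race).
    fun attach(newSocket: Closeable) {
        val closeNow = synchronized(lock) {
            if (!shutdownRequested) socket = newSocket
            shutdownRequested
        }
        // A shutdown arrived first: close the late socket instead of keeping it.
        if (closeNow) newSocket.close()
    }

    // Called from the callback thread; safe even if no socket is attached yet.
    fun shutdown() {
        val toClose = synchronized(lock) {
            shutdownRequested = true
            socket.also { socket = null }
        }
        toClose?.close()
    }
}

fun main() {
    var closed = 0
    val holder = SocketHolder()
    holder.shutdown()                         // failure arrives first: no NullPointerException
    holder.attach(Closeable { closed += 1 })  // late socket is closed immediately
    println("closed=$closed")
}
```

Recording the shutdown request under the same lock as the assignment means either ordering of the two calls closes the socket exactly once.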

martinbonnin added a commit that referenced this issue Oct 10, 2024
Should fix a race condition when the onError is called concurrently with the SubscribableWebSocket constructor

See #5862 (comment)
martinbonnin added a commit that referenced this issue Oct 10, 2024
Should fix a race condition when the onError is called concurrently with the SubscribableWebSocket constructor

See #5862 (comment)
@martinbonnin
Contributor Author

New try here. If you get a chance to try the SNAPSHOTs, let us know how that goes.
