
Support for GraphQL subscriptions

dermakov edited this page Oct 16, 2022 · 6 revisions

The "Overview" and "Abstract types" articles looked in detail at how GraphQL queries are executed in the generated DSL. GraphQL mutations are executed in exactly the same way: we just call the context's mutation function instead of the query function. That leaves GraphQL subscriptions. Let's define a GraphQL schema:

type Subscription {
    filmCreated: Film!
}

type Film {
    title: String!
}

According to this schema, we can subscribe to new films:

subscription {
    filmCreated {
        title
    }
}

The server then sends JSON messages that look like this:

{
  "data": {
    "filmCreated": {
      "title": "First"
    }
  }
}

Context

Let's take a look at the context interface generated from the schema:

interface ExampleContext {
    suspend fun query(__projection: QueryProjection.() -> Unit): Query
    suspend fun mutation(__projection: MutationProjection.() -> Unit): Mutation
    fun subscription(__projection: SubscriptionProjection.() -> Unit): ExampleSubscriber<Subscription>
}

fun interface ExampleSubscriber<T> {
    suspend fun subscribe(block: suspend ExampleReceiver<T>.() -> Unit): Unit
}

@ExampleDSL
fun interface ExampleReceiver<out T> {
    suspend fun receive(): T
}

The semantics of the subscription function differ from those of the query and mutation functions. The query and mutation functions take a projection argument to build a query and return the result of executing it. The subscription function also takes a projection, but returns an ExampleSubscriber interface instead. This "subscriber" interface lets us create a long-lived session to listen for incoming messages. The session lives exactly as long as the subscribe function of the ExampleSubscriber interface is executing: the session is created when we enter subscribe and destroyed when we exit it.
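The session lifetime described above can be illustrated with a minimal sketch. It does not use the generated code: Receiver and Subscriber are simplified stand-ins for ExampleReceiver and ExampleSubscriber, and loggingSubscriber is a hypothetical in-memory implementation that serves messages from a list instead of a server. It assumes kotlinx-coroutines-core is on the classpath.

```kotlin
import kotlinx.coroutines.runBlocking

// Simplified stand-ins for the generated ExampleReceiver / ExampleSubscriber.
fun interface Receiver<out T> {
    suspend fun receive(): T
}

fun interface Subscriber<T> {
    suspend fun subscribe(block: suspend Receiver<T>.() -> Unit)
}

// Hypothetical in-memory subscriber (not part of the generated DSL):
// it records session events in `log` and serves messages from a fixed list.
fun <T> loggingSubscriber(messages: List<T>, log: MutableList<String>): Subscriber<T> =
    Subscriber<T> { block ->
        log += "session created"
        try {
            val iterator = messages.iterator()
            val receiver = Receiver<T> { iterator.next() }
            receiver.block() // the session lives while `block` is running
        } finally {
            // Runs whether `block` returns normally or throws.
            log += "session destroyed"
        }
    }

fun main() = runBlocking {
    val log = mutableListOf<String>()
    loggingSubscriber(listOf("First", "Second"), log).subscribe {
        log += "received ${receive()}" // only the first message is read
    }
    println(log) // [session created, received First, session destroyed]
}
```

The try/finally block is one way to tie cleanup to the end of subscribe; how a real adapter releases its connection is up to that adapter.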

The block passed to the subscribe function is executed in the scope of the ExampleReceiver interface, so inside it we can call the receive function to get the next message. Messages are usually consumed in an infinite loop that calls receive on each iteration. The example below stops after the first three messages:

fun main() = runBlocking {
    val context: ExampleContext = exampleContextOf(createMyAdapter())
    context.subscription {
        filmCreated {
            title()
        }
    }.subscribe {
        // Subscription session created
        for (i in 1..3) { // Listening to the first 3 messages
            val message: Subscription = receive() // receive the next message
            println("Film created: ${message.filmCreated.title}")
        }
    }
    // Subscription session destroyed
}
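For the infinite-loop variant, one possible way to end the session is to run subscribe in its own coroutine and cancel it. The sketch below is an illustration under assumptions, not the generated code: Receiver and Subscriber are simplified stand-ins for ExampleReceiver and ExampleSubscriber, channelSubscriber is a hypothetical subscriber fed from a kotlinx.coroutines Channel, and it is assumed that the underlying receive call honors coroutine cancellation (Channel.receive does).

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Simplified stand-ins for the generated ExampleReceiver / ExampleSubscriber.
fun interface Receiver<out T> {
    suspend fun receive(): T
}

fun interface Subscriber<T> {
    suspend fun subscribe(block: suspend Receiver<T>.() -> Unit)
}

// Hypothetical subscriber that reads messages from a channel.
fun <T> channelSubscriber(source: Channel<T>): Subscriber<T> =
    Subscriber<T> { block ->
        val receiver = Receiver<T> { source.receive() }
        receiver.block()
    }

fun main() = runBlocking {
    val source = Channel<String>(Channel.UNLIMITED)
    val seen = mutableListOf<String>()

    // Listen in a separate coroutine so that the loop can be cancelled.
    val listener = launch {
        channelSubscriber(source).subscribe {
            while (true) {
                seen += receive() // suspends until the next message arrives
            }
        }
    }

    source.send("First")
    source.send("Second")
    yield()                   // give the listener a chance to drain the channel
    listener.cancelAndJoin()  // cancellation makes receive() throw and ends the session
    println(seen)
}
```

Because cancellation surfaces as an exception inside receive, the loop needs no explicit exit condition; the subscribe call simply unwinds.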

Projection, Entity and DTO

There are no surprises here: projections, entities, and data transfer objects look the same as they do for queries.

Projection:

@ExampleDSL
interface SubscriptionProjection {
    fun filmCreated(__projection: FilmProjection.() -> Unit): Unit
}

@ExampleDSL
interface FilmProjection {
    fun title(): Unit
}

Entity:

interface Subscription {
    fun __context(): ExampleContext

    val filmCreated: Film
}

interface Film {
    fun __context(): ExampleContext

    val title: String
}

DTO (without Jackson's annotations):

data class SubscriptionDto(
    val filmCreated: FilmDto? = null
)

data class FilmDto(
    val title: String? = null
)

Adapter

interface ExampleAdapter {
    suspend fun executeQuery(query: String, variables: Map<String, Any?>): QueryDto
    suspend fun executeMutation(query: String, variables: Map<String, Any?>): MutationDto
    suspend fun executeSubscription(
        query: String,
        variables: Map<String, Any?>,
        block: suspend ExampleReceiver<SubscriptionDto>.() -> Unit
    ): Unit
}

@ExampleDSL
fun interface ExampleReceiver<out T> {
    suspend fun receive(): T
}

The executeSubscription function is called from the subscribe function of the ExampleSubscriber interface, and its semantics reflect that. Like the executeQuery and executeMutation functions, executeSubscription takes the query and variables arguments and sends them to the server, but it does not return a response. Instead, it starts a long-lived session and invokes the block argument, which listens for incoming DTO messages. The subscribe function of the ExampleSubscriber interface, in turn, takes the DTO messages from the adapter layer, wraps each one in an "entity" interface, and hands the entities to the application code.
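The hand-off between the adapter and the subscriber can be sketched as follows. This is an illustration under assumptions, not the generated implementation: Receiver stands in for ExampleReceiver, the entity interfaces omit the __context function, executeSubscription is a hypothetical top-level function that serves canned DTOs instead of talking to a server, and FilmEntity, SubscriptionEntity, and the top-level subscribe function are invented names for the wrapping step. It assumes kotlinx-coroutines-core is on the classpath.

```kotlin
import kotlinx.coroutines.runBlocking

// Simplified stand-in for the generated ExampleReceiver.
fun interface Receiver<out T> {
    suspend fun receive(): T
}

// DTOs and entity interfaces, trimmed down from the ones shown above.
data class FilmDto(val title: String? = null)
data class SubscriptionDto(val filmCreated: FilmDto? = null)

interface Film { val title: String }
interface Subscription { val filmCreated: Film }

// Hypothetical adapter function: a real adapter would send `query` and
// `variables` to the server; this one serves two canned messages.
suspend fun executeSubscription(
    query: String,
    variables: Map<String, Any?>,
    block: suspend Receiver<SubscriptionDto>.() -> Unit
) {
    val incoming = listOf("First", "Second")
        .map { SubscriptionDto(FilmDto(it)) }
        .iterator()
    val receiver = Receiver<SubscriptionDto> { incoming.next() }
    receiver.block() // the session lasts as long as `block` runs
}

// Hypothetical entity wrappers around the DTOs.
class FilmEntity(private val dto: FilmDto) : Film {
    override val title: String
        get() = checkNotNull(dto.title) { "title was not requested" }
}

class SubscriptionEntity(private val dto: SubscriptionDto) : Subscription {
    override val filmCreated: Film
        get() = FilmEntity(checkNotNull(dto.filmCreated) { "filmCreated was not requested" })
}

// Sketch of the subscriber side: receive a DTO, wrap it, pass it on.
suspend fun subscribe(query: String, block: suspend Receiver<Subscription>.() -> Unit) {
    executeSubscription(query, emptyMap()) {
        val dtoReceiver = this
        val entityReceiver = Receiver<Subscription> { SubscriptionEntity(dtoReceiver.receive()) }
        entityReceiver.block()
    }
}

fun main() = runBlocking {
    subscribe("subscription { filmCreated { title } }") {
        println("Film created: ${receive().filmCreated.title}") // Film created: First
    }
}
```

The application block never sees a DTO: every call to receive on the entity receiver pulls one DTO from the adapter's receiver and wraps it on the way out.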