-
Notifications
You must be signed in to change notification settings - Fork 20
/
transport.ex
263 lines (214 loc) · 8.94 KB
/
transport.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
defmodule Absinthe.GraphqlWS.Transport do
  @moduledoc """
  Handles messages coming into the socket from clients (implemented in `handle_in/2`)
  as well as messages coming from within Elixir/Absinthe (implemented in `handle_info/2`).

  If the optional `c:Absinthe.GraphqlWS.Socket.handle_message/2` callback is implemented on
  the socket, then messages that are not specifically caught by `handle_info/2` in this
  module will be passed through to `c:Absinthe.GraphqlWS.Socket.handle_message/2`.

  **Note:** This module is not intended for use by individuals integrating this library into
  their codebase, but is documented to help understand the intentions of the code.
  """

  alias Absinthe.GraphqlWS.{Message, Socket, Util}
  alias Phoenix.Socket.Broadcast

  require Logger

  @ping "ping"
  @pong "pong"

  @type control :: Socket.control()
  @type reply_inbound() :: Socket.reply_inbound()
  @type reply_message() :: Socket.reply_message()
  @type socket() :: Socket.t()

  # Logging helpers that prefix every line with the pid of the socket process.
  # They are macros so the message is only built inside the Logger call.
  # NOTE(review): `Logger.warn/1` is deprecated in recent Elixir releases in favour of
  # `Logger.warning/1`; kept as-is because the minimum supported Elixir version is not
  # visible from this file.
  defmacrop debug(msg),
    do: quote(do: Logger.debug("[graph-socket@#{inspect(self())}] #{unquote(msg)}"))

  defmacrop warn(msg),
    do: quote(do: Logger.warn("[graph-socket@#{inspect(self())}] #{unquote(msg)}"))

  @doc """
  Generally this will only receive `:pong` messages in response to our keepalive
  ping messages. Client-side websocket libraries handle these control frames
  automatically in order to adhere to the spec, so unless a customer is writing their
  own low-level websocket it should be handled for them.

  A `:ping` frame is answered with a `:pong` frame, a `:pong` frame is ignored, and any
  other control frame is logged as a warning and ignored.
  """
  @spec handle_control({term(), opcode: control()}, socket()) :: reply_inbound()
  def handle_control({_, opcode: :ping}, socket), do: {:reply, :ok, {:pong, @pong}, socket}
  def handle_control({_, opcode: :pong}, socket), do: {:ok, socket}

  def handle_control(message, state) do
    warn(" unhandled control frame #{inspect(message)}")
    {:ok, state}
  end

  @doc """
  Receive messages from clients. We expect all incoming messages to be JSON encoded
  text, so if something else comes in we blow up.

  Text that fails to decode as JSON is logged and answered with an error message; decoded
  payloads are dispatched to `handle_inbound/2`.
  """
  @spec handle_in({binary(), [opcode: :text]}, socket()) :: reply_inbound()
  def handle_in({text, [opcode: :text]}, socket) do
    case Util.json_library().decode(text) do
      {:ok, json} ->
        handle_inbound(json, socket)

      {:error, reason} ->
        warn("JSON parse error: #{inspect(reason)}")
        {:reply, :error, {:text, Message.Error.new("4400")}, socket}
    end
  end

  @doc """
  Receive messages from inside the house.

  * `:keepalive` - Regularly send messages with opcode of `0x09`, ie `:ping`. The `graphql-ws`
    library has a strong opinion that it does not want to implement client-side keepalive, so
    in order to keep the websocket from closing we need to send it messages.
  * `subscription:data` - After we subscribe to an Absinthe subscription, we may receive messages
    for the relevant subscription. The `graphql-ws` will have sent us an `id` along with the
    subscription query, so we need to map our internal topic back to that `id` in order for the
    client to figure out what to do with our message. Data for a topic that has no entry in
    `socket.subscriptions` is dropped.
  * `:complete` - If we get a `query` or a `mutation` on the websocket, we're supposed to reply
    with a `Next` message followed by a `Complete` message. We follow through on the latter by
    putting a message on our process queue.
  * fallthrough - If `c:Absinthe.GraphqlWS.Socket.handle_message/2` is defined on the socket,
    then uncaught messages will be sent there.
  """
  @spec handle_info(term(), socket()) :: reply_message()
  def handle_info(:keepalive, socket) do
    # Re-arm the timer first so keepalives continue for the life of the process.
    Process.send_after(self(), :keepalive, socket.keepalive)
    {:push, {:ping, @ping}, socket}
  end

  def handle_info(%Broadcast{event: "subscription:data", payload: payload, topic: topic}, socket) do
    # `socket.subscriptions` maps topic => client-supplied id. A broadcast for a topic
    # without an entry has no id to address the client with, so it is dropped instead of
    # being pushed as a `Next` message carrying a nil id.
    case Map.fetch(socket.subscriptions, topic) do
      {:ok, subscription_id} ->
        {:push, {:text, Message.Next.new(subscription_id, payload.result)}, socket}

      :error ->
        debug("dropping subscription data for unknown topic #{topic}")
        {:ok, socket}
    end
  end

  def handle_info({:complete, id}, socket) do
    {:push, {:text, Message.Complete.new(id)}, socket}
  end

  def handle_info(message, socket) do
    if function_exported?(socket.handler, :handle_message, 2) do
      socket.handler.handle_message(message, socket)
    else
      {:ok, socket}
    end
  end

  @doc """
  Process was stopped.
  """
  @spec terminate(term(), socket()) :: :ok
  def terminate(reason, _socket) do
    debug("terminated: #{inspect(reason)}")
    :ok
  end

  @doc """
  Callbacks for parsed JSON payloads coming in from a client.

  * `connection_init` - acknowledged once (optionally via the handler's `handle_init/2`);
    a second `connection_init` closes the socket with code `4429`.
  * `subscribe` - closes the socket with `4400` if received before `connection_init`,
    otherwise the payload is handed to `handle_subscribe/3`.
  * `complete` - unsubscribes from the topic registered under the given `id`, if any.
  * `ping` - answered with a `Pong` message.
  * anything else - logged and the socket is closed with `4400`.

  See:
  https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md
  """
  @spec handle_inbound(map(), socket()) :: reply_inbound()
  def handle_inbound(%{"type" => "connection_init"}, %{initialized?: true} = socket) do
    close(4429, "Too many initialisation requests", socket)
  end

  def handle_inbound(%{"type" => "connection_init"} = message, %{handler: handler} = socket) do
    if function_exported?(handler, :handle_init, 2) do
      case handler.handle_init(Map.get(message, "payload", %{}), socket) do
        {:ok, payload, socket} ->
          {:reply, :ok, {:text, Message.ConnectionAck.new(payload)}, %{socket | initialized?: true}}

        {:error, payload, socket} ->
          {:reply, :ok, {:text, Message.Error.new(payload)}, socket}
      end
    else
      {:reply, :ok, {:text, Message.ConnectionAck.new()}, %{socket | initialized?: true}}
    end
  end

  def handle_inbound(%{"type" => "subscribe"}, %{initialized?: false} = socket) do
    close(4400, "Subscribe message received before ConnectionInit", socket)
  end

  def handle_inbound(%{"id" => id, "type" => "subscribe", "payload" => payload}, socket) do
    handle_subscribe(payload, id, socket)
  end

  def handle_inbound(%{"id" => id, "type" => "complete"}, socket) do
    # Reverse lookup: subscriptions are stored as topic => id, the client only knows the id.
    subscribed_topic =
      Enum.find_value(socket.subscriptions, fn
        {topic, ^id} -> {:ok, topic}
        _other -> false
      end)

    case subscribed_topic do
      {:ok, topic} ->
        debug("unsubscribing from topic #{topic}")
        Phoenix.PubSub.unsubscribe(socket.pubsub, topic)
        Absinthe.Subscription.unsubscribe(socket.endpoint, topic)
        # The map is keyed by topic, so the entry must be removed by topic (not by id),
        # otherwise completed subscriptions are never forgotten.
        {:ok, %{socket | subscriptions: Map.delete(socket.subscriptions, topic)}}

      _ ->
        {:ok, socket}
    end
  end

  def handle_inbound(%{"type" => "ping"}, socket),
    do: {:reply, :ok, {:text, Message.Pong.new()}, socket}

  def handle_inbound(msg, socket) do
    warn("unhandled message #{inspect(msg)}")
    close(4400, "Unhandled message from client", socket)
  end

  @doc """
  Subscribe messages in graphql-ws may include a subscription, implying a subscription to
  a long term stream of data. These messages may also be queries or mutations, so do not require
  a stream.

  `payload` is the `"payload"` value of the client's message and `id` is the client-supplied
  operation id. A missing or non-binary `"query"` is run as an empty document, and missing or
  non-map `"variables"` default to `%{}`. If `socket.absinthe` does not contain a `:schema`,
  the message is ignored and `{:ok, socket}` is returned.
  """
  @spec handle_subscribe(term(), term(), socket()) :: reply_inbound()
  def handle_subscribe(payload, id, socket) do
    with %{schema: schema} <- socket.absinthe,
         {:ok, variables} <- parse_variables(payload),
         {:ok, query} <- parse_query(payload) do
      # Variables apply to this operation only; they are not stored on the socket.
      opts = Keyword.merge(socket.absinthe.opts, variables: variables)

      Absinthe.Logger.log_run(:debug, {
        query,
        schema,
        [],
        opts
      })

      run_doc(socket, id, query, socket.absinthe, opts)
    else
      _ ->
        {:ok, socket}
    end
  end

  # Builds the reply tuple that asks the transport to send a close frame.
  defp close(code, message, socket) do
    {:reply, :ok, {:close, code, message}, socket}
  end

  # Extracts the query string from a payload, defaulting to an empty document.
  defp parse_query(%{"query" => query}) when is_binary(query), do: {:ok, query}
  defp parse_query(_), do: {:ok, ""}

  # Extracts the variables map from a payload, defaulting to no variables.
  defp parse_variables(%{"variables" => variables}) when is_map(variables), do: {:ok, variables}
  defp parse_variables(_), do: {:ok, %{}}

  @doc """
  Builds the Absinthe document pipeline for `schema` with the given `options`, by
  delegating to `Absinthe.Pipeline.for_document/2`.
  """
  @spec pipeline(Absinthe.Schema.t(), Keyword.t()) :: Absinthe.Pipeline.t()
  def pipeline(schema, options) do
    Absinthe.Pipeline.for_document(schema, options)
  end

  # Runs the document and translates the outcome into a socket reply:
  #   * a subscription registers topic => id and subscribes this process to the topic
  #   * a result with data is sent as `Next`, with a `Complete` queued to follow it
  #   * a result with only errors, or a pipeline error, is sent as `Error`
  defp run_doc(socket, id, query, config, opts) do
    case run(query, config[:schema], config[:pipeline], opts) do
      {:ok, %{"subscribed" => topic}, context} ->
        debug("subscribed to topic #{topic}")

        :ok =
          Phoenix.PubSub.subscribe(
            socket.pubsub,
            topic,
            # metadata: {:fastlane, self(), @serializer, []},
            link: true
          )

        socket = merge_opts(socket, context: context)
        {:ok, %{socket | subscriptions: Map.put(socket.subscriptions, topic, id)}}

      {:ok, %{data: _} = reply, context} ->
        queue_complete_message(id)
        socket = merge_opts(socket, context: context)
        {:reply, :ok, {:text, Message.Next.new(id, reply)}, socket}

      {:ok, %{errors: errors}, context} ->
        socket = merge_opts(socket, context: context)
        {:reply, :ok, {:text, Message.Error.new(id, errors)}, socket}

      {:error, reply} ->
        {:reply, :error, {:text, Message.Error.new(id, reply)}, socket}
    end
  end

  # Builds the pipeline by calling the configured `{module, function}` with the schema
  # and options, runs the document through it, and returns the result together with the
  # execution context.
  defp run(document, schema, pipeline, options) do
    {module, fun} = pipeline

    case Absinthe.Pipeline.run(document, apply(module, fun, [schema, options])) do
      {:ok, %{result: result, execution: res}, _phases} ->
        {:ok, result, res.context}

      {:error, msg, _phases} ->
        {:error, msg}
    end
  end

  # Merges `opts` into the Absinthe options stored on the socket. Existing options that
  # are not overridden are preserved, so storing the latest context does not discard the
  # rest of the configured options.
  defp merge_opts(socket, opts) do
    merged = Keyword.merge(socket.absinthe.opts, opts)
    %{socket | absinthe: %{socket.absinthe | opts: merged}}
  end

  # Sent to ourselves so the `Complete` message is pushed after the `Next` reply.
  defp queue_complete_message(id), do: send(self(), {:complete, id})
end