Skip to content

Commit

Permalink
refactor(realtime)!: make realtime channel and client actors (#492)
Browse files Browse the repository at this point in the history
* refactor(realtime)!: make realtime channel and client actors

* fix integration tests not building
  • Loading branch information
grdsdev committed Sep 25, 2024
1 parent 9beb41b commit 5687217
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 290 deletions.
2 changes: 1 addition & 1 deletion Sources/Realtime/Push.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ actor Push {

if channel?.config.broadcast.acknowledgeBroadcasts == true {
do {
return try await withTimeout(interval: channel?.socket.options().timeoutInterval ?? 10) {
return try await withTimeout(interval: channel?.socket.options.timeoutInterval ?? 10) {
await withCheckedContinuation {
self.receivedContinuation = $0
}
Expand Down
128 changes: 40 additions & 88 deletions Sources/Realtime/RealtimeChannel.swift
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
//
// RealtimeChannel.swift
//
//
// Created by Guilherme Souza on 26/12/23.
//

import ConcurrencyExtras
import Foundation
import Helpers
Expand Down Expand Up @@ -33,40 +26,7 @@ public struct RealtimeChannelConfig: Sendable {
public var isPrivate: Bool
}

struct Socket: Sendable {
var broadcastURL: @Sendable () -> URL
var status: @Sendable () -> RealtimeClient.Status
var options: @Sendable () -> RealtimeClientOptions
var accessToken: @Sendable () -> String?
var apiKey: @Sendable () -> String?
var makeRef: @Sendable () -> Int

var connect: @Sendable () async -> Void
var addChannel: @Sendable (_ channel: RealtimeChannel) -> Void
var removeChannel: @Sendable (_ channel: RealtimeChannel) async -> Void
var push: @Sendable (_ message: RealtimeMessage) async -> Void
var httpSend: @Sendable (_ request: HTTPRequest) async throws -> HTTPResponse
}

extension Socket {
init(client: RealtimeClient) {
self.init(
broadcastURL: { [weak client] in client?.broadcastURL ?? URL(string: "http://localhost")! },
status: { [weak client] in client?.status ?? .disconnected },
options: { [weak client] in client?.options ?? .init() },
accessToken: { [weak client] in client?.mutableState.accessToken },
apiKey: { [weak client] in client?.apikey },
makeRef: { [weak client] in client?.makeRef() ?? 0 },
connect: { [weak client] in await client?.connect() },
addChannel: { [weak client] in client?.addChannel($0) },
removeChannel: { [weak client] in await client?.removeChannel($0) },
push: { [weak client] in await client?.push($0) },
httpSend: { [weak client] in try await client?.http.send($0) ?? .init(data: Data(), response: HTTPURLResponse()) }
)
}
}

public final class RealtimeChannel: Sendable {
public actor RealtimeChannel {
public typealias Subscription = ObservationToken

public enum Status: Sendable {
Expand All @@ -76,21 +36,23 @@ public final class RealtimeChannel: Sendable {
case unsubscribing
}

struct MutableState {
var clientChanges: [PostgresJoinConfig] = []
var joinRef: String?
var pushes: [String: Push] = [:]
}

private let mutableState = LockIsolated(MutableState())

let topic: String
let config: RealtimeChannelConfig
let logger: (any SupabaseLogger)?
let socket: Socket
private weak var _socket: RealtimeClient?

var socket: RealtimeClient {
guard let _socket else {
fatalError("Expected a RealtimeClient instance to be associated with this channel.")
}
return _socket
}

private let callbackManager = CallbackManager()
private let statusEventEmitter = EventEmitter<Status>(initialEvent: .unsubscribed)
private(set) var clientChanges: [PostgresJoinConfig] = []
private(set) var joinRef: String?
private(set) var pushes: [String: Push] = [:]

public private(set) var status: Status {
get { statusEventEmitter.lastEvent }
Expand All @@ -115,13 +77,13 @@ public final class RealtimeChannel: Sendable {
init(
topic: String,
config: RealtimeChannelConfig,
socket: Socket,
socket: RealtimeClient,
logger: (any SupabaseLogger)?
) {
self.topic = topic
self.config = config
self.logger = logger
self.socket = socket
_socket = socket
}

deinit {
Expand All @@ -130,34 +92,33 @@ public final class RealtimeChannel: Sendable {

/// Subscribes to the channel
public func subscribe() async {
if socket.status() != .connected {
if socket.options().connectOnSubscribe != true {
if await socket.status != .connected {
if socket.options.connectOnSubscribe != true {
fatalError(
"You can't subscribe to a channel while the realtime client is not connected. Did you forget to call `realtime.connect()`?"
)
}
await socket.connect()
}

socket.addChannel(self)
await socket.addChannel(self)

status = .subscribing
logger?.debug("subscribing to channel \(topic)")

let joinConfig = RealtimeJoinConfig(
broadcast: config.broadcast,
presence: config.presence,
postgresChanges: mutableState.clientChanges,
postgresChanges: clientChanges,
isPrivate: config.isPrivate
)

let payload = RealtimeJoinPayload(
let payload = await RealtimeJoinPayload(
config: joinConfig,
accessToken: socket.accessToken()
accessToken: socket.accessToken
)

let joinRef = socket.makeRef().description
mutableState.withValue { $0.joinRef = joinRef }
joinRef = await socket.makeRef().description

logger?.debug("subscribing to channel with body: \(joinConfig)")

Expand All @@ -172,7 +133,7 @@ public final class RealtimeChannel: Sendable {
)

do {
try await withTimeout(interval: socket.options().timeoutInterval) { [self] in
try await withTimeout(interval: socket.options.timeoutInterval) { [self] in
_ = await statusChange.first { @Sendable in $0 == .subscribed }
}
} catch {
Expand All @@ -191,7 +152,7 @@ public final class RealtimeChannel: Sendable {

await push(
RealtimeMessage(
joinRef: mutableState.joinRef,
joinRef: joinRef,
ref: socket.makeRef().description,
topic: topic,
event: ChannelEvent.leave,
Expand All @@ -204,7 +165,7 @@ public final class RealtimeChannel: Sendable {
logger?.debug("Updating auth token for channel \(topic)")
await push(
RealtimeMessage(
joinRef: mutableState.joinRef,
joinRef: joinRef,
ref: socket.makeRef().description,
topic: topic,
event: ChannelEvent.accessToken,
Expand Down Expand Up @@ -235,17 +196,18 @@ public final class RealtimeChannel: Sendable {
}

var headers = HTTPHeaders(["content-type": "application/json"])
if let apiKey = socket.apiKey() {
if let apiKey = socket.apikey {
headers["apikey"] = apiKey
}
if let accessToken = socket.accessToken() {

if let accessToken = await socket.accessToken {
headers["authorization"] = "Bearer \(accessToken)"
}

let task = Task { [headers] in
_ = try? await socket.httpSend(
_ = try? await socket.http.send(
HTTPRequest(
url: socket.broadcastURL(),
url: socket.broadcastURL,
method: .post,
headers: headers,
body: JSONEncoder().encode(
Expand All @@ -265,14 +227,14 @@ public final class RealtimeChannel: Sendable {
}

if config.broadcast.acknowledgeBroadcasts {
try? await withTimeout(interval: socket.options().timeoutInterval) {
try? await withTimeout(interval: socket.options.timeoutInterval) {
await task.value
}
}
} else {
await push(
RealtimeMessage(
joinRef: mutableState.joinRef,
joinRef: joinRef,
ref: socket.makeRef().description,
topic: topic,
event: ChannelEvent.broadcast,
Expand All @@ -298,7 +260,7 @@ public final class RealtimeChannel: Sendable {

await push(
RealtimeMessage(
joinRef: mutableState.joinRef,
joinRef: joinRef,
ref: socket.makeRef().description,
topic: topic,
event: ChannelEvent.presence,
Expand All @@ -314,7 +276,7 @@ public final class RealtimeChannel: Sendable {
public func untrack() async {
await push(
RealtimeMessage(
joinRef: mutableState.joinRef,
joinRef: joinRef,
ref: socket.makeRef().description,
topic: topic,
event: ChannelEvent.presence,
Expand All @@ -326,7 +288,7 @@ public final class RealtimeChannel: Sendable {
)
}

func onMessage(_ message: RealtimeMessage) {
func onMessage(_ message: RealtimeMessage) async {
do {
guard let eventType = message.eventType else {
logger?.debug("Received message without event type: \(message)")
Expand Down Expand Up @@ -437,13 +399,9 @@ public final class RealtimeChannel: Sendable {
callbackManager.triggerBroadcast(event: event, json: payload)

case .close:
Task { [weak self] in
guard let self else { return }

await socket.removeChannel(self)
logger?.debug("Unsubscribed from channel \(message.topic)")
status = .unsubscribed
}
await socket.removeChannel(self)
logger?.debug("Unsubscribed from channel \(message.topic)")
status = .unsubscribed

case .error:
logger?.debug(
Expand Down Expand Up @@ -551,9 +509,7 @@ public final class RealtimeChannel: Sendable {
filter: filter
)

mutableState.withValue {
$0.clientChanges.append(config)
}
clientChanges.append(config)

let id = callbackManager.addPostgresCallback(filter: config, callback: callback)
return Subscription { [weak callbackManager, logger] in
Expand All @@ -578,18 +534,14 @@ public final class RealtimeChannel: Sendable {
private func push(_ message: RealtimeMessage) async -> PushStatus {
let push = Push(channel: self, message: message)
if let ref = message.ref {
mutableState.withValue {
$0.pushes[ref] = push
}
pushes[ref] = push
}
return await push.send()
}

private func didReceiveReply(ref: String, status: String) {
Task {
let push = mutableState.withValue {
$0.pushes.removeValue(forKey: ref)
}
let push = pushes.removeValue(forKey: ref)
await push?.didReceive(status: PushStatus(rawValue: status) ?? .ok)
}
}
Expand Down
Loading

0 comments on commit 5687217

Please sign in to comment.