Skip to content

Commit

Permalink
feat(realtime): send broadcast events through HTTP (#476)
Browse files Browse the repository at this point in the history
  • Loading branch information
grdsdev authored Jul 26, 2024
1 parent 9f15d2d commit 93f4ff5
Show file tree
Hide file tree
Showing 15 changed files with 386 additions and 184 deletions.
59 changes: 59 additions & 0 deletions .swiftpm/configuration/Package.resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
{
"pins" : [
{
"identity" : "swift-concurrency-extras",
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/swift-concurrency-extras",
"state" : {
"revision" : "bb5059bde9022d69ac516803f4f227d8ac967f71",
"version" : "1.1.0"
}
},
{
"identity" : "swift-crypto",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-crypto.git",
"state" : {
"revision" : "bc1c29221f6dfeb0ebbfbc98eb95cd3d4967868e",
"version" : "3.4.0"
}
},
{
"identity" : "swift-custom-dump",
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/swift-custom-dump",
"state" : {
"revision" : "aec6a73f5c1dc1f1be4f61888094b95cf995d973",
"version" : "1.3.2"
}
},
{
"identity" : "swift-snapshot-testing",
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/swift-snapshot-testing",
"state" : {
"revision" : "c097f955b4e724690f0fc8ffb7a6d4b881c9c4e3",
"version" : "1.17.2"
}
},
{
"identity" : "swift-syntax",
"kind" : "remoteSourceControl",
"location" : "https://github.com/swiftlang/swift-syntax",
"state" : {
"revision" : "303e5c5c36d6a558407d364878df131c3546fad8",
"version" : "510.0.2"
}
},
{
"identity" : "xctest-dynamic-overlay",
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/xctest-dynamic-overlay",
"state" : {
"revision" : "357ca1e5dd31f613a1d43320870ebc219386a495",
"version" : "1.2.2"
}
}
],
"version" : 2
}
3 changes: 3 additions & 0 deletions .swiftpm/xcode/xcshareddata/xcschemes/Supabase.xcscheme
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
<TestPlanReference
reference = "container:TestPlans/Integration.xctestplan">
</TestPlanReference>
<TestPlanReference
reference = "container:TestPlans/AllTests.xctestplan">
</TestPlanReference>
</TestPlans>
</TestAction>
<LaunchAction
Expand Down
21 changes: 15 additions & 6 deletions Package.resolved
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/swift-custom-dump",
"state" : {
"revision" : "d237304f42af07f22563aa4cc2d7e2cfb25da82e",
"version" : "1.3.1"
"revision" : "aec6a73f5c1dc1f1be4f61888094b95cf995d973",
"version" : "1.3.2"
}
},
{
"identity" : "swift-issue-reporting",
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/swift-issue-reporting",
"state" : {
"revision" : "c85092304cda8cb38d2d68454b29609a8013620b",
"version" : "1.2.1"
"revision" : "357ca1e5dd31f613a1d43320870ebc219386a495",
"version" : "1.2.2"
}
},
{
Expand All @@ -50,8 +50,17 @@
"kind" : "remoteSourceControl",
"location" : "https://github.com/swiftlang/swift-syntax",
"state" : {
"revision" : "4c6cc0a3b9e8f14b3ae2307c5ccae4de6167ac2c",
"version" : "600.0.0-prerelease-2024-06-12"
"revision" : "82a453c2dfa335c7e778695762438dfe72b328d2",
"version" : "600.0.0-prerelease-2024-07-24"
}
},
{
"identity" : "xctest-dynamic-overlay",
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/xctest-dynamic-overlay",
"state" : {
"revision" : "357ca1e5dd31f613a1d43320870ebc219386a495",
"version" : "1.2.2"
}
}
],
Expand Down
18 changes: 9 additions & 9 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ let package = Package(
],
dependencies: [
.package(url: "https://github.com/apple/swift-crypto.git", "1.0.0" ..< "4.0.0"),
.package(url: "https://github.com/pointfreeco/swift-concurrency-extras", from: "1.0.0"),
.package(url: "https://github.com/pointfreeco/swift-custom-dump", from: "1.3.0"),
.package(url: "https://github.com/pointfreeco/swift-snapshot-testing", from: "1.8.1"),
.package(url: "https://github.com/pointfreeco/swift-issue-reporting", from: "1.2.0"),
.package(url: "https://github.com/pointfreeco/swift-concurrency-extras", from: "1.1.0"),
.package(url: "https://github.com/pointfreeco/swift-custom-dump", from: "1.3.2"),
.package(url: "https://github.com/pointfreeco/swift-snapshot-testing", from: "1.17.2"),
.package(url: "https://github.com/pointfreeco/xctest-dynamic-overlay", from: "1.2.2"),
],
targets: [
.target(
Expand Down Expand Up @@ -55,7 +55,7 @@ let package = Package(
dependencies: [
.product(name: "CustomDump", package: "swift-custom-dump"),
.product(name: "SnapshotTesting", package: "swift-snapshot-testing"),
.product(name: "IssueReporting", package: "swift-issue-reporting"),
.product(name: "XCTestDynamicOverlay", package: "xctest-dynamic-overlay"),
"Helpers",
"Auth",
"TestHelpers",
Expand All @@ -71,7 +71,7 @@ let package = Package(
dependencies: [
.product(name: "ConcurrencyExtras", package: "swift-concurrency-extras"),
.product(name: "SnapshotTesting", package: "swift-snapshot-testing"),
.product(name: "IssueReporting", package: "swift-issue-reporting"),
.product(name: "XCTestDynamicOverlay", package: "xctest-dynamic-overlay"),
"Functions",
"TestHelpers",
],
Expand All @@ -82,7 +82,7 @@ let package = Package(
dependencies: [
.product(name: "CustomDump", package: "swift-custom-dump"),
.product(name: "InlineSnapshotTesting", package: "swift-snapshot-testing"),
.product(name: "IssueReporting", package: "swift-issue-reporting"),
.product(name: "XCTestDynamicOverlay", package: "xctest-dynamic-overlay"),
"Helpers",
"Auth",
"PostgREST",
Expand Down Expand Up @@ -129,7 +129,7 @@ let package = Package(
name: "StorageTests",
dependencies: [
.product(name: "CustomDump", package: "swift-custom-dump"),
.product(name: "IssueReporting", package: "swift-issue-reporting"),
.product(name: "XCTestDynamicOverlay", package: "xctest-dynamic-overlay"),
"Storage",
]
),
Expand All @@ -155,7 +155,7 @@ let package = Package(
name: "TestHelpers",
dependencies: [
.product(name: "ConcurrencyExtras", package: "swift-concurrency-extras"),
.product(name: "IssueReporting", package: "swift-issue-reporting"),
.product(name: "XCTestDynamicOverlay", package: "xctest-dynamic-overlay"),
"Auth",
]
),
Expand Down
7 changes: 0 additions & 7 deletions Sources/Realtime/RealtimeChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,6 @@ public struct RealtimeChannelOptions {
}
}

/// Represents the different status of a push
public enum PushStatus: String, Sendable {
case ok
case error
case timeout
}

public enum RealtimeSubscribeStates {
case subscribed
case timedOut
Expand Down
7 changes: 7 additions & 0 deletions Sources/Realtime/V2/PushV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@
import Foundation
import Helpers

/// Represents the different status of a push
public enum PushStatus: String, Sendable {
case ok
case error
case timeout
}

actor PushV2 {
private weak var channel: RealtimeChannelV2?
let message: RealtimeMessageV2
Expand Down
95 changes: 78 additions & 17 deletions Sources/Realtime/V2/RealtimeChannelV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,35 +9,56 @@ import ConcurrencyExtras
import Foundation
import Helpers

#if canImport(FoundationNetworking)
import FoundationNetworking

extension HTTPURLResponse {
convenience init() {
self.init(
url: URL(string: "http://127.0.0.1")!,
statusCode: 200,
httpVersion: nil,
headerFields: nil
)!
}
}
#endif

public struct RealtimeChannelConfig: Sendable {
public var broadcast: BroadcastJoinConfig
public var presence: PresenceJoinConfig
public var isPrivate: Bool
}

struct Socket: Sendable {
var broadcastURL: @Sendable () -> URL
var status: @Sendable () -> RealtimeClientV2.Status
var options: @Sendable () -> RealtimeClientOptions
var accessToken: @Sendable () -> String?
var apiKey: @Sendable () -> String?
var makeRef: @Sendable () -> Int

var connect: @Sendable () async -> Void
var addChannel: @Sendable (_ channel: RealtimeChannelV2) -> Void
var removeChannel: @Sendable (_ channel: RealtimeChannelV2) async -> Void
var push: @Sendable (_ message: RealtimeMessageV2) async -> Void
var httpSend: @Sendable (_ request: HTTPRequest) async throws -> HTTPResponse
}

extension Socket {
init(client: RealtimeClientV2) {
self.init(
broadcastURL: { [weak client] in client?.broadcastURL ?? URL(string: "http://localhost")! },
status: { [weak client] in client?.status ?? .disconnected },
options: { [weak client] in client?.options ?? .init() },
accessToken: { [weak client] in client?.mutableState.accessToken },
apiKey: { [weak client] in client?.apikey },
makeRef: { [weak client] in client?.makeRef() ?? 0 },
connect: { [weak client] in await client?.connect() },
addChannel: { [weak client] in client?.addChannel($0) },
removeChannel: { [weak client] in await client?.removeChannel($0) },
push: { [weak client] in await client?.push($0) }
push: { [weak client] in await client?.push($0) },
httpSend: { [weak client] in try await client?.http.send($0) ?? .init(data: Data(), response: HTTPURLResponse()) }
)
}
}
Expand Down Expand Up @@ -202,24 +223,64 @@ public final class RealtimeChannelV2: Sendable {
/// - event: Broadcast message event.
/// - message: Message payload.
public func broadcast(event: String, message: JSONObject) async {
assert(
status == .subscribed,
"You can only broadcast after subscribing to the channel. Did you forget to call `channel.subscribe()`?"
)
if status != .subscribed {
struct Message: Encodable {
let topic: String
let event: String
let payload: JSONObject
let `private`: Bool
}

await push(
RealtimeMessageV2(
joinRef: mutableState.joinRef,
ref: socket.makeRef().description,
topic: topic,
event: ChannelEvent.broadcast,
payload: [
"type": "broadcast",
"event": .string(event),
"payload": .object(message),
]
var headers = HTTPHeaders(["content-type": "application/json"])
if let apiKey = socket.apiKey() {
headers["apikey"] = apiKey
}
if let accessToken = socket.accessToken() {
headers["authorization"] = "Bearer \(accessToken)"
}

let task = Task { [headers] in
_ = try? await socket.httpSend(
HTTPRequest(
url: socket.broadcastURL(),
method: .post,
headers: headers,
body: JSONEncoder().encode(
[
"messages": [
Message(
topic: topic,
event: event,
payload: message,
private: config.isPrivate
),
],
]
)
)
)
}

if config.broadcast.acknowledgeBroadcasts {
try? await withTimeout(interval: socket.options().timeoutInterval) {
await task.value
}
}
} else {
await push(
RealtimeMessageV2(
joinRef: mutableState.joinRef,
ref: socket.makeRef().description,
topic: topic,
event: ChannelEvent.broadcast,
payload: [
"type": "broadcast",
"event": .string(event),
"payload": .object(message),
]
)
)
)
}
}

public func track(_ state: some Codable) async throws {
Expand Down
21 changes: 19 additions & 2 deletions Sources/Realtime/V2/RealtimeClientV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public final class RealtimeClientV2: Sendable {
let options: RealtimeClientOptions
let ws: any WebSocketClient
let mutableState = LockIsolated(MutableState())
let http: any HTTPClientType
let apikey: String?

public var subscriptions: [String: RealtimeChannelV2] {
Expand Down Expand Up @@ -128,6 +129,12 @@ public final class RealtimeClientV2: Sendable {
}

public convenience init(url: URL, options: RealtimeClientOptions) {
var interceptors: [any HTTPClientInterceptor] = []

if let logger = options.logger {
interceptors.append(LoggerInterceptor(logger: logger))
}

self.init(
url: url,
options: options,
Expand All @@ -137,14 +144,24 @@ public final class RealtimeClientV2: Sendable {
apikey: options.apikey
),
options: options
),
http: HTTPClient(
fetch: options.fetch ?? { try await URLSession.shared.data(for: $0) },
interceptors: interceptors
)
)
}

init(url: URL, options: RealtimeClientOptions, ws: any WebSocketClient) {
init(
url: URL,
options: RealtimeClientOptions,
ws: any WebSocketClient,
http: any HTTPClientType
) {
self.url = url
self.options = options
self.ws = ws
self.http = http
apikey = options.apikey

mutableState.withValue {
Expand Down Expand Up @@ -471,7 +488,7 @@ public final class RealtimeClientV2: Sendable {
return url
}

private var broadcastURL: URL {
var broadcastURL: URL {
url.appendingPathComponent("api/broadcast")
}
}
Loading

0 comments on commit 93f4ff5

Please sign in to comment.