diff --git a/Sources/Functions/FunctionsClient.swift b/Sources/Functions/FunctionsClient.swift index 02d4917f..3d8e8034 100644 --- a/Sources/Functions/FunctionsClient.swift +++ b/Sources/Functions/FunctionsClient.swift @@ -1,5 +1,5 @@ import _Helpers -import Foundation +@preconcurrency import Foundation #if canImport(FoundationNetworking) import FoundationNetworking @@ -20,8 +20,8 @@ public actor FunctionsClient { var headers: [String: String] /// The Region to invoke the functions in. let region: String? - /// The fetch handler used to make requests. - let fetch: FetchHandler + + private let http: HTTPClient /// Initializes a new instance of `FunctionsClient`. /// @@ -29,12 +29,14 @@ public actor FunctionsClient { /// - url: The base URL for the functions. /// - headers: Headers to be included in the requests. (Default: empty dictionary) /// - region: The Region to invoke the functions in. + /// - logger: SupabaseLogger instance to use. /// - fetch: The fetch handler used to make requests. (Default: URLSession.shared.data(for:)) @_disfavoredOverload public init( url: URL, headers: [String: String] = [:], region: String? = nil, + logger: (any SupabaseLogger)? = nil, fetch: @escaping FetchHandler = { try await URLSession.shared.data(for: $0) } ) { self.url = url @@ -43,7 +45,7 @@ public actor FunctionsClient { self.headers["X-Client-Info"] = "functions-swift/\(version)" } self.region = region - self.fetch = fetch + http = HTTPClient(logger: logger, fetchHandler: fetch) } /// Initializes a new instance of `FunctionsClient`. @@ -52,20 +54,16 @@ public actor FunctionsClient { /// - url: The base URL for the functions. /// - headers: Headers to be included in the requests. (Default: empty dictionary) /// - region: The Region to invoke the functions in. + /// - logger: SupabaseLogger instance to use. /// - fetch: The fetch handler used to make requests. (Default: URLSession.shared.data(for:)) public init( url: URL, headers: [String: String] = [:], region: FunctionRegion? = nil, + logger: (any SupabaseLogger)? = nil, fetch: @escaping FetchHandler = { try await URLSession.shared.data(for: $0) } ) { - self.url = url - self.headers = headers - if headers["X-Client-Info"] == nil { - self.headers["X-Client-Info"] = "functions-swift/\(version)" - } - self.region = region?.rawValue - self.fetch = fetch + self.init(url: url, headers: headers, region: region?.rawValue, logger: logger, fetch: fetch) } /// Updates the authorization header. @@ -92,10 +90,10 @@ public actor FunctionsClient { options: FunctionInvokeOptions = .init(), decode: (Data, HTTPURLResponse) throws -> Response ) async throws -> Response { - let (data, response) = try await rawInvoke( + let response = try await rawInvoke( functionName: functionName, invokeOptions: options ) - return try decode(data, response) + return try decode(response.data, response.response) } /// Invokes a function and decodes the response as a specific type. @@ -130,33 +128,101 @@ public actor FunctionsClient { private func rawInvoke( functionName: String, invokeOptions: FunctionInvokeOptions - ) async throws -> (Data, HTTPURLResponse) { + ) async throws -> Response { + var request = Request( + path: functionName, + method: .post, + headers: invokeOptions.headers.merging(headers) { invoke, _ in invoke }, + body: invokeOptions.body + ) + + if let region = invokeOptions.region ?? region { + request.headers["x-region"] = region + } + + let response = try await http.fetch(request, baseURL: url) + + guard 200 ..< 300 ~= response.statusCode else { + throw FunctionsError.httpError(code: response.statusCode, data: response.data) + } + + let isRelayError = response.response.value(forHTTPHeaderField: "x-relay-error") == "true" + if isRelayError { + throw FunctionsError.relayError + } + + return response + } + + /// Invokes a function with streamed response. + /// + /// Function MUST return a `text/event-stream` content type for this method to work. + /// + /// - Parameters: + /// - functionName: The name of the function to invoke. + /// - invokeOptions: Options for invoking the function. + /// - Returns: A stream of Data. + /// + /// - Warning: Experimental method. + /// - Note: This method doesn't use the same underlying `URLSession` as the remaining methods in the library. + public func _invokeWithStreamedResponse( + _ functionName: String, + options invokeOptions: FunctionInvokeOptions = .init() + ) -> AsyncThrowingStream { + let (stream, continuation) = AsyncThrowingStream.makeStream() + let delegate = StreamResponseDelegate(continuation: continuation) + + let session = URLSession(configuration: .default, delegate: delegate, delegateQueue: nil) + let url = url.appendingPathComponent(functionName) var urlRequest = URLRequest(url: url) urlRequest.allHTTPHeaderFields = invokeOptions.headers.merging(headers) { invoke, _ in invoke } urlRequest.httpMethod = (invokeOptions.method ?? .post).rawValue urlRequest.httpBody = invokeOptions.body - let region = invokeOptions.region ?? region - if let region { - urlRequest.setValue(region, forHTTPHeaderField: "x-region") + let task = session.dataTask(with: urlRequest) { data, response, _ in + guard let httpResponse = response as? HTTPURLResponse else { + continuation.finish(throwing: URLError(.badServerResponse)) + return + } + + guard 200 ..< 300 ~= httpResponse.statusCode else { + let error = FunctionsError.httpError(code: httpResponse.statusCode, data: data ?? Data()) + continuation.finish(throwing: error) + return + } + + let isRelayError = httpResponse.value(forHTTPHeaderField: "x-relay-error") == "true" + if isRelayError { + continuation.finish(throwing: FunctionsError.relayError) + } } - let (data, response) = try await fetch(urlRequest) + task.resume() - guard let httpResponse = response as? HTTPURLResponse else { - throw URLError(.badServerResponse) - } + continuation.onTermination = { _ in + task.cancel() - guard 200 ..< 300 ~= httpResponse.statusCode else { - throw FunctionsError.httpError(code: httpResponse.statusCode, data: data) + // Hold a strong reference to delegate until continuation terminates. + _ = delegate } - let isRelayError = httpResponse.value(forHTTPHeaderField: "x-relay-error") == "true" - if isRelayError { - throw FunctionsError.relayError - } + return stream + } +} + +final class StreamResponseDelegate: NSObject, URLSessionDataDelegate, Sendable { + let continuation: AsyncThrowingStream.Continuation + + init(continuation: AsyncThrowingStream.Continuation) { + self.continuation = continuation + } + + func urlSession(_: URLSession, dataTask _: URLSessionDataTask, didReceive data: Data) { + continuation.yield(data) + } - return (data, httpResponse) + func urlSession(_: URLSession, task _: URLSessionTask, didCompleteWithError error: (any Error)?) { + continuation.finish(throwing: error) } } diff --git a/Sources/Supabase/SupabaseClient.swift b/Sources/Supabase/SupabaseClient.swift index 58655718..0bf2f4e0 100644 --- a/Sources/Supabase/SupabaseClient.swift +++ b/Sources/Supabase/SupabaseClient.swift @@ -71,6 +71,7 @@ public final class SupabaseClient: @unchecked Sendable { url: functionsURL, headers: defaultHeaders, region: options.functions.region, + logger: options.global.logger, fetch: fetchWithAuth )