From 73c9e4ada8dee20639ed5b75ad03ec1ad8cb36e0 Mon Sep 17 00:00:00 2001 From: Jamie Quadri Date: Wed, 7 Aug 2024 11:29:00 -0500 Subject: [PATCH] [fix]: update RxSwift bindings to observe on main thread --- .../Sources/ObservableWorkflow.swift | 3 +- .../Tests/Rx+ReactiveWorkers.swift | 48 +++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/WorkflowRxSwift/Sources/ObservableWorkflow.swift b/WorkflowRxSwift/Sources/ObservableWorkflow.swift index b21e469be..e124642ac 100644 --- a/WorkflowRxSwift/Sources/ObservableWorkflow.swift +++ b/WorkflowRxSwift/Sources/ObservableWorkflow.swift @@ -20,7 +20,7 @@ import class Workflow.Lifetime extension Observable: AnyWorkflowConvertible { public func asAnyWorkflow() -> AnyWorkflow { - return ObservableWorkflow(observable: self).asAnyWorkflow() + ObservableWorkflow(observable: self).asAnyWorkflow() } } @@ -41,6 +41,7 @@ struct ObservableWorkflow: Workflow { let disposable = observable .map { AnyWorkflowAction(sendingOutput: $0) } .subscribe(on: MainScheduler.asyncInstance) + .observe(on: MainScheduler.asyncInstance) .subscribe(onNext: { value in sink.send(value) }) diff --git a/WorkflowRxSwift/Tests/Rx+ReactiveWorkers.swift b/WorkflowRxSwift/Tests/Rx+ReactiveWorkers.swift index e2b59c978..9dec41a89 100644 --- a/WorkflowRxSwift/Tests/Rx+ReactiveWorkers.swift +++ b/WorkflowRxSwift/Tests/Rx+ReactiveWorkers.swift @@ -38,6 +38,54 @@ class Rx_ReactiveWorkersTests: XCTestCase { disposable?.dispose() } + + func test_observes_on_main_queue() { + struct TestWorkflow: Workflow { + enum Action: WorkflowAction { + typealias WorkflowType = TestWorkflow + case complete + + func apply(toState state: inout State) -> Output? { + switch self { + case .complete: + return .finished + } + } + } + + enum Output { + case finished + } + + func render(state: Void, context: RenderContext) { + Single.create { observer in + DispatchQueue.global().async { + observer(.success(())) + } + return Disposables.create() + } + .asObservable() + .running(in: context) { _ in + XCTAssert(Thread.isMainThread) + return Action.complete + } + } + } + + let host = WorkflowHost( + workflow: TestWorkflow() + ) + + let expectation = XCTestExpectation() + let disposable = host.output.signal.observeValues { output in + if output == .finished { + expectation.fulfill() + } + } + + wait(for: [expectation], timeout: 1.0) + disposable?.dispose() + } } struct CombinedWorkflow: Workflow {