Skip to content

Commit

Permalink
Handle And filter expired tasks (#17)
Browse files Browse the repository at this point in the history
* Handle And filter expired tasks

* Doc

* Add scope operator
  • Loading branch information
sebastianvarela authored Dec 5, 2023
1 parent 488b802 commit 010fbe2
Show file tree
Hide file tree
Showing 12 changed files with 201 additions and 6 deletions.
20 changes: 20 additions & 0 deletions Mini.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@
F297D27F286A0C6900323F24 /* Dispatcher+Combine.swift in Sources */ = {isa = PBXBuildFile; fileRef = F297D27D286A0C6900323F24 /* Dispatcher+Combine.swift */; };
F297D280286A0C6900323F24 /* Dispatcher+Combine.swift in Sources */ = {isa = PBXBuildFile; fileRef = F297D27D286A0C6900323F24 /* Dispatcher+Combine.swift */; };
F297D281286A0C6900323F24 /* Dispatcher+Combine.swift in Sources */ = {isa = PBXBuildFile; fileRef = F297D27D286A0C6900323F24 /* Dispatcher+Combine.swift */; };
F297F4522B1A313200B7B1FA /* Publishers.RemoveExpired.swift in Sources */ = {isa = PBXBuildFile; fileRef = F297F4512B1A313200B7B1FA /* Publishers.RemoveExpired.swift */; };
F297F4532B1A313200B7B1FA /* Publishers.RemoveExpired.swift in Sources */ = {isa = PBXBuildFile; fileRef = F297F4512B1A313200B7B1FA /* Publishers.RemoveExpired.swift */; };
F297F4542B1A313200B7B1FA /* Publishers.RemoveExpired.swift in Sources */ = {isa = PBXBuildFile; fileRef = F297F4512B1A313200B7B1FA /* Publishers.RemoveExpired.swift */; };
F297F4552B1A313200B7B1FA /* Publishers.RemoveExpired.swift in Sources */ = {isa = PBXBuildFile; fileRef = F297F4512B1A313200B7B1FA /* Publishers.RemoveExpired.swift */; };
F2AD8249286B6AD9005C024F /* TaskExpiration.swift in Sources */ = {isa = PBXBuildFile; fileRef = F2AD8248286B6AD9005C024F /* TaskExpiration.swift */; };
F2AD824A286B6AD9005C024F /* TaskExpiration.swift in Sources */ = {isa = PBXBuildFile; fileRef = F2AD8248286B6AD9005C024F /* TaskExpiration.swift */; };
F2AD824B286B6AD9005C024F /* TaskExpiration.swift in Sources */ = {isa = PBXBuildFile; fileRef = F2AD8248286B6AD9005C024F /* TaskExpiration.swift */; };
Expand Down Expand Up @@ -169,6 +173,10 @@
F2C09DC5286B57EA009C9C8E /* TaskStatus.swift in Sources */ = {isa = PBXBuildFile; fileRef = F2C09DC3286B57EA009C9C8E /* TaskStatus.swift */; };
F2C09DC6286B57EA009C9C8E /* TaskStatus.swift in Sources */ = {isa = PBXBuildFile; fileRef = F2C09DC3286B57EA009C9C8E /* TaskStatus.swift */; };
F2C09DC7286B57EA009C9C8E /* TaskStatus.swift in Sources */ = {isa = PBXBuildFile; fileRef = F2C09DC3286B57EA009C9C8E /* TaskStatus.swift */; };
F2CDC8852B1F4567004E1AFC /* Publishers.Scope.swift in Sources */ = {isa = PBXBuildFile; fileRef = F2CDC8842B1F4567004E1AFC /* Publishers.Scope.swift */; };
F2CDC8862B1F4567004E1AFC /* Publishers.Scope.swift in Sources */ = {isa = PBXBuildFile; fileRef = F2CDC8842B1F4567004E1AFC /* Publishers.Scope.swift */; };
F2CDC8872B1F4567004E1AFC /* Publishers.Scope.swift in Sources */ = {isa = PBXBuildFile; fileRef = F2CDC8842B1F4567004E1AFC /* Publishers.Scope.swift */; };
F2CDC8882B1F4567004E1AFC /* Publishers.Scope.swift in Sources */ = {isa = PBXBuildFile; fileRef = F2CDC8842B1F4567004E1AFC /* Publishers.Scope.swift */; };
F2D0DA1829E8472900A114EC /* Taskable.swift in Sources */ = {isa = PBXBuildFile; fileRef = F2D0DA1729E8472900A114EC /* Taskable.swift */; };
F2D0DA1929E8472900A114EC /* Taskable.swift in Sources */ = {isa = PBXBuildFile; fileRef = F2D0DA1729E8472900A114EC /* Taskable.swift */; };
F2D0DA1A29E8472900A114EC /* Taskable.swift in Sources */ = {isa = PBXBuildFile; fileRef = F2D0DA1729E8472900A114EC /* Taskable.swift */; };
Expand Down Expand Up @@ -250,6 +258,7 @@
F288DCD829BB942100FBFED1 /* Publishers.CombineMiniTasksArray.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Publishers.CombineMiniTasksArray.swift; sourceTree = "<group>"; };
F297D268286A02E200323F24 /* KeyedAction.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = KeyedAction.swift; sourceTree = "<group>"; };
F297D27D286A0C6900323F24 /* Dispatcher+Combine.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Dispatcher+Combine.swift"; sourceTree = "<group>"; };
F297F4512B1A313200B7B1FA /* Publishers.RemoveExpired.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Publishers.RemoveExpired.swift; sourceTree = "<group>"; };
F2AD8248286B6AD9005C024F /* TaskExpiration.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = TaskExpiration.swift; sourceTree = "<group>"; };
F2AD824D286B7065005C024F /* PublishersTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PublishersTests.swift; sourceTree = "<group>"; };
F2C09DAB286B1490009C9C8E /* TestError.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = TestError.swift; sourceTree = "<group>"; };
Expand All @@ -259,6 +268,7 @@
F2C09DBB286B2698009C9C8E /* TestStoreController.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = TestStoreController.swift; sourceTree = "<group>"; };
F2C09DBF286B530D009C9C8E /* ActionTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ActionTests.swift; sourceTree = "<group>"; };
F2C09DC3286B57EA009C9C8E /* TaskStatus.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = TaskStatus.swift; sourceTree = "<group>"; };
F2CDC8842B1F4567004E1AFC /* Publishers.Scope.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Publishers.Scope.swift; sourceTree = "<group>"; };
F2D0DA1729E8472900A114EC /* Taskable.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Taskable.swift; sourceTree = "<group>"; };
F2D0DA1C29E8473700A114EC /* EmptyTask.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = EmptyTask.swift; sourceTree = "<group>"; };
F2DF4A2A26C2B69A00C082CF /* SharedDictionaryTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SharedDictionaryTests.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -441,6 +451,8 @@
F288DCD329BB922F00FBFED1 /* Publishers.CombineMiniTasksTuple3.swift */,
F288DCCE29BB922700FBFED1 /* Publishers.CombineMiniTasksTuple4.swift */,
3A20F7E129CCAC5500DDCF8D /* Publishers.EraseToEmptyTask.swift */,
F297F4512B1A313200B7B1FA /* Publishers.RemoveExpired.swift */,
F2CDC8842B1F4567004E1AFC /* Publishers.Scope.swift */,
);
path = Publishers;
sourceTree = "<group>";
Expand Down Expand Up @@ -788,6 +800,7 @@
F222D4BB25249B7E00672E7B /* ReducerGroup.swift in Sources */,
F222D4B725249B7E00672E7B /* Chain.swift in Sources */,
F297D27E286A0C6900323F24 /* Dispatcher+Combine.swift in Sources */,
F2CDC8852B1F4567004E1AFC /* Publishers.Scope.swift in Sources */,
F288761528649B1C0069790E /* Publishers.CombineMiniTasksTuple2.swift in Sources */,
F297D269286A02E200323F24 /* KeyedAction.swift in Sources */,
F227FD8729CDEBAE00F1E801 /* PublisherExtensions+EmptyActions.swift in Sources */,
Expand All @@ -802,6 +815,7 @@
F222D4F525249B9B00672E7B /* DispatchQueueExtensions.swift in Sources */,
F2D0DA1D29E8473700A114EC /* EmptyTask.swift in Sources */,
F222D4BF25249B7E00672E7B /* ActionReducer.swift in Sources */,
F297F4522B1A313200B7B1FA /* Publishers.RemoveExpired.swift in Sources */,
F26C3AF822537A4600189D28 /* Mini.swift in Sources */,
F2AD8249286B6AD9005C024F /* TaskExpiration.swift in Sources */,
F2D0DA1829E8472900A114EC /* Taskable.swift in Sources */,
Expand Down Expand Up @@ -847,6 +861,7 @@
F222D4BC25249B7E00672E7B /* ReducerGroup.swift in Sources */,
F222D4B825249B7E00672E7B /* Chain.swift in Sources */,
F297D27F286A0C6900323F24 /* Dispatcher+Combine.swift in Sources */,
F2CDC8862B1F4567004E1AFC /* Publishers.Scope.swift in Sources */,
F288761628649B1C0069790E /* Publishers.CombineMiniTasksTuple2.swift in Sources */,
F297D26A286A02E200323F24 /* KeyedAction.swift in Sources */,
F227FD8829CDEBAE00F1E801 /* PublisherExtensions+EmptyActions.swift in Sources */,
Expand All @@ -861,6 +876,7 @@
F222D4F625249B9B00672E7B /* DispatchQueueExtensions.swift in Sources */,
F2D0DA1E29E8473700A114EC /* EmptyTask.swift in Sources */,
F222D4C025249B7E00672E7B /* ActionReducer.swift in Sources */,
F297F4532B1A313200B7B1FA /* Publishers.RemoveExpired.swift in Sources */,
F26C3AF922537A4600189D28 /* Mini.swift in Sources */,
F2AD824A286B6AD9005C024F /* TaskExpiration.swift in Sources */,
F2D0DA1929E8472900A114EC /* Taskable.swift in Sources */,
Expand Down Expand Up @@ -906,6 +922,7 @@
F222D4BD25249B7E00672E7B /* ReducerGroup.swift in Sources */,
F222D4B925249B7E00672E7B /* Chain.swift in Sources */,
F297D280286A0C6900323F24 /* Dispatcher+Combine.swift in Sources */,
F2CDC8872B1F4567004E1AFC /* Publishers.Scope.swift in Sources */,
F288761728649B1C0069790E /* Publishers.CombineMiniTasksTuple2.swift in Sources */,
F297D26B286A02E200323F24 /* KeyedAction.swift in Sources */,
F227FD8929CDEBAE00F1E801 /* PublisherExtensions+EmptyActions.swift in Sources */,
Expand All @@ -920,6 +937,7 @@
F222D4F725249B9B00672E7B /* DispatchQueueExtensions.swift in Sources */,
F2D0DA1F29E8473700A114EC /* EmptyTask.swift in Sources */,
F222D4C125249B7E00672E7B /* ActionReducer.swift in Sources */,
F297F4542B1A313200B7B1FA /* Publishers.RemoveExpired.swift in Sources */,
F26C3AFA22537A4600189D28 /* Mini.swift in Sources */,
F2AD824B286B6AD9005C024F /* TaskExpiration.swift in Sources */,
F2D0DA1A29E8472900A114EC /* Taskable.swift in Sources */,
Expand Down Expand Up @@ -965,6 +983,7 @@
F222D4BE25249B7E00672E7B /* ReducerGroup.swift in Sources */,
F222D4BA25249B7E00672E7B /* Chain.swift in Sources */,
F297D281286A0C6900323F24 /* Dispatcher+Combine.swift in Sources */,
F2CDC8882B1F4567004E1AFC /* Publishers.Scope.swift in Sources */,
F288761828649B1C0069790E /* Publishers.CombineMiniTasksTuple2.swift in Sources */,
F297D26C286A02E200323F24 /* KeyedAction.swift in Sources */,
F227FD8A29CDEBAE00F1E801 /* PublisherExtensions+EmptyActions.swift in Sources */,
Expand All @@ -979,6 +998,7 @@
F222D4F825249B9B00672E7B /* DispatchQueueExtensions.swift in Sources */,
F2D0DA2029E8473700A114EC /* EmptyTask.swift in Sources */,
F222D4C225249B7E00672E7B /* ActionReducer.swift in Sources */,
F297F4552B1A313200B7B1FA /* Publishers.RemoveExpired.swift in Sources */,
F26C3AFB22537A4600189D28 /* Mini.swift in Sources */,
F2AD824C286B6AD9005C024F /* TaskExpiration.swift in Sources */,
F2D0DA1B29E8472900A114EC /* Taskable.swift in Sources */,
Expand Down
6 changes: 3 additions & 3 deletions Sources/Publishers/Publishers.EraseToEmptyTask.swift
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import Combine

public extension Publisher {
func eraseToEmptyTask() -> Publishers.EraseToEmptyTask<Self, Output.Failure>
func eraseToEmptyTask() -> Publishers.EraseToEmptyTask<Self>
where Output: Taskable {
Publishers.EraseToEmptyTask(upstream: self)
}
Expand All @@ -10,8 +10,8 @@ public extension Publisher {
public extension Publishers {
/// Create a `Publisher` that connect an Upstream (Another publisher) that type erases `Task`s to `EmptyTask`
/// The Output of this `Publisher` always is a combined `EmptyTask`
struct EraseToEmptyTask<Upstream: Publisher, TaskFailure: Error>: Publisher where Upstream.Output: Taskable, Upstream.Output.Failure == TaskFailure {
public typealias Output = EmptyTask<TaskFailure>
struct EraseToEmptyTask<Upstream: Publisher>: Publisher where Upstream.Output: Taskable {
public typealias Output = EmptyTask<Upstream.Output.Failure>
public typealias Failure = Upstream.Failure

public let upstream: Upstream
Expand Down
54 changes: 54 additions & 0 deletions Sources/Publishers/Publishers.RemoveExpired.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import Combine

public extension Publisher {
func removeExpired() -> Publishers.RemoveExpired<Self>
where Output: Taskable {
Publishers.RemoveExpired(upstream: self)
}
}

public extension Publishers {
/// Create a `Publisher` that connect an Upstream (Another publisher) that filter any expired task received
/// The Output of this `Publisher` is the same of the Upstream.
struct RemoveExpired<Upstream: Publisher>: Publisher where Upstream.Output: Taskable {
public typealias Output = Upstream.Output
public typealias Failure = Upstream.Failure

public let upstream: Upstream

public init(upstream: Upstream) {
self.upstream = upstream
}

public func receive<S: Subscriber>(subscriber: S) where Upstream.Failure == S.Failure, Output == S.Input {
upstream.subscribe(Inner(downstream: subscriber))
}
}
}

extension Publishers.RemoveExpired {
private struct Inner<Downstream: Subscriber>: Subscriber
where Downstream.Input == Output, Downstream.Failure == Upstream.Failure, Output: Taskable {
let combineIdentifier = CombineIdentifier()
private let downstream: Downstream

fileprivate init(downstream: Downstream) {
self.downstream = downstream
}

func receive(subscription: Subscription) {
downstream.receive(subscription: subscription)
}

func receive(_ input: Upstream.Output) -> Subscribers.Demand {
if input.isExpired {
return .none
}
return downstream.receive(input)
}

func receive(completion: Subscribers.Completion<Upstream.Failure>) {
downstream.receive(completion: completion)
}
}
}
12 changes: 12 additions & 0 deletions Sources/Publishers/Publishers.Scope.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import Combine
import Foundation

public extension Publisher {
/// From a publisher, we can focus on a task and filter all expired and duplicated task. This publisher don't send value if at suscription moment there is a expired task.
func scope<T: Taskable & Equatable>(_ transform: @escaping (Self.Output) -> T) -> AnyPublisher<T, Failure> {
map(transform)
.removeExpired()
.removeDuplicates()
.eraseToAnyPublisher()
}
}
5 changes: 5 additions & 0 deletions Sources/Task/KeyedTask.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ extension KeyedTask where Key: Hashable, Value: Taskable {
self[key]?.isRunning ?? false
}

/// Returns true if the KeyedTask contains a task with given key and its expired. If the key don't exists return false
public func isExpired(key: Key) -> Bool {
self[key]?.isExpired ?? false
}

/// Returns true if the KeyedTask contains a task with given key and its recently succeded. If the key don't exists return false
public func isRecentlySucceeded(key: Key) -> Bool {
self[key]?.isRecentlySucceeded ?? false
Expand Down
5 changes: 5 additions & 0 deletions Sources/Task/Task.swift
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public class Task<T: Equatable, E: Error & Equatable>: Taskable, Equatable, Cust
status == .running
}

public var isExpired: Bool {
let margin: TimeInterval = 0.1 // 100ms for suscriptions propagations
return started.timeIntervalSinceNow + expiration.value + margin < 0
}

public var isRecentlySucceeded: Bool {
switch status {
case .success where started.timeIntervalSinceNow + expiration.value >= 0:
Expand Down
1 change: 1 addition & 0 deletions Sources/Task/Taskable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ public protocol Taskable {

var isIdle: Bool { get }
var isRunning: Bool { get }
var isExpired: Bool { get }
var isRecentlySucceeded: Bool { get }
var isTerminal: Bool { get }
var isSuccessful: Bool { get }
Expand Down
4 changes: 2 additions & 2 deletions Tests/Helpers/TestState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ import Foundation
import Mini

struct TestState: State {
public let testTask: Task<None, TestError>
public let testTask: Task<Int, TestError>
public let counter: Int

public init(testTask: Task<None, TestError> = .idle(),
public init(testTask: Task<Int, TestError> = .idle(),
counter: Int = 0) {
self.testTask = testTask
self.counter = counter
Expand Down
2 changes: 1 addition & 1 deletion Tests/Helpers/TestStoreController.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ extension TestStore {
func reducerGroup(expectation: XCTestExpectation? = nil) -> ReducerGroup {
ReducerGroup { [
Reducer(of: TestAction.self, on: self.dispatcher) { action in
self.state = TestState(testTask: .success(), counter: action.counter)
self.state = TestState(testTask: .success(action.counter), counter: action.counter)
expectation?.fulfill()
}
]
Expand Down
28 changes: 28 additions & 0 deletions Tests/PublishersTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ import Combine
import XCTest

class PublishersTests: XCTestCase {
var taskSuccessExpired: Task<String, TestError> = .success("hola viejo", started: Date() - 1_000, expiration: .immediately)
var taskSuccess1: Task<String, TestError> = .success("hola")
var taskSuccess2: Task<String, TestError> = .success("chau")
var taskFailure1: Task<String, TestError> = .failure(.berenjenaError)
var taskFailureExpired: Task<String, TestError> = .failure(.berenjenaError, started: Date() - 1_000)
var taskFailure2: Task<String, TestError> = .failure(.bigBerenjenaError)
var taskRunning1: Task<String, TestError> = .running()
var taskIdle1: Task<String, TestError> = .idle()
Expand Down Expand Up @@ -291,4 +293,30 @@ class PublishersTests: XCTestCase {

waitForExpectations(timeout: 2)
}

// Remove Expired

func test_remove_expired() {
var cancellables = Set<AnyCancellable>()
let expectation = expectation(description: "wait for async process")

let subject = PassthroughSubject<Task<String, TestError>, Never>()

subject
.removeExpired() // Filter the 2 expired task
.removeDuplicates() // Pass only the first success task because the expired they never get here!
.sink { task in
XCTAssertFalse(task.isExpired)
expectation.fulfill()
}
.store(in: &cancellables)

// Send 2 unexpired and 2 expired:
subject.send(taskSuccess1)
subject.send(taskSuccessExpired)
subject.send(taskFailureExpired)
subject.send(taskSuccess1)

waitForExpectations(timeout: 2)
}
}
29 changes: 29 additions & 0 deletions Tests/ReducerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,33 @@ final class ReducerTests: XCTestCase {

wait(for: [expectation1, expectation2], timeout: 5.0)
}

func test_scope() {
var cancellables = Set<AnyCancellable>()
let dispatcher = Dispatcher()
let initialState = TestState()
let store = Store<TestState, TestStoreController>(initialState, dispatcher: dispatcher, storeController: TestStoreController())
let expectation1 = XCTestExpectation(description: "Subscription Emits 1")

store
.reducerGroup()
.store(in: &cancellables)

dispatcher.dispatch(TestAction(counter: 1))

DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
store
.scope { $0.testTask }
.sink { task in
XCTAssertEqual(task.payload, 2) // Only get 2 because we scope the suscription to task
// on the state and receive non expired and unique values.
expectation1.fulfill()
}
.store(in: &cancellables)

dispatcher.dispatch(TestAction(counter: 2))
}

wait(for: [expectation1], timeout: 5.0)
}
}
Loading

0 comments on commit 010fbe2

Please sign in to comment.