Skip to content

Add async queue to run asynchronous closures one at a time #16

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions Development/TinkoffConcurrency/AsyncQueue/TCAsyncQueue.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
public final actor TCAsyncQueue {

// MARK: - Dependencies

private let taskFactory: ITCTaskFactory

// MARK: - Private Properties

private var lastEnqueuedTask: ITask?

// MARK: - Initializers

public init(taskFactory: ITCTaskFactory = TCTaskFactory()) {
self.taskFactory = taskFactory
}

// MARK: - Methods

@discardableResult
public func enqueue<T>(operation: @escaping @Sendable () async -> T) -> Task<T, Never> {
let lastEnqueuedTask = lastEnqueuedTask

let task = taskFactory.task {
await lastEnqueuedTask?.wait()

return await operation()
}

self.lastEnqueuedTask = task

return task
}

@discardableResult
public func enqueue<T>(operation: @escaping @Sendable () async throws -> T) -> Task<T, Error> {
let lastEnqueuedTask = lastEnqueuedTask

let task = taskFactory.task {
await lastEnqueuedTask?.wait()

return try await operation()
}

self.lastEnqueuedTask = task

return task
}
}

extension TCAsyncQueue {

// MARK: - Methods

public func perform<T>(operation: @escaping @Sendable () async throws -> T) async rethrows -> T {
let task = enqueue(operation: operation)

return try await withTaskCancellationHandler {
try await task.value
} onCancel: {
task.cancel()
}
}
}
15 changes: 15 additions & 0 deletions Development/TinkoffConcurrency/Internals/ITask.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
protocol ITask {

// MARK: - Methods

func wait() async
}

extension Task: ITask {

// MARK: - ITask

func wait() async {
_ = await result
}
}
5 changes: 3 additions & 2 deletions Example/Podfile.lock
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
PODS:
- TinkoffConcurrency (1.2.0)
- TinkoffConcurrency/Tests (1.2.0)
- TinkoffConcurrency/Tests (1.2.0):
- TinkoffConcurrencyTesting (~> 1.2.0)
- TinkoffConcurrencyTesting (1.2.0):
- TinkoffConcurrency (~> 1.2.0)
- TinkoffConcurrencyTesting/Tests (1.2.0):
Expand All @@ -19,7 +20,7 @@ EXTERNAL SOURCES:
:path: "../"

SPEC CHECKSUMS:
TinkoffConcurrency: c733c4635a33a074bc64e1d72e03ba7d5affd4c7
TinkoffConcurrency: a63843ad0a598fa151732a5738ba2109049cc8d4
TinkoffConcurrencyTesting: ddddbefe4e962b54464ce2069db26e6558e51e47

PODFILE CHECKSUM: 59bbb119a3817973bcf62c31b9a94125545be248
Expand Down
167 changes: 167 additions & 0 deletions Tests/TinkoffConcurrency/AsyncQueue/TCAsyncQueueTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import XCTest

import TinkoffConcurrency
import TinkoffConcurrencyTesting

final class TCAsyncQueueTests: XCTestCase {

// MARK: - Dependencies

private var taskFactory: TCTestTaskFactory!

// MARK: - XCTestCase

override func setUp() {
super.setUp()

taskFactory = TCTestTaskFactory()
}

// MARK: - Tests

func test_asyncQueue_enqueue_order() async throws {
// given
let expectation1 = expectation(description: "operation 1")
let expectation2 = expectation(description: "operation 2")
let expectation3 = expectation(description: "operation 3")

let result = UncheckedSendable([Int]())

let queue = TCAsyncQueue(taskFactory: taskFactory)

// throwing operation that throws
await queue.enqueue {
await XCTWaiter.waitAsync(for: [expectation1], timeout: 1)

result.mutate { $0.append(1) }

throw FakeErrors.default
}

// throwing operation that returns value
await queue.enqueue {
await XCTWaiter.waitAsync(for: [expectation2], timeout: 1)

try throwingHelper()

result.mutate { $0.append(2) }
}

// non-throwing operation
await queue.enqueue {
await XCTWaiter.waitAsync(for: [expectation3], timeout: 1)

result.mutate { $0.append(3) }
}

// when
expectation3.fulfill()
expectation2.fulfill()
expectation1.fulfill()

await taskFactory.runUntilIdle()

// then
XCTAssertEqual(result.value, [1, 2, 3])
}

func test_asyncQueue_enqueue_result() async throws {
// given
let queue = TCAsyncQueue(taskFactory: taskFactory)

let queueEnqueueResult = String.fake()

// when
let task = await queue.enqueue {
queueEnqueueResult
}

await taskFactory.runUntilIdle()

let result = await task.value

// then
XCTAssertEqual(result, queueEnqueueResult)
}

func test_asyncQueue_throwingEnqueue_result() async throws {
// given
let queue = TCAsyncQueue(taskFactory: taskFactory)

let queueEnqueueResult = String.fake()

// when
let task = await queue.enqueue {
try throwingHelper()
return queueEnqueueResult
}

await taskFactory.runUntilIdle()

let result = try await task.value

// then
XCTAssertEqual(result, queueEnqueueResult)
}

func test_asyncQueue_throwingEnqueue_throwing() async throws {
// given
let queue = TCAsyncQueue(taskFactory: taskFactory)

let queueEnqueueResult = FakeErrors.default

// when
let task = await queue.enqueue {
throw queueEnqueueResult
}

await taskFactory.runUntilIdle()

let result = await XCTExecuteThrowsError(try await task.value)!

// then
XCTAssertEqualErrors(result, queueEnqueueResult)
}

func test_asyncQueue_perform() async throws {
// given
let queue = TCAsyncQueue(taskFactory: taskFactory)

let queueEnqueueResult = String.fake()

// when
let result = await queue.perform {
queueEnqueueResult
}

await taskFactory.runUntilIdle()

// then
XCTAssertEqual(result, queueEnqueueResult)
}

func test_asyncQueue_perform_cancel() async throws {
// given
let queue = TCAsyncQueue(taskFactory: taskFactory)

let expectation = expectation(description: "operation started")

// when
let task = Task {
await XCTWaiter.waitAsync(for: [expectation], timeout: 1)

await queue.perform {
// then
XCTAssertTrue(Task.isCancelled)
}
}

task.cancel()

expectation.fulfill()

await taskFactory.runUntilIdle()
}
}

private func throwingHelper() throws {}
2 changes: 2 additions & 0 deletions TinkoffConcurrency.podspec
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ Pod::Spec.new do |s|
s.source_files = 'Development/TinkoffConcurrency/**/*.{swift,md,docc}'

s.test_spec 'Tests' do |test_spec|
test_spec.dependency 'TinkoffConcurrencyTesting', '~> 1.2.0'

test_spec.source_files = ["Tests/TinkoffConcurrency/**/*.swift", "Tests/TestSupport/**/*.swift"]
end
end