Send and receive only channels #170

Open
paulofaria opened this issue Jun 16, 2022 · 6 comments

Comments

@paulofaria

paulofaria commented Jun 16, 2022

An important ability when passing channels around is limiting what the recipient can do, by handing out a send-only or a receive-only channel. Right now, it is possible to expose only the receiving side, since AsyncChannel is just an AsyncSequence. However, there is currently no mechanism to create a send-only version of a channel. I think the simplest solution is to create a wrapper type called SendingChannel or something similar. I also think this is a good reason to rename AsyncChannel to just Channel (#47). Otherwise we would have SendingAsyncChannel, which is not too bad, but I really think the Async part doesn't help us much.
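
A minimal sketch of the wrapper idea, under stated assumptions: `SendingChannel` below is a hypothetical closure-based type, not an API that exists in swift-async-algorithms, and `makeSendingChannel(of:)` is a helper invented for illustration. To stay self-contained it uses the standard library's `AsyncStream` as a stand-in for `AsyncChannel`, so `send` here does not suspend until a consumer arrives the way `AsyncChannel.send(_:)` does.

```swift
/// Hypothetical send-only view: exposes `send` and `finish`, but no way to iterate.
public struct SendingChannel<Element: Sendable>: Sendable {
    private let sendElement: @Sendable (Element) async -> Void
    private let finishChannel: @Sendable () -> Void

    public init(
        send: @escaping @Sendable (Element) async -> Void,
        finish: @escaping @Sendable () -> Void
    ) {
        self.sendElement = send
        self.finishChannel = finish
    }

    /// Forwards the element to the underlying channel.
    public func send(_ element: Element) async { await sendElement(element) }

    /// Signals that no more elements will be sent.
    public func finish() { finishChannel() }
}

/// Stand-in factory built on `AsyncStream` (an assumption for this sketch).
/// Returns the send-only side and the receive-only side as separate values.
func makeSendingChannel<Element: Sendable>(
    of type: Element.Type
) -> (sender: SendingChannel<Element>, values: AsyncStream<Element>) {
    let (stream, continuation) = AsyncStream.makeStream(of: Element.self)
    let sender = SendingChannel<Element>(
        send: { element in _ = continuation.yield(element) },
        finish: { continuation.finish() }
    )
    return (sender, stream)
}
```

With a real `AsyncChannel` named `channel`, the same wrapper could presumably be built as `SendingChannel(send: { await channel.send($0) }, finish: { channel.finish() })`, since both of those methods are part of its public interface.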

@paulofaria
Author

I'm not sure if it's worth creating a wrapper type for ReceivingChannel, but one advantage would be symmetry.

@phausler
Member

Once the language can support some AsyncSequence<T> then the ReceivingChannel is just the opaque AsyncChannel.
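
For illustration, this is roughly what that looks like once the language supports it. The sketch assumes a Swift 6 toolchain, where `AsyncSequence` has `Element` and `Failure` as primary associated types, and it uses `AsyncStream` as a stand-in for `AsyncChannel`. The function names `receiveOnly` and `sum(of:)` are hypothetical.

```swift
/// Returns a receive-only view: callers can iterate, but they see no concrete
/// type and therefore no `send`. `AsyncStream` stands in for `AsyncChannel`.
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
func receiveOnly<Element: Sendable>(
    _ stream: AsyncStream<Element>
) -> some AsyncSequence<Element, Never> {
    stream
}

/// Consumes any non-throwing sequence of integers without knowing its concrete type.
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
func sum(of values: some AsyncSequence<Int, Never>) async -> Int {
    var total = 0
    for await value in values {
        total += value
    }
    return total
}
```

Because the failure type is `Never`, the loop needs no `try`, and no dedicated ReceivingChannel wrapper type is required.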

@Sajjon

Sajjon commented Sep 12, 2022

@paulofaria @phausler Do you think this would be a reasonable implementation of ReceivingChannel and ReceivingThrowingChannel?

If so I could create a PR.

ReceivingChannel

//
//  ReceivingChannel.swift
//  
//
//  Created by Alexander Cyon on 2022-09-12.
//

import Foundation
import AsyncAlgorithms

public final class ReceivingChannel<Element: Sendable>: Sendable, AsyncSequence {
    public typealias Base = AsyncChannel<Element>
    
    @frozen
    public struct Iterator: AsyncIteratorProtocol {
        @usableFromInline
        var iterator: Base.Iterator?

        @usableFromInline
        init(_ iterator: Base.Iterator) {
            self.iterator = iterator
        }

        @inlinable
        public mutating func next() async -> Base.Element? {
            if !Task.isCancelled, let value = await iterator?.next() {
                return value
            } else {
                iterator = nil
                return nil
            }
        }
    }

    @usableFromInline
    let base: Base

    @usableFromInline
    init(_ base: Base) {
        self.base = base
    }

    @inlinable
    public func makeAsyncIterator() -> Iterator {
        Iterator(base.makeAsyncIterator())
    }
    
    /// Send a finish to all awaiting iterations.
    /// All subsequent calls to `next()` will resume immediately.
    public func finish() {
        self.base.finish()
    }
}

public extension AsyncChannel {
    func receivingOnly() -> ReceivingChannel<Element> {
        ReceivingChannel(self)
    }
}

ReceivingThrowingChannel

//
//  ReceivingThrowingChannel.swift
//  
//
//  Created by Alexander Cyon on 2022-09-12.
//

import Foundation
import AsyncAlgorithms

public final class ReceivingThrowingChannel<Element: Sendable, Failure: Error>: Sendable, AsyncSequence {
    public typealias Base = AsyncThrowingChannel<Element, Failure>
    
    @frozen
    public struct Iterator: AsyncIteratorProtocol {
        @usableFromInline
        var iterator: Base.Iterator?

        @usableFromInline
        init(_ iterator: Base.Iterator) {
            self.iterator = iterator
        }

        @inlinable
        public mutating func next() async throws -> Base.Element? {
            if !Task.isCancelled, let value = try await iterator?.next() {
                return value
            } else {
                iterator = nil
                return nil
            }
        }
    }

    @usableFromInline
    let base: Base

    @usableFromInline
    init(_ base: Base) {
        self.base = base
    }

    @inlinable
    public func makeAsyncIterator() -> Iterator {
        Iterator(base.makeAsyncIterator())
    }
    
    /// Send a finish to all awaiting iterations.
    /// All subsequent calls to `next()` will resume immediately.
    public func finish() {
        self.base.finish()
    }
}

public extension AsyncThrowingChannel {
    func receivingThrowingOnly() -> ReceivingThrowingChannel<Element, Failure> {
        ReceivingThrowingChannel(self)
    }
}

Unit tests

These tests are copied from TestChannel, minus the ones that require the non-public ManagedCriticalState. They still send values on AsyncChannel and AsyncThrowingChannel, but they call makeAsyncIterator on the receive-only wrapper, e.g. let receivingChannel = asyncChannel.receivingOnly().

//
//  ReceivingChannelTests.swift
//  
//
//  Created by Alexander Cyon on 2022-09-12.
//

import Foundation
import XCTest
import AsyncAlgorithmExtensions
import AsyncAlgorithms

struct Failure: Error, Equatable { }

final class ReceivingChannelTests: XCTestCase {
    
    func test_asyncChannel_delivers_values_when_two_producers_and_two_consumers() async {
        let (sentFromProducer1, sentFromProducer2) = ("test1", "test2")
        let expected = Set([sentFromProducer1, sentFromProducer2])
        
        let asyncChannel = AsyncChannel<String>()
        Task {
            await asyncChannel.send(sentFromProducer1)
        }
        Task {
            await asyncChannel.send(sentFromProducer2)
        }
        let receivingChannel = asyncChannel.receivingOnly()
        let t: Task<String?, Never> = Task {
            var iterator = receivingChannel.makeAsyncIterator()
            let value = await iterator.next()
            return value
        }
        var iterator = receivingChannel.makeAsyncIterator()
        
        let (collectedFromConsumer1, collectedFromConsumer2) = (await t.value, await iterator.next())
        let collected = Set([collectedFromConsumer1, collectedFromConsumer2])
        
        XCTAssertEqual(collected, expected)
    }
    
    func test_asyncThrowingChannel_delivers_values_when_two_producers_and_two_consumers() async throws {
        let (sentFromProducer1, sentFromProducer2) = ("test1", "test2")
        let expected = Set([sentFromProducer1, sentFromProducer2])
        
        let asyncChannel = AsyncThrowingChannel<String, Error>()
        Task {
            await asyncChannel.send(sentFromProducer1)
        }
        Task {
            await asyncChannel.send(sentFromProducer2)
        }
        let receivingChannel = asyncChannel.receivingThrowingOnly()
        let t: Task<String?, Error> = Task {
            var iterator = receivingChannel.makeAsyncIterator()
            let value = try await iterator.next()
            return value
        }
        var iterator = receivingChannel.makeAsyncIterator()
        
        let (collectedFromConsumer1, collectedFromConsumer2) = (try await t.value, try await iterator.next())
        let collected = Set([collectedFromConsumer1, collectedFromConsumer2])
        
        XCTAssertEqual(collected, expected)
    }
    
    func test_asyncThrowingChannel_throws_and_discards_additional_sent_values_when_fail_is_called() async {
        let sendImmediatelyResumes = expectation(description: "Send immediately resumes after fail")
        
        let asyncChannel = AsyncThrowingChannel<String, Error>()
        asyncChannel.fail(Failure())
        
        let receivingChannel = asyncChannel.receivingThrowingOnly()
        var iterator = receivingChannel.makeAsyncIterator()
        do {
            let _ = try await iterator.next()
            XCTFail("The AsyncThrowingChannel should have thrown")
        } catch {
            XCTAssertEqual(error as? Failure, Failure())
        }
        
        do {
            let pastFailure = try await iterator.next()
            XCTAssertNil(pastFailure)
        } catch {
            XCTFail("The AsyncThrowingChannel should not fail when failure has already been fired")
        }
        
        await asyncChannel.send("send")
        sendImmediatelyResumes.fulfill()
        wait(for: [sendImmediatelyResumes], timeout: 1.0)
    }
  
    func test_asyncChannel_ends_iterator_when_task_is_cancelled() async {
        let asyncChannel = AsyncChannel<String>()
        let receivingChannel = asyncChannel.receivingOnly()
        let ready = expectation(description: "ready")
        let task: Task<String?, Never> = Task {
            var iterator = receivingChannel.makeAsyncIterator()
            ready.fulfill()
            return await iterator.next()
        }
        wait(for: [ready], timeout: 1.0)
        task.cancel()
        let value = await task.value
        XCTAssertNil(value)
    }
    
    func test_asyncThrowingChannel_ends_iterator_when_task_is_cancelled() async throws {
        let asyncChannel = AsyncThrowingChannel<String, Error>()
        let receivingChannel = asyncChannel.receivingThrowingOnly()
        let ready = expectation(description: "ready")
        let task: Task<String?, Error> = Task {
            var iterator = receivingChannel.makeAsyncIterator()
            ready.fulfill()
            return try await iterator.next()
        }
        wait(for: [ready], timeout: 1.0)
        task.cancel()
        let value = try await task.value
        XCTAssertNil(value)
    }
    
  
}

@twittemb
Contributor

twittemb commented Sep 12, 2022

100% agree with the previous comment from @phausler. We could perhaps have a SendingChannel protocol that AsyncChannel conforms to.

protocol SendingChannel<Element> {
  associatedtype Element: Sendable

  func send(_ element: Element) async
  func finish()
}

It could make sense if there are several types of Channels in the future.
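
A sketch of how such a protocol might be used, assuming Swift 5.7 or later for the primary associated type syntax. `StreamSender` and `produceGreetings(into:)` are hypothetical names, and `AsyncStream` is only a self-contained stand-in for `AsyncChannel`.

```swift
/// The protocol from the comment above, with the `send` requirement written out in full.
protocol SendingChannel<Element> {
    associatedtype Element: Sendable

    func send(_ element: Element) async
    func finish()
}

/// Hypothetical conforming type backed by `AsyncStream` (a stand-in for `AsyncChannel`).
struct StreamSender<Element: Sendable>: SendingChannel {
    let continuation: AsyncStream<Element>.Continuation

    func send(_ element: Element) async { continuation.yield(element) }
    func finish() { continuation.finish() }
}

/// A producer that can only send: the parameter type exposes no way to iterate.
func produceGreetings(into channel: some SendingChannel<String>) async {
    await channel.send("hello")
    await channel.send("world")
    channel.finish()
}
```

Since `AsyncChannel` already declares `send(_:)` and `finish()` with matching shapes, an empty `extension AsyncChannel: SendingChannel {}` would presumably be enough to adopt the protocol there.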

@Sajjon

Sajjon commented Sep 12, 2022

Yes, but if we do not want to wait for the Async types to get Primary Associated Type (PRAT) support, it might be nice to have something in the meantime?

It sounds as if AsyncSequence getting PRAT support might take some time, since it is effectively blocked by Precise Error Typing (I just asked for a status update). From the Swift Forums thread:

(1) AsyncSequence and AsyncIteratorProtocol logically ought to have Element as their primary associated type. However, we have ongoing evolution discussions about adding a precise error type to these. If those discussions bear fruit, then the new Error associated type would need to also be marked primary. To prevent source compatibility complications, adding primary associated types to these two protocols is deferred to a future proposal.

@FranzBusch
Member

I think what AsyncChannel could benefit from is having a nested type called Source which is the type you can write to. We have been doing this in our swift-nio async sequence as well.

The benefit of doing this is that it not only allows you to vend a specific interface to only the write side of things, it also allows you to implement logic inside AsyncChannel to finish the sequence if nobody is holding onto the Source anymore.

For the places where you want to both write to and consume the channel, you can just pass both types. This way you also don't need a ReceivingChannel or a SendingChannel, just an AsyncChannel and an AsyncChannel.Source.
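
A rough sketch of the nested Source idea, under stated assumptions: `SourcedChannel` and its `make()` factory are hypothetical names, and the implementation sits on top of `AsyncStream` rather than `AsyncChannel`, so it only illustrates the shape of the API and the finish-on-release behaviour, not the actual design used in swift-nio.

```swift
/// Hypothetical channel-like namespace with a nested write-only `Source`,
/// built on `AsyncStream` as a stand-in for `AsyncChannel`.
enum SourcedChannel<Element: Sendable> {
    /// The write side. When the last reference to it goes away, the sequence finishes.
    final class Source: Sendable {
        private let continuation: AsyncStream<Element>.Continuation

        fileprivate init(_ continuation: AsyncStream<Element>.Continuation) {
            self.continuation = continuation
        }

        /// Buffers the element for the consumer.
        func send(_ element: Element) { continuation.yield(element) }

        /// Finishes the sequence explicitly.
        func finish() { continuation.finish() }

        /// Finishes the sequence if nobody holds the source anymore.
        deinit { continuation.finish() }
    }

    /// Creates the read side and the write side as two separate values.
    static func make() -> (values: AsyncStream<Element>, source: Source) {
        let (stream, continuation) = AsyncStream.makeStream(of: Element.self)
        return (stream, Source(continuation))
    }
}
```

A consumer that only receives `values` cannot send, a producer that only receives `source` cannot iterate, and a loop over `values` ends on its own once every producer has dropped its `source`.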


5 participants