Skip to content
This repository has been archived by the owner on Jul 16, 2024. It is now read-only.

Commit

Permalink
Hub: fix publishAll behaviour
Browse files Browse the repository at this point in the history
  • Loading branch information
gcanti committed Sep 14, 2023
1 parent 3eca975 commit 600693e
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 5 deletions.
5 changes: 5 additions & 0 deletions .changeset/bright-poets-build.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/io": patch
---

Hub: fix publishAll behaviour
10 changes: 5 additions & 5 deletions src/internal/hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,13 @@ class BoundedHubArb<A> implements AtomicHub<A> {
let iteratorIndex = 0
const publishAllIndex = this.publisherIndex + forHub
while (this.publisherIndex !== publishAllIndex) {
const a = pipe(chunk, Chunk.unsafeGet(iteratorIndex++))
const a = Chunk.unsafeGet(chunk, iteratorIndex++)
const index = this.publisherIndex % this.capacity
this.array[index] = a
this.subscribers[index] = this.subscriberCount
this.publisherIndex += 1
}
return pipe(chunk, Chunk.drop(iteratorIndex - 1))
return Chunk.drop(chunk, iteratorIndex)
}

slide(): void {
Expand Down Expand Up @@ -407,13 +407,13 @@ class BoundedHubPow2<A> implements AtomicHub<A> {
let iteratorIndex = 0
const publishAllIndex = this.publisherIndex + forHub
while (this.publisherIndex !== publishAllIndex) {
const elem = pipe(chunk, Chunk.unsafeGet(iteratorIndex++))
const elem = Chunk.unsafeGet(chunk, iteratorIndex++)
const index = this.publisherIndex & this.mask
this.array[index] = elem
this.subscribers[index] = this.subscriberCount
this.publisherIndex += 1
}
return pipe(chunk, Chunk.drop(iteratorIndex - 1))
return Chunk.drop(chunk, iteratorIndex)
}

slide(): void {
Expand Down Expand Up @@ -561,7 +561,7 @@ class BoundedHubSingle<A> implements AtomicHub<A> {
return chunk
}
if (this.publish(Chunk.unsafeHead(chunk))) {
return pipe(chunk, Chunk.drop(1))
return Chunk.drop(chunk, 1)
} else {
return chunk
}
Expand Down
54 changes: 54 additions & 0 deletions test/Hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,60 @@ import * as it from "@effect/io/test/utils/extend"
import { assert, describe } from "vitest"

describe.concurrent("Hub", () => {
it.effect("publishAll - capacity 2 (BoundedHubPow2)", () => {
const messages = [1, 2]
return Hub.bounded<number>(2).pipe(
Effect.flatMap((hub) =>
Effect.scoped(
Effect.gen(function*(_) {
const dequeue1 = yield* _(Hub.subscribe(hub))
const dequeue2 = yield* _(Hub.subscribe(hub))
yield* _(Hub.publishAll(hub, messages))
const takes1 = yield* _(Queue.takeAll(dequeue1))
const takes2 = yield* _(Queue.takeAll(dequeue2))
assert.deepStrictEqual([...takes1], messages)
assert.deepStrictEqual([...takes2], messages)
})
)
)
)
})
it.effect("publishAll - capacity 4 (BoundedHubPow2)", () => {
const messages = [1, 2]
return Hub.bounded<number>(4).pipe(
Effect.flatMap((hub) =>
Effect.scoped(
Effect.gen(function*(_) {
const dequeue1 = yield* _(Hub.subscribe(hub))
const dequeue2 = yield* _(Hub.subscribe(hub))
yield* _(Hub.publishAll(hub, messages))
const takes1 = yield* _(Queue.takeAll(dequeue1))
const takes2 = yield* _(Queue.takeAll(dequeue2))
assert.deepStrictEqual([...takes1], messages)
assert.deepStrictEqual([...takes2], messages)
})
)
)
)
})
it.effect("publishAll - capacity 3 (BoundedHubArb)", () => {
const messages = [1, 2]
return Hub.bounded<number>(3).pipe(
Effect.flatMap((hub) =>
Effect.scoped(
Effect.gen(function*(_) {
const dequeue1 = yield* _(Hub.subscribe(hub))
const dequeue2 = yield* _(Hub.subscribe(hub))
yield* _(Hub.publishAll(hub, messages))
const takes1 = yield* _(Queue.takeAll(dequeue1))
const takes2 = yield* _(Queue.takeAll(dequeue2))
assert.deepStrictEqual([...takes1], messages)
assert.deepStrictEqual([...takes2], messages)
})
)
)
)
})
it.effect("sequential publishers and subscribers with one publisher and one subscriber", () =>
Effect.gen(function*($) {
const values = ReadonlyArray.range(0, 9)
Expand Down

0 comments on commit 600693e

Please sign in to comment.