Skip to content

Commit

Permalink
feat: support Uint8ArrayList (#26)
Browse files Browse the repository at this point in the history
Add type support for streams of Uint8ArrayLists with backwards compat through generics.

Co-authored-by: Alex Potsides <[email protected]>
  • Loading branch information
mpetrunic and achingbrain committed Aug 3, 2022
1 parent bdea8b1 commit 0fc8e5d
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 26 deletions.
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,11 @@
"release": "aegir release"
},
"dependencies": {
"it-map": "^1.0.6",
"it-pushable": "^3.0.0",
"it-pushable": "^3.1.0",
"it-reader": "^6.0.1",
"it-stream-types": "^1.0.4",
"p-defer": "^4.0.0"
"p-defer": "^4.0.0",
"uint8arraylist": "^2.0.0"
},
"devDependencies": {
"aegir": "^37.2.0",
Expand Down
26 changes: 13 additions & 13 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
import { Reader, reader } from 'it-reader'
import { pushable } from 'it-pushable'
import defer from 'p-defer'
import map from 'it-map'
import type { Duplex, Source } from 'it-stream-types'
import type { Pushable } from 'it-pushable'
import type { Uint8ArrayList } from 'uint8arraylist'

export interface Handshake {
export interface Handshake<T extends Uint8Array | Uint8ArrayList = Uint8Array> {
reader: Reader
writer: Pushable<Uint8Array>
stream: Duplex<Uint8Array>
rest: () => Source<Uint8Array>
write: (data: Uint8Array) => void
read: () => Promise<Uint8Array | undefined>
writer: Pushable<T>
stream: Duplex<T>
rest: () => Source<T>
write: (data: T) => void
read: () => Promise<Uint8ArrayList | undefined>
}

// Convert a duplex stream into a reader and writer and rest stream
export function handshake (stream: Duplex<Uint8Array>): Handshake {
const writer = pushable() // Write bytes on demand to the sink
export function handshake<T extends Uint8Array | Uint8ArrayList = Uint8Array> (stream: Duplex<T>): Handshake<T> {
const writer = pushable<T>() // Write bytes on demand to the sink
const source = reader(stream.source) // Read bytes on demand from the source

// Waits for a source to be passed to the rest stream's sink
const sourcePromise = defer<Source<Uint8Array>>()
const sourcePromise = defer<Source<T>>()
let sinkErr: Error

const sinkPromise = stream.sink((async function * () {
Expand All @@ -33,7 +33,7 @@ export function handshake (stream: Duplex<Uint8Array>): Handshake {
sinkErr = err
})

const rest: Duplex<Uint8Array> = {
const rest: Duplex<T> = {
sink: async source => {
if (sinkErr != null) {
return await Promise.reject(sinkErr)
Expand All @@ -42,7 +42,7 @@ export function handshake (stream: Duplex<Uint8Array>): Handshake {
sourcePromise.resolve(source)
return await sinkPromise
},
source: map(source, bl => bl.slice())
source: stream.source
}

return {
Expand All @@ -55,7 +55,7 @@ export function handshake (stream: Duplex<Uint8Array>): Handshake {
const res = await source.next()

if (res.value != null) {
return res.value.slice()
return res.value
}
}
}
Expand Down
50 changes: 40 additions & 10 deletions test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { duplexPair } from 'it-pair/duplex'
import { pipe } from 'it-pipe'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import all from 'it-all'
import { Uint8ArrayList } from 'uint8arraylist'

describe('handshake', () => {
it('should be able to perform a handshake', async () => {
Expand All @@ -13,11 +14,11 @@ describe('handshake', () => {

iShake.write(uint8ArrayFromString('hello'))
let message = await rShake.read()
expect(message).to.eql(uint8ArrayFromString('hello'))
expect(message?.slice()).to.eql(uint8ArrayFromString('hello'))
rShake.write(uint8ArrayFromString('hi'))
rShake.rest()
message = await iShake.read()
expect(message).to.eql(uint8ArrayFromString('hi'))
expect(message?.slice()).to.eql(uint8ArrayFromString('hi'))
iShake.rest()

const buffer = uint8ArrayFromString('more data')
Expand All @@ -35,6 +36,35 @@ describe('handshake', () => {
const data = await pipe([buffer], iShake.stream, async (source) => await all(source))
expect(data).to.eql([buffer])
})
it('should be able to perform a handshake via Uint8ArrayList', async () => {
const [initiator, responder] = duplexPair<Uint8ArrayList>()
const iShake = handshake(initiator)
const rShake = handshake(responder)

iShake.write(new Uint8ArrayList(uint8ArrayFromString('hello')))
let message = await rShake.read()
expect(message?.slice()).to.eql(uint8ArrayFromString('hello'))
rShake.write(new Uint8ArrayList(uint8ArrayFromString('hi')))
rShake.rest()
message = await iShake.read()
expect(message?.slice()).to.eql(uint8ArrayFromString('hi'))
iShake.rest()

const buffer = new Uint8ArrayList(uint8ArrayFromString('more data'))
void pipe(
rShake.stream,
(source) => (async function * () {
for await (const message of source) {
expect(message).to.eql(buffer)
yield message
}
})(),
rShake.stream
)

const data = await pipe([buffer], iShake.stream, async (source) => await all(source))
expect(data).to.eql([buffer])
})

it('should be able to perform consecutive handshakes', async () => {
const [initiator, responder] = duplexPair<Uint8Array>()
Expand All @@ -43,23 +73,23 @@ describe('handshake', () => {

iShake.write(uint8ArrayFromString('hello'))
let message = await rShake.read()
expect(message).to.eql(uint8ArrayFromString('hello'))
expect(message?.slice()).to.eql(uint8ArrayFromString('hello'))
rShake.write(uint8ArrayFromString('hi'))
rShake.rest()
message = await iShake.read()
expect(message).to.eql(uint8ArrayFromString('hi'))
expect(message?.slice()).to.eql(uint8ArrayFromString('hi'))
iShake.rest()

const iShake2 = handshake(iShake.stream)
const rShake2 = handshake(rShake.stream)

iShake2.write(uint8ArrayFromString('ready?'))
message = await rShake2.read()
expect(message).to.eql(uint8ArrayFromString('ready?'))
expect(message?.slice()).to.eql(uint8ArrayFromString('ready?'))
rShake2.write(uint8ArrayFromString('yes!'))
rShake2.rest()
message = await iShake2.read()
expect(message).to.eql(uint8ArrayFromString('yes!'))
expect(message?.slice()).to.eql(uint8ArrayFromString('yes!'))
iShake2.rest()

void pipe(
Expand Down Expand Up @@ -87,10 +117,10 @@ describe('handshake', () => {
// handshake before the responder finishes
iShake.write(uint8ArrayFromString('hello'))
message = await rShake.read()
expect(message).to.eql(uint8ArrayFromString('hello'))
expect(message?.slice()).to.eql(uint8ArrayFromString('hello'))
rShake.write(uint8ArrayFromString('hi'))
message = await iShake.read()
expect(message).to.eql(uint8ArrayFromString('hi'))
expect(message?.slice()).to.eql(uint8ArrayFromString('hi'))
iShake.rest()

const iShake2 = handshake(iShake.stream)
Expand All @@ -100,11 +130,11 @@ describe('handshake', () => {
const rShake2 = handshake(rShake.stream)

message = await rShake2.read()
expect(message).to.eql(uint8ArrayFromString('ready?'))
expect(message?.slice()).to.eql(uint8ArrayFromString('ready?'))
rShake2.write(uint8ArrayFromString('yes!'))
rShake2.rest()
message = await iShake2.read()
expect(message).to.eql(uint8ArrayFromString('yes!'))
expect(message?.slice()).to.eql(uint8ArrayFromString('yes!'))
iShake2.rest()

void pipe(
Expand Down

0 comments on commit 0fc8e5d

Please sign in to comment.