Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support Uint8ArrayList #26

Merged
merged 4 commits into from
Aug 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,11 @@
"release": "aegir release"
},
"dependencies": {
"it-map": "^1.0.6",
"it-pushable": "^3.0.0",
"it-pushable": "^3.1.0",
"it-reader": "^6.0.1",
"it-stream-types": "^1.0.4",
"p-defer": "^4.0.0"
"p-defer": "^4.0.0",
"uint8arraylist": "^2.0.0"
},
"devDependencies": {
"aegir": "^37.2.0",
Expand Down
26 changes: 13 additions & 13 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
import { Reader, reader } from 'it-reader'
import { pushable } from 'it-pushable'
import defer from 'p-defer'
import map from 'it-map'
import type { Duplex, Source } from 'it-stream-types'
import type { Pushable } from 'it-pushable'
import type { Uint8ArrayList } from 'uint8arraylist'

export interface Handshake {
export interface Handshake<T extends Uint8Array | Uint8ArrayList = Uint8Array> {
reader: Reader
writer: Pushable<Uint8Array>
stream: Duplex<Uint8Array>
rest: () => Source<Uint8Array>
write: (data: Uint8Array) => void
read: () => Promise<Uint8Array | undefined>
writer: Pushable<T>
stream: Duplex<T>
rest: () => Source<T>
write: (data: T) => void
read: () => Promise<Uint8ArrayList | undefined>
}

// Convert a duplex stream into a reader and writer and rest stream
export function handshake (stream: Duplex<Uint8Array>): Handshake {
const writer = pushable() // Write bytes on demand to the sink
export function handshake<T extends Uint8Array | Uint8ArrayList = Uint8Array> (stream: Duplex<T>): Handshake<T> {
const writer = pushable<T>() // Write bytes on demand to the sink
const source = reader(stream.source) // Read bytes on demand from the source

// Waits for a source to be passed to the rest stream's sink
const sourcePromise = defer<Source<Uint8Array>>()
const sourcePromise = defer<Source<T>>()
let sinkErr: Error

const sinkPromise = stream.sink((async function * () {
Expand All @@ -33,7 +33,7 @@ export function handshake (stream: Duplex<Uint8Array>): Handshake {
sinkErr = err
})

const rest: Duplex<Uint8Array> = {
const rest: Duplex<T> = {
sink: async source => {
if (sinkErr != null) {
return await Promise.reject(sinkErr)
Expand All @@ -42,7 +42,7 @@ export function handshake (stream: Duplex<Uint8Array>): Handshake {
sourcePromise.resolve(source)
return await sinkPromise
},
source: map(source, bl => bl.slice())
source: stream.source
}

return {
Expand All @@ -55,7 +55,7 @@ export function handshake (stream: Duplex<Uint8Array>): Handshake {
const res = await source.next()

if (res.value != null) {
return res.value.slice()
return res.value
}
}
}
Expand Down
50 changes: 40 additions & 10 deletions test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { duplexPair } from 'it-pair/duplex'
import { pipe } from 'it-pipe'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import all from 'it-all'
import { Uint8ArrayList } from 'uint8arraylist'

describe('handshake', () => {
it('should be able to perform a handshake', async () => {
Expand All @@ -13,11 +14,11 @@ describe('handshake', () => {

iShake.write(uint8ArrayFromString('hello'))
let message = await rShake.read()
expect(message).to.eql(uint8ArrayFromString('hello'))
expect(message?.slice()).to.eql(uint8ArrayFromString('hello'))
rShake.write(uint8ArrayFromString('hi'))
rShake.rest()
message = await iShake.read()
expect(message).to.eql(uint8ArrayFromString('hi'))
expect(message?.slice()).to.eql(uint8ArrayFromString('hi'))
iShake.rest()

const buffer = uint8ArrayFromString('more data')
Expand All @@ -35,6 +36,35 @@ describe('handshake', () => {
const data = await pipe([buffer], iShake.stream, async (source) => await all(source))
expect(data).to.eql([buffer])
})
it('should be able to perform a handshake via Uint8ArrayList', async () => {
const [initiator, responder] = duplexPair<Uint8ArrayList>()
const iShake = handshake(initiator)
const rShake = handshake(responder)

iShake.write(new Uint8ArrayList(uint8ArrayFromString('hello')))
let message = await rShake.read()
expect(message?.slice()).to.eql(uint8ArrayFromString('hello'))
rShake.write(new Uint8ArrayList(uint8ArrayFromString('hi')))
rShake.rest()
message = await iShake.read()
expect(message?.slice()).to.eql(uint8ArrayFromString('hi'))
iShake.rest()

const buffer = new Uint8ArrayList(uint8ArrayFromString('more data'))
void pipe(
rShake.stream,
(source) => (async function * () {
for await (const message of source) {
expect(message).to.eql(buffer)
yield message
}
})(),
rShake.stream
)

const data = await pipe([buffer], iShake.stream, async (source) => await all(source))
expect(data).to.eql([buffer])
})

it('should be able to perform consecutive handshakes', async () => {
const [initiator, responder] = duplexPair<Uint8Array>()
Expand All @@ -43,23 +73,23 @@ describe('handshake', () => {

iShake.write(uint8ArrayFromString('hello'))
let message = await rShake.read()
expect(message).to.eql(uint8ArrayFromString('hello'))
expect(message?.slice()).to.eql(uint8ArrayFromString('hello'))
rShake.write(uint8ArrayFromString('hi'))
rShake.rest()
message = await iShake.read()
expect(message).to.eql(uint8ArrayFromString('hi'))
expect(message?.slice()).to.eql(uint8ArrayFromString('hi'))
iShake.rest()

const iShake2 = handshake(iShake.stream)
const rShake2 = handshake(rShake.stream)

iShake2.write(uint8ArrayFromString('ready?'))
message = await rShake2.read()
expect(message).to.eql(uint8ArrayFromString('ready?'))
expect(message?.slice()).to.eql(uint8ArrayFromString('ready?'))
rShake2.write(uint8ArrayFromString('yes!'))
rShake2.rest()
message = await iShake2.read()
expect(message).to.eql(uint8ArrayFromString('yes!'))
expect(message?.slice()).to.eql(uint8ArrayFromString('yes!'))
iShake2.rest()

void pipe(
Expand Down Expand Up @@ -87,10 +117,10 @@ describe('handshake', () => {
// handshake before the responder finishes
iShake.write(uint8ArrayFromString('hello'))
message = await rShake.read()
expect(message).to.eql(uint8ArrayFromString('hello'))
expect(message?.slice()).to.eql(uint8ArrayFromString('hello'))
rShake.write(uint8ArrayFromString('hi'))
message = await iShake.read()
expect(message).to.eql(uint8ArrayFromString('hi'))
expect(message?.slice()).to.eql(uint8ArrayFromString('hi'))
iShake.rest()

const iShake2 = handshake(iShake.stream)
Expand All @@ -100,11 +130,11 @@ describe('handshake', () => {
const rShake2 = handshake(rShake.stream)

message = await rShake2.read()
expect(message).to.eql(uint8ArrayFromString('ready?'))
expect(message?.slice()).to.eql(uint8ArrayFromString('ready?'))
rShake2.write(uint8ArrayFromString('yes!'))
rShake2.rest()
message = await iShake2.read()
expect(message).to.eql(uint8ArrayFromString('yes!'))
expect(message?.slice()).to.eql(uint8ArrayFromString('yes!'))
iShake2.rest()

void pipe(
Expand Down