Skip to content

Commit

Permalink
More
Browse files Browse the repository at this point in the history
  • Loading branch information
ardatan committed Dec 28, 2022
1 parent 4b16876 commit 745ee7a
Show file tree
Hide file tree
Showing 5 changed files with 466 additions and 49 deletions.
65 changes: 40 additions & 25 deletions packages/node-fetch/src/ReadableStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,53 @@ import { Readable, Writable } from 'stream';
function createController<T>(
desiredSize: number,
readable: Readable
): ReadableStreamDefaultController<T> & { enqueued: boolean } {
let enqueued = false;
): ReadableStreamDefaultController<T> & { _flush(): void; } {
let chunks: Buffer[] = [];
return {
desiredSize,
enqueue(chunk: any) {
enqueued = true;
readable.push(chunk);
chunks.push(Buffer.from(chunk));
},
close() {
if (chunks.length > 0) {
this._flush();
}
readable.push(null);
},
error(error: Error) {
if (chunks.length > 0) {
this._flush();
}
readable.destroy(error);
},
get enqueued() {
return enqueued;
},
_flush() {
if (chunks.length > 0) {
const concatenated = Buffer.concat(chunks);
readable.push(concatenated);
chunks = [];
}
}
};
}

export class PonyfillReadableStream<T> implements ReadableStream<T> {
readable: Readable;
constructor(underlyingSource?: UnderlyingSource<T> | Readable | ReadableStream<T> | PonyfillReadableStream<T>) {
let started = false;
if (underlyingSource instanceof PonyfillReadableStream) {
this.readable = underlyingSource.readable;
} else if (underlyingSource && 'read' in underlyingSource) {
this.readable = underlyingSource as Readable;
} else if (underlyingSource && 'getReader' in underlyingSource) {
const reader = underlyingSource.getReader();
let reader: ReadableStreamDefaultReader<T>;
this.readable = new Readable({
construct(callback) {
try {
reader = underlyingSource.getReader();
callback(null);
} catch (err: any) {
callback(err);
}
},
read() {
reader
.read()
Expand All @@ -53,26 +69,20 @@ export class PonyfillReadableStream<T> implements ReadableStream<T> {
},
});
} else {
let waitingForPull = false;
this.readable = new Readable({
async read(desiredSize) {
if (waitingForPull) {
return;
}
waitingForPull = true;
async construct(callback) {
try {
const controller = createController(desiredSize, this);
if (!started) {
started = true;
await underlyingSource?.start?.(controller);
}
if (!controller.enqueued) {
await underlyingSource?.pull?.(controller);
}
} finally {
waitingForPull = false;
await underlyingSource?.start?.(createController(0, this));
callback(null);
} catch (err: any) {
callback(err);
}
},
async read(desiredSize) {
const controller = createController(desiredSize, this);
await underlyingSource?.pull?.(controller);
controller._flush();
},
async destroy(err, callback) {
try {
await underlyingSource?.cancel?.(err);
Expand Down Expand Up @@ -148,4 +158,9 @@ export class PonyfillReadableStream<T> implements ReadableStream<T> {
pipe(writable: Writable) {
return this.readable.pipe(writable);
}

on(event: string, listener: (...args: any[]) => void) {
this.readable.on(event, listener);
return this;
}
}
3 changes: 3 additions & 0 deletions packages/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,8 @@
},
"peerDependencies": {
"@types/node": "^18.0.6"
},
"devDependencies": {
"fastify": "4.10.2"
}
}
71 changes: 71 additions & 0 deletions packages/server/test/fastify.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { ReadableStream, Response, TextEncoder, fetch } from '@whatwg-node/fetch';
import { createServerAdapter } from '../src/createServerAdapter';
import fastify, { FastifyReply, FastifyRequest, FastifyInstance } from 'fastify';
import { AddressInfo } from 'net';

describe('Fastify', () => {
let fastifyServer: FastifyInstance;
afterEach(async () => {
await fastifyServer?.close();
});
it('should handle streams', async () => {
let cnt = 0;
const encoder = new TextEncoder();
const serverAdapter = createServerAdapter<{
req: FastifyRequest;
reply: FastifyReply;
}>(
() =>
new Response(
new ReadableStream({
async pull(controller) {
controller.enqueue(
encoder.encode(
JSON.stringify({
cnt,
}) + '\n'
)
);
cnt++;
await new Promise(resolve => setTimeout(resolve, 300));
if (cnt > 3) {
controller.close();
}
},
})
)
);
fastifyServer = fastify();
fastifyServer.route({
url: '/mypath',
method: ['GET', 'POST', 'OPTIONS'],
handler: async (req, reply) => {
const response = await serverAdapter.handleNodeRequest(req, {
req,
reply,
});
response.headers.forEach((value, key) => {
reply.header(key, value);
});

reply.status(response.status);

reply.send(response.body);

return reply;
},
});
await fastifyServer.listen({
port: 0,
});
const res = await fetch(`http://localhost:${(fastifyServer.server.address() as AddressInfo).port}/mypath`);
const body = await res.text();
expect(body).toMatchInlineSnapshot(`
"{"cnt":0}
{"cnt":1}
{"cnt":2}
{"cnt":3}
"
`);
});
});
45 changes: 26 additions & 19 deletions packages/server/test/request-listener.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,31 +47,38 @@ describe('Request Listener', () => {
});

async function compareReadableStream(toBeCheckedStream: ReadableStream | null, expected: BodyInit | null) {
if (expected != null) {
expect(toBeCheckedStream).toBeTruthy();
const expectedStream = (
typeof expected === 'object' && Symbol.asyncIterator in expected ? expected : Readable.from(expected as any)
) as AsyncIterable<Uint8Array>;
const expectedIterator = expectedStream[Symbol.asyncIterator]();
for await (const toBeCheckedChunk of toBeCheckedStream as any as AsyncIterable<Uint8Array>) {
if (toBeCheckedChunk) {
const toBeCheckedValues = Buffer.from(toBeCheckedChunk).toString().trim().split('\n');
for (const toBeCheckedValue of toBeCheckedValues) {
const trimmedToBeCheckedValue = toBeCheckedValue.trim();
if (trimmedToBeCheckedValue) {
const expectedResult = await expectedIterator.next();
const expectedChunk = expectedResult.value;
if (expectedChunk) {
const expectedValue = Buffer.from(expectedResult.value).toString().trim();
if (expectedValue) {
expect(trimmedToBeCheckedValue).toBe(expectedValue);
}
const toBeCheckedValues = [];
const expectedValues = [];
if (toBeCheckedStream) {
for await (const chunk of Readable.from(toBeCheckedStream as any)) {
if (chunk) {
const chunkString = Buffer.from(chunk).toString('utf-8');
if (chunkString) {
const chunkParts = chunkString.trim().split('\n');
for (const chunkPart of chunkParts) {
toBeCheckedValues.push(chunkPart);
}
}
}
}
if (expected) {
for await (const chunk of Readable.from(expected as any)) {
if (chunk) {
const chunkString = Buffer.from(chunk).toString('utf-8');
if (chunkString) {
const chunkParts = chunkString.trim().split('\n');
for (const chunkPart of chunkParts) {
expectedValues.push(chunkPart);
}
}
}
}
const toBeCheckedValuesString = toBeCheckedValues.join('\n');
const expectedValuesString = expectedValues.join('\n');
expect(toBeCheckedValuesString).toBe(expectedValuesString);
}
}

}

async function runTestForRequestAndResponse({
Expand Down
Loading

0 comments on commit 745ee7a

Please sign in to comment.