Skip to content

Commit

Permalink
wip: almost
Browse files Browse the repository at this point in the history
* Related #10

[ci skip]
  • Loading branch information
tegefaulkes committed May 1, 2023
1 parent 39606d7 commit 6d28740
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 12 deletions.
6 changes: 6 additions & 0 deletions src/QUICConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -632,17 +632,23 @@ class QUICConnection extends EventTarget {
logger: this.logger.getChild(`${QUICStream.name} ${streamId!}-${Math.floor(Math.random() * 100)}`),
});
const writer = quicStream.writable.getWriter();
const reader = quicStream.readable.getReader();

try {
// This will now wait until the 0-length buffer is actually sent
// TODO: 0 len messages are not sent, So we need to ignore first message.
// But its a stream, messages can be coalesced, so ignore first byte? IDK...
// We also want to do a ping/pong message to see if stream creation is rejected.
await writer.write(new Uint8Array(1));
// Ignore the first response message.
// await reader.read();
writer.releaseLock();
reader.releaseLock();
} catch (e) {
console.error(e);
// You must release the lock even before you run destroy
writer.releaseLock();
reader.releaseLock();
// If the write failed, it will only close the sending side
// But in this case, it means we actually failed to open the stream entirely
// In which case we destroy the stream
Expand Down
35 changes: 25 additions & 10 deletions src/QUICStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class QUICStream
// This resolves when `streamRecv` results in a `StreamReset(u64)` or a fin flag indicating receiving has ended
protected recvFinishedProm = utils.promise<void>();
protected destroyingMap: Map<StreamId, QUICStream>;
protected firstRecv = true;
protected firstSend = true;

/**
* For `reasonToCode`, return 0 means "unknown reason"
Expand Down Expand Up @@ -132,7 +134,10 @@ class QUICStream
cancel: async (reason) => {
await this.closeRecv(true, reason);
},
});
},
{
highWaterMark: 2,
});

this.writable = new WritableStream({
start: (controller) => {
Expand All @@ -147,6 +152,7 @@ class QUICStream
// with the `fin` set to true
// If this itself results in an error, we can continue
// But continue to do the below
this.logger.info('sending fin frame');
await this.streamSend(new Uint8Array(0), true).catch((e) => {
// Ignore send error if stream is already closed
if (e.message !== 'send') throw e;
Expand Down Expand Up @@ -241,7 +247,8 @@ class QUICStream
@ready(new errors.ErrorQUICStreamDestroyed(), false, ['destroying'])
public read(): void {
// After reading it's possible the writer had a state change.
void this.isFinished();
// void this.isFinished();
// void this.isRecvFinished();
if (this._recvPaused) {
// Do nothing if we are paused
this.logger.info('Skipping read, paused');
Expand All @@ -256,8 +263,8 @@ class QUICStream
*/
@ready(new errors.ErrorQUICStreamDestroyed(), false, ['destroying'])
public write(): void {
// Checking if streams have ended
void this.isFinished();
// Checking if writable has ended
void this.isSendFinished();
if (this.resolveWritableP != null) {
this.resolveWritableP();
}
Expand Down Expand Up @@ -318,10 +325,12 @@ class QUICStream
protected async streamRecv(): Promise<void> {
const buf = Buffer.alloc(1024);
let recvLength: number, fin: boolean;
console.trace('asd')
this.logger.info('trying receiving');
try {
[recvLength, fin] = this.conn.streamRecv(this.streamId, buf);
} catch (e) {
this.logger.error(e.message)
if (e.message === 'Done') {
// When it is reported to be `Done`, it just means that there is no data to read
// it does not mean that the stream is closed or finished
Expand Down Expand Up @@ -355,10 +364,14 @@ class QUICStream
// Let's check if sending side has finished
await this.connection.send();
}
// It's possible to get a 0-length buffer
// In fact 0-length buffers are used to "open" a stream
if (recvLength > 0) {
if (!this._recvClosed) {
if (!this._recvClosed) {
// First byte is used to open stream, we need to ignore this
if (this.firstRecv) {
this.firstRecv = false;
console.log('FIRST', recvLength);
// If message is only one byte then we skip
if (recvLength > 1) this.readableController.enqueue(buf.subarray(1, recvLength));
} else {
this.readableController.enqueue(buf.subarray(0, recvLength));
}
}
Expand All @@ -373,6 +386,7 @@ class QUICStream
return;
}
// Now we pause receiving if the queue is full
console.log('asd', this.readableController.desiredSize);
if (
this.readableController.desiredSize != null &&
this.readableController.desiredSize <= 0
Expand Down Expand Up @@ -509,8 +523,9 @@ class QUICStream
type: 'recv' | 'send',
): Promise<any | null> {
const match =
e.message.match(/StreamStopped\((.+)\)/) ??
e.message.match(/InvalidStreamState\((.+)\)/);
e.message.match(/StreamStopped\((.+)\)/)
?? e.message.match(/InvalidStreamState\((.+)\)/)
?? e.message.match(/StreamReset\((.+)\)/);
if (match != null) {
const code = parseInt(match[1]);
return await this.codeToReason(type, code);
Expand Down
102 changes: 100 additions & 2 deletions tests/QUICStream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import * as testsUtils from './utils';
import { sleep } from './utils';

describe(QUICStream.name, () => {
const logger = new Logger(`${QUICStream.name} Test`, LogLevel.WARN, [
const logger = new Logger(`${QUICStream.name} Test`, LogLevel.INFO, [
new StreamHandler(
formatting.format`${formatting.level}:${formatting.keys}:${formatting.msg}`,
),
Expand Down Expand Up @@ -288,7 +288,105 @@ describe(QUICStream.name, () => {
},
{ numRuns: 10 },
);
test.todo('should send data over stream');
testProp.only(
'should send data over stream',
[
tlsConfigWithCaArb,
fc.array(
fc.array(fc.uint8Array( { minLength: 1})),
{
minLength: 1,
maxLength: 1,
},
).noShrink()],
async (tlsConfigProm, streamsData) => {
const connectionEventProm = promise<events.QUICServerConnectionEvent>();
const tlsConfig = await tlsConfigProm;
let server: QUICServer | null = null;
let client: QUICClient | null = null;
try {
server = new QUICServer({
crypto,
logger: logger.getChild(QUICServer.name),
config: {
tlsConfig: tlsConfig.tlsConfig,
verifyPeer: false,
logKeys: './tmp/key.log',
},
});
server.addEventListener(
'connection',
(e: events.QUICServerConnectionEvent) =>
connectionEventProm.resolveP(e),
);
await server.start({
host: '127.0.0.1' as Host,
port: 55555 as Port,
});
client = await QUICClient.createQUICClient({
host: '::ffff:127.0.0.1' as Host,
port: server.port,
localHost: '::' as Host,
crypto,
logger: logger.getChild(QUICClient.name),
config: {
verifyPeer: false,
},
});
const conn = (await connectionEventProm.p).detail;
// Do the test
const activeServerStreams: Array<Promise<void>> = [];
conn.addEventListener('stream', (streamEvent: events.QUICConnectionStreamEvent) => {
const stream = streamEvent.detail;
const streamProm = stream.readable.pipeTo(stream.writable)
activeServerStreams.push(streamProm);
});

// Let's make a new streams.
const activeClientStreams: Array<Promise<void>> = [];
for (const data of streamsData) {
activeClientStreams.push((async () => {
const stream = await client.connection.streamNew();
console.log('a');
const writer = stream.writable.getWriter();
const reader = stream.readable.getReader();
console.log('a');
// do write and read messages here.
for (const message of data) {
console.log(message)
await writer.write(message);
console.log('a');
const readMessage = await reader.read();
expect(readMessage.done).toBeFalse();
expect(readMessage.value).toStrictEqual(message);
}
console.log('B');
await writer.close();
const value = await reader.read();
expect(value.done).toBeTrue();
console.log('B');
})());
}
console.log('waiting');
await Promise.all([
Promise.all(activeClientStreams),
// Promise.all(activeServerStreams),
])
console.log('test done');
} catch(e) {
console.log('ERROR');
console.error(e);
throw e;
} finally {
console.log('finally');
await client?.destroy({ force: true });
console.log('a');
await server?.stop({ force: true });
console.log('b');
}
},
{ numRuns: 1 },
);
test.todo('should propagate errors over stream');
test.todo('should clean up streams when connection ends');
test.todo('should handle rejected stream creation');
Expand Down

0 comments on commit 6d28740

Please sign in to comment.