Skip to content

Commit

Permalink
wip: cleaning up
Browse files Browse the repository at this point in the history
* Related #10

[ci skip]
  • Loading branch information
tegefaulkes committed May 3, 2023
1 parent e6e96e1 commit 6b77915
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 69 deletions.
23 changes: 3 additions & 20 deletions src/QUICConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ class QUICConnection extends EventTarget {

// Immediately call this after construction
// if you want to pass the key log to something
// note that you must close the file descriptor afterwards
// note that you must close the file descriptor afterward
public setKeylog(path) {
this.conn.setKeylog(path);
}
Expand Down Expand Up @@ -358,7 +358,7 @@ class QUICConnection extends EventTarget {
* UDP -> Connection -> Stream
* This pushes data to the streams.
* When the connection is draining, we can still receive data.
* However no streams are allowed to read or write.
* However, no streams are allowed to read or write.
*/
@ready(new errors.ErrorQUICConnectionDestroyed(), false, ['destroying'])
public async recv(data: Uint8Array, remoteInfo: RemoteInfo) {
Expand All @@ -381,9 +381,8 @@ class QUICConnection extends EventTarget {
},
};
try {
this.logger.debug(`Did a recv ${data.byteLength}`);
this.conn.recv(data, recvInfo);
this.logger.info(`RECEIVED ${data.byteLength} of data`);
this.logger.debug(`RECEIVED ${data.byteLength} of data`);
} catch (e) {
this.logger.error(`recv error ${e.message}`);
// Depending on the exception, the `this.conn.recv`
Expand Down Expand Up @@ -418,7 +417,6 @@ class QUICConnection extends EventTarget {
this.resolveEstablishedP();
}
if (this.conn.isClosed()) {
this.logger.debug('recv CLOSED!!!!!');
if (this.resolveCloseP != null) this.resolveCloseP();
return;
}
Expand All @@ -428,7 +426,6 @@ class QUICConnection extends EventTarget {
) {
for (const streamId of this.conn.readable() as Iterable<StreamId>) {
let quicStream = this.streamMap.get(streamId);
this.logger.info(`Checking stream readable ${streamId}`);
if (quicStream == null) {
// The creation will set itself to the stream map
quicStream = await QUICStream.createQUICStream({
Expand All @@ -450,7 +447,6 @@ class QUICConnection extends EventTarget {
quicStream.read();
}
for (const streamId of this.conn.writable() as Iterable<StreamId>) {
this.logger.info(`Checking stream writable ${streamId}`);
let quicStream = this.streamMap.get(streamId);
if (quicStream == null) {
// The creation will set itself to the stream map
Expand All @@ -474,13 +470,9 @@ class QUICConnection extends EventTarget {
}
// Checking shortlist if streams have finished.
for (const [streamId, stream] of this.destroyingMap) {
this.logger.info(`Checking if stream ${streamId} has finished`);
if (stream.isFinished()) {
// If it has finished, it will trigger its own clean up.
// Remove the stream from the shortlist.
this.logger.info(
`Removing stream ${streamId} from destroying shortlist`,
);
this.destroyingMap.delete(streamId);
}
}
Expand Down Expand Up @@ -610,18 +602,11 @@ class QUICConnection extends EventTarget {
}
} finally {
this.logger.debug('SEND FINALLY');
this.logger.debug(
` ________ ED: ${this.conn.isInEarlyData()} TO: ${this.conn.isTimedOut()} EST: ${this.conn.isEstablished()}`,
);
this.checkTimeout();
this.logger.debug(
`state are draining: ${this.conn.isDraining()}, closed: ${this.conn.isClosed()}`,
);
if (
this[status] !== 'destroying' &&
(this.conn.isClosed() || this.conn.isDraining())
) {
this.logger.debug('CALLING DESTROY');
// Ignore errors and run in background
void this.destroy().catch(() => {});
} else if (
Expand Down Expand Up @@ -702,9 +687,7 @@ class QUICConnection extends EventTarget {
this.logger.debug(
`state are draining: ${this.conn.isDraining()}, closed: ${this.conn.isClosed()}`,
);
this.logger.debug('timeout SEND');
if (this[destroyed] === false) await this.send();
this.logger.debug('timeout SENDAFTER');
if (
this[status] !== 'destroying' &&
(this.conn.isClosed() || this.conn.isDraining())
Expand Down
72 changes: 23 additions & 49 deletions src/QUICStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class QUICStream
// with the `fin` set to true
// If this itself results in an error, we can continue
// But continue to do the below
this.logger.info('sending fin frame');
this.logger.debug('sending fin frame');
await this.streamSend(new Uint8Array(0), true).catch((e) => {
// Ignore send error if stream is already closed
if (e.message !== 'send') throw e;
Expand Down Expand Up @@ -210,15 +210,11 @@ class QUICStream
}
await this.connection.send();
// Await this.streamSend(new Uint8Array(0), true).catch(e => console.error(e));
this.logger.info('Waiting for FINISH');
this.logger.debug('waiting for underlying streams to finish');
this.destroyingMap.set(this.streamId, this);
this.isFinished();
QUICStream.logStreamState(this.streamId, this.conn, this.logger);
await Promise.all([
this.sendFinishedProm.p.then(() => this.logger.info('SEND HAS FINISHED')),
this.recvFinishedProm.p.then(() => this.logger.info('RECV HAS FINISHED')),
]);
this.logger.info('DONE waiting for FINISH');
await Promise.all([this.sendFinishedProm.p, this.recvFinishedProm.p]);
this.logger.debug('done waiting for underlying streams to finish');
this.streamMap.delete(this.streamId);
// Remove from the shortlist, just in case
this.destroyingMap.delete(this.streamId);
Expand All @@ -234,11 +230,9 @@ class QUICStream
@ready(new errors.ErrorQUICStreamDestroyed(), false, ['destroying'])
public read(): void {
// After reading it's possible the writer had a state change.
// void this.isFinished();
// void this.isRecvFinished();
this.isSendFinished();
if (this._recvPaused) {
// Do nothing if we are paused
this.logger.info('Skipping read, paused');
return;
}
void this.streamRecv();
Expand Down Expand Up @@ -274,10 +268,11 @@ class QUICStream
const recvFinished = this.conn.streamFinished(this.streamId);
if (recvFinished) {
// If it is finished then we resolve the promise and clean up
this.logger.info('recv has ended');
this.recvFinishedProm.resolveP();
if (!this._recvClosed) {
const err = Error('TMP ERROR');
const err = new errors.ErrorQUICStreamUnexpectedClose(
'Readable stream closed early with no reason',
);
this.readableController.error(err);
void this.closeRecv(true, err);
}
Expand All @@ -299,9 +294,12 @@ class QUICStream
// If the writable has ended, we need to close the writable.
// We need to do this in the background to keep this synchronous.
void this.processSendStreamError(e, 'send').then((reason) => {
this.logger.info('send has ended');
if (!this._sendClosed) {
const err = reason ?? Error('TMP ERROR');
const err =
reason ??
new errors.ErrorQUICStreamUnexpectedClose(
'Writable stream closed early with no reason',
);
this.writableController.error(err);
void this.closeSend(true, err);
}
Expand All @@ -327,10 +325,9 @@ class QUICStream
// or through an exception here where the stream reports an error
// Since we don't call this method unless it is readable
// This should never be reported... (this branch should be dead code)
this.logger.info('Stream reported: done');
return;
} else {
this.logger.info('Stream reported: error');
this.logger.debug('Stream reported: error');
// Signal receiving has ended
this.recvFinishedProm.resolveP();
const reason = await this.processSendStreamError(e, 'recv');
Expand All @@ -355,7 +352,7 @@ class QUICStream
// If fin is true, then that means, the stream is CLOSED
if (fin) {
// This will render `stream.cancel` a noop
this.logger.info('Stream reported: fin');
this.logger.debug('Stream reported: fin');
if (!this._recvClosed) this.readableController.close();
await this.closeRecv();
// Signal receiving has ended
Expand Down Expand Up @@ -443,7 +440,7 @@ class QUICStream
): Promise<void> {
// Further closes are NOPs
if (this._recvClosed) return;
this.logger.info(`Close Recv`);
this.logger.debug(`Close Recv`);
// Indicate that the receiving side is closed
this._recvClosed = true;
const code = isError ? await this.reasonToCode('send', reason) : 0;
Expand All @@ -461,7 +458,7 @@ class QUICStream
// and that both recv and send is closed
void this.destroy();
}
this.logger.info(`Closed Recv`);
this.logger.debug(`Closed Recv`);
}

/**
Expand All @@ -475,7 +472,7 @@ class QUICStream
): Promise<void> {
// Further closes are NOPs
if (this._sendClosed) return;
this.logger.info(`Close Send`);
this.logger.debug(`Close Send`);
// Indicate that the sending side is closed
this._sendClosed = true;
// If the QUIC stream is already closed
Expand All @@ -495,9 +492,13 @@ class QUICStream
// and that both recv and send is closed
void this.destroy();
}
this.logger.info(`Closed Send`);
this.logger.debug(`Closed Send`);
}

/**
* This will process any errors from a `streamSend` or `streamRecv`, extract the code and covert to a reason.
* Will return null if the error was not an expected stream ending error.
*/
protected async processSendStreamError(
e: Error,
type: 'recv' | 'send',
Expand All @@ -512,33 +513,6 @@ class QUICStream
}
return null;
}

// FIXME: Remove this before completing task.
public static logStreamState(
streamId: StreamId,
conn: Connection,
logger: Logger,
) {
let message = 'STATE:';
try {
conn.streamWritable(streamId, 0);
message += 'W';
} catch (e) {
message += '!W';
}
try {
if (conn.streamReadable(streamId)) message += 'R';
else message += '!R';
} catch (e) {
message += '!R';
}
if (conn.streamFinished(streamId)) {
message += 'F';
} else {
message += '!F';
}
logger.info(message);
}
}

export default QUICStream;
5 changes: 5 additions & 0 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ class ErrorQUICStreamClose<T> extends ErrorQUICStream<T> {
static description = 'QUIC Stream force close';
}

class ErrorQUICStreamUnexpectedClose<T> extends ErrorQUICStream<T> {
static description = 'QUIC Stream closed early with no reason given';
}

class ErrorQUICUndefinedBehaviour<T> extends ErrorQUIC<T> {
static description = 'This should never happen';
}
Expand Down Expand Up @@ -118,5 +122,6 @@ export {
ErrorQUICStreamDestroyed,
ErrorQUICStreamLocked,
ErrorQUICStreamClose,
ErrorQUICStreamUnexpectedClose,
ErrorQUICUndefinedBehaviour,
};

0 comments on commit 6b77915

Please sign in to comment.