Skip to content

Commit

Permalink
wip: debugging stream clean up
Browse files Browse the repository at this point in the history
* Related #10

[ci skip]
  • Loading branch information
tegefaulkes committed Apr 27, 2023
1 parent 26dd251 commit 76c144f
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 70 deletions.
8 changes: 6 additions & 2 deletions src/QUICClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,16 @@ class QUICClient extends EventTarget {
return this._connection;
}

public async destroy() {
public async destroy({
force = false,
}: {
force?: boolean;
} = {}) {
const address = utils.buildAddress(this.socket.host, this.socket.port);
this.logger.info(`Destroy ${this.constructor.name} on ${address}`);

// We may want to allow one to specialise this
await this._connection.destroy();
await this._connection.destroy({ force });
if (!this.isSocketShared) {
await this.socket.stop();
this.socket.removeEventListener('error', this.handleQUICSocketError);
Expand Down
8 changes: 5 additions & 3 deletions src/QUICConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -257,14 +257,16 @@ class QUICConnection extends EventTarget {
appError = false,
errorCode = quiche.ConnectionErrorCode.NoError,
errorMessage = '',
force = false,
}: {
appError?: boolean;
errorCode?: ConnectionErrorCode;
errorMessage?: string;
force?: boolean
} = {}) {
this.logger.info(`Destroy ${this.constructor.name}`);
for (const stream of this.streamMap.values()) {
await stream.destroy();
await stream.destroy({ force });
}
try {
// If this is already closed, then `Done` will be thrown
Expand Down Expand Up @@ -342,7 +344,7 @@ class QUICConnection extends EventTarget {
this.logger.debug(`Did a recv ${data.byteLength}`);
this.conn.recv(data, recvInfo);
} catch (e) {
this.logger.error(e.message);
this.logger.error(`recv error ${e.message}`);
// Depending on the exception, the `this.conn.recv`
// may have automatically started closing the connection
if (e.message === 'TlsFail') {
Expand Down Expand Up @@ -529,7 +531,7 @@ class QUICConnection extends EventTarget {
sendInfo.to.host,
);
} catch (e) {
this.logger.error(e.message);
this.logger.error(`send error ${e.message}`);
this.dispatchEvent(
new events.QUICConnectionErrorEvent({ detail: e }),
);
Expand Down
9 changes: 6 additions & 3 deletions src/QUICServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,15 @@ class QUICServer extends EventTarget {
/**
* Stops the QUICServer
*/
public async stop() {
public async stop({
force = false,
}: {
force?: boolean;
} = {}) {
const address = utils.buildAddress(this.socket.host, this.socket.port);
this.logger.info(`Stop ${this.constructor.name} on ${address}`);

for (const connection of this.connectionMap.serverConnections.values()) {
await connection.destroy();
await connection.destroy({ force });
}
this.socket.deregisterServer(this);
if (!this.isSocketShared) {
Expand Down
4 changes: 3 additions & 1 deletion src/QUICSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ class QUICSocket extends EventTarget {
// Each send/recv/timeout may result in a destruction
if (!conn[destroyed]) {
// Ignore any errors, concurrent with destruction
await conn.send().catch((e) => this.logger.error(e.message));
await conn.send().catch((e) => {
this.logger.error(`not destroyed send ${e.message}`)
});
}
};

Expand Down
125 changes: 86 additions & 39 deletions src/QUICStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import { quiche } from './native';
import * as events from './events';
import * as utils from './utils';
import * as errors from './errors';
import { destroyed } from "@matrixai/async-init";

/**
* Events:
Expand Down Expand Up @@ -49,6 +48,8 @@ class QUICStream
protected _recvClosed: boolean = false;
protected _recvPaused: boolean = false;
protected resolveWritableP?: () => void;
public readonly finishedP: Promise<void>;
protected resolveFinishedP: () => void;

/**
* For `reasonToCode`, return 0 means "unknown reason"
Expand All @@ -62,7 +63,7 @@ class QUICStream
streamId,
connection,
reasonToCode = () => 0,
codeToReason = (code) => new Error(code.toString()),
codeToReason = (type, code) => new Error(`${type.toString()} ${code.toString()}`),
logger = new Logger(`${this.name} ${streamId}`),
}: {
streamId: StreamId;
Expand Down Expand Up @@ -105,6 +106,12 @@ class QUICStream
this.streamMap = connection.streamMap;
this.reasonToCode = reasonToCode;
this.codeToReason = codeToReason;
const {
p: finishedP,
resolveP: resolveFinishedP
} = utils.promise<void>();
this.finishedP = finishedP;
this.resolveFinishedP = resolveFinishedP;

// Try the BYOB later, it seems more performant

Expand Down Expand Up @@ -136,7 +143,11 @@ class QUICStream
// with the `fin` set to true
// If this itself results in an error, we can continue
// But continue to do the below
await this.streamSend(new Uint8Array(0), true);
await this.streamSend(new Uint8Array(0), true)
.catch(e => {
// Ignore send error if stream is already closed
if (e.message !== 'send') throw e;
});
await this.closeSend();
},
abort: async (reason?: any) => {
Expand Down Expand Up @@ -175,7 +186,6 @@ class QUICStream
force?: boolean;
} = {}) {
this.logger.info(`Destroy ${this.constructor.name}`);

// If the streams are locked, this means they are in-use
// or they have been composed with `pipeThrough` or `pipeTo`.
// At this point the management of their lifecycle is no longer
Expand Down Expand Up @@ -208,7 +218,12 @@ class QUICStream
await this.closeSend(true, e);
}
}
await this.streamSend(new Uint8Array(0), true);
this.streamMap.delete(this.streamId);
this.logger.info(`finished ${this.conn.streamFinished(this.streamId)}`);
// We need to wait for the connection to finish before fully destroying
this.logger.info('Waiting for finish')
await this.finishedP;
this.dispatchEvent(new events.QUICStreamDestroyEvent());
this.logger.info(`Destroyed ${this.constructor.name}`);
}
Expand All @@ -217,8 +232,13 @@ class QUICStream
* External push is converted to internal pull
* Internal system decides when to unblock
*/
@ready(new errors.ErrorQUICStreamDestroyed())
@ready(new errors.ErrorQUICStreamDestroyed(), false, ['destroying'])
public read(): void {
// TODO: check if the readable has errored out?
this.logger.info(`finished read ${this.conn.streamFinished(this.streamId)}`);
if (this.conn.streamFinished(this.streamId)) {
this.resolveFinishedP();
}
if (this._recvPaused) {
// Do nothing if we are paused
return;
Expand All @@ -230,27 +250,32 @@ class QUICStream
* Internal push is converted to an external pull
* External system decides when to unblock
*/
@ready(new errors.ErrorQUICStreamDestroyed())
@ready(new errors.ErrorQUICStreamDestroyed(), false, ['destroying'])
public write(): void {
this.logger.info(`finished write ${this.conn.streamFinished(this.streamId)}`);
if (this.conn.streamFinished(this.streamId)) {
this.resolveFinishedP();
}
try {
this.conn.streamWritable(this.streamId, 0)
} catch (e) {
this.logger.info(e.message);
// const reason = await this.processSendStreamError(e, 'send');
// If the writable has ended, we need to close the writable.
this.writableController.error(Error('TMP ERROR'))
void this.closeSend();
}
if (this.resolveWritableP != null) {
this.resolveWritableP();
}
}

protected async streamRecv(): Promise<void> {
console.log('recv stream finished', this.conn.streamFinished(this.streamId));
try {
this.conn.streamWritable(this.streamId, 0)
} catch (e) {
console.log(e.message);
}
const buf = Buffer.alloc(1024);
let recvLength: number, fin: boolean;
try {
[recvLength, fin] = this.conn.streamRecv(this.streamId, buf);
console.log(fin);
} catch (e) {
console.error(e);
if (e.message === 'Done') {
// When it is reported to be `Done`, it just means that there is no data to read
// it does not mean that the stream is closed or finished
Expand All @@ -266,9 +291,8 @@ class QUICStream
this.logger.debug('Stream reported: error');
const match = e.message.match(/StreamReset\((.+)\)/);
if (match != null) {
console.log('ayyyyy');
// If it is `StreamReset(u64)` error, then the peer has closed
// the stream and we are receiving the error code
// the stream, and we are receiving the error code
const code = parseInt(match[1]);
const reason = await this.codeToReason('recv', code);
this.readableController.error(reason);
Expand All @@ -281,19 +305,20 @@ class QUICStream
}
return;
}
} finally {
// Let's check if sending side has finished
await this.connection.send();
}
// It's possible to get a 0-length buffer
// In fact 0-length buffers are used to "open" a stream
if (recvLength > 0) {
this.readableController.enqueue(buf.subarray(0, recvLength));
} else {
console.log("zero buffer received!!!!")
if (!this._recvClosed) this.readableController.enqueue(buf.subarray(0, recvLength));
}
// If fin is true, then that means, the stream is CLOSED
if (fin) {
// This will render `stream.cancel` a noop
this.logger.info('Stream reported: fin');
this.readableController.close();
if (!this._recvClosed) this.readableController.close();
await this.closeRecv();
return;
}
Expand All @@ -307,7 +332,6 @@ class QUICStream
}

protected async streamSend(chunk: Uint8Array, fin = false): Promise<void> {
console.log('send stream finished', this.conn.streamFinished(this.streamId));
// This means that the number of written bytes returned can be lower
// than the length of the input buffer when the stream doesn’t have
// enough capacity for the operation to complete. The application
Expand All @@ -316,7 +340,6 @@ class QUICStream
try {
sentLength = this.conn.streamSend(this.streamId, chunk, fin);
} catch (e) {
console.error(e);
// If the Done is returned
// then no data was sent
// because the stream has no capacity
Expand All @@ -334,16 +357,14 @@ class QUICStream
// and indicate that there was an error now
// Actually it's sufficient to simply throw an exception I think
// That would essentially do it
const match = e.message.match(/StreamStopped\((.+)\)/);
if (match != null) {
const code = parseInt(match[1]);
const reason = await this.codeToReason('send', code);
const reason = await this.processSendStreamError(e, 'send');
if (reason != null) {
// We have to close the send side (but the stream is already closed)
await this.closeSend();
// Throws the exception back to the writer
throw reason;
} else {
// Some thing else broke
// Something else broke
// here we close the stream by sending a `STREAM_RESET`
// with the error, this doesn't involving calling `streamSend`
await this.closeSend(true, e);
Expand Down Expand Up @@ -374,20 +395,28 @@ class QUICStream
isError: boolean = false,
reason?: any,
): Promise<void> {
// Further closes are NOPs
if (this._recvClosed) return;
this.logger.info(`Close Recv`);
if (isError) {
// This will send a `STOP_SENDING` frame with the code
// When the other peer sends, they will get a `StreamStopped(u64)` exception
const code = await this.reasonToCode('recv', reason);
// Indicate that the receiving side is closed
this._recvClosed = true;
const code = isError ? await this.reasonToCode('send', reason) : 0;
// This will send a `STOP_SENDING` frame with the code
// When the other peer sends, they will get a `StreamStopped(u64)` exception
try {
this.conn.streamShutdown(this.streamId, quiche.Shutdown.Read, code);
} catch (e) {
// Ignore if already shutdown
if (e.message !== 'Done') throw e;
}
this._recvClosed = true;
await this.connection.send();
if (this[status] !== 'destroying' && this._recvClosed && this._sendClosed) {
// Only destroy if we are not already destroying
// and that both recv and send is closed
await this.destroy();
}
this.logger.info(`Closed Recv`);
this.logger.info(`finished ${this.conn.streamFinished(this.streamId)}`);
}

/**
Expand All @@ -399,23 +428,41 @@ class QUICStream
isError: boolean = false,
reason?: any,
): Promise<void> {
// Further closes are NOPs
if (this._sendClosed) return;
this.logger.info(`Close Send`);
// Indicate that the sending side is closed
this._sendClosed = true;
// If the QUIC stream is already closed
// there's nothign to do on the QUIC stream
if (isError) {
// This will send a `RESET_STREAM` frame with the code
// When the other peer receives, they will get a `StreamReset(u64)` exception
const code = await this.reasonToCode('send', reason);
// there's nothing to do on the QUIC stream
const code = isError ? await this.reasonToCode('send', reason) : 0;
// This will send a `RESET_STREAM` frame with the code
// When the other peer receives, they will get a `StreamReset(u64)` exception
try {
this.conn.streamShutdown(this.streamId, quiche.Shutdown.Write, code);
} catch (e) {
// Ignore if already shutdown
if (e.message !== 'Done') throw e;
}
// Indicate that the sending side is closed
this._sendClosed = true;
await this.connection.send();
if (this[status] !== 'destroying' && this._recvClosed && this._sendClosed) {
// Only destroy if we are not already destroying
// and that both recv and send is closed
await this.destroy();
}
this.logger.info(`Closed Send`);
this.logger.info(`finished ${this.conn.streamFinished(this.streamId)}`);
}

protected async processSendStreamError(e: Error, type: 'recv' | 'send' ): Promise<any | null> {
const match =
e.message.match(/StreamStopped\((.+)\)/)
?? e.message.match(/InvalidStreamState\((.+)\)/)
if (match != null) {
const code = parseInt(match[1]);
return await this.codeToReason(type, code);
}
return null;
}
}

Expand Down
Loading

0 comments on commit 76c144f

Please sign in to comment.