Skip to content

Commit

Permalink
Merge pull request #1306 from JS-AK/fix/transit/updated-remove-pendin…
Browse files Browse the repository at this point in the history
…g-request-by-node-id

fix: updated removePendingRequestByNodeID
  • Loading branch information
icebob authored Nov 6, 2024
2 parents 3375833 + 6e1fd71 commit e73192d
Showing 1 changed file with 16 additions and 2 deletions.
18 changes: 16 additions & 2 deletions src/transit.js
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,8 @@ class Transit {
* @returns {Stream}
*/
_handleIncomingRequestStream(payload) {
let pass = this.pendingReqStreams.get(payload.id);
const reqStream = this.pendingReqStreams.get(payload.id);
let pass = reqStream ? reqStream.stream : undefined;
let isNew = false;

if (!payload.stream && !pass && !payload.seq) {
Expand Down Expand Up @@ -548,7 +549,7 @@ class Transit {
pass.$prevSeq = -1;
pass.$pool = new Map();

this.pendingReqStreams.set(payload.id, pass);
this.pendingReqStreams.set(payload.id, { sender: payload.sender, stream: pass });
}

if (payload.seq > pass.$prevSeq + 1) {
Expand Down Expand Up @@ -1028,6 +1029,19 @@ class Transit {
*/
removePendingRequestByNodeID(nodeID) {
this.logger.debug(`Remove pending requests of '${nodeID}' node.`);

// Close pending request streams of the node
this.pendingReqStreams.forEach(({ sender, stream }, id) => {
if (sender === nodeID) {
// Close the stream with error
if (!stream.destroyed) {
stream.destroy(new Error(`Request stream closed by ${nodeID}`));
}

this.pendingReqStreams.delete(id);
}
});

this.pendingRequests.forEach((req, id) => {
if (req.nodeID === nodeID) {
this.pendingRequests.delete(id);
Expand Down

0 comments on commit e73192d

Please sign in to comment.