Skip to content
This repository has been archived by the owner on Feb 26, 2024. It is now read-only.

fix: after provider.disconnect() is called, Ganache should stop serving requests #3433

Merged
merged 14 commits into from
Aug 19, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions src/chains/ethereum/ethereum/src/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import {
MessageEvent,
VmConsoleLogEvent
} from "./provider-events";
import { ConsoleLogs } from "@ganache/console.log";

declare type RequestMethods = KnownKeys<EthereumApi>;

Expand Down Expand Up @@ -141,8 +140,8 @@ export class EthereumProvider
{
#options: EthereumInternalOptions;
#api: EthereumApi;
#executor: Executor;
#wallet: Wallet;
readonly #executor: Executor;
readonly #blockchain: Blockchain;

constructor(
Expand Down Expand Up @@ -419,10 +418,22 @@ export class EthereumProvider
}
};

/**
* Disconnect the provider instance. This will cause the underlying blockchain to be stopped, and any pending
* tasks to be rejected. Await the returned Promise to ensure that everything has been cleanly shut down
* before terminating the process.
jeffsmale90 marked this conversation as resolved.
Show resolved Hide resolved
* @return Promise<void> - indicating that the provider has been cleanly disconnected
jeffsmale90 marked this conversation as resolved.
Show resolved Hide resolved
*/
public disconnect = async () => {
const executor = this.#executor;
// We make a best effort to resolve any currently executing tasks, before rejecting pending tasks. This relies on
// this.#blockchain.stop() waiting to resolve until after all executing tasks have settled. Executor does not
// guarantee that no tasks are currently executing, before rejecting any remaining pending tasks.
executor.stop();
await this.#blockchain.stop();
executor.rejectPendingTasks();

this.emit("disconnect");
return;
};

//#region legacy
Expand Down
10 changes: 10 additions & 0 deletions src/chains/ethereum/ethereum/tests/provider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,16 @@ describe("provider", () => {
}
);
});

it("stops responding to RPC methods once disconnected", async () => {
const provider = await getProvider();
await provider.disconnect();

await assert.rejects(
provider.send("eth_getBlockByNumber", ["latest"]),
new Error("Cannot process request, Ganache is disconnected.")
);
});
});

describe("web3 compatibility", () => {
Expand Down
9 changes: 9 additions & 0 deletions src/packages/utils/src/utils/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,15 @@ export class Executor {
this.#requestCoordinator = requestCoordinator;
}

public stop() {
davidmurdoch marked this conversation as resolved.
Show resolved Hide resolved
this.#requestCoordinator.stop();
}

public rejectPendingTasks() {
this.#requestCoordinator.rejectPendingTasks();
}


/**
* Executes the method with the given methodName on the API
* @param methodName - The name of the JSON-RPC method to execute.
Expand Down
48 changes: 44 additions & 4 deletions src/packages/utils/src/utils/request-coordinator.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { OverloadedParameters } from "../types";

const noop = () => {};
type RejectableTask = {
execute: (...args: any) => Promise<any>;
reject: (reason?: any) => void;
};

/**
* Responsible for managing global concurrent requests.
Expand All @@ -14,7 +18,7 @@ export class RequestCoordinator {
/**
* The pending requests. You can't do anything with this array.
*/
public readonly pending: ((...args: any) => Promise<any>)[] = [];
public readonly pending: RejectableTask[] = [];

/**
* The number of tasks currently being processed.
Expand Down Expand Up @@ -60,7 +64,8 @@ export class RequestCoordinator {
) {
const current = this.pending.shift();
this.runningTasks++;
current()
current
.execute()
// By now, we've resolved the fn's `value` by sending it to the parent scope.
// But over here, we're also waiting for this fn's _value_ to settle _itself_ (it might be a promise) before
// continuing through the `pending` queue. Because we wait for it again here, it could potentially throw here,
Expand All @@ -74,6 +79,41 @@ export class RequestCoordinator {
}
};

/**
* Stop processing tasks - calls to queue(), and resume() will reject with an error indicating that Ganache is
* disconnected. This is an irreversible action. If you wish to be able to resume processing, use pause() instead.
*
* Note: This will _not_ reject any pending tasks - see rejectPendingTasks()
*/
public stop() {
this.pause();

// ensure nothing can be requeued (although tasks can be added directly to this.pending but they will never be
// processed). We make this async to force a Promise return type.
this.queue = async () => {
throw new Error("Cannot process request, Ganache is disconnected.");
};

// ensure that processing cannot be resumed.
this.resume = () => {
throw new Error(
"Cannot resume processing requests, Ganache is disconnected."
);
};
}

/**
* Reject any pending tasks with an error indicating that Ganache is disconnected. Tasks are rejected in FIFO order.
*/
public rejectPendingTasks() {
let current: RejectableTask;
while(current = this.pending.shift()) {
current.reject(
new Error("Cannot process request, Ganache is disconnected.")
);
}
}

/**
* Insert a new function into the queue.
*/
Expand All @@ -84,7 +124,7 @@ export class RequestCoordinator {
) => {
return new Promise<{ value: ReturnType<typeof fn> }>((resolve, reject) => {
// const executor is `async` to force the return value into a Promise.
jeffsmale90 marked this conversation as resolved.
Show resolved Hide resolved
const executor = async () => {
const execute = async () => {
try {
const value = Reflect.apply(
fn,
Expand All @@ -97,7 +137,7 @@ export class RequestCoordinator {
reject(e);
}
};
this.pending.push(executor);
this.pending.push({ execute, reject });
this.#process();
});
};
Expand Down
74 changes: 74 additions & 0 deletions src/packages/utils/tests/request-coordinator.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import assert from "assert";
import { RequestCoordinator } from "../src/utils/request-coordinator";

describe("request-coordinator", () => {
const noop = () => undefined;
let coordinator: RequestCoordinator;

beforeEach("instantiate RequestCoordinator", () => {
coordinator = new RequestCoordinator(0);
});

describe("stop()", () => {
it("should pause processing", () => {
davidmurdoch marked this conversation as resolved.
Show resolved Hide resolved
coordinator.stop();

assert(coordinator.paused);
});

it("should not allow processing to be resumed", () => {
coordinator.stop();

assert.throws(
() => coordinator.resume(),
new Error("Cannot resume processing requests, Ganache is disconnected.")
);
});

it("should not allow new requests to be queued", async () => {
coordinator.stop();

await assert.rejects(
coordinator.queue(noop, this, []),
davidmurdoch marked this conversation as resolved.
Show resolved Hide resolved
new Error("Cannot process request, Ganache is disconnected.")
);
});
});

describe("rejectAllPendingRequests()", () => {
it("should reject pending requests in the order that they were received", async () => {
coordinator.pause();

let taskIndex = 0;
const pendingAssertions: Promise<any>[] = [];
for (let i = 0; i < 10; i++) {
const task = coordinator.queue(noop, this, []);
davidmurdoch marked this conversation as resolved.
Show resolved Hide resolved

let nextRejectionIndex = taskIndex;
pendingAssertions.push(task.catch(_ => {
assert.strictEqual(i, nextRejectionIndex, `Rejected in incorrect order, waiting on task at index ${nextRejectionIndex}, got ${i}.`);
}));

taskIndex++;

pendingAssertions.push(assert.rejects(task, new Error("Cannot process request, Ganache is disconnected.")));
}

coordinator.rejectPendingTasks();
await Promise.all(pendingAssertions);
});

it("should clear the pending tasks queue", () => {
coordinator.pause();

for (let i = 0; i < 10; i++) {
coordinator.queue(noop, this, []);
}

assert.equal(coordinator.pending.length, 10, "Incorrect pending queue length before calling rejectAllPendingRequests");

coordinator.rejectPendingTasks();
assert.equal(coordinator.pending.length, 0, "Incorrect pending queue length after calling rejectAllPendingRequests");
});
});
});