Skip to content

Commit

Permalink
Manage FallbackProvider stalling without unref (#815).
Browse files Browse the repository at this point in the history
  • Loading branch information
ricmoo committed May 3, 2020
1 parent 20a3d9b commit 7b1a7c7
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 26 deletions.
83 changes: 58 additions & 25 deletions packages/providers/src.ts/fallback-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ export interface FallbackProviderConfig {

// Timeout before also triggering the next provider; this does not stop
// this provider and if its result comes back before a quorum is reached
// it will be used it will be used.
// it will be incorporated into the vote
// - lower values will cause more network traffic but may result in a
// faster retult.
stallTimeout?: number;
Expand All @@ -111,19 +111,46 @@ export interface FallbackProviderConfig {
weight?: number;
};

// Returns a promise that delays for duration
function stall(duration: number): Promise<void> {
return new Promise((resolve) => {
const timer = setTimeout(resolve, duration);
if (timer.unref) { timer.unref(); }
});
// A Staller is used to provide a delay to give a Provider a chance to response
// before asking the next Provider to try.
type Staller = {
wait: (func: () => void) => Promise<void>
getPromise: () => Promise<void>,
cancel: () => void
};

function stall(duration: number): Staller {
let cancel: () => void = null;

let timer: NodeJS.Timer = null;
let promise = <Promise<void>>(new Promise((resolve) => {
cancel = function() {
if (timer) {
clearTimeout(timer);
timer = null;
}
resolve();
}
timer = setTimeout(cancel, duration);
}));

const wait = (func: () => void) => {
promise = promise.then(func);
return promise;
}

function getPromise(): Promise<void> {
return promise;
}

return { cancel, getPromise, wait };
}

interface RunningConfig extends FallbackProviderConfig {
start?: number;
done?: boolean;
runner?: Promise<any>;
staller?: Promise<void>;
staller?: Staller;
result?: any;
error?: Error;
};
Expand Down Expand Up @@ -373,29 +400,29 @@ export class FallbackProvider extends BaseProvider {

// Sending transactions is special; always broadcast it to all backends
if (method === "sendTransaction") {
return Promise.all(this.providerConfigs.map((c) => {
const results: Array<string | Error> = await Promise.all(this.providerConfigs.map((c) => {
return c.provider.sendTransaction(params.signedTransaction).then((result) => {
return result.hash;
}, (error) => {
return error;
});
})).then((results) => {
// Any success is good enough (other errors are likely "already seen" errors
for (let i = 0; i < results.length; i++) {
const result = results[i];
if (typeof(result) === "string") { return result; }
}
}));

// They were all an error; pick the first error
return Promise.reject(results[0]);
});
// Any success is good enough (other errors are likely "already seen" errors
for (let i = 0; i < results.length; i++) {
const result = results[i];
if (typeof(result) === "string") { return result; }
}

// They were all an error; pick the first error
throw results[0];
}

const processFunc = getProcessFunc(this, method, params);

// Shuffle the providers and then sort them by their priority; we
// shallowCopy them since we will store the result in them too
const configs: Array<RunningConfig> = shuffled(this.providerConfigs.map((c) => shallowCopy(c)));
const configs: Array<RunningConfig> = shuffled(this.providerConfigs.map(shallowCopy));
configs.sort((a, b) => (a.priority - b.priority));

let i = 0;
Expand All @@ -417,7 +444,8 @@ export class FallbackProvider extends BaseProvider {
const rid = nextRid++;

config.start = now();
config.staller = stall(config.stallTimeout).then(() => { config.staller = null; });
config.staller = stall(config.stallTimeout);
config.staller.wait(() => { config.staller = null; });

config.runner = getRunner(config.provider, method, params).then((result) => {
config.done = true;
Expand Down Expand Up @@ -448,8 +476,6 @@ export class FallbackProvider extends BaseProvider {
}
});

//running.push(config);

if (this.listenerCount("debug")) {
this.emit("debug", {
action: "request",
Expand All @@ -468,7 +494,7 @@ export class FallbackProvider extends BaseProvider {
configs.forEach((c) => {
if (c.done || !c.runner) { return; }
waiting.push(c.runner);
if (c.staller) { waiting.push(c.staller); }
if (c.staller) { waiting.push(c.staller.getPromise()); }
});

if (waiting.length) { await Promise.race(waiting); }
Expand All @@ -478,15 +504,22 @@ export class FallbackProvider extends BaseProvider {
const results = configs.filter((c) => (c.done && c.error == null));
if (results.length >= this.quorum) {
const result = processFunc(results);
if (result !== undefined) { return result; }
if (!first) { await stall(100); }
if (result !== undefined) {
// Shut down any stallers
configs.filter(c => c.staller).forEach(c => c.staller.cancel());
return result;
}
if (!first) { await stall(100).getPromise(); }
first = false;
}

// All configs have run to completion; we will never get more data
if (configs.filter((c) => !c.done).length === 0) { break; }
}

// Shut down any stallers; shouldn't be any
configs.filter(c => c.staller).forEach(c => c.staller.cancel());

return logger.throwError("failed to meet quorum", Logger.errors.SERVER_ERROR, {
method: method,
params: params,
Expand Down
3 changes: 2 additions & 1 deletion packages/tests/src.ts/test-contract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import { ethers } from "ethers";

import contractData from "./test-contract.json";

const provider = new ethers.providers.InfuraProvider('rinkeby');
//const provider = new ethers.providers.InfuraProvider('rinkeby');
const provider = ethers.getDefaultProvider("rinkeby");

const TIMEOUT_PERIOD = 120000;

Expand Down

0 comments on commit 7b1a7c7

Please sign in to comment.