Skip to content

Commit

Permalink
patch: ensure that the client pool consistently uses grpc clients aft…
Browse files Browse the repository at this point in the history
…er transitioning from rest (#1807)

* Adding docs for the preferRest setting.

* Cleaned up the logic around the grpc transition when using preferRest, so that the client pool consistently uses grpc after the transition, rather than occasionally reverting back to REST.
  • Loading branch information
MarkDuckworth authored Jan 3, 2023
1 parent a5b680d commit 3068361
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 7 deletions.
7 changes: 7 additions & 0 deletions dev/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,13 @@ export class Firestore implements firestore.Firestore {
* to `true`, these properties are skipped and not written to Firestore. If
* set `false` or omitted, the SDK throws an exception when it encounters
* properties of type `undefined`.
* @param {boolean=} settings.preferRest Whether to force the use of HTTP/1.1 REST
* transport until a method that requires gRPC is called. When a method requires gRPC,
* this Firestore client will load dependent gRPC libraries and then use gRPC transport
* for communication from that point forward. Currently the only operation
* that requires gRPC is creating a snapshot listener with the method
* `DocumentReference<T>.onSnapshot()`, `CollectionReference<T>.onSnapshot()`, or
* `Query<T>.onSnapshot()`.
*/
constructor(settings?: firestore.Settings) {
const libraryHeader = {
Expand Down
23 changes: 22 additions & 1 deletion dev/src/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ export class ClientPool<T> {
let selectedClient: T | null = null;
let selectedClientRequestCount = -1;

// Transition to grpc when we see the first operation that requires grpc.
this.grpcEnabled = this.grpcEnabled || requiresGrpc;

// Require a grpc client for this operation if we have transitioned to grpc.
requiresGrpc = requiresGrpc || this.grpcEnabled;

for (const [client, metadata] of this.activeClients) {
// Use the "most-full" client that can still accommodate the request
// in order to maximize the number of idle clients as operations start to
Expand All @@ -101,7 +107,7 @@ export class ClientPool<T> {
!this.failedClients.has(client) &&
metadata.activeRequestCount > selectedClientRequestCount &&
metadata.activeRequestCount < this.concurrentOperationLimit &&
(!requiresGrpc || metadata.grpcEnabled)
(metadata.grpcEnabled || !requiresGrpc)
) {
selectedClient = client;
selectedClientRequestCount = metadata.activeRequestCount;
Expand Down Expand Up @@ -223,6 +229,21 @@ export class ClientPool<T> {
return activeOperationCount;
}

/**
* The currently active clients.
*
* @return The currently active clients.
* @private
* @internal
*/
// Visible for testing.
get _activeClients(): Map<
T,
{activeRequestCount: number; grpcEnabled: boolean}
> {
return this.activeClients;
}

/**
* Runs the provided operation in this pool. This function may create an
* additional client if all existing clients already operate at the concurrent
Expand Down
178 changes: 178 additions & 0 deletions dev/test/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,26 @@ function deferredPromises(count: number): Array<Deferred<void>> {
return deferred;
}

function assertOpCount<T>(
pool: ClientPool<T>,
grpcClientOpCount: number,
restClientOpCount: number
): void {
let actualGrpcClientOpCount = 0;
let actualRestClientOpCount = 0;

pool._activeClients.forEach(clientConfig => {
if (clientConfig.grpcEnabled) {
actualGrpcClientOpCount += clientConfig.activeRequestCount;
} else {
actualRestClientOpCount += clientConfig.activeRequestCount;
}
});

expect(actualGrpcClientOpCount).to.equal(grpcClientOpCount);
expect(actualRestClientOpCount).to.equal(restClientOpCount);
}

describe('Client pool', () => {
it('creates new instances as needed', () => {
const clientPool = new ClientPool<{}>(3, 0, () => {
Expand Down Expand Up @@ -133,6 +153,7 @@ describe('Client pool', () => {
() => operationPromises[1].promise
);
expect(clientPool.size).to.equal(2);
assertOpCount(clientPool, 1, 1);

operationPromises[0].resolve();
operationPromises[1].resolve();
Expand All @@ -156,9 +177,166 @@ describe('Client pool', () => {
() => operationPromises[1].promise
);
expect(clientPool.size).to.equal(1);
assertOpCount(clientPool, 2, 0);

operationPromises[0].resolve();
operationPromises[1].resolve();
});

it('does not re-use rest instance after beginning the transition to grpc', async () => {
const clientPool = new ClientPool<{}>(10, 1, () => {
return {};
});

const operationPromises = deferredPromises(3);

void clientPool.run(
REQUEST_TAG,
USE_REST,
() => operationPromises[0].promise
);
void clientPool.run(
REQUEST_TAG,
USE_GRPC,
() => operationPromises[1].promise
);
void clientPool.run(
REQUEST_TAG,
USE_REST,
() => operationPromises[2].promise
);

expect(clientPool.size).to.equal(2);
assertOpCount(clientPool, 2, 1);

operationPromises[0].resolve();
operationPromises[1].resolve();
operationPromises[2].resolve();
});

it('does not re-use rest instance after beginning the transition to grpc - rest operation resolved', async () => {
const clientPool = new ClientPool<{}>(10, 1, () => {
return {};
});

const operationPromises = deferredPromises(3);

const restOperation = clientPool.run(
REQUEST_TAG,
USE_REST,
() => operationPromises[0].promise
);
void clientPool.run(
REQUEST_TAG,
USE_GRPC,
() => operationPromises[1].promise
);

// resolve rest operation
operationPromises[0].resolve();
await restOperation;
expect(clientPool.opCount).to.equal(1);

// Run new rest operation
void clientPool.run(
REQUEST_TAG,
USE_REST,
() => operationPromises[2].promise
);

// Assert client pool status
expect(clientPool.size).to.equal(1);
assertOpCount(clientPool, 2, 0);

operationPromises[1].resolve();
operationPromises[2].resolve();
});

it('does not re-use rest instance after beginning the transition to grpc - grpc client full', async () => {
const operationLimit = 10;
const clientPool = new ClientPool<{}>(operationLimit, 1, () => {
return {};
});

const restPromises = deferredPromises(operationLimit);
const grpcPromises = deferredPromises(1);

// First operation use GRPC
void clientPool.run(REQUEST_TAG, USE_GRPC, () => grpcPromises[0].promise);

// Next X operations can use rest, this will fill the first
// client and create a new client.
// The new client should use GRPC since we have transitioned.
restPromises.forEach(restPromise => {
void clientPool.run(REQUEST_TAG, USE_REST, () => restPromise.promise);
});
expect(clientPool.opCount).to.equal(11);
expect(clientPool.size).to.equal(2);
assertOpCount(clientPool, 11, 0);

grpcPromises.forEach(grpcPromise => grpcPromise.resolve());
restPromises.forEach(restPromise => restPromise.resolve());
});

it('does not re-use rest instance after beginning the transition to grpc - multiple rest clients', async () => {
const operationLimit = 10;
const clientPool = new ClientPool<{}>(operationLimit, 1, () => {
return {};
});

const restPromises = deferredPromises(15);
const grpcPromises = deferredPromises(5);

// First 15 operations can use rest, this will fill the first
// client and create a new client.
restPromises.forEach(restPromise => {
void clientPool.run(REQUEST_TAG, USE_REST, () => restPromise.promise);
});
expect(clientPool.opCount).to.equal(15);
expect(clientPool.size).to.equal(2);
assertOpCount(clientPool, 0, 15);

// Next 5 operations alternate between gRPC and REST, this will create a new client using gRPC
let transport = USE_GRPC;
grpcPromises.forEach(grpcPromise => {
void clientPool.run(REQUEST_TAG, transport, () => grpcPromise.promise);
transport = !transport;
});
expect(clientPool.opCount).to.equal(20);
expect(clientPool.size).to.equal(3);
assertOpCount(clientPool, 5, 15);

grpcPromises.forEach(grpcPromise => grpcPromise.resolve());
restPromises.forEach(restPromise => restPromise.resolve());
});

it('does not re-use rest instance after beginning the transition to grpc - grpc client RST_STREAM', async () => {
const clientPool = new ClientPool<{}>(10, 1, () => {
return {};
});

const operationPromises = deferredPromises(1);

const grpcOperation = clientPool.run(REQUEST_TAG, USE_GRPC, () =>
Promise.reject(
new GoogleError('13 INTERNAL: Received RST_STREAM with code 2')
)
);

await grpcOperation.catch(e => e);

// Run new rest operation
void clientPool.run(
REQUEST_TAG,
USE_REST,
() => operationPromises[0].promise
);

// Assert client pool status
expect(clientPool.size).to.equal(1);
assertOpCount(clientPool, 1, 0);

operationPromises[0].resolve();
});

it('bin packs operations', async () => {
Expand Down
12 changes: 6 additions & 6 deletions types/firestore.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,12 +290,12 @@ declare namespace FirebaseFirestore {
ignoreUndefinedProperties?: boolean;

/**
* Use HTTP for requests that can be served over HTTP and JSON. This reduces
* the amount of networking code that is loaded to serve requests within
* Firestore.
*
* This setting does not apply to `onSnapshot` APIs as they cannot be served
* over native HTTP.
* Whether to force the use of HTTP/1.1 REST transport until a method that requires gRPC
* is called. When a method requires gRPC, this Firestore client will load dependent gRPC
* libraries and then use gRPC transport for communication from that point forward.
* Currently the only operation that requires gRPC is creating a snapshot listener with
* the method `DocumentReference<T>.onSnapshot()`, `CollectionReference<T>.onSnapshot()`,
* or `Query<T>.onSnapshot()`.
*/
preferRest?: boolean;

Expand Down

0 comments on commit 3068361

Please sign in to comment.