Skip to content

Commit

Permalink
fix: Optimize Transaction PITR (#2002)
Browse files Browse the repository at this point in the history
* Optimize Transaction PITR

* Comments and types

* Enforce read-only cannot write

* Pretty
  • Loading branch information
tom-andersen authored Feb 28, 2024
1 parent 8799032 commit 2f08612
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 140 deletions.
9 changes: 8 additions & 1 deletion dev/src/document-reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {isPermanentRpcError} from './util';
import {google} from '../protos/firestore_v1_proto_api';
import {logger} from './logger';
import {Firestore} from './index';
import {Timestamp} from './timestamp';
import {DocumentData} from '@google-cloud/firestore';
import api = google.firestore.v1;

Expand All @@ -36,6 +37,8 @@ export class DocumentReader<AppModelType, DbModelType extends DocumentData> {
fieldMask?: FieldPath[];
/** An optional transaction ID to use for this read. */
transactionId?: Uint8Array;
/** An optional readTime to use for this read. */
readTime?: Timestamp;

private outstandingDocuments = new Set<string>();
private retrievedDocuments = new Map<string, DocumentSnapshot>();
Expand Down Expand Up @@ -99,9 +102,13 @@ export class DocumentReader<AppModelType, DbModelType extends DocumentData> {

const request: api.IBatchGetDocumentsRequest = {
database: this.firestore.formattedName,
transaction: this.transactionId,
documents: Array.from(this.outstandingDocuments),
};
if (this.transactionId) {
request.transaction = this.transactionId;
} else if (this.readTime) {
request.readTime = this.readTime.toProto().timestampValue;
}

if (this.fieldMask) {
const fieldPaths = this.fieldMask.map(
Expand Down
19 changes: 2 additions & 17 deletions dev/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1189,10 +1189,6 @@ export class Firestore implements firestore.Firestore {

const tag = requestTag();

let maxAttempts = DEFAULT_MAX_TRANSACTION_ATTEMPTS;
let readOnly = false;
let readTime: Timestamp | undefined;

if (transactionOptions) {
validateObject('transactionOptions', transactionOptions);
validateBoolean(
Expand All @@ -1207,29 +1203,18 @@ export class Firestore implements firestore.Firestore {
transactionOptions.readTime,
{optional: true}
);

readOnly = true;
readTime = transactionOptions.readTime as Timestamp | undefined;
maxAttempts = 1;
} else {
validateInteger(
'transactionOptions.maxAttempts',
transactionOptions.maxAttempts,
{optional: true, minValue: 1}
);

maxAttempts =
transactionOptions.maxAttempts || DEFAULT_MAX_TRANSACTION_ATTEMPTS;
}
}

const transaction = new Transaction(this, tag);
const transaction = new Transaction(this, tag, transactionOptions);
return this.initializeIfNeeded(tag).then(() =>
transaction.runTransaction(updateFunction, {
maxAttempts,
readOnly,
readTime,
})
transaction.runTransaction(updateFunction)
);
}

Expand Down
39 changes: 25 additions & 14 deletions dev/src/reference.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2343,10 +2343,11 @@ export class Query<
*
* @private
* @internal
* @param {bytes=} transactionId A transaction ID.
* @param transactionIdOrReadTime A transaction ID or the read time at which
* to execute the query.
*/
_get(
transactionId?: Uint8Array
transactionIdOrReadTime?: Uint8Array | Timestamp
): Promise<QuerySnapshot<AppModelType, DbModelType>> {
const docs: Array<QueryDocumentSnapshot<AppModelType, DbModelType>> = [];

Expand All @@ -2356,7 +2357,7 @@ export class Query<
return new Promise((resolve, reject) => {
let readTime: Timestamp;

this._stream(transactionId)
this._stream(transactionIdOrReadTime)
.on('error', err => {
reject(wrapError(err, stack));
})
Expand Down Expand Up @@ -2616,12 +2617,15 @@ export class Query<
/**
* Internal streaming method that accepts an optional transaction ID.
*
* @param transactionId A transaction ID.
* @param transactionIdOrReadTime A transaction ID or the read time at which
* to execute the query.
* @private
* @internal
* @returns A stream of document results.
*/
_stream(transactionId?: Uint8Array): NodeJS.ReadableStream {
_stream(
transactionIdOrReadTime?: Uint8Array | Timestamp
): NodeJS.ReadableStream {
const tag = requestTag();
const startTime = Date.now();

Expand Down Expand Up @@ -2678,7 +2682,7 @@ export class Query<
// `toProto()` might throw an exception. We rely on the behavior of an
// async function to convert this exception into the rejected Promise we
// catch below.
let request = this.toProto(transactionId);
let request = this.toProto(transactionIdOrReadTime);

let streamActive: Deferred<boolean>;
do {
Expand All @@ -2695,7 +2699,10 @@ export class Query<

// If a non-transactional query failed, attempt to restart.
// Transactional queries are retried via the transaction runner.
if (!transactionId && !this._isPermanentRpcError(err, 'runQuery')) {
if (
!transactionIdOrReadTime &&
!this._isPermanentRpcError(err, 'runQuery')
) {
logger(
'Query._stream',
tag,
Expand Down Expand Up @@ -3338,15 +3345,15 @@ export class AggregateQuery<
* @param {bytes=} transactionId A transaction ID.
*/
_get(
transactionId?: Uint8Array
transactionIdOrReadTime?: Uint8Array | Timestamp
): Promise<
AggregateQuerySnapshot<AggregateSpecType, AppModelType, DbModelType>
> {
// Capture the error stack to preserve stack tracing across async calls.
const stack = Error().stack!;

return new Promise((resolve, reject) => {
const stream = this._stream(transactionId);
const stream = this._stream(transactionIdOrReadTime);
stream.on('error', err => {
reject(wrapError(err, stack));
});
Expand All @@ -3368,7 +3375,7 @@ export class AggregateQuery<
* @param transactionId A transaction ID.
* @returns A stream of document results.
*/
_stream(transactionId?: Uint8Array): Readable {
_stream(transactionIdOrReadTime?: Uint8Array | Timestamp): Readable {
const tag = requestTag();
const firestore = this._query.firestore;

Expand All @@ -3391,7 +3398,7 @@ export class AggregateQuery<
// `toProto()` might throw an exception. We rely on the behavior of an
// async function to convert this exception into the rejected Promise we
// catch below.
const request = this.toProto(transactionId);
const request = this.toProto(transactionIdOrReadTime);

const backendStream = await firestore.requestStream(
'runAggregationQuery',
Expand Down Expand Up @@ -3463,7 +3470,9 @@ export class AggregateQuery<
* @internal
* @returns Serialized JSON for the query.
*/
toProto(transactionId?: Uint8Array): api.IRunAggregationQueryRequest {
toProto(
transactionIdOrReadTime?: Uint8Array | Timestamp
): api.IRunAggregationQueryRequest {
const queryProto = this._query.toProto();
const runQueryRequest: api.IRunAggregationQueryRequest = {
parent: queryProto.parent,
Expand All @@ -3484,8 +3493,10 @@ export class AggregateQuery<
},
};

if (transactionId instanceof Uint8Array) {
runQueryRequest.transaction = transactionId;
if (transactionIdOrReadTime instanceof Uint8Array) {
runQueryRequest.transaction = transactionIdOrReadTime;
} else if (transactionIdOrReadTime instanceof Timestamp) {
runQueryRequest.readTime = transactionIdOrReadTime;
}

return runQueryRequest;
Expand Down
Loading

0 comments on commit 2f08612

Please sign in to comment.