Skip to content

Commit

Permalink
fix: detect subscription properties and warn for exactly-once (#1561)
Browse files Browse the repository at this point in the history
* feat: detect subscription properties and warn for exactly-once

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: check for changes to subscription options on every message

* chore: fix copyright in new file

* fix: update copyright header again ("once more with feeling" edition)

* fix: still yet more copyright header

* fix: timing in CI is tricky, replace setTimeout

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
feywind and gcf-owl-bot[bot] authored May 30, 2022
1 parent 179b617 commit 98cf540
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 2 deletions.
41 changes: 39 additions & 2 deletions src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ import {Subscription} from './subscription';
import {defaultOptions} from './default-options';
import {SubscriberClient} from './v1';
import {createSpan} from './opentelemetry-tracing';
import {Throttler} from './util';

export type PullResponse = google.pubsub.v1.IPullResponse;
export type PullResponse = google.pubsub.v1.IStreamingPullResponse;
export type SubscriptionProperties =
google.pubsub.v1.StreamingPullResponse.ISubscriptionProperties;

/**
* Date object with nanosecond precision. Supports all standard Date arguments
Expand Down Expand Up @@ -253,6 +256,10 @@ export class Subscriber extends EventEmitter {
private _options!: SubscriberOptions;
private _stream!: MessageStream;
private _subscription: Subscription;
private _errorLog: Throttler;

subscriptionProperties?: SubscriptionProperties;

constructor(subscription: Subscription, options = {}) {
super();

Expand All @@ -266,8 +273,32 @@ export class Subscriber extends EventEmitter {
this._histogram = new Histogram({min: 10, max: 600});
this._latencies = new Histogram();
this._subscription = subscription;
this._errorLog = new Throttler(60 * 1000);

this.setOptions(options);
}

/**
* Sets our subscription properties from the first incoming message.
*
* @param {SubscriptionProperties} subscriptionProperties The new properties.
* @private
*/
setSubscriptionProperties(subscriptionProperties: SubscriptionProperties) {
this.subscriptionProperties = subscriptionProperties;

// If this is an exactly-once subscription, warn the user that they may have difficulty.
if (this.subscriptionProperties.exactlyOnceDeliveryEnabled) {
this._errorLog.doMaybe(() =>
console.error(
'WARNING: Exactly-once subscriptions are not yet supported ' +
'by the Node client library. This feature will be added ' +
'in a future release.'
)
);
}
}

/**
* The 99th percentile of request latencies.
*
Expand Down Expand Up @@ -517,7 +548,13 @@ export class Subscriber extends EventEmitter {
*
* @private
*/
private _onData({receivedMessages}: PullResponse): void {
private _onData(response: PullResponse): void {
// Grab the subscription properties for exactly once and ordering flags.
if (response.subscriptionProperties) {
this.setSubscriptionProperties(response.subscriptionProperties);
}

const {receivedMessages} = response;
for (const data of receivedMessages!) {
const message = new Message(this, data);

Expand Down
32 changes: 32 additions & 0 deletions src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,35 @@ export function promisifySome<T>(
}

export function noop() {}

/**
* Provides a very simple throttling capability for tasks like error logs.
* This ensures that no task is actually completed unless N millis have passed
* since the last one.
*
* @private
*/
export class Throttler {
minMillis: number;
lastTime?: number;

constructor(minMillis: number) {
this.minMillis = minMillis;
}

/**
* Performs the task requested, if enough time has passed since the
* last successful call.
*/
doMaybe(task: Function) {
const now = Date.now();
const doTask =
!this.lastTime ||
(this.lastTime && now - this.lastTime >= this.minMillis);

if (doTask) {
task();
this.lastTime = now;
}
}
}
46 changes: 46 additions & 0 deletions test/util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import {describe, it} from 'mocha';
import {Throttler} from '../src/util';
import * as assert from 'assert';

describe('utils', () => {
describe('Throttler', () => {
it('does not allow too many calls through at once', () => {
const throttler = new Throttler(300);
let totalCalls = '';

// This one should succeed.
throttler.doMaybe(() => {
totalCalls += 'FIRST';
});

// This one should fail.
throttler.doMaybe(() => {
totalCalls += 'SECOND';
});

// Simulate time passing.
throttler.lastTime! -= 1000;

// This one should succeed.
throttler.doMaybe(() => {
totalCalls += 'THIRD';
});

assert.strictEqual(totalCalls, 'FIRSTTHIRD');
});
});
});

0 comments on commit 98cf540

Please sign in to comment.