Skip to content

Commit

Permalink
refactor(typescript): noImplicityAny for snapshot.ts and publisher.ts (
Browse files Browse the repository at this point in the history
  • Loading branch information
praveenqlogic authored and callmehiphop committed Feb 8, 2019
1 parent d8c6a3d commit 11c040a
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 185 deletions.
39 changes: 18 additions & 21 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,22 @@ const PKG = require('../../package.json');
const v1 = require('./v1');

import {Snapshot} from './snapshot';
import {Subscription, SubscriptionMetadata, SubscriptionMetadataRaw} from './subscription';
import {Subscription, SubscriptionMetadataRaw} from './subscription';
import {Topic, PublishOptions} from './topic';
import {CallOptions} from 'google-gax';
import {Readable} from 'stream';
import {google} from '../proto/pubsub';
import {ServiceError} from 'grpc';
import {FlowControlOptions} from './lease-manager';
import {BatchPublishOptions} from './publisher';

const opts = {} as gax.GrpcClientOptions;
const {grpc} = new gax.GrpcClient(opts);

export type SeekCallback = RequestCallback<google.pubsub.v1.ISeekResponse>;

export interface GetSubscriptionMetadataCallback {
(err: ServiceError|null, res?: google.pubsub.v1.Subscription|null): void;
(err: ServiceError|null, res?: google.pubsub.v1.ISubscription|null): void;
}

export interface ExistsCallback {
Expand All @@ -56,28 +59,22 @@ export interface GetCallOptions extends CallOptions {
autoCreate?: boolean;
}

export interface PushConfig {
pushEndpoint: string;
attibutes?: Map<string, string>;
export interface Attributes {
[key: string]: string;
}


export interface SubscriptionCallOptions {
flowControl?:
{maxBytes?: number, maxMessages?: number, allowExcessMessages: boolean;};
flowControl?: FlowControlOptions;
maxConnections?: number;
topic?: Topic;
ackDeadline?: number;
autoPaginate?: boolean;
gaxOpts?: CallOptions;
batching?:
{maxBytes?: number, maxMessages?: number, maxMilliseconds?: number};
batching?: BatchPublishOptions;
}

export interface PublisherCallOptions {
batching?:
{maxBytes?: number, maxMessages?: number, maxMilliseconds?: number};
}


/**
* @callback CreateTopicCallback
Expand All @@ -87,15 +84,15 @@ export interface PublisherCallOptions {
*/
export interface CreateSnapshotCallback {
(err: Error|null, snapshot?: Snapshot|null,
apiResponse?: google.pubsub.v1.Snapshot): void;
apiResponse?: google.pubsub.v1.ISnapshot): void;
}

/**
* @typedef {array} CreateSnapshotResponse
* @property {Snapshot}.
* @property {object} 1 The full API response.
*/
export type CreateSnapshotResponse = [Snapshot, google.pubsub.v1.Snapshot];
export type CreateSnapshotResponse = [Snapshot, google.pubsub.v1.ISnapshot];

/**
* Project ID placeholder.
Expand All @@ -112,7 +109,7 @@ export type Metadata = any;
* @property {Topic} 0 The new {@link Topic}.
* @property {object} 1 The full API response.
*/
export type CreateTopicResponse = [Topic, google.pubsub.v1.Topic];
export type CreateTopicResponse = [Topic, google.pubsub.v1.ITopic];

/**
* @callback CreateTopicCallback
Expand All @@ -122,7 +119,7 @@ export type CreateTopicResponse = [Topic, google.pubsub.v1.Topic];
*/
export interface CreateTopicCallback {
(err?: Error|null, topic?: Topic|null,
apiResponse?: google.pubsub.v1.Topic): void;
apiResponse?: google.pubsub.v1.ITopic): void;
}

/**
Expand All @@ -133,7 +130,7 @@ export interface CreateTopicCallback {
*/
export interface CreateSubscriptionCallback {
(err?: Error|null, subscription?: Subscription|null,
apiResponse?: google.pubsub.v1.Subscription): void;
apiResponse?: google.pubsub.v1.ISubscription): void;
}

export type Client = 'PublisherClient'|'SubscriberClient';
Expand All @@ -160,11 +157,11 @@ export interface RequestCallback<TResponse> {
* @property {object} 1 The full API response.
*/
export type CreateSubscriptionResponse =
[Subscription, google.pubsub.v1.Subscription];
[Subscription, google.pubsub.v1.ISubscription];


export interface CreateSubscriptionOptions {
flowControl?: {maxBytes?: number; maxMessages?: number;};
flowControl?: FlowControlOptions;
gaxOpts?: CallOptions;
/**
* Duration in seconds.
Expand Down Expand Up @@ -406,7 +403,7 @@ export class PubSub {
name: subscription.name,
});

this.request(
this.request<google.pubsub.v1.ISubscription>(
{
client: 'SubscriberClient',
method: 'createSubscription',
Expand Down
53 changes: 33 additions & 20 deletions src/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,26 @@
import {promisifyAll} from '@google-cloud/promisify';
import * as arrify from 'arrify';
import {CallOptions} from 'google-gax';
import {ServiceError} from 'grpc';
import {google} from '../proto/pubsub';

const each = require('async-each');
import * as extend from 'extend';
import * as is from 'is';
import {Topic} from './topic';
import {Attributes} from '.';
import {ServiceError} from 'grpc';

export interface PublishCallback {
(err: null|ServiceError, messageId: string): void;
interface Inventory {
callbacks: PublishCallback[];
queued: google.pubsub.v1.IPubsubMessage[];
bytes: number;
}

interface PublishApiResponse {
messageIds: string[];
export interface PublishCallback {
(err?: ServiceError|null, messageId?: string|null): void;
}


/**
* @typedef BatchPublishOptions
* @property {number} [maxBytes=1024^2 * 5] The maximum number of bytes to
Expand All @@ -41,7 +46,7 @@ interface PublishApiResponse {
* @property {number} [maxMilliseconds=100] The maximum duration to wait before
* sending a payload.
*/
interface BatchPublishOptions {
export interface BatchPublishOptions {
maxBytes?: number;
maxMessages?: number;
maxMilliseconds?: number;
Expand Down Expand Up @@ -80,7 +85,7 @@ export class Publisher {
// tslint:disable-next-line variable-name
Promise?: PromiseConstructor;
topic: Topic;
inventory_;
inventory_: Inventory;
settings!: PublishOptions;
timeoutHandle_?: NodeJS.Timer;
constructor(topic: Topic, options?: PublishOptions) {
Expand Down Expand Up @@ -157,17 +162,22 @@ export class Publisher {
* //-
* publisher.publish(data).then((messageId) => {});
*/
publish(data: Buffer, attributes?: object): Promise<string>;
publish(data: Buffer, attributes?: Attributes): Promise<string>;
publish(data: Buffer, callback: PublishCallback): void;
publish(data: Buffer, attributes: object, callback: PublishCallback): void;
publish(data: Buffer, attributes?, callback?): Promise<string>|void {
publish(data: Buffer, attributes: Attributes, callback: PublishCallback):
void;
publish(
data: Buffer, attributesOrCallback?: Attributes|PublishCallback,
callback?: PublishCallback): Promise<string>|void {
if (!(data instanceof Buffer)) {
throw new TypeError('Data must be in the form of a Buffer.');
}
if (is.fn(attributes)) {
callback = attributes;
attributes = {};
}

const attributes =
typeof attributesOrCallback === 'object' ? attributesOrCallback : {};
callback = typeof attributesOrCallback === 'function' ?
attributesOrCallback :
callback;
// Ensure the `attributes` object only has string values
for (const key of Object.keys(attributes)) {
const value = attributes[key];
Expand All @@ -185,10 +195,10 @@ export class Publisher {
this.publish_();
}
// add it to the queue!
this.queue_(data, attributes, callback);
this.queue_(data, attributes, callback!);
// next lets check if this message brings us to the message cap or if we
// hit the max byte limit
const hasMaxMessages = this.inventory_.queued.length === opts.maxMessages!;
const hasMaxMessages = this.inventory_.queued.length === opts.maxMessages;
if (this.inventory_.bytes >= opts.maxBytes! || hasMaxMessages) {
this.publish_();
return;
Expand Down Expand Up @@ -247,7 +257,7 @@ export class Publisher {
topic: this.topic.name,
messages,
};
this.topic.request(
this.topic.request<google.pubsub.v1.IPublishResponse>(
{
client: 'PublisherClient',
method: 'publish',
Expand All @@ -256,7 +266,7 @@ export class Publisher {
},
(err, resp) => {
const messageIds = arrify(resp && resp.messageIds);
each(callbacks, (callback, next) => {
each(callbacks, (callback: PublishCallback, next: Function) => {
const messageId = messageIds[callbacks.indexOf(callback)];
callback(err, messageId);
next();
Expand All @@ -272,13 +282,16 @@ export class Publisher {
* @param {object} attributes The message attributes.
* @param {function} callback The callback function.
*/
queue_(data, attrs, callback) {
queue_(data: Buffer, attrs: Attributes): Promise<string>;
queue_(data: Buffer, attrs: Attributes, callback: PublishCallback): void;
queue_(data: Buffer, attrs: Attributes, callback?: PublishCallback):
void|Promise<string> {
this.inventory_.queued.push({
data,
attributes: attrs,
});
this.inventory_.bytes += data.length;
this.inventory_.callbacks.push(callback);
this.inventory_.callbacks.push(callback!);
}
}

Expand Down
Loading

0 comments on commit 11c040a

Please sign in to comment.