Skip to content

Commit

Permalink
all tests passing on basic alignment of polling intervals
Browse files Browse the repository at this point in the history
  • Loading branch information
Poincare committed Jul 15, 2016
1 parent 669f808 commit 4ad080f
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 68 deletions.
70 changes: 35 additions & 35 deletions src/QueryManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,23 +95,29 @@ export class ObservableQuery extends Observable<ApolloQueryResult> {
public stopPolling: () => void;
public startPolling: (p: number) => void;
public options: WatchQueryOptions;
public queryManager: QueryManager;
private queryId: string;
private scheduler: QueryScheduler;
private queryManager: QueryManager;

constructor({
queryManager,
scheduler,
options,
shouldSubscribe = true,
}: {
queryManager: QueryManager,
scheduler: QueryScheduler,
options: WatchQueryOptions,
shouldSubscribe?: boolean,
}) {

const queryManager = scheduler.queryManager;
const queryId = queryManager.generateQueryId();
const isPollingQuery = !!options.pollInterval;

const subscriberFunction = (observer: Observer<ApolloQueryResult>) => {
const retQuerySubscription = {
unsubscribe: () => {
if (isPollingQuery) {
scheduler.stopPollingQuery(queryId);
}
queryManager.stopQuery(queryId);
},
};
Expand All @@ -121,15 +127,23 @@ export class ObservableQuery extends Observable<ApolloQueryResult> {
queryManager.addQuerySubscription(queryId, retQuerySubscription);
}

if (isPollingQuery) {
this.scheduler.startPollingQuery(
options,
queryId
);
}
queryManager.startQuery(
queryId,
options,
queryManager.queryListenerForObserver(options, observer)
);

return retQuerySubscription;
};
super(subscriberFunction);
this.options = options;
this.scheduler = scheduler;
this.queryManager = queryManager;
this.queryId = queryId;

Expand All @@ -147,21 +161,22 @@ export class ObservableQuery extends Observable<ApolloQueryResult> {
};

this.stopPolling = () => {
if (this.queryManager.pollingTimers[this.queryId]) {
clearInterval(this.queryManager.pollingTimers[this.queryId]);
this.queryManager.stopQuery(this.queryId);
if (isPollingQuery) {
this.scheduler.stopPollingQuery(this.queryId);
}
};

this.startPolling = (pollInterval) => {
if (this.options.noFetch) {
throw new Error('noFetch option should not use query polling.');
}
this.queryManager.pollingTimers[this.queryId] = setInterval(() => {
const pollingOptions = assign({}, this.options) as WatchQueryOptions;
// subsequent fetches from polling always reqeust new data
pollingOptions.forceFetch = true;
this.queryManager.fetchQuery(this.queryId, pollingOptions);
}, pollInterval);

if (isPollingQuery) {
this.scheduler.stopPollingQuery(this.queryId);
}
options.pollInterval = pollInterval;
this.scheduler.startPollingQuery(this.options, this.queryId, false);
};
}

Expand All @@ -187,6 +202,8 @@ export type QueryListener = (queryStoreValue: QueryStoreValue) => void;

export class QueryManager {
public pollingTimers: {[queryId: string]: NodeJS.Timer | any}; //oddity in Typescript
public scheduler: QueryScheduler;

private networkInterface: NetworkInterface;
private store: ApolloStore;
private reduxRootKey: string;
Expand All @@ -195,7 +212,6 @@ export class QueryManager {

private idCounter = 0;

private scheduler: QueryScheduler;
private batcher: QueryBatcher;
private batchInterval: number;

Expand Down Expand Up @@ -410,7 +426,7 @@ export class QueryManager {
getQueryDefinition(options.query);

let observableQuery = new ObservableQuery({
queryManager: this,
scheduler: this.scheduler,
options: options,
shouldSubscribe: shouldSubscribe,
});
Expand Down Expand Up @@ -542,18 +558,11 @@ export class QueryManager {

public startQuery(queryId: string, options: WatchQueryOptions, listener: QueryListener) {
this.queryListeners[queryId] = listener;
this.fetchQuery(queryId, options);

if (options.pollInterval) {
if (options.noFetch) {
throw new Error('noFetch option should not use query polling.');
}
this.pollingTimers[queryId] = setInterval(() => {
const pollingOptions = assign({}, options) as WatchQueryOptions;
// subsequent fetches from polling always reqeust new data
pollingOptions.forceFetch = true;
this.fetchQuery(queryId, pollingOptions);
}, options.pollInterval);
// If the pollInterval is present, the scheduler has already taken care of firing the first
// fetch so we don't have to worry about it here.
if (!options.pollInterval) {
this.fetchQuery(queryId, options);
}

return queryId;
Expand All @@ -563,16 +572,7 @@ export class QueryManager {
// XXX in the future if we should cancel the request
// so that it never tries to return data
delete this.queryListeners[queryId];

// if we have a polling interval running, stop it
if (this.pollingTimers[queryId]) {
clearInterval(this.pollingTimers[queryId]);
}

this.store.dispatch({
type: 'APOLLO_QUERY_STOP',
queryId,
});
this.stopQueryInStore(queryId);
}

private fetchQueryOverInterface(
Expand Down
57 changes: 31 additions & 26 deletions src/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export class QueryScheduler {

// We use this instance to actually fire queries (i.e. send them to the batching
// mechanism).
private queryManager: QueryManager;
public queryManager: QueryManager;

// Map going from polling interval widths to polling timers.
private pollingTimers: { [interval: number]: NodeJS.Timer | any }; // oddity in Typescript
Expand Down Expand Up @@ -65,49 +65,43 @@ export class QueryScheduler {
});
}

// The firstFetch option is used to denote whether we want to fire off a
// "first fetch" before we start polling. If startPollingQuery() is being called
// from an existing ObservableQuery, the first fetch has already been fired which
// means that firstFetch should be false.
public startPollingQuery(
options: WatchQueryOptions,
listener: QueryListener,
queryId?: string
queryId?: string,
firstFetch: boolean = true,
listener?: QueryListener
): string {
if (!queryId) {
queryId = this.queryManager.generateQueryId();
}

this.registeredQueries[queryId] = options;

// Fire an initial fetch before we start the polling query
this.fetchQuery(queryId, options);
this.queryManager.addQueryListener(queryId, listener);
if (firstFetch) {
this.fetchQuery(queryId, options);
}

if (listener) {
this.queryManager.addQueryListener(queryId, listener);
}
this.addQueryOnInterval(queryId, options);

return queryId;
}

public stopPollingQuery(queryId: string) {
// TODO should cancel in flight request so that there is no
// further data returned.
this.queryManager.removeQueryListener(queryId);

// Remove the query options from one of the registered queries.
// The polling function will then take care of not firing it anymore.
delete this.registeredQueries[queryId];

// Fire a APOLLO_STOP_QUERY state change to the underlying store.
this.queryManager.stopQueryInStore(queryId);
}

// Tell the QueryScheduler to schedule the queries fired by a polling query.
public registerPollingQuery(options: WatchQueryOptions): ObservableQuery {
if (!options.pollInterval) {
throw new Error('Tried to register a non-polling query with the scheduler.');
}

return new ObservableQuery({
queryManager: this.queryManager,
options: options,
});
}

// Fires the all of the queries on a particular interval. Called on a setInterval.
public fireQueriesOnInterval(interval: number) {
public fetchQueriesOnInterval(interval: number) {
this.intervalQueries[interval] = this.intervalQueries[interval].filter((queryId) => {
// If queryOptions can't be found from registeredQueries, it means that this queryId
// is no longer registered and should be removed from the list of queries firing on this
Expand Down Expand Up @@ -148,11 +142,22 @@ export class QueryScheduler {
this.intervalQueries[interval] = [queryId];
// set up the timer for the function that will handle this interval
this.pollingTimers[interval] = setInterval(() => {
this.fireQueriesOnInterval(interval);
this.fetchQueriesOnInterval(interval);
}, interval);
}
}

// Used only for unit testing.
public registerPollingQuery(queryOptions: WatchQueryOptions): ObservableQuery {
if (!queryOptions.pollInterval) {
throw new Error('Attempted to register a non-polling query with the scheduler.');
}
return new ObservableQuery({
scheduler: this,
options: queryOptions,
});
}

private addInFlight(queryId: string, options: WatchQueryOptions) {
this.inFlightQueries[queryId] = options;
}
Expand Down
6 changes: 3 additions & 3 deletions test/QueryManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2367,8 +2367,8 @@ describe('QueryManager', () => {
options: {
query: query,
},
queryManager: queryManager,
} as ObservableQuery;
scheduler: queryManager.scheduler,
} as any as ObservableQuery;

const queryId = 'super-fake-id';
queryManager.addObservableQuery(queryId, mockObservableQuery);
Expand Down Expand Up @@ -2400,7 +2400,7 @@ describe('QueryManager', () => {
},
options,
queryManager: queryManager,
} as ObservableQuery;
} as any as ObservableQuery;

const queryId = 'super-fake-id';
queryManager.addObservableQuery(queryId, mockObservableQuery);
Expand Down
8 changes: 4 additions & 4 deletions test/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ describe('QueryScheduler', () => {
queryManager,
});
let timesFired = 0;
const queryId = scheduler.startPollingQuery(queryOptions, (queryStoreValue) => {
const queryId = scheduler.startPollingQuery(queryOptions, 'fake-id', true, (queryStoreValue) => {
timesFired += 1;
});
setTimeout(() => {
Expand Down Expand Up @@ -115,7 +115,7 @@ describe('QueryScheduler', () => {
queryManager,
});
let timesFired = 0;
let queryId = scheduler.startPollingQuery(queryOptions, (queryStoreValue) => {
let queryId = scheduler.startPollingQuery(queryOptions, 'fake-id', true, (queryStoreValue) => {
timesFired += 1;
scheduler.stopPollingQuery(queryId);
});
Expand Down Expand Up @@ -252,7 +252,7 @@ describe('QueryScheduler', () => {
});
});

it.skip('should keep track of in flight queries', (done) => {
it('should keep track of in flight queries', (done) => {
const query = gql`
query {
fortuneCookie
Expand Down Expand Up @@ -298,7 +298,7 @@ describe('QueryScheduler', () => {
}, 100);
});

it.skip('should not fire another query if one with the same id is in flight', (done) => {
it('should not fire another query if one with the same id is in flight', (done) => {
const query = gql`
query {
fortuneCookie
Expand Down

0 comments on commit 4ad080f

Please sign in to comment.