Skip to content

Commit

Permalink
Fix to initialize only once and handle disconnection from network (#4921
Browse files Browse the repository at this point in the history
)

* initialize only once

* Fix onerror handler for WebSocket

* Change log level for unsubscribe message

* Add regions and fix typo

* Handle disconnection
  • Loading branch information
manueliglesias authored Feb 24, 2020
1 parent ef6a365 commit 7f60396
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 28 deletions.
2 changes: 1 addition & 1 deletion packages/core/src/Util/Reachability.native.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export default class ReachabilityNavigator implements Reachability {
}, 2000);

return () => {
logger.warn('unsubscribing reachability');
logger.log('unsubscribing reachability');

clearInterval(id);
};
Expand Down
67 changes: 64 additions & 3 deletions packages/datastore/__tests__/DataStore.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,37 @@
import 'fake-indexeddb/auto';
import * as uuidValidate from 'uuid-validate';
import { initSchema as initSchemaType } from '../src/datastore/datastore';
import {
initSchema as initSchemaType,
DataStore as DataStoreType,
} from '../src/datastore/datastore';
import {
ModelInit,
MutableModel,
PersistentModelConstructor,
Schema,
} from '../src/types';
import StorageType from '../src/storage/storage';
import Observable from 'zen-observable-ts';

let initSchema: typeof initSchemaType;
let DataStore: typeof DataStoreType;
let Storage: typeof StorageType;

beforeEach(() => {
jest.resetModules();

({ initSchema } = require('../src/datastore/datastore'));
jest.doMock('../src/storage/storage', () => {
const mock = jest.fn().mockImplementation(() => ({
runExclusive: jest.fn(),
query: jest.fn(),
observe: jest.fn(() => Observable.of()),
}));

(<any>mock).getNamespace = () => ({ models: {} });

return { default: mock };
});
({ initSchema, DataStore } = require('../src/datastore/datastore'));
});

describe('DataStore tests', () => {
Expand Down Expand Up @@ -66,6 +84,14 @@ describe('DataStore tests', () => {
uuidValidate(model.id.replace(/^(.{4})-(.{4})-(.{8})/, '$3-$2-$1'), 1)
).toBe(true);
});

test('initSchema is executed only once', () => {
initSchema(testSchema());

expect(() => {
initSchema(testSchema());
}).toThrow('The schema has already been initialized');
});
});

describe('Immutability', () => {
Expand Down Expand Up @@ -122,6 +148,39 @@ describe('DataStore tests', () => {
expect(model1.id).toBe(model2.id);
});
});

describe('Initialization', () => {
test('start is called only once', async () => {
Storage = require('../src/storage/storage').default;

const classes = initSchema(testSchema());

const { Model } = classes;

const promises = [
DataStore.query(Model),
DataStore.query(Model),
DataStore.query(Model),
DataStore.query(Model),
];

await Promise.all(promises);

expect(Storage).toHaveBeenCalledTimes(1);
});
});

test('It is initialized when observing (no query)', async () => {
Storage = require('../src/storage/storage').default;

const classes = initSchema(testSchema());

const { Model } = classes;

DataStore.observe(Model).subscribe(jest.fn());

expect(Storage).toHaveBeenCalledTimes(1);
});
});

//#region Test helpers
Expand All @@ -144,6 +203,7 @@ function testSchema(): Schema {
models: {
Model: {
name: 'Model',
pluralName: 'Models',
syncable: true,
fields: {
id: {
Expand All @@ -162,6 +222,7 @@ function testSchema(): Schema {
},
LocalModel: {
name: 'LocalModel',
pluralName: 'LocalModels',
syncable: false,
fields: {
id: {
Expand All @@ -179,7 +240,7 @@ function testSchema(): Schema {
},
},
},
version: 1,
version: '1',
};
}

Expand Down
52 changes: 36 additions & 16 deletions packages/datastore/src/datastore/datastore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,6 @@ const observe: {
modelConstructor?: PersistentModelConstructor<T>,
idOrCriteria?: string | ProducerModelPredicate<T>
) => {
start();
let predicate: ModelPredicate<T>;

if (idOrCriteria !== undefined && modelConstructor === undefined) {
Expand Down Expand Up @@ -504,9 +503,24 @@ const observe: {
);
}

return storage
.observe(modelConstructor, predicate)
.filter(({ model }) => namespaceResolver(model) === USER);
return new Observable<SubscriptionMessage<any>>(observer => {
let handle: ZenObservable.Subscription;

(async () => {
await start();

handle = storage
.observe(modelConstructor, predicate)
.filter(({ model }) => namespaceResolver(model) === USER)
.subscribe(observer);
})();

return () => {
if (handle) {
handle.unsubscribe();
}
};
});
};

const query: {
Expand Down Expand Up @@ -683,21 +697,29 @@ async function checkSchemaVersion(
if (storedValue !== version) {
await s.clear(false);
}
} else {
await s.save(
modelInstanceCreator(Setting, {
key: SETTING_SCHEMA_VERSION,
value: JSON.stringify(version),
})
);
}

await s.save(
modelInstanceCreator(Setting, {
key: SETTING_SCHEMA_VERSION,
value: JSON.stringify(version),
})
);
});
}

let syncSubscription: ZenObservable.Subscription;

let initResolve: Function;
let initialized: Promise<void>;
async function start(): Promise<void> {
if (storage !== undefined) {
if (initialized === undefined) {
initialized = new Promise(res => {
initResolve = res;
});
} else {
await initialized;

return;
}

Expand All @@ -710,10 +732,6 @@ async function start(): Promise<void> {

await checkSchemaVersion(storage, schema.version);

if (sync !== undefined) {
return;
}

const { aws_appsync_graphqlEndpoint } = amplifyConfig;

if (aws_appsync_graphqlEndpoint) {
Expand All @@ -738,6 +756,8 @@ async function start(): Promise<void> {
},
});
}

initResolve();
}

async function clear() {
Expand Down
10 changes: 8 additions & 2 deletions packages/datastore/src/sync/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ export class SyncEngine {
new Reachability().networkMonitor().subscribe(async ({ online }) => {
this.online = online;
if (online) {
//#region GraphQL Subscriptions
const [
ctlSubsObservable,
dataSubsObservable,
Expand All @@ -151,6 +152,9 @@ export class SyncEngine {
}

logger.log('Realtime ready');
//#endregion

//#region Base & Sync queries
const currentTimeStamp = new Date().getTime();

const modelLastSync: Map<
Expand Down Expand Up @@ -181,8 +185,9 @@ export class SyncEngine {
observer.error(err);
return;
}
//#endregion

// process mutations
//#region process mutations
subscriptions.push(
this.mutationsProcessor
.start()
Expand All @@ -201,8 +206,9 @@ export class SyncEngine {
}
)
);
//#endregion

// TODO: extract to funciton
// TODO: extract to function
subscriptions.push(
dataSubsObservable.subscribe(
([_transformerMutationType, modelDefinition, item]) => {
Expand Down
14 changes: 8 additions & 6 deletions packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider {
if (type === MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE) {
clearTimeout(this.keepAliveTimeoutId);
this.keepAliveTimeoutId = setTimeout(
this._timeoutDisconnect.bind(this),
this._errorDisconnect.bind(this, 'Timeout disconnect'),
this.keepAliveTimeout
);
return;
Expand Down Expand Up @@ -492,16 +492,15 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider {
}
}

private _timeoutDisconnect() {
private _errorDisconnect(msg: string) {
this.subscriptionObserverMap.forEach(({ observer }) => {
if (!observer.closed) {
observer.error({
errors: [{ ...new GraphQLError(`Timeout disconnect`) }],
errors: [{ ...new GraphQLError(msg) }],
});
observer.complete();
}
});
this.subscriptionObserverMap = new Map();
this.subscriptionObserverMap.clear();
if (this.awsRealTimeSocket) {
this.awsRealTimeSocket.close();
}
Expand Down Expand Up @@ -661,7 +660,10 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider {
this.awsRealTimeSocket.onmessage = this._handleIncomingSubscriptionMessage.bind(
this
);
this.awsRealTimeSocket.onerror = logger.debug;
this.awsRealTimeSocket.onerror = err => {
logger.debug(err);
this._errorDisconnect('Connection closed');
};
res('Cool, connected to AWS AppSyncRealTime');
return;
}
Expand Down

0 comments on commit 7f60396

Please sign in to comment.