diff --git a/packages/core/src/Util/Reachability.native.ts b/packages/core/src/Util/Reachability.native.ts index 3350698e83..391013f256 100644 --- a/packages/core/src/Util/Reachability.native.ts +++ b/packages/core/src/Util/Reachability.native.ts @@ -39,7 +39,7 @@ export default class ReachabilityNavigator implements Reachability { }, 2000); return () => { - logger.warn('unsubscribing reachability'); + logger.log('unsubscribing reachability'); clearInterval(id); }; diff --git a/packages/datastore/__tests__/DataStore.ts b/packages/datastore/__tests__/DataStore.ts index 08a7a7f9df..767bf96c9f 100644 --- a/packages/datastore/__tests__/DataStore.ts +++ b/packages/datastore/__tests__/DataStore.ts @@ -1,19 +1,37 @@ import 'fake-indexeddb/auto'; import * as uuidValidate from 'uuid-validate'; -import { initSchema as initSchemaType } from '../src/datastore/datastore'; +import { + initSchema as initSchemaType, + DataStore as DataStoreType, +} from '../src/datastore/datastore'; import { ModelInit, MutableModel, PersistentModelConstructor, Schema, } from '../src/types'; +import StorageType from '../src/storage/storage'; +import Observable from 'zen-observable-ts'; let initSchema: typeof initSchemaType; +let DataStore: typeof DataStoreType; +let Storage: typeof StorageType; beforeEach(() => { jest.resetModules(); - ({ initSchema } = require('../src/datastore/datastore')); + jest.doMock('../src/storage/storage', () => { + const mock = jest.fn().mockImplementation(() => ({ + runExclusive: jest.fn(), + query: jest.fn(), + observe: jest.fn(() => Observable.of()), + })); + + (mock).getNamespace = () => ({ models: {} }); + + return { default: mock }; + }); + ({ initSchema, DataStore } = require('../src/datastore/datastore')); }); describe('DataStore tests', () => { @@ -66,6 +84,14 @@ describe('DataStore tests', () => { uuidValidate(model.id.replace(/^(.{4})-(.{4})-(.{8})/, '$3-$2-$1'), 1) ).toBe(true); }); + + test('initSchema is executed only once', () => { + initSchema(testSchema()); + + expect(() => { + initSchema(testSchema()); + }).toThrow('The schema has already been initialized'); + }); }); describe('Immutability', () => { @@ -122,6 +148,39 @@ describe('DataStore tests', () => { expect(model1.id).toBe(model2.id); }); }); + + describe('Initialization', () => { + test('start is called only once', async () => { + Storage = require('../src/storage/storage').default; + + const classes = initSchema(testSchema()); + + const { Model } = classes; + + const promises = [ + DataStore.query(Model), + DataStore.query(Model), + DataStore.query(Model), + DataStore.query(Model), + ]; + + await Promise.all(promises); + + expect(Storage).toHaveBeenCalledTimes(1); + }); + }); + + test('It is initialized when observing (no query)', async () => { + Storage = require('../src/storage/storage').default; + + const classes = initSchema(testSchema()); + + const { Model } = classes; + + DataStore.observe(Model).subscribe(jest.fn()); + + expect(Storage).toHaveBeenCalledTimes(1); + }); }); //#region Test helpers @@ -144,6 +203,7 @@ function testSchema(): Schema { models: { Model: { name: 'Model', + pluralName: 'Models', syncable: true, fields: { id: { @@ -162,6 +222,7 @@ function testSchema(): Schema { }, LocalModel: { name: 'LocalModel', + pluralName: 'LocalModels', syncable: false, fields: { id: { @@ -179,7 +240,7 @@ function testSchema(): Schema { }, }, }, - version: 1, + version: '1', }; } diff --git a/packages/datastore/src/datastore/datastore.ts b/packages/datastore/src/datastore/datastore.ts index ecfb405ba6..8c24accb48 100644 --- a/packages/datastore/src/datastore/datastore.ts +++ b/packages/datastore/src/datastore/datastore.ts @@ -474,7 +474,6 @@ const observe: { modelConstructor?: PersistentModelConstructor, idOrCriteria?: string | ProducerModelPredicate ) => { - start(); let predicate: ModelPredicate; if (idOrCriteria !== undefined && modelConstructor === undefined) { @@ -504,9 +503,24 @@ const observe: { ); } - return storage - .observe(modelConstructor, predicate) - .filter(({ model }) => namespaceResolver(model) === USER); + return new Observable>(observer => { + let handle: ZenObservable.Subscription; + + (async () => { + await start(); + + handle = storage + .observe(modelConstructor, predicate) + .filter(({ model }) => namespaceResolver(model) === USER) + .subscribe(observer); + })(); + + return () => { + if (handle) { + handle.unsubscribe(); + } + }; + }); }; const query: { @@ -683,21 +697,29 @@ async function checkSchemaVersion( if (storedValue !== version) { await s.clear(false); } + } else { + await s.save( + modelInstanceCreator(Setting, { + key: SETTING_SCHEMA_VERSION, + value: JSON.stringify(version), + }) + ); } - - await s.save( - modelInstanceCreator(Setting, { - key: SETTING_SCHEMA_VERSION, - value: JSON.stringify(version), - }) - ); }); } let syncSubscription: ZenObservable.Subscription; +let initResolve: Function; +let initialized: Promise; async function start(): Promise { - if (storage !== undefined) { + if (initialized === undefined) { + initialized = new Promise(res => { + initResolve = res; + }); + } else { + await initialized; + return; } @@ -710,10 +732,6 @@ async function start(): Promise { await checkSchemaVersion(storage, schema.version); - if (sync !== undefined) { - return; - } - const { aws_appsync_graphqlEndpoint } = amplifyConfig; if (aws_appsync_graphqlEndpoint) { @@ -738,6 +756,8 @@ async function start(): Promise { }, }); } + + initResolve(); } async function clear() { diff --git a/packages/datastore/src/sync/index.ts b/packages/datastore/src/sync/index.ts index b83ca81ce4..526fced2a6 100644 --- a/packages/datastore/src/sync/index.ts +++ b/packages/datastore/src/sync/index.ts @@ -137,6 +137,7 @@ export class SyncEngine { new Reachability().networkMonitor().subscribe(async ({ online }) => { this.online = online; if (online) { + //#region GraphQL Subscriptions const [ ctlSubsObservable, dataSubsObservable, @@ -151,6 +152,9 @@ export class SyncEngine { } logger.log('Realtime ready'); + //#endregion + + //#region Base & Sync queries const currentTimeStamp = new Date().getTime(); const modelLastSync: Map< @@ -181,8 +185,9 @@ export class SyncEngine { observer.error(err); return; } + //#endregion - // process mutations + //#region process mutations subscriptions.push( this.mutationsProcessor .start() @@ -201,8 +206,9 @@ export class SyncEngine { } ) ); + //#endregion - // TODO: extract to funciton + // TODO: extract to function subscriptions.push( dataSubsObservable.subscribe( ([_transformerMutationType, modelDefinition, item]) => { diff --git a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider.ts b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider.ts index aba6a2908a..da14820740 100644 --- a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider.ts +++ b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider.ts @@ -456,7 +456,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { if (type === MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE) { clearTimeout(this.keepAliveTimeoutId); this.keepAliveTimeoutId = setTimeout( - this._timeoutDisconnect.bind(this), + this._errorDisconnect.bind(this, 'Timeout disconnect'), this.keepAliveTimeout ); return; @@ -492,16 +492,15 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { } } - private _timeoutDisconnect() { + private _errorDisconnect(msg: string) { this.subscriptionObserverMap.forEach(({ observer }) => { if (!observer.closed) { observer.error({ - errors: [{ ...new GraphQLError(`Timeout disconnect`) }], + errors: [{ ...new GraphQLError(msg) }], }); - observer.complete(); } }); - this.subscriptionObserverMap = new Map(); + this.subscriptionObserverMap.clear(); if (this.awsRealTimeSocket) { this.awsRealTimeSocket.close(); } @@ -661,7 +660,10 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { this.awsRealTimeSocket.onmessage = this._handleIncomingSubscriptionMessage.bind( this ); - this.awsRealTimeSocket.onerror = logger.debug; + this.awsRealTimeSocket.onerror = err => { + logger.debug(err); + this._errorDisconnect('Connection closed'); + }; res('Cool, connected to AWS AppSyncRealTime'); return; }