Skip to content

Commit

Permalink
Merge pull request #41 from resgateio/bugfix/gh-39-resource-not-found…
Browse files Browse the repository at this point in the history
…-in-cache-v2

Bugfix/gh 39 resource not found in cache v2
  • Loading branch information
jirenius authored May 14, 2020
2 parents 7474b29 + 1e86567 commit d69ec6e
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 26 deletions.
13 changes: 8 additions & 5 deletions src/class/CacheItem.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,20 @@ class CacheItem {
this.item = null;
this.direct = 0;
this.indirect = 0;
this.subscribed = false;
this.subscribed = 0; // Count of direct subscriptions towards Resgate
this.promise = null;
}

setSubscribed(isSubscribed) {
this.subscribed = isSubscribed;
if (!isSubscribed && this.unsubTimeout) {
/**
* Adds or subtracts from the subscribed counter.
* @param {number} dir Value to add. If 0, the subscribed counter will be set to 0.
*/
addSubscribed(dir) {
this.subscribed += dir ? dir : -this.subscribed;
if (!this.subscribed && this.unsubTimeout) {
clearTimeout(this.unsubTimeout);
this.unsubTimeout = null;
}
return this;
}

setPromise(promise) {
Expand Down
55 changes: 34 additions & 21 deletions src/class/ResClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ const supportedProtocol = "1.2.0";
* @returns {WebSocket} WebSocket instance implementing the [WebSocket API]{@link https://developer.mozilla.org/en-US/docs/Web/API/WebSocket}.
*/

/**
* OnConnect callback function.
* @callback ResClient~onConnectCallback
* @param {ResClient} ResClient instance
* @returns {?Promise} Promise for the onConnect handlers completion. Must always resolve.
*/

/**
* ResClient represents a client connection to a RES API.
*/
Expand Down Expand Up @@ -233,7 +240,7 @@ class ResClient {

/**
* Sets the onConnect callback.
* @param {?function} onConnect On connect callback called prior resolving the connect promise and subscribing to stale resources. May return a promise.
* @param {?ResClient~onConnectCallback} onConnect On connect callback called prior resolving the connect promise and subscribing to stale resources. May return a promise.
* @returns {this}
*/
setOnConnect(onConnect) {
Expand Down Expand Up @@ -359,7 +366,7 @@ class ResClient {
.then(result => {
this._cacheResources(result);
let ci = this.cache[result.rid];
ci.setSubscribed(true);
ci.addSubscribed(1);
return ci.item;
});
}
Expand Down Expand Up @@ -486,7 +493,7 @@ class ResClient {
if (result.rid) {
this._cacheResources(result);
let ci = this.cache[result.rid];
ci.setSubscribed(true);
ci.addSubscribed(1);
return ci.item;
}
return result.payload;
Expand Down Expand Up @@ -557,7 +564,7 @@ class ResClient {
break;

case 'unsubscribe':
handled = this._handleUnsubscribeEvent(cacheItem, event);
handled = this._handleUnsubscribeEvent(cacheItem);
break;
}

Expand Down Expand Up @@ -667,10 +674,10 @@ class ResClient {
return true;
}

_handleUnsubscribeEvent(cacheItem, event) {
cacheItem.setSubscribed(false);
_handleUnsubscribeEvent(cacheItem) {
cacheItem.addSubscribed(0);
this._tryDelete(cacheItem);
this.eventBus.emit(cacheItem.item, this.namespace + '.resource.' + cacheItem.rid + '.' + event, { item: cacheItem.item });
this.eventBus.emit(cacheItem.item, this.namespace + '.resource.' + cacheItem.rid + '.unsubscribe', { item: cacheItem.item });
return true;
}

Expand Down Expand Up @@ -700,14 +707,16 @@ class ResClient {

_subscribe(ci, throwError) {
let rid = ci.rid;
ci.setSubscribed(true);
ci.addSubscribed(1);
this._removeStale(rid);
return this._send('subscribe', rid)
.then(response => this._cacheResources(response))
.catch(err => {
this._handleFailedSubscribe(ci);
if (throwError) {
this._handleFailedSubscribe(ci);
throw err;
} else {
this._handleUnsubscribeEvent(ci);
}
});
}
Expand Down Expand Up @@ -751,7 +760,7 @@ class ResClient {
}
throw err;
})
.then(() => this.onConnect ? this.onConnect() : null)
.then(() => this.onConnect ? this.onConnect(this) : null)
.then(() => {
this._subscribeToAllStale();
this._emit('connect', e);
Expand Down Expand Up @@ -797,7 +806,7 @@ class ResClient {
for (let rid in this.cache) {
let ci = this.cache[rid];
if (ci.subscribed) {
ci.setSubscribed(false);
ci.addSubscribed(0);
this._addStale(rid);
this._tryDelete(ci);
}
Expand Down Expand Up @@ -1216,18 +1225,22 @@ class ResClient {

this._subscribeReferred(ci);

this._send('unsubscribe', ci.rid)
.then(() => {
ci.setSubscribed(false);
this._tryDelete(ci);
})
.catch(err => this._tryDelete(ci));
let i = ci.subscribed;
while (i--) {
this._send('unsubscribe', ci.rid)
.then(() => {
ci.addSubscribed(-1);
this._tryDelete(ci);
})
.catch(err => this._tryDelete(ci));
}
}

_subscribeReferred(ci) {
ci.subscribed = false;
let i = ci.subscribed;
ci.subscribed = 0;
let refs = this._getRefState(ci);
ci.subscribed = true;
ci.subscribed = i;

for (let rid in refs) {
let r = refs[rid];
Expand All @@ -1237,8 +1250,8 @@ class ResClient {
}
}

_handleFailedSubscribe(cacheItem, err) {
cacheItem.setSubscribed(false);
_handleFailedSubscribe(cacheItem) {
cacheItem.addSubscribed(-1);
this._tryDelete(cacheItem);
}

Expand Down
42 changes: 42 additions & 0 deletions src/class/ResClient.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,46 @@ describe("ResClient", () => {
}));
});
});

it("unsubscribes model with multiple direct subscriptions, when no longer listened to", () => {
return getServerResource('service.model', modelResources).then(model => {
let promise = model.call('test');

return flushRequests().then(() => {
let req = server.getNextRequest();
server.sendResponse(req, { rid: "service.model" });

return flushRequests()
.then(() => promise)
.then(m => {
expect(m).toBe(model);
// Cause unsubscribe by waiting
return waitAWhile().then(flushRequests).then(() => {
expect(server.error).toBe(null);
// Expect 2 unsubscribes
for (var i = 0; i < 2; i++) {
let req = server.getNextRequest();
expect(req).not.toBe(undefined);
expect(req.method).toBe('unsubscribe.service.model');
server.sendResponse(req, null);
}

// Wait for the unsubscribe response
return flushRequests().then(() => {
expect(server.error).toBe(null);

return getServerResource('service.model', modelResources).then(modelSecond => {
expect(model).not.toBe(modelSecond);

let req = server.getNextRequest();
expect(req).toBe(undefined);
});
});
});
});
});
});
});
});

describe("getResource collection", () => {
Expand Down Expand Up @@ -736,6 +776,8 @@ describe("ResClient", () => {
it("instantly resubscribes to a model when listening between an unsubscribe request and its response", () => {
// TODO
});


});

describe("collection.on", () => {
Expand Down

0 comments on commit d69ec6e

Please sign in to comment.