Skip to content

Commit

Permalink
Implement new threaded event model.
Browse files Browse the repository at this point in the history
  • Loading branch information
kentonv committed Jul 7, 2016
1 parent 433e8ce commit 998265f
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 24 deletions.
4 changes: 4 additions & 0 deletions shell/client/notifications-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ Template.notificationItem.helpers({
const plan = Meteor.user().plan;
return plan && plan !== "free";
},

multiple: function () {
return (this.count || 1) > 1;
},
});

Template.notificationItem.events({
Expand Down
1 change: 1 addition & 0 deletions shell/client/shell.html
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ <h4>Notifications</h4>
{{> mailingListBonusNotification}}
{{else}}
{{text.defaultText}}
{{#if multiple}}x{{count}}{{/if}}
{{/if}}
{{/if}}
{{/if}}
Expand Down
64 changes: 64 additions & 0 deletions shell/packages/sandstorm-db/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,10 @@ Notifications = new Mongo.Collection("notifications");
// text: The JSON-ified LocalizedText to display in the notification.
// isUnread: Boolean indicating if this notification is unread.
// timestamp: Date when this notification was last updated
// eventType: If this notification is due to an activity event, this is the numeric index
// of the event type on the grain's ViewInfo.
// count: The number of times this exact event has repeated. Identical events are
// aggregated by incrementing the count.
// initiatingIdentity: Identity ID of the user who initiated this notification.
// path: Path inside the grain to which the user should be directed if they click on
// the notification.
Expand All @@ -520,6 +524,26 @@ Notifications = new Mongo.Collection("notifications");
// a one-time notification only to Oasis users who existed when the bonus program
// was implemented.

ActivitySubscriptions = new Mongo.Collection("activitySubscriptions");
// Activity events to which a user is subscribed.
//
// Each contains:
// _id: random
// identityId: Who is subscribed.
// grainId: Grain to which subscription applies.
// threadPath: If present, the subscription is on a specific thread. Otherwise, it is on the
// whole grain.
// mute: If true, this is an anti-subscription -- matching events should NOT notify.
// This allows is useful to express:
// - A user wants to subscribe to a grain but mute a specific thread.
// - The owner of a grain does not want notifications (normally, they are
// implicitly subscribed).
// - A user no longer wishes to be implicitly subscribed to threads in a grain on
// which they comment, so they mute the grain.

ActivitySubscriptions.ensureIndexOnServer("identityId");
ActivitySubscriptions.ensureIndexOnServer({ "grainId": 1, "threadPath": 1 });

StatsTokens = new Mongo.Collection("statsTokens");
// Access tokens for the Stats collection
//
Expand Down Expand Up @@ -1518,6 +1542,44 @@ _.extend(SandstormDb.prototype, {
const setting = Settings.findOne({ _id: "samlEntityId" });
return setting ? setting.value : ""; // empty if subscription is not ready.
},

getActivitySubscriptions: function (grainId, threadPath) {
return ActivitySubscriptions.find({
grainId: grainId,
threadPath: threadPath || { $exists: false },
}, {
fields: { identityId: 1, mute: 1, _id: 0 },
}).fetch();
},

subscribeToActivity: function (identityId, grainId, threadPath) {
// Subscribe the given identity to activity events with the given grainId and (optional)
// threadPath -- unless the identity has previously muted this grainId/threadPath, in which
// case do nothing.

const record = { identityId, grainId };
if (threadPath) {
record.threadPath = threadPath;
}

// The $set here is redundant since an upsert automatically initializes a new record to contain
// the fields from the query, but if we try to do { $set: {} } Mongo throws an exception, and
// if we try to just pass {}, Mongo interprets it as "replace the record with an empty record".
// What a wonderful query language.
ActivitySubscriptions.upsert(record, { $set: record });
},

muteActivity: function (identityId, grainId, threadPath) {
// Mute notifications for the given identity originating from the given grainId and
// (optional) threadPath.

const record = { identityId, grainId };
if (threadPath) {
record.threadPath = threadPath;
}

ActivitySubscriptions.upsert(record, { $set: { mute: true } });
},
});

SandstormDb.escapeMongoKey = (key) => {
Expand Down Expand Up @@ -1833,6 +1895,8 @@ if (Meteor.isServer) {

this.removeApiTokens({ "owner.grain.grainId": grain._id });

ActivitySubscriptions.remove({ grainId: grain._id });

if (grain.lastUsed) {
DeleteStats.insert({
type: "grain", // Demo grains can never get here!
Expand Down
129 changes: 105 additions & 24 deletions shell/server/notifications-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,52 +19,133 @@ const SupervisorCapnp = Capnp.importSystem("sandstorm/supervisor.capnp");
const SystemPersistent = SupervisorCapnp.SystemPersistent;

logActivity = function (grainId, identityId, event) {
check(grainId, String);
check(identityId, String);
// `event` is always an ActivityEvent parsed from Cap'n Proto buf that's too complicated to check
// here.

// TODO(perf): A cached copy of the grain from when the session opened would be fine to use
// here, rather than looking it up every time.
grain = Grains.findOne(grainId);
if (!grain) {
// Shouldn't be possible since activity events come from the grain.
throw new Error("no such grain");
}

// Look up the event typedef.
const eventType = ((grain.cachedViewInfo || {}).eventTypes || [])[event.type];
if (!eventType) {
throw new Error("No such event type in app's ViewInfo: " + event.type);
}

// Clear the "seenAllActivity" bit for all users except the acting user.
// TODO(perf): Consider throttling? Or should that be the app's responsibility?
Grains.update({ _id: grainId,
identityId: { $ne: identityId },
}, { $unset: { ownerSeenAllActivity: true } });
if (identityId != grain.identityId) {
Grains.update(grainId, { $unset: { ownerSeenAllActivity: true } });
}

// Also clear on ApiTokens.
ApiTokens.update({
"grainId": grainId,
"owner.user.seenAllActivity": true,
"owner.user.identityId": { $ne: identityId },
}, { $unset: { "owner.user.seenAllActivity": true } }, { multi: true });

if (event.users && event.users.length > 0) {
// Some users may have been mentioned. Prepare a notification to send them.
// Apply auto-subscriptions.
if (eventType.autoSubscribeToGrain) {
globalDb.subscribeToActivity(identityId, grainId);
}

const notification = {
grainId: grainId,
isUnread: true,
timestamp: new Date(),
path: event.path || "",
};
if (event.thread && eventType.autoSubscribeToThread) {
globalDb.subscribeToActivity(identityId, grainId, event.thread.path || "");
}

if (identityId) {
notification.initiatingIdentity = identityId;
// Figure out whom we need to notify.
const notifyMap = {};
const addRecipient = recipient => {
// Mutes take priority over subscriptions.
if (recipient.mute) {
notifyMap[identityId] = false;
} else {
if (!(recipient.identityId in notifyMap)) {
notifyMap[recipient.identityId] = true;
}
}
};

if (event.notification && event.notification.caption) {
notification.text = event.notification.caption;
// Don't notify self.
addRecipient({ identityId: identityId, mute: true });

// The grain owner is implicitly subscribed.
if (eventType.notifySubscribers) {
addRecipient({ identityId: grain.identityId });

// Add everyone subscribed to the grain.
globalDb.getActivitySubscriptions(grainId).forEach(addRecipient);

if (event.thread) {
// Add everyone subscribed to the thread.
globalDb.getActivitySubscriptions(grainId, event.thread.path || "").forEach(addRecipient);
}
}

// Add everyone who is mentioned.
if (event.users && event.users.length > 0) {
const promises = [];

event.users.forEach(user => {
if (user.identity && user.mentioned) {
promises.push(unwrapFrontendCap(user.identity, "identity", (targetId) => {
Meteor.users.find({ $or: [
{ "loginIdentities.id": targetId },
{ "nonLoginIdentities.id": targetId },
], }).forEach((account) => {
Notifications.insert(_.extend({ userId: account._id }, notification));
});
promises.push(unwrapFrontendCap(user.identity, "identity", targetId => {
addRecipient({ identityId: targetId });
}));
}
});

waitPromise(Promise.all(promises).then(junk => undefined));
}

// Make a list of everyone to notify.
const notify = [];
for (const identityId in notifyMap) {
if (notifyMap[identityId]) {
notify.push(identityId);
}
}

if (notify.length > 0) {
const notification = {
grainId: grainId,
path: event.path || "",
};

// Fields we'll update even if the notification already exists.
const update = {
isUnread: true,
timestamp: new Date(),
};

if (event.thread) {
notification.threadPath = event.thread.path || "";
}

if (identityId) {
notification.initiatingIdentity = identityId;
}

notification.eventType = event.type;
update.text = eventType.verbPhrase;

notify.forEach(targetId => {
// Notify all accounts connected with this identity.
Meteor.users.find({ $or: [
{ "loginIdentities.id": targetId },
{ "nonLoginIdentities.id": targetId },
], }).forEach((account) => {
Notifications.upsert(_.extend({ userId: account._id }, notification), {
$set: update,
$inc: { count: 1 },
});
});
});
}
};

Meteor.methods({
Expand Down

0 comments on commit 998265f

Please sign in to comment.