Skip to content

Commit

Permalink
Detect 3rd-party changes made during replication
Browse files Browse the repository at this point in the history
Modify `Change.diff()` to include current data revision in each
delta reported back. The current data revision is stored in
`delta.prev`.

Modify `PersistedModel.bulkUpdate()` to check that the current data
revision matches `delta.prev` and report a conflict if a third party
has modified the database under our hands.

Fix `Change` implementation and tests so that they are no longer
attempting to create instances with duplicate ids.
(This used to work because the memory connector was silently
converting such requests to updateOrCreate/findOrCreate.)
  • Loading branch information
Miroslav Bajtoš committed Mar 20, 2015
1 parent 91f59e1 commit 87940a4
Show file tree
Hide file tree
Showing 4 changed files with 480 additions and 61 deletions.
27 changes: 21 additions & 6 deletions common/models/change.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ module.exports = function(Change) {
modelId: modelId
});
ch.debug('creating change');
ch.save(callback);
Change.updateOrCreate(ch, callback);
}
});
};
Expand Down Expand Up @@ -248,6 +248,7 @@ module.exports = function(Change) {
*/

Change.revisionForInst = function(inst) {
assert(inst, 'Change.revisionForInst() requires an instance object.');
return this.hash(CJSON.stringify(inst));
};

Expand Down Expand Up @@ -370,15 +371,18 @@ module.exports = function(Change) {
this.find({
where: {
modelName: modelName,
modelId: {inq: modelIds},
checkpoint: {gte: since}
modelId: {inq: modelIds}
}
}, function(err, localChanges) {
}, function(err, allLocalChanges) {
if (err) return callback(err);
var deltas = [];
var conflicts = [];
var localModelIds = [];

var localChanges = allLocalChanges.filter(function(c) {
return c.checkpoint >= since;
});

localChanges.forEach(function(localChange) {
localChange = new Change(localChange);
localModelIds.push(localChange.modelId);
Expand All @@ -396,9 +400,20 @@ module.exports = function(Change) {
});

modelIds.forEach(function(id) {
if (localModelIds.indexOf(id) === -1) {
deltas.push(remoteChangeIndex[id]);
if (localModelIds.indexOf(id) !== -1) return;

var d = remoteChangeIndex[id];
var oldChange = allLocalChanges.filter(function(c) {
return c.modelId === id;
})[0];

if (oldChange) {
d.prev = oldChange.rev;
} else {
d.prev = null;
}

deltas.push(d);
});

callback(null, {
Expand Down
223 changes: 201 additions & 22 deletions lib/persisted-model.js
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,20 @@ function tryReplicate(sourceModel, targetModel, since, options, callback) {
function bulkUpdate(_updates, cb) {
debug('\tstarting bulk update');
updates = _updates;
targetModel.bulkUpdate(updates, cb);
targetModel.bulkUpdate(updates, function(err) {
var conflicts = err && err.details && err.details.conflicts;
if (conflicts && err.statusCode == 409) {
diff.conflicts = conflicts;
// filter out updates that were not applied
updates = updates.filter(function(u) {
return conflicts
.filter(function(d) { return d.modelId === u.change.modelId; })
.length === 0;
});
return cb();
}
cb(err);
});
}

function checkpoints() {
Expand All @@ -974,7 +987,7 @@ function tryReplicate(sourceModel, targetModel, since, options, callback) {

debug('\treplication finished');
debug('\t\t%s conflict(s) detected', diff.conflicts.length);
debug('\t\t%s change(s) applied', updates && updates.length);
debug('\t\t%s change(s) applied', updates ? updates.length : 0);
debug('\t\tnew checkpoints: { source: %j, target: %j }',
newSourceCp, newTargetCp);

Expand Down Expand Up @@ -1058,31 +1071,197 @@ PersistedModel.createUpdates = function(deltas, cb) {
PersistedModel.bulkUpdate = function(updates, callback) {
var tasks = [];
var Model = this;
var idName = this.dataSource.idName(this.modelName);
var Change = this.getChangeModel();
var conflicts = [];

updates.forEach(function(update) {
switch (update.type) {
case Change.UPDATE:
case Change.CREATE:
// var model = new Model(update.data);
// tasks.push(model.save.bind(model));
tasks.push(function(cb) {
var model = new Model(update.data);
model.save(cb);
});
break;
case Change.DELETE:
var data = {};
data[idName] = update.change.modelId;
var model = new Model(data);
tasks.push(model.destroy.bind(model));
break;
buildLookupOfAffectedModelData(Model, updates, function(err, currentMap) {
if (err) return callback(err);

updates.forEach(function(update) {
var id = update.change.modelId;
var current = currentMap[id];
switch (update.type) {
case Change.UPDATE:
tasks.push(function(cb) {
applyUpdate(Model, id, current, update.data, update.change, conflicts, cb);
});
break;

case Change.CREATE:
tasks.push(function(cb) {
applyCreate(Model, id, current, update.data, update.change, conflicts, cb);
});
break;
case Change.DELETE:
tasks.push(function(cb) {
applyDelete(Model, id, current, update.change, conflicts, cb);
});
break;
}
});

async.parallel(tasks, function(err) {
if (err) return callback(err);
if (conflicts.length) {
err = new Error('Conflict');
err.statusCode = 409;
err.details = { conflicts: conflicts };
return callback(err);
}
callback();
});
});
};

function buildLookupOfAffectedModelData(Model, updates, callback) {
var idName = Model.dataSource.idName(Model.modelName);
var affectedIds = updates.map(function(u) { return u.change.modelId; });
var whereAffected = {};
whereAffected[idName] = { inq: affectedIds };
Model.find({ where: whereAffected }, function(err, affectedList) {
if (err) return callback(err);
var dataLookup = {};
affectedList.forEach(function(it) {
dataLookup[it[idName]] = it;
});
callback(null, dataLookup);
});
}

function applyUpdate(Model, id, current, data, change, conflicts, cb) {
var Change = Model.getChangeModel();
var rev = current ? Change.revisionForInst(current) : null;

if (rev !== change.prev) {
debug('Detected non-rectified change of %s %j',
Model.modelName, id);
debug('\tExpected revision: %s', change.rev);
debug('\tActual revision: %s', rev);
conflicts.push(change);
return Change.rectifyModelChanges(Model.modelName, [id], cb);
}

// TODO(bajtos) modify `data` so that it instructs
// the connector to remove any properties included in "inst"
// but not included in `data`
// See https://github.com/strongloop/loopback/issues/1215

Model.updateAll(current.toObject(), data, function(err, result) {
if (err) return cb(err);

var count = result && result.count;
switch (count) {
case 1:
// The happy path, exactly one record was updated
return cb();

case 0:
debug('UpdateAll detected non-rectified change of %s %j',
Model.modelName, id);
conflicts.push(change);
// NOTE(bajtos) updateAll triggers change rectification
// for all model instances, even when no records were updated,
// thus we don't need to rectify explicitly ourselves
return cb();

case undefined:
case null:
return cb(new Error(
'Cannot apply bulk updates, ' +
'the connector does not correctly report ' +
'the number of updated records.'));

default:
debug('%s.updateAll modified unexpected number of instances: %j',
Model.modelName, count);
return cb(new Error(
'Bulk update failed, the connector has modified unexpected ' +
'number of records: ' + JSON.stringify(count)));
}
});
}

async.parallel(tasks, callback);
};
function applyCreate(Model, id, current, data, change, conflicts, cb) {
Model.create(data, function(createErr) {
if (!createErr) return cb();

// We don't have a reliable way how to detect the situation
// where he model was not create because of a duplicate id
// The workaround is to query the DB to check if the model already exists
Model.findById(id, function(findErr, inst) {
if (findErr || !inst) {
// There isn't any instance with the same id, thus there isn't
// any conflict and we just report back the original error.
return cb(createErr);
}

return conflict();
});
});

function conflict() {
// The instance already exists - report a conflict
debug('Detected non-rectified new instance of %s %j',
Model.modelName, id);
conflicts.push(change);

var Change = Model.getChangeModel();
return Change.rectifyModelChanges(Model.modelName, [id], cb);
}
}

function applyDelete(Model, id, current, change, conflicts, cb) {
if (!current) {
// The instance was either already deleted or not created at all,
// we are done.
return cb();
}

var Change = Model.getChangeModel();
var rev = Change.revisionForInst(current);
if (rev !== change.prev) {
debug('Detected non-rectified change of %s %j',
Model.modelName, id);
debug('\tExpected revision: %s', change.rev);
debug('\tActual revision: %s', rev);
conflicts.push(change);
return Change.rectifyModelChanges(Model.modelName, [id], cb);
}

Model.deleteAll(current.toObject(), function(err, result) {
if (err) return cb(err);

var count = result && result.count;
switch (count) {
case 1:
// The happy path, exactly one record was updated
return cb();

case 0:
debug('DeleteAll detected non-rectified change of %s %j',
Model.modelName, id);
conflicts.push(change);
// NOTE(bajtos) deleteAll triggers change rectification
// for all model instances, even when no records were updated,
// thus we don't need to rectify explicitly ourselves
return cb();

case undefined:
case null:
return cb(new Error(
'Cannot apply bulk updates, ' +
'the connector does not correctly report ' +
'the number of deleted records.'));

default:
debug('%s.deleteAll modified unexpected number of instances: %j',
Model.modelName, count);
return cb(new Error(
'Bulk update failed, the connector has deleted unexpected ' +
'number of records: ' + JSON.stringify(count)));
}
});
}

/**
* Get the `Change` model.
Expand Down
Loading

0 comments on commit 87940a4

Please sign in to comment.