Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support expiring ops from oplog #33

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ Livedb.prototype.getOps = function(cName, docName, from, to, callback) {

this.driver.getOps(c, docName, from, to, function(err, ops) {
if (self.sdc) self.sdc.timing('livedb.getOps', Date.now() - start);

// If we got results, check that we got the right ops.
// Sometimes, if ops have expired from the oplog, there might be some missing.
if (ops && ops.length > 0 && ops[0].v !== from)
return callback('Missing operations');

// Interestingly, this will filter ops for other types as if they were the projected type. This
// is a bug, but it shouldn't cause any problems for now. I'll have to revisit this
Expand Down Expand Up @@ -330,6 +335,14 @@ Livedb.prototype._trySubmit = function(submitData) {
self._resumeSubmitting(opData.src);
return;
}

// Check if the first op we received wasn't the one we requested.
// This can occur if the ops were dropped from the oplog (e.g. expired).
if (from !== snapshot.v && (ops.length === 0 || ops[0].v !== from)) {
submitData.callback('Missing operations');
self._resumeSubmitting(opData.src);
return;
}

for (var i = 0; i < ops.length; i++) {
var op = ops[i];
Expand All @@ -342,7 +355,7 @@ Livedb.prototype._trySubmit = function(submitData) {
self._resumeSubmitting(opData.src);
return;
}

// Bring both the op and the snapshot up to date. At least one of
// these two conditionals should be true.
if (snapshot.v === op.v) {
Expand Down
5 changes: 3 additions & 2 deletions lib/redisdriver.js
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ RedisDriver.prototype.atomicSubmit = function(cName, docName, opData, options, c
// In this case, we should write a no-op ramp to the snapshot
// version, followed by a delete & a create to fill in the missing
// ops.
throw Error('Missing oplog for ' + cName + ' ' + docName);
return callback('Missing oplog for ' + cName + ' ' + docName);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return callback('Missing oplog');

... so people can match on the error string.

}
self._redisSubmitScript(cName, docName, opData, dirtyData, version, callbackWrapper);
});
Expand Down Expand Up @@ -323,7 +323,8 @@ RedisDriver.prototype._oplogGetOps = function(cName, docName, from, to, callback
var self = this;
this.oplog.getOps(cName, docName, from, to, function(err, ops) {
if (err) return callback(err);
if (ops.length && ops[0].v !== from) throw Error('Oplog is returning incorrect ops');
if (ops.length && ops[0].v !== from)
return callback('Missing operations');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be easier to just call self.getOps(...) instead of replicating the logic here.


for (var i = 0; i < ops.length; i++) {
ops[i].v = from++;
Expand Down
7 changes: 1 addition & 6 deletions test/oplog.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,10 @@ module.exports = (create) ->
check = (error, ops) ->
throw new Error error if error
assert.deepEqual ops, []
done() if ++num is 7
done() if ++num is 2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test doesn't do the thing it says anymore.


@db.getOps @cName, @docName, 0, 0, check
@db.getOps @cName, @docName, 0, 1, check
@db.getOps @cName, @docName, 0, 10, check
@db.getOps @cName, @docName, 0, null, check
@db.getOps @cName, @docName, 10, 10, check
@db.getOps @cName, @docName, 10, 11, check
@db.getOps @cName, @docName, 10, null, check

it 'returns ops', (done) ->
num = 0
Expand Down
51 changes: 46 additions & 5 deletions test/test.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,36 @@ describe 'livedb', ->
throw new Error err if err
assert.equal m.language, null
done()

it 'errors when oplog is missing all operations', (done) -> @create =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when oplog is missing any operations

@collection.submit @docName, {v:1, op:['test']}, (err, v) =>
throw new Error err if err

getOps = @client.driver.getOps
@client.driver.getOps = (cName, docName, from, to, fn) ->
fn null, []

@collection.submit @docName, {v:1, op:['test']}, (err, v) =>
assert.equal err, 'Missing operations'
@client.driver.getOps = getOps
done()

it 'errors when oplog is missing some operations', (done) -> @create =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is bad behaviour. If the oplog throws, that implies something has gone wrong. We shouldn't just silently eat the exception. (Exceptions ruin optimization, they shouldn't ever be used as part of the expected behaviour of a program)

@collection.submit @docName, {v:1, op:['test']}, (err, v) =>
throw new Error err if err
@collection.submit @docName, {v:2, op:['test 2']}, (err, v) =>
throw new Error err if err

getOps = @client.driver.getOps
@client.driver.getOps = (cName, docName, from, to, fn) =>
getOps.call @client.driver, cName, docName, from, to, (err, ops) ->
throw new Error err if err
fn null, ops.slice 1

@collection.submit @docName, {v:1, op:['test']}, (err, v) =>
assert.equal err, 'Missing operations'
@client.driver.getOps = getOps
done()

it 'can modify a document', (done) -> @create =>
@collection.submit @docName, v:1, op:['hi'], (err, v) =>
Expand Down Expand Up @@ -375,11 +405,22 @@ describe 'livedb', ->
assert.deepEqual stripTs(ops), [{create:{type:textType.uri, data:''}, v:0, m:{}, src:''}, {op:['hi'], v:1, m:{}, src:''}]
done()



it 'errors if ops are missing from the snapshotdb and oplogs'


it 'errors if ops are missing from the snapshotdb and oplogs', (done) -> @create =>
@collection.submit @docName, {v:1, op:['test']}, (err, v) =>
throw new Error err if err
@collection.submit @docName, {v:1, op:['test2']}, (err, v) =>
throw new Error err if err

getOps = @client.driver.getOps
@client.driver.getOps = (cName, docName, from, to, fn) =>
getOps.call @client.driver, cName, docName, from, to, (err, ops) ->
throw new Error err if err
fn null, ops.slice 1

@collection.getOps @docName, 0, (err, ops) =>
assert.equal err, 'Missing operations'
@client.driver.getOps = getOps
done()

it 'works with separate clients', (done) -> @create =>
return done() unless @driver.distributed
Expand Down