Skip to content

Commit

Permalink
fix 2 copies of 3bookie and stop 2 booksie after production. After th…
Browse files Browse the repository at this point in the history
…e entire bookie is restored, it can be produced at this time, and the produced data cannot be consumed normally. Restarting the broker can resume normal (#5965)

After stopping the bookie and recovering, it can be produced at this time, but cannot be consumed normally. Restarting the broker can resume normal

Fixes #5962

Motivation
problem :
Topic(E=3,W=3,A=2), stopped 2 bookie nodes, then recover the 2 bookie,
The message can be produced normally, but the consumer cannot pull the message unless the broker is restarted.
We hope that the bookie summary will resume normal consumption

Modifications
The main cause of this problem is that readHandle.readAsync does not catch the exception,
causing some methods to not trigger.
therefore, added the exception capture module , and the exceptions involved in the bookeeper project are also fixed.
  • Loading branch information
liudezhi2098 authored and jiazhai committed Jan 3, 2020
1 parent cff311c commit dcaa1d3
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,12 @@ private void asyncReadEntry0(ReadHandle lh, PositionImpl position, final ReadEnt
} finally {
ledgerEntries.close();
}
}, ml.getExecutor().chooseThread(ml.getName()));
}, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception->{
ml.invalidateLedgerHandle(lh, exception);
callback.readEntryFailed(createManagedLedgerException(exception), ctx);
return null;
}
);
}
}

Expand Down Expand Up @@ -327,7 +332,17 @@ private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boo
} finally {
ledgerEntries.close();
}
}, ml.getExecutor().chooseThread(ml.getName()));
}, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception->{
if (exception instanceof BKException
&& ((BKException)exception).getCode() == BKException.Code.TooManyRequestsException) {
callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
} else {
ml.invalidateLedgerHandle(lh, exception);
ManagedLedgerException mlException = createManagedLedgerException(exception);
callback.readEntriesFailed(mlException, ctx);
}
return null;
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,9 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole
ml.mbean.addReadEntriesSample(entries.size(), totalSize);

callback.readEntriesComplete(entries, ctx);
}).exceptionally(exception -> {
callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
return null;
});
}

Expand Down

0 comments on commit dcaa1d3

Please sign in to comment.