diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 34dd3610d4ec9..f546a487f84be 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -440,6 +440,7 @@ public void initializeFailed(ManagedLedgerException e) { // Clean the map if initialization fails ledgers.remove(name, future); + entryCacheManager.removeEntryCache(name); if (pendingInitializeLedgers.remove(name, pendingLedger)) { pendingLedger.ledger.asyncClose(new CloseCallback() { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java index 7b2f8228ad722..d72bffa27d30a 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java @@ -31,12 +31,14 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import lombok.Cleanup; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.api.DigestType; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.Entry; @@ -509,6 +511,35 @@ public void recoverAfterWriteError() throws Exception { entries.forEach(Entry::release); } + @Test + public void recoverAfterOpenManagedLedgerFail() throws Exception { + ManagedLedger ledger = factory.open("recoverAfterOpenManagedLedgerFail"); + Position position = ledger.addEntry("entry".getBytes()); + ledger.close(); + bkc.failAfter(0, BKException.Code.BookieHandleNotAvailableException); + try { + factory.open("recoverAfterOpenManagedLedgerFail"); + } catch (Exception e) { + // ok + } + + ledger = factory.open("recoverAfterOpenManagedLedgerFail"); + CompletableFuture future = new CompletableFuture<>(); + ledger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() { + @Override + public void readEntryComplete(Entry entry, Object ctx) { + future.complete(entry.getData()); + } + + @Override + public void readEntryFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, null); + byte[] bytes = future.get(30, TimeUnit.SECONDS); + assertEquals(new String(bytes), "entry"); + } + @Test public void recoverLongTimeAfterMultipleWriteErrors() throws Exception { ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("recoverLongTimeAfterMultipleWriteErrors");