diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java index 43ba31c1889d6..99280fab41f14 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java @@ -235,7 +235,7 @@ private void asyncReadEntry0(ReadHandle lh, PositionImpl position, final ReadEnt } finally { ledgerEntries.close(); } - }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception->{ + }, ml.getPinnedExecutor()).exceptionally(exception->{ ml.invalidateLedgerHandle(lh); callback.readEntryFailed(createManagedLedgerException(exception), ctx); return null; @@ -313,7 +313,7 @@ private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boo } checkNotNull(ml.getName()); - checkNotNull(ml.getExecutor()); + checkNotNull(ml.getPinnedExecutor()); try { // We got the entries, we need to transform them to a List<> type @@ -334,7 +334,7 @@ private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boo } finally { ledgerEntries.close(); } - }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception->{ + }, ml.getPinnedExecutor()).exceptionally(exception->{ if (exception instanceof BKException && ((BKException)exception).getCode() == BKException.Code.TooManyRequestsException) { callback.readEntriesFailed(createManagedLedgerException(exception), ctx); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java index c87bcb8aa4031..19af225fd0936 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java @@ -247,7 +247,7 @@ public void asyncReadEntry(ReadHandle lh, PositionImpl position, AsyncCallbacks. } finally { ledgerEntries.close(); } - }, ml.getExecutor().chooseThread(ml.getName())); + }, ml.getPinnedExecutor()); } @Override diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 82b4cfcc9b68b..b2e13055d2b95 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -776,7 +776,7 @@ public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntrie // Check again for new entries after the configured time, then if still no entries are available register // to be notified if (config.getNewEntriesCheckDelayInMillis() > 0) { - ledger.getScheduledExecutor() + ledger.getPinnedScheduledExecutor() .schedule(() -> checkForNewEntries(op, callback, ctx), config.getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS); } else { @@ -1137,7 +1137,7 @@ public void asyncResetCursor(Position newPos, AsyncCallbacks.ResetCursorCallback final PositionImpl newPosition = (PositionImpl) newPos; // order trim and reset operations on a ledger - ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> { + ledger.getPinnedExecutor().execute(safeRun(() -> { PositionImpl actualPosition = newPosition; if (!ledger.isValidPosition(actualPosition) && @@ -2286,7 +2286,7 @@ private boolean shouldPersistUnackRangesToLedger() { private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl position, Map properties, MetaStoreCallback callback, boolean persistIndividualDeletedMessageRanges) { if (state == State.Closed) { - ledger.getExecutor().execute(safeRun(() -> callback.operationFailed(new MetaStoreException( + ledger.getPinnedExecutor().execute(safeRun(() -> callback.operationFailed(new MetaStoreException( new CursorAlreadyClosedException(name + " cursor already closed"))))); return; } @@ -2444,7 +2444,7 @@ void createNewMetadataLedger(final VoidCallback callback) { return; } - ledger.getExecutor().execute(safeRun(() -> { + ledger.getPinnedExecutor().execute(safeRun(() -> { ledger.mbean.endCursorLedgerCreateOp(); if (rc != BKException.Code.OK) { log.warn("[{}] Error creating ledger for cursor {}: {}", ledger.getName(), name, @@ -2785,7 +2785,7 @@ private void asyncDeleteLedger(final LedgerHandle lh, int retry) { log.warn("[{}] Failed to delete ledger {}: {}", ledger.getName(), lh.getId(), BKException.getMessage(rc)); if (!isNoSuchLedgerExistsException(rc)) { - ledger.getScheduledExecutor().schedule(safeRun(() -> asyncDeleteLedger(lh, retry - 1)), + ledger.getPinnedScheduledExecutor().schedule(safeRun(() -> asyncDeleteLedger(lh, retry - 1)), DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS); } return; @@ -2820,7 +2820,7 @@ private void asyncDeleteCursorLedger(int retry) { log.warn("[{}][{}] Failed to delete ledger {}: {}", ledger.getName(), name, cursorLedger.getId(), BKException.getMessage(rc)); if (!isNoSuchLedgerExistsException(rc)) { - ledger.getScheduledExecutor().schedule(safeRun(() -> asyncDeleteCursorLedger(retry - 1)), + ledger.getPinnedScheduledExecutor().schedule(safeRun(() -> asyncDeleteCursorLedger(retry - 1)), DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS); } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 0823886e0bdbb..0fddffe64f372 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -57,6 +57,8 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -78,7 +80,6 @@ import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.common.util.Backoff; -import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.common.util.Retries; import org.apache.bookkeeper.mledger.AsyncCallbacks; @@ -252,7 +253,8 @@ public enum PositionBound { protected volatile State state = null; private final OrderedScheduler scheduledExecutor; - private final OrderedExecutor executor; + private final ScheduledExecutorService pinnedScheduledExecutor; + private final Executor pinnedExecutor; final ManagedLedgerFactoryImpl factory; protected final ManagedLedgerMBeanImpl mbean; protected final Clock clock; @@ -298,7 +300,8 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper this.ledgerMetadata = LedgerMetadataUtils.buildBaseManagedLedgerMetadata(name); this.digestType = BookKeeper.DigestType.fromApiDigestType(config.getDigestType()); this.scheduledExecutor = scheduledExecutor; - this.executor = bookKeeper.getMainWorkerPool(); + this.pinnedScheduledExecutor = scheduledExecutor.chooseThread(name); + this.pinnedExecutor = bookKeeper.getMainWorkerPool().chooseThread(name); TOTAL_SIZE_UPDATER.set(this, 0); NUMBER_OF_ENTRIES_UPDATER.set(this, 0); ENTRIES_ADDED_COUNTER_UPDATER.set(this, 0); @@ -353,7 +356,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) { if (ledgers.size() > 0) { final long id = ledgers.lastKey(); OpenCallback opencb = (rc, lh, ctx1) -> { - executor.executeOrdered(name, safeRun(() -> { + pinnedExecutor.execute(safeRun(() -> { mbean.endDataLedgerOpenOp(); if (log.isDebugEnabled()) { log.debug("[{}] Opened ledger {}: {}", name, id, BKException.getMessage(rc)); @@ -462,7 +465,7 @@ public void operationFailed(MetaStoreException e) { return; } - executor.executeOrdered(name, safeRun(() -> { + pinnedExecutor.execute(safeRun(() -> { mbean.endDataLedgerCreateOp(); if (rc != BKException.Code.OK) { callback.initializeFailed(createManagedLedgerException(rc)); @@ -701,7 +704,7 @@ public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx) OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, ctx); // Jump to specific thread to avoid contention from writers writing from different threads - executor.executeOrdered(name, safeRun(() -> internalAsyncAddEntry(addOperation))); + pinnedExecutor.execute(safeRun(() -> internalAsyncAddEntry(addOperation))); } @Override @@ -713,7 +716,7 @@ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback OpAddEntry addOperation = OpAddEntry.create(this, buffer, numberOfMessages, callback, ctx); // Jump to specific thread to avoid contention from writers writing from different threads - executor.executeOrdered(name, safeRun(() -> internalAsyncAddEntry(addOperation))); + pinnedExecutor.execute(safeRun(() -> internalAsyncAddEntry(addOperation))); } private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { @@ -1481,7 +1484,7 @@ public void operationFailed(MetaStoreException e) { private void updateLedgersListAfterRollover(MetaStoreCallback callback) { if (!metadataMutex.tryLock()) { // Defer update for later - scheduledExecutor.schedule(() -> updateLedgersListAfterRollover(callback), 100, TimeUnit.MILLISECONDS); + pinnedScheduledExecutor.schedule(() -> updateLedgersListAfterRollover(callback), 100, TimeUnit.MILLISECONDS); return; } @@ -1778,7 +1781,7 @@ CompletableFuture getLedgerHandle(long ledgerId) { } promise.complete(res); } - }, executor.chooseThread(name)); + }, pinnedExecutor); return promise; }); } @@ -2159,7 +2162,7 @@ void notifyCursors() { break; } - executor.execute(safeRun(waitingCursor::notifyEntriesAvailable)); + pinnedExecutor.execute(safeRun(waitingCursor::notifyEntriesAvailable)); } } @@ -2170,7 +2173,7 @@ void notifyWaitingEntryCallBacks() { break; } - executor.execute(safeRun(cb::entriesAvailable)); + pinnedExecutor.execute(safeRun(cb::entriesAvailable)); } } @@ -2217,15 +2220,16 @@ private void trimConsumedLedgersInBackground() { @Override public void trimConsumedLedgersInBackground(CompletableFuture promise) { - executor.executeOrdered(name, safeRun(() -> internalTrimConsumedLedgers(promise))); + pinnedExecutor.execute(safeRun(() -> internalTrimConsumedLedgers(promise))); } public void trimConsumedLedgersInBackground(boolean isTruncate, CompletableFuture promise) { - executor.executeOrdered(name, safeRun(() -> internalTrimLedgers(isTruncate, promise))); + pinnedExecutor.execute(safeRun(() -> internalTrimLedgers(isTruncate, promise))); } private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture promise) { - scheduledExecutor.schedule(safeRun(() -> trimConsumedLedgersInBackground(isTruncate, promise)), 100, TimeUnit.MILLISECONDS); + pinnedScheduledExecutor + .schedule(safeRun(() -> trimConsumedLedgersInBackground(isTruncate, promise)), 100, TimeUnit.MILLISECONDS); } private void maybeOffloadInBackground(CompletableFuture promise) { @@ -2234,13 +2238,13 @@ private void maybeOffloadInBackground(CompletableFuture promise) { && config.getLedgerOffloader().getOffloadPolicies() != null && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) { - executor.executeOrdered(name, safeRun(() -> maybeOffload(promise))); + pinnedExecutor.execute(safeRun(() -> maybeOffload(promise))); } } private void maybeOffload(CompletableFuture finalPromise) { if (!offloadMutex.tryLock()) { - scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)), + pinnedScheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)), 100, TimeUnit.MILLISECONDS); } else { CompletableFuture unlockingPromise = new CompletableFuture<>(); @@ -2660,7 +2664,7 @@ private void asyncDeleteLedger(long ledgerId, long retry) { log.warn("[{}] Ledger was already deleted {}", name, ledgerId); } else if (rc != BKException.Code.OK) { log.error("[{}] Error deleting ledger {} : {}", name, ledgerId, BKException.getMessage(rc)); - scheduledExecutor.schedule(safeRun(() -> asyncDeleteLedger(ledgerId, retry - 1)), DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS); + pinnedScheduledExecutor.schedule(safeRun(() -> asyncDeleteLedger(ledgerId, retry - 1)), DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS); } else { if (log.isDebugEnabled()) { log.debug("[{}] Deleted ledger {}", name, ledgerId); @@ -2932,7 +2936,7 @@ private void tryTransformLedgerInfo(long ledgerId, LedgerInfoTransformation tran synchronized (this) { if (!metadataMutex.tryLock()) { // retry in 100 milliseconds - scheduledExecutor.schedule( + pinnedScheduledExecutor.schedule( safeRun(() -> tryTransformLedgerInfo(ledgerId, transformation, finalPromise)), 100, TimeUnit.MILLISECONDS); } else { // lock acquired @@ -3412,12 +3416,12 @@ public NavigableMap getLedgersInfo() { return ledgers; } - OrderedScheduler getScheduledExecutor() { - return scheduledExecutor; + ScheduledExecutorService getPinnedScheduledExecutor() { + return pinnedScheduledExecutor; } - OrderedExecutor getExecutor() { - return executor; + Executor getPinnedExecutor() { + return pinnedExecutor; } private ManagedLedgerInfo getManagedLedgerInfo() { @@ -3618,7 +3622,7 @@ protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig conf cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated); return; } - scheduledExecutor.schedule(() -> { + pinnedScheduledExecutor.schedule(() -> { if (!ledgerCreated.get()) { if (log.isDebugEnabled()) { log.debug("[{}] Timeout creating ledger", name); @@ -3667,7 +3671,7 @@ private void scheduleTimeoutTask() { timeoutSec = timeoutSec <= 0 ? Math.max(config.getAddEntryTimeoutSeconds(), config.getReadEntryTimeoutSeconds()) : timeoutSec; - this.timeoutTask = this.scheduledExecutor.scheduleAtFixedRate(safeRun(() -> { + this.timeoutTask = this.pinnedScheduledExecutor.scheduleAtFixedRate(safeRun(() -> { checkAddTimeout(); checkReadTimeout(); }), timeoutSec, timeoutSec, TimeUnit.SECONDS); @@ -3808,7 +3812,7 @@ private void asyncUpdateProperties(Map properties, boolean isDel String deleteKey, final UpdatePropertiesCallback callback, Object ctx) { if (!metadataMutex.tryLock()) { // Defer update for later - scheduledExecutor.schedule(() -> asyncUpdateProperties(properties, isDelete, deleteKey, + pinnedScheduledExecutor.schedule(() -> asyncUpdateProperties(properties, isDelete, deleteKey, callback, ctx), 100, TimeUnit.MILLISECONDS); return; } @@ -3955,7 +3959,7 @@ private void updateLastLedgerCreatedTimeAndScheduleRolloverTask() { // and the previous checkLedgerRollTask is not done, we could cancel it checkLedgerRollTask.cancel(true); } - this.checkLedgerRollTask = this.scheduledExecutor.schedule( + this.checkLedgerRollTask = this.pinnedScheduledExecutor.schedule( safeRun(this::rollCurrentLedgerIfFull), this.maximumRolloverTimeMs, TimeUnit.MILLISECONDS); } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index c76d532092608..5f8421a315cee 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -154,7 +154,7 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx) } checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s", ledger.getId(), lh.getId()); - + if (!checkAndCompleteOp(ctx)) { // means callback might have been completed by different thread (timeout task thread).. so do nothing return; @@ -170,7 +170,7 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx) handleAddFailure(lh); } else { // Trigger addComplete callback in a thread hashed on the managed ledger name - ml.getExecutor().executeOrdered(ml.getName(), this); + ml.getPinnedExecutor().execute(this); } } @@ -248,7 +248,7 @@ private void updateLatency() { /** * Checks if add-operation is completed - * + * * @return true if task is not already completed else returns false. */ private boolean checkAndCompleteOp(Object ctx) { @@ -269,7 +269,7 @@ void handleAddTimeoutFailure(final LedgerHandle ledger, Object ctx) { /** * It handles add failure on the given ledger. it can be triggered when add-entry fails or times out. - * + * * @param ledger */ void handleAddFailure(final LedgerHandle ledger) { @@ -278,7 +278,7 @@ void handleAddFailure(final LedgerHandle ledger) { // be marked as failed. ml.mbean.recordAddEntryError(); - ml.getExecutor().executeOrdered(ml.getName(), SafeRun.safeRun(() -> { + ml.getPinnedExecutor().execute(SafeRun.safeRun(() -> { // Force the creation of a new ledger. Doing it in a background thread to avoid acquiring ML lock // from a BK callback. ml.ledgerClosed(ledger); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index 91a6e26f567d0..86ccb4cf725e6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -93,7 +93,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { if (!entries.isEmpty()) { // There were already some entries that were read before, we can return them - cursor.ledger.getExecutor().execute(safeRun(() -> { + cursor.ledger.getPinnedExecutor().execute(safeRun(() -> { callback.readEntriesComplete(entries, ctx); recycle(); })); @@ -141,8 +141,8 @@ void checkReadCompletion() { cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this); } - // Schedule next read in a different thread - cursor.ledger.getExecutor().execute(safeRun(() -> { + // Schedule next read + cursor.ledger.getPinnedExecutor().execute(safeRun(() -> { readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this); cursor.ledger.asyncReadEntries(OpReadEntry.this); })); @@ -152,7 +152,7 @@ void checkReadCompletion() { cursor.readOperationCompleted(); } finally { - cursor.ledger.getExecutor().executeOrdered(cursor.ledger.getName(), safeRun(() -> { + cursor.ledger.getPinnedExecutor().execute(safeRun(() -> { callback.readEntriesComplete(entries, ctx); recycle(); })); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java index 315eafa5a3989..56223f16777ca 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java @@ -54,13 +54,13 @@ protected void setUpTestCase() throws Exception { OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().numThreads(1).build(); ml1 = mock(ManagedLedgerImpl.class); - when(ml1.getScheduledExecutor()).thenReturn(executor); + when(ml1.getPinnedScheduledExecutor()).thenReturn(executor); when(ml1.getName()).thenReturn("cache1"); when(ml1.getMBean()).thenReturn(new ManagedLedgerMBeanImpl(ml1)); - when(ml1.getExecutor()).thenReturn(super.executor); + when(ml1.getPinnedExecutor()).thenReturn(super.executor); ml2 = mock(ManagedLedgerImpl.class); - when(ml2.getScheduledExecutor()).thenReturn(executor); + when(ml2.getPinnedScheduledExecutor()).thenReturn(executor); when(ml2.getName()).thenReturn("cache2"); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java index d64ef31e49a58..0a15765fa61c8 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java @@ -29,7 +29,6 @@ import io.netty.buffer.Unpooled; -import java.lang.reflect.Method; import java.util.List; import java.util.Vector; import java.util.concurrent.CompletableFuture; @@ -45,7 +44,6 @@ import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.mockito.Mockito; import org.testng.Assert; -import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; public class EntryCacheTest extends MockedBookKeeperTestCase { @@ -56,7 +54,7 @@ public class EntryCacheTest extends MockedBookKeeperTestCase { protected void setUpTestCase() throws Exception { ml = mock(ManagedLedgerImpl.class); when(ml.getName()).thenReturn("name"); - when(ml.getExecutor()).thenReturn(executor); + when(ml.getPinnedExecutor()).thenReturn(executor); when(ml.getMBean()).thenReturn(new ManagedLedgerMBeanImpl(ml)); }