diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java index 7ecb8f08d573d..9fbf9b73c057e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java @@ -31,7 +31,7 @@ */ @LimitedPrivate @Evolving -public interface LedgerOffloaderFactory { +public interface LedgerOffloaderFactory extends AutoCloseable { /** * Check whether the provided driver driverName is supported. @@ -111,4 +111,9 @@ default T create(OffloadPoliciesImpl offloadPolicies, throws IOException { return create(offloadPolicies, userMetadata, scheduler, offloaderStats); } + + @Override + default void close() throws Exception { + // no-op + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/Offloaders.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/Offloaders.java index 6910439e09131..cec15599242ae 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/Offloaders.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/Offloaders.java @@ -46,6 +46,12 @@ public LedgerOffloaderFactory getOffloaderFactory(String driverName) throws IOEx @Override public void close() throws Exception { offloaders.forEach(offloader -> { + try { + offloader.getRight().close(); + } catch (Exception e) { + log.warn("Failed to close offloader '{}': {}", + offloader.getRight().getClass(), e.getMessage()); + } try { offloader.getLeft().close(); } catch (IOException e) { diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java index 2c9165674444d..60363cf8406db 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java @@ -25,6 +25,7 @@ import org.apache.bookkeeper.mledger.LedgerOffloaderStats; import org.apache.bookkeeper.mledger.LedgerOffloaderStatsDisable; import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader; +import org.apache.bookkeeper.mledger.offload.jcloud.impl.OffsetsCache; import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider; import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; @@ -33,12 +34,7 @@ * A jcloud based offloader factory. */ public class JCloudLedgerOffloaderFactory implements LedgerOffloaderFactory { - - public static JCloudLedgerOffloaderFactory of() { - return INSTANCE; - } - - private static final JCloudLedgerOffloaderFactory INSTANCE = new JCloudLedgerOffloaderFactory(); + private final OffsetsCache entryOffsetsCache = new OffsetsCache(); @Override public boolean isDriverSupported(String driverName) { @@ -58,6 +54,12 @@ public BlobStoreManagedLedgerOffloader create(OffloadPoliciesImpl offloadPolicie TieredStorageConfiguration config = TieredStorageConfiguration.create(offloadPolicies.toProperties()); - return BlobStoreManagedLedgerOffloader.create(config, userMetadata, scheduler, offloaderStats); + return BlobStoreManagedLedgerOffloader.create(config, userMetadata, scheduler, offloaderStats, + entryOffsetsCache); + } + + @Override + public void close() throws Exception { + entryOffsetsCache.close(); } } diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java index 4f68f90370e6f..e050d74a332bc 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java @@ -19,8 +19,6 @@ package org.apache.bookkeeper.mledger.offload.jcloud.impl; import com.google.common.annotations.VisibleForTesting; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; import io.netty.buffer.ByteBuf; import java.io.DataInputStream; import java.io.IOException; @@ -56,19 +54,13 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle { private static final Logger log = LoggerFactory.getLogger(BlobStoreBackedReadHandleImpl.class); - private static final int CACHE_TTL_SECONDS = - Integer.getInteger("pulsar.jclouds.readhandleimpl.offsetsscache.ttl.seconds", 30 * 60); private final long ledgerId; private final OffloadIndexBlock index; private final BackedInputStream inputStream; private final DataInputStream dataStream; private final ExecutorService executor; - // this Cache is accessed only by one thread - private final Cache entryOffsets = CacheBuilder - .newBuilder() - .expireAfterAccess(CACHE_TTL_SECONDS, TimeUnit.SECONDS) - .build(); + private final OffsetsCache entryOffsetsCache; private final AtomicReference> closeFuture = new AtomicReference<>(); enum State { @@ -79,12 +71,14 @@ enum State { private volatile State state = null; private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index, - BackedInputStream inputStream, ExecutorService executor) { + BackedInputStream inputStream, ExecutorService executor, + OffsetsCache entryOffsetsCache) { this.ledgerId = ledgerId; this.index = index; this.inputStream = inputStream; this.dataStream = new DataInputStream(inputStream); this.executor = executor; + this.entryOffsetsCache = entryOffsetsCache; state = State.Opened; } @@ -109,7 +103,6 @@ public CompletableFuture closeAsync() { try { index.close(); inputStream.close(); - entryOffsets.invalidateAll(); state = State.Closed; promise.complete(null); } catch (IOException t) { @@ -164,7 +157,7 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr long entryId = dataStream.readLong(); if (entryId == nextExpectedId) { - entryOffsets.put(entryId, currentPosition); + entryOffsetsCache.put(ledgerId, entryId, currentPosition); ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length); entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf)); int toWrite = length; @@ -215,7 +208,7 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr } private void seekToEntry(long nextExpectedId) throws IOException { - Long knownOffset = entryOffsets.getIfPresent(nextExpectedId); + Long knownOffset = entryOffsetsCache.getIfPresent(ledgerId, nextExpectedId); if (knownOffset != null) { inputStream.seek(knownOffset); } else { @@ -269,7 +262,8 @@ public static ReadHandle open(ScheduledExecutorService executor, BlobStore blobStore, String bucket, String key, String indexKey, VersionCheck versionCheck, long ledgerId, int readBufferSize, - LedgerOffloaderStats offloaderStats, String managedLedgerName) + LedgerOffloaderStats offloaderStats, String managedLedgerName, + OffsetsCache entryOffsetsCache) throws IOException, BKException.BKNoSuchLedgerExistsException { int retryCount = 3; OffloadIndexBlock index = null; @@ -310,7 +304,7 @@ public static ReadHandle open(ScheduledExecutorService executor, BackedInputStream inputStream = new BlobStoreBackedInputStreamImpl(blobStore, bucket, key, versionCheck, index.getDataObjectLength(), readBufferSize, offloaderStats, managedLedgerName); - return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, executor); + return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, executor, entryOffsetsCache); } // for testing diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java index 1b6062ffa0358..9f89bd52a8626 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java @@ -108,6 +108,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader { private AtomicLong bufferLength = new AtomicLong(0); private AtomicLong segmentLength = new AtomicLong(0); private final long maxBufferLength; + private final OffsetsCache entryOffsetsCache; private final ConcurrentLinkedQueue offloadBuffer = new ConcurrentLinkedQueue<>(); private CompletableFuture offloadResult; private volatile PositionImpl lastOfferedPosition = PositionImpl.LATEST; @@ -123,13 +124,16 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader { public static BlobStoreManagedLedgerOffloader create(TieredStorageConfiguration config, Map userMetadata, OrderedScheduler scheduler, - LedgerOffloaderStats offloaderStats) throws IOException { + LedgerOffloaderStats offloaderStats, + OffsetsCache entryOffsetsCache) + throws IOException { - return new BlobStoreManagedLedgerOffloader(config, scheduler, userMetadata, offloaderStats); + return new BlobStoreManagedLedgerOffloader(config, scheduler, userMetadata, offloaderStats, entryOffsetsCache); } BlobStoreManagedLedgerOffloader(TieredStorageConfiguration config, OrderedScheduler scheduler, - Map userMetadata, LedgerOffloaderStats offloaderStats) { + Map userMetadata, LedgerOffloaderStats offloaderStats, + OffsetsCache entryOffsetsCache) { this.scheduler = scheduler; this.userMetadata = userMetadata; @@ -140,6 +144,7 @@ public static BlobStoreManagedLedgerOffloader create(TieredStorageConfiguration this.minSegmentCloseTimeMillis = Duration.ofSeconds(config.getMinSegmentTimeInSecond()).toMillis(); //ensure buffer can have enough content to fill a block this.maxBufferLength = Math.max(config.getWriteBufferSizeInBytes(), config.getMinBlockSizeInBytes()); + this.entryOffsetsCache = entryOffsetsCache; this.segmentBeginTimeMillis = System.currentTimeMillis(); if (!Strings.isNullOrEmpty(config.getRegion())) { this.writeLocation = new LocationBuilder() @@ -555,7 +560,8 @@ public CompletableFuture readOffloaded(long ledgerId, UUID uid, readBucket, key, indexKey, DataBlockUtils.VERSION_CHECK, ledgerId, config.getReadBufferSizeInBytes(), - this.offloaderStats, offloadDriverMetadata.get(MANAGED_LEDGER_NAME))); + this.offloaderStats, offloadDriverMetadata.get(MANAGED_LEDGER_NAME), + this.entryOffsetsCache)); } catch (Throwable t) { log.error("Failed readOffloaded: ", t); promise.completeExceptionally(t); diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffsetsCache.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffsetsCache.java new file mode 100644 index 0000000000000..fa13afa8ff0e7 --- /dev/null +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffsetsCache.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.offload.jcloud.impl; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import io.grpc.netty.shaded.io.netty.util.concurrent.DefaultThreadFactory; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class OffsetsCache implements AutoCloseable { + private static final int CACHE_TTL_SECONDS = + Integer.getInteger("pulsar.jclouds.readhandleimpl.offsetsscache.ttl.seconds", 5 * 60); + // limit the cache size to avoid OOM + // 1 million entries consumes about 60MB of heap space + private static final int CACHE_MAX_SIZE = + Integer.getInteger("pulsar.jclouds.readhandleimpl.offsetsscache.max.size", 1_000_000); + private final ScheduledExecutorService cacheEvictionExecutor; + + record Key(long ledgerId, long entryId) { + + } + + private final Cache entryOffsetsCache; + + public OffsetsCache() { + if (CACHE_MAX_SIZE > 0) { + entryOffsetsCache = CacheBuilder + .newBuilder() + .expireAfterAccess(CACHE_TTL_SECONDS, TimeUnit.SECONDS) + .maximumSize(CACHE_MAX_SIZE) + .build(); + cacheEvictionExecutor = + Executors.newSingleThreadScheduledExecutor( + new DefaultThreadFactory("jcloud-offsets-cache-eviction")); + int period = Math.max(CACHE_TTL_SECONDS / 2, 1); + cacheEvictionExecutor.scheduleAtFixedRate(() -> { + entryOffsetsCache.cleanUp(); + }, period, period, TimeUnit.SECONDS); + } else { + cacheEvictionExecutor = null; + entryOffsetsCache = null; + } + } + + public void put(long ledgerId, long entryId, long currentPosition) { + if (entryOffsetsCache != null) { + entryOffsetsCache.put(new Key(ledgerId, entryId), currentPosition); + } + } + + public Long getIfPresent(long ledgerId, long entryId) { + return entryOffsetsCache != null ? entryOffsetsCache.getIfPresent(new Key(ledgerId, entryId)) : null; + } + + public void clear() { + if (entryOffsetsCache != null) { + entryOffsetsCache.invalidateAll(); + } + } + + @Override + public void close() { + if (cacheEvictionExecutor != null) { + cacheEvictionExecutor.shutdownNow(); + } + } +} diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java index 89d9021d36d7d..75faf098b409b 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java @@ -33,6 +33,7 @@ import org.jclouds.blobstore.BlobStore; import org.jclouds.domain.Credentials; import org.testng.Assert; +import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; public abstract class BlobStoreManagedLedgerOffloaderBase { @@ -46,6 +47,7 @@ public abstract class BlobStoreManagedLedgerOffloaderBase { protected final JCloudBlobStoreProvider provider; protected TieredStorageConfiguration config; protected BlobStore blobStore = null; + protected final OffsetsCache entryOffsetsCache = new OffsetsCache(); protected BlobStoreManagedLedgerOffloaderBase() throws Exception { scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(5).name("offloader").build(); @@ -56,6 +58,13 @@ protected BlobStoreManagedLedgerOffloaderBase() throws Exception { @AfterMethod(alwaysRun = true) public void cleanupMockBookKeeper() { bk.getLedgerMap().clear(); + entryOffsetsCache.clear(); + } + + @AfterClass(alwaysRun = true) + public void cleanup() throws Exception { + entryOffsetsCache.close(); + scheduler.shutdownNow(); } protected static MockManagedLedger createMockManagedLedger() { diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java index ad1529072f813..e706e4254cb11 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java @@ -82,7 +82,7 @@ private BlobStoreManagedLedgerOffloader getOffloader(String bucket, Map(), scheduler, this.offloaderStats); + .create(mockedConfig, new HashMap(), scheduler, this.offloaderStats, entryOffsetsCache); return offloader; } @@ -91,7 +91,7 @@ private BlobStoreManagedLedgerOffloader getOffloader(String bucket, BlobStore mo mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket, additionalConfig))); Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore(); BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader - .create(mockedConfig, new HashMap(), scheduler, this.offloaderStats); + .create(mockedConfig, new HashMap(), scheduler, this.offloaderStats, entryOffsetsCache); return offloader; } diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java index 4419210c251f1..bf6ede896ab28 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java @@ -98,14 +98,16 @@ private BlobStoreManagedLedgerOffloader getOffloader(BlobStore mockedBlobStore) private BlobStoreManagedLedgerOffloader getOffloader(String bucket) throws IOException { mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket))); Mockito.doReturn(blobStore).when(mockedConfig).getBlobStore(); // Use the REAL blobStore - BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap(), scheduler, this.offloaderStats); + BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap(), scheduler, this.offloaderStats, + entryOffsetsCache); return offloader; } private BlobStoreManagedLedgerOffloader getOffloader(String bucket, BlobStore mockedBlobStore) throws IOException { mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket))); Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore(); - BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap(), scheduler, this.offloaderStats); + BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap(), scheduler, this.offloaderStats, + entryOffsetsCache); return offloader; } diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffsetsCacheTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffsetsCacheTest.java new file mode 100644 index 0000000000000..86a72c7b5547e --- /dev/null +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffsetsCacheTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.offload.jcloud.impl; + +import lombok.extern.slf4j.Slf4j; +import org.testng.annotations.Test; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; + +@Slf4j +public class OffsetsCacheTest { + + @Test + public void testCache() throws Exception { + System.setProperty("pulsar.jclouds.readhandleimpl.offsetsscache.ttl.seconds", "1"); + OffsetsCache offsetsCache = new OffsetsCache(); + assertNull(offsetsCache.getIfPresent(1, 2)); + offsetsCache.put(1, 1, 1); + assertEquals(offsetsCache.getIfPresent(1, 1), 1); + offsetsCache.clear(); + assertNull(offsetsCache.getIfPresent(1, 1)); + // test ttl + offsetsCache.put(1, 2, 2); + assertEquals(offsetsCache.getIfPresent(1, 2), 2); + Thread.sleep(1500); + assertNull(offsetsCache.getIfPresent(1, 2)); + offsetsCache.close(); + } +}