Skip to content

Commit

Permalink
[improve][offload] Create offload resources lazily (apache#20775)
Browse files Browse the repository at this point in the history
  • Loading branch information
zymap committed Jul 13, 2023
1 parent ce28dda commit 7233f0e
Showing 1 changed file with 16 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,17 @@ public static BlobStoreManagedLedgerOffloader create(TieredStorageConfiguration
config.getProvider().getDriver(), config.getServiceEndpoint(),
config.getBucket(), config.getRegion());

blobStores.putIfAbsent(config.getBlobStoreLocation(), config.getBlobStore());
this.offloaderStats = offloaderStats;
log.info("The ledger offloader was created.");
}

private BlobStore getBlobStore(BlobStoreLocation blobStoreLocation) {
return blobStores.computeIfAbsent(blobStoreLocation, location -> {
log.info("Creating blob store for location {}", location);
return config.getBlobStore();
});
}

@Override
public String getOffloadDriverName() {
return config.getDriver();
Expand All @@ -179,11 +185,11 @@ public CompletableFuture<Void> offload(ReadHandle readHandle,
Map<String, String> extraMetadata) {
final String managedLedgerName = extraMetadata.get(MANAGED_LEDGER_NAME);
final String topicName = TopicName.fromPersistenceNamingEncoding(managedLedgerName);
final BlobStore writeBlobStore = blobStores.get(config.getBlobStoreLocation());
log.info("offload {} uuid {} extraMetadata {} to {} {}", readHandle.getId(), uuid, extraMetadata,
config.getBlobStoreLocation(), writeBlobStore);
CompletableFuture<Void> promise = new CompletableFuture<>();
scheduler.chooseThread(readHandle.getId()).execute(() -> {
final BlobStore writeBlobStore = getBlobStore(config.getBlobStoreLocation());
log.info("offload {} uuid {} extraMetadata {} to {} {}", readHandle.getId(), uuid, extraMetadata,
config.getBlobStoreLocation(), writeBlobStore);
if (readHandle.getLength() == 0 || !readHandle.isClosed() || readHandle.getLastAddConfirmed() < 0) {
promise.completeExceptionally(
new IllegalArgumentException("An empty or open ledger should never be offloaded"));
Expand Down Expand Up @@ -330,7 +336,7 @@ public CompletableFuture<OffloadHandle> streamingOffload(@NonNull ManagedLedger
driverMetadata);
log.debug("begin offload with {}:{}", beginLedger, beginEntry);
this.offloadResult = new CompletableFuture<>();
blobStore = blobStores.get(config.getBlobStoreLocation());
blobStore = getBlobStore(config.getBlobStoreLocation());
streamingIndexBuilder = OffloadIndexBlockV2Builder.create();
streamingDataBlockKey = segmentInfo.uuid.toString();
streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
Expand Down Expand Up @@ -536,13 +542,13 @@ public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid,

BlobStoreLocation bsKey = getBlobStoreLocation(offloadDriverMetadata);
String readBucket = bsKey.getBucket();
BlobStore readBlobstore = blobStores.get(config.getBlobStoreLocation());

CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
String key = DataBlockUtils.dataBlockOffloadKey(ledgerId, uid);
String indexKey = DataBlockUtils.indexBlockOffloadKey(ledgerId, uid);
scheduler.chooseThread(ledgerId).execute(() -> {
try {
BlobStore readBlobstore = getBlobStore(config.getBlobStoreLocation());
promise.complete(BlobStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
readBlobstore,
readBucket, key, indexKey,
Expand All @@ -562,7 +568,6 @@ public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, MLDataFormats.
Map<String, String> offloadDriverMetadata) {
BlobStoreLocation bsKey = getBlobStoreLocation(offloadDriverMetadata);
String readBucket = bsKey.getBucket();
BlobStore readBlobstore = blobStores.get(config.getBlobStoreLocation());
CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
final List<MLDataFormats.OffloadSegment> offloadSegmentList = ledgerContext.getOffloadSegmentList();
List<String> keys = Lists.newLinkedList();
Expand All @@ -577,6 +582,7 @@ public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, MLDataFormats.

scheduler.chooseThread(ledgerId).execute(() -> {
try {
BlobStore readBlobstore = getBlobStore(config.getBlobStoreLocation());
promise.complete(BlobStoreBackedReadHandleImplV2.open(scheduler.chooseThread(ledgerId),
readBlobstore,
readBucket, keys, indexKeys,
Expand All @@ -596,11 +602,11 @@ public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
Map<String, String> offloadDriverMetadata) {
BlobStoreLocation bsKey = getBlobStoreLocation(offloadDriverMetadata);
String readBucket = bsKey.getBucket(offloadDriverMetadata);
BlobStore readBlobstore = blobStores.get(config.getBlobStoreLocation());

CompletableFuture<Void> promise = new CompletableFuture<>();
scheduler.chooseThread(ledgerId).execute(() -> {
try {
BlobStore readBlobstore = getBlobStore(config.getBlobStoreLocation());
readBlobstore.removeBlobs(readBucket,
ImmutableList.of(DataBlockUtils.dataBlockOffloadKey(ledgerId, uid),
DataBlockUtils.indexBlockOffloadKey(ledgerId, uid)));
Expand All @@ -623,11 +629,11 @@ public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
public CompletableFuture<Void> deleteOffloaded(UUID uid, Map<String, String> offloadDriverMetadata) {
BlobStoreLocation bsKey = getBlobStoreLocation(offloadDriverMetadata);
String readBucket = bsKey.getBucket(offloadDriverMetadata);
BlobStore readBlobstore = blobStores.get(config.getBlobStoreLocation());

CompletableFuture<Void> promise = new CompletableFuture<>();
scheduler.execute(() -> {
try {
BlobStore readBlobstore = getBlobStore(config.getBlobStoreLocation());
readBlobstore.removeBlobs(readBucket,
ImmutableList.of(uid.toString(),
DataBlockUtils.indexBlockOffloadKey(uid)));
Expand Down Expand Up @@ -667,7 +673,7 @@ public void scanLedgers(OffloadedLedgerMetadataConsumer consumer, Map<String,
String readBucket = bsKey.getBucket();
log.info("Scanning bucket {}, bsKey {}, location {} endpoint{} ", readBucket, bsKey,
config.getBlobStoreLocation(), endpoint);
BlobStore readBlobstore = blobStores.get(config.getBlobStoreLocation());
BlobStore readBlobstore = getBlobStore(config.getBlobStoreLocation());
int batchSize = 100;
String bucketName = config.getBucket();
String marker = null;
Expand Down

0 comments on commit 7233f0e

Please sign in to comment.