From 83108666a3cf0614d834143446de423d292df676 Mon Sep 17 00:00:00 2001 From: Satish Duggana Date: Wed, 14 Dec 2022 10:56:29 +0530 Subject: [PATCH 1/3] KAFKA-14466 Refactored ClassloaderAwareRemoteStorageManager moving to storage module --- ...ClassLoaderAwareRemoteStorageManager.scala | 76 ------------- .../kafka/log/remote/RemoteLogManager.scala | 2 +- ...sLoaderAwareRemoteStorageManagerTest.scala | 2 +- .../log/remote/RemoteLogManagerTest.scala | 10 +- ...ssLoaderAwareRemoteLogMetadataManager.java | 16 +-- .../ClassLoaderAwareRemoteStorageManager.java | 104 ++++++++++++++++++ .../remote/storage/RemoteStorageAction.java | 28 +++++ 7 files changed, 145 insertions(+), 93 deletions(-) delete mode 100644 core/src/main/scala/kafka/log/remote/ClassLoaderAwareRemoteStorageManager.scala create mode 100644 storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java create mode 100644 storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageAction.java diff --git a/core/src/main/scala/kafka/log/remote/ClassLoaderAwareRemoteStorageManager.scala b/core/src/main/scala/kafka/log/remote/ClassLoaderAwareRemoteStorageManager.scala deleted file mode 100644 index d35c70ed85ec..000000000000 --- a/core/src/main/scala/kafka/log/remote/ClassLoaderAwareRemoteStorageManager.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.log.remote - -import org.apache.kafka.server.log.remote.storage.{LogSegmentData, RemoteLogSegmentMetadata, RemoteStorageManager} - -import java.io.InputStream -import java.util - -/** - * A wrapper class of RemoteStorageManager that sets the context class loader when calling RSM methods. - */ -class ClassLoaderAwareRemoteStorageManager(val rsm: RemoteStorageManager, - val rsmClassLoader: ClassLoader) extends RemoteStorageManager { - - def withClassLoader[T](fun: => T): T = { - val originalClassLoader = Thread.currentThread.getContextClassLoader - Thread.currentThread.setContextClassLoader(rsmClassLoader) - try { - fun - } finally { - Thread.currentThread.setContextClassLoader(originalClassLoader) - } - } - - def delegate(): RemoteStorageManager = { - rsm - } - - override def close(): Unit = withClassLoader { - rsm.close() - } - - override def configure(configs: util.Map[String, _]): Unit = withClassLoader { - rsm.configure(configs) - } - - override def copyLogSegmentData(remoteLogSegmentMetadata: RemoteLogSegmentMetadata, - logSegmentData: LogSegmentData): Unit = withClassLoader { - rsm.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData) - } - - override def fetchLogSegment(remoteLogSegmentMetadata: RemoteLogSegmentMetadata, - startPosition: Int): InputStream = withClassLoader { - rsm.fetchLogSegment(remoteLogSegmentMetadata, startPosition) - } - - override def fetchLogSegment(remoteLogSegmentMetadata: RemoteLogSegmentMetadata, - startPosition: Int, - endPosition: Int): InputStream = withClassLoader { - rsm.fetchLogSegment(remoteLogSegmentMetadata, startPosition, endPosition) - } - - override def fetchIndex(remoteLogSegmentMetadata: RemoteLogSegmentMetadata, - indexType: RemoteStorageManager.IndexType): InputStream = withClassLoader { - rsm.fetchIndex(remoteLogSegmentMetadata, indexType) - } - - override def deleteLogSegmentData(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): Unit = withClassLoader { - rsm.deleteLogSegmentData(remoteLogSegmentMetadata) - } -} \ No newline at end of file diff --git a/core/src/main/scala/kafka/log/remote/RemoteLogManager.scala b/core/src/main/scala/kafka/log/remote/RemoteLogManager.scala index 6558094842ac..8324028c5ed3 100644 --- a/core/src/main/scala/kafka/log/remote/RemoteLogManager.scala +++ b/core/src/main/scala/kafka/log/remote/RemoteLogManager.scala @@ -26,7 +26,7 @@ import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream} import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils} import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager -import org.apache.kafka.server.log.remote.storage.{RemoteLogManagerConfig, RemoteLogMetadataManager, RemoteLogSegmentMetadata, RemoteStorageManager} +import org.apache.kafka.server.log.remote.storage.{ClassLoaderAwareRemoteStorageManager, RemoteLogManagerConfig, RemoteLogMetadataManager, RemoteLogSegmentMetadata, RemoteStorageManager} import java.io.{Closeable, InputStream} import java.security.{AccessController, PrivilegedAction} diff --git a/core/src/test/scala/kafka/log/remote/ClassLoaderAwareRemoteStorageManagerTest.scala b/core/src/test/scala/kafka/log/remote/ClassLoaderAwareRemoteStorageManagerTest.scala index 3cb1516a38b5..54d0aee9447f 100644 --- a/core/src/test/scala/kafka/log/remote/ClassLoaderAwareRemoteStorageManagerTest.scala +++ b/core/src/test/scala/kafka/log/remote/ClassLoaderAwareRemoteStorageManagerTest.scala @@ -16,7 +16,7 @@ */ package kafka.log.remote -import org.apache.kafka.server.log.remote.storage.RemoteStorageManager +import org.apache.kafka.server.log.remote.storage.{ClassLoaderAwareRemoteStorageManager, RemoteStorageManager} import org.junit.jupiter.api.Test import org.mockito.Mockito.mock import org.mockito.Mockito.when diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteLogManagerTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteLogManagerTest.scala index 92128c1e7d97..6fb62c148438 100644 --- a/core/src/test/scala/unit/kafka/log/remote/RemoteLogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/remote/RemoteLogManagerTest.scala @@ -22,23 +22,23 @@ import kafka.server.KafkaConfig import kafka.server.checkpoints.LeaderEpochCheckpoint import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} import kafka.utils.MockTime -import org.apache.kafka.common.{KafkaException, TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.config.AbstractConfig import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} +import org.apache.kafka.common.{KafkaException, TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType -import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteLogMetadataManager, RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteStorageManager} +import org.apache.kafka.server.log.remote.storage._ import org.apache.kafka.test.TestUtils +import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{BeforeEach, Test} +import org.mockito.ArgumentMatchers.{any, anyInt, anyLong} import org.mockito.Mockito._ -import org.junit.jupiter.api.Assertions._ import org.mockito.{ArgumentCaptor, ArgumentMatchers} -import org.mockito.ArgumentMatchers.{any, anyInt, anyLong} import java.io.{ByteArrayInputStream, File, FileInputStream} import java.nio.file.Files -import java.util.{Optional, Properties} import java.util +import java.util.{Optional, Properties} import scala.collection.Seq import scala.jdk.CollectionConverters._ diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java index a40e34031d39..bf79d605f142 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java @@ -21,6 +21,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageAction; import org.apache.kafka.server.log.remote.storage.RemoteStorageException; import java.io.IOException; @@ -69,8 +70,7 @@ public Optional highestOffsetForEpoch(TopicIdPartition topicIdPartition, @Override public CompletableFuture putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) throws RemoteStorageException { - return withClassLoader(() -> delegate.putRemotePartitionDeleteMetadata(remotePartitionDeleteMetadata) - ); + return withClassLoader(() -> delegate.putRemotePartitionDeleteMetadata(remotePartitionDeleteMetadata)); } @Override @@ -121,27 +121,23 @@ public void close() throws IOException { } @SuppressWarnings("UnusedReturnValue") - private T withTryCatchClassLoader(Worker worker) { + private T withTryCatchClassLoader(RemoteStorageAction action) { try { - return withClassLoader(worker); + return withClassLoader(action); } catch (final RemoteStorageException ex) { // ignore, this exception is not thrown by the method. } return null; } - private T withClassLoader(Worker worker) throws RemoteStorageException { + private T withClassLoader(RemoteStorageAction action) throws RemoteStorageException { ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(loader); try { - return worker.doWork(); + return action.execute(); } finally { Thread.currentThread().setContextClassLoader(originalClassLoader); } } - @FunctionalInterface - public interface Worker { - T doWork() throws RemoteStorageException; - } } diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java new file mode 100644 index 000000000000..56dd01a46fce --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.storage; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; + +/** + * A wrapper class of {@link RemoteStorageManager} that sets the context class loader when calling the respective + * methods. + */ +public class ClassLoaderAwareRemoteStorageManager implements RemoteStorageManager { + + private final RemoteStorageManager delegate; + private final ClassLoader rsmClassLoader; + + public ClassLoaderAwareRemoteStorageManager(RemoteStorageManager rsm, ClassLoader rsmClassLoader) { + this.delegate = rsm; + this.rsmClassLoader = rsmClassLoader; + } + + public RemoteStorageManager delegate() { + return delegate; + } + + @Override + public void configure(Map configs) { + ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(rsmClassLoader); + try { + delegate.configure(configs); + } finally { + Thread.currentThread().setContextClassLoader(originalClassLoader); + } + } + + @Override + public void close() throws IOException { + ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(rsmClassLoader); + try { + delegate.close(); + } finally { + Thread.currentThread().setContextClassLoader(originalClassLoader); + } + } + + private T withClassLoader(RemoteStorageAction action) throws RemoteStorageException { + ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(rsmClassLoader); + try { + return action.execute(); + } finally { + Thread.currentThread().setContextClassLoader(originalClassLoader); + } + } + + public void copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata, + LogSegmentData logSegmentData) throws RemoteStorageException { + withClassLoader(() -> { + delegate.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData); + return null; + }); + } + + @Override + public InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata, int startPosition) throws RemoteStorageException { + return withClassLoader(() -> delegate.fetchLogSegment(remoteLogSegmentMetadata, startPosition)); + } + + @Override + public InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata, int startPosition, int endPosition) throws RemoteStorageException { + return withClassLoader(() -> delegate.fetchLogSegment(remoteLogSegmentMetadata, startPosition, endPosition)); + } + + @Override + public InputStream fetchIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata, IndexType indexType) throws RemoteStorageException { + return withClassLoader(() -> delegate.fetchIndex(remoteLogSegmentMetadata, indexType)); + } + + @Override + public void deleteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException { + withClassLoader(() -> { + delegate.deleteLogSegmentData(remoteLogSegmentMetadata); + return null; + }); + } + +} \ No newline at end of file diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageAction.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageAction.java new file mode 100644 index 000000000000..e3f5001b9493 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageAction.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.storage; + +/** + * This interface is used to execute any remote storage/metadata related operation. + * + * @param return type for execute operation. + * @link RemoteStorageException} + */ +@FunctionalInterface +public interface RemoteStorageAction { + T execute() throws RemoteStorageException; +} From ef62b34ba687c45e4df05f85a2ef23db49f4a520 Mon Sep 17 00:00:00 2001 From: Satish Duggana Date: Mon, 19 Dec 2022 09:43:12 +0530 Subject: [PATCH 2/3] Refactored RemoteStorageAction as ClassLoaderAction with return type and exception type as suggested in the review. --- ...ssLoaderAwareRemoteLogMetadataManager.java | 20 +++++-------------- ...rageAction.java => ClassLoaderAction.java} | 8 ++++---- .../ClassLoaderAwareRemoteStorageManager.java | 11 ++++------ 3 files changed, 13 insertions(+), 26 deletions(-) rename storage/src/main/java/org/apache/kafka/server/log/remote/storage/{RemoteStorageAction.java => ClassLoaderAction.java} (84%) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java index bf79d605f142..36753d48a169 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java @@ -17,11 +17,11 @@ package org.apache.kafka.server.log.remote.metadata.storage; import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.server.log.remote.storage.ClassLoaderAction; import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; -import org.apache.kafka.server.log.remote.storage.RemoteStorageAction; import org.apache.kafka.server.log.remote.storage.RemoteStorageException; import java.io.IOException; @@ -87,7 +87,7 @@ public Iterator listRemoteLogSegments(TopicIdPartition @Override public void onPartitionLeadershipChanges(Set leaderPartitions, Set followerPartitions) { - withTryCatchClassLoader(() -> { + withClassLoader(() -> { delegate.onPartitionLeadershipChanges(leaderPartitions, followerPartitions); return null; }); @@ -95,7 +95,7 @@ public void onPartitionLeadershipChanges(Set leaderPartitions, @Override public void onStopPartitions(Set partitions) { - withTryCatchClassLoader(() -> { + withClassLoader(() -> { delegate.onStopPartitions(partitions); return null; }); @@ -103,7 +103,7 @@ public void onStopPartitions(Set partitions) { @Override public void configure(Map configs) { - withTryCatchClassLoader(() -> { + withClassLoader(() -> { delegate.configure(configs); return null; }); @@ -120,17 +120,7 @@ public void close() throws IOException { } } - @SuppressWarnings("UnusedReturnValue") - private T withTryCatchClassLoader(RemoteStorageAction action) { - try { - return withClassLoader(action); - } catch (final RemoteStorageException ex) { - // ignore, this exception is not thrown by the method. - } - return null; - } - - private T withClassLoader(RemoteStorageAction action) throws RemoteStorageException { + private T withClassLoader(ClassLoaderAction action) throws E { ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(loader); try { diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageAction.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAction.java similarity index 84% rename from storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageAction.java rename to storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAction.java index e3f5001b9493..c190e7e8e82b 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageAction.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAction.java @@ -19,10 +19,10 @@ /** * This interface is used to execute any remote storage/metadata related operation. * - * @param return type for execute operation. - * @link RemoteStorageException} + * @param return type for execute operation + * @param Exception type to be thrown */ @FunctionalInterface -public interface RemoteStorageAction { - T execute() throws RemoteStorageException; +public interface ClassLoaderAction { + T execute() throws E; } diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java index 56dd01a46fce..a323e26b13aa 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java @@ -51,16 +51,13 @@ public void configure(Map configs) { @Override public void close() throws IOException { - ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(rsmClassLoader); - try { + withClassLoader(() -> { delegate.close(); - } finally { - Thread.currentThread().setContextClassLoader(originalClassLoader); - } + return null; + }); } - private T withClassLoader(RemoteStorageAction action) throws RemoteStorageException { + private T withClassLoader(ClassLoaderAction action) throws E { ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(rsmClassLoader); try { From 954f12f7cd053fbca31757176cbb61bbd07e1d0a Mon Sep 17 00:00:00 2001 From: Satish Duggana Date: Mon, 19 Dec 2022 14:56:12 +0530 Subject: [PATCH 3/3] Renamed ClassLoaderAction to StorageAction and moved to org.apache.kafka.server.log.internals package in storage module --- .../ClassLoaderAction.java => internals/StorageAction.java} | 6 +++--- .../storage/ClassLoaderAwareRemoteLogMetadataManager.java | 4 ++-- .../storage/ClassLoaderAwareRemoteStorageManager.java | 4 +++- 3 files changed, 8 insertions(+), 6 deletions(-) rename storage/src/main/java/org/apache/kafka/server/log/{remote/storage/ClassLoaderAction.java => internals/StorageAction.java} (82%) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAction.java b/storage/src/main/java/org/apache/kafka/server/log/internals/StorageAction.java similarity index 82% rename from storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAction.java rename to storage/src/main/java/org/apache/kafka/server/log/internals/StorageAction.java index c190e7e8e82b..434acd5eb881 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAction.java +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/StorageAction.java @@ -14,15 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.server.log.remote.storage; +package org.apache.kafka.server.log.internals; /** - * This interface is used to execute any remote storage/metadata related operation. + * This interface is used to execute any storage related operations. * * @param return type for execute operation * @param Exception type to be thrown */ @FunctionalInterface -public interface ClassLoaderAction { +public interface StorageAction { T execute() throws E; } diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java index 36753d48a169..47c0942e5a2d 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java @@ -17,7 +17,7 @@ package org.apache.kafka.server.log.remote.metadata.storage; import org.apache.kafka.common.TopicIdPartition; -import org.apache.kafka.server.log.remote.storage.ClassLoaderAction; +import org.apache.kafka.server.log.internals.StorageAction; import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; @@ -120,7 +120,7 @@ public void close() throws IOException { } } - private T withClassLoader(ClassLoaderAction action) throws E { + private T withClassLoader(StorageAction action) throws E { ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(loader); try { diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java index a323e26b13aa..976a2a0d83f6 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.server.log.remote.storage; +import org.apache.kafka.server.log.internals.StorageAction; + import java.io.IOException; import java.io.InputStream; import java.util.Map; @@ -57,7 +59,7 @@ public void close() throws IOException { }); } - private T withClassLoader(ClassLoaderAction action) throws E { + private T withClassLoader(StorageAction action) throws E { ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(rsmClassLoader); try {