Update TrinoFileSystemCache to represent latest hadoop implementation #13243
Conversation
Force-pushed e904560 to 2346c4f
Force-pushed 2346c4f to 5dd03bb
Force-pushed a343cd2 to a90461f
some initial comments
int maxSize = conf.getInt("fs.cache.max-size", 1000);
FileSystemHolder fileSystemHolder;
try {
    fileSystemHolder = cache.compute(key, (k, currFileSystemHolder) -> {
nit: currFileSystemHolder -> currentFileSystemHolder
lib/trino-hdfs/src/main/java/io/trino/hdfs/TrinoFileSystemCache.java
@@ -306,23 +313,49 @@ public String toString()

private static class FileSystemHolder
{
    private final FileSystem fileSystem;
    private final URI uri;
do we need to store uri/conf here? I think we should be able to pass them to createFileSystemOnce, right?
The thought process was to keep the uri and conf provided by the thread that created the FileSystemHolder key - which is what happens in the existing implementation. createFileSystemOnce() could be called by a different thread with a different uri and/or conf object if the original thread is scheduled out of execution by the operating system just before invoking createFileSystemOnce().
That shouldn't be a concern, right?
Yes, should be fine, will update - with this change we can avoid storing uri/conf in FileSystemHolder.
    }
});

fileSystemHolder.createFileSystemOnce();
Can we add a comment here why this is outside of the cache compute? Seems like that's an important piece of why this works
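A minimal, dependency-free sketch of the pattern being discussed (names such as Holder and getFileSystem are illustrative, not the PR's exact code): the compute mapping function only installs an empty holder, and the expensive creation runs afterwards under the holder's own lock, so the ConcurrentHashMap bin lock is never held while a filesystem is being created.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: insert an empty holder under the map's per-bin lock,
// then perform the expensive creation outside compute() so unrelated cache
// operations are never blocked behind a slow filesystem constructor.
public class HolderSketch {
    static class Holder {
        private volatile String fileSystem; // stand-in for a FileSystem instance

        // Only the first caller creates; later callers reuse the cached value.
        synchronized String createOnce() {
            if (fileSystem == null) {
                fileSystem = "fs-instance"; // expensive creation in real code
            }
            return fileSystem;
        }
    }

    static final ConcurrentHashMap<String, Holder> cache = new ConcurrentHashMap<>();

    static String getFileSystem(String key) {
        Holder holder = cache.compute(key, (k, current) -> current == null ? new Holder() : current);
        // Deliberately outside compute(): the map's lock is already released here.
        return holder.createOnce();
    }

    public static void main(String[] args) {
        System.out.println(getFileSystem("user1"));
    }
}
```

The holder's own monitor still guarantees one creation per key, which is the property the reviewer is asking to document.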
Force-pushed a90461f to ab05c55
The implementation looks good to me, but IMO it would be useful to get some more 👀 on this as well, since the change is a bit involved.
@@ -70,17 +68,13 @@

private final TrinoFileSystemCacheStats stats;

@GuardedBy("this")
private final Map<FileSystemKey, FileSystemHolder> map = new HashMap<>();
private final Map<FileSystemKey, FileSystemHolder> cache = new ConcurrentHashMap<>();
Let's add a comment saying why we need cacheSize separately.
int maxCacheSize = 1000;
for (int i = 0; i < maxCacheSize; ++i) {
    assertEquals(TrinoFileSystemCache.INSTANCE.getFileSystemCacheStats().getCacheSize(), i);
    getFileSystem(environment, ConnectorIdentity.ofUser("user" + String.valueOf(i)));
String.valueOf seems redundant
}
assertEquals(TrinoFileSystemCache.INSTANCE.getFileSystemCacheStats().getCacheSize(), maxCacheSize);

try {
use assertThatThrownBy
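The reviewer is pointing at AssertJ's assertThatThrownBy, which replaces the try/fail/catch idiom with a single fluent call. AssertJ is not on the classpath in this standalone sketch, so a tiny hand-rolled stand-in is used purely to show the shape of the pattern (the stand-in's name and behavior are illustrative, not AssertJ's actual API surface):

```java
// Sketch of the assertThatThrownBy style. In real AssertJ code this would be
// org.assertj.core.api.Assertions.assertThatThrownBy(...).hasMessageContaining(...).
public class ThrownBySketch {
    // Minimal stand-in: runs the code, returns the thrown exception,
    // and fails if nothing was thrown.
    static Throwable assertThatThrownBy(Runnable code) {
        try {
            code.run();
        }
        catch (Throwable t) {
            return t;
        }
        throw new AssertionError("expected an exception but none was thrown");
    }

    public static void main(String[] args) {
        // Instead of: try { ...; fail(); } catch (IOException e) { assertEquals(...); }
        Throwable t = assertThatThrownBy(() -> {
            throw new IllegalStateException("cache is full"); // hypothetical failure
        });
        System.out.println(t.getMessage());
    }
}
```

The fluent form keeps the expected-exception check and the message assertion in one expression, which is why it is generally preferred over an explicit try block in tests.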
new ImpersonatingHdfsAuthentication(new SimpleHadoopAuthentication(), new SimpleUserNameProvider()));

int maxCacheSize = 1000;
for (int i = 0; i < maxCacheSize; ++i) {
i++ is generally followed in the codebase
void consume(FileSystem fileSystem) throws IOException;
}

private static class FileSystemCloser
nit: this can be lambda
If we use a lambda, we get the error Hadoop FileSystem instances are shared and should not be closed. Had to add FileSystemCloser and annotate its consume() method with @SuppressModernizer to fix this.
    }
}

// A callable that creates (and consumes) filesystem objects X times for Y users
nit: maybe just use the variable names for the comment
    new FileSystemCloser()));
}

FileSystem.closeAll();
can this be in @BeforeMethod?
CreateFileSystemAndConsume(SplittableRandom random, int numUsers, int numGetCallsPerInvocation, FileSystemConsumer consumer)
{
    this.random = random;
nit: null check since this is also exposed outside of the class
cc @electrum
Force-pushed ab05c55 to 2144552
Force-pushed 2144552 to 5fc7018
please rebase
Force-pushed 5fc7018 to 53a3ed2
Final set of comments - looks good to me
try {
    fileSystemHolder = cache.compute(key, (k, currentFileSystemHolder) -> {
        if (currentFileSystemHolder == null) {
            if (cacheSize.getAndUpdate(currentSize -> currentSize < maxSize ?
would be simpler to write the following:
cacheSize.getAndUpdate(currentSize -> Math.min(currentSize + 1, maxSize)) == maxSize
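The bounded-increment idea behind this suggestion can be sketched in isolation (names and the bound are illustrative, not the PR's exact code): getAndUpdate returns the value before the update, so comparing that previous value with the bound detects a full cache while the clamp keeps the counter from ever exceeding it.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a clamped cache-size counter: the counter never exceeds MAX_SIZE,
// and the previous value returned by getAndUpdate tells us whether a slot
// was actually reserved.
public class BoundedCounterSketch {
    static final int MAX_SIZE = 3; // small bound purely for illustration

    static boolean tryReserveSlot(AtomicInteger cacheSize) {
        // getAndUpdate returns the value *before* the update; a previous value
        // of MAX_SIZE means the cache was already full and the clamp left the
        // counter unchanged, so no slot was reserved.
        return cacheSize.getAndUpdate(current -> Math.min(current + 1, MAX_SIZE)) < MAX_SIZE;
    }

    public static void main(String[] args) {
        AtomicInteger size = new AtomicInteger();
        for (int i = 0; i < MAX_SIZE; i++) {
            System.out.println(tryReserveSlot(size)); // true while slots remain
        }
        System.out.println(tryReserveSlot(size)); // false once the cache is full
    }
}
```

Because the whole decision happens inside one atomic update, no separate lock is needed around the size check, which matches the lock-free direction of the PR.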
public static class BenchmarkData
{
    @Param({"10", "100", "1000"})
    private int numUsers;
nit: userCount, threadCount, getCallsPerInvocation to avoid abbreviations
throws IOException
{
    TrinoFileSystemCache.INSTANCE.closeAll();
    executor.shutdown();
shutdownNow() as we do not need to wait here
throws InterruptedException, ExecutionException
{
    List<Future<Void>> futures = data.executor.invokeAll(data.callableTasks);
    for (Future<Void> fut : futures) {
this can be simplified (similar to the other comment)
@@ -70,6 +70,7 @@ public void tearDown()
closeAll(
    () -> fs.delete(new Path(tempRoot.toURI()), true),
    fs);
fs = null;
why is this a related change? Can we keep this in a separate commit?
This came in as part of bringing the trino-testing-services dependency into trino-hdfs. The resource deallocation check added via ManageTestResources (in PR #15165) got activated in lib/trino-hdfs, resulting in this test failure here. Adding fs = null; fixes this, as was the case with similar changes in the referenced PR. Can move this to a different commit.
List<Future<Void>> futures = executor.invokeAll(callableTasks);
for (Future<Void> fut : futures) {
    fut.get();
}
executor.invokeAll(callableTasks).forEach(MoreFutures::getFutureValue);
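MoreFutures::getFutureValue comes from Airlift; a stdlib-only equivalent of the suggested one-liner looks like this (getUnchecked is a hand-written stand-in for Airlift's helper, which unwraps Future.get()'s checked exceptions):

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Stdlib-only sketch of the reviewer's simplification. Airlift's
// MoreFutures::getFutureValue plays the role of getUnchecked here.
public class InvokeAllSketch {
    static <T> T getUnchecked(Future<T> future) {
        try {
            return future.get();
        }
        catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<Callable<Void>> tasks = List.of(() -> null, () -> null);
        // One expression instead of the explicit loop over futures:
        executor.invokeAll(tasks).forEach(InvokeAllSketch::getUnchecked);
        executor.shutdownNow();
        System.out.println("done");
    }
}
```

The method reference keeps the failure-propagation behavior of the original loop (any task exception surfaces as an unchecked exception) while removing the boilerplate.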
private final int numGetCallsPerInvocation;
private final FileSystemConsumer consumer;

private HdfsEnvironment environment = new HdfsEnvironment(
private static final
this.numUsers = numUsers;
this.numGetCallsPerInvocation = numGetCallsPerInvocation;
same comment about naming
- Use ConcurrentHashMap to cache filesystem objects - improves concurrency by removing synchronized blocks
- Filesystem object is created outside cache's lock - similar to latest hadoop fs cache impl, further reducing code in critical section. Helps with systems where filesystem creation is expensive.
- Only one thread exclusively creates the filesystem object for a given key. Avoids speculative creation and then later discarding of filesystem objects compared to hadoop fs cache impl.
Force-pushed 53a3ed2 to f7f8460
Motivation
Hadoop's implementation of the filesystem cache (hadoop 3.2 filesystem cache) creates the filesystem object outside of the synchronized block. A side effect of this (in addition to reduced locking duration for slow-to-create filesystem implementations) is that there is no interaction between the lock in the caching infrastructure and locks internal to a filesystem implementation (during filesystem object creation). We would like to bring this approach to TrinoFileSystemCache.
A secondary motivation is to improve the concurrency of TrinoFileSystemCache operations by avoiding synchronized blocks.
Description
- Use ConcurrentHashMap to cache filesystem objects - improves concurrency by removing synchronized blocks
- Filesystem object is created outside cache's lock - similar to latest hadoop fs cache impl, further reducing code in critical section. Helps with systems where filesystem creation is expensive.
- Only one thread exclusively creates the filesystem object for a given key. Avoids speculative creation and then later discarding of filesystem objects compared to hadoop fs cache impl.
There is a more recent update in the hadoop 3.3.x branch that limits the number of parallel filesystem object creations using a semaphore. Looking at the description of the issue (HADOOP-17313), it seems to have been created as a workaround for the speculative-create-and-discard approach used in the hadoop implementation, which this code avoids.
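The semaphore idea from HADOOP-17313 can be sketched as follows (the permit count and method names here are illustrative, not Hadoop's actual code): a fixed pool of permits caps how many threads may be inside the expensive creation path at once.

```java
import java.util.concurrent.Semaphore;

// Rough sketch of the HADOOP-17313 approach: a semaphore throttles parallel
// filesystem creations so a burst of cache misses cannot spawn an unbounded
// number of concurrent (and possibly later-discarded) creations.
public class CreationThrottleSketch {
    // Permit count is illustrative; Hadoop 3.3.x makes this configurable.
    static final Semaphore CREATOR_PERMITS = new Semaphore(4);

    static String createFileSystemThrottled() throws InterruptedException {
        CREATOR_PERMITS.acquire();
        try {
            return "fs-instance"; // expensive creation in real code
        }
        finally {
            CREATOR_PERMITS.release();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(createFileSystemThrottled());
    }
}
```

The PR's design sidesteps this problem differently: since exactly one thread creates the filesystem per key, there is no speculative work to throttle in the first place.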
Benchmark output -
Before:
After:
(above results are from an 8-core Intel MacBook Pro)
Improvement
Update the implementation of the TrinoFileSystemCache class in the Hive connector.
Bring the TrinoFileSystemCache implementation in line with the latest hadoop implementation and improve cache performance in the process.
Related issues, pull requests, and links
Documentation
( ) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.
Release notes
( ) No release notes entries required.
( ) Release notes entries required with the following suggested text: