Skip to content

Commit

Permalink
Update TrinoFileSystemCache to represent latest hadoop implementation
Browse files Browse the repository at this point in the history
 - Use ConcurrentHashMap to cache filesystem objects - improves
   concurrency by removing synchronized blocks
 - Filesystem object is created outside cache's lock - similar to latest
   hadoop fs cache impl, further reducing code in critical section.
   Helps with systems where filesystem creation is expensive.
 - Only one thread exclusively creates the filesystem object for a
   given key. Avoids speculative creation and then later discarding of
   filesystem objects compared to hadoop fs cache impl.
  • Loading branch information
jitheshtr committed Nov 18, 2022
1 parent 9884184 commit a90461f
Show file tree
Hide file tree
Showing 4 changed files with 351 additions and 71 deletions.
18 changes: 18 additions & 0 deletions lib/trino-hdfs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@
</dependency>

<!-- for testing -->
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-testing-services</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>testing</artifactId>
Expand All @@ -111,6 +117,18 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
Expand Down
174 changes: 104 additions & 70 deletions lib/trino-hdfs/src/main/java/io/trino/hdfs/TrinoFileSystemCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.trino.hdfs;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.log.Logger;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -36,21 +35,20 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.gaul.modernizer_maven_annotations.SuppressModernizer;

import javax.annotation.concurrent.GuardedBy;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Strings.nullToEmpty;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
Expand All @@ -70,17 +68,13 @@ public class TrinoFileSystemCache

private final TrinoFileSystemCacheStats stats;

@GuardedBy("this")
private final Map<FileSystemKey, FileSystemHolder> map = new HashMap<>();
private final Map<FileSystemKey, FileSystemHolder> cache = new ConcurrentHashMap<>();
private final AtomicLong cacheSize = new AtomicLong();

@VisibleForTesting
TrinoFileSystemCache()
{
this.stats = new TrinoFileSystemCacheStats(() -> {
synchronized (this) {
return map.size();
}
});
this.stats = new TrinoFileSystemCacheStats(cache::size);
}

@Override
Expand All @@ -102,55 +96,45 @@ public FileSystem getUnique(URI uri, Configuration conf)
@VisibleForTesting
int getCacheSize()
{
return map.size();
return cache.size();
}

private synchronized FileSystem getInternal(URI uri, Configuration conf, long unique)
private FileSystem getInternal(URI uri, Configuration conf, long unique)
throws IOException
{
UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser();
FileSystemKey key = createFileSystemKey(uri, userGroupInformation, unique);
Set<?> privateCredentials = getPrivateCredentials(userGroupInformation);

FileSystemHolder fileSystemHolder = map.get(key);
if (fileSystemHolder == null) {
int maxSize = conf.getInt("fs.cache.max-size", 1000);
if (map.size() >= maxSize) {
stats.newGetCallFailed();
throw new IOException(format("FileSystem max cache size has been reached: %s", maxSize));
}
try {
FileSystem fileSystem = createFileSystem(uri, conf);
fileSystemHolder = new FileSystemHolder(fileSystem, privateCredentials);
map.put(key, fileSystemHolder);
}
catch (IOException e) {
stats.newGetCallFailed();
throw e;
}
int maxSize = conf.getInt("fs.cache.max-size", 1000);
FileSystemHolder fileSystemHolder;
try {
fileSystemHolder = cache.compute(key, (k, currFileSystemHolder) -> {
if (currFileSystemHolder == null) {
if (cacheSize.getAndUpdate(curr -> curr < maxSize ? (curr + 1) : curr) >= maxSize) {
throw new RuntimeException(
new IOException(format("FileSystem max cache size has been reached: %s", maxSize)));
}
return new FileSystemHolder(uri, conf, privateCredentials);
}
else {
// Update file system instance when credentials change.
if (currFileSystemHolder.credentialsChanged(uri, conf, privateCredentials)) {
return new FileSystemHolder(uri, conf, privateCredentials);
}
else {
return currFileSystemHolder;
}
}
});

fileSystemHolder.createFileSystemOnce();
}

// Update file system instance when credentials change.
// - Private credentials are only set when using Kerberos authentication.
// When the user is the same, but the private credentials are different,
// that means that Kerberos ticket has expired and re-login happened.
// To prevent cache leak in such situation, the privateCredentials are not
// a part of the FileSystemKey, but part of the FileSystemHolder. When a
// Kerberos re-login occurs, re-create the file system and cache it using
// the same key.
// - Extra credentials are used to authenticate with certain file systems.
if ((isHdfs(uri) && !fileSystemHolder.getPrivateCredentials().equals(privateCredentials)) ||
extraCredentialsChanged(fileSystemHolder.getFileSystem(), conf)) {
map.remove(key);
try {
FileSystem fileSystem = createFileSystem(uri, conf);
fileSystemHolder = new FileSystemHolder(fileSystem, privateCredentials);
map.put(key, fileSystemHolder);
}
catch (IOException e) {
stats.newGetCallFailed();
throw e;
}
catch (RuntimeException | IOException e) {
stats.newGetCallFailed();
throwIfInstanceOf(e, IOException.class);
throwIfInstanceOf(e.getCause(), IOException.class);
throw e;
}

return fileSystemHolder.getFileSystem();
Expand Down Expand Up @@ -178,20 +162,49 @@ private static FileSystem createFileSystem(URI uri, Configuration conf)
}

@Override
public synchronized void remove(FileSystem fileSystem)
public void remove(FileSystem fileSystem)
{
stats.newRemoveCall();
map.values().removeIf(holder -> holder.getFileSystem().equals(fileSystem));
cache.forEach((key, holder) -> {
if (fileSystem.equals(holder.getFileSystem())) {
// decrement cacheSize only if the key is
// still mapped after acquiring the lock
cache.compute(key, (k, currFileSystemHolder) -> {
if (currFileSystemHolder != null) {
cacheSize.decrementAndGet();
}
return null;
});
}
});
}

@Override
public synchronized void closeAll()
public void closeAll()
throws IOException
{
for (FileSystemHolder fileSystemHolder : ImmutableList.copyOf(map.values())) {
closeFileSystem(fileSystemHolder.getFileSystem());
try {
cache.forEach((key, holder) -> {
try {
cache.compute(key, (k, currFileSystemHolder) -> {
if (currFileSystemHolder != null) {
cacheSize.decrementAndGet();
}
return null;
});
if (holder.getFileSystem() != null) {
closeFileSystem(holder.getFileSystem());
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
});
}
catch (RuntimeException e) {
throwIfInstanceOf(e.getCause(), IOException.class);
throw e;
}
map.clear();
}

@SuppressModernizer
Expand Down Expand Up @@ -245,12 +258,6 @@ private static boolean isHdfs(URI uri)
return "hdfs".equals(scheme) || "viewfs".equals(scheme);
}

private static boolean extraCredentialsChanged(FileSystem fileSystem, Configuration configuration)
{
return !configuration.get(CACHE_KEY, "").equals(
fileSystem.getConf().get(CACHE_KEY, ""));
}

private static class FileSystemKey
{
private final String scheme;
Expand Down Expand Up @@ -306,23 +313,49 @@ public String toString()

private static class FileSystemHolder
{
private final FileSystem fileSystem;
private final URI uri;
private final Configuration conf;
private final Set<?> privateCredentials;
private final String cacheCredentials;
private volatile FileSystem fileSystem;

public FileSystemHolder(FileSystem fileSystem, Set<?> privateCredentials)
public FileSystemHolder(URI uri, Configuration conf, Set<?> privateCredentials)
{
this.fileSystem = requireNonNull(fileSystem, "fileSystem is null");
this.uri = requireNonNull(uri, "uri is null");
this.conf = requireNonNull(conf, "conf is null");
this.privateCredentials = ImmutableSet.copyOf(requireNonNull(privateCredentials, "privateCredentials is null"));
this.cacheCredentials = conf.get(CACHE_KEY, "");
}

public FileSystem getFileSystem()
public void createFileSystemOnce()
throws IOException
{
return fileSystem;
if (fileSystem == null) {
synchronized (this) {
if (fileSystem == null) {
fileSystem = TrinoFileSystemCache.createFileSystem(uri, conf);
}
}
}
}

public Set<?> getPrivateCredentials()
public boolean credentialsChanged(URI newUri, Configuration newConf, Set<?> newPrivateCredentials)
{
return privateCredentials;
// - Private credentials are only set when using Kerberos authentication.
// When the user is the same, but the private credentials are different,
// that means that Kerberos ticket has expired and re-login happened.
// To prevent cache leak in such situation, the privateCredentials are not
// a part of the FileSystemKey, but part of the FileSystemHolder. When a
// Kerberos re-login occurs, re-create the file system and cache it using
// the same key.
// - Extra credentials are used to authenticate with certain file systems.
return (isHdfs(newUri) && !this.privateCredentials.equals(newPrivateCredentials))
|| !this.cacheCredentials.equals(newConf.get(CACHE_KEY, ""));
}

public FileSystem getFileSystem()
{
return fileSystem;
}

@Override
Expand All @@ -331,6 +364,7 @@ public String toString()
return toStringHelper(this)
.add("fileSystem", fileSystem)
.add("privateCredentials", privateCredentials)
.add("cacheCredentials", cacheCredentials)
.toString();
}
}
Expand Down
Loading

0 comments on commit a90461f

Please sign in to comment.