diff --git a/sdk/java/src/main/java/io/juicefs/JuiceFileSystem.java b/sdk/java/src/main/java/io/juicefs/JuiceFileSystem.java index 6cfc033ecfff..4c15845679b0 100644 --- a/sdk/java/src/main/java/io/juicefs/JuiceFileSystem.java +++ b/sdk/java/src/main/java/io/juicefs/JuiceFileSystem.java @@ -15,6 +15,7 @@ */ package io.juicefs; +import io.juicefs.utils.BgTaskUtil; import io.juicefs.utils.PatchUtil; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -29,8 +30,6 @@ import java.io.IOException; import java.net.URI; import java.security.PrivilegedExceptionAction; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /**************************************************************** @@ -44,7 +43,6 @@ public class JuiceFileSystem extends FilterFileSystem { private static boolean fileChecksumEnabled = false; private static boolean distcpPatched = false; - private ScheduledExecutorService emptier; private FileSystem emptierFs; static { @@ -70,16 +68,15 @@ private synchronized static void patchDistCpChecksum() { public void initialize(URI uri, Configuration conf) throws IOException { super.initialize(uri, conf); fileChecksumEnabled = Boolean.parseBoolean(getConf(conf, "file.checksum", "false")); - startTrashEmptier(uri, conf); + if (!Boolean.parseBoolean(getConf(conf, "disable-trash-emptier", "false"))) { + startTrashEmptier(uri, conf); + } } private void startTrashEmptier(URI uri, final Configuration conf) throws IOException { - emptier = Executors.newScheduledThreadPool(1, r -> { - Thread t = new Thread(r, "Trash Emptier"); - t.setDaemon(true); - return t; - }); - + if (BgTaskUtil.isRunning(uri.getHost(), "Trash emptier")) { + return; + } try { UserGroupInformation superUser = UserGroupInformation.createRemoteUser(getConf(conf, "superuser", "hdfs")); emptierFs = superUser.doAs((PrivilegedExceptionAction) () -> { @@ -87,7 +84,7 @@ private void startTrashEmptier(URI uri, final Configuration conf) throws IOExcep fs.initialize(uri, conf); return fs; }); - emptier.schedule(new Trash(emptierFs, conf).getEmptier(), 10, TimeUnit.MINUTES); + BgTaskUtil.startTrashEmptier(uri.getHost(), "Trash emptier", emptierFs, new Trash(emptierFs, conf).getEmptier(), TimeUnit.MINUTES.toMillis(10)); } catch (Exception e) { throw new IOException("start trash failed!",e); } @@ -151,13 +148,4 @@ public FileChecksum getFileChecksum(Path f) throws IOException { patchDistCpChecksum(); return super.getFileChecksum(f); } - - @Override - public void close() throws IOException { - if (this.emptier != null) { - emptier.shutdownNow(); - emptierFs.close(); - } - super.close(); - } } diff --git a/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java b/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java index c5664936ae42..756a57703da0 100644 --- a/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java +++ b/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java @@ -18,6 +18,7 @@ import com.kenai.jffi.internal.StubLoader; import io.juicefs.exception.QuotaExceededException; import io.juicefs.metrics.JuiceFSInstrumentation; +import io.juicefs.utils.BgTaskUtil; import io.juicefs.utils.ConsistentHash; import io.juicefs.utils.NodesFetcher; import io.juicefs.utils.NodesFetcherBuilder; @@ -61,9 +62,7 @@ import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.util.*; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.jar.JarFile; import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; @@ -93,8 +92,7 @@ public class JuiceFileSystemImpl extends FileSystem { private ConsistentHash hash = new ConsistentHash<>(1, Collections.singletonList("localhost")); private FsPermission uMask; private String hflushMethod; - private ScheduledExecutorService nodesFetcherThread; - private ScheduledExecutorService refreshUidThread; + private Map lastFileStatus = new HashMap<>(); private static final DirectBufferPool directBufferPool = new DirectBufferPool(); @@ -508,12 +506,7 @@ private void updateUidAndGrouping(String uidFile, String groupFile) { } private void refreshUidAndGrouping(String uidFile, String groupFile) { - refreshUidThread = Executors.newScheduledThreadPool(1, r -> { - Thread thread = new Thread(r, "Uid and group refresher"); - thread.setDaemon(true); - return thread; - }); - refreshUidThread.scheduleAtFixedRate(() -> { + BgTaskUtil.startScheduleTask(name, "Refresh guid", () -> { updateUidAndGrouping(uidFile, groupFile); }, 1, 1, TimeUnit.MINUTES); } @@ -712,12 +705,7 @@ private void initCache(Configuration conf) { } private void refreshCache(Configuration conf) { - nodesFetcherThread = Executors.newScheduledThreadPool(1, r -> { - Thread thread = new Thread(r, "Node fetcher"); - thread.setDaemon(true); - return thread; - }); - nodesFetcherThread.scheduleAtFixedRate(() -> { + BgTaskUtil.startScheduleTask(name, "Node fetcher", () -> { initCache(conf); }, 10, 10, TimeUnit.MINUTES); } @@ -1612,13 +1600,7 @@ public void setTimes(Path p, long mtime, long atime) throws IOException { @Override public void close() throws IOException { super.close(); - if (refreshUidThread != null) { - refreshUidThread.shutdownNow(); - } lib.jfs_term(Thread.currentThread().getId(), handle); - if (nodesFetcherThread != null) { - nodesFetcherThread.shutdownNow(); - } if (metricsEnable) { JuiceFSInstrumentation.close(); } diff --git a/sdk/java/src/main/java/io/juicefs/utils/BgTaskUtil.java b/sdk/java/src/main/java/io/juicefs/utils/BgTaskUtil.java new file mode 100644 index 000000000000..efc37cad56b4 --- /dev/null +++ b/sdk/java/src/main/java/io/juicefs/utils/BgTaskUtil.java @@ -0,0 +1,106 @@ +/* + * JuiceFS, Copyright 2023 Juicedata, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.juicefs.utils; + +import org.apache.hadoop.fs.FileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class BgTaskUtil { + private static final Logger LOG = LoggerFactory.getLogger(BgTaskUtil.class); + + private static BgTaskUtil staticFieldForGc = new BgTaskUtil(); + + private BgTaskUtil() { + } + + private static final ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(2, r -> { + Thread thread = new Thread(r, "Background Task"); + thread.setDaemon(true); + return thread; + }); + // use timer to run trash emptier because it will occupy a thread + private static final List timers = new ArrayList<>(); + private static final List fileSystems = new ArrayList<>(); + private static Set runningBgTask = new HashSet<>(); + + public static void startScheduleTask(String name, String type, Runnable task, long initialDelay, long period, TimeUnit unit) { + synchronized (runningBgTask) { + if (isRunning(name, type)) { + return; + } + threadPool.scheduleAtFixedRate(() -> { + try { + task.run(); + } catch (Exception e) { + LOG.error("Background task failed", e); + } + }, initialDelay, period, unit); + runningBgTask.add(genKey(name, type)); + } + } + + + public static void startTrashEmptier(String name, String type, FileSystem fs, Runnable emptierTask, long delay) { + synchronized (runningBgTask) { + if (isRunning(name, type)) { + return; + } + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + emptierTask.run(); + } + }, delay); + runningBgTask.add(genKey(name, type)); + timers.add(timer); + fileSystems.add(fs); + } + } + + public static boolean isRunning(String name, String type) { + synchronized (runningBgTask) { + return runningBgTask.contains(genKey(name, type)); + } + } + + private static String genKey(String name, String type) { + return name + "|" + type; + } + + @Override + protected void finalize() { + threadPool.shutdownNow(); + for (Timer timer : timers) { + timer.cancel(); + timer.purge(); + } + for (FileSystem fs : fileSystems) { + try { + fs.close(); + } catch (IOException e) { + LOG.warn("close trash emptier fs failed", e); + } + } + } +}