From 17e7a3abe9ca4aaebb6c2839a7414e79b7c5927b Mon Sep 17 00:00:00 2001 From: thu-david Date: Fri, 14 Jun 2024 08:31:17 +0800 Subject: [PATCH 01/10] update multipartUpload --- .../main/java/alluxio/conf/PropertyKey.java | 32 +++ .../underfs/tos/TOSLowLevelOutputStream.java | 203 ++++++++++++++++++ .../underfs/tos/TOSUnderFileSystem.java | 20 ++ 3 files changed, 255 insertions(+) create mode 100644 underfs/tos/src/main/java/alluxio/underfs/tos/TOSLowLevelOutputStream.java diff --git a/core/common/src/main/java/alluxio/conf/PropertyKey.java b/core/common/src/main/java/alluxio/conf/PropertyKey.java index d8272f4d309e..7bfa5680c129 100755 --- a/core/common/src/main/java/alluxio/conf/PropertyKey.java +++ b/core/common/src/main/java/alluxio/conf/PropertyKey.java @@ -1707,6 +1707,25 @@ public String toString() { .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.SERVER) .build(); + public static final PropertyKey UNDERFS_TOS_STREAMING_UPLOAD_ENABLED = + booleanBuilder(Name.UNDERFS_TOS_STREAMING_UPLOAD_ENABLED) + .setAlias("alluxio.underfs.tos.streaming.upload.enabled") + .setDefaultValue(false) + .setDescription("(Experimental) If true, using streaming upload to write to S3.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) + .setScope(Scope.SERVER) + .build(); + public static final PropertyKey UNDERFS_TOS_STREAMING_UPLOAD_PARTITION_SIZE = + dataSizeBuilder(Name.UNDERFS_TOS_STREAMING_UPLOAD_PARTITION_SIZE) + .setAlias("alluxio.underfs.tos.streaming.upload.partition.size") + .setDefaultValue("64MB") + .setDescription("Maximum allowable size of a single buffer file when using " + + "TOS streaming upload. When the buffer file reaches the partition size, " + + "it will be uploaded and the upcoming data will write to other buffer files." + + "If the partition size is too small, TOS upload speed might be affected. 
") + .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) + .setScope(Scope.SERVER) + .build(); // UFS access control related properties // @@ -2032,6 +2051,13 @@ public String toString() { .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) .setScope(Scope.SERVER) .build(); + public static final PropertyKey UNDERFS_TOS_STREAMING_UPLOAD_THREADS = + intBuilder(Name.UNDERFS_TOS_STREAMING_UPLOAD_THREADS) + .setDefaultValue(20) + .setDescription("the number of threads to use for streaming upload data to OSS.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) + .setScope(Scope.SERVER) + .build(); // // Mount table related properties // @@ -8045,6 +8071,12 @@ public static final class Name { "alluxio.underfs.obs.streaming.upload.partition.size"; public static final String UNDERFS_OBS_STREAMING_UPLOAD_THREADS = "alluxio.underfs.obs.streaming.upload.threads"; + public static final String UNDERFS_TOS_STREAMING_UPLOAD_ENABLED = + "alluxio.underfs.tos.streaming.upload.enabled"; + public static final String UNDERFS_TOS_STREAMING_UPLOAD_PARTITION_SIZE = + "alluxio.underfs.tos.streaming.upload.partition.size"; + public static final String UNDERFS_TOS_STREAMING_UPLOAD_THREADS = + "alluxio.underfs.tos.streaming.upload.threads"; // // UFS access control related properties diff --git a/underfs/tos/src/main/java/alluxio/underfs/tos/TOSLowLevelOutputStream.java b/underfs/tos/src/main/java/alluxio/underfs/tos/TOSLowLevelOutputStream.java new file mode 100644 index 000000000000..a8845b3beda8 --- /dev/null +++ b/underfs/tos/src/main/java/alluxio/underfs/tos/TOSLowLevelOutputStream.java @@ -0,0 +1,203 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). 
You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.underfs.tos; + +import alluxio.conf.AlluxioConfiguration; +import alluxio.conf.PropertyKey; +import alluxio.underfs.ObjectLowLevelOutputStream; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.volcengine.tos.TOSV2; +import com.volcengine.tos.TosClientException; +import com.volcengine.tos.comm.io.TosRepeatableBoundedFileInputStream; +import com.volcengine.tos.model.object.AbortMultipartUploadInput; +import com.volcengine.tos.model.object.CompleteMultipartUploadV2Input; +import com.volcengine.tos.model.object.CompleteMultipartUploadV2Output; +import com.volcengine.tos.model.object.CreateMultipartUploadInput; +import com.volcengine.tos.model.object.CreateMultipartUploadOutput; +import com.volcengine.tos.model.object.PutObjectInput; +import com.volcengine.tos.model.object.UploadPartV2Input; +import com.volcengine.tos.model.object.UploadPartV2Output; +import com.volcengine.tos.model.object.UploadedPartV2; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; + +/** + * Object storage low output stream for TOS. 
+ */ +@NotThreadSafe +public class TOSLowLevelOutputStream extends ObjectLowLevelOutputStream { + private static final Logger LOG = LoggerFactory.getLogger(TOSLowLevelOutputStream.class); + + /** + * The TOS client to interact with TOS. + */ + protected TOSV2 mClient; + /** + * Tags for the uploaded part, provided by TOS after uploading. + */ + private final List mTags = Collections.synchronizedList(new ArrayList<>()); + + /** + * The upload id of this multipart upload. + */ + protected volatile String mUploadId; + + private String mContentHash; + + /** + * Constructs a new stream for writing a file. + * + * @param bucketName the name of the bucket + * @param key the key of the file + * @param tosClient the TOS client to upload the file with + * @param executor a thread pool executor + * @param ufsConf the object store under file system configuration + */ + public TOSLowLevelOutputStream(String bucketName, String key, TOSV2 tosClient, + ListeningExecutorService executor, + AlluxioConfiguration ufsConf) { + super(bucketName, key, executor, + ufsConf.getBytes(PropertyKey.UNDERFS_TOS_STREAMING_UPLOAD_PARTITION_SIZE), ufsConf); + mClient = Preconditions.checkNotNull(tosClient); + } + + @Override + protected void abortMultiPartUploadInternal() throws IOException { + try { + AbortMultipartUploadInput input = new AbortMultipartUploadInput().setBucket(mBucketName) + .setKey(mKey).setUploadID(mUploadId); + getClient().abortMultipartUpload(input); + } catch (TosClientException e) { + LOG.debug("failed to abort multi part upload. upload id: {}", mUploadId, e); + throw new IOException(String.format( + "failed to upload part. 
key: %s uploadId: %s", + mKey, mUploadId), e); + } + } + + @Override + protected void uploadPartInternal( + File file, + int partNumber, + boolean isLastPart, + @Nullable String md5) + throws IOException { + long fileSize = file.length(); + long offset = partNumber * (mPartitionSize - 1); + long partSize = mPartitionSize; + try (FileInputStream content = new FileInputStream(file)) { + content.skip(offset); + InputStream wrappedContent = new TosRepeatableBoundedFileInputStream(content, mPartitionSize); + if (fileSize - offset < mPartitionSize) { + partSize = fileSize - offset; + } + final UploadPartV2Input input = new UploadPartV2Input() + .setBucket(mBucketName) + .setKey(mKey) + .setUploadID(mUploadId) + .setPartNumber(partNumber) + .setContentLength(partSize) + .setContent(wrappedContent); + UploadPartV2Output output = getClient().uploadPart(input); + mTags.add(new UploadedPartV2().setPartNumber(partNumber).setEtag(output.getEtag())); + } catch (IOException e) { + LOG.debug("failed to upload part.", e); + throw new IOException(String.format( + "failed to upload part. 
key: %s part number: %s uploadId: %s", + mKey, partNumber, mUploadId), e); + } + } + + @Override + protected void initMultiPartUploadInternal() throws IOException { + try { + CreateMultipartUploadInput create = + new CreateMultipartUploadInput().setBucket(mBucketName).setKey(mKey); + CreateMultipartUploadOutput output = getClient().createMultipartUpload(create); + mUploadId = output.getUploadID(); + } catch (TosClientException e) { + LOG.debug("failed to init multi part upload", e); + throw new IOException("failed to init multi part upload", e); + } + } + + @Override + protected void completeMultiPartUploadInternal() throws IOException { + try { + LOG.debug("complete multi part {}", mUploadId); + CompleteMultipartUploadV2Input complete = new CompleteMultipartUploadV2Input() + .setBucket(mBucketName) + .setKey(mKey) + .setUploadID(mUploadId); + complete.setUploadedParts(mTags); + CompleteMultipartUploadV2Output completedOutput = + getClient().completeMultipartUpload(complete); + mContentHash = completedOutput.getEtag(); + } catch (TosClientException e) { + LOG.debug("failed to complete multi part upload", e); + throw new IOException( + String.format("failed to complete multi part upload, key: %s, upload id: %s", + mKey, mUploadId) + e); + } + } + + @Override + protected void createEmptyObject(String key) throws IOException { + try { + PutObjectInput putObjectInput = new PutObjectInput() + .setBucket(mBucketName) + .setKey(key) + .setContent(new ByteArrayInputStream(new byte[0])) + .setContentLength(0); + mContentHash = getClient().putObject(putObjectInput).getEtag(); + } catch (TosClientException e) { + throw new IOException(e); + } + } + + @Override + protected void putObject(String key, File file, @Nullable String md5) throws IOException { + try { + PutObjectInput putObjectInput = new PutObjectInput() + .setBucket(mBucketName) + .setKey(key) + .setContent(new FileInputStream(file)) + .setContentLength(0); + mContentHash = 
getClient().putObject(putObjectInput).getEtag(); + } catch (TosClientException e) { + throw new IOException(e); + } + } + + @Override + public Optional getContentHash() { + return Optional.ofNullable(mContentHash); + } + + protected TOSV2 getClient() { + return mClient; + } +} diff --git a/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystem.java b/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystem.java index f9c2300a15e5..d1f43c7ce51d 100644 --- a/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystem.java +++ b/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystem.java @@ -20,9 +20,13 @@ import alluxio.underfs.UnderFileSystemConfiguration; import alluxio.underfs.options.OpenOptions; import alluxio.util.UnderFileSystemUtils; +import alluxio.util.executor.ExecutorServiceFactories; import alluxio.util.io.PathUtils; import com.google.common.base.Preconditions; +import com.google.common.base.Suppliers; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.volcengine.tos.TOSV2; import com.volcengine.tos.TOSV2ClientBuilder; import com.volcengine.tos.TosClientException; @@ -54,6 +58,8 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -80,6 +86,8 @@ public class TOSUnderFileSystem extends ObjectUnderFileSystem { */ private final String mBucketName; + private final Supplier mStreamingUploadExecutor; + /** * Constructs a new instance of {@link TOSUnderFileSystem}. 
* @@ -119,6 +127,14 @@ protected TOSUnderFileSystem(AlluxioURI uri, @Nullable TOSV2 tosClient, String b super(uri, conf); mClient = tosClient; mBucketName = bucketName; + mStreamingUploadExecutor = Suppliers.memoize(() -> { + int numTransferThreads = + conf.getInt(PropertyKey.UNDERFS_TOS_STREAMING_UPLOAD_THREADS); + ExecutorService service = ExecutorServiceFactories + .fixedThreadPool("alluxio-tos-streaming-upload-worker", + numTransferThreads).create(); + return MoreExecutors.listeningDecorator(service); + }); } @Override @@ -170,6 +186,10 @@ public boolean createEmptyObject(String key) { @Override protected OutputStream createObject(String key) throws IOException { + if (mUfsConf.getBoolean(PropertyKey.UNDERFS_TOS_STREAMING_UPLOAD_ENABLED)) { + return new TOSLowLevelOutputStream(mBucketName, key, mClient, + mStreamingUploadExecutor.get(), mUfsConf); + } return new TOSOutputStream(mBucketName, key, mClient, mUfsConf.getList(PropertyKey.TMP_DIRS)); } From 4f76074063ec34946688a17175b9f1715244d99e Mon Sep 17 00:00:00 2001 From: thu-david Date: Fri, 14 Jun 2024 15:54:29 +0800 Subject: [PATCH 02/10] Update TOSLowLevelOutputStream.java --- .../underfs/tos/TOSLowLevelOutputStream.java | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/underfs/tos/src/main/java/alluxio/underfs/tos/TOSLowLevelOutputStream.java b/underfs/tos/src/main/java/alluxio/underfs/tos/TOSLowLevelOutputStream.java index a8845b3beda8..d42965e04a8c 100644 --- a/underfs/tos/src/main/java/alluxio/underfs/tos/TOSLowLevelOutputStream.java +++ b/underfs/tos/src/main/java/alluxio/underfs/tos/TOSLowLevelOutputStream.java @@ -37,6 +37,7 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.file.Files; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -106,13 +107,11 @@ protected void uploadPartInternal( @Nullable String md5) throws IOException { long fileSize = file.length(); - long offset = 
partNumber * (mPartitionSize - 1); long partSize = mPartitionSize; try (FileInputStream content = new FileInputStream(file)) { - content.skip(offset); InputStream wrappedContent = new TosRepeatableBoundedFileInputStream(content, mPartitionSize); - if (fileSize - offset < mPartitionSize) { - partSize = fileSize - offset; + if (isLastPart) { + partSize = fileSize; } final UploadPartV2Input input = new UploadPartV2Input() .setBucket(mBucketName) @@ -180,14 +179,14 @@ protected void createEmptyObject(String key) throws IOException { @Override protected void putObject(String key, File file, @Nullable String md5) throws IOException { - try { + try (InputStream content = Files.newInputStream(file.toPath())) { PutObjectInput putObjectInput = new PutObjectInput() .setBucket(mBucketName) .setKey(key) - .setContent(new FileInputStream(file)) - .setContentLength(0); + .setContent(content) + .setContentLength(file.length()); // Set the correct content length mContentHash = getClient().putObject(putObjectInput).getEtag(); - } catch (TosClientException e) { + } catch (IOException e) { throw new IOException(e); } } From b83d0f247b8987ac6583b90ecc24f4b334084acf Mon Sep 17 00:00:00 2001 From: thu-david Date: Fri, 14 Jun 2024 16:11:59 +0800 Subject: [PATCH 03/10] update --- core/common/src/main/java/alluxio/conf/PropertyKey.java | 4 ++-- .../main/java/alluxio/underfs/ObjectLowLevelOutputStream.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/common/src/main/java/alluxio/conf/PropertyKey.java b/core/common/src/main/java/alluxio/conf/PropertyKey.java index af5daca71e68..1195851bdce9 100755 --- a/core/common/src/main/java/alluxio/conf/PropertyKey.java +++ b/core/common/src/main/java/alluxio/conf/PropertyKey.java @@ -1718,7 +1718,7 @@ public String toString() { booleanBuilder(Name.UNDERFS_TOS_STREAMING_UPLOAD_ENABLED) .setAlias("alluxio.underfs.tos.streaming.upload.enabled") .setDefaultValue(false) - .setDescription("(Experimental) If true, using streaming 
upload to write to S3.") + .setDescription("(Experimental) If true, using streaming upload to write to TOS.") .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) .setScope(Scope.SERVER) .build(); @@ -2061,7 +2061,7 @@ public String toString() { public static final PropertyKey UNDERFS_TOS_STREAMING_UPLOAD_THREADS = intBuilder(Name.UNDERFS_TOS_STREAMING_UPLOAD_THREADS) .setDefaultValue(20) - .setDescription("the number of threads to use for streaming upload data to OSS.") + .setDescription("the number of threads to use for streaming upload data to TOS.") .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.SERVER) .build(); diff --git a/core/common/src/main/java/alluxio/underfs/ObjectLowLevelOutputStream.java b/core/common/src/main/java/alluxio/underfs/ObjectLowLevelOutputStream.java index a5d3bd1a1ebb..f2211c843251 100644 --- a/core/common/src/main/java/alluxio/underfs/ObjectLowLevelOutputStream.java +++ b/core/common/src/main/java/alluxio/underfs/ObjectLowLevelOutputStream.java @@ -330,7 +330,7 @@ protected void uploadPart(File file, int partNumber, boolean lastPart) { }; ListenableFuture futureTag = mExecutor.submit(callable); mFutures.add(futureTag); - LOG.debug( + LOG.info( "Submit upload part request. 
key={}, partNum={}, file={}, fileSize={}, lastPart={}.", mKey, partNumber, file.getPath(), file.length(), lastPart); } From 0eaee66b0486e037879dca13ec6a66d69a180582 Mon Sep 17 00:00:00 2001 From: thu-david Date: Wed, 19 Jun 2024 15:37:16 +0800 Subject: [PATCH 04/10] Support more config in TOS --- .../main/java/alluxio/conf/PropertyKey.java | 498 +++++++++++------- .../underfs/tos/TOSUnderFileSystem.java | 67 ++- 2 files changed, 376 insertions(+), 189 deletions(-) diff --git a/core/common/src/main/java/alluxio/conf/PropertyKey.java b/core/common/src/main/java/alluxio/conf/PropertyKey.java index 1195851bdce9..02072625ba5d 100755 --- a/core/common/src/main/java/alluxio/conf/PropertyKey.java +++ b/core/common/src/main/java/alluxio/conf/PropertyKey.java @@ -94,19 +94,29 @@ @ThreadSafe @PublicApi public final class PropertyKey implements Comparable { - /** Regex string to find "${key}" for variable substitution. */ + /** + * Regex string to find "${key}" for variable substitution. + */ public static final String REGEX_STRING = "(\\$\\{([^{}]*)\\})"; - /** Regex to find ${key} for variable substitution. */ + /** + * Regex to find ${key} for variable substitution. + */ public static final Pattern CONF_REGEX = Pattern.compile(REGEX_STRING); private static final Logger LOG = LoggerFactory.getLogger(PropertyKey.class); // The following two maps must be the first to initialize within this file. - /** A map from default property key's string name to the key. */ + /** + * A map from default property key's string name to the key. + */ private static final Map DEFAULT_KEYS_MAP = new ConcurrentHashMap<>(); - /** A map from default property key's alias to the key. */ + /** + * A map from default property key's alias to the key. + */ private static final Map DEFAULT_ALIAS_MAP = new ConcurrentHashMap<>(); - /** A cache storing result for template regexp matching results. */ + /** + * A cache storing result for template regexp matching results. 
+ */ private static final Cache REGEXP_CACHE = CacheBuilder.newBuilder() .maximumSize(1024) .build(); @@ -282,7 +292,7 @@ public static Builder stringBuilder(String name) { } /** - * @param name name of the property + * @param name name of the property * @param enumType enum class of the property * @return a Builder for enum properties */ @@ -339,29 +349,29 @@ private Builder( /** * @param template template for the property name - * @param params parameters of the template + * @param params parameters of the template */ public Builder(PropertyKey.Template template, Object... params) { this(PropertyType.STRING, template, params); } /** - * @param type type of the property + * @param type type of the property * @param template template for the property name - * @param params parameters of the template + * @param params parameters of the template */ public Builder(PropertyType type, PropertyKey.Template template, Object... params) { this(format(template.mFormat, params), type); } /** - * @param type type of the property + * @param type type of the property * @param delimiter delimiter for value, if list value is given as a string - * @param template template for the property name - * @param params parameters of the template + * @param template template for the property name + * @param params parameters of the template */ public Builder(PropertyType type, Optional delimiter, - PropertyKey.Template template, Object... params) { + PropertyKey.Template template, Object... 
params) { this(format(template.mFormat, params), type, Optional.empty(), delimiter); } @@ -399,7 +409,7 @@ public Builder setDefaultSupplier(DefaultSupplier defaultSupplier) { } /** - * @param supplier supplier for the property's default value + * @param supplier supplier for the property's default value * @param description description of the default value * @return the updated builder instance */ @@ -688,9 +698,9 @@ public String toString() { intBuilder(Name.METRICS_EXECUTOR_TASK_WARN_SIZE) .setDefaultValue(1000) .setDescription(String.format("When instrumenting an executor with" - + " InstrumentedExecutorService, if the number of" - + " active tasks (queued or running) is greater than this value, a warning log" - + " will be printed at the interval given by %s", + + " InstrumentedExecutorService, if the number of" + + " active tasks (queued or running) is greater than this value, a warning log" + + " will be printed at the interval given by %s", Name.METRICS_EXECUTOR_TASK_WARN_FREQUENCY)) .setScope(Scope.ALL) .setConsistencyCheckLevel(ConsistencyCheckLevel.IGNORE) @@ -699,9 +709,9 @@ public String toString() { durationBuilder(Name.METRICS_EXECUTOR_TASK_WARN_FREQUENCY) .setDefaultValue("5sec") .setDescription(String.format("When instrumenting an executor with" - + "InstrumentedExecutorService, if the number of" - + " active tasks (queued or running) is greater than %s value, a warning log" - + " will be printed at the given interval", + + "InstrumentedExecutorService, if the number of" + + " active tasks (queued or running) is greater than %s value, a warning log" + + " will be printed at the given interval", Name.METRICS_EXECUTOR_TASK_WARN_SIZE)) .setScope(Scope.ALL) .setConsistencyCheckLevel(ConsistencyCheckLevel.IGNORE) @@ -784,7 +794,8 @@ public String toString() { stringBuilder(Name.ROCKS_INODE_CONF_FILE) .setDescription(format("Path of file containing RocksDB inode store configuration." 
+ " A template configuration cab be found at ${%s}/rocks-inode.ini.template." - + " See https://github.com/facebook/rocksdb/blob/main/examples/rocksdb_option_file_example.ini" + + + " See https://github.com/facebook/rocksdb/blob/main/examples/rocksdb_option_file_example.ini" + " for more information on RocksDB configuration files." + " If unset then a default configuration will" + " be used.", Name.CONF_DIR)) @@ -794,7 +805,8 @@ public String toString() { stringBuilder(Name.ROCKS_BLOCK_CONF_FILE) .setDescription(format("Path of file containing RocksDB block store configuration." + " A template configuration cab be found at ${%s}/rocks-block.ini.template." - + " See https://github.com/facebook/rocksdb/blob/main/examples/rocksdb_option_file_example.ini" + + + " See https://github.com/facebook/rocksdb/blob/main/examples/rocksdb_option_file_example.ini" + " for more information on RocksDB configuration files." + " If unset then a default configuration will" + " be used.", Name.CONF_DIR)) @@ -1084,10 +1096,10 @@ public String toString() { public static final PropertyKey UNDERFS_GCS_OWNER_ID_TO_USERNAME_MAPPING = stringBuilder(Name.UNDERFS_GCS_OWNER_ID_TO_USERNAME_MAPPING) .setDescription(format("Optionally, specify a preset gcs owner id " - + "to Alluxio username static mapping in the format \"id1=user1;id2=user2\". " - + "The Google Cloud Storage IDs can be found at the console address " - + "https://console.cloud.google.com/storage/settings . Please use the " - + "\"Owners\" one. This property key is only valid when %s=1", + + "to Alluxio username static mapping in the format \"id1=user1;id2=user2\". " + + "The Google Cloud Storage IDs can be found at the console address " + + "https://console.cloud.google.com/storage/settings . Please use the " + + "\"Owners\" one. 
This property key is only valid when %s=1", Name.UNDERFS_GCS_VERSION)) .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) .setScope(Scope.SERVER) @@ -1138,9 +1150,9 @@ public String toString() { intBuilder(Name.UNDERFS_GCS_VERSION) .setDefaultValue(2) .setDescription(format("Specify the version of GCS module to use. " - + "GCS version \"1\" builds on top of jets3t package " - + "which requires %s and %s. GCS version \"2\" build on top " - + "of Google cloud API which requires %s", Name.GCS_ACCESS_KEY, Name.GCS_SECRET_KEY, + + "GCS version \"1\" builds on top of jets3t package " + + "which requires %s and %s. GCS version \"2\" build on top " + + "of Google cloud API which requires %s", Name.GCS_ACCESS_KEY, Name.GCS_SECRET_KEY, Name.GCS_CREDENTIAL_PATH)) .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) .setScope(Scope.SERVER) @@ -1588,7 +1600,7 @@ public String toString() { durationBuilder(Name.UNDERFS_S3_CONNECT_TTL) .setDefaultValue(-1) .setDescription("The expiration time of S3 connections in ms. 
-1 means the connection " - + "will never expire.") + + "will never expire.") .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.SERVER) .build(); @@ -1733,6 +1745,46 @@ public String toString() { .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.SERVER) .build(); + public static final PropertyKey UNDERFS_TOS_RETRY_MAX = + intBuilder(Name.UNDERFS_TOS_RETRY_MAX) + .setAlias("alluxio.underfs.tos.retry.max") + .setDefaultValue(3) + .setDescription("The maximum number of TOS error retry.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) + .setScope(Scope.SERVER) + .build(); + public static final PropertyKey UNDERFS_TOS_WRITE_TIMEOUT = + durationBuilder(Name.UNDERFS_TOS_WRITE_TIMEOUT) + .setAlias("alluxio.underfs.tos.write.timeout.ms", "alluxio.underfs.tos.write.timeout") + .setDefaultValue("30sec") + .setDescription("The timeout for a single write request to TOS.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) + .setScope(Scope.SERVER) + .build(); + public static final PropertyKey UNDERFS_TOS_READ_TIMEOUT = + durationBuilder(Name.UNDERFS_TOS_READ_TIMEOUT) + .setAlias("alluxio.underfs.tos.read.timeout.ms", "alluxio.underfs.tos.read.timeout") + .setDefaultValue("30sec") + .setDescription("The timeout for a single read request to TOS.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) + .setScope(Scope.SERVER) + .build(); + public static final PropertyKey UNDERFS_TOS_CONNECT_TIMEOUT = + durationBuilder(Name.UNDERFS_TOS_CONNECT_TIMEOUT) + .setAlias("alluxio.underfs.tos.connect.timeout.ms", "alluxio.underfs.tos.connect.timeout") + .setDefaultValue("30sec") + .setDescription("The timeout for a connection to TOS.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) + .setScope(Scope.SERVER) + .build(); + public static final PropertyKey UNDERFS_TOS_CONNECT_TTL = + durationBuilder(Name.UNDERFS_TOS_CONNECT_TTL) + .setDefaultValue("60sec") + .setDescription("The expiration time of TOS connections in ms. 
-1 means the connection " + + "will never expire.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) + .setScope(Scope.SERVER) + .build(); // UFS access control related properties // @@ -2065,6 +2117,13 @@ public String toString() { .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.SERVER) .build(); + public static final PropertyKey UNDERFS_TOS_CONNECT_MAX = + intBuilder(Name.UNDERFS_TOS_CONNECT_MAX) + .setDefaultValue(1024) + .setDescription("The maximum number of TOS connections.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) + .setScope(Scope.SERVER) + .build(); // // Mount table related properties // @@ -2104,7 +2163,17 @@ public String toString() { .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) .setScope(Scope.MASTER) .build(); - + public static final PropertyKey UNDERFS_TOS_INTERMEDIATE_UPLOAD_CLEAN_AGE = + durationBuilder(Name.UNDERFS_TOS_INTERMEDIATE_UPLOAD_CLEAN_AGE) + .setDefaultValue("3day") + .setDescription("Streaming uploads may not have been completed/aborted correctly " + + "and need periodical ufs cleanup. If ufs cleanup is enabled, " + + "intermediate multipart uploads in all non-readonly TOS mount points " + + "older than this age will be cleaned. This may impact other " + + "ongoing upload operations, so a large clean age is encouraged.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) + .setScope(Scope.SERVER) + .build(); /** * Master related properties. */ @@ -2360,17 +2429,17 @@ public String toString() { .build(); public static final PropertyKey MASTER_CONTAINER_ID_RESERVATION_SIZE = intBuilder(Name.MASTER_CONTAINER_ID_RESERVATION_SIZE) - .setDefaultValue(1000) - .setDescription("The number of container ids to 'reserve' before having to journal " + .setDefaultValue(1000) + .setDescription("The number of container ids to 'reserve' before having to journal " + "container id state. 
This allows the master to return container ids within " + "the reservation, without having to write to.") - .setScope(Scope.MASTER) - .build(); + .setScope(Scope.MASTER) + .build(); public static final PropertyKey MASTER_EMBEDDED_JOURNAL_ADDRESSES = listBuilder(Name.MASTER_EMBEDDED_JOURNAL_ADDRESSES) .setDescription(format("A comma-separated list of journal addresses for all " - + "masters in the cluster. The format is 'hostname1:port1,hostname2:port2,...'. When " - + "left unset, Alluxio uses ${%s}:${%s} by default", Name.MASTER_HOSTNAME, + + "masters in the cluster. The format is 'hostname1:port1,hostname2:port2,...'. When " + + "left unset, Alluxio uses ${%s}:${%s} by default", Name.MASTER_HOSTNAME, Name.MASTER_EMBEDDED_JOURNAL_PORT)) // We intentionally don't set a default value here. That way, we can use isSet() to check // whether the user explicitly set these addresses. If they did, we determine job master @@ -2389,10 +2458,11 @@ public String toString() { durationBuilder(Name.MASTER_EMBEDDED_JOURNAL_MAX_ELECTION_TIMEOUT) .setDescription(format( "The max election timeout for the embedded journal. When a random period between " - + "${%s} and ${%s} elapses without a master receiving any messages, the master " - + "will attempt to become the primary Election timeout will be waited initially " - + "when the cluster is forming. So larger values for election timeout will cause " - + "longer start-up time. Smaller values might introduce instability to leadership.", + + "${%s} and ${%s} elapses without a master receiving any messages, the master " + + "will attempt to become the primary Election timeout will be waited initially " + + "when the cluster is forming. So larger values for election timeout will cause " + + + "longer start-up time. 
Smaller values might introduce instability to leadership.", Name.MASTER_EMBEDDED_JOURNAL_MIN_ELECTION_TIMEOUT, Name.MASTER_EMBEDDED_JOURNAL_MAX_ELECTION_TIMEOUT)) // TODO(qian0817): dynamically set here @@ -2450,7 +2520,8 @@ public String toString() { + "slow or contested disk. WARNING: enabling this property may result in metadata " + "loss if half or more of the master nodes fail. See Ratis property " + "raft.server.log.unsafe-flush.enabled at " - + "https://github.com/apache/ratis/blob/master/ratis-docs/src/site/markdown/configuraions.md.") + + + "https://github.com/apache/ratis/blob/master/ratis-docs/src/site/markdown/configuraions.md.") .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.MASTER) .build(); @@ -2702,21 +2773,21 @@ public String toString() { .build(); public static final PropertyKey MASTER_METASTORE_ROCKS_PARALLEL_BACKUP = booleanBuilder(Name.MASTER_METASTORE_ROCKS_PARALLEL_BACKUP) - .setDefaultValue(false) - .setDescription(format("Whether to checkpoint rocksdb in parallel using the number of" - + " threads set by %s.", Name.MASTER_METASTORE_ROCKS_PARALLEL_BACKUP_THREADS)) - .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) - .setScope(Scope.MASTER) - .build(); + .setDefaultValue(false) + .setDescription(format("Whether to checkpoint rocksdb in parallel using the number of" + + " threads set by %s.", Name.MASTER_METASTORE_ROCKS_PARALLEL_BACKUP_THREADS)) + .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) + .setScope(Scope.MASTER) + .build(); public static final PropertyKey MASTER_METASTORE_ROCKS_PARALLEL_BACKUP_THREADS = intBuilder(Name.MASTER_METASTORE_ROCKS_PARALLEL_BACKUP_THREADS) - .setDefaultSupplier(() -> Math.min(16, - Math.max(1, Runtime.getRuntime().availableProcessors() / 2)), - "The default number of threads used by backing up rocksdb in parallel.") - .setDescription("The number of threads used by backing up rocksdb in parallel.") - .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) - 
.setScope(Scope.MASTER) - .build(); + .setDefaultSupplier(() -> Math.min(16, + Math.max(1, Runtime.getRuntime().availableProcessors() / 2)), + "The default number of threads used by backing up rocksdb in parallel.") + .setDescription("The number of threads used by backing up rocksdb in parallel.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) + .setScope(Scope.MASTER) + .build(); public static final PropertyKey MASTER_METASTORE_INODE_CACHE_EVICT_BATCH_SIZE = intBuilder(Name.MASTER_METASTORE_INODE_CACHE_EVICT_BATCH_SIZE) // TODO(andrew): benchmark different batch sizes to improve the default and provide a @@ -2750,7 +2821,7 @@ public String toString() { public static final PropertyKey MASTER_METASTORE_INODE_CACHE_MAX_SIZE = intBuilder(Name.MASTER_METASTORE_INODE_CACHE_MAX_SIZE) .setDefaultSupplier(() -> (int) Math.min(Integer.MAX_VALUE / 2, - Runtime.getRuntime().maxMemory() / 2000 / 2), + Runtime.getRuntime().maxMemory() / 2000 / 2), "{Max memory of master JVM} / 2 / 2 KB per inode") .setDescription("The number of inodes to cache on-heap. " + "The default value is chosen based on half the amount of maximum available memory " @@ -3221,10 +3292,11 @@ public String toString() { .build(); public static final PropertyKey MASTER_JOURNAL_SPACE_MONITOR_INTERVAL = durationBuilder(Name.MASTER_JOURNAL_SPACE_MONITOR_INTERVAL) - .setDefaultValue("10min") - .setDescription(format("How often to check and update information on space " - + "utilization of the journal disk. This is currently only compatible with linux-based" - + "systems and when %s is configured to EMBEDDED", Name.MASTER_JOURNAL_TYPE)) + .setDefaultValue("10min") + .setDescription(format("How often to check and update information on space " + + + "utilization of the journal disk. 
This is currently only compatible with linux-based" + + "systems and when %s is configured to EMBEDDED", Name.MASTER_JOURNAL_TYPE)) .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.MASTER) .build(); @@ -3370,10 +3442,10 @@ public String toString() { public static final PropertyKey MASTER_PERSISTENCE_BLACKLIST = listBuilder(Name.MASTER_PERSISTENCE_BLACKLIST) .setDescription("Patterns to blacklist persist, comma separated, string match, no regex." - + " This affects any async persist call (including ASYNC_THROUGH writes and CLI " - + "persist) but does not affect CACHE_THROUGH writes. Users may want to specify " - + "temporary files in the blacklist to avoid unnecessary I/O and errors. Some " - + "examples are `.staging` and `.tmp`.") + + " This affects any async persist call (including ASYNC_THROUGH writes and CLI " + + "persist) but does not affect CACHE_THROUGH writes. Users may want to specify " + + "temporary files in the blacklist to avoid unnecessary I/O and errors. 
Some " + + "examples are `.staging` and `.tmp`.") .setScope(Scope.MASTER) .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .build(); @@ -3515,7 +3587,7 @@ public String toString() { intBuilder(Name.MASTER_UFS_ACTIVE_SYNC_MAX_AGE) .setDefaultValue(10) .setDescription("The maximum number of intervals we will wait to find a quiet " - + "period before we have to sync the directories") + + "period before we have to sync the directories") .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.MASTER) .build(); @@ -3707,7 +3779,7 @@ public String toString() { + "This property determines the wait time.") .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.MASTER) - .build(); + .build(); public static final PropertyKey MASTER_WORKER_INFO_CACHE_REFRESH_TIME = durationBuilder(Name.MASTER_WORKER_INFO_CACHE_REFRESH_TIME) .setDefaultValue("10sec") @@ -4509,7 +4581,7 @@ public String toString() { } }, "2/3 of total system memory, or 1GB if system memory size cannot be determined") .setDescription("The allocated memory for each worker node's ramdisk(s). " - + "It is recommended to set this value explicitly.") + + "It is recommended to set this value explicitly.") .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.WORKER) .build(); @@ -4772,11 +4844,11 @@ public String toString() { .setDefaultValue(1_000_000) .setDescription( "When " + Name.WORKER_REGISTER_TO_ALL_MASTERS + "=true, " - + "because a worker will send block reports to all masters, " - + "we use a threshold to limit the unsent block report size in worker's memory. " - + "If the worker block heartbeat is larger than the threshold, " - + "we discard the heartbeat message and force " - + "the worker to register with that master with a full report." + + "because a worker will send block reports to all masters, " + + "we use a threshold to limit the unsent block report size in worker's memory. 
" + + "If the worker block heartbeat is larger than the threshold, " + + "we discard the heartbeat message and force " + + "the worker to register with that master with a full report." ) .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.WORKER) @@ -5262,8 +5334,8 @@ public String toString() { listBuilder(Name.WORKER_WHITELIST) .setDefaultValue("/") .setDescription("A comma-separated list of prefixes of the paths which are " - + "cacheable, separated by semi-colons. Alluxio will try to cache the cacheable " - + "file when it is read for the first time.") + + "cacheable, separated by semi-colons. Alluxio will try to cache the cacheable " + + "file when it is read for the first time.") .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.WORKER) .build(); @@ -5369,7 +5441,7 @@ public String toString() { .setAlias("alluxio.proxy.master.heartbeat.interval.ms") .setDefaultValue("10sec") .setDescription("Proxy instances maintain a heartbeat with the primary master. 
" - + "This key specifies the heartbeat interval.") + + "This key specifies the heartbeat interval.") .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.SERVER) .build(); @@ -5488,29 +5560,29 @@ public String toString() { booleanBuilder(Name.PROXY_S3_TAGGING_RESTRICTIONS_ENABLED) .setDefaultValue(true) .setDescription("Toggles whether or not the Alluxio S3 API will enforce " - + "AWS S3 tagging restrictions (10 tags, 128 character keys, 256 character " - + "values) See " - + "https://docs.aws.amazon.com/AmazonS3/latest/userguide/tagging-managing.html.") + + "AWS S3 tagging restrictions (10 tags, 128 character keys, 256 character " + + "values) See " + + "https://docs.aws.amazon.com/AmazonS3/latest/userguide/tagging-managing.html.") .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) .setScope(Scope.SERVER) .build(); public static final PropertyKey PROXY_S3_V2_VERSION_ENABLED = - booleanBuilder(Name.PROXY_S3_V2_VERSION_ENABLED) - .setDefaultValue(true) - .setDescription("(Experimental) V2, an optimized version of " - + "Alluxio s3 proxy service.") - .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) - .setScope(Scope.SERVER) - .build(); + booleanBuilder(Name.PROXY_S3_V2_VERSION_ENABLED) + .setDefaultValue(true) + .setDescription("(Experimental) V2, an optimized version of " + + "Alluxio s3 proxy service.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) + .setScope(Scope.SERVER) + .build(); public static final PropertyKey PROXY_S3_V2_ASYNC_PROCESSING_ENABLED = - booleanBuilder(Name.PROXY_S3_V2_ASYNC_PROCESSING_ENABLED) - .setDefaultValue(false) - .setDescription("(Experimental) If enabled, handle S3 request " - + "in async mode when v2 version of Alluxio s3 " - + "proxy service is enabled.") - .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) - .setScope(Scope.SERVER) - .build(); + booleanBuilder(Name.PROXY_S3_V2_ASYNC_PROCESSING_ENABLED) + .setDefaultValue(false) + .setDescription("(Experimental) If enabled, handle S3 
request " + + "in async mode when v2 version of Alluxio s3 " + + "proxy service is enabled.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) + .setScope(Scope.SERVER) + .build(); public static final PropertyKey PROXY_S3_V2_ASYNC_CONTEXT_TIMEOUT_MS = longBuilder(Name.PROXY_S3_V2_ASYNC_CONTEXT_TIMEOUT_MS) .setDefaultValue(30000L) @@ -6009,10 +6081,10 @@ public String toString() { durationBuilder(Name.USER_FILE_PERSISTENCE_INITIAL_WAIT_TIME) .setDefaultValue("0") .setDescription(format("Time to wait before starting the persistence job. " - + "When the value is set to -1, the file will be persisted by rename operation " - + "or persist CLI but will not be automatically persisted in other cases. " - + "This is to avoid the heavy object copy in rename operation when %s is set to %s. " - + "This value should be smaller than the value of %s", + + "When the value is set to -1, the file will be persisted by rename operation " + + "or persist CLI but will not be automatically persisted in other cases. " + + "This is to avoid the heavy object copy in rename operation when %s is set to %s. 
" + + "This value should be smaller than the value of %s", Name.USER_FILE_WRITE_TYPE_DEFAULT, WritePType.ASYNC_THROUGH, Name.MASTER_PERSISTENCE_MAX_TOTAL_WAIT_TIME_MS)) .setScope(Scope.CLIENT) @@ -6148,15 +6220,16 @@ public String toString() { booleanBuilder(Name.USER_CLIENT_CACHE_SHADOW_ENABLED) .setDefaultValue(false) .setDescription( - "If this is enabled, a shadow cache will be created to tracking the working set of " - + "a past time window, and measure the hit ratio if the working set fits the cache") + "If this is enabled, a shadow cache will be created to tracking the working set of " + + + "a past time window, and measure the hit ratio if the working set fits the cache") .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN).setScope(Scope.CLIENT).build(); public static final PropertyKey USER_CLIENT_CACHE_SHADOW_TYPE = enumBuilder(Name.USER_CLIENT_CACHE_SHADOW_TYPE, ShadowCacheType.class) .setDefaultValue("CLOCK_CUCKOO_FILTER") .setDescription("The type of shadow cache to be used. " + "Valid options are `MULTIPLE_BLOOM_FILTER` (which uses a chain of bloom filters), " - + "`CLOCK_CUCKOO_FILTER` (which uses cuckoo filter with extended field).") + + "`CLOCK_CUCKOO_FILTER` (which uses cuckoo filter with extended field).") .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN).setScope(Scope.CLIENT).build(); public static final PropertyKey USER_CLIENT_CACHE_SHADOW_WINDOW = durationBuilder(Name.USER_CLIENT_CACHE_SHADOW_WINDOW) @@ -6356,13 +6429,14 @@ public String toString() { public static final PropertyKey USER_FILE_WRITE_TYPE_DEFAULT = enumBuilder(Name.USER_FILE_WRITE_TYPE_DEFAULT, WriteType.class) .setDefaultValue(WriteType.ASYNC_THROUGH) - .setDescription( - format("Default write type when creating Alluxio files. 
Valid " + "options are " - + "`MUST_CACHE` (write will only go to Alluxio and must be stored in Alluxio), " - + "`CACHE_THROUGH` (try to cache, write to UnderFS synchronously), `THROUGH` " - + "(no cache, write to UnderFS synchronously), `ASYNC_THROUGH` (write to cache, " - + "write to UnderFS asynchronously, replicated %s times in Alluxio before data is " - + "persisted.", USER_FILE_REPLICATION_DURABLE)) + .setDescription( + format("Default write type when creating Alluxio files. Valid " + "options are " + + "`MUST_CACHE` (write will only go to Alluxio and must be stored in Alluxio), " + + "`CACHE_THROUGH` (try to cache, write to UnderFS synchronously), `THROUGH` " + + "(no cache, write to UnderFS synchronously), `ASYNC_THROUGH` (write to cache, " + + + "write to UnderFS asynchronously, replicated %s times in Alluxio before data is " + + "persisted.", USER_FILE_REPLICATION_DURABLE)) .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.CLIENT) .build(); @@ -6825,7 +6899,7 @@ public String toString() { .setDefaultValue(1) .setDescription( "The maximum number of physical connections to be " - + "used per target host.") + + "used per target host.") .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.CLIENT) .build(); @@ -6887,7 +6961,7 @@ public String toString() { .setDefaultValue(64) .setDescription( "The maximum number of physical connections to be " - + "used per target host.") + + "used per target host.") .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.CLIENT) .build(); @@ -7124,8 +7198,8 @@ public String toString() { .setAlias(Name.WORKER_FUSE_MOUNT_ALLUXIO_PATH) .setDefaultValue("/") .setDescription(format("The Alluxio path to mount to the given " - + "Fuse mount point configured by %s in the worker when %s is enabled " - + "or in the standalone Fuse process.", + + "Fuse mount point configured by %s in the worker when %s is enabled " + + "or in the standalone Fuse process.", Name.FUSE_MOUNT_POINT, 
Name.WORKER_FUSE_ENABLED)) .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.ALL) @@ -7294,9 +7368,9 @@ public String toString() { public static final PropertyKey SECURITY_LOGIN_IMPERSONATION_USERNAME = stringBuilder(Name.SECURITY_LOGIN_IMPERSONATION_USERNAME) .setDescription(format("When %s is set to SIMPLE or CUSTOM, user application uses this " - + "property to indicate the IMPERSONATED user requesting Alluxio service. If it is " - + "not set explicitly, or set to %s, impersonation will not be used. A special " - + "value of '%s' can be specified to impersonate the hadoop client user.", + + "property to indicate the IMPERSONATED user requesting Alluxio service. If it is " + + "not set explicitly, or set to %s, impersonation will not be used. A special " + + "value of '%s' can be specified to impersonate the hadoop client user.", SECURITY_AUTHENTICATION_TYPE, Constants.IMPERSONATION_NONE, Constants.IMPERSONATION_HDFS_USER)) .setDefaultValue(Constants.IMPERSONATION_HDFS_USER) @@ -7328,12 +7402,12 @@ public String toString() { .setScope(Scope.MASTER) .build(); public static final PropertyKey S3_REST_AUTHENTICATOR_CLASSNAME = - classBuilder(Name.S3_REST_AUTHENTICATOR_CLASSNAME) - .setDescription("The class's name is instantiated as an S3 authenticator.") - .setDefaultValue("alluxio.proxy.s3.auth.PassAllAuthenticator") - .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) - .setScope(Scope.ALL) - .build(); + classBuilder(Name.S3_REST_AUTHENTICATOR_CLASSNAME) + .setDescription("The class's name is instantiated as an S3 authenticator.") + .setDefaultValue("alluxio.proxy.s3.auth.PassAllAuthenticator") + .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) + .setScope(Scope.ALL) + .build(); // // Network TLS support // @@ -7411,11 +7485,11 @@ public String toString() { .build(); public static final PropertyKey JOB_RETENTION_TIME = durationBuilder(Name.JOB_RETENTION_TIME) - .setDescription("The length of time the Alluxio should save 
information about " - + "completed jobs before they are discarded.") - .setDefaultValue("1d") - .setScope(Scope.MASTER) - .build(); + .setDescription("The length of time the Alluxio should save information about " + + "completed jobs before they are discarded.") + .setDefaultValue("1d") + .setScope(Scope.MASTER) + .build(); // // Job service @@ -7589,11 +7663,11 @@ public String toString() { public static final PropertyKey JOB_MASTER_RPC_ADDRESSES = listBuilder(Name.JOB_MASTER_RPC_ADDRESSES) .setDescription(format("A list of comma-separated host:port RPC addresses where " - + "the client should look for job masters when using multiple job masters " - + "without Zookeeper. This property is not used " - + "when Zookeeper is enabled, since Zookeeper already stores the job master " - + "addresses. If property is not defined, clients will look for job masters " - + "using [%s]:%s first, then for [%s]:%s.", + + "the client should look for job masters when using multiple job masters " + + "without Zookeeper. This property is not used " + + "when Zookeeper is enabled, since Zookeeper already stores the job master " + + "addresses. 
If property is not defined, clients will look for job masters " + + "using [%s]:%s first, then for [%s]:%s.", Name.MASTER_RPC_ADDRESSES, Name.JOB_MASTER_RPC_PORT, Name.JOB_MASTER_EMBEDDED_JOURNAL_ADDRESSES, Name.JOB_MASTER_RPC_PORT)) .setScope(Scope.ALL) @@ -7779,7 +7853,7 @@ public String toString() { intBuilder(Name.TABLE_LOAD_DEFAULT_REPLICATION) .setDefaultValue(1) .setDescription("The default replication number of files under the SDS table after " - + "load option.") + + "load option.") .setScope(Scope.CLIENT) .build(); public static final PropertyKey HADOOP_SECURITY_AUTHENTICATION = @@ -8086,7 +8160,20 @@ public static final class Name { "alluxio.underfs.tos.streaming.upload.partition.size"; public static final String UNDERFS_TOS_STREAMING_UPLOAD_THREADS = "alluxio.underfs.tos.streaming.upload.threads"; - + public static final String UNDERFS_TOS_CONNECT_MAX = + "alluxio.underfs.tos.connect.max"; + public static final String UNDERFS_TOS_INTERMEDIATE_UPLOAD_CLEAN_AGE = + "alluxio.underfs.tos.intermediate.upload.clean.age"; + public static final String UNDERFS_TOS_RETRY_MAX = + "alluxio.underfs.tos.retry.max"; + public static final String UNDERFS_TOS_CONNECT_TIMEOUT = + "alluxio.underfs.tos.connect.timeout"; + public static final String UNDERFS_TOS_CONNECT_TTL = + "alluxio.underfs.tos.connect.ttl"; + public static final String UNDERFS_TOS_WRITE_TIMEOUT = + "alluxio.underfs.tos.write.timeout"; + public static final String UNDERFS_TOS_READ_TIMEOUT = + "alluxio.underfs.tos.read.timeout"; // // UFS access control related properties // @@ -8710,7 +8797,7 @@ public static final class Name { public static final String WORKER_NETWORK_KEEPALIVE_TIMEOUT_MS = "alluxio.worker.network.keepalive.timeout"; public static final String WORKER_NETWORK_PERMIT_KEEPALIVE_TIME_MS = - "alluxio.worker.network.permit.keepalive.time"; + "alluxio.worker.network.permit.keepalive.time"; public static final String WORKER_NETWORK_MAX_INBOUND_MESSAGE_SIZE = 
"alluxio.worker.network.max.inbound.message.size"; public static final String WORKER_NETWORK_NETTY_BOSS_THREADS = @@ -8796,9 +8883,9 @@ public static final class Name { public static final String WORKER_REGISTER_LEASE_RETRY_MAX_DURATION = "alluxio.worker.register.lease.retry.max.duration"; public static final String WORKER_REVIEWER_PROBABILISTIC_HARDLIMIT_BYTES = - "alluxio.worker.reviewer.probabilistic.hardlimit.bytes"; + "alluxio.worker.reviewer.probabilistic.hardlimit.bytes"; public static final String WORKER_REVIEWER_PROBABILISTIC_SOFTLIMIT_BYTES = - "alluxio.worker.reviewer.probabilistic.softlimit.bytes"; + "alluxio.worker.reviewer.probabilistic.softlimit.bytes"; public static final String WORKER_REVIEWER_CLASS = "alluxio.worker.reviewer.class"; public static final String WORKER_RPC_PORT = "alluxio.worker.rpc.port"; public static final String WORKER_RPC_EXECUTOR_TYPE = "alluxio.worker.rpc.executor.type"; @@ -8879,9 +8966,9 @@ public static final class Name { public static final String PROXY_AUDIT_LOGGING_ENABLED = "alluxio.proxy.audit.logging.enabled"; public static final String PROXY_S3_V2_VERSION_ENABLED = - "alluxio.proxy.s3.v2.version.enabled"; + "alluxio.proxy.s3.v2.version.enabled"; public static final String PROXY_S3_V2_ASYNC_PROCESSING_ENABLED = - "alluxio.proxy.s3.v2.async.processing.enabled"; + "alluxio.proxy.s3.v2.async.processing.enabled"; public static final String PROXY_S3_V2_ASYNC_CONTEXT_TIMEOUT_MS = "alluxio.proxy.s3.v2.async.context.timeout.ms"; public static final String PROXY_S3_V2_ASYNC_LIGHT_POOL_CORE_THREAD_NUMBER = @@ -9010,11 +9097,11 @@ public static final class Name { public static final String USER_CLIENT_CACHE_QUOTA_ENABLED = "alluxio.user.client.cache.quota.enabled"; public static final String USER_CLIENT_CACHE_TTL_ENABLED = - "alluxio.user.client.cache.ttl.enabled"; + "alluxio.user.client.cache.ttl.enabled"; public static final String USER_CLIENT_CACHE_TTL_CHECK_INTERVAL_SECONDS = - 
"alluxio.user.client.cache.ttl.check.interval.seconds"; + "alluxio.user.client.cache.ttl.check.interval.seconds"; public static final String USER_CLIENT_CACHE_TTL_THRESHOLD_SECONDS = - "alluxio.user.client.cache.ttl.threshold.seconds"; + "alluxio.user.client.cache.ttl.threshold.seconds"; public static final String USER_CLIENT_CACHE_SIZE = "alluxio.user.client.cache.size"; public static final String USER_CLIENT_CACHE_STORE_OVERHEAD = @@ -9316,7 +9403,7 @@ public static final class Name { public static final String JOB_MASTER_BIND_HOST = "alluxio.job.master.bind.host"; public static final String JOB_MASTER_HOSTNAME = "alluxio.job.master.hostname"; public static final String JOB_MASTER_LOST_MASTER_INTERVAL = - "alluxio.job.master.lost.master.interval"; + "alluxio.job.master.lost.master.interval"; public static final String JOB_MASTER_LOST_WORKER_INTERVAL = "alluxio.job.master.lost.worker.interval"; public static final String JOB_MASTER_RPC_PORT = "alluxio.job.master.rpc.port"; @@ -9398,14 +9485,15 @@ public static final class Name { public static final String HADOOP_CHECKSUM_COMBINE_MODE = "alluxio.hadoop.checksum.combine.mode"; - private Name() {} // prevent instantiation + private Name() { + } // prevent instantiation } /** * list of substrings of a name where any custom PropertyKey with a name that contains it * should have a {@link DisplayType} of CREDENTIALS. 
*/ - private static final String[] CUSTOM_CREDENTIAL_NAME_SUBSTR = new String[]{ + private static final String[] CUSTOM_CREDENTIAL_NAME_SUBSTR = new String[] { "accessKeyId", "secretKey" }; @@ -9484,7 +9572,7 @@ public enum Template { "fs.adl.account.%s.oauth2.credential", "fs\\.adl\\.account\\.(\\w+)\\.oauth2\\.credential", PropertyCreators.fromBuilder(stringBuilder("fs.adl.account.%s.oauth2.credential") - .setDisplayType(DisplayType.CREDENTIALS))), + .setDisplayType(DisplayType.CREDENTIALS))), UNDERFS_AZURE_REFRESH_URL( "fs.adl.account.%s.oauth2.refresh.url", "fs\\.adl\\.account\\.(\\w+)\\.oauth2\\.refresh\\.url", @@ -9625,21 +9713,22 @@ private static BiFunction createNestedProperty } Template(String format, String re, - BiFunction propertyCreator) { + BiFunction propertyCreator) { this(format, re, PropertyType.STRING, Optional.empty(), Optional.empty(), propertyCreator); } /** * Constructs a property key format. * - * @param format String of this property as formatted string - * @param re String of this property as regexp - * @param enumType enum class of an enum property - * @param delimiter delimiter of this property + * @param format String of this property as formatted string + * @param re String of this property as regexp + * @param enumType enum class of an enum property + * @param delimiter delimiter of this property * @param propertyCreator a function that creates property key given name and base property key */ Template(String format, String re, PropertyType type, Optional> enumType, - Optional delimiter, BiFunction propertyCreator) { + Optional delimiter, + BiFunction propertyCreator) { mFormat = format; mPattern = Pattern.compile(re); mType = type; @@ -9780,71 +9869,102 @@ public static Collection defaultKeys() { return DEFAULT_KEYS_MAP.values(); } - /** Property name. */ + /** + * Property name. + */ private final String mName; - /** Property Key description. */ + /** + * Property Key description. 
+ */ private final String mDescription; - /** Property type. */ + /** + * Property type. + */ private final PropertyType mType; - /** Property's enum class type, if property type is ENUM. */ + /** + * Property's enum class type, if property type is ENUM. + */ private final Optional> mEnumType; - /** Property's list delimiter, if property type is LIST. */ + /** + * Property's list delimiter, if property type is LIST. + */ private final Optional mDelimiter; - /** Supplies the Property Key default value. */ + /** + * Supplies the Property Key default value. + */ private final DefaultSupplier mDefaultSupplier; - /** Property Key alias. */ + /** + * Property Key alias. + */ private final String[] mAliases; - /** Whether to ignore as a site property. */ + /** + * Whether to ignore as a site property. + */ private final boolean mIgnoredSiteProperty; - /** Whether the property is an Alluxio built-in property. */ + /** + * Whether the property is an Alluxio built-in property. + */ private final boolean mIsBuiltIn; - /** Whether to hide in document. */ + /** + * Whether to hide in document. + */ private final boolean mIsHidden; - /** Whether property should be consistent within the cluster. */ + /** + * Whether property should be consistent within the cluster. + */ private final ConsistencyCheckLevel mConsistencyCheckLevel; - /** The scope this property applies to. */ + /** + * The scope this property applies to. + */ private final Scope mScope; - /** The displayType which indicates how the property value should be displayed. **/ + /** + * The displayType which indicates how the property value should be displayed. + **/ private final DisplayType mDisplayType; - /** Whether the property could be updated dynamically. */ + /** + * Whether the property could be updated dynamically. + */ private final boolean mDynamic; - /** A custom function to validate the value. */ + /** + * A custom function to validate the value. 
+ */ private final Function mValueValidationFunction; /** - * @param name String of this property - * @param description String description of this property key - * @Param type the property's type - * @param defaultSupplier default value supplier - * @param aliases alias of this property key - * @param ignoredSiteProperty true if Alluxio ignores user-specified value for this property in - * site properties file - * @param isHidden whether to hide in document + * @param name String of this property + * @param description String description of this property key + * @param defaultSupplier default value supplier + * @param aliases alias of this property key + * @param ignoredSiteProperty true if Alluxio ignores user-specified value for this property in + * site properties file + * @param isHidden whether to hide in document * @param consistencyCheckLevel the consistency check level to apply to this property - * @param scope the scope this property applies to - * @param displayType how the property value should be displayed - * @param isBuiltIn whether this is an Alluxio built-in property + * @param scope the scope this property applies to + * @param displayType how the property value should be displayed + * @param isBuiltIn whether this is an Alluxio built-in property + * @Param type the property's type */ private PropertyKey(String name, String description, PropertyType type, - Optional> enumType, Optional delimiter, - DefaultSupplier defaultSupplier, String[] aliases, boolean ignoredSiteProperty, - boolean isHidden, ConsistencyCheckLevel consistencyCheckLevel, - Scope scope, DisplayType displayType, boolean isBuiltIn, boolean dynamic, - Function valueValidationFunction) { + Optional> enumType, Optional delimiter, + DefaultSupplier defaultSupplier, String[] aliases, + boolean ignoredSiteProperty, + boolean isHidden, ConsistencyCheckLevel consistencyCheckLevel, + Scope scope, DisplayType displayType, boolean isBuiltIn, boolean dynamic, + Function 
valueValidationFunction) { mName = Preconditions.checkNotNull(name, "name"); // TODO(binfan): null check after we add description for each property key mDescription = Strings.isNullOrEmpty(description) ? "N/A" : description; @@ -9867,7 +9987,7 @@ private PropertyKey(String name, String description, PropertyType type, * @param name String of this property */ private PropertyKey(String name, PropertyType type, - Optional> enumType, Optional delimiter) { + Optional> enumType, Optional delimiter) { this(name, null, type, enumType, delimiter, new DefaultSupplier(() -> null, "null"), null, false, false, ConsistencyCheckLevel.IGNORE, Scope.ALL, DisplayType.DEFAULT, true, true, null); @@ -10000,8 +10120,7 @@ public PropertyType getType() { /** * @return enum class of the enum property, or throws when property is not of enum type */ - public Class getEnumType() - { + public Class getEnumType() { checkState(mType == PropertyType.ENUM && mEnumType.isPresent(), "PropertyKey %s is not of enum type", mName); return mEnumType.get(); @@ -10119,7 +10238,7 @@ private static boolean validateValue( } break; case ENUM: - if (!value.getClass().equals(enumType.get())) { + if (!value.getClass().equals(enumType.get())) { return false; } break; @@ -10152,6 +10271,7 @@ private static boolean validateValue( /** * For each property type, there might be different forms of acceptable input format, * convert these acceptable formats to proper internal format. + * * @param value property value in string format * @return property value in the expected type */ @@ -10160,7 +10280,8 @@ public Object formatValue(Object value) { } private static Object formatValue(Object value, PropertyType type, - Optional> enumType, Optional delimiter) { + Optional> enumType, + Optional delimiter) { if (value instanceof Number) { switch (type) { case LONG: @@ -10211,6 +10332,7 @@ private static Object formatValue(Object value, PropertyType type, /** * Parses value from string. 
+ * * @param stringValue property value in string format * @return property value in the expected type */ @@ -10258,7 +10380,7 @@ public Object parseValue(String stringValue) { /** * Returns whether or not the given property key is marked as deprecated. - * + *

* It first checks if the specific key is deprecated, otherwise it will fall back to checking * if the key's name matches any of the PropertyKey templates. If no keys or templates match, it * will return false. This will only return true when the key is marked with a {@link Deprecated} @@ -10283,7 +10405,7 @@ public static boolean isDeprecated(String name) { /** * Returns whether or not a property key has been removed from use. - * + *

* If a PropertyKey or {@link Template} is deemed as "Removed" it will exist within * {@link RemovedKey}. This method can be used to detect if a key being utilized has been removed. * diff --git a/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystem.java b/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystem.java index d1f43c7ce51d..36cb7b76b193 100644 --- a/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystem.java +++ b/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystem.java @@ -13,6 +13,7 @@ import alluxio.AlluxioURI; import alluxio.Constants; +import alluxio.conf.AlluxioConfiguration; import alluxio.conf.PropertyKey; import alluxio.retry.RetryPolicy; import alluxio.underfs.ObjectUnderFileSystem; @@ -27,11 +28,14 @@ import com.google.common.base.Suppliers; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import com.volcengine.tos.TOSClientConfiguration; import com.volcengine.tos.TOSV2; import com.volcengine.tos.TOSV2ClientBuilder; import com.volcengine.tos.TosClientException; import com.volcengine.tos.TosException; import com.volcengine.tos.TosServerException; +import com.volcengine.tos.auth.StaticCredentials; +import com.volcengine.tos.model.object.AbortMultipartUploadInput; import com.volcengine.tos.model.object.CopyObjectV2Input; import com.volcengine.tos.model.object.CopyObjectV2Output; import com.volcengine.tos.model.object.DeleteMultiObjectsV2Input; @@ -41,13 +45,17 @@ import com.volcengine.tos.model.object.Deleted; import com.volcengine.tos.model.object.HeadObjectV2Input; import com.volcengine.tos.model.object.HeadObjectV2Output; +import com.volcengine.tos.model.object.ListMultipartUploadsV2Input; +import com.volcengine.tos.model.object.ListMultipartUploadsV2Output; import com.volcengine.tos.model.object.ListObjectsType2Input; import com.volcengine.tos.model.object.ListObjectsType2Output; import 
com.volcengine.tos.model.object.ListedCommonPrefix; import com.volcengine.tos.model.object.ListedObjectV2; +import com.volcengine.tos.model.object.ListedUpload; import com.volcengine.tos.model.object.ObjectMetaRequestOptions; import com.volcengine.tos.model.object.ObjectTobeDeleted; import com.volcengine.tos.model.object.PutObjectInput; +import com.volcengine.tos.transport.TransportConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -110,7 +118,13 @@ public static TOSUnderFileSystem createInstance(AlluxioURI uri, UnderFileSystemC String secretKey = conf.getString(PropertyKey.TOS_SECRET_KEY); String regionName = conf.getString(PropertyKey.TOS_REGION); String endPoint = conf.getString(PropertyKey.TOS_ENDPOINT_KEY); - TOSV2 tos = new TOSV2ClientBuilder().build(regionName, endPoint, accessKey, secretKey); + TOSClientConfiguration configuration = TOSClientConfiguration.builder() + .transportConfig(initializeTOSClientConfig(conf)) + .region(regionName) + .endpoint(endPoint) + .credentials(new StaticCredentials(accessKey, secretKey)) + .build(); + TOSV2 tos = new TOSV2ClientBuilder().build(configuration); return new TOSUnderFileSystem(uri, tos, bucketName, conf); } @@ -152,6 +166,33 @@ public void setOwner(String path, String user, String group) { public void setMode(String path, short mode) throws IOException { } + @Override + public void cleanup() throws IOException { + long cleanAge = mUfsConf.getMs(PropertyKey.UNDERFS_TOS_INTERMEDIATE_UPLOAD_CLEAN_AGE); + Date cleanBefore = new Date(new Date().getTime() - cleanAge); + boolean isTruncated = true; + String keyMarker = null; + String uploadIdMarker = null; + int maxKeys = 10; + while (isTruncated) { + ListMultipartUploadsV2Input input = new ListMultipartUploadsV2Input().setBucket(mBucketName) + .setMaxUploads(maxKeys).setKeyMarker(keyMarker).setUploadIDMarker(uploadIdMarker); + ListMultipartUploadsV2Output output = mClient.listMultipartUploads(input); + if (output.getUploads() != null) { + for (int i = 0; 
i < output.getUploads().size(); ++i) { + ListedUpload upload = output.getUploads().get(i); + if (upload.getInitiated().before(cleanBefore)) { + mClient.abortMultipartUpload(new AbortMultipartUploadInput().setBucket(mBucketName) + .setKey(upload.getKey()).setUploadID(upload.getUploadID())); + } + } + } + isTruncated = output.isTruncated(); + keyMarker = output.getNextKeyMarker(); + uploadIdMarker = output.getNextUploadIdMarker(); + } + } + @Override protected boolean copyObject(String src, String dst) { LOG.debug("Copying {} to {}", src, dst); @@ -348,6 +389,30 @@ protected String getRootKey() { return Constants.HEADER_TOS + mBucketName; } + /** + * Creates a TOS {@code TransportConfig} using an Alluxio Configuration. + * @param alluxioConf the Alluxio configuration + * @return the TOS {@link TransportConfig} + */ + public static TransportConfig initializeTOSClientConfig( + AlluxioConfiguration alluxioConf) { + int readTimeoutMills = (int) alluxioConf.getMs(PropertyKey.UNDERFS_TOS_READ_TIMEOUT); + int writeTimeoutMills = (int) alluxioConf.getMs(PropertyKey.UNDERFS_TOS_WRITE_TIMEOUT); + int connectionTimeoutMills = (int) alluxioConf.getMs(PropertyKey.UNDERFS_TOS_CONNECT_TIMEOUT); + int maxConnections = alluxioConf.getInt(PropertyKey.UNDERFS_TOS_CONNECT_MAX); + int idleConnectionTime = (int) alluxioConf.getMs(PropertyKey.UNDERFS_TOS_CONNECT_TTL); + int maxErrorRetry = alluxioConf.getInt(PropertyKey.UNDERFS_TOS_RETRY_MAX); + TransportConfig config = TransportConfig.builder() + .connectTimeoutMills(connectionTimeoutMills) + .maxConnections(maxConnections) + .maxRetryCount(maxErrorRetry) + .readTimeoutMills(readTimeoutMills) + .writeTimeoutMills(writeTimeoutMills) + .idleConnectionTimeMills(idleConnectionTime) + .build(); + return config; + } + @Override protected InputStream openObject(String key, OpenOptions options, RetryPolicy retryPolicy) throws IOException { From e6e485bffba4d832cb00754fc4d7629eac93b881 Mon Sep 17 00:00:00 2001 From: thu-david Date: Wed, 19 Jun 
2024 16:39:58 +0800 Subject: [PATCH 05/10] Update PropertyKey.java --- .../main/java/alluxio/conf/PropertyKey.java | 538 ++++++++---------- 1 file changed, 241 insertions(+), 297 deletions(-) diff --git a/core/common/src/main/java/alluxio/conf/PropertyKey.java b/core/common/src/main/java/alluxio/conf/PropertyKey.java index 02072625ba5d..2986bf9c9337 100755 --- a/core/common/src/main/java/alluxio/conf/PropertyKey.java +++ b/core/common/src/main/java/alluxio/conf/PropertyKey.java @@ -94,29 +94,19 @@ @ThreadSafe @PublicApi public final class PropertyKey implements Comparable { - /** - * Regex string to find "${key}" for variable substitution. - */ + /** Regex string to find "${key}" for variable substitution. */ public static final String REGEX_STRING = "(\\$\\{([^{}]*)\\})"; - /** - * Regex to find ${key} for variable substitution. - */ + /** Regex to find ${key} for variable substitution. */ public static final Pattern CONF_REGEX = Pattern.compile(REGEX_STRING); private static final Logger LOG = LoggerFactory.getLogger(PropertyKey.class); // The following two maps must be the first to initialize within this file. - /** - * A map from default property key's string name to the key. - */ + /** A map from default property key's string name to the key. */ private static final Map DEFAULT_KEYS_MAP = new ConcurrentHashMap<>(); - /** - * A map from default property key's alias to the key. - */ + /** A map from default property key's alias to the key. */ private static final Map DEFAULT_ALIAS_MAP = new ConcurrentHashMap<>(); - /** - * A cache storing result for template regexp matching results. - */ + /** A cache storing result for template regexp matching results. 
*/ private static final Cache REGEXP_CACHE = CacheBuilder.newBuilder() .maximumSize(1024) .build(); @@ -292,7 +282,7 @@ public static Builder stringBuilder(String name) { } /** - * @param name name of the property + * @param name name of the property * @param enumType enum class of the property * @return a Builder for enum properties */ @@ -349,26 +339,26 @@ private Builder( /** * @param template template for the property name - * @param params parameters of the template + * @param params parameters of the template */ public Builder(PropertyKey.Template template, Object... params) { this(PropertyType.STRING, template, params); } /** - * @param type type of the property + * @param type type of the property * @param template template for the property name - * @param params parameters of the template + * @param params parameters of the template */ public Builder(PropertyType type, PropertyKey.Template template, Object... params) { this(format(template.mFormat, params), type); } /** - * @param type type of the property + * @param type type of the property * @param delimiter delimiter for value, if list value is given as a string - * @param template template for the property name - * @param params parameters of the template + * @param template template for the property name + * @param params parameters of the template */ public Builder(PropertyType type, Optional delimiter, PropertyKey.Template template, Object... 
params) { @@ -409,7 +399,7 @@ public Builder setDefaultSupplier(DefaultSupplier defaultSupplier) { } /** - * @param supplier supplier for the property's default value + * @param supplier supplier for the property's default value * @param description description of the default value * @return the updated builder instance */ @@ -709,9 +699,9 @@ public String toString() { durationBuilder(Name.METRICS_EXECUTOR_TASK_WARN_FREQUENCY) .setDefaultValue("5sec") .setDescription(String.format("When instrumenting an executor with" - + "InstrumentedExecutorService, if the number of" - + " active tasks (queued or running) is greater than %s value, a warning log" - + " will be printed at the given interval", + + "InstrumentedExecutorService, if the number of" + + " active tasks (queued or running) is greater than %s value, a warning log" + + " will be printed at the given interval", Name.METRICS_EXECUTOR_TASK_WARN_SIZE)) .setScope(Scope.ALL) .setConsistencyCheckLevel(ConsistencyCheckLevel.IGNORE) @@ -794,8 +784,7 @@ public String toString() { stringBuilder(Name.ROCKS_INODE_CONF_FILE) .setDescription(format("Path of file containing RocksDB inode store configuration." + " A template configuration cab be found at ${%s}/rocks-inode.ini.template." - + - " See https://github.com/facebook/rocksdb/blob/main/examples/rocksdb_option_file_example.ini" + + " See https://github.com/facebook/rocksdb/blob/main/examples/rocksdb_option_file_example.ini" + " for more information on RocksDB configuration files." + " If unset then a default configuration will" + " be used.", Name.CONF_DIR)) @@ -805,8 +794,7 @@ public String toString() { stringBuilder(Name.ROCKS_BLOCK_CONF_FILE) .setDescription(format("Path of file containing RocksDB block store configuration." + " A template configuration cab be found at ${%s}/rocks-block.ini.template." 
- + - " See https://github.com/facebook/rocksdb/blob/main/examples/rocksdb_option_file_example.ini" + + " See https://github.com/facebook/rocksdb/blob/main/examples/rocksdb_option_file_example.ini" + " for more information on RocksDB configuration files." + " If unset then a default configuration will" + " be used.", Name.CONF_DIR)) @@ -1096,10 +1084,10 @@ public String toString() { public static final PropertyKey UNDERFS_GCS_OWNER_ID_TO_USERNAME_MAPPING = stringBuilder(Name.UNDERFS_GCS_OWNER_ID_TO_USERNAME_MAPPING) .setDescription(format("Optionally, specify a preset gcs owner id " - + "to Alluxio username static mapping in the format \"id1=user1;id2=user2\". " - + "The Google Cloud Storage IDs can be found at the console address " - + "https://console.cloud.google.com/storage/settings . Please use the " - + "\"Owners\" one. This property key is only valid when %s=1", + + "to Alluxio username static mapping in the format \"id1=user1;id2=user2\". " + + "The Google Cloud Storage IDs can be found at the console address " + + "https://console.cloud.google.com/storage/settings . Please use the " + + "\"Owners\" one. This property key is only valid when %s=1", Name.UNDERFS_GCS_VERSION)) .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) .setScope(Scope.SERVER) @@ -1150,9 +1138,9 @@ public String toString() { intBuilder(Name.UNDERFS_GCS_VERSION) .setDefaultValue(2) .setDescription(format("Specify the version of GCS module to use. " - + "GCS version \"1\" builds on top of jets3t package " - + "which requires %s and %s. GCS version \"2\" build on top " - + "of Google cloud API which requires %s", Name.GCS_ACCESS_KEY, Name.GCS_SECRET_KEY, + + "GCS version \"1\" builds on top of jets3t package " + + "which requires %s and %s. 
GCS version \"2\" build on top " + + "of Google cloud API which requires %s", Name.GCS_ACCESS_KEY, Name.GCS_SECRET_KEY, Name.GCS_CREDENTIAL_PATH)) .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) .setScope(Scope.SERVER) @@ -1600,7 +1588,7 @@ public String toString() { durationBuilder(Name.UNDERFS_S3_CONNECT_TTL) .setDefaultValue(-1) .setDescription("The expiration time of S3 connections in ms. -1 means the connection " - + "will never expire.") + + "will never expire.") .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.SERVER) .build(); @@ -1745,46 +1733,6 @@ public String toString() { .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.SERVER) .build(); - public static final PropertyKey UNDERFS_TOS_RETRY_MAX = - intBuilder(Name.UNDERFS_TOS_RETRY_MAX) - .setAlias("alluxio.underfs.tos.retry.max") - .setDefaultValue(3) - .setDescription("The maximum number of TOS error retry.") - .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) - .setScope(Scope.SERVER) - .build(); - public static final PropertyKey UNDERFS_TOS_WRITE_TIMEOUT = - durationBuilder(Name.UNDERFS_TOS_WRITE_TIMEOUT) - .setAlias("alluxio.underfs.tos.write.timeout.ms", "alluxio.underfs.tos.write.timeout") - .setDefaultValue("30sec") - .setDescription("The timeout for a single write request to TOS.") - .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) - .setScope(Scope.SERVER) - .build(); - public static final PropertyKey UNDERFS_TOS_READ_TIMEOUT = - durationBuilder(Name.UNDERFS_TOS_READ_TIMEOUT) - .setAlias("alluxio.underfs.tos.read.timeout.ms", "alluxio.underfs.tos.read.timeout") - .setDefaultValue("30sec") - .setDescription("The timeout for a single read request to TOS.") - .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) - .setScope(Scope.SERVER) - .build(); - public static final PropertyKey UNDERFS_TOS_CONNECT_TIMEOUT = - durationBuilder(Name.UNDERFS_TOS_CONNECT_TIMEOUT) - .setAlias("alluxio.underfs.tos.connect.timeout.ms", 
"alluxio.underfs.tos.connect.timeout") - .setDefaultValue("30sec") - .setDescription("The timeout for a connection to TOS.") - .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) - .setScope(Scope.SERVER) - .build(); - public static final PropertyKey UNDERFS_TOS_CONNECT_TTL = - durationBuilder(Name.UNDERFS_TOS_CONNECT_TTL) - .setDefaultValue("60sec") - .setDescription("The expiration time of TOS connections in ms. -1 means the connection " - + "will never expire.") - .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) - .setScope(Scope.SERVER) - .build(); // UFS access control related properties // @@ -2117,6 +2065,57 @@ public String toString() { .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.SERVER) .build(); + public static final PropertyKey UNDERFS_TOS_RETRY_MAX = + intBuilder(Name.UNDERFS_TOS_RETRY_MAX) + .setAlias("alluxio.underfs.tos.retry.max") + .setDefaultValue(3) + .setDescription("The maximum number of TOS error retry.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) + .setScope(Scope.SERVER) + .build(); + public static final PropertyKey UNDERFS_TOS_WRITE_TIMEOUT = + intBuilder(Name.UNDERFS_TOS_WRITE_TIMEOUT) + .setAlias("alluxio.underfs.tos.write.timeout.ms", "alluxio.underfs.tos.write.timeout") + .setDefaultValue(30000) + .setDescription("The timeout for a single write request to TOS.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) + .setScope(Scope.SERVER) + .build(); + public static final PropertyKey UNDERFS_TOS_READ_TIMEOUT = + intBuilder(Name.UNDERFS_TOS_READ_TIMEOUT) + .setAlias("alluxio.underfs.tos.read.timeout.ms", "alluxio.underfs.tos.read.timeout") + .setDefaultValue(30000) + .setDescription("The timeout for a single read request to TOS.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) + .setScope(Scope.SERVER) + .build(); + public static final PropertyKey UNDERFS_TOS_CONNECT_TIMEOUT = + intBuilder(Name.UNDERFS_TOS_CONNECT_TIMEOUT) + .setAlias("alluxio.underfs.tos.connect.timeout.ms", 
"alluxio.underfs.tos.connect.timeout") + .setDefaultValue(30000) + .setDescription("The timeout for a connection to TOS.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) + .setScope(Scope.SERVER) + .build(); + public static final PropertyKey UNDERFS_TOS_CONNECT_TTL = + intBuilder(Name.UNDERFS_TOS_CONNECT_TTL) + .setDefaultValue(60000) + .setDescription("The expiration time of TOS connections in ms. -1 means the connection " + + "will never expire.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) + .setScope(Scope.SERVER) + .build(); + public static final PropertyKey UNDERFS_TOS_INTERMEDIATE_UPLOAD_CLEAN_AGE = + durationBuilder(Name.UNDERFS_TOS_INTERMEDIATE_UPLOAD_CLEAN_AGE) + .setDefaultValue("3day") + .setDescription("Streaming uploads may not have been completed/aborted correctly " + + "and need periodical ufs cleanup. If ufs cleanup is enabled, " + + "intermediate multipart uploads in all non-readonly TOS mount points " + + "older than this age will be cleaned. This may impact other " + + "ongoing upload operations, so a large clean age is encouraged.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) + .setScope(Scope.SERVER) + .build(); public static final PropertyKey UNDERFS_TOS_CONNECT_MAX = intBuilder(Name.UNDERFS_TOS_CONNECT_MAX) .setDefaultValue(1024) @@ -2163,17 +2162,7 @@ public String toString() { .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) .setScope(Scope.MASTER) .build(); - public static final PropertyKey UNDERFS_TOS_INTERMEDIATE_UPLOAD_CLEAN_AGE = - durationBuilder(Name.UNDERFS_TOS_INTERMEDIATE_UPLOAD_CLEAN_AGE) - .setDefaultValue("3day") - .setDescription("Streaming uploads may not have been completed/aborted correctly " - + "and need periodical ufs cleanup. If ufs cleanup is enabled, " - + "intermediate multipart uploads in all non-readonly TOS mount points " - + "older than this age will be cleaned. 
This may impact other " - + "ongoing upload operations, so a large clean age is encouraged.") - .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) - .setScope(Scope.SERVER) - .build(); + /** * Master related properties. */ @@ -2429,17 +2418,17 @@ public String toString() { .build(); public static final PropertyKey MASTER_CONTAINER_ID_RESERVATION_SIZE = intBuilder(Name.MASTER_CONTAINER_ID_RESERVATION_SIZE) - .setDefaultValue(1000) - .setDescription("The number of container ids to 'reserve' before having to journal " + .setDefaultValue(1000) + .setDescription("The number of container ids to 'reserve' before having to journal " + "container id state. This allows the master to return container ids within " + "the reservation, without having to write to.") - .setScope(Scope.MASTER) - .build(); + .setScope(Scope.MASTER) + .build(); public static final PropertyKey MASTER_EMBEDDED_JOURNAL_ADDRESSES = listBuilder(Name.MASTER_EMBEDDED_JOURNAL_ADDRESSES) .setDescription(format("A comma-separated list of journal addresses for all " - + "masters in the cluster. The format is 'hostname1:port1,hostname2:port2,...'. When " - + "left unset, Alluxio uses ${%s}:${%s} by default", Name.MASTER_HOSTNAME, + + "masters in the cluster. The format is 'hostname1:port1,hostname2:port2,...'. When " + + "left unset, Alluxio uses ${%s}:${%s} by default", Name.MASTER_HOSTNAME, Name.MASTER_EMBEDDED_JOURNAL_PORT)) // We intentionally don't set a default value here. That way, we can use isSet() to check // whether the user explicitly set these addresses. If they did, we determine job master @@ -2458,11 +2447,10 @@ public String toString() { durationBuilder(Name.MASTER_EMBEDDED_JOURNAL_MAX_ELECTION_TIMEOUT) .setDescription(format( "The max election timeout for the embedded journal. 
When a random period between " - + "${%s} and ${%s} elapses without a master receiving any messages, the master " - + "will attempt to become the primary Election timeout will be waited initially " - + "when the cluster is forming. So larger values for election timeout will cause " - + - "longer start-up time. Smaller values might introduce instability to leadership.", + + "${%s} and ${%s} elapses without a master receiving any messages, the master " + + "will attempt to become the primary Election timeout will be waited initially " + + "when the cluster is forming. So larger values for election timeout will cause " + + "longer start-up time. Smaller values might introduce instability to leadership.", Name.MASTER_EMBEDDED_JOURNAL_MIN_ELECTION_TIMEOUT, Name.MASTER_EMBEDDED_JOURNAL_MAX_ELECTION_TIMEOUT)) // TODO(qian0817): dynamically set here @@ -2520,8 +2508,7 @@ public String toString() { + "slow or contested disk. WARNING: enabling this property may result in metadata " + "loss if half or more of the master nodes fail. 
See Ratis property " + "raft.server.log.unsafe-flush.enabled at " - + - "https://github.com/apache/ratis/blob/master/ratis-docs/src/site/markdown/configuraions.md.") + + "https://github.com/apache/ratis/blob/master/ratis-docs/src/site/markdown/configuraions.md.") .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.MASTER) .build(); @@ -2773,21 +2760,21 @@ public String toString() { .build(); public static final PropertyKey MASTER_METASTORE_ROCKS_PARALLEL_BACKUP = booleanBuilder(Name.MASTER_METASTORE_ROCKS_PARALLEL_BACKUP) - .setDefaultValue(false) - .setDescription(format("Whether to checkpoint rocksdb in parallel using the number of" - + " threads set by %s.", Name.MASTER_METASTORE_ROCKS_PARALLEL_BACKUP_THREADS)) - .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) - .setScope(Scope.MASTER) - .build(); + .setDefaultValue(false) + .setDescription(format("Whether to checkpoint rocksdb in parallel using the number of" + + " threads set by %s.", Name.MASTER_METASTORE_ROCKS_PARALLEL_BACKUP_THREADS)) + .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) + .setScope(Scope.MASTER) + .build(); public static final PropertyKey MASTER_METASTORE_ROCKS_PARALLEL_BACKUP_THREADS = intBuilder(Name.MASTER_METASTORE_ROCKS_PARALLEL_BACKUP_THREADS) - .setDefaultSupplier(() -> Math.min(16, - Math.max(1, Runtime.getRuntime().availableProcessors() / 2)), - "The default number of threads used by backing up rocksdb in parallel.") - .setDescription("The number of threads used by backing up rocksdb in parallel.") - .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) - .setScope(Scope.MASTER) - .build(); + .setDefaultSupplier(() -> Math.min(16, + Math.max(1, Runtime.getRuntime().availableProcessors() / 2)), + "The default number of threads used by backing up rocksdb in parallel.") + .setDescription("The number of threads used by backing up rocksdb in parallel.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) + .setScope(Scope.MASTER) + .build(); public static 
final PropertyKey MASTER_METASTORE_INODE_CACHE_EVICT_BATCH_SIZE = intBuilder(Name.MASTER_METASTORE_INODE_CACHE_EVICT_BATCH_SIZE) // TODO(andrew): benchmark different batch sizes to improve the default and provide a @@ -2821,7 +2808,7 @@ public String toString() { public static final PropertyKey MASTER_METASTORE_INODE_CACHE_MAX_SIZE = intBuilder(Name.MASTER_METASTORE_INODE_CACHE_MAX_SIZE) .setDefaultSupplier(() -> (int) Math.min(Integer.MAX_VALUE / 2, - Runtime.getRuntime().maxMemory() / 2000 / 2), + Runtime.getRuntime().maxMemory() / 2000 / 2), "{Max memory of master JVM} / 2 / 2 KB per inode") .setDescription("The number of inodes to cache on-heap. " + "The default value is chosen based on half the amount of maximum available memory " @@ -3292,11 +3279,10 @@ public String toString() { .build(); public static final PropertyKey MASTER_JOURNAL_SPACE_MONITOR_INTERVAL = durationBuilder(Name.MASTER_JOURNAL_SPACE_MONITOR_INTERVAL) - .setDefaultValue("10min") - .setDescription(format("How often to check and update information on space " - + - "utilization of the journal disk. This is currently only compatible with linux-based" - + "systems and when %s is configured to EMBEDDED", Name.MASTER_JOURNAL_TYPE)) + .setDefaultValue("10min") + .setDescription(format("How often to check and update information on space " + + "utilization of the journal disk. This is currently only compatible with linux-based" + + "systems and when %s is configured to EMBEDDED", Name.MASTER_JOURNAL_TYPE)) .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.MASTER) .build(); @@ -3442,10 +3428,10 @@ public String toString() { public static final PropertyKey MASTER_PERSISTENCE_BLACKLIST = listBuilder(Name.MASTER_PERSISTENCE_BLACKLIST) .setDescription("Patterns to blacklist persist, comma separated, string match, no regex." - + " This affects any async persist call (including ASYNC_THROUGH writes and CLI " - + "persist) but does not affect CACHE_THROUGH writes. 
Users may want to specify " - + "temporary files in the blacklist to avoid unnecessary I/O and errors. Some " - + "examples are `.staging` and `.tmp`.") + + " This affects any async persist call (including ASYNC_THROUGH writes and CLI " + + "persist) but does not affect CACHE_THROUGH writes. Users may want to specify " + + "temporary files in the blacklist to avoid unnecessary I/O and errors. Some " + + "examples are `.staging` and `.tmp`.") .setScope(Scope.MASTER) .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .build(); @@ -3587,7 +3573,7 @@ public String toString() { intBuilder(Name.MASTER_UFS_ACTIVE_SYNC_MAX_AGE) .setDefaultValue(10) .setDescription("The maximum number of intervals we will wait to find a quiet " - + "period before we have to sync the directories") + + "period before we have to sync the directories") .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.MASTER) .build(); @@ -3779,7 +3765,7 @@ public String toString() { + "This property determines the wait time.") .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.MASTER) - .build(); + .build(); public static final PropertyKey MASTER_WORKER_INFO_CACHE_REFRESH_TIME = durationBuilder(Name.MASTER_WORKER_INFO_CACHE_REFRESH_TIME) .setDefaultValue("10sec") @@ -4581,7 +4567,7 @@ public String toString() { } }, "2/3 of total system memory, or 1GB if system memory size cannot be determined") .setDescription("The allocated memory for each worker node's ramdisk(s). " - + "It is recommended to set this value explicitly.") + + "It is recommended to set this value explicitly.") .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.WORKER) .build(); @@ -4844,11 +4830,11 @@ public String toString() { .setDefaultValue(1_000_000) .setDescription( "When " + Name.WORKER_REGISTER_TO_ALL_MASTERS + "=true, " - + "because a worker will send block reports to all masters, " - + "we use a threshold to limit the unsent block report size in worker's memory. 
" - + "If the worker block heartbeat is larger than the threshold, " - + "we discard the heartbeat message and force " - + "the worker to register with that master with a full report." + + "because a worker will send block reports to all masters, " + + "we use a threshold to limit the unsent block report size in worker's memory. " + + "If the worker block heartbeat is larger than the threshold, " + + "we discard the heartbeat message and force " + + "the worker to register with that master with a full report." ) .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.WORKER) @@ -5334,8 +5320,8 @@ public String toString() { listBuilder(Name.WORKER_WHITELIST) .setDefaultValue("/") .setDescription("A comma-separated list of prefixes of the paths which are " - + "cacheable, separated by semi-colons. Alluxio will try to cache the cacheable " - + "file when it is read for the first time.") + + "cacheable, separated by semi-colons. Alluxio will try to cache the cacheable " + + "file when it is read for the first time.") .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.WORKER) .build(); @@ -5441,7 +5427,7 @@ public String toString() { .setAlias("alluxio.proxy.master.heartbeat.interval.ms") .setDefaultValue("10sec") .setDescription("Proxy instances maintain a heartbeat with the primary master. 
" - + "This key specifies the heartbeat interval.") + + "This key specifies the heartbeat interval.") .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.SERVER) .build(); @@ -5560,29 +5546,29 @@ public String toString() { booleanBuilder(Name.PROXY_S3_TAGGING_RESTRICTIONS_ENABLED) .setDefaultValue(true) .setDescription("Toggles whether or not the Alluxio S3 API will enforce " - + "AWS S3 tagging restrictions (10 tags, 128 character keys, 256 character " - + "values) See " - + "https://docs.aws.amazon.com/AmazonS3/latest/userguide/tagging-managing.html.") + + "AWS S3 tagging restrictions (10 tags, 128 character keys, 256 character " + + "values) See " + + "https://docs.aws.amazon.com/AmazonS3/latest/userguide/tagging-managing.html.") .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) .setScope(Scope.SERVER) .build(); public static final PropertyKey PROXY_S3_V2_VERSION_ENABLED = - booleanBuilder(Name.PROXY_S3_V2_VERSION_ENABLED) - .setDefaultValue(true) - .setDescription("(Experimental) V2, an optimized version of " - + "Alluxio s3 proxy service.") - .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) - .setScope(Scope.SERVER) - .build(); + booleanBuilder(Name.PROXY_S3_V2_VERSION_ENABLED) + .setDefaultValue(true) + .setDescription("(Experimental) V2, an optimized version of " + + "Alluxio s3 proxy service.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) + .setScope(Scope.SERVER) + .build(); public static final PropertyKey PROXY_S3_V2_ASYNC_PROCESSING_ENABLED = - booleanBuilder(Name.PROXY_S3_V2_ASYNC_PROCESSING_ENABLED) - .setDefaultValue(false) - .setDescription("(Experimental) If enabled, handle S3 request " - + "in async mode when v2 version of Alluxio s3 " - + "proxy service is enabled.") - .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) - .setScope(Scope.SERVER) - .build(); + booleanBuilder(Name.PROXY_S3_V2_ASYNC_PROCESSING_ENABLED) + .setDefaultValue(false) + .setDescription("(Experimental) If enabled, handle S3 
request " + + "in async mode when v2 version of Alluxio s3 " + + "proxy service is enabled.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) + .setScope(Scope.SERVER) + .build(); public static final PropertyKey PROXY_S3_V2_ASYNC_CONTEXT_TIMEOUT_MS = longBuilder(Name.PROXY_S3_V2_ASYNC_CONTEXT_TIMEOUT_MS) .setDefaultValue(30000L) @@ -6081,10 +6067,10 @@ public String toString() { durationBuilder(Name.USER_FILE_PERSISTENCE_INITIAL_WAIT_TIME) .setDefaultValue("0") .setDescription(format("Time to wait before starting the persistence job. " - + "When the value is set to -1, the file will be persisted by rename operation " - + "or persist CLI but will not be automatically persisted in other cases. " - + "This is to avoid the heavy object copy in rename operation when %s is set to %s. " - + "This value should be smaller than the value of %s", + + "When the value is set to -1, the file will be persisted by rename operation " + + "or persist CLI but will not be automatically persisted in other cases. " + + "This is to avoid the heavy object copy in rename operation when %s is set to %s. 
" + + "This value should be smaller than the value of %s", Name.USER_FILE_WRITE_TYPE_DEFAULT, WritePType.ASYNC_THROUGH, Name.MASTER_PERSISTENCE_MAX_TOTAL_WAIT_TIME_MS)) .setScope(Scope.CLIENT) @@ -6220,16 +6206,15 @@ public String toString() { booleanBuilder(Name.USER_CLIENT_CACHE_SHADOW_ENABLED) .setDefaultValue(false) .setDescription( - "If this is enabled, a shadow cache will be created to tracking the working set of " - + - "a past time window, and measure the hit ratio if the working set fits the cache") + "If this is enabled, a shadow cache will be created to tracking the working set of " + + "a past time window, and measure the hit ratio if the working set fits the cache") .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN).setScope(Scope.CLIENT).build(); public static final PropertyKey USER_CLIENT_CACHE_SHADOW_TYPE = enumBuilder(Name.USER_CLIENT_CACHE_SHADOW_TYPE, ShadowCacheType.class) .setDefaultValue("CLOCK_CUCKOO_FILTER") .setDescription("The type of shadow cache to be used. " + "Valid options are `MULTIPLE_BLOOM_FILTER` (which uses a chain of bloom filters), " - + "`CLOCK_CUCKOO_FILTER` (which uses cuckoo filter with extended field).") + + "`CLOCK_CUCKOO_FILTER` (which uses cuckoo filter with extended field).") .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN).setScope(Scope.CLIENT).build(); public static final PropertyKey USER_CLIENT_CACHE_SHADOW_WINDOW = durationBuilder(Name.USER_CLIENT_CACHE_SHADOW_WINDOW) @@ -6429,14 +6414,13 @@ public String toString() { public static final PropertyKey USER_FILE_WRITE_TYPE_DEFAULT = enumBuilder(Name.USER_FILE_WRITE_TYPE_DEFAULT, WriteType.class) .setDefaultValue(WriteType.ASYNC_THROUGH) - .setDescription( - format("Default write type when creating Alluxio files. 
Valid " + "options are " - + "`MUST_CACHE` (write will only go to Alluxio and must be stored in Alluxio), " - + "`CACHE_THROUGH` (try to cache, write to UnderFS synchronously), `THROUGH` " - + "(no cache, write to UnderFS synchronously), `ASYNC_THROUGH` (write to cache, " - + - "write to UnderFS asynchronously, replicated %s times in Alluxio before data is " - + "persisted.", USER_FILE_REPLICATION_DURABLE)) + .setDescription( + format("Default write type when creating Alluxio files. Valid " + "options are " + + "`MUST_CACHE` (write will only go to Alluxio and must be stored in Alluxio), " + + "`CACHE_THROUGH` (try to cache, write to UnderFS synchronously), `THROUGH` " + + "(no cache, write to UnderFS synchronously), `ASYNC_THROUGH` (write to cache, " + + "write to UnderFS asynchronously, replicated %s times in Alluxio before data is " + + "persisted.", USER_FILE_REPLICATION_DURABLE)) .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.CLIENT) .build(); @@ -6899,7 +6883,7 @@ public String toString() { .setDefaultValue(1) .setDescription( "The maximum number of physical connections to be " - + "used per target host.") + + "used per target host.") .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.CLIENT) .build(); @@ -6961,7 +6945,7 @@ public String toString() { .setDefaultValue(64) .setDescription( "The maximum number of physical connections to be " - + "used per target host.") + + "used per target host.") .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.CLIENT) .build(); @@ -7198,8 +7182,8 @@ public String toString() { .setAlias(Name.WORKER_FUSE_MOUNT_ALLUXIO_PATH) .setDefaultValue("/") .setDescription(format("The Alluxio path to mount to the given " - + "Fuse mount point configured by %s in the worker when %s is enabled " - + "or in the standalone Fuse process.", + + "Fuse mount point configured by %s in the worker when %s is enabled " + + "or in the standalone Fuse process.", Name.FUSE_MOUNT_POINT, 
Name.WORKER_FUSE_ENABLED)) .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.ALL) @@ -7368,9 +7352,9 @@ public String toString() { public static final PropertyKey SECURITY_LOGIN_IMPERSONATION_USERNAME = stringBuilder(Name.SECURITY_LOGIN_IMPERSONATION_USERNAME) .setDescription(format("When %s is set to SIMPLE or CUSTOM, user application uses this " - + "property to indicate the IMPERSONATED user requesting Alluxio service. If it is " - + "not set explicitly, or set to %s, impersonation will not be used. A special " - + "value of '%s' can be specified to impersonate the hadoop client user.", + + "property to indicate the IMPERSONATED user requesting Alluxio service. If it is " + + "not set explicitly, or set to %s, impersonation will not be used. A special " + + "value of '%s' can be specified to impersonate the hadoop client user.", SECURITY_AUTHENTICATION_TYPE, Constants.IMPERSONATION_NONE, Constants.IMPERSONATION_HDFS_USER)) .setDefaultValue(Constants.IMPERSONATION_HDFS_USER) @@ -7402,12 +7386,12 @@ public String toString() { .setScope(Scope.MASTER) .build(); public static final PropertyKey S3_REST_AUTHENTICATOR_CLASSNAME = - classBuilder(Name.S3_REST_AUTHENTICATOR_CLASSNAME) - .setDescription("The class's name is instantiated as an S3 authenticator.") - .setDefaultValue("alluxio.proxy.s3.auth.PassAllAuthenticator") - .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) - .setScope(Scope.ALL) - .build(); + classBuilder(Name.S3_REST_AUTHENTICATOR_CLASSNAME) + .setDescription("The class's name is instantiated as an S3 authenticator.") + .setDefaultValue("alluxio.proxy.s3.auth.PassAllAuthenticator") + .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) + .setScope(Scope.ALL) + .build(); // // Network TLS support // @@ -7485,11 +7469,11 @@ public String toString() { .build(); public static final PropertyKey JOB_RETENTION_TIME = durationBuilder(Name.JOB_RETENTION_TIME) - .setDescription("The length of time the Alluxio should save 
information about " - + "completed jobs before they are discarded.") - .setDefaultValue("1d") - .setScope(Scope.MASTER) - .build(); + .setDescription("The length of time the Alluxio should save information about " + + "completed jobs before they are discarded.") + .setDefaultValue("1d") + .setScope(Scope.MASTER) + .build(); // // Job service @@ -7663,11 +7647,11 @@ public String toString() { public static final PropertyKey JOB_MASTER_RPC_ADDRESSES = listBuilder(Name.JOB_MASTER_RPC_ADDRESSES) .setDescription(format("A list of comma-separated host:port RPC addresses where " - + "the client should look for job masters when using multiple job masters " - + "without Zookeeper. This property is not used " - + "when Zookeeper is enabled, since Zookeeper already stores the job master " - + "addresses. If property is not defined, clients will look for job masters " - + "using [%s]:%s first, then for [%s]:%s.", + + "the client should look for job masters when using multiple job masters " + + "without Zookeeper. This property is not used " + + "when Zookeeper is enabled, since Zookeeper already stores the job master " + + "addresses. 
If property is not defined, clients will look for job masters " + + "using [%s]:%s first, then for [%s]:%s.", Name.MASTER_RPC_ADDRESSES, Name.JOB_MASTER_RPC_PORT, Name.JOB_MASTER_EMBEDDED_JOURNAL_ADDRESSES, Name.JOB_MASTER_RPC_PORT)) .setScope(Scope.ALL) @@ -7853,7 +7837,7 @@ public String toString() { intBuilder(Name.TABLE_LOAD_DEFAULT_REPLICATION) .setDefaultValue(1) .setDescription("The default replication number of files under the SDS table after " - + "load option.") + + "load option.") .setScope(Scope.CLIENT) .build(); public static final PropertyKey HADOOP_SECURITY_AUTHENTICATION = @@ -8160,20 +8144,15 @@ public static final class Name { "alluxio.underfs.tos.streaming.upload.partition.size"; public static final String UNDERFS_TOS_STREAMING_UPLOAD_THREADS = "alluxio.underfs.tos.streaming.upload.threads"; - public static final String UNDERFS_TOS_CONNECT_MAX = - "alluxio.underfs.tos.connect.max"; + public static final String UNDERFS_TOS_RETRY_MAX = "alluxio.underfs.tos.retry.max"; + public static final String UNDERFS_TOS_WRITE_TIMEOUT = "alluxio.underfs.tos.write.timeout"; + public static final String UNDERFS_TOS_READ_TIMEOUT = "alluxio.underfs.tos.read.timeout"; + public static final String UNDERFS_TOS_CONNECT_TIMEOUT = "alluxio.underfs.tos.connect.timeout"; + public static final String UNDERFS_TOS_CONNECT_TTL = "alluxio.underfs.tos.connect.ttl"; + public static final String UNDERFS_TOS_CONNECT_MAX = "alluxio.underfs.tos.connect.max"; public static final String UNDERFS_TOS_INTERMEDIATE_UPLOAD_CLEAN_AGE = "alluxio.underfs.tos.intermediate.upload.clean.age"; - public static final String UNDERFS_TOS_RETRY_MAX = - "alluxio.underfs.tos.retry.max"; - public static final String UNDERFS_TOS_CONNECT_TIMEOUT = - "alluxio.underfs.tos.connect.timeout"; - public static final String UNDERFS_TOS_CONNECT_TTL = - "alluxio.underfs.tos.connect.ttl"; - public static final String UNDERFS_TOS_WRITE_TIMEOUT = - "alluxio.underfs.tos.write.timeout"; - public static final String 
UNDERFS_TOS_READ_TIMEOUT = - "alluxio.underfs.tos.read.timeout"; + // // UFS access control related properties // @@ -8797,7 +8776,7 @@ public static final class Name { public static final String WORKER_NETWORK_KEEPALIVE_TIMEOUT_MS = "alluxio.worker.network.keepalive.timeout"; public static final String WORKER_NETWORK_PERMIT_KEEPALIVE_TIME_MS = - "alluxio.worker.network.permit.keepalive.time"; + "alluxio.worker.network.permit.keepalive.time"; public static final String WORKER_NETWORK_MAX_INBOUND_MESSAGE_SIZE = "alluxio.worker.network.max.inbound.message.size"; public static final String WORKER_NETWORK_NETTY_BOSS_THREADS = @@ -8883,9 +8862,9 @@ public static final class Name { public static final String WORKER_REGISTER_LEASE_RETRY_MAX_DURATION = "alluxio.worker.register.lease.retry.max.duration"; public static final String WORKER_REVIEWER_PROBABILISTIC_HARDLIMIT_BYTES = - "alluxio.worker.reviewer.probabilistic.hardlimit.bytes"; + "alluxio.worker.reviewer.probabilistic.hardlimit.bytes"; public static final String WORKER_REVIEWER_PROBABILISTIC_SOFTLIMIT_BYTES = - "alluxio.worker.reviewer.probabilistic.softlimit.bytes"; + "alluxio.worker.reviewer.probabilistic.softlimit.bytes"; public static final String WORKER_REVIEWER_CLASS = "alluxio.worker.reviewer.class"; public static final String WORKER_RPC_PORT = "alluxio.worker.rpc.port"; public static final String WORKER_RPC_EXECUTOR_TYPE = "alluxio.worker.rpc.executor.type"; @@ -8966,9 +8945,9 @@ public static final class Name { public static final String PROXY_AUDIT_LOGGING_ENABLED = "alluxio.proxy.audit.logging.enabled"; public static final String PROXY_S3_V2_VERSION_ENABLED = - "alluxio.proxy.s3.v2.version.enabled"; + "alluxio.proxy.s3.v2.version.enabled"; public static final String PROXY_S3_V2_ASYNC_PROCESSING_ENABLED = - "alluxio.proxy.s3.v2.async.processing.enabled"; + "alluxio.proxy.s3.v2.async.processing.enabled"; public static final String PROXY_S3_V2_ASYNC_CONTEXT_TIMEOUT_MS = 
"alluxio.proxy.s3.v2.async.context.timeout.ms"; public static final String PROXY_S3_V2_ASYNC_LIGHT_POOL_CORE_THREAD_NUMBER = @@ -9097,11 +9076,11 @@ public static final class Name { public static final String USER_CLIENT_CACHE_QUOTA_ENABLED = "alluxio.user.client.cache.quota.enabled"; public static final String USER_CLIENT_CACHE_TTL_ENABLED = - "alluxio.user.client.cache.ttl.enabled"; + "alluxio.user.client.cache.ttl.enabled"; public static final String USER_CLIENT_CACHE_TTL_CHECK_INTERVAL_SECONDS = - "alluxio.user.client.cache.ttl.check.interval.seconds"; + "alluxio.user.client.cache.ttl.check.interval.seconds"; public static final String USER_CLIENT_CACHE_TTL_THRESHOLD_SECONDS = - "alluxio.user.client.cache.ttl.threshold.seconds"; + "alluxio.user.client.cache.ttl.threshold.seconds"; public static final String USER_CLIENT_CACHE_SIZE = "alluxio.user.client.cache.size"; public static final String USER_CLIENT_CACHE_STORE_OVERHEAD = @@ -9403,7 +9382,7 @@ public static final class Name { public static final String JOB_MASTER_BIND_HOST = "alluxio.job.master.bind.host"; public static final String JOB_MASTER_HOSTNAME = "alluxio.job.master.hostname"; public static final String JOB_MASTER_LOST_MASTER_INTERVAL = - "alluxio.job.master.lost.master.interval"; + "alluxio.job.master.lost.master.interval"; public static final String JOB_MASTER_LOST_WORKER_INTERVAL = "alluxio.job.master.lost.worker.interval"; public static final String JOB_MASTER_RPC_PORT = "alluxio.job.master.rpc.port"; @@ -9485,15 +9464,14 @@ public static final class Name { public static final String HADOOP_CHECKSUM_COMBINE_MODE = "alluxio.hadoop.checksum.combine.mode"; - private Name() { - } // prevent instantiation + private Name() {} // prevent instantiation } /** * list of substrings of a name where any custom PropertyKey with a name that contains it * should have a {@link DisplayType} of CREDENTIALS. 
*/ - private static final String[] CUSTOM_CREDENTIAL_NAME_SUBSTR = new String[] { + private static final String[] CUSTOM_CREDENTIAL_NAME_SUBSTR = new String[]{ "accessKeyId", "secretKey" }; @@ -9572,7 +9550,7 @@ public enum Template { "fs.adl.account.%s.oauth2.credential", "fs\\.adl\\.account\\.(\\w+)\\.oauth2\\.credential", PropertyCreators.fromBuilder(stringBuilder("fs.adl.account.%s.oauth2.credential") - .setDisplayType(DisplayType.CREDENTIALS))), + .setDisplayType(DisplayType.CREDENTIALS))), UNDERFS_AZURE_REFRESH_URL( "fs.adl.account.%s.oauth2.refresh.url", "fs\\.adl\\.account\\.(\\w+)\\.oauth2\\.refresh\\.url", @@ -9713,22 +9691,21 @@ private static BiFunction createNestedProperty } Template(String format, String re, - BiFunction propertyCreator) { + BiFunction propertyCreator) { this(format, re, PropertyType.STRING, Optional.empty(), Optional.empty(), propertyCreator); } /** * Constructs a property key format. * - * @param format String of this property as formatted string - * @param re String of this property as regexp - * @param enumType enum class of an enum property - * @param delimiter delimiter of this property + * @param format String of this property as formatted string + * @param re String of this property as regexp + * @param enumType enum class of an enum property + * @param delimiter delimiter of this property * @param propertyCreator a function that creates property key given name and base property key */ Template(String format, String re, PropertyType type, Optional> enumType, - Optional delimiter, - BiFunction propertyCreator) { + Optional delimiter, BiFunction propertyCreator) { mFormat = format; mPattern = Pattern.compile(re); mType = type; @@ -9869,102 +9846,71 @@ public static Collection defaultKeys() { return DEFAULT_KEYS_MAP.values(); } - /** - * Property name. - */ + /** Property name. */ private final String mName; - /** - * Property Key description. - */ + /** Property Key description. 
*/ private final String mDescription; - /** - * Property type. - */ + /** Property type. */ private final PropertyType mType; - /** - * Property's enum class type, if property type is ENUM. - */ + /** Property's enum class type, if property type is ENUM. */ private final Optional> mEnumType; - /** - * Property's list delimiter, if property type is LIST. - */ + /** Property's list delimiter, if property type is LIST. */ private final Optional mDelimiter; - /** - * Supplies the Property Key default value. - */ + /** Supplies the Property Key default value. */ private final DefaultSupplier mDefaultSupplier; - /** - * Property Key alias. - */ + /** Property Key alias. */ private final String[] mAliases; - /** - * Whether to ignore as a site property. - */ + /** Whether to ignore as a site property. */ private final boolean mIgnoredSiteProperty; - /** - * Whether the property is an Alluxio built-in property. - */ + /** Whether the property is an Alluxio built-in property. */ private final boolean mIsBuiltIn; - /** - * Whether to hide in document. - */ + /** Whether to hide in document. */ private final boolean mIsHidden; - /** - * Whether property should be consistent within the cluster. - */ + /** Whether property should be consistent within the cluster. */ private final ConsistencyCheckLevel mConsistencyCheckLevel; - /** - * The scope this property applies to. - */ + /** The scope this property applies to. */ private final Scope mScope; - /** - * The displayType which indicates how the property value should be displayed. - **/ + /** The displayType which indicates how the property value should be displayed. **/ private final DisplayType mDisplayType; - /** - * Whether the property could be updated dynamically. - */ + /** Whether the property could be updated dynamically. */ private final boolean mDynamic; - /** - * A custom function to validate the value. - */ + /** A custom function to validate the value. 
*/ private final Function mValueValidationFunction; /** - * @param name String of this property - * @param description String description of this property key - * @param defaultSupplier default value supplier - * @param aliases alias of this property key - * @param ignoredSiteProperty true if Alluxio ignores user-specified value for this property in - * site properties file - * @param isHidden whether to hide in document - * @param consistencyCheckLevel the consistency check level to apply to this property - * @param scope the scope this property applies to - * @param displayType how the property value should be displayed - * @param isBuiltIn whether this is an Alluxio built-in property + * @param name String of this property + * @param description String description of this property key * @Param type the property's type + * @param defaultSupplier default value supplier + * @param aliases alias of this property key + * @param ignoredSiteProperty true if Alluxio ignores user-specified value for this property in + * site properties file + * @param isHidden whether to hide in document + * @param consistencyCheckLevel the consistency check level to apply to this property + * @param scope the scope this property applies to + * @param displayType how the property value should be displayed + * @param isBuiltIn whether this is an Alluxio built-in property */ private PropertyKey(String name, String description, PropertyType type, - Optional> enumType, Optional delimiter, - DefaultSupplier defaultSupplier, String[] aliases, - boolean ignoredSiteProperty, - boolean isHidden, ConsistencyCheckLevel consistencyCheckLevel, - Scope scope, DisplayType displayType, boolean isBuiltIn, boolean dynamic, - Function valueValidationFunction) { + Optional> enumType, Optional delimiter, + DefaultSupplier defaultSupplier, String[] aliases, boolean ignoredSiteProperty, + boolean isHidden, ConsistencyCheckLevel consistencyCheckLevel, + Scope scope, DisplayType displayType, boolean isBuiltIn, 
boolean dynamic, + Function valueValidationFunction) { mName = Preconditions.checkNotNull(name, "name"); // TODO(binfan): null check after we add description for each property key mDescription = Strings.isNullOrEmpty(description) ? "N/A" : description; @@ -9987,7 +9933,7 @@ private PropertyKey(String name, String description, PropertyType type, * @param name String of this property */ private PropertyKey(String name, PropertyType type, - Optional> enumType, Optional delimiter) { + Optional> enumType, Optional delimiter) { this(name, null, type, enumType, delimiter, new DefaultSupplier(() -> null, "null"), null, false, false, ConsistencyCheckLevel.IGNORE, Scope.ALL, DisplayType.DEFAULT, true, true, null); @@ -10120,7 +10066,8 @@ public PropertyType getType() { /** * @return enum class of the enum property, or throws when property is not of enum type */ - public Class getEnumType() { + public Class getEnumType() + { checkState(mType == PropertyType.ENUM && mEnumType.isPresent(), "PropertyKey %s is not of enum type", mName); return mEnumType.get(); @@ -10238,7 +10185,7 @@ private static boolean validateValue( } break; case ENUM: - if (!value.getClass().equals(enumType.get())) { + if (!value.getClass().equals(enumType.get())) { return false; } break; @@ -10271,7 +10218,6 @@ private static boolean validateValue( /** * For each property type, there might be different forms of acceptable input format, * convert these acceptable formats to proper internal format. - * * @param value property value in string format * @return property value in the expected type */ @@ -10280,8 +10226,7 @@ public Object formatValue(Object value) { } private static Object formatValue(Object value, PropertyType type, - Optional> enumType, - Optional delimiter) { + Optional> enumType, Optional delimiter) { if (value instanceof Number) { switch (type) { case LONG: @@ -10332,7 +10277,6 @@ private static Object formatValue(Object value, PropertyType type, /** * Parses value from string. 
- * * @param stringValue property value in string format * @return property value in the expected type */ @@ -10380,7 +10324,7 @@ public Object parseValue(String stringValue) { /** * Returns whether or not the given property key is marked as deprecated. - *

+ * * It first checks if the specific key is deprecated, otherwise it will fall back to checking * if the key's name matches any of the PropertyKey templates. If no keys or templates match, it * will return false. This will only return true when the key is marked with a {@link Deprecated} @@ -10405,7 +10349,7 @@ public static boolean isDeprecated(String name) { /** * Returns whether or not a property key has been removed from use. - *

+ * * If a PropertyKey or {@link Template} is deemed as "Removed" it will exist within * {@link RemovedKey}. This method can be used to detect if a key being utilized has been removed. * From 3368d20f3ca241b7b3e541a67575519d6b8edef3 Mon Sep 17 00:00:00 2001 From: thu-david Date: Wed, 19 Jun 2024 16:57:16 +0800 Subject: [PATCH 06/10] Update PropertyKey.java --- core/common/src/main/java/alluxio/conf/PropertyKey.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/common/src/main/java/alluxio/conf/PropertyKey.java b/core/common/src/main/java/alluxio/conf/PropertyKey.java index 2986bf9c9337..09deab79bdc1 100755 --- a/core/common/src/main/java/alluxio/conf/PropertyKey.java +++ b/core/common/src/main/java/alluxio/conf/PropertyKey.java @@ -361,7 +361,7 @@ public Builder(PropertyType type, PropertyKey.Template template, Object... param * @param params parameters of the template */ public Builder(PropertyType type, Optional delimiter, - PropertyKey.Template template, Object... params) { + PropertyKey.Template template, Object... 
params) { this(format(template.mFormat, params), type, Optional.empty(), delimiter); } @@ -688,9 +688,9 @@ public String toString() { intBuilder(Name.METRICS_EXECUTOR_TASK_WARN_SIZE) .setDefaultValue(1000) .setDescription(String.format("When instrumenting an executor with" - + " InstrumentedExecutorService, if the number of" - + " active tasks (queued or running) is greater than this value, a warning log" - + " will be printed at the interval given by %s", + + " InstrumentedExecutorService, if the number of" + + " active tasks (queued or running) is greater than this value, a warning log" + + " will be printed at the interval given by %s", Name.METRICS_EXECUTOR_TASK_WARN_FREQUENCY)) .setScope(Scope.ALL) .setConsistencyCheckLevel(ConsistencyCheckLevel.IGNORE) From d3aaadbb1c0765fdd298a2667695a06d82998430 Mon Sep 17 00:00:00 2001 From: thu-david Date: Wed, 19 Jun 2024 17:21:28 +0800 Subject: [PATCH 07/10] Update TOSUnderFileSystem.java --- .../main/java/alluxio/underfs/tos/TOSUnderFileSystem.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystem.java b/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystem.java index 36cb7b76b193..836ef42c7808 100644 --- a/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystem.java +++ b/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystem.java @@ -396,11 +396,11 @@ protected String getRootKey() { */ public static TransportConfig initializeTOSClientConfig( AlluxioConfiguration alluxioConf) { - int readTimeoutMills = (int) alluxioConf.getMs(PropertyKey.UNDERFS_TOS_READ_TIMEOUT); - int writeTimeoutMills = (int) alluxioConf.getMs(PropertyKey.UNDERFS_TOS_WRITE_TIMEOUT); - int connectionTimeoutMills = (int) alluxioConf.getMs(PropertyKey.UNDERFS_TOS_CONNECT_TIMEOUT); + int readTimeoutMills = alluxioConf.getInt(PropertyKey.UNDERFS_TOS_READ_TIMEOUT); + int writeTimeoutMills = 
alluxioConf.getInt(PropertyKey.UNDERFS_TOS_WRITE_TIMEOUT); + int connectionTimeoutMills = alluxioConf.getInt(PropertyKey.UNDERFS_TOS_CONNECT_TIMEOUT); int maxConnections = alluxioConf.getInt(PropertyKey.UNDERFS_TOS_CONNECT_MAX); - int idleConnectionTime = (int) alluxioConf.getMs(PropertyKey.UNDERFS_TOS_CONNECT_TTL); + int idleConnectionTime = alluxioConf.getInt(PropertyKey.UNDERFS_TOS_CONNECT_TTL); int maxErrorRetry = alluxioConf.getInt(PropertyKey.UNDERFS_TOS_RETRY_MAX); TransportConfig config = TransportConfig.builder() .connectTimeoutMills(connectionTimeoutMills) From ed7ef2b88e6eb6f87eb9c9a17c5f5a7cf249e066 Mon Sep 17 00:00:00 2001 From: thu-david Date: Wed, 19 Jun 2024 19:17:56 +0800 Subject: [PATCH 08/10] Update TOSUnderFileSystem.java --- .../src/main/java/alluxio/underfs/tos/TOSUnderFileSystem.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystem.java b/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystem.java index 836ef42c7808..2f4c89a0ebcf 100644 --- a/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystem.java +++ b/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystem.java @@ -394,8 +394,7 @@ protected String getRootKey() { * @param alluxioConf the TOS Configuration * @return the TOS {@link TransportConfig} */ - public static TransportConfig initializeTOSClientConfig( - AlluxioConfiguration alluxioConf) { + public static TransportConfig initializeTOSClientConfig(AlluxioConfiguration alluxioConf) { int readTimeoutMills = alluxioConf.getInt(PropertyKey.UNDERFS_TOS_READ_TIMEOUT); int writeTimeoutMills = alluxioConf.getInt(PropertyKey.UNDERFS_TOS_WRITE_TIMEOUT); int connectionTimeoutMills = alluxioConf.getInt(PropertyKey.UNDERFS_TOS_CONNECT_TIMEOUT); From 9624da29fe9fd5662f94be5109816b6bf19b56a1 Mon Sep 17 00:00:00 2001 From: thu-david Date: Fri, 21 Jun 2024 16:05:45 +0800 Subject: [PATCH 09/10] Fix the error handle --- 
.../underfs/tos/AlluxioTosException.java | 20 ++++------ .../underfs/tos/TOSLowLevelOutputStream.java | 28 ++++++------- .../alluxio/underfs/tos/TOSOutputStream.java | 17 ++++---- .../underfs/tos/TOSUnderFileSystem.java | 40 ++++++++++--------- .../tos/TOSUnderFileSystemFactory.java | 6 +-- 5 files changed, 55 insertions(+), 56 deletions(-) diff --git a/underfs/tos/src/main/java/alluxio/underfs/tos/AlluxioTosException.java b/underfs/tos/src/main/java/alluxio/underfs/tos/AlluxioTosException.java index 1c759238aebb..648626895c99 100644 --- a/underfs/tos/src/main/java/alluxio/underfs/tos/AlluxioTosException.java +++ b/underfs/tos/src/main/java/alluxio/underfs/tos/AlluxioTosException.java @@ -14,7 +14,7 @@ import alluxio.exception.runtime.AlluxioRuntimeException; import alluxio.grpc.ErrorType; -import com.volcengine.tos.TosClientException; +import com.volcengine.tos.TosException; import io.grpc.Status; import java.net.HttpURLConnection; @@ -26,27 +26,23 @@ public class AlluxioTosException extends AlluxioRuntimeException { private static final ErrorType ERROR_TYPE = ErrorType.External; /** - * Converts an TosClientException to a corresponding AlluxioTosException. + * Converts a TosClientException to a corresponding AlluxioTosException. * * @param cause tos exception * @return alluxio tos exception */ - public static AlluxioTosException from(TosClientException cause) { - return from(null, cause); - } + public static AlluxioTosException from(TosException cause) { return from(null, cause); } /** - * Converts an TosClientException with errormessage to a corresponding AlluxioTosException. + * Converts a TosClientException with errormessage to a corresponding AlluxioTosException. 
* * @param errorMessage error message - * @param cause tos exception + * @param cause tos exception * @return alluxio tos exception */ - public static AlluxioTosException from(String errorMessage, TosClientException cause) { - Status status = Status.UNKNOWN; - String errorDescription = "ClientException:" + cause.getMessage(); - status = httpStatusToGrpcStatus(cause.getStatusCode()); - errorDescription = cause.getCode() + ":" + cause.getMessage(); + public static AlluxioTosException from(String errorMessage, TosException cause) { + Status status = httpStatusToGrpcStatus(cause.getStatusCode()); + String errorDescription = cause.getCode() + ":" + cause.getMessage(); if (errorMessage == null) { errorMessage = errorDescription; } diff --git a/underfs/tos/src/main/java/alluxio/underfs/tos/TOSLowLevelOutputStream.java b/underfs/tos/src/main/java/alluxio/underfs/tos/TOSLowLevelOutputStream.java index d42965e04a8c..468c25ae87a0 100644 --- a/underfs/tos/src/main/java/alluxio/underfs/tos/TOSLowLevelOutputStream.java +++ b/underfs/tos/src/main/java/alluxio/underfs/tos/TOSLowLevelOutputStream.java @@ -18,7 +18,7 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListeningExecutorService; import com.volcengine.tos.TOSV2; -import com.volcengine.tos.TosClientException; +import com.volcengine.tos.TosException; import com.volcengine.tos.comm.io.TosRepeatableBoundedFileInputStream; import com.volcengine.tos.model.object.AbortMultipartUploadInput; import com.volcengine.tos.model.object.CompleteMultipartUploadV2Input; @@ -91,9 +91,9 @@ protected void abortMultiPartUploadInternal() throws IOException { AbortMultipartUploadInput input = new AbortMultipartUploadInput().setBucket(mBucketName) .setKey(mKey).setUploadID(mUploadId); getClient().abortMultipartUpload(input); - } catch (TosClientException e) { + } catch (TosException e) { LOG.debug("failed to abort multi part upload. 
upload id: {}", mUploadId, e); - throw new IOException(String.format( + throw AlluxioTosException.from(String.format( "failed to upload part. key: %s uploadId: %s", mKey, mUploadId), e); } @@ -137,14 +137,14 @@ protected void initMultiPartUploadInternal() throws IOException { new CreateMultipartUploadInput().setBucket(mBucketName).setKey(mKey); CreateMultipartUploadOutput output = getClient().createMultipartUpload(create); mUploadId = output.getUploadID(); - } catch (TosClientException e) { + } catch (TosException e) { LOG.debug("failed to init multi part upload", e); - throw new IOException("failed to init multi part upload", e); + throw AlluxioTosException.from("failed to init multi part upload", e); } } @Override - protected void completeMultiPartUploadInternal() throws IOException { + protected void completeMultiPartUploadInternal() { try { LOG.debug("complete multi part {}", mUploadId); CompleteMultipartUploadV2Input complete = new CompleteMultipartUploadV2Input() @@ -155,16 +155,16 @@ protected void completeMultiPartUploadInternal() throws IOException { CompleteMultipartUploadV2Output completedOutput = getClient().completeMultipartUpload(complete); mContentHash = completedOutput.getEtag(); - } catch (TosClientException e) { + } catch (TosException e) { LOG.debug("failed to complete multi part upload", e); - throw new IOException( + throw AlluxioTosException.from( String.format("failed to complete multi part upload, key: %s, upload id: %s", - mKey, mUploadId) + e); + mKey, mUploadId), e); } } @Override - protected void createEmptyObject(String key) throws IOException { + protected void createEmptyObject(String key) { try { PutObjectInput putObjectInput = new PutObjectInput() .setBucket(mBucketName) @@ -172,8 +172,8 @@ protected void createEmptyObject(String key) throws IOException { .setContent(new ByteArrayInputStream(new byte[0])) .setContentLength(0); mContentHash = getClient().putObject(putObjectInput).getEtag(); - } catch (TosClientException e) { - throw 
new IOException(e); + } catch (TosException e) { + throw AlluxioTosException.from(e); } } @@ -186,8 +186,8 @@ protected void putObject(String key, File file, @Nullable String md5) throws IOE .setContent(content) .setContentLength(file.length()); // Set the correct content length mContentHash = getClient().putObject(putObjectInput).getEtag(); - } catch (IOException e) { - throw new IOException(e); + } catch (TosException e) { + throw AlluxioTosException.from(e); } } diff --git a/underfs/tos/src/main/java/alluxio/underfs/tos/TOSOutputStream.java b/underfs/tos/src/main/java/alluxio/underfs/tos/TOSOutputStream.java index abee3441577f..e4d65d2bc9ea 100644 --- a/underfs/tos/src/main/java/alluxio/underfs/tos/TOSOutputStream.java +++ b/underfs/tos/src/main/java/alluxio/underfs/tos/TOSOutputStream.java @@ -17,7 +17,7 @@ import com.google.common.base.Preconditions; import com.volcengine.tos.TOSV2; -import com.volcengine.tos.TosClientException; +import com.volcengine.tos.TosException; import com.volcengine.tos.internal.util.base64.Base64; import com.volcengine.tos.model.object.ObjectMetaRequestOptions; import com.volcengine.tos.model.object.PutObjectInput; @@ -27,10 +27,9 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.nio.file.Files; import java.security.DigestOutputStream; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; @@ -105,11 +104,12 @@ public TOSOutputStream(String bucketName, String key, TOSV2 client, List try { mHash = MessageDigest.getInstance("MD5"); mLocalOutputStream = - new BufferedOutputStream(new DigestOutputStream(new FileOutputStream(mFile), mHash)); + new BufferedOutputStream( + new DigestOutputStream(Files.newOutputStream(mFile.toPath()), mHash)); } catch (NoSuchAlgorithmException e) { LOG.warn("Algorithm not available for MD5 hash.", e); 
mHash = null; - mLocalOutputStream = new BufferedOutputStream(new FileOutputStream(mFile)); + mLocalOutputStream = new BufferedOutputStream(Files.newOutputStream(mFile.toPath())); } } @@ -167,7 +167,7 @@ public void close() throws IOException { return; } mLocalOutputStream.close(); - try (BufferedInputStream in = new BufferedInputStream(new FileInputStream(mFile))) { + try (BufferedInputStream in = new BufferedInputStream(Files.newInputStream(mFile.toPath()))) { ObjectMetaRequestOptions meta = new ObjectMetaRequestOptions(); meta.setContentLength(mFile.length()); if (mHash != null) { @@ -177,9 +177,9 @@ public void close() throws IOException { PutObjectInput putObjectInput = new PutObjectInput().setBucket(mBucketName).setKey(mKey) .setOptions(meta).setContent(in); mContentHash = mTOSClient.putObject(putObjectInput).getEtag(); - } catch (TosClientException e) { + } catch (TosException e) { LOG.error("Failed to upload {}. ", mKey); - throw new IOException(e); + throw AlluxioTosException.from(e); } finally { // Delete the temporary file on the local machine if the COS client completed the // upload or if the upload failed. 
@@ -187,7 +187,6 @@ public void close() throws IOException { LOG.error("Failed to delete temporary file @ {}", mFile.getPath()); } } - return; } @Override diff --git a/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystem.java b/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystem.java index 2f4c89a0ebcf..dcb408b7a7eb 100644 --- a/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystem.java +++ b/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystem.java @@ -103,8 +103,8 @@ public class TOSUnderFileSystem extends ObjectUnderFileSystem { * @param conf the configuration for this UFS * @return the created {@link TOSUnderFileSystem} instance */ - public static TOSUnderFileSystem createInstance(AlluxioURI uri, UnderFileSystemConfiguration conf) - throws Exception { + public static TOSUnderFileSystem createInstance(AlluxioURI uri, + UnderFileSystemConfiguration conf) { String bucketName = UnderFileSystemUtils.getBucketName(uri); Preconditions.checkArgument(conf.isSet(PropertyKey.TOS_ACCESS_KEY), "Property %s is required to connect to TOS", PropertyKey.TOS_ACCESS_KEY); @@ -167,29 +167,33 @@ public void setMode(String path, short mode) throws IOException { } @Override - public void cleanup() throws IOException { + public void cleanup() { long cleanAge = mUfsConf.getMs(PropertyKey.UNDERFS_TOS_INTERMEDIATE_UPLOAD_CLEAN_AGE); Date cleanBefore = new Date(new Date().getTime() - cleanAge); boolean isTruncated = true; String keyMarker = null; String uploadIdMarker = null; int maxKeys = 10; - while (isTruncated) { - ListMultipartUploadsV2Input input = new ListMultipartUploadsV2Input().setBucket(mBucketName) - .setMaxUploads(maxKeys).setKeyMarker(keyMarker).setUploadIDMarker(uploadIdMarker); - ListMultipartUploadsV2Output output = mClient.listMultipartUploads(input); - if (output.getUploads() != null) { - for (int i = 0; i < output.getUploads().size(); ++i) { - ListedUpload upload = output.getUploads().get(i); - if 
(upload.getInitiated().before(cleanBefore)) { - mClient.abortMultipartUpload(new AbortMultipartUploadInput().setBucket(mBucketName) - .setKey(upload.getKey()).setUploadID(upload.getUploadID())); + try { + while (isTruncated) { + ListMultipartUploadsV2Input input = new ListMultipartUploadsV2Input().setBucket(mBucketName) + .setMaxUploads(maxKeys).setKeyMarker(keyMarker).setUploadIDMarker(uploadIdMarker); + ListMultipartUploadsV2Output output = mClient.listMultipartUploads(input); + if (output.getUploads() != null) { + for (int i = 0; i < output.getUploads().size(); ++i) { + ListedUpload upload = output.getUploads().get(i); + if (upload.getInitiated().before(cleanBefore)) { + mClient.abortMultipartUpload(new AbortMultipartUploadInput().setBucket(mBucketName) + .setKey(upload.getKey()).setUploadID(upload.getUploadID())); + } } } + isTruncated = output.isTruncated(); + keyMarker = output.getNextKeyMarker(); + uploadIdMarker = output.getNextUploadIdMarker(); } - isTruncated = output.isTruncated(); - keyMarker = output.getNextKeyMarker(); - uploadIdMarker = output.getNextUploadIdMarker(); + } catch (TosException e) { + throw AlluxioTosException.from(e); } } @@ -248,7 +252,7 @@ protected boolean deleteObject(String key) { } @Override - protected List deleteObjects(List keys) throws IOException { + protected List deleteObjects(List keys) { try { List list = new ArrayList<>(); for (String key : keys) { @@ -259,7 +263,7 @@ protected List deleteObjects(List keys) throws IOException { DeleteMultiObjectsV2Output output = mClient.deleteMultiObjects(input); return output.getDeleteds().stream().map(Deleted::getKey).collect(Collectors.toList()); } catch (TosException e) { - throw new IOException("Failed to delete objects", e); + throw AlluxioTosException.from(e); } } diff --git a/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystemFactory.java b/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystemFactory.java index ff02067ddd8c..107432abf615 100644 --- 
a/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystemFactory.java +++ b/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystemFactory.java @@ -20,6 +20,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; +import com.volcengine.tos.TosException; import java.io.IOException; @@ -40,13 +41,12 @@ public UnderFileSystem create(String path, UnderFileSystemConfiguration conf) { if (checkTOSCredentials(conf)) { try { return TOSUnderFileSystem.createInstance(new AlluxioURI(path), conf); - } catch (Exception e) { + } catch (TosException e) { throw Throwables.propagate(e); } } - String err = - "TOS Credentials or configurations not available, cannot create TOS Under File System."; + String err = "TOS Credentials not available, cannot create TOS Under File System."; throw Throwables.propagate(new IOException(err)); } From 1b7a6d9f8f9f9e668030f0cd3dd60c79119e3203 Mon Sep 17 00:00:00 2001 From: thu-david Date: Mon, 24 Jun 2024 11:00:50 +0800 Subject: [PATCH 10/10] update the log.warn --- .../java/alluxio/underfs/tos/TOSLowLevelOutputStream.java | 2 ++ .../main/java/alluxio/underfs/tos/TOSUnderFileSystem.java | 4 ++++ .../alluxio/underfs/tos/TOSUnderFileSystemFactory.java | 7 +++++++ 3 files changed, 13 insertions(+) diff --git a/underfs/tos/src/main/java/alluxio/underfs/tos/TOSLowLevelOutputStream.java b/underfs/tos/src/main/java/alluxio/underfs/tos/TOSLowLevelOutputStream.java index 468c25ae87a0..801a0bf37ecf 100644 --- a/underfs/tos/src/main/java/alluxio/underfs/tos/TOSLowLevelOutputStream.java +++ b/underfs/tos/src/main/java/alluxio/underfs/tos/TOSLowLevelOutputStream.java @@ -173,6 +173,7 @@ protected void createEmptyObject(String key) { .setContentLength(0); mContentHash = getClient().putObject(putObjectInput).getEtag(); } catch (TosException e) { + LOG.debug("failed to create empty object", e); throw AlluxioTosException.from(e); } } @@ -187,6 +188,7 @@ protected void putObject(String key, File file, @Nullable 
String md5) throws IOE .setContentLength(file.length()); // Set the correct content length mContentHash = getClient().putObject(putObjectInput).getEtag(); } catch (TosException e) { + LOG.debug("failed to put object", e); throw AlluxioTosException.from(e); } } diff --git a/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystem.java b/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystem.java index dcb408b7a7eb..deb692526b27 100644 --- a/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystem.java +++ b/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystem.java @@ -193,6 +193,7 @@ public void cleanup() { uploadIdMarker = output.getNextUploadIdMarker(); } } catch (TosException e) { + LOG.error("Failed to cleanup TOS uploads", e); throw AlluxioTosException.from(e); } } @@ -263,6 +264,7 @@ protected List deleteObjects(List keys) { DeleteMultiObjectsV2Output output = mClient.deleteMultiObjects(input); return output.getDeleteds().stream().map(Deleted::getKey).collect(Collectors.toList()); } catch (TosException e) { + LOG.error("Failed to delete objects", e); throw AlluxioTosException.from(e); } } @@ -378,6 +380,7 @@ protected ObjectStatus getObjectStatus(String key) { } throw AlluxioTosException.from(e); } catch (TosClientException e) { + LOG.error("Failed to get object status for {}", key, e); throw AlluxioTosException.from(e); } } @@ -423,6 +426,7 @@ protected InputStream openObject(String key, OpenOptions options, RetryPolicy re return new TOSInputStream(mBucketName, key, mClient, options.getOffset(), retryPolicy, mUfsConf.getBytes(PropertyKey.UNDERFS_OBJECT_STORE_MULTI_RANGE_CHUNK_SIZE)); } catch (TosException e) { + LOG.error("Failed to open object: {}", key, e); throw AlluxioTosException.from(e); } } diff --git a/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystemFactory.java b/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystemFactory.java index 107432abf615..2ab05ce15d07 100644 --- 
a/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystemFactory.java +++ b/underfs/tos/src/main/java/alluxio/underfs/tos/TOSUnderFileSystemFactory.java @@ -21,6 +21,8 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.volcengine.tos.TosException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; @@ -28,6 +30,8 @@ * Factory for creating {@link TOSUnderFileSystem}. */ public class TOSUnderFileSystemFactory implements UnderFileSystemFactory { + private static final Logger LOG = LoggerFactory.getLogger(TOSUnderFileSystemFactory.class); + /** * Constructs a new {@link TOSUnderFileSystemFactory}. */ @@ -42,6 +46,9 @@ public UnderFileSystem create(String path, UnderFileSystemConfiguration conf) { try { return TOSUnderFileSystem.createInstance(new AlluxioURI(path), conf); } catch (TosException e) { + LOG.warn("Failed to create TOS Under File System: {}", e.getMessage()); + throw AlluxioTosException.from(e); + } catch (Exception e) { throw Throwables.propagate(e); } }