Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support TOS exception handle in Alluxio #18630

Merged
merged 13 commits into from
Jun 24, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import alluxio.exception.runtime.AlluxioRuntimeException;
import alluxio.grpc.ErrorType;

import com.volcengine.tos.TosClientException;
import com.volcengine.tos.TosException;
import io.grpc.Status;

import java.net.HttpURLConnection;
Expand All @@ -26,27 +26,23 @@ public class AlluxioTosException extends AlluxioRuntimeException {
private static final ErrorType ERROR_TYPE = ErrorType.External;

/**
* Converts an TosClientException to a corresponding AlluxioTosException.
* Converts a TosClientException to a corresponding AlluxioTosException.
*
* @param cause tos exception
* @return alluxio tos exception
*/
public static AlluxioTosException from(TosClientException cause) {
return from(null, cause);
}
public static AlluxioTosException from(TosException cause) { return from(null, cause); }

/**
* Converts an TosClientException with errormessage to a corresponding AlluxioTosException.
* Converts a TosClientException with errormessage to a corresponding AlluxioTosException.
*
* @param errorMessage error message
* @param cause tos exception
* @param cause tos exception
* @return alluxio tos exception
*/
public static AlluxioTosException from(String errorMessage, TosClientException cause) {
Status status = Status.UNKNOWN;
String errorDescription = "ClientException:" + cause.getMessage();
status = httpStatusToGrpcStatus(cause.getStatusCode());
errorDescription = cause.getCode() + ":" + cause.getMessage();
public static AlluxioTosException from(String errorMessage, TosException cause) {
Status status = httpStatusToGrpcStatus(cause.getStatusCode());
String errorDescription = cause.getCode() + ":" + cause.getMessage();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOG.warn(errorDescription);

if (errorMessage == null) {
errorMessage = errorDescription;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.volcengine.tos.TOSV2;
import com.volcengine.tos.TosClientException;
import com.volcengine.tos.TosException;
import com.volcengine.tos.comm.io.TosRepeatableBoundedFileInputStream;
import com.volcengine.tos.model.object.AbortMultipartUploadInput;
import com.volcengine.tos.model.object.CompleteMultipartUploadV2Input;
Expand Down Expand Up @@ -91,9 +91,9 @@ protected void abortMultiPartUploadInternal() throws IOException {
AbortMultipartUploadInput input = new AbortMultipartUploadInput().setBucket(mBucketName)
.setKey(mKey).setUploadID(mUploadId);
getClient().abortMultipartUpload(input);
} catch (TosClientException e) {
} catch (TosException e) {
LOG.debug("failed to abort multi part upload. upload id: {}", mUploadId, e);
throw new IOException(String.format(
throw AlluxioTosException.from(String.format(
"failed to upload part. key: %s uploadId: %s",
mKey, mUploadId), e);
}
Expand Down Expand Up @@ -137,14 +137,14 @@ protected void initMultiPartUploadInternal() throws IOException {
new CreateMultipartUploadInput().setBucket(mBucketName).setKey(mKey);
CreateMultipartUploadOutput output = getClient().createMultipartUpload(create);
mUploadId = output.getUploadID();
} catch (TosClientException e) {
} catch (TosException e) {
LOG.debug("failed to init multi part upload", e);
throw new IOException("failed to init multi part upload", e);
throw AlluxioTosException.from("failed to init multi part upload", e);
}
}

@Override
protected void completeMultiPartUploadInternal() throws IOException {
protected void completeMultiPartUploadInternal() {
try {
LOG.debug("complete multi part {}", mUploadId);
CompleteMultipartUploadV2Input complete = new CompleteMultipartUploadV2Input()
Expand All @@ -155,25 +155,26 @@ protected void completeMultiPartUploadInternal() throws IOException {
CompleteMultipartUploadV2Output completedOutput =
getClient().completeMultipartUpload(complete);
mContentHash = completedOutput.getEtag();
} catch (TosClientException e) {
} catch (TosException e) {
LOG.debug("failed to complete multi part upload", e);
throw new IOException(
throw AlluxioTosException.from(
String.format("failed to complete multi part upload, key: %s, upload id: %s",
mKey, mUploadId) + e);
mKey, mUploadId), e);
}
}

@Override
protected void createEmptyObject(String key) throws IOException {
protected void createEmptyObject(String key) {
try {
PutObjectInput putObjectInput = new PutObjectInput()
.setBucket(mBucketName)
.setKey(key)
.setContent(new ByteArrayInputStream(new byte[0]))
.setContentLength(0);
mContentHash = getClient().putObject(putObjectInput).getEtag();
} catch (TosClientException e) {
throw new IOException(e);
} catch (TosException e) {
LOG.debug("failed to create empty object", e);
throw AlluxioTosException.from(e);
}
}

Expand All @@ -186,8 +187,9 @@ protected void putObject(String key, File file, @Nullable String md5) throws IOE
.setContent(content)
.setContentLength(file.length()); // Set the correct content length
mContentHash = getClient().putObject(putObjectInput).getEtag();
} catch (IOException e) {
throw new IOException(e);
} catch (TosException e) {
LOG.debug("failed to put object", e);
throw AlluxioTosException.from(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import com.google.common.base.Preconditions;
import com.volcengine.tos.TOSV2;
import com.volcengine.tos.TosClientException;
import com.volcengine.tos.TosException;
import com.volcengine.tos.internal.util.base64.Base64;
import com.volcengine.tos.model.object.ObjectMetaRequestOptions;
import com.volcengine.tos.model.object.PutObjectInput;
Expand All @@ -27,10 +27,9 @@
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
Expand Down Expand Up @@ -105,11 +104,12 @@ public TOSOutputStream(String bucketName, String key, TOSV2 client, List<String>
try {
mHash = MessageDigest.getInstance("MD5");
mLocalOutputStream =
new BufferedOutputStream(new DigestOutputStream(new FileOutputStream(mFile), mHash));
new BufferedOutputStream(
new DigestOutputStream(Files.newOutputStream(mFile.toPath()), mHash));
} catch (NoSuchAlgorithmException e) {
LOG.warn("Algorithm not available for MD5 hash.", e);
mHash = null;
mLocalOutputStream = new BufferedOutputStream(new FileOutputStream(mFile));
mLocalOutputStream = new BufferedOutputStream(Files.newOutputStream(mFile.toPath()));
}
}

Expand Down Expand Up @@ -167,7 +167,7 @@ public void close() throws IOException {
return;
}
mLocalOutputStream.close();
try (BufferedInputStream in = new BufferedInputStream(new FileInputStream(mFile))) {
try (BufferedInputStream in = new BufferedInputStream(Files.newInputStream(mFile.toPath()))) {
Jackson-Wang-7 marked this conversation as resolved.
Show resolved Hide resolved
ObjectMetaRequestOptions meta = new ObjectMetaRequestOptions();
meta.setContentLength(mFile.length());
if (mHash != null) {
Expand All @@ -177,17 +177,16 @@ public void close() throws IOException {
PutObjectInput putObjectInput = new PutObjectInput().setBucket(mBucketName).setKey(mKey)
.setOptions(meta).setContent(in);
mContentHash = mTOSClient.putObject(putObjectInput).getEtag();
} catch (TosClientException e) {
} catch (TosException e) {
LOG.error("Failed to upload {}. ", mKey);
throw new IOException(e);
throw AlluxioTosException.from(e);
} finally {
// Delete the temporary file on the local machine if the COS client completed the
// upload or if the upload failed.
if (!mFile.delete()) {
LOG.error("Failed to delete temporary file @ {}", mFile.getPath());
}
}
return;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ public class TOSUnderFileSystem extends ObjectUnderFileSystem {
* @param conf the configuration for this UFS
* @return the created {@link TOSUnderFileSystem} instance
*/
public static TOSUnderFileSystem createInstance(AlluxioURI uri, UnderFileSystemConfiguration conf)
throws Exception {
public static TOSUnderFileSystem createInstance(AlluxioURI uri,
UnderFileSystemConfiguration conf) {
String bucketName = UnderFileSystemUtils.getBucketName(uri);
Preconditions.checkArgument(conf.isSet(PropertyKey.TOS_ACCESS_KEY),
"Property %s is required to connect to TOS", PropertyKey.TOS_ACCESS_KEY);
Expand Down Expand Up @@ -167,29 +167,34 @@ public void setMode(String path, short mode) throws IOException {
}

@Override
public void cleanup() throws IOException {
public void cleanup() {
long cleanAge = mUfsConf.getMs(PropertyKey.UNDERFS_TOS_INTERMEDIATE_UPLOAD_CLEAN_AGE);
Date cleanBefore = new Date(new Date().getTime() - cleanAge);
boolean isTruncated = true;
String keyMarker = null;
String uploadIdMarker = null;
int maxKeys = 10;
while (isTruncated) {
ListMultipartUploadsV2Input input = new ListMultipartUploadsV2Input().setBucket(mBucketName)
.setMaxUploads(maxKeys).setKeyMarker(keyMarker).setUploadIDMarker(uploadIdMarker);
ListMultipartUploadsV2Output output = mClient.listMultipartUploads(input);
if (output.getUploads() != null) {
for (int i = 0; i < output.getUploads().size(); ++i) {
ListedUpload upload = output.getUploads().get(i);
if (upload.getInitiated().before(cleanBefore)) {
mClient.abortMultipartUpload(new AbortMultipartUploadInput().setBucket(mBucketName)
.setKey(upload.getKey()).setUploadID(upload.getUploadID()));
try {
while (isTruncated) {
ListMultipartUploadsV2Input input = new ListMultipartUploadsV2Input().setBucket(mBucketName)
.setMaxUploads(maxKeys).setKeyMarker(keyMarker).setUploadIDMarker(uploadIdMarker);
ListMultipartUploadsV2Output output = mClient.listMultipartUploads(input);
if (output.getUploads() != null) {
for (int i = 0; i < output.getUploads().size(); ++i) {
ListedUpload upload = output.getUploads().get(i);
if (upload.getInitiated().before(cleanBefore)) {
mClient.abortMultipartUpload(new AbortMultipartUploadInput().setBucket(mBucketName)
.setKey(upload.getKey()).setUploadID(upload.getUploadID()));
}
}
}
isTruncated = output.isTruncated();
keyMarker = output.getNextKeyMarker();
uploadIdMarker = output.getNextUploadIdMarker();
}
isTruncated = output.isTruncated();
keyMarker = output.getNextKeyMarker();
uploadIdMarker = output.getNextUploadIdMarker();
} catch (TosException e) {
LOG.error("Failed to cleanup TOS uploads", e);
throw AlluxioTosException.from(e);
}
}

Expand Down Expand Up @@ -248,7 +253,7 @@ protected boolean deleteObject(String key) {
}

@Override
protected List<String> deleteObjects(List<String> keys) throws IOException {
protected List<String> deleteObjects(List<String> keys) {
try {
List<ObjectTobeDeleted> list = new ArrayList<>();
for (String key : keys) {
Expand All @@ -259,7 +264,8 @@ protected List<String> deleteObjects(List<String> keys) throws IOException {
DeleteMultiObjectsV2Output output = mClient.deleteMultiObjects(input);
return output.getDeleteds().stream().map(Deleted::getKey).collect(Collectors.toList());
} catch (TosException e) {
throw new IOException("Failed to delete objects", e);
LOG.error("Failed to delete objects", e);
throw AlluxioTosException.from(e);
}
}

Expand Down Expand Up @@ -374,6 +380,7 @@ protected ObjectStatus getObjectStatus(String key) {
}
throw AlluxioTosException.from(e);
} catch (TosClientException e) {
LOG.error("Failed to get object status for {}", key, e);
throw AlluxioTosException.from(e);
}
}
Expand Down Expand Up @@ -419,6 +426,7 @@ protected InputStream openObject(String key, OpenOptions options, RetryPolicy re
return new TOSInputStream(mBucketName, key, mClient, options.getOffset(), retryPolicy,
mUfsConf.getBytes(PropertyKey.UNDERFS_OBJECT_STORE_MULTI_RANGE_CHUNK_SIZE));
} catch (TosException e) {
LOG.error("Failed to open object: {}", key, e);
throw AlluxioTosException.from(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,18 @@

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.volcengine.tos.TosException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
* Factory for creating {@link TOSUnderFileSystem}.
*/
public class TOSUnderFileSystemFactory implements UnderFileSystemFactory {
private static final Logger LOG = LoggerFactory.getLogger(TOSUnderFileSystemFactory.class);

/**
* Constructs a new {@link TOSUnderFileSystemFactory}.
*/
Expand All @@ -40,13 +45,15 @@ public UnderFileSystem create(String path, UnderFileSystemConfiguration conf) {
if (checkTOSCredentials(conf)) {
try {
return TOSUnderFileSystem.createInstance(new AlluxioURI(path), conf);
} catch (TosException e) {
Jackson-Wang-7 marked this conversation as resolved.
Show resolved Hide resolved
LOG.warn("Failed to create TOS Under File System: {}", e.getMessage());
throw AlluxioTosException.from(e);
} catch (Exception e) {
throw Throwables.propagate(e);
}
}

String err =
"TOS Credentials or configurations not available, cannot create TOS Under File System.";
String err = "TOS Credentials not available, cannot create TOS Under File System.";
throw Throwables.propagate(new IOException(err));
}

Expand Down
Loading