Skip to content

Commit

Permalink
Re-factor and cleanup Coop Lock implementation (#204)
Browse files Browse the repository at this point in the history
  • Loading branch information
medb authored Jul 1, 2019
1 parent a91ed84 commit 34bca3a
Show file tree
Hide file tree
Showing 15 changed files with 1,126 additions and 641 deletions.
380 changes: 26 additions & 354 deletions gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/CoopLockFsck.java

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.AUTHENTICATION_PREFIX;
import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.GCS_COOPERATIVE_LOCKING_EXPIRATION_TIMEOUT_MS;
import static com.google.cloud.hadoop.gcsio.cooplock.CoopLockOperationType.DELETE;
import static com.google.cloud.hadoop.gcsio.cooplock.CoopLockOperationType.RENAME;
import static com.google.cloud.hadoop.gcsio.cooplock.CoopLockRecordsDao.LOCK_DIRECTORY;
import static com.google.cloud.hadoop.util.EntriesCredentialConfiguration.ENABLE_SERVICE_ACCOUNTS_SUFFIX;
import static com.google.cloud.hadoop.util.EntriesCredentialConfiguration.SERVICE_ACCOUNT_EMAIL_SUFFIX;
Expand Down Expand Up @@ -61,7 +63,7 @@ public class CoopLockRepairIntegrationTest {

private static final Gson GSON = new Gson();

private static final String OPERATION_FILENAME_PATTERN =
private static final String OPERATION_FILENAME_PATTERN_FORMAT =
"[0-9]{8}T[0-9]{6}\\.[0-9]{3}Z_%s_[a-z0-9\\-]+";

private static GoogleCloudStorageOptions gcsOptions;
Expand Down Expand Up @@ -175,10 +177,9 @@ private void moveDirectoryOperationRepairedAfterFailedCopy(String command) throw
.collect(toList());

assertThat(lockFiles).hasSize(2);
URI lockFileUri =
matchFile(lockFiles, String.format(OPERATION_FILENAME_PATTERN, "rename") + "\\.lock").get();
URI logFileUri =
matchFile(lockFiles, String.format(OPERATION_FILENAME_PATTERN, "rename") + "\\.log").get();
String filenamePattern = String.format(OPERATION_FILENAME_PATTERN_FORMAT, RENAME);
URI lockFileUri = matchFile(lockFiles, filenamePattern + "\\.lock").get();
URI logFileUri = matchFile(lockFiles, filenamePattern + "\\.log").get();

String lockContent = gcsfsIHelper.readTextFile(bucketName, lockFileUri.getPath());
assertThat(GSON.fromJson(lockContent, RenameOperation.class).setLockEpochSeconds(0))
Expand Down Expand Up @@ -248,10 +249,9 @@ public void moveDirectoryOperationRepairedAfterFailedDelete() throws Exception {
.collect(toList());

assertThat(lockFiles).hasSize(2);
URI lockFileUri =
matchFile(lockFiles, String.format(OPERATION_FILENAME_PATTERN, "rename") + "\\.lock").get();
URI logFileUri =
matchFile(lockFiles, String.format(OPERATION_FILENAME_PATTERN, "rename") + "\\.log").get();
String filenameFormat = String.format(OPERATION_FILENAME_PATTERN_FORMAT, RENAME);
URI lockFileUri = matchFile(lockFiles, filenameFormat + "\\.lock").get();
URI logFileUri = matchFile(lockFiles, filenameFormat + "\\.log").get();

String lockContent = gcsfsIHelper.readTextFile(bucketName, lockFileUri.getPath());
assertThat(GSON.fromJson(lockContent, RenameOperation.class).setLockEpochSeconds(0))
Expand Down Expand Up @@ -316,11 +316,14 @@ public void deleteDirectoryOperationRolledForward() throws Exception {
.collect(toList());

assertThat(lockFiles).hasSize(2);
URI lockFileUri =
matchFile(lockFiles, String.format(OPERATION_FILENAME_PATTERN, "delete") + "\\.lock").get();
String filenamePattern = String.format(OPERATION_FILENAME_PATTERN_FORMAT, DELETE);
URI lockFileUri = matchFile(lockFiles, filenamePattern + "\\.lock").get();
URI logFileUri = matchFile(lockFiles, filenamePattern + "\\.log").get();
String lockContent = gcsfsIHelper.readTextFile(bucketName, lockFileUri.getPath());
assertThat(GSON.fromJson(lockContent, DeleteOperation.class).setLockEpochSeconds(0))
.isEqualTo(new DeleteOperation().setLockEpochSeconds(0).setResource(dirUri.toString()));
assertThat(gcsfsIHelper.readTextFile(bucketName, logFileUri.getPath()))
.isEqualTo(dirUri.resolve(fileName) + "\n" + dirUri + "\n");
}

private Configuration getTestConfiguration() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public GoogleCloudStorageItemInfo composeObjects(
*
* @return the {@link GoogleCloudStorage} objected wrapped by this class.
*/
protected GoogleCloudStorage getDelegate() {
public GoogleCloudStorage getDelegate() {
return delegate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import com.google.api.client.util.Clock;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorage.ListPage;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystemOptions.TimestampUpdatePredicate;
import com.google.cloud.hadoop.gcsio.cooplock.CoopLockOperationDao;
import com.google.cloud.hadoop.gcsio.cooplock.CoopLockRecordsDao;
import com.google.cloud.hadoop.gcsio.cooplock.CoopLockOperationDelete;
import com.google.cloud.hadoop.gcsio.cooplock.CoopLockOperationRename;
import com.google.cloud.hadoop.util.LazyExecutorService;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CharMatcher;
Expand All @@ -50,17 +50,16 @@
import java.nio.channels.WritableByteChannel;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.FileAlreadyExistsException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -110,9 +109,6 @@ public class GoogleCloudStorageFileSystem {

private final PathCodec pathCodec;

private final CoopLockRecordsDao coopLockRecordsDao;
private final CoopLockOperationDao coopLockOperationDao;

// Executor for updating directory timestamps.
private ExecutorService updateTimestampsExecutor = createUpdateTimestampsExecutor();

Expand Down Expand Up @@ -165,15 +161,10 @@ public GoogleCloudStorageFileSystem(

checkArgument(credential != null, "credential must not be null");

GoogleCloudStorageImpl gcsImpl =
new GoogleCloudStorageImpl(options.getCloudStorageOptions(), credential);
this.gcs = gcsImpl;
this.gcs = new GoogleCloudStorageImpl(options.getCloudStorageOptions(), credential);
this.options = options;
this.pathCodec = options.getPathCodec();

this.coopLockRecordsDao = new CoopLockRecordsDao(gcsImpl);
this.coopLockOperationDao = new CoopLockOperationDao(gcsImpl, pathCodec);

if (options.isPerformanceCacheEnabled()) {
this.gcs =
new PerformanceCachingGoogleCloudStorage(this.gcs, options.getPerformanceCacheOptions());
Expand Down Expand Up @@ -203,26 +194,6 @@ public GoogleCloudStorageFileSystem(
this.gcs = gcs;
this.options = options;
this.pathCodec = options.getPathCodec();
this.coopLockRecordsDao = null;
this.coopLockOperationDao = null;
}

/**
* Constructs a GoogleCloudStorageFilesystem based on an already-configured underlying
* GoogleCloudStorageImpl {@code gcs}. Any options pertaining to GCS creation will be ignored.
*/
@VisibleForTesting
public GoogleCloudStorageFileSystem(
GoogleCloudStorageImpl gcs, GoogleCloudStorageFileSystemOptions options) {
this.gcs = gcs;
this.options = options;
this.pathCodec = options.getPathCodec();
this.coopLockRecordsDao = new CoopLockRecordsDao(gcs);
this.coopLockOperationDao = new CoopLockOperationDao(gcs, pathCodec);
}

public CoopLockRecordsDao getCoopLockRecordsDao() {
return coopLockRecordsDao;
}

@VisibleForTesting
Expand Down Expand Up @@ -417,9 +388,13 @@ public void delete(URI path, boolean recursive) throws IOException {
() -> getFileInfoInternal(parentId, /* inferImplicitDirectories= */ false));
}

List<FileInfo> itemsToDelete = new ArrayList<>();
List<FileInfo> bucketsToDelete = new ArrayList<>();
Optional<CoopLockOperationDelete> coopLockOp =
options.enableCooperativeLocking() && fileInfo.isDirectory()
? Optional.of(CoopLockOperationDelete.create(gcs, pathCodec, fileInfo.getPath()))
: Optional.empty();
coopLockOp.ifPresent(CoopLockOperationDelete::lock);

List<FileInfo> itemsToDelete;
// Delete sub-items if it is a directory.
if (fileInfo.isDirectory()) {
itemsToDelete =
Expand All @@ -429,32 +404,20 @@ public void delete(URI path, boolean recursive) throws IOException {
if (!itemsToDelete.isEmpty() && !recursive) {
throw new DirectoryNotEmptyException("Cannot delete a non-empty directory.");
}
}

if (fileInfo.getItemInfo().isBucket()) {
bucketsToDelete.add(fileInfo);
} else {
itemsToDelete.add(fileInfo);
itemsToDelete = new ArrayList<>();
}

if (options.enableCooperativeLocking() && fileInfo.isDirectory()) {
String operationId = UUID.randomUUID().toString();
StorageResourceId resourceId = pathCodec.validatePathAndGetId(fileInfo.getPath(), true);

coopLockRecordsDao.lockPaths(operationId, resourceId);
Future<?> lockUpdateFuture = Futures.immediateCancelledFuture();
try {
lockUpdateFuture =
coopLockOperationDao.persistDeleteOperation(
path, itemsToDelete, bucketsToDelete, operationId, resourceId, lockUpdateFuture);
List<FileInfo> bucketsToDelete = new ArrayList<>();
(fileInfo.getItemInfo().isBucket() ? bucketsToDelete : itemsToDelete).add(fileInfo);

deleteInternal(itemsToDelete, bucketsToDelete);
coopLockRecordsDao.unlockPaths(operationId, resourceId);
} finally {
lockUpdateFuture.cancel(/* mayInterruptIfRunning= */ true);
}
} else {
coopLockOp.ifPresent(o -> o.persistAndScheduleRenewal(itemsToDelete, bucketsToDelete));
try {
deleteInternal(itemsToDelete, bucketsToDelete);

coopLockOp.ifPresent(CoopLockOperationDelete::unlock);
} finally {
coopLockOp.ifPresent(CoopLockOperationDelete::cancelRenewal);
}

repairImplicitDirectory(parentInfoFuture);
Expand Down Expand Up @@ -786,16 +749,8 @@ public void compose(List<URI> sources, URI destination, String contentType) thro
* copied and not the content of any file.
*/
private void renameInternal(FileInfo srcInfo, URI dst) throws IOException {
if (srcInfo.isDirectory() && options.enableCooperativeLocking()) {
String operationId = UUID.randomUUID().toString();
StorageResourceId srcResourceId = pathCodec.validatePathAndGetId(srcInfo.getPath(), true);
StorageResourceId dstResourceId = pathCodec.validatePathAndGetId(dst, true);

coopLockRecordsDao.lockPaths(operationId, srcResourceId, dstResourceId);
renameDirectoryInternal(srcInfo, dst, operationId);
coopLockRecordsDao.unlockPaths(operationId, srcResourceId, dstResourceId);
} else if (srcInfo.isDirectory()) {
renameDirectoryInternal(srcInfo, dst, /* operationId= */ null);
if (srcInfo.isDirectory()) {
renameDirectoryInternal(srcInfo, dst);
} else {
URI src = srcInfo.getPath();
StorageResourceId srcResourceId = pathCodec.validatePathAndGetId(src, true);
Expand Down Expand Up @@ -825,11 +780,16 @@ private void renameInternal(FileInfo srcInfo, URI dst) throws IOException {
*
* @see #renameInternal
*/
private void renameDirectoryInternal(FileInfo srcInfo, URI dst, String operationId)
throws IOException {
private void renameDirectoryInternal(FileInfo srcInfo, URI dst) throws IOException {
checkArgument(srcInfo.isDirectory(), "'%s' should be a directory", srcInfo);

Pattern markerFilePattern = options.getMarkerFilePattern();
URI src = srcInfo.getPath();

Optional<CoopLockOperationRename> coopLockOp =
options.enableCooperativeLocking() && src.getAuthority().equals(dst.getAuthority())
? Optional.of(CoopLockOperationRename.create(gcs, pathCodec, src, dst))
: Optional.empty();
coopLockOp.ifPresent(CoopLockOperationRename::lock);

// Mapping from each src to its respective dst.
// Sort src items so that parent directories appear before their children.
Expand All @@ -839,13 +799,14 @@ private void renameDirectoryInternal(FileInfo srcInfo, URI dst, String operation

// List of individual paths to rename;
// we will try to carry out the copies in this list's order.
List<FileInfo> srcItemInfos = listAllFileInfoForPrefix(srcInfo.getPath());
List<FileInfo> srcItemInfos = listAllFileInfoForPrefix(src);

// Convert to the destination directory.
dst = FileInfo.convertToDirectoryPath(pathCodec, dst);

// Create a list of sub-items to copy.
String prefix = srcInfo.getPath().toString();
Pattern markerFilePattern = options.getMarkerFilePattern();
String prefix = src.toString();
for (FileInfo srcItemInfo : srcItemInfos) {
String relativeItemName = srcItemInfo.getPath().toString().substring(prefix.length());
URI dstItemName = dst.resolve(relativeItemName);
Expand All @@ -856,69 +817,56 @@ private void renameDirectoryInternal(FileInfo srcInfo, URI dst, String operation
}
}

Future<?> lockUpdateFuture = null;
Instant operationInstant = Instant.now();
if (options.enableCooperativeLocking()
&& srcInfo.getItemInfo().getBucketName().equals(dst.getAuthority())) {
lockUpdateFuture =
coopLockOperationDao.persistUpdateOperation(
srcInfo,
dst,
operationId,
srcToDstItemNames,
srcToDstMarkerItemNames,
operationInstant);
}

// Create the destination directory.
mkdir(dst);
coopLockOp.ifPresent(
o -> o.persistAndScheduleRenewal(srcToDstItemNames, srcToDstMarkerItemNames));
try {
// Create the destination directory.
mkdir(dst);

// First, copy all items except marker items
copyInternal(srcToDstItemNames);
// Finally, copy marker items (if any) to mark rename operation success
copyInternal(srcToDstMarkerItemNames);
// First, copy all items except marker items
copyInternal(srcToDstItemNames);
// Finally, copy marker items (if any) to mark rename operation success
copyInternal(srcToDstMarkerItemNames);

if (options.enableCooperativeLocking()
&& srcInfo.getItemInfo().getBucketName().equals(dst.getAuthority())) {
coopLockOperationDao.checkpointUpdateOperation(srcInfo, dst, operationId, operationInstant);
}
coopLockOp.ifPresent(CoopLockOperationRename::checkpoint);

// So far, only the destination directories are updated. Only do those now:
if (!srcToDstItemNames.isEmpty() || !srcToDstMarkerItemNames.isEmpty()) {
List<URI> allDestinationUris =
new ArrayList<>(srcToDstItemNames.size() + srcToDstMarkerItemNames.size());
allDestinationUris.addAll(srcToDstItemNames.values());
allDestinationUris.addAll(srcToDstMarkerItemNames.values());
// So far, only the destination directories are updated. Only do those now:
if (!srcToDstItemNames.isEmpty() || !srcToDstMarkerItemNames.isEmpty()) {
List<URI> allDestinationUris =
new ArrayList<>(srcToDstItemNames.size() + srcToDstMarkerItemNames.size());
allDestinationUris.addAll(srcToDstItemNames.values());
allDestinationUris.addAll(srcToDstMarkerItemNames.values());

tryUpdateTimestampsForParentDirectories(allDestinationUris, allDestinationUris);
}

List<FileInfo> bucketsToDelete = new ArrayList<>(1);
List<FileInfo> srcItemsToDelete = new ArrayList<>(srcToDstItemNames.size() + 1);
srcItemsToDelete.addAll(srcToDstItemNames.keySet());
if (srcInfo.getItemInfo().isBucket()) {
bucketsToDelete.add(srcInfo);
} else {
// If src is a directory then srcItemInfos does not contain its own name,
// therefore add it to the list before we delete items in the list.
srcItemsToDelete.add(srcInfo);
}
tryUpdateTimestampsForParentDirectories(allDestinationUris, allDestinationUris);
}

// First delete marker files from the src
deleteInternal(new ArrayList<>(srcToDstMarkerItemNames.keySet()), new ArrayList<>());
// Then delete rest of the items that we successfully copied.
deleteInternal(srcItemsToDelete, bucketsToDelete);
List<FileInfo> bucketsToDelete = new ArrayList<>(1);
List<FileInfo> srcItemsToDelete = new ArrayList<>(srcToDstItemNames.size() + 1);
srcItemsToDelete.addAll(srcToDstItemNames.keySet());
if (srcInfo.getItemInfo().isBucket()) {
bucketsToDelete.add(srcInfo);
} else {
// If src is a directory then srcItemInfos does not contain its own name,
// therefore add it to the list before we delete items in the list.
srcItemsToDelete.add(srcInfo);
}

// if we deleted a bucket, then there no need to update timestamps
if (bucketsToDelete.isEmpty()) {
List<URI> srcItemNames =
srcItemInfos.stream().map(FileInfo::getPath).collect(toCollection(ArrayList::new));
// Any path that was deleted, we should update the parent except for parents we also deleted
tryUpdateTimestampsForParentDirectories(srcItemNames, srcItemNames);
}
// First delete marker files from the src
deleteInternal(new ArrayList<>(srcToDstMarkerItemNames.keySet()), new ArrayList<>());
// Then delete rest of the items that we successfully copied.
deleteInternal(srcItemsToDelete, bucketsToDelete);

// if we deleted a bucket, then there no need to update timestamps
if (bucketsToDelete.isEmpty()) {
List<URI> srcItemNames =
srcItemInfos.stream().map(FileInfo::getPath).collect(toCollection(ArrayList::new));
// Any path that was deleted, we should update the parent except for parents we also deleted
tryUpdateTimestampsForParentDirectories(srcItemNames, srcItemNames);
}

if (lockUpdateFuture != null) {
lockUpdateFuture.cancel(/* mayInterruptIfRunning= */ false);
coopLockOp.ifPresent(CoopLockOperationRename::unlock);
} finally {
coopLockOp.ifPresent(CoopLockOperationRename::cancelRenewal);
}
}

Expand Down
Loading

0 comments on commit 34bca3a

Please sign in to comment.