diff --git a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/CoopLockFsck.java b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/CoopLockFsck.java index a77b50922f..1840f86237 100644 --- a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/CoopLockFsck.java +++ b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/CoopLockFsck.java @@ -1,49 +1,30 @@ +/* + * Copyright 2019 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.google.cloud.hadoop.fs.gcs; -import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.GCS_COOPERATIVE_LOCKING_ENABLE; -import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.GCS_COOPERATIVE_LOCKING_EXPIRATION_TIMEOUT_MS; -import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.GCS_REPAIR_IMPLICIT_DIRECTORIES_ENABLE; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; -import static java.time.temporal.ChronoUnit.MILLIS; -import static java.util.stream.Collectors.toList; -import static java.util.stream.Collectors.toSet; -import com.google.cloud.hadoop.gcsio.GoogleCloudStorage; import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem; -import com.google.cloud.hadoop.gcsio.StorageResourceId; -import com.google.cloud.hadoop.gcsio.cooplock.CoopLockOperationDao; -import com.google.cloud.hadoop.gcsio.cooplock.CoopLockRecord; -import com.google.cloud.hadoop.gcsio.cooplock.CoopLockRecordsDao; -import com.google.cloud.hadoop.gcsio.cooplock.DeleteOperation; -import com.google.cloud.hadoop.gcsio.cooplock.RenameOperation; -import com.google.common.base.Charsets; -import com.google.common.base.Splitter; import com.google.common.collect.ImmutableSet; -import com.google.common.flogger.GoogleLogger; -import com.google.common.io.ByteSource; -import com.google.gson.Gson; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; import java.net.URI; -import java.net.URISyntaxException; -import java.time.Instant; -import java.util.AbstractMap; -import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.Set; -import java.util.concurrent.Future; -import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -58,18 +39,13 @@ */ public class CoopLockFsck extends Configured implements Tool { - private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); - - private static final String COMMAND_CHECK = "--check"; - private static final String COMMAND_ROLL_FORWARD = "--rollForward"; - private static final String COMMAND_ROLL_BACK = "--rollBack"; + static final String 
COMMAND_CHECK = "--check"; + static final String COMMAND_ROLL_FORWARD = "--rollForward"; + static final String COMMAND_ROLL_BACK = "--rollBack"; private static final Set FSCK_COMMANDS = ImmutableSet.of(COMMAND_CHECK, COMMAND_ROLL_FORWARD, COMMAND_ROLL_BACK); - private static final Gson GSON = new Gson(); - private static final Splitter RENAME_LOG_RECORD_SPLITTER = Splitter.on(" -> "); - public static void main(String[] args) throws Exception { if (args.length == 1 && "--help".equals(args[0])) { System.out.println( @@ -91,9 +67,9 @@ public static void main(String[] args) throws Exception { } // Let ToolRunner handle generic command-line options - int res = ToolRunner.run(new Configuration(), new CoopLockFsck(), args); + int result = ToolRunner.run(new Configuration(), new CoopLockFsck(), args); - System.exit(res); + System.exit(result); } @Override @@ -105,315 +81,11 @@ public int run(String[] args) throws Exception { checkArgument(FSCK_COMMANDS.contains(command), "Unknown %s command, should be %s", command); String bucket = args[1]; - checkArgument(bucket.startsWith("gs://"), "bucket parameter should have 'gs://' scheme"); - - Configuration conf = getConf(); - - // Disable cooperative locking to prevent blocking - conf.setBoolean(GCS_COOPERATIVE_LOCKING_ENABLE.getKey(), false); - conf.setBoolean(GCS_REPAIR_IMPLICIT_DIRECTORIES_ENABLE.getKey(), false); - - URI bucketUri = URI.create(bucket); - String bucketName = bucketUri.getAuthority(); - GoogleHadoopFileSystem ghfs = (GoogleHadoopFileSystem) FileSystem.get(bucketUri, conf); - GoogleCloudStorageFileSystem gcsFs = ghfs.getGcsFs(); - GoogleCloudStorage gcs = gcsFs.getGcs(); - CoopLockRecordsDao lockRecordsDao = gcsFs.getCoopLockRecordsDao(); - CoopLockOperationDao lockOperationDao = new CoopLockOperationDao(gcs, gcsFs.getPathCodec()); - - Instant operationExpirationTime = Instant.now(); - - Set lockedOperations = - lockRecordsDao.getLockedOperations(bucketUri.getAuthority()); - if (lockedOperations.isEmpty()) { - logger.atInfo().log("No expired operation locks"); - return 0; - } - - Map expiredOperations = new HashMap<>(); - for (CoopLockRecord lockedOperation : lockedOperations) { - String operationId = lockedOperation.getOperationId(); - URI operationPattern = - bucketUri.resolve("/" + CoopLockRecordsDao.LOCK_DIRECTORY + "*" + operationId + "*.lock"); - FileStatus[] operationStatuses = ghfs.globStatus(new Path(operationPattern)); - checkState( - operationStatuses.length < 2, - "operation %s should not have more than one lock file", - operationId); - - // Lock file not created - nothing to repair - if (operationStatuses.length == 0) { - logger.atInfo().log( - "Operation %s for %s resources doesn't have lock file, unlocking", - lockedOperation.getOperationId(), lockedOperation.getResources()); - StorageResourceId[] lockedResources = - lockedOperation.getResources().stream() - .map(r -> StorageResourceId.fromObjectName(bucketUri.resolve("/" + r).toString())) - .toArray(StorageResourceId[]::new); - lockRecordsDao.unlockPaths(lockedOperation.getOperationId(), lockedResources); - continue; - } - - FileStatus operation = operationStatuses[0]; - - Instant lockInstant = Instant.ofEpochSecond(lockedOperation.getLockEpochSeconds()); - Instant renewedInstant = getLockRenewedInstant(ghfs, operation); - if (isLockExpired(conf, renewedInstant, operationExpirationTime) - && isLockExpired(conf, lockInstant, operationExpirationTime)) { - expiredOperations.put(operation, lockedOperation); - logger.atInfo().log("Operation %s expired.", operation.getPath()); 
- } else { - logger.atInfo().log("Operation %s not expired.", operation.getPath()); - } - } - - if (COMMAND_CHECK.equals(command)) { - return 0; - } - - Function, Boolean> operationRecovery = - expiredOperation -> { - FileStatus operation = expiredOperation.getKey(); - CoopLockRecord lockedOperation = expiredOperation.getValue(); - - String operationId = getOperationId(operation); - try { - if (operation.getPath().toString().contains("_delete_")) { - if (COMMAND_ROLL_BACK.equals(command)) { - logger.atInfo().log( - "Rolling back delete operations (%s) not supported, skipping.", - operation.getPath()); - return false; - } - logger.atInfo().log("Repairing FS after %s delete operation.", operation.getPath()); - DeleteOperation operationObject = - getOperationObject(ghfs, operation, DeleteOperation.class); - lockRecordsDao.lockOperation( - bucketName, operationId, lockedOperation.getLockEpochSeconds()); - Future lockUpdateFuture = - lockOperationDao.scheduleLockUpdate( - operationId, - new URI(operation.getPath().toString()), - DeleteOperation.class, - (o, i) -> o.setLockEpochSeconds(i.getEpochSecond())); - try { - List loggedResources = getOperationLog(ghfs, operation, l -> l); - deleteResource(ghfs, operationObject.getResource(), loggedResources); - lockRecordsDao.unlockPaths( - operationId, StorageResourceId.fromObjectName(operationObject.getResource())); - } finally { - lockUpdateFuture.cancel(/* mayInterruptIfRunning= */ false); - } - } else if (operation.getPath().toString().contains("_rename_")) { - RenameOperation operationObject = - getOperationObject(ghfs, operation, RenameOperation.class); - lockRecordsDao.lockOperation( - bucketName, operationId, lockedOperation.getLockEpochSeconds()); - Future lockUpdateFuture = - lockOperationDao.scheduleLockUpdate( - operationId, - new URI(operation.getPath().toString()), - RenameOperation.class, - (o, i) -> o.setLockEpochSeconds(i.getEpochSecond())); - try { - List> loggedResources = - getOperationLog( - ghfs, - operation, - l -> { - List srcToDst = RENAME_LOG_RECORD_SPLITTER.splitToList(l); - checkState(srcToDst.size() == 2); - return new AbstractMap.SimpleEntry<>(srcToDst.get(0), srcToDst.get(1)); - }); - if (operationObject.getCopySucceeded()) { - if (COMMAND_ROLL_BACK.equals(command)) { - logger.atInfo().log( - "Repairing FS after %s rename operation" - + " (deleting source (%s) and renaming (%s -> %s)).", - operation.getPath(), - operationObject.getSrcResource(), - operationObject.getDstResource(), - operationObject.getSrcResource()); - deleteResource( - ghfs, - operationObject.getSrcResource(), - loggedResources.stream().map(Map.Entry::getKey).collect(toList())); - gcs.copy( - bucketName, - loggedResources.stream() - .map( - p -> StorageResourceId.fromObjectName(p.getValue()).getObjectName()) - .collect(toList()), - bucketName, - loggedResources.stream() - .map(p -> StorageResourceId.fromObjectName(p.getKey()).getObjectName()) - .collect(toList())); - deleteResource( - ghfs, - operationObject.getDstResource(), - loggedResources.stream().map(Map.Entry::getValue).collect(toList())); - } else { - logger.atInfo().log( - "Repairing FS after %s rename operation (deleting source (%s)).", - operation.getPath(), operationObject.getSrcResource()); - deleteResource( - ghfs, - operationObject.getSrcResource(), - loggedResources.stream().map(Map.Entry::getKey).collect(toList())); - } - } else { - if (COMMAND_ROLL_BACK.equals(command)) { - logger.atInfo().log( - "Repairing FS after %s rename operation (deleting destination (%s)).", - 
operation.getPath(), operationObject.getDstResource()); - deleteResource( - ghfs, - operationObject.getDstResource(), - loggedResources.stream().map(Map.Entry::getKey).collect(toList())); - } else { - logger.atInfo().log( - "Repairing FS after %s rename operation" - + " (deleting destination (%s) and renaming (%s -> %s)).", - operation.getPath(), - operationObject.getDstResource(), - operationObject.getSrcResource(), - operationObject.getDstResource()); - deleteResource( - ghfs, - operationObject.getDstResource(), - loggedResources.stream().map(Map.Entry::getValue).collect(toList())); - gcs.copy( - bucketName, - loggedResources.stream() - .map(p -> StorageResourceId.fromObjectName(p.getKey()).getObjectName()) - .collect(toList()), - bucketName, - loggedResources.stream() - .map( - p -> StorageResourceId.fromObjectName(p.getValue()).getObjectName()) - .collect(toList())); - deleteResource( - ghfs, - operationObject.getSrcResource(), - loggedResources.stream().map(Map.Entry::getKey).collect(toList())); - } - } - lockRecordsDao.unlockPaths( - operationId, - StorageResourceId.fromObjectName(operationObject.getSrcResource()), - StorageResourceId.fromObjectName(operationObject.getDstResource())); - } finally { - lockUpdateFuture.cancel(/* mayInterruptIfRunning= */ false); - } - } else { - throw new IllegalStateException("Unknown operation type: " + operation.getPath()); - } - } catch (IOException | URISyntaxException e) { - throw new RuntimeException("Failed to recover operation: ", e); - } - return true; - }; - - for (Map.Entry expiredOperation : expiredOperations.entrySet()) { - long start = System.currentTimeMillis(); - try { - boolean succeeded = operationRecovery.apply(expiredOperation); - long finish = System.currentTimeMillis(); - if (succeeded) { - logger.atInfo().log( - "Operation %s successfully %s in %dms", - expiredOperation, - COMMAND_ROLL_FORWARD.equals(command) ? "rolled forward" : "rolled back", - finish - start); - } else { - logger.atSevere().log( - "Operation %s failed to %s in %dms", - expiredOperation, - COMMAND_ROLL_FORWARD.equals(command) ? 
"rolled forward" : "rolled back", - finish - start); - } - } catch (Exception e) { - long finish = System.currentTimeMillis(); - logger.atSevere().withCause(e).log( - "Operation %s failed to roll forward in %dms", expiredOperation, finish - start); - } - } - return 0; - } - - private void deleteResource( - GoogleHadoopFileSystem ghfs, String resource, List loggedResources) - throws IOException { - Path lockedResource = new Path(resource); - Set allObjects = - Arrays.stream(ghfs.listStatus(lockedResource)) - .map(s -> s.getPath().toString()) - .collect(toSet()); - List objectsToDelete = new ArrayList<>(loggedResources.size()); - for (String loggedObject : loggedResources) { - if (allObjects.contains(loggedObject)) { - objectsToDelete.add(StorageResourceId.fromObjectName(loggedObject)); - } - } - GoogleCloudStorage gcs = ghfs.getGcsFs().getGcs(); - gcs.deleteObjects(objectsToDelete); - - // delete directory if empty - allObjects.removeAll(loggedResources); - if (allObjects.isEmpty() && ghfs.exists(lockedResource)) { - ghfs.delete(lockedResource, /* recursive= */ false); - } - } - - private boolean isLockExpired( - Configuration conf, Instant lockInstant, Instant expirationInstant) { - return lockInstant - .plus(GCS_COOPERATIVE_LOCKING_EXPIRATION_TIMEOUT_MS.get(conf, conf::getLong), MILLIS) - .isBefore(expirationInstant); - } - - private static Instant getLockRenewedInstant(GoogleHadoopFileSystem ghfs, FileStatus operation) - throws IOException { - if (operation.getPath().toString().contains("_delete_")) { - return Instant.ofEpochSecond( - getOperationObject(ghfs, operation, DeleteOperation.class).getLockEpochSeconds()); - } - if (operation.getPath().toString().contains("_rename_")) { - return Instant.ofEpochSecond( - getOperationObject(ghfs, operation, RenameOperation.class).getLockEpochSeconds()); - } - throw new IllegalStateException("Unknown operation type: " + operation.getPath()); - } - - private static T getOperationObject( - GoogleHadoopFileSystem ghfs, FileStatus operation, Class clazz) throws IOException { - ByteSource operationByteSource = - new ByteSource() { - @Override - public InputStream openStream() throws IOException { - return ghfs.open(operation.getPath()); - } - }; - String operationContent = operationByteSource.asCharSource(Charsets.UTF_8).read(); - return GSON.fromJson(operationContent, clazz); - } - - private static List getOperationLog( - GoogleHadoopFileSystem ghfs, FileStatus operation, Function logRecordFn) - throws IOException { - List log = new ArrayList<>(); - Path operationLog = new Path(operation.getPath().toString().replace(".lock", ".log")); - try (BufferedReader in = new BufferedReader(new InputStreamReader(ghfs.open(operationLog)))) { - String line; - while ((line = in.readLine()) != null) { - log.add(logRecordFn.apply(line)); - } - } - return log; - } + checkArgument( + bucket.startsWith(GoogleCloudStorageFileSystem.SCHEME + "://"), + "bucket parameter should have 'gs://' scheme"); - private static String getOperationId(FileStatus operation) { - String[] fileParts = operation.getPath().toString().split("_"); - return fileParts[fileParts.length - 1].split("\\.")[0]; + return new CoopLockFsckRunner(getConf(), URI.create(bucket), command).run(); } } + diff --git a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/CoopLockFsckRunner.java b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/CoopLockFsckRunner.java new file mode 100644 index 0000000000..47196f3c78 --- /dev/null +++ b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/CoopLockFsckRunner.java @@ -0,0 
+1,443 @@ +/* + * Copyright 2019 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.hadoop.fs.gcs; + +import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.GCS_COOPERATIVE_LOCKING_ENABLE; +import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.GCS_COOPERATIVE_LOCKING_EXPIRATION_TIMEOUT_MS; +import static com.google.cloud.hadoop.gcsio.cooplock.CoopLockOperationDao.RENAME_LOG_RECORD_SEPARATOR; +import static com.google.common.base.Preconditions.checkState; +import static java.time.temporal.ChronoUnit.MILLIS; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; +import static java.util.stream.Collectors.toSet; + +import com.google.cloud.hadoop.gcsio.GoogleCloudStorage; +import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem; +import com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl; +import com.google.cloud.hadoop.gcsio.StorageResourceId; +import com.google.cloud.hadoop.gcsio.cooplock.CoopLockOperationDao; +import com.google.cloud.hadoop.gcsio.cooplock.CoopLockRecord; +import com.google.cloud.hadoop.gcsio.cooplock.CoopLockRecordsDao; +import com.google.cloud.hadoop.gcsio.cooplock.DeleteOperation; +import com.google.cloud.hadoop.gcsio.cooplock.RenameOperation; +import com.google.common.base.Charsets; +import com.google.common.base.Splitter; +import com.google.common.flogger.GoogleLogger; +import com.google.common.io.ByteSource; +import com.google.gson.Gson; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URI; +import java.net.URISyntaxException; +import java.time.Instant; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Future; +import java.util.function.Function; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +class CoopLockFsckRunner { + + private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); + + private static final Gson GSON = new Gson(); + + private static final Splitter RENAME_LOG_RECORD_SPLITTER = + Splitter.on(RENAME_LOG_RECORD_SEPARATOR); + + private final Instant operationExpirationInstant = Instant.now(); + + private final Configuration conf; + private final String bucketName; + private final String command; + + private final GoogleHadoopFileSystem ghfs; + private final GoogleCloudStorageFileSystem gcsFs; + private final GoogleCloudStorageImpl gcs; + private final CoopLockRecordsDao lockRecordsDao; + private final CoopLockOperationDao lockOperationDao; + + public CoopLockFsckRunner(Configuration conf, URI bucketUri, String command) throws 
IOException { + // Disable cooperative locking to prevent blocking + conf.setBoolean(GCS_COOPERATIVE_LOCKING_ENABLE.getKey(), false); + + this.conf = conf; + this.bucketName = bucketUri.getAuthority(); + this.command = command; + + this.ghfs = (GoogleHadoopFileSystem) FileSystem.get(bucketUri, conf); + this.gcsFs = ghfs.getGcsFs(); + this.gcs = (GoogleCloudStorageImpl) gcsFs.getGcs(); + this.lockRecordsDao = new CoopLockRecordsDao(gcs); + this.lockOperationDao = new CoopLockOperationDao(gcs, gcsFs.getPathCodec()); + } + + public int run() throws IOException { + Set<CoopLockRecord> lockedOperations = lockRecordsDao.getLockedOperations(bucketName); + if (lockedOperations.isEmpty()) { + logger.atInfo().log("No expired operation locks"); + return 0; + } + + Map<FileStatus, CoopLockRecord> expiredOperations = + lockedOperations.stream() + .map(this::getOperationLockIfExpiredUnchecked) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(toMap(Map.Entry::getKey, Map.Entry::getValue)); + + if (CoopLockFsck.COMMAND_CHECK.equals(command)) { + return 0; + } + + Function<Map.Entry<FileStatus, CoopLockRecord>, Boolean> operationRecovery = + expiredOperation -> { + FileStatus operationStatus = expiredOperation.getKey(); + CoopLockRecord operation = expiredOperation.getValue(); + String operationId = getOperationId(operationStatus); + try { + switch (operation.getOperationType()) { + case DELETE: + repairDeleteOperation(operationStatus, operation, operationId); + break; + case RENAME: + repairRenameOperation(operationStatus, operation, operationId); + break; + default: + throw new IllegalStateException( + "Unsupported operation type: " + operationStatus.getPath()); + } + } catch (Exception e) { + throw new RuntimeException("Failed to recover operation: " + operation, e); + } + return true; + }; + + for (Map.Entry<FileStatus, CoopLockRecord> expiredOperation : expiredOperations.entrySet()) { + long start = System.currentTimeMillis(); + try { + boolean succeeded = operationRecovery.apply(expiredOperation); + long finish = System.currentTimeMillis(); + if (succeeded) { + logger.atInfo().log( + "Operation %s successfully %s in %dms", + expiredOperation, + CoopLockFsck.COMMAND_ROLL_FORWARD.equals(command) ? "rolled forward" : "rolled back", + finish - start); + } else { + logger.atSevere().log( + "Operation %s failed to %s in %dms", + expiredOperation, + CoopLockFsck.COMMAND_ROLL_FORWARD.equals(command) ?
"rolled forward" : "rolled back", + finish - start); + } + } catch (Exception e) { + long finish = System.currentTimeMillis(); + logger.atSevere().withCause(e).log( + "Operation %s failed to roll forward in %dms", expiredOperation, finish - start); + } + } + return 0; + } + + private void repairDeleteOperation( + FileStatus operationStatus, CoopLockRecord operation, String operationId) + throws IOException, URISyntaxException { + if (CoopLockFsck.COMMAND_ROLL_BACK.equals(command)) { + logger.atInfo().log( + "Rolling back delete operations (%s) not supported, skipping.", + operationStatus.getPath()); + } else { + logger.atInfo().log("Repairing FS after %s delete operation.", operationStatus.getPath()); + DeleteOperation operationObject = getOperationObject(operationStatus, DeleteOperation.class); + lockRecordsDao.relockOperation( + bucketName, operationId, operation.getClientId(), operation.getLockEpochSeconds()); + Future<?> lockUpdateFuture = + lockOperationDao.scheduleLockUpdate( + operationId, + new URI(operationStatus.getPath().toString()), + DeleteOperation.class, + (o, i) -> o.setLockEpochSeconds(i.getEpochSecond())); + try { + List<String> loggedResources = getOperationLog(operationStatus, l -> l); + deleteResource(operationObject.getResource(), loggedResources); + lockRecordsDao.unlockPaths( + operationId, StorageResourceId.fromObjectName(operationObject.getResource())); + } finally { + lockUpdateFuture.cancel(/* mayInterruptIfRunning= */ true); + } + } + } + + private void repairRenameOperation( + FileStatus operationStatus, CoopLockRecord operation, String operationId) + throws IOException, URISyntaxException { + RenameOperation operationObject = getOperationObject(operationStatus, RenameOperation.class); + lockRecordsDao.relockOperation( + bucketName, operationId, operation.getClientId(), operation.getLockEpochSeconds()); + Future<?> lockUpdateFuture = + lockOperationDao.scheduleLockUpdate( + operationId, + new URI(operationStatus.getPath().toString()), + RenameOperation.class, + (o, i) -> o.setLockEpochSeconds(i.getEpochSecond())); + try { + LinkedHashMap<String, String> loggedResources = + getOperationLog( + operationStatus, + l -> { + List<String> srcToDst = RENAME_LOG_RECORD_SPLITTER.splitToList(l); + checkState(srcToDst.size() == 2); + return new AbstractMap.SimpleEntry<>(srcToDst.get(0), srcToDst.get(1)); + }) + .stream() + .collect( + toMap( + AbstractMap.SimpleEntry::getKey, + AbstractMap.SimpleEntry::getValue, + (e1, e2) -> { + throw new RuntimeException( + String.format("Found entries with duplicate keys: %s and %s", e1, e2)); + }, + LinkedHashMap::new)); + if (operationObject.getCopySucceeded()) { + if (CoopLockFsck.COMMAND_ROLL_BACK.equals(command)) { + deleteAndRenameToRepairRenameOperation( + operationStatus, + operation, + operationObject, + operationObject.getDstResource(), + new ArrayList<>(loggedResources.values()), + operationObject.getSrcResource(), + "source", + new ArrayList<>(loggedResources.keySet()), + /* copySucceeded= */ false); + } else { + deleteToRepairRenameOperation( + operationStatus, + operationObject.getSrcResource(), + "source", + loggedResources.keySet()); + } + } else { + if (CoopLockFsck.COMMAND_ROLL_BACK.equals(command)) { + deleteToRepairRenameOperation( + operationStatus, + operationObject.getDstResource(), + "destination", + loggedResources.values()); + } else { + deleteAndRenameToRepairRenameOperation( + operationStatus, + operation, + operationObject, + operationObject.getSrcResource(), + new ArrayList<>(loggedResources.keySet()), + operationObject.getDstResource(), +
"destination", + new ArrayList<>(loggedResources.values()), + /* copySucceeded= */ true); + } + } + lockRecordsDao.unlockPaths( + operationId, + StorageResourceId.fromObjectName(operationObject.getSrcResource()), + StorageResourceId.fromObjectName(operationObject.getDstResource())); + } finally { + lockUpdateFuture.cancel(/* mayInterruptIfRunning= */ true); + } + } + + private void deleteToRepairRenameOperation( + FileStatus operationLock, + String operationResource, + String deleteResourceType, + Collection<String> loggedResources) + throws IOException { + logger.atInfo().log( + "Repairing FS after %s rename operation (deleting %s (%s)).", + operationLock.getPath(), deleteResourceType, operationResource); + deleteResource(operationResource, loggedResources); + } + + private void deleteAndRenameToRepairRenameOperation( + FileStatus operationLock, + CoopLockRecord operation, + RenameOperation operationObject, + String srcResource, + List<String> loggedSrcResources, + String dstResource, + String dstResourceType, + List<String> loggedDstResources, + boolean copySucceeded) + throws IOException { + logger.atInfo().log( + "Repairing FS after %s rename operation (deleting %s (%s) and renaming (%s -> %s)).", + operationLock.getPath(), dstResourceType, dstResource, srcResource, dstResource); + deleteResource(dstResource, loggedDstResources); + gcs.copy(bucketName, toNames(loggedSrcResources), bucketName, toNames(loggedDstResources)); + + // Update rename operation checkpoint before proceeding to allow repair of failed repair + lockOperationDao.checkpointRenameOperation( + StorageResourceId.fromObjectName(operationObject.getSrcResource()), + StorageResourceId.fromObjectName(operationObject.getDstResource()), + operation.getOperationId(), + Instant.ofEpochSecond(operation.getOperationEpochSeconds()), + copySucceeded); + + deleteResource(srcResource, loggedSrcResources); + } + + private static List<String> toNames(List<String> resources) { + return resources.stream() + .map(r -> StorageResourceId.fromObjectName(r).getObjectName()) + .collect(toList()); + } + + private Optional<Map.Entry<FileStatus, CoopLockRecord>> getOperationLockIfExpiredUnchecked( + CoopLockRecord operation) { + try { + return getOperationLockIfExpired(bucketName, operation); + } catch (IOException e) { + throw new RuntimeException( + String.format("Failed to check if %s operation expired", operation), e); + } + } + + private Optional<Map.Entry<FileStatus, CoopLockRecord>> getOperationLockIfExpired( + String bucketName, CoopLockRecord operation) throws IOException { + String operationId = operation.getOperationId(); + String globPath = CoopLockRecordsDao.LOCK_DIRECTORY + "*" + operationId + "*.lock"; + URI globUri = + gcsFs.getPathCodec().getPath(bucketName, globPath, /* allowEmptyObjectName= */ false); + FileStatus[] operationLocks = ghfs.globStatus(new Path(globUri)); + checkState( + operationLocks.length < 2, + "operation %s should not have more than one lock file", + operationId); + + // Lock file not created - nothing to repair + if (operationLocks.length == 0) { + logger.atInfo().log( + "Operation %s for %s resources doesn't have lock file, unlocking", + operation.getOperationId(), operation.getResources()); + StorageResourceId[] lockedResources = + operation.getResources().stream() + .map(resource -> new StorageResourceId(bucketName, resource)) + .toArray(StorageResourceId[]::new); + lockRecordsDao.unlockPaths(operation.getOperationId(), lockedResources); + return Optional.empty(); + } + + FileStatus operationStatus = operationLocks[0]; + + Instant lockInstant = Instant.ofEpochSecond(operation.getLockEpochSeconds()); + if
(isLockExpired(lockInstant) + && isLockExpired(getLockRenewedInstant(operationStatus, operation))) { + logger.atInfo().log("Operation %s expired.", operationStatus.getPath()); + return Optional.of(new AbstractMap.SimpleEntry<>(operationStatus, operation)); + } + + logger.atInfo().log("Operation %s not expired.", operationStatus.getPath()); + return Optional.empty(); + } + + private void deleteResource(String resource, Collection<String> loggedResources) + throws IOException { + Path lockedResource = new Path(resource); + Set<String> allObjects = + Arrays.stream(ghfs.listStatus(lockedResource)) + .map(s -> s.getPath().toString()) + .collect(toSet()); + List<StorageResourceId> objectsToDelete = new ArrayList<>(loggedResources.size()); + for (String loggedObject : loggedResources) { + if (allObjects.contains(loggedObject)) { + objectsToDelete.add(StorageResourceId.fromObjectName(loggedObject)); + } + } + GoogleCloudStorage gcs = ghfs.getGcsFs().getGcs(); + gcs.deleteObjects(objectsToDelete); + + // delete directory if empty + allObjects.removeAll(loggedResources); + if (allObjects.isEmpty() && ghfs.exists(lockedResource)) { + ghfs.delete(lockedResource, /* recursive= */ false); + } + } + + private boolean isLockExpired(Instant lockInstant) { + return lockInstant + .plus(GCS_COOPERATIVE_LOCKING_EXPIRATION_TIMEOUT_MS.get(conf, conf::getLong), MILLIS) + .isBefore(operationExpirationInstant); + } + + private Instant getLockRenewedInstant(FileStatus operationStatus, CoopLockRecord operation) + throws IOException { + switch (operation.getOperationType()) { + case DELETE: + return Instant.ofEpochSecond( + getOperationObject(operationStatus, DeleteOperation.class).getLockEpochSeconds()); + case RENAME: + return Instant.ofEpochSecond( + getOperationObject(operationStatus, RenameOperation.class).getLockEpochSeconds()); + default: + throw new IllegalStateException("Unknown operation type: " + operationStatus.getPath()); + } + } + + private <T> T getOperationObject(FileStatus operation, Class<T> clazz) throws IOException { + ByteSource operationByteSource = + new ByteSource() { + @Override + public InputStream openStream() throws IOException { + return ghfs.open(operation.getPath()); + } + }; + String operationContent = operationByteSource.asCharSource(Charsets.UTF_8).read(); + return GSON.fromJson(operationContent, clazz); + } + + private <T> List<T> getOperationLog(FileStatus operation, Function<String, T> logRecordFn) + throws IOException { + List<T> log = new ArrayList<>(); + Path operationLog = new Path(operation.getPath().toString().replace(".lock", ".log")); + try (BufferedReader in = new BufferedReader(new InputStreamReader(ghfs.open(operationLog)))) { + String line; + while ((line = in.readLine()) != null) { + log.add(logRecordFn.apply(line)); + } + } + return log; + } + + private static String getOperationId(FileStatus operation) { + String[] fileParts = operation.getPath().toString().split("_"); + return fileParts[fileParts.length - 1].split("\\.")[0]; + } +} diff --git a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/CoopLockRepairIntegrationTest.java b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/CoopLockRepairIntegrationTest.java index f935853747..be27d4e5fc 100644 --- a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/CoopLockRepairIntegrationTest.java +++ b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/CoopLockRepairIntegrationTest.java @@ -18,6 +18,8 @@ import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.AUTHENTICATION_PREFIX; import static
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.GCS_COOPERATIVE_LOCKING_EXPIRATION_TIMEOUT_MS; +import static com.google.cloud.hadoop.gcsio.cooplock.CoopLockOperationType.DELETE; +import static com.google.cloud.hadoop.gcsio.cooplock.CoopLockOperationType.RENAME; import static com.google.cloud.hadoop.gcsio.cooplock.CoopLockRecordsDao.LOCK_DIRECTORY; import static com.google.cloud.hadoop.util.EntriesCredentialConfiguration.ENABLE_SERVICE_ACCOUNTS_SUFFIX; import static com.google.cloud.hadoop.util.EntriesCredentialConfiguration.SERVICE_ACCOUNT_EMAIL_SUFFIX; @@ -61,7 +63,7 @@ public class CoopLockRepairIntegrationTest { private static final Gson GSON = new Gson(); - private static final String OPERATION_FILENAME_PATTERN = + private static final String OPERATION_FILENAME_PATTERN_FORMAT = "[0-9]{8}T[0-9]{6}\\.[0-9]{3}Z_%s_[a-z0-9\\-]+"; private static GoogleCloudStorageOptions gcsOptions; @@ -175,10 +177,9 @@ private void moveDirectoryOperationRepairedAfterFailedCopy(String command) throw .collect(toList()); assertThat(lockFiles).hasSize(2); - URI lockFileUri = - matchFile(lockFiles, String.format(OPERATION_FILENAME_PATTERN, "rename") + "\\.lock").get(); - URI logFileUri = - matchFile(lockFiles, String.format(OPERATION_FILENAME_PATTERN, "rename") + "\\.log").get(); + String filenamePattern = String.format(OPERATION_FILENAME_PATTERN_FORMAT, RENAME); + URI lockFileUri = matchFile(lockFiles, filenamePattern + "\\.lock").get(); + URI logFileUri = matchFile(lockFiles, filenamePattern + "\\.log").get(); String lockContent = gcsfsIHelper.readTextFile(bucketName, lockFileUri.getPath()); assertThat(GSON.fromJson(lockContent, RenameOperation.class).setLockEpochSeconds(0)) @@ -248,10 +249,9 @@ public void moveDirectoryOperationRepairedAfterFailedDelete() throws Exception { .collect(toList()); assertThat(lockFiles).hasSize(2); - URI lockFileUri = - matchFile(lockFiles, String.format(OPERATION_FILENAME_PATTERN, "rename") + "\\.lock").get(); - URI logFileUri = - matchFile(lockFiles, String.format(OPERATION_FILENAME_PATTERN, "rename") + "\\.log").get(); + String filenameFormat = String.format(OPERATION_FILENAME_PATTERN_FORMAT, RENAME); + URI lockFileUri = matchFile(lockFiles, filenameFormat + "\\.lock").get(); + URI logFileUri = matchFile(lockFiles, filenameFormat + "\\.log").get(); String lockContent = gcsfsIHelper.readTextFile(bucketName, lockFileUri.getPath()); assertThat(GSON.fromJson(lockContent, RenameOperation.class).setLockEpochSeconds(0)) @@ -316,11 +316,14 @@ public void deleteDirectoryOperationRolledForward() throws Exception { .collect(toList()); assertThat(lockFiles).hasSize(2); - URI lockFileUri = - matchFile(lockFiles, String.format(OPERATION_FILENAME_PATTERN, "delete") + "\\.lock").get(); + String filenamePattern = String.format(OPERATION_FILENAME_PATTERN_FORMAT, DELETE); + URI lockFileUri = matchFile(lockFiles, filenamePattern + "\\.lock").get(); + URI logFileUri = matchFile(lockFiles, filenamePattern + "\\.log").get(); String lockContent = gcsfsIHelper.readTextFile(bucketName, lockFileUri.getPath()); assertThat(GSON.fromJson(lockContent, DeleteOperation.class).setLockEpochSeconds(0)) .isEqualTo(new DeleteOperation().setLockEpochSeconds(0).setResource(dirUri.toString())); + assertThat(gcsfsIHelper.readTextFile(bucketName, logFileUri.getPath())) + .isEqualTo(dirUri.resolve(fileName) + "\n" + dirUri + "\n"); } private Configuration getTestConfiguration() { diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/ForwardingGoogleCloudStorage.java 
b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/ForwardingGoogleCloudStorage.java index 7680310fda..ee9c08a9da 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/ForwardingGoogleCloudStorage.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/ForwardingGoogleCloudStorage.java @@ -254,7 +254,7 @@ public GoogleCloudStorageItemInfo composeObjects( * * @return the {@link GoogleCloudStorage} objected wrapped by this class. */ - protected GoogleCloudStorage getDelegate() { + public GoogleCloudStorage getDelegate() { return delegate; } } diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageFileSystem.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageFileSystem.java index 9e2fc61b26..b1ce0c7bec 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageFileSystem.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageFileSystem.java @@ -29,8 +29,8 @@ import com.google.api.client.util.Clock; import com.google.cloud.hadoop.gcsio.GoogleCloudStorage.ListPage; import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystemOptions.TimestampUpdatePredicate; -import com.google.cloud.hadoop.gcsio.cooplock.CoopLockOperationDao; -import com.google.cloud.hadoop.gcsio.cooplock.CoopLockRecordsDao; +import com.google.cloud.hadoop.gcsio.cooplock.CoopLockOperationDelete; +import com.google.cloud.hadoop.gcsio.cooplock.CoopLockOperationRename; import com.google.cloud.hadoop.util.LazyExecutorService; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.CharMatcher; @@ -50,7 +50,6 @@ import java.nio.channels.WritableByteChannel; import java.nio.file.DirectoryNotEmptyException; import java.nio.file.FileAlreadyExistsException; -import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -58,9 +57,9 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.TreeMap; -import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -110,9 +109,6 @@ public class GoogleCloudStorageFileSystem { private final PathCodec pathCodec; - private final CoopLockRecordsDao coopLockRecordsDao; - private final CoopLockOperationDao coopLockOperationDao; - // Executor for updating directory timestamps. 
private ExecutorService updateTimestampsExecutor = createUpdateTimestampsExecutor(); @@ -165,15 +161,10 @@ public GoogleCloudStorageFileSystem( checkArgument(credential != null, "credential must not be null"); - GoogleCloudStorageImpl gcsImpl = - new GoogleCloudStorageImpl(options.getCloudStorageOptions(), credential); - this.gcs = gcsImpl; + this.gcs = new GoogleCloudStorageImpl(options.getCloudStorageOptions(), credential); this.options = options; this.pathCodec = options.getPathCodec(); - this.coopLockRecordsDao = new CoopLockRecordsDao(gcsImpl); - this.coopLockOperationDao = new CoopLockOperationDao(gcsImpl, pathCodec); - if (options.isPerformanceCacheEnabled()) { this.gcs = new PerformanceCachingGoogleCloudStorage(this.gcs, options.getPerformanceCacheOptions()); @@ -203,26 +194,6 @@ public GoogleCloudStorageFileSystem( this.gcs = gcs; this.options = options; this.pathCodec = options.getPathCodec(); - this.coopLockRecordsDao = null; - this.coopLockOperationDao = null; - } - - /** - * Constructs a GoogleCloudStorageFilesystem based on an already-configured underlying - * GoogleCloudStorageImpl {@code gcs}. Any options pertaining to GCS creation will be ignored. - */ - @VisibleForTesting - public GoogleCloudStorageFileSystem( - GoogleCloudStorageImpl gcs, GoogleCloudStorageFileSystemOptions options) { - this.gcs = gcs; - this.options = options; - this.pathCodec = options.getPathCodec(); - this.coopLockRecordsDao = new CoopLockRecordsDao(gcs); - this.coopLockOperationDao = new CoopLockOperationDao(gcs, pathCodec); - } - - public CoopLockRecordsDao getCoopLockRecordsDao() { - return coopLockRecordsDao; } @VisibleForTesting @@ -417,9 +388,13 @@ public void delete(URI path, boolean recursive) throws IOException { () -> getFileInfoInternal(parentId, /* inferImplicitDirectories= */ false)); } - List<FileInfo> itemsToDelete = new ArrayList<>(); - List<FileInfo> bucketsToDelete = new ArrayList<>(); + Optional<CoopLockOperationDelete> coopLockOp = + options.enableCooperativeLocking() && fileInfo.isDirectory() + ? Optional.of(CoopLockOperationDelete.create(gcs, pathCodec, fileInfo.getPath())) + : Optional.empty(); + coopLockOp.ifPresent(CoopLockOperationDelete::lock); + List<FileInfo> itemsToDelete; // Delete sub-items if it is a directory. if (fileInfo.isDirectory()) { itemsToDelete = @@ -429,32 +404,20 @@ public void delete(URI path, boolean recursive) throws IOException { if (!itemsToDelete.isEmpty() && !recursive) { throw new DirectoryNotEmptyException("Cannot delete a non-empty directory."); } - } - - if (fileInfo.getItemInfo().isBucket()) { - bucketsToDelete.add(fileInfo); } else { - itemsToDelete.add(fileInfo); + itemsToDelete = new ArrayList<>(); } - if (options.enableCooperativeLocking() && fileInfo.isDirectory()) { - String operationId = UUID.randomUUID().toString(); - StorageResourceId resourceId = pathCodec.validatePathAndGetId(fileInfo.getPath(), true); - - coopLockRecordsDao.lockPaths(operationId, resourceId); - Future<?> lockUpdateFuture = Futures.immediateCancelledFuture(); - try { - lockUpdateFuture = - coopLockOperationDao.persistDeleteOperation( - path, itemsToDelete, bucketsToDelete, operationId, resourceId, lockUpdateFuture); + List<FileInfo> bucketsToDelete = new ArrayList<>(); + (fileInfo.getItemInfo().isBucket() ?
bucketsToDelete : itemsToDelete).add(fileInfo); - deleteInternal(itemsToDelete, bucketsToDelete); - coopLockRecordsDao.unlockPaths(operationId, resourceId); - } finally { - lockUpdateFuture.cancel(/* mayInterruptIfRunning= */ true); - } - } else { + coopLockOp.ifPresent(o -> o.persistAndScheduleRenewal(itemsToDelete, bucketsToDelete)); + try { deleteInternal(itemsToDelete, bucketsToDelete); + + coopLockOp.ifPresent(CoopLockOperationDelete::unlock); + } finally { + coopLockOp.ifPresent(CoopLockOperationDelete::cancelRenewal); } repairImplicitDirectory(parentInfoFuture); @@ -786,16 +749,8 @@ public void compose(List<URI> sources, URI destination, String contentType) thro * copied and not the content of any file. */ private void renameInternal(FileInfo srcInfo, URI dst) throws IOException { - if (srcInfo.isDirectory() && options.enableCooperativeLocking()) { - String operationId = UUID.randomUUID().toString(); - StorageResourceId srcResourceId = pathCodec.validatePathAndGetId(srcInfo.getPath(), true); - StorageResourceId dstResourceId = pathCodec.validatePathAndGetId(dst, true); - - coopLockRecordsDao.lockPaths(operationId, srcResourceId, dstResourceId); - renameDirectoryInternal(srcInfo, dst, operationId); - coopLockRecordsDao.unlockPaths(operationId, srcResourceId, dstResourceId); - } else if (srcInfo.isDirectory()) { - renameDirectoryInternal(srcInfo, dst, /* operationId= */ null); + if (srcInfo.isDirectory()) { + renameDirectoryInternal(srcInfo, dst); } else { URI src = srcInfo.getPath(); StorageResourceId srcResourceId = pathCodec.validatePathAndGetId(src, true); @@ -825,11 +780,16 @@ private void renameInternal(FileInfo srcInfo, URI dst) throws IOException { * * @see #renameInternal */ - private void renameDirectoryInternal(FileInfo srcInfo, URI dst, String operationId) - throws IOException { + private void renameDirectoryInternal(FileInfo srcInfo, URI dst) throws IOException { checkArgument(srcInfo.isDirectory(), "'%s' should be a directory", srcInfo); - Pattern markerFilePattern = options.getMarkerFilePattern(); + URI src = srcInfo.getPath(); + + Optional<CoopLockOperationRename> coopLockOp = + options.enableCooperativeLocking() && src.getAuthority().equals(dst.getAuthority()) + ? Optional.of(CoopLockOperationRename.create(gcs, pathCodec, src, dst)) + : Optional.empty(); + coopLockOp.ifPresent(CoopLockOperationRename::lock); // Mapping from each src to its respective dst. // Sort src items so that parent directories appear before their children. @@ -839,13 +799,14 @@ private void renameDirectoryInternal(FileInfo srcInfo, URI dst, String operation // List of individual paths to rename; // we will try to carry out the copies in this list's order. - List<FileInfo> srcItemInfos = listAllFileInfoForPrefix(srcInfo.getPath()); + List<FileInfo> srcItemInfos = listAllFileInfoForPrefix(src); // Convert to the destination directory. dst = FileInfo.convertToDirectoryPath(pathCodec, dst); // Create a list of sub-items to copy.
- String prefix = srcInfo.getPath().toString(); + Pattern markerFilePattern = options.getMarkerFilePattern(); + String prefix = src.toString(); for (FileInfo srcItemInfo : srcItemInfos) { String relativeItemName = srcItemInfo.getPath().toString().substring(prefix.length()); URI dstItemName = dst.resolve(relativeItemName); @@ -856,69 +817,56 @@ private void renameDirectoryInternal(FileInfo srcInfo, URI dst, String operation } } - Future<?> lockUpdateFuture = null; - Instant operationInstant = Instant.now(); - if (options.enableCooperativeLocking() - && srcInfo.getItemInfo().getBucketName().equals(dst.getAuthority())) { - lockUpdateFuture = - coopLockOperationDao.persistUpdateOperation( - srcInfo, - dst, - operationId, - srcToDstItemNames, - srcToDstMarkerItemNames, - operationInstant); - } - - // Create the destination directory. - mkdir(dst); + coopLockOp.ifPresent( + o -> o.persistAndScheduleRenewal(srcToDstItemNames, srcToDstMarkerItemNames)); + try { + // Create the destination directory. + mkdir(dst); - // First, copy all items except marker items - copyInternal(srcToDstItemNames); - // Finally, copy marker items (if any) to mark rename operation success - copyInternal(srcToDstMarkerItemNames); + // First, copy all items except marker items + copyInternal(srcToDstItemNames); + // Finally, copy marker items (if any) to mark rename operation success + copyInternal(srcToDstMarkerItemNames); - if (options.enableCooperativeLocking() - && srcInfo.getItemInfo().getBucketName().equals(dst.getAuthority())) { - coopLockOperationDao.checkpointUpdateOperation(srcInfo, dst, operationId, operationInstant); - } + coopLockOp.ifPresent(CoopLockOperationRename::checkpoint); - // So far, only the destination directories are updated. Only do those now: - if (!srcToDstItemNames.isEmpty() || !srcToDstMarkerItemNames.isEmpty()) { - List<URI> allDestinationUris = - new ArrayList<>(srcToDstItemNames.size() + srcToDstMarkerItemNames.size()); - allDestinationUris.addAll(srcToDstItemNames.values()); - allDestinationUris.addAll(srcToDstMarkerItemNames.values()); + // So far, only the destination directories are updated. Only do those now: + if (!srcToDstItemNames.isEmpty() || !srcToDstMarkerItemNames.isEmpty()) { + List<URI> allDestinationUris = + new ArrayList<>(srcToDstItemNames.size() + srcToDstMarkerItemNames.size()); + allDestinationUris.addAll(srcToDstItemNames.values()); + allDestinationUris.addAll(srcToDstMarkerItemNames.values()); - tryUpdateTimestampsForParentDirectories(allDestinationUris, allDestinationUris); - } - - List<FileInfo> bucketsToDelete = new ArrayList<>(1); - List<FileInfo> srcItemsToDelete = new ArrayList<>(srcToDstItemNames.size() + 1); - srcItemsToDelete.addAll(srcToDstItemNames.keySet()); - if (srcInfo.getItemInfo().isBucket()) { - bucketsToDelete.add(srcInfo); - } else { - // If src is a directory then srcItemInfos does not contain its own name, - // therefore add it to the list before we delete items in the list. - srcItemsToDelete.add(srcInfo); - } + tryUpdateTimestampsForParentDirectories(allDestinationUris, allDestinationUris); + } - // First delete marker files from the src - deleteInternal(new ArrayList<>(srcToDstMarkerItemNames.keySet()), new ArrayList<>()); - // Then delete rest of the items that we successfully copied.
- deleteInternal(srcItemsToDelete, bucketsToDelete); + List<FileInfo> bucketsToDelete = new ArrayList<>(1); + List<FileInfo> srcItemsToDelete = new ArrayList<>(srcToDstItemNames.size() + 1); + srcItemsToDelete.addAll(srcToDstItemNames.keySet()); + if (srcInfo.getItemInfo().isBucket()) { + bucketsToDelete.add(srcInfo); + } else { + // If src is a directory then srcItemInfos does not contain its own name, - // therefore add it to the list before we delete items in the list. + srcItemsToDelete.add(srcInfo); + } - // if we deleted a bucket, then there no need to update timestamps - if (bucketsToDelete.isEmpty()) { - List<URI> srcItemNames = - srcItemInfos.stream().map(FileInfo::getPath).collect(toCollection(ArrayList::new)); - // Any path that was deleted, we should update the parent except for parents we also deleted - tryUpdateTimestampsForParentDirectories(srcItemNames, srcItemNames); - } + // First delete marker files from the src + deleteInternal(new ArrayList<>(srcToDstMarkerItemNames.keySet()), new ArrayList<>()); + // Then delete rest of the items that we successfully copied. + deleteInternal(srcItemsToDelete, bucketsToDelete); + + // if we deleted a bucket, then there no need to update timestamps + if (bucketsToDelete.isEmpty()) { + List<URI> srcItemNames = + srcItemInfos.stream().map(FileInfo::getPath).collect(toCollection(ArrayList::new)); + // Any path that was deleted, we should update the parent except for parents we also deleted + tryUpdateTimestampsForParentDirectories(srcItemNames, srcItemNames); + } - if (lockUpdateFuture != null) { - lockUpdateFuture.cancel(/* mayInterruptIfRunning= */ false); + coopLockOp.ifPresent(CoopLockOperationRename::unlock); + } finally { + coopLockOp.ifPresent(CoopLockOperationRename::cancelRenewal); } } diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/CoopLockOperationDao.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/CoopLockOperationDao.java index 8a9ef9dfaf..ba0aff1770 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/CoopLockOperationDao.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/CoopLockOperationDao.java @@ -1,9 +1,24 @@ +/* + * Copyright 2019 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + package com.google.cloud.hadoop.gcsio.cooplock; import static com.google.cloud.hadoop.gcsio.CreateObjectOptions.DEFAULT_CONTENT_TYPE; import static com.google.cloud.hadoop.gcsio.CreateObjectOptions.EMPTY_METADATA; import static com.google.cloud.hadoop.gcsio.cooplock.CoopLockRecordsDao.LOCK_DIRECTORY; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; @@ -16,9 +31,9 @@ import com.google.cloud.hadoop.gcsio.PathCodec; import com.google.cloud.hadoop.gcsio.StorageResourceId; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Streams; import com.google.common.flogger.GoogleLogger; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.gson.Gson; import java.io.BufferedReader; import java.io.IOException; @@ -31,7 +46,6 @@ import java.time.format.DateTimeFormatter; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -43,9 +57,11 @@ public class CoopLockOperationDao { private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); - private static final Set<String> VALID_OPERATIONS = ImmutableSet.of("delete", "rename"); + public static final String RENAME_LOG_RECORD_SEPARATOR = " -> "; + private static final String OPERATION_LOG_FILE_FORMAT = "%s_%s_%s.log"; private static final String OPERATION_LOCK_FILE_FORMAT = "%s_%s_%s.lock"; + private static final CreateObjectOptions CREATE_OBJECT_OPTIONS = new CreateObjectOptions(/* overwriteExisting= */ false, "application/text", EMPTY_METADATA); private static final CreateObjectOptions UPDATE_OBJECT_OPTIONS = @@ -56,6 +72,11 @@ public class CoopLockOperationDao { private static final Gson GSON = new Gson(); + private final ScheduledExecutorService scheduledThreadPool = + Executors.newScheduledThreadPool( + /* corePoolSize= */ 0, + new ThreadFactoryBuilder().setNameFormat("coop-lock-thread-%d").setDaemon(true).build()); + private GoogleCloudStorage gcs; private PathCodec pathCodec; @@ -65,20 +86,18 @@ public CoopLockOperationDao(GoogleCloudStorage gcs, PathCodec pathCodec) { } public Future<?> persistDeleteOperation( - URI path, - List<FileInfo> itemsToDelete, - List<FileInfo> bucketsToDelete, String operationId, + Instant operationInstant, StorageResourceId resourceId, - Future<?> lockUpdateFuture) + List<FileInfo> itemsToDelete, + List<FileInfo> bucketsToDelete) throws IOException { - Instant operationInstant = Instant.now(); URI operationLockPath = writeOperationFile( - path.getAuthority(), + resourceId.getBucketName(), OPERATION_LOCK_FILE_FORMAT, CREATE_OBJECT_OPTIONS, - "delete", + CoopLockOperationType.DELETE, operationId, operationInstant, ImmutableList.of( @@ -91,86 +110,90 @@ public Future<?> persistDeleteOperation( .map(i -> i.getItemInfo().getResourceId().toString()) .collect(toImmutableList()); writeOperationFile( - path.getAuthority(), + resourceId.getBucketName(), OPERATION_LOG_FILE_FORMAT, CREATE_OBJECT_OPTIONS, - "delete", + CoopLockOperationType.DELETE, operationId, operationInstant, logRecords); // Schedule lock expiration update - lockUpdateFuture = - scheduleLockUpdate( - operationId, - operationLockPath, - DeleteOperation.class, - (o, i) -> o.setLockEpochSeconds(i.getEpochSecond())); - return
lockUpdateFuture; + return scheduleLockUpdate( + operationId, + operationLockPath, + DeleteOperation.class, + (o, i) -> o.setLockEpochSeconds(i.getEpochSecond())); } - public Future<?> persistUpdateOperation( - FileInfo srcInfo, - URI dst, + public Future<?> persistRenameOperation( String operationId, + Instant operationInstant, + StorageResourceId src, + StorageResourceId dst, Map<FileInfo, URI> srcToDstItemNames, - Map<FileInfo, URI> srcToDstMarkerItemNames, - Instant operationInstant) + Map<FileInfo, URI> srcToDstMarkerItemNames) throws IOException { - Future<?> lockUpdateFuture; URI operationLockPath = writeOperationFile( - dst.getAuthority(), + dst.getBucketName(), OPERATION_LOCK_FILE_FORMAT, CREATE_OBJECT_OPTIONS, - "rename", + CoopLockOperationType.RENAME, operationId, operationInstant, ImmutableList.of( GSON.toJson( new RenameOperation() .setLockEpochSeconds(operationInstant.getEpochSecond()) - .setSrcResource(srcInfo.getPath().toString()) + .setSrcResource(src.toString()) .setDstResource(dst.toString()) .setCopySucceeded(false)))); List<String> logRecords = Streams.concat( srcToDstItemNames.entrySet().stream(), srcToDstMarkerItemNames.entrySet().stream()) - .map(e -> e.getKey().getItemInfo().getResourceId() + " -> " + e.getValue()) + .map( + e -> + e.getKey().getItemInfo().getResourceId() + + RENAME_LOG_RECORD_SEPARATOR + + e.getValue()) .collect(toImmutableList()); writeOperationFile( - dst.getAuthority(), + dst.getBucketName(), OPERATION_LOG_FILE_FORMAT, CREATE_OBJECT_OPTIONS, - "rename", + CoopLockOperationType.RENAME, operationId, operationInstant, logRecords); // Schedule lock expiration update - lockUpdateFuture = - scheduleLockUpdate( - operationId, - operationLockPath, - RenameOperation.class, - (o, i) -> o.setLockEpochSeconds(i.getEpochSecond())); - return lockUpdateFuture; + return scheduleLockUpdate( + operationId, + operationLockPath, + RenameOperation.class, + (o, i) -> o.setLockEpochSeconds(i.getEpochSecond())); } - public void checkpointUpdateOperation( - FileInfo srcInfo, URI dst, String operationId, Instant operationInstant) throws IOException { + public void checkpointRenameOperation( + StorageResourceId src, + StorageResourceId dst, + String operationId, + Instant operationInstant, + boolean copySucceeded) + throws IOException { writeOperationFile( - dst.getAuthority(), + dst.getBucketName(), OPERATION_LOCK_FILE_FORMAT, UPDATE_OBJECT_OPTIONS, - "rename", + CoopLockOperationType.RENAME, operationId, operationInstant, ImmutableList.of( GSON.toJson( new RenameOperation() .setLockEpochSeconds(Instant.now().getEpochSecond()) - .setSrcResource(srcInfo.getPath().toString()) + .setSrcResource(src.toString()) .setDstResource(dst.toString()) - .setCopySucceeded(true)))); + .setCopySucceeded(copySucceeded)))); } private void renewLockOrExit( @@ -179,9 +202,10 @@ private void renewLockOrExit( for (int i = 0; i < 10; i++) { try { renewLock(operationId, operationLockPath, renewFn); + return; } catch (IOException e) { logger.atWarning().withCause(e).log( - "Failed to renew '%s' lock for %s operation, retry #%d", + "Failed to renew '%s' lock for %s operation, attempt #%d", operationLockPath, operationId, i + 1); } sleepUninterruptibly(100, TimeUnit.MILLISECONDS); @@ -194,7 +218,8 @@ private void renewLockOrExit( private void renewLock( String operationId, URI operationLockPath, Function<String, String> renewFn) throws IOException { - StorageResourceId lockId = StorageResourceId.fromObjectName(operationLockPath.toString()); + StorageResourceId lockId = + pathCodec.validatePathAndGetId(operationLockPath, /* allowEmptyObjectName= */ false);
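+ // Capture the lock file's content generation; the rewrite below is keyed to it, so a concurrent update of the lock invalidates this renewal instead of being silently overwritten.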
GoogleCloudStorageItemInfo lockInfo = gcs.getItemInfo(lockId); checkState(lockInfo.exists(), "lock file for %s operation should exist", operationId); @@ -208,30 +233,23 @@ private void renewLock( CreateObjectOptions updateOptions = new CreateObjectOptions( /* overwriteExisting= */ true, DEFAULT_CONTENT_TYPE, EMPTY_METADATA); - StorageResourceId operationLockPathResourceId = + StorageResourceId lockIdWithGeneration = new StorageResourceId( - operationLockPath.getAuthority(), - operationLockPath.getPath(), - lockInfo.getContentGeneration()); - writeOperation(operationLockPathResourceId, updateOptions, ImmutableList.of(lock)); + lockId.getBucketName(), lockId.getObjectName(), lockInfo.getContentGeneration()); + writeOperation(lockIdWithGeneration, updateOptions, ImmutableList.of(lock)); } private URI writeOperationFile( String bucket, String fileNameFormat, CreateObjectOptions createObjectOptions, - String operation, + CoopLockOperationType operationType, String operationId, Instant operationInstant, List<String> records) throws IOException { - checkArgument( - VALID_OPERATIONS.contains(operation), - "operation must be one of $s, but was '%s'", - VALID_OPERATIONS, - operation); String date = LOCK_FILE_DATE_TIME_FORMAT.format(operationInstant); - String file = String.format(LOCK_DIRECTORY + fileNameFormat, date, operation, operationId); + String file = String.format(LOCK_DIRECTORY + fileNameFormat, date, operationType, operationId); URI path = pathCodec.getPath(bucket, file, /* allowEmptyObjectName= */ false); StorageResourceId resourceId = pathCodec.validatePathAndGetId(path, /* allowEmptyObjectName= */ false); @@ -250,8 +268,6 @@ private void writeOperation( } } - private ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1); - public <T> Future<?> scheduleLockUpdate( String operationId, URI operationLockPath, Class<T> clazz, BiConsumer<T, Instant> renewFn) { return scheduledThreadPool.scheduleAtFixedRate( diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/CoopLockOperationDelete.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/CoopLockOperationDelete.java new file mode 100644 index 0000000000..c97708729a --- /dev/null +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/CoopLockOperationDelete.java @@ -0,0 +1,110 @@ +/* + * Copyright 2019 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package com.google.cloud.hadoop.gcsio.cooplock; + +import static com.google.cloud.hadoop.gcsio.cooplock.CoopLockOperationType.DELETE; +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.cloud.hadoop.gcsio.FileInfo; +import com.google.cloud.hadoop.gcsio.ForwardingGoogleCloudStorage; +import com.google.cloud.hadoop.gcsio.GoogleCloudStorage; +import com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl; +import com.google.cloud.hadoop.gcsio.PathCodec; +import com.google.cloud.hadoop.gcsio.StorageResourceId; +import com.google.common.base.MoreObjects; +import java.io.IOException; +import java.net.URI; +import java.time.Instant; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Future; + +public class CoopLockOperationDelete { + + private final String operationId = UUID.randomUUID().toString(); + private final Instant operationInstant = Instant.now(); + + private final StorageResourceId resourceId; + + private final CoopLockRecordsDao coopLockRecordsDao; + private final CoopLockOperationDao coopLockOperationDao; + + private Future<?> lockUpdateFuture; + + private CoopLockOperationDelete( + GoogleCloudStorageImpl gcs, PathCodec pathCodec, StorageResourceId resourceId) { + this.resourceId = resourceId; + this.coopLockRecordsDao = new CoopLockRecordsDao(gcs); + this.coopLockOperationDao = new CoopLockOperationDao(gcs, pathCodec); + } + + public static CoopLockOperationDelete create( + GoogleCloudStorage gcs, PathCodec pathCodec, URI path) { + while (gcs instanceof ForwardingGoogleCloudStorage) { + gcs = ((ForwardingGoogleCloudStorage) gcs).getDelegate(); + } + checkArgument( + gcs instanceof GoogleCloudStorageImpl, + "gcs should be instance of %s, but was %s", + GoogleCloudStorageImpl.class, + gcs.getClass()); + return new CoopLockOperationDelete( + (GoogleCloudStorageImpl) gcs, + pathCodec, + pathCodec.validatePathAndGetId(path, /* allowEmptyObjectName= */ true)); + } + + public void lock() { + try { + coopLockRecordsDao.lockPaths(operationId, operationInstant, DELETE, resourceId); + } catch (IOException e) { + throw new RuntimeException(String.format("Failed to acquire lock for %s operation", this), e); + } + } + + public void persistAndScheduleRenewal( + List<FileInfo> itemsToDelete, List<FileInfo> bucketsToDelete) { + try { + lockUpdateFuture = + coopLockOperationDao.persistDeleteOperation( + operationId, operationInstant, resourceId, itemsToDelete, bucketsToDelete); + } catch (IOException e) { + throw new RuntimeException(String.format("Failed to persist %s operation", this), e); + } + } + + public void unlock() { + try { + coopLockRecordsDao.unlockPaths(operationId, resourceId); + } catch (IOException e) { + throw new RuntimeException(String.format("Failed to release lock for %s operation", this), e); + } + } + + public void cancelRenewal() { + lockUpdateFuture.cancel(/* mayInterruptIfRunning= */ true); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("operationId", operationId) + .add("operationInstant", operationInstant) + .add("resourceId", resourceId) + .toString(); + } +} diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/CoopLockOperationRename.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/CoopLockOperationRename.java new file mode 100644 index 0000000000..03c1baaa28 --- /dev/null +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/CoopLockOperationRename.java @@ -0,0 +1,133 @@ +/* + * Copyright 2019 Google LLC. All Rights Reserved.
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.hadoop.gcsio.cooplock; + +import static com.google.cloud.hadoop.gcsio.cooplock.CoopLockOperationType.RENAME; +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.cloud.hadoop.gcsio.FileInfo; +import com.google.cloud.hadoop.gcsio.ForwardingGoogleCloudStorage; +import com.google.cloud.hadoop.gcsio.GoogleCloudStorage; +import com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl; +import com.google.cloud.hadoop.gcsio.PathCodec; +import com.google.cloud.hadoop.gcsio.StorageResourceId; +import com.google.common.base.MoreObjects; +import java.io.IOException; +import java.net.URI; +import java.time.Instant; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Future; + +public class CoopLockOperationRename { + + private final String operationId = UUID.randomUUID().toString(); + private final Instant operationInstant = Instant.now(); + + private final StorageResourceId srcResourceId; + private final StorageResourceId dstResourceId; + + private final CoopLockRecordsDao coopLockRecordsDao; + private final CoopLockOperationDao coopLockOperationDao; + + private Future<?> lockUpdateFuture; + + private CoopLockOperationRename( + GoogleCloudStorageImpl gcs, + PathCodec pathCodec, + StorageResourceId srcResourceId, + StorageResourceId dstResourceId) { + this.srcResourceId = srcResourceId; + this.dstResourceId = dstResourceId; + this.coopLockRecordsDao = new CoopLockRecordsDao(gcs); + this.coopLockOperationDao = new CoopLockOperationDao(gcs, pathCodec); + } + + public static CoopLockOperationRename create( + GoogleCloudStorage gcs, PathCodec pathCodec, URI src, URI dst) { + while (gcs instanceof ForwardingGoogleCloudStorage) { + gcs = ((ForwardingGoogleCloudStorage) gcs).getDelegate(); + } + checkArgument( + gcs instanceof GoogleCloudStorageImpl, + "gcs should be instance of %s, but was %s", + GoogleCloudStorageImpl.class, + gcs.getClass()); + return new CoopLockOperationRename( + (GoogleCloudStorageImpl) gcs, + pathCodec, + pathCodec.validatePathAndGetId(src, /* allowEmptyObjectName= */ true), + pathCodec.validatePathAndGetId(dst, /* allowEmptyObjectName= */ true)); + } + + public void lock() { + try { + coopLockRecordsDao.lockPaths( + operationId, operationInstant, RENAME, srcResourceId, dstResourceId); + } catch (IOException e) { + throw new RuntimeException(String.format("Failed to acquire lock for %s operation", this), e); + } + } + + public void persistAndScheduleRenewal( + Map<FileInfo, String> srcToDstItemNames, Map<FileInfo, String> srcToDstMarkerItemNames) { + try { + lockUpdateFuture = + coopLockOperationDao.persistRenameOperation( + operationId, + operationInstant, + srcResourceId, + dstResourceId, + srcToDstItemNames, + srcToDstMarkerItemNames); + } catch (IOException e) { + throw new RuntimeException(String.format("Failed to persist %s operation", this), e); + } + } + + public void checkpoint() { + try { + coopLockOperationDao.checkpointRenameOperation( + srcResourceId, dstResourceId,
operationId, operationInstant, /* copySucceeded= */ true); + } catch (IOException e) { + throw new RuntimeException(String.format("Failed to checkpoint %s operation", this), e); + } + } + + public void unlock() { + try { + coopLockRecordsDao.unlockPaths(operationId, srcResourceId, dstResourceId); + } catch (IOException e) { + throw new RuntimeException( + String.format("Failed to release lock for %s operation", this), e); + } + } + + public void cancelRenewal() { + lockUpdateFuture.cancel(/* mayInterruptIfRunning= */ true); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("operationId", operationId) + .add("operationInstant", operationInstant) + .add("srcResourceId", srcResourceId) + .add("dstResourceId", dstResourceId) + .toString(); + } +} diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/CoopLockOperationType.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/CoopLockOperationType.java new file mode 100644 index 0000000000..87c082d63b --- /dev/null +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/CoopLockOperationType.java @@ -0,0 +1,23 @@ +/* + * Copyright 2019 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.hadoop.gcsio.cooplock; + +/** Enum that represents cooperative locking operation type */ +public enum CoopLockOperationType { + RENAME, + DELETE +} diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/CoopLockRecord.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/CoopLockRecord.java index 043916aa34..38e39e802b 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/CoopLockRecord.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/CoopLockRecord.java @@ -1,3 +1,19 @@ +/* + * Copyright 2019 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + package com.google.cloud.hadoop.gcsio.cooplock; import com.google.common.base.MoreObjects; @@ -6,10 +22,22 @@ /** Class that represent cooperative locking operation */ public class CoopLockRecord { + private String clientId; private String operationId; + private long operationEpochSeconds; + private CoopLockOperationType operationType; private long lockEpochSeconds; private Set<String> resources = new TreeSet<>(); + public String getClientId() { + return clientId; + } + + public CoopLockRecord setClientId(String clientId) { + this.clientId = clientId; + return this; + } + public String getOperationId() { return operationId; } @@ -19,6 +47,24 @@ public CoopLockRecord setOperationId(String operationId) { return this; } + public long getOperationEpochSeconds() { + return operationEpochSeconds; + } + + public CoopLockRecord setOperationEpochSeconds(long operationEpochSeconds) { + this.operationEpochSeconds = operationEpochSeconds; + return this; + } + + public CoopLockOperationType getOperationType() { + return operationType; + } + + public CoopLockRecord setOperationType(CoopLockOperationType operationType) { + this.operationType = operationType; + return this; + } + public long getLockEpochSeconds() { return lockEpochSeconds; } @@ -40,7 +86,10 @@ public CoopLockRecord setResources(Set<String> resources) { @Override public String toString() { return MoreObjects.toStringHelper(this) + .add("clientId", clientId) .add("operationId", operationId) + .add("operationEpochSeconds", operationEpochSeconds) + .add("operationType", operationType) .add("lockEpochSeconds", lockEpochSeconds) .add("resources", resources) .toString(); diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/CoopLockRecords.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/CoopLockRecords.java index 8352d0f37a..dbdb8fb1c9 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/CoopLockRecords.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/CoopLockRecords.java @@ -1,3 +1,19 @@ +/* + * Copyright 2019 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.google.cloud.hadoop.gcsio.cooplock; import static com.google.common.base.Preconditions.checkState; @@ -8,8 +24,13 @@ import java.util.TreeSet; public class CoopLockRecords { - /** Supported version of operation locks */ - public static final long FORMAT_VERSION = 1; + /** + * Supported version of operation locks persistent objects format. + * + *

<p>When making any changes to the cooperative locking persistent objects format (adding, renaming + * or removing fields), you need to increase this version number to prevent corruption. + */ + public static final long FORMAT_VERSION = 2; private long formatVersion = -1; private Set<CoopLockRecord> locks = diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/CoopLockRecordsDao.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/CoopLockRecordsDao.java index e79c566f75..097086852f 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/CoopLockRecordsDao.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/CoopLockRecordsDao.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Google Inc. All Rights Reserved. + * Copyright 2019 Google LLC. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,6 +35,9 @@ import com.google.common.flogger.GoogleLogger; import com.google.gson.Gson; import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.time.Duration; import java.time.Instant; import java.util.Arrays; import java.util.HashMap; @@ -43,21 +46,22 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; public class CoopLockRecordsDao { - private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); public static final String LOCK_DIRECTORY = "_lock/"; - private static final Gson GSON = new Gson(); - - public static final String LOCK_FILE = "all.lock"; + private static final String LOCK_FILE = "all.lock"; public static final String LOCK_PATH = LOCK_DIRECTORY + LOCK_FILE; private static final String LOCK_METADATA_KEY = "lock"; private static final int MAX_LOCKS_COUNT = 20; + private static final int RETRY_INTERVAL_MILLIS = 2_000; + + private static final Gson GSON = new Gson(); private final GoogleCloudStorageImpl gcs; @@ -77,31 +81,56 @@ public Set getLockedOperations(String bucketName) throws IOExcep ? new HashSet<>() : getLockRecords(lockInfo).getLocks(); logger.atFine().log( - "[%dms] lockPaths(%s): %s", System.currentTimeMillis() - startMs, bucketName, operations); + "[%dms] getLockedOperations(%s): %s", + System.currentTimeMillis() - startMs, bucketName, operations); return operations; } - public void lockOperation(String bucketName, String operationId, long lockEpochSeconds) + public void relockOperation( + String bucketName, String operationId, String clientId, long lockEpochSeconds) throws IOException { long startMs = System.currentTimeMillis(); - logger.atFine().log("lockOperation(%s, %d)", operationId, lockEpochSeconds); - modifyLock(this::updateLockEpochSeconds, bucketName, operationId, lockEpochSeconds); + logger.atFine().log("lockOperation(%s, %s)", operationId, clientId); + modifyLock( + records -> reacquireOperationLock(records, operationId, clientId, lockEpochSeconds), + bucketName, + operationId); logger.atFine().log( "[%dms] lockOperation(%s, %s)", - System.currentTimeMillis() - startMs, operationId, lockEpochSeconds); + System.currentTimeMillis() - startMs, operationId, clientId); } - public void lockPaths(String operationId, StorageResourceId... resources) throws IOException { + public void lockPaths( + String operationId, + Instant operationInstant, + CoopLockOperationType operationType, + StorageResourceId...
resources) + throws IOException { long startMs = System.currentTimeMillis(); logger.atFine().log("lockPaths(%s, %s)", operationId, lazy(() -> Arrays.toString(resources))); Set<String> objects = validateResources(resources); String bucketName = resources[0].getBucketName(); - modifyLock(this::addLockRecords, bucketName, operationId, objects); + modifyLock( + records -> addLockRecords(records, operationId, operationInstant, operationType, objects), + bucketName, + operationId); logger.atFine().log( "[%dms] lockPaths(%s, %s)", System.currentTimeMillis() - startMs, operationId, lazy(() -> Arrays.toString(resources))); } + public void unlockPaths(String operationId, StorageResourceId... resources) throws IOException { + long startMs = System.currentTimeMillis(); + logger.atFine().log("unlockPaths(%s, %s)", operationId, lazy(() -> Arrays.toString(resources))); + Set<String> objects = validateResources(resources); + String bucketName = resources[0].getBucketName(); + modifyLock( + records -> removeLockRecords(records, operationId, objects), bucketName, operationId); + logger.atFine().log( + "[%dms] unlockPaths(%s, %s)", + System.currentTimeMillis() - startMs, operationId, lazy(() -> Arrays.toString(resources))); + } + private Set<String> validateResources(StorageResourceId[] resources) { checkNotNull(resources, "resources should not be null"); checkArgument(resources.length > 0, "resources should not be empty"); @@ -113,22 +142,8 @@ private Set<String> validateResources(StorageResourceId[] resources) { return Arrays.stream(resources).map(StorageResourceId::getObjectName).collect(toImmutableSet()); } - public void unlockPaths(String operationId, StorageResourceId... resources) throws IOException { - long startMs = System.currentTimeMillis(); - logger.atFine().log("unlockPaths(%s, %s)", operationId, lazy(() -> Arrays.toString(resources))); - Set<String> objects = validateResources(resources); - String bucketName = resources[0].getBucketName(); - modifyLock(this::removeLockRecords, bucketName, operationId, objects); - logger.atFine().log( - "[%dms] unlockPaths(%s, %s)", - System.currentTimeMillis() - startMs, operationId, lazy(() -> Arrays.toString(resources))); - } - - private <T> void modifyLock( - LockRecordsModificationFunction<Boolean, CoopLockRecords, String, T> modificationFn, - String bucketName, - String operationId, - T modificationFnParam) + private void modifyLock( + Function<CoopLockRecords, Boolean> modificationFn, String bucketName, String operationId) throws IOException { long startMs = System.currentTimeMillis(); StorageResourceId lockId = getLockId(bucketName); @@ -137,7 +152,7 @@ private <T> void modifyLock( new ExponentialBackOff.Builder() .setInitialIntervalMillis(100) .setMultiplier(1.2) - .setMaxIntervalMillis(30_000) + .setMaxIntervalMillis((int) Duration.ofSeconds(MAX_LOCKS_COUNT).toMillis()) .setMaxElapsedTimeMillis(Integer.MAX_VALUE) .build(); @@ -145,7 +160,7 @@ try { GoogleCloudStorageItemInfo lockInfo = gcs.getItemInfo(lockId); if (!lockInfo.exists()) { - gcs.createEmptyObject(lockId, new CreateObjectOptions(false)); + gcs.createEmptyObject(lockId, new CreateObjectOptions(/* overwriteExisting= */ false)); lockInfo = gcs.getItemInfo(lockId); } CoopLockRecords lockRecords = @@ -154,15 +169,15 @@ ? new CoopLockRecords().setFormatVersion(CoopLockRecords.FORMAT_VERSION) : getLockRecords(lockInfo); - if (!modificationFn.apply(lockRecords, operationId, modificationFnParam)) { + if (!modificationFn.apply(lockRecords)) { logger.atInfo().atMostEvery(5, SECONDS).log( - "Failed to update %s entries in %s file: resources could be locked.
Re-trying.", - modificationFnParam, lockRecords.getLocks().size(), lockId); - sleepUninterruptibly(backOff.nextBackOffMillis(), MILLISECONDS); + "Failed to update %s entries in %s file: resources could be locked, retrying.", + lockRecords.getLocks().size(), lockId); + sleepUninterruptibly(RETRY_INTERVAL_MILLIS, MILLISECONDS); continue; } - // Unlocked all objects - delete lock object + // If unlocked all objects - delete lock object if (lockRecords.getLocks().isEmpty()) { gcs.deleteObject(lockInfo.getResourceId(), lockInfo.getMetaGeneration()); break; @@ -170,9 +185,9 @@ private void modifyLock( if (lockRecords.getLocks().size() > MAX_LOCKS_COUNT) { logger.atInfo().atMostEvery(5, SECONDS).log( - "Skipping lock entries update in %s file: too many (%d) locked resources. Re-trying.", + "Skipping lock entries update in %s file: too many (%d) locked resources, retrying.", lockRecords.getLocks().size(), lockId); - sleepUninterruptibly(backOff.nextBackOffMillis(), MILLISECONDS); + sleepUninterruptibly(RETRY_INTERVAL_MILLIS, MILLISECONDS); continue; } @@ -183,20 +198,19 @@ private void modifyLock( gcs.updateMetadata(lockInfo, metadata); logger.atFine().log( - "Updated lock file in %dms for %s operation with %s parameter", - System.currentTimeMillis() - startMs, operationId, lazy(modificationFnParam::toString)); + "Updated lock file in %dms for %s operation", + System.currentTimeMillis() - startMs, operationId); break; } catch (IOException e) { // continue after sleep if update failed due to file generation mismatch or other // IOException if (e.getMessage().contains("conditionNotMet")) { logger.atInfo().atMostEvery(5, SECONDS).log( - "Failed to update entries (conditionNotMet) in %s file for operation %s. Re-trying.", + "Failed to update entries (conditionNotMet) in %s file for operation %s, retrying.", lockId, operationId); } else { logger.atWarning().withCause(e).log( - "Failed to modify lock for %s operation with %s parameter, retrying.", - operationId, modificationFnParam); + "Failed to modify lock for %s operation, retrying.", operationId); } sleepUninterruptibly(backOff.nextBackOffMillis(), MILLISECONDS); } @@ -204,8 +218,7 @@ private void modifyLock( } private StorageResourceId getLockId(String bucketName) { - String lockObject = "gs://" + bucketName + "/" + LOCK_PATH; - return StorageResourceId.fromObjectName(lockObject); + return new StorageResourceId(bucketName, LOCK_PATH); } private CoopLockRecords getLockRecords(GoogleCloudStorageItemInfo lockInfo) { @@ -219,8 +232,8 @@ private CoopLockRecords getLockRecords(GoogleCloudStorageItemInfo lockInfo) { return lockRecords; } - private boolean updateLockEpochSeconds( - CoopLockRecords lockRecords, String operationId, long lockEpochSeconds) { + private boolean reacquireOperationLock( + CoopLockRecords lockRecords, String operationId, String clientId, long lockEpochSeconds) { Optional operationOptional = lockRecords.getLocks().stream() .filter(o -> o.getOperationId().equals(operationId)) @@ -228,9 +241,14 @@ private boolean updateLockEpochSeconds( checkState(operationOptional.isPresent(), "operation %s not found", operationId); CoopLockRecord operation = operationOptional.get(); checkState( - lockEpochSeconds == operation.getLockEpochSeconds(), - "operation %s should have %s lock epoch, but was %s", + clientId.equals(operation.getClientId()), + "operation %s should be locked by %s client, but was %s", operationId, + clientId, + operation.getClientId()); + checkState( + lockEpochSeconds == operation.getLockEpochSeconds(), + "operation %s should 
be locked at %s epoch seconds but was at %s", + operationId, lockEpochSeconds, operation.getLockEpochSeconds()); operation.setLockEpochSeconds(Instant.now().getEpochSecond()); @@ -238,32 +256,39 @@ } private boolean addLockRecords( - CoopLockRecords lockRecords, String operationId, Set<String> resourcesToAdd) { + CoopLockRecords lockRecords, + String operationId, + Instant operationInstant, + CoopLockOperationType operationType, + Set<String> resourcesToAdd) { // TODO: optimize to match more efficiently - if (lockRecords.getLocks().stream() - .flatMap(operation -> operation.getResources().stream()) - .anyMatch( - resource -> { - for (String resourceToAdd : resourcesToAdd) { - if (resourceToAdd.equals(resource) - || isChildObject(resource, resourceToAdd) - || isChildObject(resourceToAdd, resource)) { - return true; - } - } - return false; - })) { + boolean atLeastOneResourceAlreadyLocked = + lockRecords.getLocks().stream() + .flatMap(operation -> operation.getResources().stream()) + .anyMatch( + lockedResource -> { + for (String resourceToAdd : resourcesToAdd) { + if (resourceToAdd.equals(lockedResource) + || isChildObject(lockedResource, resourceToAdd) + || isChildObject(resourceToAdd, lockedResource)) { + return true; + } + } + return false; + }); + if (atLeastOneResourceAlreadyLocked) { return false; } - long lockEpochSeconds = Instant.now().getEpochSecond(); - lockRecords - .getLocks() - .add( - new CoopLockRecord() - .setOperationId(operationId) - .setResources(resourcesToAdd) - .setLockEpochSeconds(lockEpochSeconds)); + CoopLockRecord record = + new CoopLockRecord() + .setClientId(newClientId(operationId)) + .setOperationId(operationId) + .setOperationEpochSeconds(operationInstant.getEpochSecond()) + .setLockEpochSeconds(Instant.now().getEpochSecond()) + .setOperationType(operationType) + .setResources(resourcesToAdd); + lockRecords.getLocks().add(record); return true; } @@ -274,17 +299,16 @@ private boolean isChildObject(String parent, String child) { private boolean removeLockRecords( CoopLockRecords lockRecords, String operationId, Set<String> resourcesToRemove) { - List<CoopLockRecord> operationLocksToRemoveRecord = + List<CoopLockRecord> recordsToRemove = lockRecords.getLocks().stream() .filter(o -> o.getResources().stream().anyMatch(resourcesToRemove::contains)) .collect(Collectors.toList()); checkState( - operationLocksToRemoveRecord.size() == 1 - && operationLocksToRemoveRecord.get(0).getOperationId().equals(operationId), + recordsToRemove.size() == 1 && recordsToRemove.get(0).getOperationId().equals(operationId), "All resources %s should belong to %s operation, but was %s", resourcesToRemove.size(), - operationLocksToRemoveRecord.size()); + recordsToRemove.size()); + CoopLockRecord operationToRemove = recordsToRemove.get(0); checkState( operationToRemove.getResources().equals(resourcesToRemove), "All of %s resources should be locked by operation, but was locked only %s resources", @@ -297,8 +321,15 @@ return true; } - @FunctionalInterface - private interface LockRecordsModificationFunction<T, T1, T2, T3> { - T apply(T1 p1, T2 p2, T3 p3); + private static String newClientId(String operationId) { + InetAddress localHost; + try { + localHost = InetAddress.getLocalHost(); + } catch (UnknownHostException e) { + throw new RuntimeException( + String.format("Failed to get clientId for %s operation", operationId), e); + } + String epochMillis = String.valueOf(Instant.now().toEpochMilli()); + return
localHost.getCanonicalHostName() + "-" + epochMillis.substring(epochMillis.length() - 6); } } diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/DeleteOperation.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/DeleteOperation.java index d971c23875..0d40f74f0f 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/DeleteOperation.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/DeleteOperation.java @@ -1,3 +1,19 @@ +/* + * Copyright 2019 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.google.cloud.hadoop.gcsio.cooplock; import com.google.common.base.MoreObjects; diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/RenameOperation.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/RenameOperation.java index 4cb8f029b7..f509d40afa 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/RenameOperation.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/cooplock/RenameOperation.java @@ -1,3 +1,19 @@ +/* + * Copyright 2019 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package com.google.cloud.hadoop.gcsio.cooplock; import com.google.common.base.MoreObjects; diff --git a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/CoopLockIntegrationTest.java b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/CoopLockIntegrationTest.java index cef4fdb3a8..f9dfeb31fa 100644 --- a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/CoopLockIntegrationTest.java +++ b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/CoopLockIntegrationTest.java @@ -19,6 +19,8 @@ import static com.google.cloud.hadoop.gcsio.TrackingHttpRequestInitializer.deleteRequestString; import static com.google.cloud.hadoop.gcsio.TrackingHttpRequestInitializer.updateMetadataRequestString; import static com.google.cloud.hadoop.gcsio.TrackingHttpRequestInitializer.uploadRequestString; +import static com.google.cloud.hadoop.gcsio.cooplock.CoopLockOperationType.DELETE; +import static com.google.cloud.hadoop.gcsio.cooplock.CoopLockOperationType.RENAME; import static com.google.cloud.hadoop.gcsio.cooplock.CoopLockRecordsDao.LOCK_DIRECTORY; import static com.google.cloud.hadoop.gcsio.cooplock.CoopLockRecordsDao.LOCK_PATH; import static com.google.common.base.Preconditions.checkNotNull; @@ -50,7 +52,7 @@ public class CoopLockIntegrationTest { private static final Gson GSON = new Gson(); - private static final String OPERATION_FILENAME_PATTERN = + private static final String OPERATION_FILENAME_PATTERN_FORMAT = "[0-9]{8}T[0-9]{6}\\.[0-9]{3}Z_%s_[a-z0-9\\-]+"; private static GoogleCloudStorageOptions gcsOptions; @@ -132,10 +134,9 @@ public void moveDirectory() throws Exception { .collect(toList()); assertThat(lockFiles).hasSize(2); - URI lockFileUri = - matchFile(lockFiles, String.format(OPERATION_FILENAME_PATTERN, "rename") + "\\.lock").get(); - URI logFileUri = - matchFile(lockFiles, String.format(OPERATION_FILENAME_PATTERN, "rename") + "\\.log").get(); + String fileNamePattern = String.format(OPERATION_FILENAME_PATTERN_FORMAT, RENAME); + URI lockFileUri = matchFile(lockFiles, fileNamePattern + "\\.lock").get(); + URI logFileUri = matchFile(lockFiles, fileNamePattern + "\\.log").get(); String lockContent = gcsfsIHelper.readTextFile(bucketName, lockFileUri.getPath()); assertThat(GSON.fromJson(lockContent, RenameOperation.class).setLockEpochSeconds(0)) @@ -182,11 +183,14 @@ public void deleteDirectory() throws Exception { .collect(toList()); assertThat(lockFiles).hasSize(2); - URI lockFileUri = - matchFile(lockFiles, String.format(OPERATION_FILENAME_PATTERN, "delete") + "\\.lock").get(); + String fileNamePattern = String.format(OPERATION_FILENAME_PATTERN_FORMAT, DELETE); + URI lockFileUri = matchFile(lockFiles, fileNamePattern + "\\.lock").get(); + URI logFileUri = matchFile(lockFiles, fileNamePattern + "\\.log").get(); String lockContent = gcsfsIHelper.readTextFile(bucketName, lockFileUri.getPath()); assertThat(GSON.fromJson(lockContent, DeleteOperation.class).setLockEpochSeconds(0)) .isEqualTo(new DeleteOperation().setLockEpochSeconds(0).setResource(dirUri.toString())); + assertThat(gcsfsIHelper.readTextFile(bucketName, logFileUri.getPath())) + .isEqualTo(dirUri.resolve(fileName) + "\n" + dirUri + "\n"); } private Optional<URI> matchFile(List<URI> files, String pattern) {
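A minimal usage sketch of the operation lifecycle introduced in this change (illustrative only, not part of the diff): `CoopLockOperationRename` is driven as lock, then persist/renew, then checkpoint, then unlock. The caller below is hypothetical; in the connector this sequencing lives in the `GoogleCloudStorageFileSystem` rename path, and `gcs`, `pathCodec`, the URIs, and the two maps are assumed to come from that context.

```java
import com.google.cloud.hadoop.gcsio.FileInfo;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorage;
import com.google.cloud.hadoop.gcsio.PathCodec;
import com.google.cloud.hadoop.gcsio.cooplock.CoopLockOperationRename;
import java.net.URI;
import java.util.Map;

class CoopLockRenameSketch {
  static void renameWithCoopLock(
      GoogleCloudStorage gcs,
      PathCodec pathCodec,
      URI src,
      URI dst,
      Map<FileInfo, String> srcToDstItemNames,
      Map<FileInfo, String> srcToDstMarkerItemNames) {
    CoopLockOperationRename renameOp = CoopLockOperationRename.create(gcs, pathCodec, src, dst);
    // Adds a CoopLockRecord for src and dst to the bucket's all.lock metadata.
    renameOp.lock();
    try {
      // Writes the *.lock and *.log objects and schedules background lock renewal.
      renameOp.persistAndScheduleRenewal(srcToDstItemNames, srcToDstMarkerItemNames);
      // ... copy objects from src to dst ...
      renameOp.checkpoint(); // rewrites the lock file with copySucceeded=true
      // ... delete the source objects ...
    } finally {
      renameOp.cancelRenewal();
      renameOp.unlock();
    }
  }
}
```

The delete lifecycle is analogous with `CoopLockOperationDelete`, minus the `checkpoint()` step.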
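Under the bumped `FORMAT_VERSION = 2`, each entry in the `all.lock` metadata now carries the client id, operation epoch, and operation type added to `CoopLockRecord`. A hedged sketch of one serialized record (all field values here are made up; `clientId` normally comes from `CoopLockRecordsDao.newClientId()`, i.e. the canonical hostname plus the last six digits of epoch millis):

```java
import com.google.cloud.hadoop.gcsio.cooplock.CoopLockOperationType;
import com.google.cloud.hadoop.gcsio.cooplock.CoopLockRecord;
import com.google.gson.Gson;
import java.util.Arrays;
import java.util.TreeSet;

class CoopLockRecordJsonSketch {
  public static void main(String[] args) {
    CoopLockRecord record =
        new CoopLockRecord()
            .setClientId("test-host-123456") // hypothetical value
            .setOperationId("0d9b8a55-0000-4000-8000-000000000000") // hypothetical value
            .setOperationEpochSeconds(1_560_000_000L)
            .setOperationType(CoopLockOperationType.RENAME)
            .setLockEpochSeconds(1_560_000_030L)
            .setResources(new TreeSet<>(Arrays.asList("src/dir/", "dst/dir/")));
    // Gson serializes the record roughly as:
    // {"clientId":"test-host-123456","operationId":"0d9b8a55-...","operationEpochSeconds":1560000000,
    //  "operationType":"RENAME","lockEpochSeconds":1560000030,"resources":["dst/dir/","src/dir/"]}
    System.out.println(new Gson().toJson(record));
  }
}
```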