Skip to content

Commit

Permalink
Parallel upload fix
Browse files Browse the repository at this point in the history
  • Loading branch information
plyhun committed Dec 25, 2023
1 parent 77d8e59 commit adae05e
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 8 deletions.
17 changes: 17 additions & 0 deletions common/src/main/java/com/gentics/mesh/util/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,23 @@ public static Single<String> hash(Flowable<Buffer> stream) {
}
}

/**
* Generate a SHA 512 checksum from the data in the given buffer and synchronously return the hex encoded hash as a string.
*
* @param buffer
* @return Observable returning the SHA 512 checksum
*/
public static String hashSync(byte[] bytes) {
try {
MessageDigest md = MessageDigest.getInstance("SHA-512");
md.update(bytes);
return bytesToHex(md.digest());
} catch (Exception e) {
log.error("Error while hashing data", e);
throw error(INTERNAL_SERVER_ERROR, "generic_error", e);
}
}

/**
* Generate a SHA 512 checksum from the data in the given buffer and asynchronously return the hex encoded hash as a string.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import static org.apache.commons.lang3.StringUtils.isEmpty;

import java.io.File;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -214,9 +216,11 @@ public void handleUpdateField(InternalActionContext ac, String nodeUuid, String
if (log.isDebugEnabled()) {
log.debug("Error detected. Purging previously stored upload for tempId {}", tmpId, e);
}
return binaryStorage.purgeTemporaryUpload(tmpId).doOnError(e1 -> {
log.error("Error while purging temporary upload for tempId {}", tmpId, e1);
}).onErrorComplete().andThen(Single.error(e));
return binaryStorage.purgeTemporaryUpload(tmpId)
.onErrorComplete(e1 -> e1 instanceof NoSuchFileException || (e.getCause() instanceof NoSuchFileException))
.doOnError(e1 -> {
log.error("Error while purging temporary upload for tempId {}", tmpId, e1);
}).andThen(Single.error(e));
} else {
return Single.error(e);
}
Expand Down Expand Up @@ -246,14 +250,15 @@ private Completable storeUploadInTemp(UploadContext ctx, FileUpload ul, String h
} else {
// File has already been stored. Lets remove the upload from the vert.x tmpdir. We no longer need it.
return fs.rxDelete(uploadFilePath)
.onErrorComplete(e1 -> e1 instanceof NoSuchFileException || (e1.getCause() instanceof NoSuchFileException))
.doOnComplete(() -> {
if (log.isTraceEnabled()) {
log.trace("Removed temporary file {}", uploadFilePath);
}
})
.doOnError(e -> {
log.warn("Failed to remove upload from tmpDir {}", uploadFilePath, e);
}).onErrorComplete();
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.FileAlreadyExistsException;
Expand All @@ -15,15 +16,19 @@
import javax.inject.Inject;
import javax.inject.Singleton;

import org.apache.commons.lang.StringUtils;

import com.gentics.mesh.core.data.node.field.HibBinaryField;
import com.gentics.mesh.core.data.storage.AbstractBinaryStorage;
import com.gentics.mesh.core.data.storage.LocalBinaryStorage;
import com.gentics.mesh.etc.config.MeshOptions;
import com.gentics.mesh.etc.config.MeshUploadOptions;
import com.gentics.mesh.util.FileUtils;
import com.gentics.mesh.util.RxUtil;

import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.logging.Logger;
Expand Down Expand Up @@ -63,9 +68,23 @@ public Completable moveInPlace(String uuid, String temporaryId) {
}
File uploadFolder = new File(options.getDirectory(), getSegmentedPath(uuid));
return createParentPath(uploadFolder.getAbsolutePath())
.andThen(fileSystem.rxMove(source, target).doOnError(e -> {
log.error("Error while moving binary from temp upload dir {} to final dir {}", source, target);
}));
.andThen(fileSystem.rxMove(source, target)
.onErrorComplete(e -> {
if (e instanceof FileAlreadyExistsException || (e.getCause() instanceof FileAlreadyExistsException)) {
try (FileInputStream sourceFis = new FileInputStream(new File(source)); FileInputStream targetFis = new FileInputStream(new File(target))) {
String sourceHash = FileUtils.hashSync(sourceFis.readAllBytes());
String targetHash = FileUtils.hashSync(targetFis.readAllBytes());
boolean match = sourceHash.equals(targetHash);
log.warn("The file [{}] exists at the target [{}]. The hashes {}match. {}.", match ? StringUtils.EMPTY : "don't ", match ? "Suppressing the error" : "Aborting the upload");
return match;
}
} else {
return false;
}
})
.doOnError(e -> {
log.error("Error while moving binary from temp upload dir {} to final dir {}", source, target);
}));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ protected void testParallelUpload(boolean useSameName) throws IOException {
initialSyncWrites = initialSyncWrites.or(() -> Optional.of(mesh().globalLock().isSyncWrites()));
getTestContext().getInstanceProvider().setSyncWrites(isSyncWrites());

String defaultFilename = Long.toHexString(System.currentTimeMillis()) + "blume.jpg";

Observable.range(0, numUploads).flatMapSingle(number -> {
NodeCreateRequest request = new NodeCreateRequest();
request.setLanguage("en");
Expand All @@ -93,7 +95,7 @@ protected void testParallelUpload(boolean useSameName) throws IOException {
int size = data.length;
InputStream ins = new ByteArrayInputStream(data);
return client()
.updateNodeBinaryField(projectName(), node.getUuid(), "en", node.getVersion(), "image", ins, size, useSameName ? "blume.jpg" : (number + "blume.jpg"), "image/jpeg")
.updateNodeBinaryField(PROJECT_NAME, node.getUuid(), "en", node.getVersion(), "image", ins, size, useSameName ? defaultFilename : (number + defaultFilename), "image/jpeg")
.toSingle();
});
}).lastOrError().ignoreElement().blockingAwait();
Expand Down

0 comments on commit adae05e

Please sign in to comment.