Skip to content

Commit

Permalink
Fix for nar unpacking
Browse files Browse the repository at this point in the history
  • Loading branch information
nikhilerigila09 committed Sep 9, 2024
1 parent ca0fb44 commit 3c77137
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,22 +83,31 @@ static File doUnpackNar(final File nar, final File baseWorkingDirectory, Runnabl
synchronized (localLock) {
// create file lock that ensures that other processes
// using the same lock file don't execute concurrently
File successMarkerFile = new File(parentDirectory, "." + md5Sum + ".success");
try (FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
FileLock lock = channel.lock()) {
File narWorkingDirectory = new File(parentDirectory, md5Sum);
if (narWorkingDirectory.mkdir()) {
if (narWorkingDirectory.mkdir() || !successMarkerFile.exists()) {
try {
log.info("Extracting {} to {}", nar, narWorkingDirectory);
if (extractCallback != null) {
extractCallback.run();
}
unpack(nar, narWorkingDirectory);
boolean successMarkerFileCreated = successMarkerFile.createNewFile();
log.info("Successfully extracted nar file, status for creating success marker at {} is {}",
successMarkerFile.getAbsolutePath(), successMarkerFileCreated);
} catch (IOException e) {
log.error("There was a problem extracting the nar file. Deleting {} to clean up state.",
narWorkingDirectory, e);
FileUtils.deleteFile(narWorkingDirectory, true);
FileUtils.deleteFile(successMarkerFile, null);
throw e;
}
} else {
log.info("Extraction directory {}, exists = {}, success file {}, exists = {}."
+ "Extraction will be skipped.", narWorkingDirectory, narWorkingDirectory.exists(),
successMarkerFile, successMarkerFile.exists());
}
return narWorkingDirectory;
}
Expand Down Expand Up @@ -166,7 +175,7 @@ private static void makeFile(final InputStream inputStream, final File file) thr
* @throws IOException
* if cannot read file
*/
private static byte[] calculateMd5sum(final File file) throws IOException {
protected static byte[] calculateMd5sum(final File file) throws IOException {
try (final FileInputStream inputStream = new FileInputStream(file)) {
// codeql[java/weak-cryptographic-algorithm] - md5 is sufficient for this use case
final MessageDigest md5 = MessageDigest.getInstance("md5");
Expand All @@ -184,4 +193,4 @@ private static byte[] calculateMd5sum(final File file) throws IOException {
throw new IllegalArgumentException(nsae);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
*/
package org.apache.pulsar.common.nar;

import static org.apache.pulsar.common.nar.NarUnpacker.calculateMd5sum;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Base64;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -99,6 +101,68 @@ void shouldExtractFilesOnceInSameProcess() throws InterruptedException {
assertEquals(extractCounter.get(), 1);
}

@Test
void shouldNotCreateSuccessFileIfExtractionFails() throws InterruptedException {
int threads = 1;
CountDownLatch countDownLatch = new CountDownLatch(threads);
AtomicInteger exceptionCounter = new AtomicInteger();
AtomicInteger extractCounter = new AtomicInteger();
new Thread(() -> {
try {
// Simulate a failure during unpacking using empty zip file
String corruptZip = "corrupt.zip";
File corruptFile = new File(extractDirectory, corruptZip);
Files.createFile(corruptFile.toPath());
File narWorkingDirectory = NarUnpacker.doUnpackNar(corruptFile, extractDirectory, extractCounter::incrementAndGet);
String md5Sum = Base64.getUrlEncoder().withoutPadding().encodeToString(calculateMd5sum(sampleZipFile));
File successMarkerFile = new File(narWorkingDirectory.getParentFile(), "." + md5Sum + ".success");
assertTrue(successMarkerFile.exists(), "Success file should be created");
} catch (Exception e) {
log.error("Unpacking failed", e);
exceptionCounter.incrementAndGet();
} finally {
countDownLatch.countDown();
}
}).start();
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
assertEquals(exceptionCounter.get(), 1);
assertEquals(extractCounter.get(), 1);
}

@Test
void shouldHandleSuccessFileDeletion() throws InterruptedException, IOException {
int threads = 1;
CountDownLatch countDownLatch = new CountDownLatch(threads);
AtomicInteger exceptionCounter = new AtomicInteger();
AtomicInteger extractCounter = new AtomicInteger();
try {
// Extract the file initially
File narWorkingDirectory = NarUnpacker.doUnpackNar(sampleZipFile, extractDirectory, extractCounter::incrementAndGet);
String md5Sum = Base64.getUrlEncoder().withoutPadding().encodeToString(calculateMd5sum(sampleZipFile));
File successMarkerFile = new File(narWorkingDirectory.getParentFile(), "." + md5Sum + ".success");
assertTrue(successMarkerFile.exists(), "Success file should be created");

// Delete the success file
successMarkerFile.delete();

// Try extracting again
new Thread(() -> {
try {
NarUnpacker.doUnpackNar(sampleZipFile, extractDirectory, extractCounter::incrementAndGet);
} catch (Exception e) {
log.error("Unpacking failed", e);
exceptionCounter.incrementAndGet();
} finally {
countDownLatch.countDown();
}
}).start();
} finally {
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
assertEquals(exceptionCounter.get(), 0);
assertEquals(extractCounter.get(), 2);
}
}

public static class NarUnpackerWorker {
public static void main(String[] args) {
File sampleZipFile = new File(args[0]);
Expand Down

0 comments on commit 3c77137

Please sign in to comment.