diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java index 8323884ae57..367141ae509 100644 --- a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java +++ b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java @@ -444,7 +444,7 @@ public BlobStoreFileInputStream(BlobStoreFile part) throws IOException { @Override public long getVersion() throws IOException { - return part.getModTime(); + return part.getVersion(); } @Override diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStoreFile.java b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStoreFile.java index 2f47978b336..bd901a0c234 100644 --- a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStoreFile.java +++ b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStoreFile.java @@ -42,6 +42,10 @@ public abstract class BlobStoreFile { public abstract long getModTime() throws IOException; + public long getVersion() throws IOException { + return getModTime(); + } + public abstract InputStream getInputStream() throws IOException; public abstract OutputStream getOutputStream() throws IOException; diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java index a8f519d6453..f708946fbe2 100644 --- a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java +++ b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java @@ -291,7 +291,7 @@ public ReadableBlobMeta getBlobMeta(String key, Subject who) throws Authorizatio rbm.set_settable(meta); try { LocalFsBlobStoreFile pf = fbs.read(DATA_PREFIX + key); - rbm.set_version(pf.getModTime()); + rbm.set_version(pf.getVersion()); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStoreFile.java b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStoreFile.java index 2262e908176..128377f9811 100644 --- a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStoreFile.java +++ b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStoreFile.java @@ -2,9 +2,9 @@ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions * and limitations under the License. @@ -21,14 +21,20 @@ import java.nio.file.Files; import java.nio.file.StandardCopyOption; import java.util.regex.Matcher; +import java.util.zip.CRC32C; +import java.util.zip.Checksum; + +import org.apache.commons.io.FileUtils; import org.apache.storm.generated.SettableBlobMeta; + public class LocalFsBlobStoreFile extends BlobStoreFile { private final String key; private final boolean isTmp; private final File path; private final boolean mustBeNew; + private final Checksum checksumAlgorithm; private SettableBlobMeta meta; public LocalFsBlobStoreFile(File base, String name) { @@ -44,12 +50,14 @@ public LocalFsBlobStoreFile(File base, String name) { key = base.getName(); path = new File(base, name); mustBeNew = false; + checksumAlgorithm = new CRC32C(); } public LocalFsBlobStoreFile(File base, boolean isTmp, boolean mustBeNew) { key = base.getName(); this.isTmp = isTmp; this.mustBeNew = mustBeNew; + checksumAlgorithm = new CRC32C(); if (this.isTmp) { path = new File(base, System.currentTimeMillis() + TMP_EXT); } else { @@ -72,6 +80,11 @@ public String getKey() { return key; } + @Override + public long getVersion() throws IOException { + return FileUtils.checksum(path, checksumAlgorithm).getValue(); + } + @Override public long getModTime() throws IOException { return path.lastModified(); diff --git a/storm-server/src/test/java/org/apache/storm/blobstore/LocalFsBlobStoreFileTest.java b/storm-server/src/test/java/org/apache/storm/blobstore/LocalFsBlobStoreFileTest.java new file mode 100644 index 00000000000..2faae2a5185 --- /dev/null +++ b/storm-server/src/test/java/org/apache/storm/blobstore/LocalFsBlobStoreFileTest.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ + +package org.apache.storm.blobstore; + +import org.apache.commons.io.FileUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.util.zip.CRC32C; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +class LocalFsBlobStoreFileTest { + + private File tempFile; + private LocalFsBlobStoreFile blobStoreFile; + private CRC32C checksumAlgorithm; + + @BeforeEach + public void setUp() throws IOException { + tempFile = Files.createTempFile(null, ".tmp").toFile(); + try (FileOutputStream fs = new FileOutputStream(tempFile)) { + fs.write("Content for checksum".getBytes()); + } + blobStoreFile = new LocalFsBlobStoreFile(tempFile.getParentFile(), tempFile.getName()); + checksumAlgorithm= new CRC32C(); + } + + @Test + void testGetVersion() throws IOException { + long expectedVersion = FileUtils.checksum(tempFile, checksumAlgorithm).getValue(); + long actualVersion = blobStoreFile.getVersion(); + assertEquals(expectedVersion, actualVersion, "The version should match the expected checksum value."); + } + + @Test + void testGetVersion_Mismatch() throws IOException { + long expectedVersion = FileUtils.checksum(tempFile, checksumAlgorithm).getValue(); + try (FileOutputStream fs = new FileOutputStream(tempFile)) { + fs.write("Different content".getBytes()); + } + long actualVersion = blobStoreFile.getVersion(); + assertNotEquals(expectedVersion, actualVersion, "The version shouldn't match the checksum value of different content."); + } + + @Test + void testGetModTime() throws IOException { + long expectedModTime = tempFile.lastModified(); + long actualModTime = blobStoreFile.getModTime(); + assertEquals(expectedModTime, actualModTime, "The modification time should match the expected value."); + } +}