From adf706016b1a6dfe7ef68a3f5673257a54767b0b Mon Sep 17 00:00:00 2001 From: gusu Date: Thu, 11 Aug 2022 23:38:17 +0800 Subject: [PATCH] add time unit for ttl Signed-off-by: gusu --- .../java/org/tikv/raw/RawKVClientBase.java | 69 +++++++++++++++++++ src/test/java/org/tikv/raw/CASTest.java | 13 ++-- .../java/org/tikv/raw/RawKVClientTest.java | 3 +- 3 files changed, 79 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/tikv/raw/RawKVClientBase.java b/src/main/java/org/tikv/raw/RawKVClientBase.java index 74eacc854cc..1a9f2ae7d0c 100644 --- a/src/main/java/org/tikv/raw/RawKVClientBase.java +++ b/src/main/java/org/tikv/raw/RawKVClientBase.java @@ -21,12 +21,14 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.TimeUnit; import org.tikv.common.TiSession; import org.tikv.common.util.Pair; import org.tikv.common.util.ScanOption; import org.tikv.kvproto.Kvrpcpb; public interface RawKVClientBase extends AutoCloseable { + // https://www.github.com/pingcap/tidb/blob/master/store/tikv/rawkv.go int MAX_RAW_SCAN_LIMIT = 10240; int MAX_RAW_BATCH_LIMIT = 1024; @@ -51,6 +53,20 @@ public interface RawKVClientBase extends AutoCloseable { */ void put(ByteString key, ByteString value, long ttl); + /** + * Put a raw key-value pair to TiKV + * + * @see #put(ByteString, ByteString, long) + * @param key raw key + * @param value raw value + * @param duration the duration of the key, 0 means the key will never be outdated + * @param timeUnit the time unit of duration + */ + default void put(ByteString key, ByteString value, long duration, TimeUnit timeUnit) { + Long seconds = timeUnit.toSeconds(duration); + put(key, value, seconds); + } + /** * Put a key-value pair if it does not exist. This API is atomic. * @@ -76,6 +92,25 @@ public interface RawKVClientBase extends AutoCloseable { */ Optional putIfAbsent(ByteString key, ByteString value, long ttl); + /** + * Put a key-value pair with TTL if it does not exist. This API is atomic. + * + *

To use this API, please enable `tikv.enable_atomic_for_cas`. + * + * @see #putIfAbsent(ByteString, ByteString, long) + * @param key key + * @param value value + * @param duration duration of key, 0 means the key will never be outdated. + * @param timeUnit the time unit of duration + * @return a ByteString. returns Optional.EMPTY if the value is written successfully. returns the + * previous key if the value already exists, and does not write to TiKV. + */ + default Optional putIfAbsent( + ByteString key, ByteString value, long duration, TimeUnit timeUnit) { + Long seconds = timeUnit.toSeconds(duration); + return putIfAbsent(key, value, seconds); + } + /** * Put a key-value pair if the prevValue matched the value in TiKV. This API is atomic. * @@ -97,6 +132,27 @@ public interface RawKVClientBase extends AutoCloseable { */ void compareAndSet(ByteString key, Optional prevValue, ByteString value, long ttl); + /** + * pair if the prevValue matched the value in TiKV. This API is atomic. + * + *

To use this API, please enable `tikv.enable_atomic_for_cas`. + * + * @see #compareAndSet(ByteString, Optional, ByteString, long) + * @param key key + * @param value value + * @param duration duration of key , 0 means the key will never be outdated. + * @param timeUnit time unit of duration + */ + default void compareAndSet( + ByteString key, + Optional prevValue, + ByteString value, + long duration, + TimeUnit timeUnit) { + long seconds = timeUnit.toSeconds(duration); + compareAndSet(key, prevValue, value, seconds); + } + /** * Put a set of raw key-value pair to TiKV. * @@ -112,6 +168,19 @@ public interface RawKVClientBase extends AutoCloseable { */ void batchPut(Map kvPairs, long ttl); + /** + * Put a set of raw key-value pair to TiKV. + * + * @see #batchPut(Map, long) + * @param kvPairs kvPairs + * @param duration the duration of keys to be put, 0 means the keys will never be outdated + * @param timeUnit time unit of duration + */ + default void batchPut(Map kvPairs, long duration, TimeUnit timeUnit) { + long seconds = timeUnit.toSeconds(duration); + batchPut(kvPairs, seconds); + } + /** * Get a raw key-value pair from TiKV if key exists * diff --git a/src/test/java/org/tikv/raw/CASTest.java b/src/test/java/org/tikv/raw/CASTest.java index 71a17ef80b4..a7e80af4bba 100644 --- a/src/test/java/org/tikv/raw/CASTest.java +++ b/src/test/java/org/tikv/raw/CASTest.java @@ -22,6 +22,7 @@ import com.google.protobuf.ByteString; import java.util.Optional; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -34,6 +35,7 @@ import org.tikv.common.exception.RawCASConflictException; public class CASTest extends BaseRawKVTest { + private RawKVClient client; private boolean initialized; private static final Logger logger = LoggerFactory.getLogger(RawKVClientTest.class); @@ -81,21 +83,22 @@ public void rawCASTest() { @Test public void rawPutIfAbsentTest() { - long ttl = 10; + long duration = 10; + TimeUnit timeUnit = TimeUnit.SECONDS; ByteString key = ByteString.copyFromUtf8("key_atomic"); ByteString value = ByteString.copyFromUtf8("value"); ByteString value2 = ByteString.copyFromUtf8("value2"); client.delete(key); - Optional res1 = client.putIfAbsent(key, value, ttl); + Optional res1 = client.putIfAbsent(key, value, duration, timeUnit); assertFalse(res1.isPresent()); - Optional res2 = client.putIfAbsent(key, value2, ttl); + Optional res2 = client.putIfAbsent(key, value2, duration, timeUnit); assertEquals(res2.get(), value); try { - Thread.sleep(ttl * 1000 + 100); + Thread.sleep(duration * 1000 + 100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - Optional res3 = client.putIfAbsent(key, value, ttl); + Optional res3 = client.putIfAbsent(key, value, duration, timeUnit); assertFalse(res3.isPresent()); } } diff --git a/src/test/java/org/tikv/raw/RawKVClientTest.java b/src/test/java/org/tikv/raw/RawKVClientTest.java index 08608ae11aa..289278381c4 100644 --- a/src/test/java/org/tikv/raw/RawKVClientTest.java +++ b/src/test/java/org/tikv/raw/RawKVClientTest.java @@ -68,6 +68,7 @@ import org.tikv.kvproto.Kvrpcpb.KvPair; public class RawKVClientTest extends BaseRawKVTest { + private static final String RAW_PREFIX = "raw_\u0001_"; private static final int KEY_POOL_SIZE = 1000000; private static final int TEST_CASES = 10000; @@ -143,7 +144,7 @@ public void getKeyTTLTest() { long ttl = 10; ByteString key = ByteString.copyFromUtf8("key_ttl"); ByteString value = ByteString.copyFromUtf8("value"); - client.put(key, value, ttl); + client.put(key, value, ttl, TimeUnit.SECONDS); for (int i = 0; i < 9; i++) { Optional t = client.getKeyTTL(key); logger.info("current ttl of key is " + t.orElse(null));