Skip to content

Commit

Permalink
Fix ResolveLock not called in Scan API (#2089) (#2091)
Browse files Browse the repository at this point in the history
* fix v2.4 scan bug

fix resolve lock not called in Scan API

Signed-off-by: birdstorm <[email protected]>

* remove backoff

Signed-off-by: birdstorm <[email protected]>

Co-authored-by: birdstorm <[email protected]>
  • Loading branch information
ti-srebot and birdstorm authored Sep 6, 2021
1 parent 57b8940 commit 1effd9a
Show file tree
Hide file tree
Showing 10 changed files with 128 additions and 123 deletions.
3 changes: 3 additions & 0 deletions config/tidb.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ enable-table-lock = true
# In order to support "drop primary key" operation , this flag must be true and the table does not have the pkIsHandle flag.
alter-primary-key = true

# index-limit is used to deal with compatibility issues. It can only be in [64, 64*8].
index-limit = 512

[log]
# Log level: info, debug, warn, error, fatal.
level = "info"
Expand Down
26 changes: 0 additions & 26 deletions tikv-client/src/main/java/com/pingcap/tikv/KVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,24 +110,6 @@ public List<KvPair> scan(ByteString startKey, ByteString endKey, long version)
return result;
}

/**
* Scan key-value pairs from TiKV in range [startKey, ♾), maximum to `limit` pairs
*
* @param startKey start key, inclusive
* @param limit limit of kv pairs
* @return list of key-value pairs in range
*/
public List<KvPair> scan(ByteString startKey, long version, int limit) throws GrpcException {
Iterator<KvPair> iterator = scanIterator(conf, clientBuilder, startKey, version, limit);
List<KvPair> result = new ArrayList<>();
iterator.forEachRemaining(result::add);
return result;
}

public List<KvPair> scan(ByteString startKey, long version) throws GrpcException {
return scan(startKey, version, Integer.MAX_VALUE);
}

private List<KvPair> doSendBatchGet(BackOffer backOffer, List<ByteString> keys, long version) {
ExecutorCompletionService<List<KvPair>> completionService =
new ExecutorCompletionService<>(batchGetThreadPool);
Expand Down Expand Up @@ -190,12 +172,4 @@ private Iterator<KvPair> scanIterator(
return new ConcreteScanIterator(conf, builder, startKey, endKey, version);
}

private Iterator<KvPair> scanIterator(
TiConfiguration conf,
RegionStoreClientBuilder builder,
ByteString startKey,
long version,
int limit) {
return new ConcreteScanIterator(conf, builder, startKey, version, limit);
}
}
15 changes: 0 additions & 15 deletions tikv-client/src/main/java/com/pingcap/tikv/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,21 +152,6 @@ public Iterator<Handle> indexHandleRead(TiDAGRequest dagRequest, List<RegionTask
return getHandleIterator(dagRequest, tasks, session);
}

/**
* scan all keys after startKey, inclusive
*
* @param startKey start of keys
* @return iterator of kvPair
*/
public Iterator<KvPair> scan(ByteString startKey) {
return new ConcreteScanIterator(
session.getConf(),
session.getRegionStoreClientBuilder(),
startKey,
timestamp.getVersion(),
Integer.MAX_VALUE);
}

/**
* scan all keys with prefix
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.Kvrpcpb.KvPair;

public class MetaCodec {
protected static final Logger logger = LoggerFactory.getLogger(MetaCodec.class);
public static final String ENCODED_DB_PREFIX = "DB";
public static final String KEY_TID = "TID";
private static final byte[] META_PREFIX = new byte[] {'m'};
Expand Down Expand Up @@ -115,7 +118,7 @@ public static List<Pair<ByteString, ByteString>> hashGetFields(
List<Pair<ByteString, ByteString>> fields = new ArrayList<>();
while (iterator.hasNext()) {
Kvrpcpb.KvPair kv = iterator.next();
if (kv == null || kv.getKey() == null) {
if (kv == null || kv.getKey().isEmpty()) {
continue;
}
fields.add(Pair.create(MetaCodec.decodeHashDataKey(kv.getKey()).second, kv.getValue()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,16 @@
import com.pingcap.tikv.util.BackOffer;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.kvproto.Errorpb;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.Kvrpcpb.KvPair;
import org.tikv.kvproto.Kvrpcpb.ScanResponse;

// TODO: consider refactor to Builder mode
// TODO: KVErrorHandler should resolve locks if it could.
Expand Down Expand Up @@ -101,6 +105,24 @@ private void invalidateRegionStoreCache(TiRegion ctxRegion) {
regionManager.invalidateStore(ctxRegion.getLeader().getStoreId());
}

private void resolveLocks(BackOffer backOffer, List<Lock> locks) {
if (lockResolverClient != null) {
logger.warn("resolving " + locks.size() + " locks");

ResolveLockResult resolveLockResult =
lockResolverClient.resolveLocks(backOffer, callerStartTS, locks, forWrite);
resolveLockResultCallback.apply(resolveLockResult);
long msBeforeExpired = resolveLockResult.getMsBeforeTxnExpired();
if (msBeforeExpired > 0) {
// if not resolve all locks, we wait and retry
backOffer.doBackOffWithMaxSleep(
BoTxnLockFast,
msBeforeExpired,
new KeyException("not all locks resolved: " + locks.size()));
}
}
}

private void resolveLock(BackOffer backOffer, Lock lock) {
if (lockResolverClient != null) {
logger.warn("resolving lock");
Expand Down Expand Up @@ -256,15 +278,34 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) {

boolean retry = false;

// Key error handling logic
Kvrpcpb.KeyError keyError = getKeyError.apply(resp);
if (keyError != null) {
try {
Lock lock = AbstractLockResolverClient.extractLockFromKeyErr(keyError);
resolveLock(backOffer, lock);
retry = true;
} catch (KeyException e) {
logger.warn("Unable to handle KeyExceptions other than LockException", e);
if (resp instanceof ScanResponse) {
List<KvPair> kvPairs = ((ScanResponse) resp).getPairsList();
List<Lock> locks = new ArrayList<>();
for (KvPair kvPair : kvPairs) {
if (kvPair.hasError()) {
Lock lock = AbstractLockResolverClient.extractLockFromKeyErr(kvPair.getError());
locks.add(lock);
}
}
if (!locks.isEmpty()) {
try {
resolveLocks(backOffer, locks);
retry = true;
} catch (KeyException e) {
logger.warn("Unable to handle KeyExceptions other than LockException", e);
}
}
} else {
// Key error handling logic
Kvrpcpb.KeyError keyError = getKeyError.apply(resp);
if (keyError != null) {
try {
Lock lock = AbstractLockResolverClient.extractLockFromKeyErr(keyError);
resolveLock(backOffer, lock);
retry = true;
} catch (KeyException e) {
logger.warn("Unable to handle KeyExceptions other than LockException", e);
}
}
}
return retry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import com.pingcap.tikv.TiConfiguration;
import com.pingcap.tikv.exception.GrpcException;
import com.pingcap.tikv.exception.KeyException;
import com.pingcap.tikv.exception.TiKVException;
import com.pingcap.tikv.key.Key;
import com.pingcap.tikv.region.RegionStoreClient;
import com.pingcap.tikv.region.RegionStoreClient.RegionStoreClientBuilder;
import com.pingcap.tikv.region.TiRegion;
import com.pingcap.tikv.util.BackOffFunction;
import com.pingcap.tikv.util.BackOffer;
import com.pingcap.tikv.util.ConcreteBackOffer;
import com.pingcap.tikv.util.Pair;
Expand All @@ -40,45 +42,35 @@ public class ConcreteScanIterator extends ScanIterator {
private final long version;
private final Logger logger = LoggerFactory.getLogger(ConcreteScanIterator.class);

public ConcreteScanIterator(
TiConfiguration conf,
RegionStoreClientBuilder builder,
ByteString startKey,
long version,
int limit) {
// Passing endKey as ByteString.EMPTY means that endKey is +INF by default,
this(conf, builder, startKey, ByteString.EMPTY, version, limit);
}

public ConcreteScanIterator(
TiConfiguration conf,
RegionStoreClientBuilder builder,
ByteString startKey,
ByteString endKey,
long version) {
// Passing endKey as ByteString.EMPTY means that endKey is +INF by default,
this(conf, builder, startKey, endKey, version, Integer.MAX_VALUE);
}

private ConcreteScanIterator(
TiConfiguration conf,
RegionStoreClientBuilder builder,
ByteString startKey,
ByteString endKey,
long version,
int limit) {
super(conf, builder, startKey, endKey, limit);
super(conf, builder, startKey, endKey, Integer.MAX_VALUE);
this.version = version;
}

@Override
TiRegion loadCurrentRegionToCache() throws GrpcException {
TiRegion region;
try (RegionStoreClient client = builder.build(startKey)) {
region = client.getRegion();
BackOffer backOffer = ConcreteBackOffer.newScannerNextMaxBackOff();
currentCache = client.scan(backOffer, startKey, version);
return region;
BackOffer backOffer = ConcreteBackOffer.newScannerNextMaxBackOff();
while (true) {
try (RegionStoreClient client = builder.build(startKey)) {
TiRegion region = client.getRegion();
if (limit <= 0) {
currentCache = null;
} else {
try {
currentCache = client.scan(backOffer, startKey, limit, version);
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
continue;
}
}
return region;
}
}
}

Expand All @@ -96,25 +88,39 @@ private ByteString resolveCurrentLock(Kvrpcpb.KvPair current) {
}
}

/**
* Cache is drained when - no data extracted - scan limit was not defined - have read the last
* index of cache - index not initialized
*
* @return whether cache is drained
*/
private boolean isCacheDrained() {
return currentCache == null || limit <= 0 || index >= currentCache.size() || index == -1;
}

private boolean notEndOfScan() {
return limit > 0
&& !(processingLastBatch
&& (index >= currentCache.size()
|| Key.toRawKey(currentCache.get(index).getKey()).compareTo(endKey) >= 0));
}

@Override
public boolean hasNext() {
Kvrpcpb.KvPair current;
if (isCacheDrained() && cacheLoadFails()) {
endOfScan = true;
return false;
}
// continue when cache is empty but not null
do {
current = getCurrent();
if (isCacheDrained() && cacheLoadFails()) {
endOfScan = true;
while (currentCache != null && currentCache.isEmpty()) {
if (cacheLoadFails()) {
return false;
}
} while (currentCache != null && current == null);
// for last batch to be processed, we have to check if
return !processingLastBatch
|| current == null
|| (hasEndKey && Key.toRawKey(current.getKey()).compareTo(endKey) < 0);
}
return notEndOfScan();
}

@Override
public KvPair next() {
private Kvrpcpb.KvPair getCurrent() {
--limit;
KvPair current = currentCache.get(index++);

Expand All @@ -127,20 +133,8 @@ public KvPair next() {
return current;
}

/**
* Cache is drained when - no data extracted - scan limit was not defined - have read the last
* index of cache - index not initialized
*
* @return whether cache is drained
*/
private boolean isCacheDrained() {
return currentCache == null || limit <= 0 || index >= currentCache.size() || index == -1;
}

private Kvrpcpb.KvPair getCurrent() {
if (isCacheDrained()) {
return null;
}
return currentCache.get(index);
@Override
public Kvrpcpb.KvPair next() {
return getCurrent();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,8 @@ public abstract class ScanIterator implements Iterator<Kvrpcpb.KvPair> {
ByteString endKey,
int limit) {
this.startKey = requireNonNull(startKey, "start key is null");
if (startKey.isEmpty()) {
throw new IllegalArgumentException("start key cannot be empty");
}
this.endKey = Key.toRawKey(requireNonNull(endKey, "end key is null"));
this.hasEndKey = !endKey.equals(ByteString.EMPTY);
this.hasEndKey = !endKey.isEmpty();
this.limit = limit;
this.conf = conf;
this.builder = builder;
Expand All @@ -71,7 +68,7 @@ boolean cacheLoadFails() {
if (endOfScan || processingLastBatch) {
return true;
}
if (startKey == null || startKey.isEmpty()) {
if (startKey == null) {
return true;
}
try {
Expand All @@ -89,10 +86,10 @@ boolean cacheLoadFails() {
// Session should be single-threaded itself
// so that we don't worry about conf change in the middle
// of a transaction. Otherwise below code might lose data
if (currentCache.size() < conf.getScanBatchSize()) {
if (currentCache.size() < limit) {
startKey = curRegionEndKey;
lastKey = Key.toRawKey(curRegionEndKey);
} else if (currentCache.size() > conf.getScanBatchSize()) {
} else if (currentCache.size() > limit) {
throw new IndexOutOfBoundsException(
"current cache size = "
+ currentCache.size()
Expand All @@ -104,7 +101,8 @@ boolean cacheLoadFails() {
startKey = lastKey.next().toByteString();
}
// notify last batch if lastKey is greater than or equal to endKey
if (hasEndKey && lastKey.compareTo(endKey) >= 0) {
// if startKey is empty, it indicates +∞
if (hasEndKey && lastKey.compareTo(endKey) >= 0 || startKey.isEmpty()) {
processingLastBatch = true;
startKey = null;
}
Expand Down
Loading

0 comments on commit 1effd9a

Please sign in to comment.