Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[close #776] Fix prewrite & commit log info to debug(#776) #777

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,18 @@
<artifactId>simpleclient_pushgateway</artifactId>
<version>0.10.0</version>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>2.0.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>
<version>2.0.2</version>
<scope>test</scope>
</dependency>
</dependencies>
<reporting>
<plugins>
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/tikv/common/exception/KeyException.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public KeyException(String errMsg) {
}

public KeyException(Kvrpcpb.KeyError keyErr) {
super("Key exception occurred");
super("Key exception occurred " + keyErr.toString());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need to print error msg here?
Have you tried getKeyErr()? it should return the error and you should be able to get the error msg from getKeyErr().toString()

this.keyErr = keyErr;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ private boolean isPrewriteSuccess(BackOffer backOffer, PrewriteResponse resp, lo
Lock lock = new Lock(err.getLocked(), codec);
locks.add(lock);
} else {
throw new KeyException(err.toString());
throw new KeyException(err);
}
}
if (isSuccess) {
Expand Down
53 changes: 31 additions & 22 deletions src/main/java/org/tikv/txn/TwoPhaseCommitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,9 @@ private void doCommitPrimaryKeyWithRetry(BackOffer backOffer, ByteString key, lo
this.doCommitPrimaryKeyWithRetry(backOffer, key, commitTs);
}
}

LOG.info("commit primary key {} successfully", KeyUtils.formatBytes(key));
if (commitResult.isSuccess() && LOG.isDebugEnabled()) {
LOG.debug("commit primary key {} successfully", KeyUtils.formatBytes(key));
}
}

/**
Expand Down Expand Up @@ -402,11 +403,13 @@ private void doPrewriteSecondaryKeySingleBatchWithRetry(
BatchKeys batchKeys,
Map<ByteString, Kvrpcpb.Mutation> mutations)
throws TiBatchWriteException {
LOG.debug(
"start prewrite secondary key, row={}, size={}KB, regionId={}",
batchKeys.getKeys().size(),
batchKeys.getSizeInKB(),
batchKeys.getRegion().getId());
if (LOG.isDebugEnabled()) {
LOG.debug(
"start prewrite secondary key, row={}, size={}KB, regionId={}",
batchKeys.getKeys().size(),
batchKeys.getSizeInKB(),
batchKeys.getRegion().getId());
}

List<ByteString> keyList = batchKeys.getKeys();
int batchSize = keyList.size();
Expand Down Expand Up @@ -450,11 +453,13 @@ private void doPrewriteSecondaryKeySingleBatchWithRetry(
throw new TiBatchWriteException(errorMsg, e);
}
}
LOG.debug(
"prewrite secondary key successfully, row={}, size={}KB, regionId={}",
batchKeys.getKeys().size(),
batchKeys.getSizeInKB(),
batchKeys.getRegion().getId());
if (LOG.isDebugEnabled()) {
LOG.debug(
"prewrite secondary key successfully, row={}, size={}KB, regionId={}",
batchKeys.getKeys().size(),
batchKeys.getSizeInKB(),
batchKeys.getRegion().getId());
}
}

private void appendBatchBySize(
Expand Down Expand Up @@ -597,11 +602,13 @@ private void doCommitSecondaryKeysWithRetry(

private void doCommitSecondaryKeySingleBatchWithRetry(
BackOffer backOffer, BatchKeys batchKeys, long commitTs) throws TiBatchWriteException {
LOG.info(
"start commit secondary key, row={}, size={}KB, regionId={}",
batchKeys.getKeys().size(),
batchKeys.getSizeInKB(),
batchKeys.getRegion().getId());
if (LOG.isDebugEnabled()) {
LOG.debug(
"start commit secondary key, row={}, size={}KB, regionId={}",
batchKeys.getKeys().size(),
batchKeys.getSizeInKB(),
batchKeys.getRegion().getId());
}
List<ByteString> keysCommit = batchKeys.getKeys();
ByteString[] keys = new ByteString[keysCommit.size()];
keysCommit.toArray(keys);
Expand All @@ -617,11 +624,13 @@ private void doCommitSecondaryKeySingleBatchWithRetry(
LOG.warn(error);
throw new TiBatchWriteException("commit secondary key error", commitResult.getException());
}
LOG.info(
"commit {} rows successfully, size={}KB, regionId={}",
batchKeys.getKeys().size(),
batchKeys.getSizeInKB(),
batchKeys.getRegion().getId());
if (commitResult.isSuccess() && LOG.isDebugEnabled()) {
LOG.debug(
"commit {} rows successfully, size={}KB, regionId={}",
batchKeys.getKeys().size(),
batchKeys.getSizeInKB(),
batchKeys.getRegion().getId());
}
}

private GroupKeyResult groupKeysByRegion(ByteString[] keys, int size, BackOffer backOffer)
Expand Down
2 changes: 0 additions & 2 deletions src/main/java/org/tikv/txn/TxnKVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.tikv.common.ReadOnlyPDClient;
import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.KeyException;
import org.tikv.common.exception.RegionException;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.exception.TiKVException;
Expand Down Expand Up @@ -181,7 +180,6 @@ public ClientRPCResult commit(
// TODO: check this logic to see are we satisfied?
private boolean retryableException(Exception e) {
return e instanceof TiClientInternalException
|| e instanceof KeyException
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete this will be a break change. Some KeyException also need the retry, why do you want to exclude this?
If you still want to exclude some KeyException, please judge the type. here is an example

 if (e instanceof KeyException) {
      Kvrpcpb.KeyError ke = ((KeyException) e).getKeyError();
      if (ke == null) return true;
      if (!ke.getAbort().isEmpty()
          || ke.hasConflict()
          || ke.hasAlreadyExist()
          || ke.hasDeadlock()
          || ke.hasCommitTsExpired()
          || ke.hasTxnNotFound()) return false;
      return true;
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok,I review it again

|| e instanceof RegionException
|| e instanceof StatusRuntimeException;
}
Expand Down
189 changes: 189 additions & 0 deletions src/test/java/org/tikv/txn/TwoPhaseCommitterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,40 @@
import static org.junit.Assert.fail;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.BaseTxnKVTest;
import org.tikv.common.BytePairWrapper;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.exception.KeyException;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;

@RunWith(PowerMockRunner.class)
@PrepareForTest({TiSession.class, TxnKVClient.class, TwoPhaseCommitter.class})
@PowerMockIgnore({"javax.net.ssl.*"})
public class TwoPhaseCommitterTest extends BaseTxnKVTest {

private static final Logger logger = LoggerFactory.getLogger(TwoPhaseCommitterTest.class);
private static final int WRITE_BUFFER_SIZE = 32 * 1024;
private static final int TXN_COMMIT_BATCH_SIZE = 768 * 1024;
private static final long DEFAULT_BATCH_WRITE_LOCK_TTL = 3600000;
Expand Down Expand Up @@ -76,4 +99,170 @@ public void autoClosableTest() throws Exception {
executorService)) {}
Assert.assertTrue(executorService.isShutdown());
}

@Test
public void prewriteWriteConflictFastFailTest() throws Exception {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this test has nothing to do with this pr, right?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is based on the new constructor method added in this PR #775.
Do you mind move it to that PR, or the test will fail

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test for verify write conflict will aways retry ,but need fast fail

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you explain why this test was added? Or what scenes/feature this test for


int WRITE_BUFFER_SIZE = 32;
String primaryKey = RandomStringUtils.randomAlphabetic(3);
AtomicLong failCount = new AtomicLong();
ExecutorService executorService =
Executors.newFixedThreadPool(
WRITE_BUFFER_SIZE,
new ThreadFactoryBuilder().setNameFormat("2pc-pool-%d").setDaemon(true).build());
CountDownLatch latch = new CountDownLatch(2);
int DEFAULT_BATCH_WRITE_LOCK_TTL = 10000;
new Thread(
() -> {
long startTS = session.getTimestamp().getVersion();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
TwoPhaseCommitter twoPhaseCommitter =
new TwoPhaseCommitter(
session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService);
List<BytePairWrapper> pairs =
Arrays.asList(
new BytePairWrapper(
primaryKey.getBytes(StandardCharsets.UTF_8),
primaryKey.getBytes(StandardCharsets.UTF_8)));
twoPhaseCommitter.prewriteSecondaryKeys(
primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000);
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000);

long commitTs = session.getTimestamp().getVersion();

twoPhaseCommitter.commitPrimaryKey(
backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs);
} catch (Exception e) {
logger.error(e.getMessage(), e);
failCount.incrementAndGet();
} finally {
latch.countDown();
}
})
.start();

Thread.sleep(10);
new Thread(
() -> {
long startTS = session.getTimestamp().getVersion();
try {
TwoPhaseCommitter twoPhaseCommitter =
new TwoPhaseCommitter(
session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService);
List<BytePairWrapper> pairs =
Arrays.asList(
new BytePairWrapper(
primaryKey.getBytes(StandardCharsets.UTF_8),
primaryKey.getBytes(StandardCharsets.UTF_8)));
twoPhaseCommitter.prewriteSecondaryKeys(
primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000);
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000);

long commitTs = session.getTimestamp().getVersion();

twoPhaseCommitter.commitPrimaryKey(
backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs);
} catch (Exception e) {
logger.error(e.getMessage(), e);
failCount.incrementAndGet();
} finally {
latch.countDown();
}
})
.start();
latch.await();
Assert.assertEquals(1, failCount.get());
}

@Test
public void prewriteWriteConflictLongNoFailTest() throws Exception {

int WRITE_BUFFER_SIZE = 32;
String primaryKey = RandomStringUtils.randomAlphabetic(3);
AtomicLong failCount = new AtomicLong();
ExecutorService executorService =
Executors.newFixedThreadPool(
WRITE_BUFFER_SIZE,
new ThreadFactoryBuilder().setNameFormat("2pc-pool-%d").setDaemon(true).build());
CountDownLatch latch = new CountDownLatch(2);
int DEFAULT_BATCH_WRITE_LOCK_TTL = 10000;

new Thread(
() -> {
long startTS = session.getTimestamp().getVersion();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

try {
session = PowerMockito.spy(session);
TxnKVClient kvClient = PowerMockito.spy(session.createTxnClient());
PowerMockito.when(kvClient, "retryableException", Mockito.any(KeyException.class))
.thenReturn(true);
PowerMockito.doReturn(kvClient).when(session).createTxnClient();

TwoPhaseCommitter twoPhaseCommitter =
new TwoPhaseCommitter(
session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService);
List<BytePairWrapper> pairs =
Arrays.asList(
new BytePairWrapper(
primaryKey.getBytes(StandardCharsets.UTF_8),
primaryKey.getBytes(StandardCharsets.UTF_8)));
twoPhaseCommitter.prewriteSecondaryKeys(
primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000);
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000);

long commitTs = session.getTimestamp().getVersion();

twoPhaseCommitter.commitPrimaryKey(
backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs);
} catch (Exception e) {
logger.error(e.getMessage(), e);
failCount.incrementAndGet();
} finally {
latch.countDown();
}
})
.start();

Thread.sleep(10);
new Thread(
() -> {
long startTS = session.getTimestamp().getVersion();
try {
TwoPhaseCommitter twoPhaseCommitter =
new TwoPhaseCommitter(
session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService);
List<BytePairWrapper> pairs =
Arrays.asList(
new BytePairWrapper(
primaryKey.getBytes(StandardCharsets.UTF_8),
primaryKey.getBytes(StandardCharsets.UTF_8)));
twoPhaseCommitter.prewriteSecondaryKeys(
primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000);
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000);

long commitTs = session.getTimestamp().getVersion();

twoPhaseCommitter.commitPrimaryKey(
backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs);
} catch (Exception e) {
logger.error(e.getMessage(), e);
failCount.incrementAndGet();
} finally {
latch.countDown();
}
})
.start();
latch.await();
Assert.assertEquals(1, failCount.get());
}
}
Loading