Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: add three tcc fence interceptors to fix tcc fence deadlock ex… #6706

Open
wants to merge 5 commits into
base: 2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6661](https://github.com/apache/incubator-seata/pull/6661)] fix `tableMeta` cache scheduled refresh issue
- [[#6668](https://github.com/apache/incubator-seata/pull/6668)] thread safety issue when adding and removing instances
- [[#6678](https://github.com/apache/incubator-seata/pull/6678)] fix the same record has different lowkeys due to mixed case of table names yesterday
- [[#6706](https://github.com/apache/incubator-seata/pull/6706)] add three tcc fence interceptors to fix tcc fence deadlock exception
- [[#6697](https://github.com/apache/incubator-seata/pull/6697)] v0 ByteBuf should not decode by super class


### optimize:
- [[#6499](https://github.com/apache/incubator-seata/pull/6499)] split the task thread pool for committing and rollbacking statuses
- [[#6208](https://github.com/apache/incubator-seata/pull/6208)] optimize : load SeataSerializer by version
Expand Down
2 changes: 1 addition & 1 deletion changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
- [[#6661](https://github.com/apache/incubator-seata/pull/6661)] 修复`tableMeta`缓存定时刷新失效问题
- [[#6668](https://github.com/apache/incubator-seata/pull/6668)] 解决namingserver同一个集群下instance添加和删除时的线程安全问题
- [[#6678](https://github.com/apache/incubator-seata/pull/6678)] 修复由于表名大小写问题导致的相同记录生成不同RowKey的问题
- [[#6706](https://github.com/apache/incubator-seata/pull/6706)] 添加三个tcc fence拦截器来修复死锁异常
- [[#6697](https://github.com/apache/incubator-seata/pull/6697)] v0版本的ByteBuf不应由父类先解码


### optimize:
- [[#6499](https://github.com/apache/incubator-seata/pull/6499)] 拆分 committing 和 rollbacking 状态的任务线程池
- [[#6208](https://github.com/apache/incubator-seata/pull/6208)] 支持多版本的Seata序列化
Expand Down
96 changes: 96 additions & 0 deletions core/src/main/java/org/apache/seata/core/context/RootContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.seata.common.exception.ShouldNeverHappenException;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.model.BranchType;
import org.apache.seata.core.model.TccLocalTxActive;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
Expand Down Expand Up @@ -60,6 +61,26 @@ private RootContext() {
*/
public static final String KEY_TIMEOUT = "TX_TIMEOUT";

/**
* The constant KEY_RESOURCE_ID.
*/
public static final String KEY_RESOURCE_ID = "TX_RESOURCE_ID";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rootcontext不应该用来存放这些数据
Rootcontext should not be used to store this data


/**
* The constant KEY_TCC_LOCAL_TX_ACTIVE.
*/
public static final String KEY_TCC_LOCAL_TX_ACTIVE = "TCC_LOCAL_TX_ACTIVE";

/**
* The constant KEY_TCC_COMMIT_RESULT.
*/
public static final String KEY_TCC_COMMIT_RESULT = "KEY_TCC_COMMIT_RESULT";

/**
* The constant KEY_TCC_ROLLBACK_RESULT.
*/
public static final String KEY_TCC_ROLLBACK_RESULT = "KEY_TCC_ROLLBACK_RESULT";

/**
* The constant MDC_KEY_XID for logback
* @since 1.5.0
Expand Down Expand Up @@ -142,6 +163,81 @@ public static void setTimeout(Integer timeout) {
CONTEXT_HOLDER.put(KEY_TIMEOUT,timeout);
}

public static String getBranchId() {
return (String) CONTEXT_HOLDER.get(KEY_BRANCHID);
}

public static void bindBranchId(String branchId) {
CONTEXT_HOLDER.put(KEY_BRANCHID, branchId);
}

public static void unbindBranchId() {
String branchId = (String) CONTEXT_HOLDER.remove(KEY_BRANCHID);
if (LOGGER.isDebugEnabled() && branchId != null) {
LOGGER.debug("unbind branch id");
}
}

public static String getResourceId() {
return (String) CONTEXT_HOLDER.get(KEY_RESOURCE_ID);
}

public static void bindResourceId(String resourceId) {
CONTEXT_HOLDER.put(KEY_RESOURCE_ID, resourceId);
}

public static void unbindResourceId() {
String resourceId = (String) CONTEXT_HOLDER.remove(KEY_RESOURCE_ID);
if (LOGGER.isDebugEnabled() && resourceId != null) {
LOGGER.debug("unbind tcc resource id");
}
}

public static TccLocalTxActive getTccLocalTxActive() {
return (TccLocalTxActive) CONTEXT_HOLDER.get(KEY_TCC_LOCAL_TX_ACTIVE);
}

public static void bindTccLocalTxActive(TccLocalTxActive tccLocalTxActive) {
CONTEXT_HOLDER.put(KEY_TCC_LOCAL_TX_ACTIVE, tccLocalTxActive);
}

public static void unbindTccLocalTxActive() {
TccLocalTxActive tccLocalTxActive = (TccLocalTxActive) CONTEXT_HOLDER.remove(KEY_TCC_LOCAL_TX_ACTIVE);
if (LOGGER.isDebugEnabled() && tccLocalTxActive != null) {
LOGGER.debug("unbind tcc local tx active identification");
}
}

public static Boolean getTccCommitResult() {
return (Boolean) CONTEXT_HOLDER.get(KEY_TCC_COMMIT_RESULT);
}

public static void bindTccCommitResult(Boolean tccCommitResult) {
CONTEXT_HOLDER.put(KEY_TCC_COMMIT_RESULT, tccCommitResult);
}

public static void unbindTccCommitResult() {
Boolean tccCommitResult = (Boolean) CONTEXT_HOLDER.remove(KEY_TCC_COMMIT_RESULT);
if (LOGGER.isDebugEnabled() && tccCommitResult != null) {
LOGGER.debug("unbind tcc commit result");
}
}

public static Boolean getTccRollbackResult() {
return (Boolean) CONTEXT_HOLDER.get(KEY_TCC_ROLLBACK_RESULT);
}

public static void bindTccRollbackResult(Boolean tccRollbackResult) {
CONTEXT_HOLDER.put(KEY_TCC_ROLLBACK_RESULT, tccRollbackResult);
}

public static void unbindTccRollbackResult() {
Boolean tccRollbackResult = (Boolean) CONTEXT_HOLDER.remove(KEY_TCC_ROLLBACK_RESULT);
if (LOGGER.isDebugEnabled() && tccRollbackResult != null) {
LOGGER.debug("unbind tcc rollback result");
}
}

/**
* declare local transactions will use global lock check for update/delete/insert/selectForUpdate SQL
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.core.model;

/**
* Identifies whether tcc transactions are activated on the business side
*/
public enum TccLocalTxActive {

/**
* The tcc transaction is not activated on the service side.
*/
UN_ACTIVE(0),

/**
* The tcc transaction is activated on the service side.
*/
ACTIVE(1);


private final int code;

TccLocalTxActive(int code) {
this.code = code;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ private void check() {
}

@Override
public Object prepareFence(String xid, Long branchId, String actionName, Callback<Object> targetCallback) {
public Object prepareFence(Method prepareMethod, Object targetTCCBean, String xid, Long branchId, String actionName, Callback<Object> targetCallback) {
check();
return fenceHandler.prepareFence(xid, branchId, actionName, targetCallback);
return fenceHandler.prepareFence(prepareMethod, targetTCCBean, xid, branchId, actionName, targetCallback);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

public interface FenceHandler {

Object prepareFence(String xid, Long branchId, String actionName, Callback<Object> targetCallback);
Object prepareFence(Method prepareMethod, Object targetTCCBean, String xid, Long branchId, String actionName, Callback<Object> targetCallback);

boolean commitFence(Method commitMethod, Object targetTCCBean, String xid, Long branchId, Object[] args);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,6 @@
*/
package org.apache.seata.integration.tx.api.interceptor;

import javax.annotation.Nonnull;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.seata.common.Constants;
import org.apache.seata.common.exception.FrameworkException;
import org.apache.seata.common.exception.SkipCallbackWrapperException;
Expand All @@ -42,6 +34,14 @@
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import javax.annotation.Nonnull;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

java的包应该在顶层
Java packages should be at the top level

import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
* Handler the Tx Participant Aspect : Setting Context, Creating Branch Record
*
Expand All @@ -53,16 +53,17 @@ public class ActionInterceptorHandler {
/**
* Handler the Tx Aspect
*
* @param method the method
* @param arguments the arguments
* @param invocation the invocation wrapper
* @param xid the xid
* @param businessActionParam the business action params
* @param targetCallback the target callback
* @return the business result
* @throws Throwable the throwable
*/
public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessActionParam businessActionParam,
Callback<Object> targetCallback) throws Throwable {
public Object proceed(InvocationWrapper invocation, String xid, TwoPhaseBusinessActionParam businessActionParam) throws Throwable {
Method method = invocation.getMethod();
Object targetTCCBean = invocation.getProxy();
Object[] arguments = invocation.getArguments();
Callback<Object> targetCallback = invocation::proceed;
//Get action context from arguments, or create a new one and then reset to arguments
BusinessActionContext actionContext = getOrCreateActionContextAndResetToArguments(method.getParameterTypes(), arguments);

Expand All @@ -87,11 +88,13 @@ public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBus
try {
//share actionContext implicitly
BusinessActionContextUtil.setContext(actionContext);
RootContext.bindBranchId(branchId);
RootContext.bindResourceId(actionName);

if (businessActionParam.getUseCommonFence()) {
try {
// Use common Fence, and return the business result
return DefaultCommonFenceHandler.get().prepareFence(xid, Long.valueOf(branchId), actionName, targetCallback);
return DefaultCommonFenceHandler.get().prepareFence(method, targetTCCBean, xid, Long.valueOf(branchId), actionName, targetCallback);
} catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
Throwable originException = e.getCause();
if (originException instanceof FrameworkException) {
Expand All @@ -104,6 +107,8 @@ public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBus
return targetCallback.execute();
}
} finally {
RootContext.unbindBranchId();
RootContext.unbindResourceId();
try {
//to report business action context finally if the actionContext.getUpdated() is true
BusinessActionContextUtil.reportContext(actionContext);
Expand Down
Loading
Loading