Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature : mock server #6205

Merged
merged 16 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
<module>spring</module>
<module>tcc</module>
<module>test</module>
<module>test-mock-server</module>
<module>tm</module>
<module>metrics</module>
<module>serializer</module>
Expand Down
62 changes: 62 additions & 0 deletions test-mock-server/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 1999-2019 Seata.io Group.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
Bughue marked this conversation as resolved.
Show resolved Hide resolved
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.seata</groupId>
<artifactId>seata-parent</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>seata-mock-server</artifactId>
<packaging>jar</packaging>
<name>seata-mock-server</name>
xingfudeshi marked this conversation as resolved.
Show resolved Hide resolved
<description>Seata mock server</description>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
Bughue marked this conversation as resolved.
Show resolved Hide resolved
</configuration>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-server</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>


</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.mockserver;

import io.seata.core.exception.TransactionException;
import io.seata.core.model.BranchStatus;
import io.seata.core.model.GlobalStatus;
import io.seata.core.protocol.AbstractMessage;
import io.seata.core.protocol.AbstractResultMessage;
import io.seata.core.protocol.ProtocolConstants;
import io.seata.core.protocol.ResultCode;
import io.seata.core.protocol.Version;
import io.seata.core.protocol.transaction.*;
import io.seata.core.rpc.Disposable;
import io.seata.core.rpc.RemotingServer;
import io.seata.core.rpc.RpcContext;
import io.seata.core.rpc.TransactionMessageHandler;
import io.seata.core.rpc.netty.ChannelManager;
import io.seata.mockserver.call.CallRm;
import io.seata.serializer.seata.MessageCodecFactory;
import io.seata.serializer.seata.MessageSeataCodec;
import io.seata.serializer.seata.protocol.v1.MessageCodecFactoryV1;
import io.seata.server.AbstractTCInboundHandler;

/**
* Mock Coordinator
*
* @author minghua.xie
* @date 2023/11/14
Bughue marked this conversation as resolved.
Show resolved Hide resolved
**/
public class MockCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {

RemotingServer remotingServer;
@Override
public void destroy() {

}

@Override
public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
if (!(request instanceof AbstractTransactionRequestToTC)) {
throw new IllegalArgumentException();
}
AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
transactionRequest.setTCInboundHandler(this);

return transactionRequest.handle(context);
}

@Override
public void onResponse(AbstractResultMessage response, RpcContext context) {
response.setResultCode(ResultCode.Success);
}

@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException {
response.setXid("666");
Bughue marked this conversation as resolved.
Show resolved Hide resolved
response.setResultCode(ResultCode.Success);
}

@Override
protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext) throws TransactionException {
response.setGlobalStatus(GlobalStatus.Committed);
response.setResultCode(ResultCode.Success);
}

@Override
protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response, RpcContext rpcContext) throws TransactionException {
response.setGlobalStatus(GlobalStatus.Rollbacked);
response.setResultCode(ResultCode.Success);
}

@Override
protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response, RpcContext rpcContext) throws TransactionException {
response.setBranchId(9L);
response.setResultCode(ResultCode.Success);

String resourceId = request.getResourceId();
String clientId = rpcContext.getClientId();

Thread thread = new Thread(() -> {
try {
Thread.sleep(1000);
BranchStatus commit = CallRm.branchCommit(remotingServer, resourceId, clientId);
BranchStatus rollback = CallRm.branchRollback(remotingServer, resourceId, clientId);
if (ProtocolConstants.VERSION_0 != Version.calcProtocolVersion(rpcContext.getVersion())) {
CallRm.deleteUndoLog(remotingServer, resourceId, clientId);
}
} catch (Exception e) {
e.printStackTrace();
}
});
thread.start();
}

@Override
protected void doBranchReport(BranchReportRequest request, BranchReportResponse response, RpcContext rpcContext) throws TransactionException {
response.setResultCode(ResultCode.Success);
}

@Override
protected void doLockCheck(GlobalLockQueryRequest request, GlobalLockQueryResponse response, RpcContext rpcContext) throws TransactionException {
response.setResultCode(ResultCode.Success);
}

@Override
protected void doGlobalStatus(GlobalStatusRequest request, GlobalStatusResponse response, RpcContext rpcContext) throws TransactionException {
response.setGlobalStatus(GlobalStatus.Committed);
response.setResultCode(ResultCode.Success);
}

@Override
protected void doGlobalReport(GlobalReportRequest request, GlobalReportResponse response, RpcContext rpcContext) throws TransactionException {
response.setGlobalStatus(GlobalStatus.Committed);
response.setResultCode(ResultCode.Success);
}

public void setRemotingServer(RemotingServer remotingServer) {
this.remotingServer = remotingServer;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.mockserver;

import io.netty.channel.Channel;
import io.seata.core.protocol.MessageType;
import io.seata.core.rpc.TransactionMessageHandler;
import io.seata.core.rpc.netty.AbstractNettyRemotingServer;
import io.seata.core.rpc.netty.NettyServerConfig;
import io.seata.core.rpc.processor.server.ServerHeartbeatProcessor;
import io.seata.mockserver.processor.MockHeartbeatProcessor;
import io.seata.mockserver.processor.MockOnReqProcessor;
import io.seata.mockserver.processor.MockOnRespProcessor;
import io.seata.mockserver.processor.MockRemotingProcessor;
import io.seata.mockserver.processor.MockRegisterProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ThreadPoolExecutor;

/**
* The mock netty remoting server.
*
* @author Bughue
*/
public class MockNettyRemotingServer extends AbstractNettyRemotingServer {

private static final Logger LOGGER = LoggerFactory.getLogger(MockNettyRemotingServer.class);

private TransactionMessageHandler handler;

public void setHandler(TransactionMessageHandler transactionMessageHandler) {
this.handler = transactionMessageHandler;
}

@Override
public void init() {
// registry processor
registerProcessor();
super.init();
}

/**
* Instantiates a new Rpc remoting server.
*
* @param messageExecutor the message executor
*/
public MockNettyRemotingServer(ThreadPoolExecutor messageExecutor) {
super(messageExecutor, new NettyServerConfig());
}

@Override
public void destroyChannel(String serverAddress, Channel channel) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("will destroy channel:{},address:{}", channel, serverAddress);
}
channel.disconnect();
channel.close();
}

private void registerProcessor() {
// 1. registry on request message processor
MockOnReqProcessor onRequestProcessor = new MockOnReqProcessor(this, handler);
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);

// 2. registry on response message processor
MockOnRespProcessor onResponseProcessor = new MockOnRespProcessor(this, handler,getFutures());
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);

// 3. registry rm reg processor
MockRegisterProcessor regRmProcessor = new MockRegisterProcessor(this, MockRegisterProcessor.Role.RM);
super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);

// 4. registry tm reg processor
MockRegisterProcessor regTmProcessor = new MockRegisterProcessor(this, MockRegisterProcessor.Role.TM);
super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);

// 5. registry heartbeat message processor
MockHeartbeatProcessor heartbeatMessageProcessor = new MockHeartbeatProcessor(this,handler);
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}

@Override
public void destroy() {
super.destroy();
}
}
70 changes: 70 additions & 0 deletions test-mock-server/src/main/java/io/seata/mockserver/MockServer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.mockserver;

import io.seata.common.XID;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.util.NetUtil;
import io.seata.server.UUIDGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.lang.management.ManagementFactory;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* The type Mock Server.
*
* @author Bughue
*/
@SpringBootApplication
public class MockServer {

protected static final Logger LOGGER = LoggerFactory.getLogger(MockServer.class);

/**
* The entry point of application.
*
* @param args the input arguments
*/
public static void main(String[] args) {
SpringApplication.run(MockServer.class, args);

ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(50,
50, 500, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(20000),
new NamedThreadFactory("ServerHandlerThread", 500), new ThreadPoolExecutor.CallerRunsPolicy());

MockNettyRemotingServer nettyRemotingServer = new MockNettyRemotingServer(workingThreads);

// set registry
XID.setIpAddress(NetUtil.getLocalIp());
XID.setPort(8092);
// init snowflake for transactionId, branchId
UUIDGenerator.init(1L);

MockCoordinator coordinator = new MockCoordinator();
coordinator.setRemotingServer(nettyRemotingServer);
nettyRemotingServer.setHandler(coordinator);
nettyRemotingServer.init();

LOGGER.info("pid info: "+ ManagementFactory.getRuntimeMXBean().getName());
}
}
Loading
Loading