Skip to content

Commit

Permalink
feature: support node metadata sync (#6326)
Browse files Browse the repository at this point in the history
* init

* support node metadata sync

* done

* update

* keep threads safe and remove removed nodes when node members are modified

* keep threads safe and remove removed nodes when node members are modified

* keep threads safe and remove removed nodes when node members are modified

* update

* update

* update

* update

* update

* Fixed the bug that metadata was not synchronized when the leader was changed

* Fixed the bug that metadata was not synchronized when the leader was changed

* The bug that node information is not updated when the cluster changes is fixed

* update

* update

* update
  • Loading branch information
funky-eyes authored Mar 5, 2024
1 parent 13a5ada commit 80eb68c
Show file tree
Hide file tree
Showing 11 changed files with 372 additions and 39 deletions.
21 changes: 21 additions & 0 deletions common/src/main/java/org/apache/seata/common/metadata/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@ public class Node {

Map<String, Object> metadata = new HashMap<>();
private Endpoint control;

private Endpoint transaction;

private Endpoint internal;

private String group;
private ClusterRole role = ClusterRole.MEMBER;

private String version;

public Node() {}

public Endpoint createEndpoint(String host, int port, String protocol) {
Expand Down Expand Up @@ -75,6 +80,22 @@ public void setTransaction(Endpoint transaction) {
this.transaction = transaction;
}

public String getVersion() {
return version;
}

public void setVersion(String version) {
this.version = version;
}

public Endpoint getInternal() {
return internal;
}

public void setInternal(Endpoint internal) {
this.internal = internal;
}

public static class Endpoint {

private String host;
Expand Down
10 changes: 9 additions & 1 deletion dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
<hsf.version>1.8.3</hsf.version>
<bytebuddy.version>1.12.17</bytebuddy.version>
<dubbo.alibaba.version>2.6.10</dubbo.alibaba.version>
<sofa.rpc.version>5.5.3</sofa.rpc.version>
<sofa.rpc.version>5.6.5</sofa.rpc.version>
<fastjson.version>1.2.83</fastjson.version>
<protostuff.version>1.5.9</protostuff.version>
<config.version>1.2.1</config.version>
Expand Down Expand Up @@ -73,8 +73,11 @@
<ant.version>1.10.12</ant.version>
<lz4.version>1.7.1</lz4.version>
<jraft.version>1.3.14</jraft.version>
<netty.version>4.1.86.Final</netty.version>
<snakeyaml.version>2.0</snakeyaml.version>
<netty.version>4.1.94.Final</netty.version>
<sofa.hessian.version>4.0.3</sofa.hessian.version>
<sofa.bolt.version>1.6.7</sofa.bolt.version>

<protobuf.version>3.16.3</protobuf.version>
<grpc.version>1.27.1</grpc.version>
Expand Down Expand Up @@ -188,6 +191,11 @@
<artifactId>hessian</artifactId>
<version>${sofa.hessian.version}</version>
</dependency>
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>bolt</artifactId>
<version>${sofa.bolt.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
Expand Down
10 changes: 10 additions & 0 deletions server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,16 @@
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>jraft-core</artifactId>
<exclusions>
<exclusion>
<groupId>com.alipay.sofa</groupId>
<artifactId>bolt</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>bolt</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ public RaftStateMachine getRaftStateMachine() {
return raftStateMachine;
}

public PeerId getServerId() {
return serverId;
}

@Override
public void close() {
destroy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.CliService;
import com.alipay.sofa.jraft.RaftServiceFactory;
import com.alipay.sofa.jraft.conf.Configuration;
Expand All @@ -39,9 +40,12 @@
import org.apache.seata.common.XID;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.serializer.SerializerType;
import org.apache.seata.discovery.registry.FileRegistryServiceImpl;
import org.apache.seata.discovery.registry.MultiRegistryFactory;
import org.apache.seata.discovery.registry.RegistryService;
import org.apache.seata.server.cluster.raft.processor.PutNodeInfoRequestProcessor;
import org.apache.seata.server.cluster.raft.serializer.JacksonBoltSerializer;
import org.apache.seata.server.store.StoreConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -148,8 +152,12 @@ public static void start() {
}
LOGGER.info("started seata server raft cluster, group: {} ", group);
});
if (rpcServer != null && !rpcServer.init(null)) {
throw new RuntimeException("start raft node fail!");
if (rpcServer != null) {
rpcServer.registerProcessor(new PutNodeInfoRequestProcessor());
SerializerManager.addSerializer(SerializerType.JACKSON.getCode(), new JacksonBoltSerializer());
if (!rpcServer.init(null)) {
throw new RuntimeException("start raft node fail!");
}
}
}

Expand Down Expand Up @@ -222,9 +230,11 @@ public static Set<String> groups() {
private static class SingletonHandler {
private static final CliService CLI_SERVICE = RaftServiceFactory.createAndInitCliService(new CliOptions());
private static final CliClientService CLI_CLIENT_SERVICE = new CliClientServiceImpl();

static {
CLI_CLIENT_SERVICE.init(new CliOptions());
}

}

}
Loading

0 comments on commit 80eb68c

Please sign in to comment.