Skip to content

Commit

Permalink
feat(core): support protocol about install snapshot
Browse files Browse the repository at this point in the history
1. support protocol about install snapshot

Closes openmessaging#275
  • Loading branch information
TheR1sing3un committed May 30, 2023
1 parent 339149c commit 4b079ed
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import io.openmessaging.storage.dledger.protocol.GetEntriesResponse;
import io.openmessaging.storage.dledger.protocol.HeartBeatRequest;
import io.openmessaging.storage.dledger.protocol.HeartBeatResponse;
import io.openmessaging.storage.dledger.protocol.InstallSnapshotRequest;
import io.openmessaging.storage.dledger.protocol.InstallSnapshotResponse;
import io.openmessaging.storage.dledger.protocol.LeadershipTransferRequest;
import io.openmessaging.storage.dledger.protocol.LeadershipTransferResponse;
import io.openmessaging.storage.dledger.protocol.MetadataRequest;
Expand Down Expand Up @@ -73,7 +75,7 @@ public class DLedgerRpcNettyService extends DLedgerRpcService {

private AbstractDLedgerServer dLedger;

private final ConcurrentHashMap<Integer, UserDefineProcessor<? extends UserDefineRequest,? extends UserDefineResponse>> userDefineProcessors = new ConcurrentHashMap<Integer, UserDefineProcessor<? extends UserDefineRequest,? extends UserDefineResponse>>();
private final ConcurrentHashMap<Integer, UserDefineProcessor<? extends UserDefineRequest, ? extends UserDefineResponse>> userDefineProcessors = new ConcurrentHashMap<Integer, UserDefineProcessor<? extends UserDefineRequest, ? extends UserDefineResponse>>();

private final ExecutorService futureExecutor = Executors.newFixedThreadPool(4, new NamedThreadFactory("FutureExecutor"));

Expand Down Expand Up @@ -118,7 +120,7 @@ public boolean rejectRequest() {
}

@Override
public void registerUserDefineProcessor(UserDefineProcessor<? extends UserDefineRequest,? extends UserDefineResponse> userDefineProcessor) {
public void registerUserDefineProcessor(UserDefineProcessor<? extends UserDefineRequest, ? extends UserDefineResponse> userDefineProcessor) {
this.userDefineProcessors.put(userDefineProcessor.getRequestTypeCode(), userDefineProcessor);
}

Expand All @@ -132,6 +134,7 @@ private void registerProcessor(NettyRemotingServer remotingServer, NettyRequestP
remotingServer.registerProcessor(DLedgerRequestCode.HEART_BEAT.getCode(), protocolProcessor, null);
remotingServer.registerProcessor(DLedgerRequestCode.LEADERSHIP_TRANSFER.getCode(), protocolProcessor, null);
remotingServer.registerProcessor(DLedgerRequestCode.USER_DEFINE_REQUEST.getCode(), protocolProcessor, null);
remotingServer.registerProcessor(DLedgerRequestCode.INSTALL_SNAPSHOT.getCode(), protocolProcessor, null);
}

private NettyRemotingServer registerRemotingServer(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener, NettyRequestProcessor protocolProcessor) {
Expand Down Expand Up @@ -277,6 +280,11 @@ public CompletableFuture<PushEntryResponse> push(PushEntryRequest request) {
return future;
}

@Override
public CompletableFuture<InstallSnapshotResponse> installSnapshot(InstallSnapshotRequest request) throws Exception {
return null;
}

@Override
public CompletableFuture<LeadershipTransferResponse> leadershipTransfer(
LeadershipTransferRequest request) {
Expand Down Expand Up @@ -395,6 +403,14 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
}, futureExecutor);
break;
}
case INSTALL_SNAPSHOT: {
InstallSnapshotRequest installSnapshotRequest = JSON.parseObject(request.getBody(), InstallSnapshotRequest.class);
CompletableFuture<InstallSnapshotResponse> future = handleInstallSnapshot(installSnapshotRequest);
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
}, futureExecutor);
break;
}
case USER_DEFINE_REQUEST:
UserDefineCommandHeader header = (UserDefineCommandHeader) request.decodeCommandCustomHeader(UserDefineCommandHeader.class);
UserDefineProcessor<? extends UserDefineRequest, ? extends UserDefineResponse> userDefineProcessor = this.userDefineProcessors.get(header.getRequestTypeCode());
Expand Down Expand Up @@ -453,6 +469,11 @@ public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request)
return this.dLedger.handlePush(request);
}

@Override
public CompletableFuture<InstallSnapshotResponse> handleInstallSnapshot(InstallSnapshotRequest request) throws Exception {
return this.dLedger.handleInstallSnapshot(request);
}

public RemotingCommand handleResponse(RequestOrResponse response, RemotingCommand request) {
RemotingCommand remotingCommand = RemotingCommand.createResponseCommand(DLedgerResponseCode.SUCCESS.getCode(), null);
remotingCommand.setBody(JSON.toJSONBytes(response));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import io.openmessaging.storage.dledger.protocol.GetEntriesResponse;
import io.openmessaging.storage.dledger.protocol.HeartBeatRequest;
import io.openmessaging.storage.dledger.protocol.HeartBeatResponse;
import io.openmessaging.storage.dledger.protocol.InstallSnapshotRequest;
import io.openmessaging.storage.dledger.protocol.InstallSnapshotResponse;
import io.openmessaging.storage.dledger.protocol.LeadershipTransferRequest;
import io.openmessaging.storage.dledger.protocol.LeadershipTransferResponse;
import io.openmessaging.storage.dledger.protocol.MetadataRequest;
Expand Down Expand Up @@ -446,7 +448,11 @@ public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request)
response.setLeaderId(memberState.getLeaderId());
return CompletableFuture.completedFuture(response);
}
}

@Override
public CompletableFuture<InstallSnapshotResponse> handleInstallSnapshot(InstallSnapshotRequest request) throws Exception {
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ public enum DLedgerRequestCode {
PULL(51003, ""),
PUSH(51004, ""),
LEADERSHIP_TRANSFER(51005, ""),

INSTALL_SNAPSHOT(51006, ""),
USER_DEFINE_REQUEST(59999, "");


private static Map<Integer, DLedgerRequestCode> codeMap = new HashMap<>();

static {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Copyright 2017-2022 The DLedger Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.openmessaging.storage.dledger.protocol;

public class InstallSnapshotRequest extends RequestOrResponse {

private long lastIncludedIndex;

private long lastIncludedTerm;

private byte[] data;

public InstallSnapshotRequest() {
}

public InstallSnapshotRequest(long lastIncludedIndex, long lastIncludedTerm, byte[] data) {
this.lastIncludedIndex = lastIncludedIndex;
this.lastIncludedTerm = lastIncludedTerm;
this.data = data;
}

public long getLastIncludedIndex() {
return lastIncludedIndex;
}

public void setLastIncludedIndex(long lastIncludedIndex) {
this.lastIncludedIndex = lastIncludedIndex;
}

public long getLastIncludedTerm() {
return lastIncludedTerm;
}

public void setLastIncludedTerm(long lastIncludedTerm) {
this.lastIncludedTerm = lastIncludedTerm;
}

public byte[] getData() {
return data;
}

public void setData(byte[] data) {
this.data = data;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Copyright 2017-2022 The DLedger Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.openmessaging.storage.dledger.protocol;

public class InstallSnapshotResponse extends RequestOrResponse {

public InstallSnapshotResponse(int term) {
this.term = term;
}

@Override
public RequestOrResponse code(int code) {
this.code = code;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import io.openmessaging.storage.dledger.protocol.HeartBeatRequest;
import io.openmessaging.storage.dledger.protocol.HeartBeatResponse;
import io.openmessaging.storage.dledger.protocol.InstallSnapshotRequest;
import io.openmessaging.storage.dledger.protocol.InstallSnapshotResponse;
import io.openmessaging.storage.dledger.protocol.PullEntriesRequest;
import io.openmessaging.storage.dledger.protocol.PullEntriesResponse;
import io.openmessaging.storage.dledger.protocol.PushEntryRequest;
Expand All @@ -37,4 +39,6 @@ public interface DLedgerRaftProtocolHandler {

CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request) throws Exception;

CompletableFuture<InstallSnapshotResponse> handleInstallSnapshot(InstallSnapshotRequest request) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import io.openmessaging.storage.dledger.protocol.HeartBeatRequest;
import io.openmessaging.storage.dledger.protocol.HeartBeatResponse;
import io.openmessaging.storage.dledger.protocol.InstallSnapshotRequest;
import io.openmessaging.storage.dledger.protocol.InstallSnapshotResponse;
import io.openmessaging.storage.dledger.protocol.PullEntriesRequest;
import io.openmessaging.storage.dledger.protocol.PullEntriesResponse;
import io.openmessaging.storage.dledger.protocol.PushEntryRequest;
Expand All @@ -37,4 +39,6 @@ public interface DLedgerRaftProtocol {

CompletableFuture<PushEntryResponse> push(PushEntryRequest request) throws Exception;

CompletableFuture<InstallSnapshotResponse> installSnapshot(InstallSnapshotRequest request) throws Exception;

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/**
* Copyright 2017-2022 The DLedger Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -31,6 +31,8 @@
import io.openmessaging.storage.dledger.protocol.GetEntriesResponse;
import io.openmessaging.storage.dledger.protocol.HeartBeatRequest;
import io.openmessaging.storage.dledger.protocol.HeartBeatResponse;
import io.openmessaging.storage.dledger.protocol.InstallSnapshotRequest;
import io.openmessaging.storage.dledger.protocol.InstallSnapshotResponse;
import io.openmessaging.storage.dledger.protocol.LeadershipTransferRequest;
import io.openmessaging.storage.dledger.protocol.LeadershipTransferResponse;
import io.openmessaging.storage.dledger.protocol.MetadataRequest;
Expand Down Expand Up @@ -247,6 +249,11 @@ public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request)
}
}

@Override
public CompletableFuture<InstallSnapshotResponse> handleInstallSnapshot(InstallSnapshotRequest request) throws Exception {
return null;
}

public void startup() {
this.dLedgerRpcService.startup();
this.dLedgerManager.startup();
Expand Down

0 comments on commit 4b079ed

Please sign in to comment.