Skip to content

Commit

Permalink
Merge pull request #133 from openmessaging/statemachine
Browse files Browse the repository at this point in the history
Add Statemachine feature for DLedger
  • Loading branch information
odbozhou authored Apr 28, 2022
2 parents ac3f801 + 6775159 commit 6f0bf36
Show file tree
Hide file tree
Showing 9 changed files with 712 additions and 70 deletions.
213 changes: 149 additions & 64 deletions src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java

Large diffs are not rendered by default.

28 changes: 22 additions & 6 deletions src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import io.openmessaging.storage.dledger.protocol.PushEntryResponse;
import io.openmessaging.storage.dledger.protocol.VoteRequest;
import io.openmessaging.storage.dledger.protocol.VoteResponse;
import io.openmessaging.storage.dledger.statemachine.StateMachine;
import io.openmessaging.storage.dledger.statemachine.StateMachineCaller;
import io.openmessaging.storage.dledger.store.DLedgerMemoryStore;
import io.openmessaging.storage.dledger.store.DLedgerStore;
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
Expand All @@ -49,12 +51,12 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CompletableFuture;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -71,6 +73,7 @@ public class DLedgerServer implements DLedgerProtocolHandler {
private DLedgerLeaderElector dLedgerLeaderElector;

private ScheduledExecutorService executorService;
private Optional<StateMachineCaller> fsmCaller;

public DLedgerServer(DLedgerConfig dLedgerConfig) {
this.dLedgerConfig = dLedgerConfig;
Expand All @@ -87,7 +90,6 @@ public DLedgerServer(DLedgerConfig dLedgerConfig) {
});
}


public void startup() {
this.dLedgerStore.startup();
this.dLedgerRpcService.startup();
Expand All @@ -102,6 +104,7 @@ public void shutdown() {
this.dLedgerRpcService.shutdown();
this.dLedgerStore.shutdown();
executorService.shutdown();
this.fsmCaller.ifPresent(StateMachineCaller::shutdown);
}

private DLedgerStore createDLedgerStore(String storeType, DLedgerConfig config, MemberState memberState) {
Expand All @@ -116,6 +119,18 @@ public MemberState getMemberState() {
return memberState;
}

public void registerStateMachine(final StateMachine fsm) {
final StateMachineCaller fsmCaller = new StateMachineCaller(this.dLedgerStore, fsm, this.dLedgerEntryPusher);
fsmCaller.start();
this.fsmCaller = Optional.of(fsmCaller);
// Register state machine caller to entry pusher
this.dLedgerEntryPusher.registerStateMachine(this.fsmCaller);
}

public StateMachine getStateMachine() {
return this.fsmCaller.map(StateMachineCaller::getStateMachine).orElse(null);
}

@Override public CompletableFuture<HeartBeatResponse> handleHeartBeat(HeartBeatRequest request) throws Exception {
try {

Expand Down Expand Up @@ -190,12 +205,12 @@ public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest re
}
// only wait last entry ack is ok
BatchAppendFuture<AppendEntryResponse> batchAppendFuture =
(BatchAppendFuture<AppendEntryResponse>) dLedgerEntryPusher.waitAck(resEntry, true);
(BatchAppendFuture<AppendEntryResponse>) dLedgerEntryPusher.waitAck(resEntry, true);
batchAppendFuture.setPositions(positions);
return batchAppendFuture;
}
throw new DLedgerException(DLedgerResponseCode.REQUEST_WITH_EMPTY_BODYS, "BatchAppendEntryRequest" +
" with empty bodys");
" with empty bodys");
} else {
DLedgerEntry dLedgerEntry = new DLedgerEntry();
dLedgerEntry.setBody(request.getBody());
Expand Down Expand Up @@ -278,7 +293,8 @@ public CompletableFuture<PullEntriesResponse> handlePull(PullEntriesRequest requ
}

@Override
public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(LeadershipTransferRequest request) throws Exception {
public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(
LeadershipTransferRequest request) throws Exception {
try {
PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());
PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());
Expand Down Expand Up @@ -385,7 +401,7 @@ private void checkPreferredLeader() {
}
}
logger.info("preferredLeaderId = {}, which has the smallest fall behind index = {} and is decided to be transferee.", preferredLeaderId, minFallBehind);

if (minFallBehind < dLedgerConfig.getMaxLeadershipTransferWaitIndex()) {
LeadershipTransferRequest request = new LeadershipTransferRequest();
request.setTerm(memberState.currTerm());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2017-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.openmessaging.storage.dledger.snapshot;

/**
* Reader for snapshot
*/
public interface SnapshotReader {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2017-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.openmessaging.storage.dledger.snapshot;

/**
* Writer for snapshot
*/
public interface SnapshotWriter {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2017-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.openmessaging.storage.dledger.statemachine;

import io.openmessaging.storage.dledger.entry.DLedgerEntry;
import io.openmessaging.storage.dledger.store.DLedgerStore;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/**
* The iterator implementation of committed entries.
*/
public class CommittedEntryIterator implements Iterator<DLedgerEntry> {

private final Function<Long, Boolean> completeEntryCallback;
private final DLedgerStore dLedgerStore;
private final long committedIndex;
private final long firstApplyingIndex;
private final AtomicLong applyingIndex;
private long currentIndex;
private int completeAckNums = 0;

public CommittedEntryIterator(final DLedgerStore dLedgerStore, final long committedIndex,
final AtomicLong applyingIndex, final long lastAppliedIndex,
final Function<Long, Boolean> completeEntryCallback) {
this.dLedgerStore = dLedgerStore;
this.committedIndex = committedIndex;
this.applyingIndex = applyingIndex;
this.firstApplyingIndex = lastAppliedIndex + 1;
this.currentIndex = lastAppliedIndex;
this.completeEntryCallback = completeEntryCallback;
}

@Override
public boolean hasNext() {
if (this.currentIndex >= this.firstApplyingIndex && this.currentIndex <= this.committedIndex) {
completeApplyingEntry();
}
return this.currentIndex < this.committedIndex;
}

@Override
public DLedgerEntry next() {
++this.currentIndex;
if (this.currentIndex <= this.committedIndex) {
final DLedgerEntry dLedgerEntry = this.dLedgerStore.get(this.currentIndex);
this.applyingIndex.set(this.currentIndex);
return dLedgerEntry;
}
return null;
}

private void completeApplyingEntry() {
if (this.completeEntryCallback.apply(this.currentIndex)) {
this.completeAckNums++;
}
}

public long getIndex() {
return this.currentIndex;
}

public int getCompleteAckNums() {
return completeAckNums;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2017-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.openmessaging.storage.dledger.statemachine;

import io.openmessaging.storage.dledger.snapshot.SnapshotReader;
import io.openmessaging.storage.dledger.snapshot.SnapshotWriter;
import java.util.concurrent.CompletableFuture;

/**
* Finite state machine, which should be implemented by user.
*/
public interface StateMachine {

/**
* Update the user statemachine with a batch a tasks that can be accessed
* through |iterator|.
*
* @param iter iterator of committed entry
*/
void onApply(final CommittedEntryIterator iter);

/**
* User defined snapshot generate function, this method will block StateMachine#onApply(Iterator).
* Call done.run(status) when snapshot finished.
*
* @param writer snapshot writer
* @param done callback
*/
void onSnapshotSave(final SnapshotWriter writer, final CompletableFuture<Boolean> done);

/**
* User defined snapshot load function.
*
* @param reader snapshot reader
* @return true on success
*/
boolean onSnapshotLoad(final SnapshotReader reader);

/**
* Invoked once when the raft node was shut down.
* Default do nothing
*/
void onShutdown();
}
Loading

0 comments on commit 6f0bf36

Please sign in to comment.