Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory lock in raft #3926

Merged
merged 5 commits into from
Apr 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/kvstore/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ nebula_add_library(
PartManager.cpp
NebulaStore.cpp
RocksEngineConfig.cpp
LogEncoder.cpp
NebulaSnapshotManager.cpp
RateLimiter.cpp
plugins/elasticsearch/ESListener.cpp
Expand Down
10 changes: 10 additions & 0 deletions src/kvstore/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,16 @@ inline rocksdb::Slice toSlice(const folly::StringPiece& str) {
using KVMap = std::unordered_map<std::string, std::string>;
using KVArrayIterator = std::vector<KV>::const_iterator;

class MergeableAtomicOpResult {
public:
nebula::cpp2::ErrorCode code;
std::string batch; // batched result, like before.
std::list<std::string> readSet;
std::list<std::string> writeSet;
};

using MergeableAtomicOp = folly::Function<MergeableAtomicOpResult(void)>;

} // namespace kvstore
} // namespace nebula

Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ class KVStore {
*/
virtual void asyncAtomicOp(GraphSpaceID spaceId,
PartitionID partId,
raftex::AtomicOp op,
MergeableAtomicOp op,
KVCallback cb) = 0;

/**
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,7 @@ void NebulaStore::asyncRemoveRange(GraphSpaceID spaceId,

void NebulaStore::asyncAtomicOp(GraphSpaceID spaceId,
PartitionID partId,
raftex::AtomicOp op,
MergeableAtomicOp op,
KVCallback cb) {
auto ret = part(spaceId, partId);
if (!ok(ret)) {
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ class NebulaStore : public KVStore, public Handler {
*/
void asyncAtomicOp(GraphSpaceID spaceId,
PartitionID partId,
raftex::AtomicOp op,
MergeableAtomicOp op,
KVCallback cb) override;

/**
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ void Part::sync(KVCallback cb) {
[callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
}

void Part::asyncAtomicOp(raftex::AtomicOp op, KVCallback cb) {
void Part::asyncAtomicOp(MergeableAtomicOp op, KVCallback cb) {
atomicOpAsync(std::move(op))
.thenValue(
[callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/Part.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class Part : public raftex::RaftPart {
* @param op Atomic operation
* @param cb Callback when has a result
*/
void asyncAtomicOp(raftex::AtomicOp op, KVCallback cb);
void asyncAtomicOp(MergeableAtomicOp op, KVCallback cb);

/**
* @brief Add a raft learner asynchronously by adding raft log
Expand Down
1 change: 1 addition & 0 deletions src/kvstore/raftex/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ nebula_add_library(
RaftexService.cpp
Host.cpp
SnapshotManager.cpp
../LogEncoder.cpp
)

nebula_add_subdirectory(test)
Loading