Skip to content

Commit

Permalink
fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
cangfengzhs committed Nov 29, 2022
1 parent 653294d commit 17cd3f8
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 45 deletions.
12 changes: 0 additions & 12 deletions .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -254,18 +254,6 @@ jobs:
volumes:
- /tmp/ccache/nebula/${{ matrix.os }}-${{ matrix.compiler }}:/tmp/ccache/nebula/${{ matrix.os }}-${{ matrix.compiler }}
options: --cap-add=SYS_PTRACE
services:
elasticsearch:
image: elasticsearch:7.17.7
ports:
- 9200:9200
env:
discovery.type: single-node
options: >-
--health-cmd "curl elasticsearch:9200"
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: webiny/[email protected]
with:
Expand Down
57 changes: 41 additions & 16 deletions src/kvstore/plugins/elasticsearch/ESListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,9 @@ void ESListener::processLogs() {
}

LogID lastApplyId = -1;
// the kv pair which can sync to remote safely
std::vector<KV> data;
// // the kv pair which can sync to remote safely
// std::vector<KV> data;
BatchHolder batch;
while (iter->valid()) {
lastApplyId = iter->logId();

Expand All @@ -238,28 +239,52 @@ void ESListener::processLogs() {
case OP_PUT: {
auto pieces = decodeMultiValues(log);
DCHECK_EQ(2, pieces.size());
data.emplace_back(pieces[0], pieces[1]);
batch.put(pieces[0].toString(), pieces[1].toString());
break;
}
case OP_MULTI_PUT: {
auto kvs = decodeMultiValues(log);
DCHECK_EQ(0, kvs.size() % 2);
for (size_t i = 0; i < kvs.size(); i += 2) {
data.emplace_back(kvs[i], kvs[i + 1]);
batch.put(kvs[i].toString(), kvs[i + 1].toString());
}
break;
}
case OP_REMOVE:
case OP_REMOVE_RANGE:
case OP_REMOVE: {
auto key = decodeSingleValue(log);
batch.remove(key.toString());
break;
}
case OP_REMOVE_RANGE: {
auto kvs = decodeMultiValues(log);
DCHECK_EQ(2, kvs.size());
batch.rangeRemove(kvs[0].toString(), kvs[1].toString());
break;
}
case OP_MULTI_REMOVE: {
auto keys = decodeMultiValues(log);
for (auto key : keys) {
batch.remove(key.toString());
}
break;
}
case OP_BATCH_WRITE: {
auto batch = decodeBatchValue(log);
for (auto& op : batch) {
auto batchData = decodeBatchValue(log);
for (auto& op : batchData) {
// OP_BATCH_REMOVE and OP_BATCH_REMOVE_RANGE is igored
if (op.first == BatchLogType::OP_BATCH_PUT) {
data.emplace_back(op.second.first, op.second.second);
switch (op.first) {
case BatchLogType::OP_BATCH_PUT: {
batch.put(op.second.first.toString(), op.second.second.toString());
break;
}
case BatchLogType::OP_BATCH_REMOVE: {
batch.remove(op.second.first.toString());
break;
}
case BatchLogType::OP_BATCH_REMOVE_RANGE: {
batch.rangeRemove(op.second.first.toString(), op.second.second.toString());
break;
}
}
}
break;
Expand All @@ -275,13 +300,14 @@ void ESListener::processLogs() {
}
}

if (static_cast<int32_t>(data.size()) > FLAGS_listener_commit_batch_size) {
if (static_cast<int32_t>(batch.size()) > FLAGS_listener_commit_batch_size) {
break;
}
++(*iter);
}

// apply to state machine
if (lastApplyId != -1 && apply(data)) {
if (lastApplyId != -1 && apply(batch)) {
std::lock_guard<std::mutex> guard(raftLock_);
lastApplyLogId_ = lastApplyId;
persist(committedLogId_, term_, lastApplyLogId_);
Expand All @@ -298,15 +324,14 @@ std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> ESListener::commitSnapshot
VLOG(2) << idStr_ << "Listener is committing snapshot.";
int64_t count = 0;
int64_t size = 0;
std::vector<KV> data;
data.reserve(rows.size());
BatchHolder batch;
for (const auto& row : rows) {
count++;
size += row.size();
auto kv = decodeKV(row);
data.emplace_back(kv.first, kv.second);
batch.put(kv.first.toString(), kv.second.toString());
}
if (!apply(data)) {
if (!apply(batch)) {
LOG(INFO) << idStr_ << "Failed to apply data while committing snapshot.";
return {
nebula::cpp2::ErrorCode::E_RAFT_PERSIST_SNAPSHOT_FAILED, kNoSnapshotCount, kNoSnapshotSize};
Expand Down
1 change: 0 additions & 1 deletion src/kvstore/plugins/elasticsearch/ESListener.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ class ESListener : public Listener {
*/
bool apply(const BatchHolder& batch);


/**
* @brief Persist commitLogId commitLogTerm and lastApplyLogId
*/
Expand Down
55 changes: 39 additions & 16 deletions src/kvstore/test/NebulaListenerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,14 @@ class DummyListener : public Listener {
int64_t size = 0;
std::tuple<nebula::cpp2::ErrorCode, nebula::LogID, nebula::TermID> result{
nebula::cpp2::ErrorCode::SUCCEEDED, count, size};
std::vector<KV> data;
data.reserve(rows.size());
BatchHolder batch;
for (const auto& row : rows) {
count++;
size += row.size();
auto kv = decodeKV(row);
data.emplace_back(kv.first, kv.second);
batch.put(kv.first.toString(), kv.second.toString());
}
if (!apply(data)) {
if (!apply(batch)) {
LOG(INFO) << idStr_ << "Failed to apply data while committing snapshot.";
result = {nebula::cpp2::ErrorCode::E_RAFT_PERSIST_SNAPSHOT_FAILED,
kNoSnapshotCount,
Expand Down Expand Up @@ -124,8 +123,8 @@ class DummyListener : public Listener {
}

LogID lastApplyId = -1;
// the kv pair which can sync to remote safely
std::vector<KV> data;
// // the kv pair which can sync to remote safely
BatchHolder batch;
while (iter->valid()) {
lastApplyId = iter->logId();

Expand All @@ -141,28 +140,52 @@ class DummyListener : public Listener {
case OP_PUT: {
auto pieces = decodeMultiValues(log);
DCHECK_EQ(2, pieces.size());
data.emplace_back(pieces[0], pieces[1]);
batch.put(pieces[0].toString(), pieces[1].toString());
break;
}
case OP_MULTI_PUT: {
auto kvs = decodeMultiValues(log);
DCHECK_EQ(0, kvs.size() % 2);
for (size_t i = 0; i < kvs.size(); i += 2) {
data.emplace_back(kvs[i], kvs[i + 1]);
batch.put(kvs[i].toString(), kvs[i + 1].toString());
}
break;
}
case OP_REMOVE:
case OP_REMOVE_RANGE:
case OP_REMOVE: {
auto key = decodeSingleValue(log);
batch.remove(key.toString());
break;
}
case OP_REMOVE_RANGE: {
auto kvs = decodeMultiValues(log);
DCHECK_EQ(2, kvs.size());
batch.rangeRemove(kvs[0].toString(), kvs[1].toString());
break;
}
case OP_MULTI_REMOVE: {
auto keys = decodeMultiValues(log);
for (auto key : keys) {
batch.remove(key.toString());
}
break;
}
case OP_BATCH_WRITE: {
auto batch = decodeBatchValue(log);
for (auto& op : batch) {
auto batchData = decodeBatchValue(log);
for (auto& op : batchData) {
// OP_BATCH_REMOVE and OP_BATCH_REMOVE_RANGE is igored
if (op.first == BatchLogType::OP_BATCH_PUT) {
data.emplace_back(op.second.first, op.second.second);
switch (op.first) {
case BatchLogType::OP_BATCH_PUT: {
batch.put(op.second.first.toString(), op.second.second.toString());
break;
}
case BatchLogType::OP_BATCH_REMOVE: {
batch.remove(op.second.first.toString());
break;
}
case BatchLogType::OP_BATCH_REMOVE_RANGE: {
batch.rangeRemove(op.second.first.toString(), op.second.second.toString());
break;
}
}
}
break;
Expand All @@ -180,7 +203,7 @@ class DummyListener : public Listener {
++(*iter);
}
// apply to state machine
if (lastApplyId != -1 && apply(data)) {
if (lastApplyId != -1 && apply(batch)) {
std::lock_guard<std::mutex> guard(raftLock_);
lastApplyLogId_ = lastApplyId;
persist(committedLogId_, term_, lastApplyLogId_);
Expand All @@ -192,7 +215,7 @@ class DummyListener : public Listener {
protected:
void init() override {}

bool apply(const BatchHolder& batch) override {
bool apply(const BatchHolder& batch) {
for (auto& log : batch.getBatch()) {
switch (std::get<0>(log)) {
case BatchLogType::OP_BATCH_PUT: {
Expand Down

0 comments on commit 17cd3f8

Please sign in to comment.