Skip to content

Commit

Permalink
Merge branch 'sequential' of github.com:jackwener/nebula into sequential
Browse files Browse the repository at this point in the history
  • Loading branch information
jackwener committed Jan 26, 2022
2 parents 91f1ec2 + 7cd00e2 commit 3564f45
Show file tree
Hide file tree
Showing 57 changed files with 1,356 additions and 1,241 deletions.
6 changes: 3 additions & 3 deletions src/clients/storage/InternalStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ ::nebula::cpp2::ErrorCode getErrorCode(T& tryResp) {
switch (stResp.status().code()) {
case Status::Code::kLeaderChanged:
return nebula::cpp2::ErrorCode::E_LEADER_CHANGED;
case Status::Code::kError:
return nebula::cpp2::ErrorCode::E_RPC_FAILURE;
default:
LOG(ERROR) << "not impl error transform: code="
<< static_cast<int32_t>(stResp.status().code());
Expand Down Expand Up @@ -69,8 +71,8 @@ void InternalStorageClient::chainUpdateEdge(cpp2::UpdateEdgeRequest& reversedReq

std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable {
auto code = getErrorCode(t);
VLOG(1) << "chainUpdateEdge rpc: " << apache::thrift::util::enumNameSafe(code);
if (code == ::nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
chainUpdateEdge(reversedRequest, termOfSrc, optVersion, std::move(p));
} else {
p.setValue(code);
Expand Down Expand Up @@ -108,7 +110,6 @@ void InternalStorageClient::chainAddEdges(cpp2::AddEdgesRequest& directReq,
std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable {
auto code = getErrorCode(t);
if (code == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
chainAddEdges(directReq, termId, optVersion, std::move(p));
} else {
p.setValue(code);
Expand Down Expand Up @@ -165,7 +166,6 @@ void InternalStorageClient::chainDeleteEdges(cpp2::DeleteEdgesRequest& req,
std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable {
auto code = getErrorCode(t);
if (code == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
chainDeleteEdges(req, txnId, termId, std::move(p));
} else {
p.setValue(code);
Expand Down
14 changes: 4 additions & 10 deletions src/common/utils/MemoryLockWrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class MemoryLockGuard {
}

~MemoryLockGuard() {
if (locked_) {
if (locked_ && autoUnlock_) {
lock_->unlockBatch(keys_);
}
}
Expand All @@ -71,22 +71,16 @@ class MemoryLockGuard {
return *iter_;
}

// this will manual set the lock to unlocked state
// which mean will not release all locks automatically
// please make sure you really know the side effect
void forceLock() {
locked_ = true;
}

void forceUnlock() {
locked_ = false;
void setAutoUnlock(bool autoUnlock) {
autoUnlock_ = autoUnlock;
}

protected:
MemoryLockCore<Key>* lock_;
std::vector<Key> keys_;
typename std::vector<Key>::iterator iter_;
bool locked_{false};
bool autoUnlock_{true};
};

} // namespace nebula
4 changes: 4 additions & 0 deletions src/common/utils/NebulaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,4 +265,8 @@ std::string NebulaKeyUtils::dataVersionKey() {
return "\xFF\xFF\xFF\xFF";
}

std::string NebulaKeyUtils::dataVersionValue() {
return "3.0";
}

} // namespace nebula
2 changes: 2 additions & 0 deletions src/common/utils/NebulaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ class NebulaKeyUtils final {

static std::string dataVersionKey();

static std::string dataVersionValue();

static_assert(sizeof(NebulaKeyType) == sizeof(PartitionID));

private:
Expand Down
2 changes: 1 addition & 1 deletion src/graph/validator/MaintainValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ Status ShowZonesValidator::toPlan() {
}

Status AddHostsIntoZoneValidator::validateImpl() {
return Status::OK();
return Status::SemanticError("Add hosts into zone is unsupported");
}

Status AddHostsIntoZoneValidator::toPlan() {
Expand Down
1 change: 1 addition & 0 deletions src/interface/common.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ enum ErrorCode {
E_RAFT_WRITE_BLOCKED = -3528,
E_RAFT_BUFFER_OVERFLOW = -3529,
E_RAFT_ATOMIC_OP_FAILED = -3530,
E_LEADER_LEASE_FAILED = -3531,

E_UNKNOWN = -8000,
} (cpp.enum_strict)
3 changes: 2 additions & 1 deletion src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,8 @@ nebula::cpp2::ErrorCode NebulaStore::get(GraphSpaceID spaceId,
}
auto part = nebula::value(ret);
if (!checkLeader(part, canReadFromFollower)) {
return nebula::cpp2::ErrorCode::E_LEADER_CHANGED;
return part->isLeader() ? nebula::cpp2::ErrorCode::E_LEADER_LEASE_FAILED
: nebula::cpp2::ErrorCode::E_LEADER_CHANGED;
}
return part->engine()->get(key, value);
}
Expand Down
12 changes: 12 additions & 0 deletions src/kvstore/RocksEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "common/base/Base.h"
#include "common/fs/FileUtils.h"
#include "common/utils/MetaKeyUtils.h"
#include "common/utils/NebulaKeyUtils.h"
#include "kvstore/KVStore.h"

Expand Down Expand Up @@ -124,6 +125,17 @@ RocksEngine::RocksEngine(GraphSpaceID spaceId,
status = rocksdb::DB::Open(options, path, &db);
}
CHECK(status.ok()) << status.ToString();
if (!readonly && spaceId_ != kDefaultSpaceId /* only for storage*/) {
rocksdb::ReadOptions readOptions;
std::string dataVersionValue = "";
status = db->Get(readOptions, NebulaKeyUtils::dataVersionKey(), &dataVersionValue);
if (status.IsNotFound()) {
rocksdb::WriteOptions writeOptions;
status = db->Put(
writeOptions, NebulaKeyUtils::dataVersionKey(), NebulaKeyUtils::dataVersionValue());
}
CHECK(status.ok()) << status.ToString();
}
db_.reset(db);
extractorLen_ = sizeof(PartitionID) + vIdLen;
partsNum_ = allParts().size();
Expand Down
5 changes: 1 addition & 4 deletions src/meta/ActiveHostsMan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,7 @@ nebula::cpp2::ErrorCode ActiveHostsMan::updateHostInfo(kvstore::KVStore* kv,
}
}
// indicate whether any leader info is updated
bool hasUpdate = false;
if (!data.empty()) {
hasUpdate = true;
}
bool hasUpdate = !data.empty();
data.emplace_back(MetaKeyUtils::hostKey(hostAddr.host, hostAddr.port), HostInfo::encodeV2(info));

folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock());
Expand Down
12 changes: 10 additions & 2 deletions src/meta/processors/parts/CreateSpaceProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
return;
}

int32_t activeZoneSize = 0;
std::unordered_map<std::string, Hosts> zoneHosts;
for (auto& zone : zones) {
auto zoneKey = MetaKeyUtils::zoneKey(zone);
Expand All @@ -194,14 +195,14 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
auto key = MetaKeyUtils::hostKey(host.host, host.port);
auto ret = doGet(key);
if (!nebula::ok(ret)) {
code = nebula::error(ret);
LOG(ERROR) << "Get host " << host << " failed.";
break;
continue;
}

HostInfo info = HostInfo::decode(nebula::value(ret));
if (now - info.lastHBTimeInMilliSec_ <
FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor * 1000) {
activeZoneSize += 1;
auto hostIter = hostLoading_.find(host);
if (hostIter == hostLoading_.end()) {
hostLoading_[host] = 0;
Expand All @@ -218,6 +219,13 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
zoneHosts[zone] = std::move(hosts);
}

if (replicaFactor > activeZoneSize) {
LOG(ERROR) << "Replication number should less than or equal to active zone number.";
handleErrorCode(nebula::cpp2::ErrorCode::E_ZONE_NOT_ENOUGH);
onFinished();
return;
}

if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Create space failed";
handleErrorCode(code);
Expand Down
65 changes: 52 additions & 13 deletions src/meta/test/ProcessorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,33 @@ TEST(ProcessorTest, SpaceTest) {
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
}
{
std::vector<HostAddr> hosts = {{"0", 0}, {"1", 1}, {"2", 2}, {"3", 3}};
TestUtils::registerHB(kv.get(), hosts);
// Attempt to register heartbeat
const ClusterID kClusterId = 10;
for (auto i = 0; i < 4; i++) {
cpp2::HBReq req;
req.role_ref() = cpp2::HostRole::STORAGE;
req.host_ref() = HostAddr(std::to_string(i), i);
req.cluster_id_ref() = kClusterId;
auto* processor = HBProcessor::instance(kv.get(), nullptr, kClusterId);
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
}
}
{
cpp2::ListZonesReq req;
auto* processor = ListZonesProcessor::instance(kv.get());
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
auto zones = resp.get_zones();
ASSERT_EQ(4, zones.size());
ASSERT_EQ("default_zone_0_0", zones[0].get_zone_name());
ASSERT_EQ("default_zone_1_1", zones[1].get_zone_name());
ASSERT_EQ("default_zone_2_2", zones[2].get_zone_name());
ASSERT_EQ("default_zone_3_3", zones[3].get_zone_name());
}
int32_t hostsNum = 4;
{
Expand Down Expand Up @@ -473,6 +498,31 @@ TEST(ProcessorTest, SpaceTest) {
auto dresp = std::move(df).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, dresp.get_code());
}
{
cpp2::AddHostsReq req;
std::vector<HostAddr> hosts = {{"4", 4}};
req.hosts_ref() = std::move(hosts);
auto* processor = AddHostsProcessor::instance(kv.get());
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
}
{
cpp2::SpaceDesc properties;
properties.space_name_ref() = "default_space";
properties.partition_num_ref() = 8;
properties.replica_factor_ref() = 5;
properties.charset_name_ref() = "utf8";
properties.collate_name_ref() = "utf8_bin";
cpp2::CreateSpaceReq req;
req.properties_ref() = std::move(properties);
auto* processor = CreateSpaceProcessor::instance(kv.get());
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::E_ZONE_NOT_ENOUGH, resp.get_code());
}
}

TEST(ProcessorTest, CreateTagTest) {
Expand Down Expand Up @@ -2569,7 +2619,6 @@ TEST(ProcessorTest, HostsTest) {
const ClusterID kClusterId = 10;
for (auto i = 8987; i < 8990; i++) {
cpp2::HBReq req;
req.role_ref() = cpp2::HostRole::STORAGE;
req.host_ref() = HostAddr("127.0.0.1", i);
req.cluster_id_ref() = kClusterId;
req.role_ref() = cpp2::HostRole::STORAGE;
Expand Down Expand Up @@ -2610,7 +2659,6 @@ TEST(ProcessorTest, HostsTest) {
const ClusterID kClusterId = 10;
for (auto i = 8987; i < 8990; i++) {
cpp2::HBReq req;
req.role_ref() = cpp2::HostRole::STORAGE;
req.host_ref() = HostAddr("127.0.0.1", i);
req.cluster_id_ref() = kClusterId;
req.role_ref() = cpp2::HostRole::STORAGE;
Expand Down Expand Up @@ -2749,7 +2797,6 @@ TEST(ProcessorTest, HostsTest) {
const ClusterID kClusterId = 10;
for (auto i = 8987; i < 8990; i++) {
cpp2::HBReq req;
req.role_ref() = cpp2::HostRole::STORAGE;
req.host_ref() = HostAddr("127.0.0.1", i);
req.cluster_id_ref() = kClusterId;
req.role_ref() = cpp2::HostRole::STORAGE;
Expand All @@ -2770,7 +2817,6 @@ TEST(ProcessorTest, AddHostsIntoNewZoneTest) {
const ClusterID kClusterId = 10;
for (auto i = 8987; i < 8990; i++) {
cpp2::HBReq req;
req.role_ref() = cpp2::HostRole::STORAGE;
req.host_ref() = HostAddr("127.0.0.1", i);
req.cluster_id_ref() = kClusterId;
req.role_ref() = cpp2::HostRole::STORAGE;
Expand Down Expand Up @@ -2824,7 +2870,6 @@ TEST(ProcessorTest, AddHostsIntoNewZoneTest) {
const ClusterID kClusterId = 10;
for (auto i = 8987; i < 8990; i++) {
cpp2::HBReq req;
req.role_ref() = cpp2::HostRole::STORAGE;
req.host_ref() = HostAddr("127.0.0.1", i);
req.cluster_id_ref() = kClusterId;
req.role_ref() = cpp2::HostRole::STORAGE;
Expand Down Expand Up @@ -2871,7 +2916,6 @@ TEST(ProcessorTest, AddHostsIntoZoneTest) {
const ClusterID kClusterId = 10;
for (auto i = 8987; i < 8990; i++) {
cpp2::HBReq req;
req.role_ref() = cpp2::HostRole::STORAGE;
req.host_ref() = HostAddr("127.0.0.1", i);
req.cluster_id_ref() = kClusterId;
req.role_ref() = cpp2::HostRole::STORAGE;
Expand Down Expand Up @@ -3028,7 +3072,6 @@ TEST(ProcessorTest, AddHostsIntoZoneTest) {
const ClusterID kClusterId = 10;
for (auto i = 8987; i < 8990; i++) {
cpp2::HBReq req;
req.role_ref() = cpp2::HostRole::STORAGE;
req.host_ref() = HostAddr("127.0.0.1", i);
req.cluster_id_ref() = kClusterId;
req.role_ref() = cpp2::HostRole::STORAGE;
Expand All @@ -3049,7 +3092,6 @@ TEST(ProcessorTest, DropHostsTest) {
const ClusterID kClusterId = 10;
for (auto i = 8987; i < 8990; i++) {
cpp2::HBReq req;
req.role_ref() = cpp2::HostRole::STORAGE;
req.host_ref() = HostAddr("127.0.0.1", i);
req.cluster_id_ref() = kClusterId;
req.role_ref() = cpp2::HostRole::STORAGE;
Expand Down Expand Up @@ -3080,7 +3122,6 @@ TEST(ProcessorTest, DropHostsTest) {
const ClusterID kClusterId = 10;
for (auto i = 8987; i < 8990; i++) {
cpp2::HBReq req;
req.role_ref() = cpp2::HostRole::STORAGE;
req.host_ref() = HostAddr("127.0.0.1", i);
req.cluster_id_ref() = kClusterId;
req.role_ref() = cpp2::HostRole::STORAGE;
Expand Down Expand Up @@ -3446,7 +3487,6 @@ TEST(ProcessorTest, RenameZoneTest) {
const ClusterID kClusterId = 10;
for (auto i = 8987; i < 8990; i++) {
cpp2::HBReq req;
req.role_ref() = cpp2::HostRole::STORAGE;
req.host_ref() = HostAddr("127.0.0.1", i);
req.cluster_id_ref() = kClusterId;
req.role_ref() = cpp2::HostRole::STORAGE;
Expand Down Expand Up @@ -3568,7 +3608,6 @@ TEST(ProcessorTest, MergeZoneTest) {
const ClusterID kClusterId = 10;
for (auto i = 8986; i < 8990; i++) {
cpp2::HBReq req;
req.role_ref() = cpp2::HostRole::STORAGE;
req.host_ref() = HostAddr("127.0.0.1", i);
req.cluster_id_ref() = kClusterId;
req.role_ref() = cpp2::HostRole::STORAGE;
Expand Down
4 changes: 2 additions & 2 deletions src/meta/upgrade/v2/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ struct SpaceDesc {
3: i32 replica_factor = 0,
4: binary charset_name,
5: binary collate_name,
6: ColumnTypeDef vid_type = {"type": PropertyType.FIXED_STRING, "type_length": 8},
6: ColumnTypeDef vid_type = {"type": "PropertyType.FIXED_STRING", "type_length": 8},
7: optional binary group_name,
8: optional IsolationLevel isolation_level,
9: optional binary comment,
Expand Down Expand Up @@ -78,4 +78,4 @@ struct ColumnTypeDef {
enum IsolationLevel {
DEFAULT = 0x00, // allow add half edge(either in or out edge succeeded)
TOSS = 0x01, // add in and out edge atomic
} (cpp.enum_strict)
} (cpp.enum_strict)
1 change: 1 addition & 0 deletions src/mock/MockCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ void MockCluster::initStorageKV(const char* dataPath,

txnMan_ = std::make_unique<storage::TransactionManager>(storageEnv_.get());
storageEnv_->txnMan_ = txnMan_.get();
txnMan_->start();
}

void MockCluster::startStorage(HostAddr addr,
Expand Down
9 changes: 4 additions & 5 deletions src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,13 @@ nebula_add_library(
transaction/ConsistUtil.cpp
transaction/ChainUpdateEdgeLocalProcessor.cpp
transaction/ChainUpdateEdgeRemoteProcessor.cpp
transaction/ChainResumeProcessor.cpp
transaction/ChainAddEdgesGroupProcessor.cpp
transaction/ChainAddEdgesLocalProcessor.cpp
transaction/ChainAddEdgesRemoteProcessor.cpp
transaction/ResumeAddEdgeProcessor.cpp
transaction/ResumeAddEdgeRemoteProcessor.cpp
transaction/ResumeUpdateProcessor.cpp
transaction/ResumeUpdateRemoteProcessor.cpp
transaction/ChainResumeAddPrimeProcessor.cpp
transaction/ChainResumeAddDoublePrimeProcessor.cpp
transaction/ChainResumeUpdatePrimeProcessor.cpp
transaction/ChainResumeUpdateDoublePrimeProcessor.cpp
transaction/ChainProcessorFactory.cpp
transaction/ChainDeleteEdgesGroupProcessor.cpp
transaction/ChainDeleteEdgesLocalProcessor.cpp
Expand Down
Loading

0 comments on commit 3564f45

Please sign in to comment.