Skip to content

Commit

Permalink
Merge branch 'unstable' into kvrocks2redis_ci
Browse files Browse the repository at this point in the history
  • Loading branch information
jihuayu authored Mar 21, 2024
2 parents aa8e555 + a9259a7 commit 009d772
Show file tree
Hide file tree
Showing 13 changed files with 464 additions and 58 deletions.
4 changes: 2 additions & 2 deletions src/cli/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,12 @@ int main(int argc, char *argv[]) {
}
bool is_supervised = IsSupervisedMode(config.supervised_mode);
if (config.daemonize && !is_supervised) Daemonize();
s = CreatePidFile(config.GetPidFile());
s = CreatePidFile(config.pidfile);
if (!s.IsOK()) {
LOG(ERROR) << "Failed to create pidfile: " << s.Msg();
return 1;
}
auto pidfile_exit = MakeScopeExit([&config] { RemovePidFile(config.GetPidFile()); });
auto pidfile_exit = MakeScopeExit([&config] { RemovePidFile(config.pidfile); });

#ifdef ENABLE_OPENSSL
// initialize OpenSSL
Expand Down
8 changes: 7 additions & 1 deletion src/commands/cmd_bit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "commands/command_parser.h"
#include "error_constants.h"
#include "server/server.h"
#include "status.h"
#include "types/redis_bitmap.h"

namespace redis {
Expand Down Expand Up @@ -171,6 +172,10 @@ class CommandBitPos : public Commander {
stop_ = *parse_stop;
}

if (args.size() >= 6 && util::EqualICase(args[5], "BIT")) {
is_bit_index_ = true;
}

auto parse_arg = ParseInt<int64_t>(args[2], 10);
if (!parse_arg) {
return {Status::RedisParseErr, errValueNotInteger};
Expand All @@ -189,7 +194,7 @@ class CommandBitPos : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
int64_t pos = 0;
redis::Bitmap bitmap_db(srv->storage, conn->GetNamespace());
auto s = bitmap_db.BitPos(args_[1], bit_, start_, stop_, stop_given_, &pos);
auto s = bitmap_db.BitPos(args_[1], bit_, start_, stop_, stop_given_, &pos, is_bit_index_);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::Integer(pos);
Expand All @@ -201,6 +206,7 @@ class CommandBitPos : public Commander {
int64_t stop_ = -1;
bool bit_ = false;
bool stop_given_ = false;
bool is_bit_index_ = false;
};

class CommandBitOp : public Commander {
Expand Down
17 changes: 10 additions & 7 deletions src/config/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@
#include "status.h"
#include "storage/redis_metadata.h"

constexpr const char *kDefaultDir = "/tmp/kvrocks";
constexpr const char *kDefaultBackupDir = "/tmp/kvrocks/backup";
constexpr const char *kDefaultPidfile = "/tmp/kvrocks/kvrocks.pid";
constexpr const char *kDefaultBindAddress = "127.0.0.1";

constexpr const char *errBlobDbNotEnabled = "Must set rocksdb.enable_blob_files to yes first.";
Expand Down Expand Up @@ -141,11 +144,11 @@ Config::Config() {
{"force-compact-file-min-deleted-percentage", false,
new IntField(&force_compact_file_min_deleted_percentage, 10, 1, 100)},
{"db-name", true, new StringField(&db_name, "change.me.db")},
{"dir", true, new StringField(&dir, "/tmp/kvrocks")},
{"backup-dir", false, new StringField(&backup_dir_, "")},
{"dir", true, new StringField(&dir, kDefaultDir)},
{"backup-dir", false, new StringField(&backup_dir, kDefaultBackupDir)},
{"log-dir", true, new StringField(&log_dir, "")},
{"log-level", false, new EnumField<int>(&log_level, log_levels, google::INFO)},
{"pidfile", true, new StringField(&pidfile_, "")},
{"pidfile", true, new StringField(&pidfile, kDefaultPidfile)},
{"max-io-mb", false, new IntField(&max_io_mb, 0, 0, INT_MAX)},
{"max-bitmap-to-string-mb", false, new IntField(&max_bitmap_to_string_mb, 16, 0, INT_MAX)},
{"max-db-size", false, new IntField(&max_db_size, 0, 0, INT_MAX)},
Expand Down Expand Up @@ -403,6 +406,8 @@ void Config::initFieldCallback() {
checkpoint_dir = dir + "/checkpoint";
sync_checkpoint_dir = dir + "/sync_checkpoint";
backup_sync_dir = dir + "/backup_for_sync";
if (backup_dir == kDefaultBackupDir) backup_dir = dir + "/backup";
if (pidfile == kDefaultPidfile) pidfile = dir + "/kvrocks.pid";
return Status::OK();
}},
{"backup-dir",
Expand All @@ -412,8 +417,8 @@ void Config::initFieldCallback() {
// Note: currently, backup_mu_ may block by backing up or purging,
// the command may wait for seconds.
std::lock_guard<std::mutex> lg(this->backup_mu);
previous_backup = std::move(backup_dir_);
backup_dir_ = v;
previous_backup = std::move(backup_dir);
backup_dir = v;
}
if (!previous_backup.empty() && srv != nullptr && !srv->IsLoading()) {
// LOG(INFO) should be called after log is initialized and server is loaded.
Expand Down Expand Up @@ -778,9 +783,7 @@ Status Config::finish() {
if (master_port != 0 && binds.size() == 0) {
return {Status::NotOK, "replication doesn't support unix socket"};
}
if (backup_dir_.empty()) backup_dir_ = dir + "/backup";
if (db_dir.empty()) db_dir = dir + "/db";
if (pidfile_.empty()) pidfile_ = dir + "/kvrocks.pid";
if (log_dir.empty()) log_dir = dir;
std::vector<std::string> create_dirs = {dir};
for (const auto &name : create_dirs) {
Expand Down
6 changes: 2 additions & 4 deletions src/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ struct Config {
std::vector<std::string> binds;
std::string dir;
std::string db_dir;
std::string backup_dir; // GUARD_BY(backup_mu_)
std::string pidfile;
std::string backup_sync_dir;
std::string checkpoint_dir;
std::string sync_checkpoint_dir;
Expand Down Expand Up @@ -237,13 +239,9 @@ struct Config {
void ClearMaster();
bool IsSlave() const { return !master_host.empty(); }
bool HasConfigFile() const { return !path_.empty(); }
std::string GetBackupDir() const { return backup_dir_.empty() ? dir + "/backup" : backup_dir_; }
std::string GetPidFile() const { return pidfile_.empty() ? dir + "/kvrocks.pid" : pidfile_; }

private:
std::string path_;
std::string backup_dir_; // GUARD_BY(backup_mu_)
std::string pidfile_;
std::string binds_str_;
std::string slaveof_;
std::string compact_cron_str_;
Expand Down
106 changes: 106 additions & 0 deletions src/search/ir.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#pragma once

#include <limits>
#include <memory>
#include <string>
#include <variant>
#include <vector>

// kqir stands for Kvorcks Query Intermediate Representation
namespace kqir {

struct FieldRef {
std::string name;
};

struct StringLiteral {
std::string val;
};

struct TagContainExpr {
FieldRef field;
StringLiteral tag;
};

struct NumericLiteral {
double val;
};

struct NumericCompareExpr {
enum { EQ, NE, LT, LET, GT, GET } op;
FieldRef field;
NumericLiteral num;
};

struct BoolLiteral {
bool val;
};

struct BooleanExpr {
std::variant<TagContainExpr, NumericCompareExpr, BoolLiteral> expr;
};

struct QueryExpr;

struct LogicalUnaryExpr {
enum { NOT } op;
std::unique_ptr<QueryExpr> inner;
};

struct LogicalBinaryExpr {
enum { AND, OR } op;
std::unique_ptr<QueryExpr> lhs;
std::unique_ptr<QueryExpr> rhs;
};

struct QueryExpr {
std::variant<LogicalUnaryExpr, LogicalBinaryExpr, BooleanExpr> expr;
};

struct Limit {
size_t offset = 0;
size_t count = std::numeric_limits<size_t>::max();
};

struct SortBy {
enum { ASC, DESC } order;
FieldRef field;
};

struct SelectExpr {
std::vector<FieldRef> fields;
};

struct IndexRef {
std::string name;
};

struct SearchStmt {
IndexRef index_ref;
QueryExpr query_expr;
Limit limit;
SortBy sort_by;
SelectExpr select_expr;
};

} // namespace kqir
4 changes: 2 additions & 2 deletions src/storage/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ Status Storage::Open(DBOpenMode mode) {
Status Storage::CreateBackup(uint64_t *sequence_number) {
LOG(INFO) << "[storage] Start to create new backup";
std::lock_guard<std::mutex> lg(config_->backup_mu);
std::string task_backup_dir = config_->GetBackupDir();
std::string task_backup_dir = config_->backup_dir;

std::string tmpdir = task_backup_dir + ".tmp";
// Maybe there is a dirty tmp checkpoint, try to clean it
Expand Down Expand Up @@ -535,7 +535,7 @@ void Storage::EmptyDB() {
void Storage::PurgeOldBackups(uint32_t num_backups_to_keep, uint32_t backup_max_keep_hours) {
time_t now = util::GetTimeStamp();
std::lock_guard<std::mutex> lg(config_->backup_mu);
std::string task_backup_dir = config_->GetBackupDir();
std::string task_backup_dir = config_->backup_dir;

// Return if there is no backup
auto s = env_->FileExists(task_backup_dir);
Expand Down
79 changes: 65 additions & 14 deletions src/types/redis_bitmap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
#include "redis_bitmap.h"

#include <algorithm>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

#include "common/bit_util.h"
Expand Down Expand Up @@ -307,7 +309,9 @@ rocksdb::Status Bitmap::BitCount(const Slice &user_key, int64_t start, int64_t s
}

rocksdb::Status Bitmap::BitPos(const Slice &user_key, bool bit, int64_t start, int64_t stop, bool stop_given,
int64_t *pos) {
int64_t *pos, bool is_bit_index) {
if (is_bit_index) DCHECK(stop_given);

std::string raw_value;
std::string ns_key = AppendNamespacePrefix(user_key);

Expand All @@ -323,11 +327,15 @@ rocksdb::Status Bitmap::BitPos(const Slice &user_key, bool bit, int64_t start, i
if (metadata.Type() == kRedisString) {
ss = std::nullopt;
redis::BitmapString bitmap_string_db(storage_, namespace_);
return bitmap_string_db.BitPos(raw_value, bit, start, stop, stop_given, pos);
return bitmap_string_db.BitPos(raw_value, bit, start, stop, stop_given, pos, is_bit_index);
}
std::tie(start, stop) = BitmapString::NormalizeRange(start, stop, static_cast<int64_t>(metadata.size));
auto u_start = static_cast<uint32_t>(start);
auto u_stop = static_cast<uint32_t>(stop);

uint32_t to_bit_factor = is_bit_index ? 8 : 1;
auto size = static_cast<int64_t>(metadata.size) * static_cast<int64_t>(to_bit_factor);

std::tie(start, stop) = BitmapString::NormalizeRange(start, stop, size);
auto u_start = static_cast<uint64_t>(start);
auto u_stop = static_cast<uint64_t>(stop);
if (u_start > u_stop) {
*pos = -1;
return rocksdb::Status::OK();
Expand All @@ -341,13 +349,40 @@ rocksdb::Status Bitmap::BitPos(const Slice &user_key, bool bit, int64_t start, i
return -1;
};

auto bit_pos_in_byte_startstop = [](char byte, bool bit, uint32_t start, uint32_t stop) -> int {
for (uint32_t i = start; i <= stop; i++) {
if (bit && (byte & (1 << i)) != 0) return (int)i; // typecast to int since the value ranges from 0 to 7
if (!bit && (byte & (1 << i)) == 0) return (int)i;
}
return -1;
};

rocksdb::ReadOptions read_options;
read_options.snapshot = ss->GetSnapShot();
uint32_t start_index = u_start / kBitmapSegmentBytes;
uint32_t stop_index = u_stop / kBitmapSegmentBytes;
// if bit index, (Eg start = 1, stop = 35), then
// u_start = 1/8 = 0, u_stop = 35/8 = 4 (in bytes)
uint32_t start_segment_index = (u_start / to_bit_factor) / kBitmapSegmentBytes;
uint32_t stop_segment_index = (u_stop / to_bit_factor) / kBitmapSegmentBytes;
uint32_t start_bit_pos_in_byte = 0;
uint32_t stop_bit_pos_in_byte = 0;

if (is_bit_index) {
start_bit_pos_in_byte = u_start % 8;
stop_bit_pos_in_byte = u_stop % 8;
}

auto range_in_byte = [start_bit_pos_in_byte, stop_bit_pos_in_byte](
uint32_t byte_start, uint32_t byte_end,
uint32_t curr_byte) -> std::pair<uint32_t, uint32_t> {
if (curr_byte == byte_start && curr_byte == byte_end) return {start_bit_pos_in_byte, stop_bit_pos_in_byte};
if (curr_byte == byte_start) return {start_bit_pos_in_byte, 7};
if (curr_byte == byte_end) return {0, stop_bit_pos_in_byte};
return {0, 7};
};

// Don't use multi get to prevent large range query, and take too much memory
// Searching bits in segments [start_index, stop_index].
for (uint32_t i = start_index; i <= stop_index; i++) {
for (uint32_t i = start_segment_index; i <= stop_segment_index; i++) {
rocksdb::PinnableSlice pin_value;
std::string sub_key =
InternalKey(ns_key, std::to_string(i * kBitmapSegmentBytes), metadata.version, storage_->IsSlotIdEncoded())
Expand All @@ -364,17 +399,33 @@ rocksdb::Status Bitmap::BitPos(const Slice &user_key, bool bit, int64_t start, i
continue;
}
size_t byte_pos_in_segment = 0;
if (i == start_index) byte_pos_in_segment = u_start % kBitmapSegmentBytes;
size_t byte_with_bit_start = -1;
size_t byte_with_bit_stop = -2;
// if bit index, (Eg start = 1, stop = 35), then
// byte_pos_in_segment should be calculated in bytes, hence divide by 8
if (i == start_segment_index) {
byte_pos_in_segment = (u_start / to_bit_factor) % kBitmapSegmentBytes;
byte_with_bit_start = byte_pos_in_segment;
}
size_t stop_byte_in_segment = pin_value.size();
if (i == stop_index) {
DCHECK_LE(u_stop % kBitmapSegmentBytes + 1, pin_value.size());
stop_byte_in_segment = u_stop % kBitmapSegmentBytes + 1;
if (i == stop_segment_index) {
DCHECK_LE((u_stop / to_bit_factor) % kBitmapSegmentBytes + 1, pin_value.size());
stop_byte_in_segment = (u_stop / to_bit_factor) % kBitmapSegmentBytes + 1;
byte_with_bit_stop = stop_byte_in_segment;
}
// Invariant:
// 1. pin_value.size() <= kBitmapSegmentBytes.
// 2. If it's the last segment, metadata.size % kBitmapSegmentBytes <= pin_value.size().
for (; byte_pos_in_segment < stop_byte_in_segment; byte_pos_in_segment++) {
int bit_pos_in_byte_value = bit_pos_in_byte(pin_value[byte_pos_in_segment], bit);
int bit_pos_in_byte_value = -1;
if (is_bit_index) {
uint32_t start_bit = 0, stop_bit = 7;
std::tie(start_bit, stop_bit) = range_in_byte(byte_with_bit_start, byte_with_bit_stop, byte_pos_in_segment);
bit_pos_in_byte_value = bit_pos_in_byte_startstop(pin_value[byte_pos_in_segment], bit, start_bit, stop_bit);
} else {
bit_pos_in_byte_value = bit_pos_in_byte(pin_value[byte_pos_in_segment], bit);
}

if (bit_pos_in_byte_value != -1) {
*pos = static_cast<int64_t>(i * kBitmapSegmentBits + byte_pos_in_segment * 8 + bit_pos_in_byte_value);
return rocksdb::Status::OK();
Expand All @@ -387,7 +438,7 @@ rocksdb::Status Bitmap::BitPos(const Slice &user_key, bool bit, int64_t start, i
// 1. If it's the last segment, we've done searching in the above loop.
// 2. If it's not the last segment, we can check if the segment is all 0.
if (pin_value.size() < kBitmapSegmentBytes) {
if (i == stop_index) {
if (i == stop_segment_index) {
continue;
}
*pos = static_cast<int64_t>(i * kBitmapSegmentBits + pin_value.size() * 8);
Expand Down
3 changes: 2 additions & 1 deletion src/types/redis_bitmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class Bitmap : public Database {
rocksdb::Status GetString(const Slice &user_key, uint32_t max_btos_size, std::string *value);
rocksdb::Status SetBit(const Slice &user_key, uint32_t bit_offset, bool new_bit, bool *old_bit);
rocksdb::Status BitCount(const Slice &user_key, int64_t start, int64_t stop, bool is_bit_index, uint32_t *cnt);
rocksdb::Status BitPos(const Slice &user_key, bool bit, int64_t start, int64_t stop, bool stop_given, int64_t *pos);
rocksdb::Status BitPos(const Slice &user_key, bool bit, int64_t start, int64_t stop, bool stop_given, int64_t *pos,
bool is_bit_index);
rocksdb::Status BitOp(BitOpFlags op_flag, const std::string &op_name, const Slice &user_key,
const std::vector<Slice> &op_keys, int64_t *len);
rocksdb::Status Bitfield(const Slice &user_key, const std::vector<BitfieldOperation> &ops,
Expand Down
Loading

0 comments on commit 009d772

Please sign in to comment.