Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added SELECT redis command #482

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions redis/include/userver/storages/redis/impl/base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,19 @@ struct ConnectionInfo {
bool read_only = false;
ConnectionSecurity connection_security = ConnectionSecurity::kNone;
using HostVector = std::vector<std::string>;
size_t database_index = 0;

ConnectionInfo() = default;
ConnectionInfo(std::string host, int port, Password password,
bool read_only = false,
ConnectionSecurity security = ConnectionSecurity::kNone)
ConnectionSecurity security = ConnectionSecurity::kNone,
size_t database_index = {})
: host{std::move(host)},
port{port},
password{std::move(password)},
read_only{read_only},
connection_security(security) {}
connection_security(security),
database_index{database_index} {}
};

struct Stat {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ struct RedisSettings {

std::vector<std::string> shards;
std::vector<HostPort> sentinels;
size_t database_index{0};
redis::Password password{std::string()};
redis::ConnectionSecurity secure_connection{redis::ConnectionSecurity::kNone};
};
Expand Down
82 changes: 66 additions & 16 deletions redis/src/storages/redis/impl/redis.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <storages/redis/impl/redis_stats.hpp>
#include <storages/redis/impl/tcp_socket.hpp>
#include <userver/storages/redis/impl/reply.hpp>
#include <userver/utils/scope_guard.hpp>

#include "command_control_impl.hpp"

Expand Down Expand Up @@ -146,7 +147,7 @@ class Redis::RedisImpl : public std::enable_shared_from_this<Redis::RedisImpl> {
~RedisImpl();

void Connect(const ConnectionInfo::HostVector& host_addrs, int port,
const Password& password);
const Password& password, size_t database_index);
void Disconnect();

bool AsyncCommand(const CommandPtr& command);
Expand Down Expand Up @@ -233,6 +234,7 @@ class Redis::RedisImpl : public std::enable_shared_from_this<Redis::RedisImpl> {
void ProcessCommand(const CommandPtr& command);

void Authenticate();
void SelectDatabase();
void SendReadOnly();
void FreeCommands();

Expand All @@ -251,7 +253,8 @@ class Redis::RedisImpl : public std::enable_shared_from_this<Redis::RedisImpl> {
static bool WatchCommandTimerEnabled(
const CommandsBufferingSettings& commands_buffering_settings);

bool Connect(const std::string& host, int port, const Password& password);
bool Connect(const std::string& host, int port, const Password& password,
size_t database_index);

Redis* redis_obj_;
engine::ev::ThreadControl ev_thread_control_;
Expand All @@ -272,6 +275,7 @@ class Redis::RedisImpl : public std::enable_shared_from_this<Redis::RedisImpl> {
uint16_t port_ = 0;
std::string server_;
Password password_{std::string()};
std::size_t database_index_ = 0;
std::atomic<size_t> commands_size_ = 0;
size_t sent_count_ = 0;
size_t cmd_counter_ = 0;
Expand Down Expand Up @@ -336,8 +340,9 @@ Redis::~Redis() {
}

void Redis::Connect(const ConnectionInfo::HostVector& host_addrs, int port,
const Password& password) {
impl_->Connect(host_addrs, port, password);
const Password& password,
size_t database_index) {
impl_->Connect(host_addrs, port, password, database_index);
}

bool Redis::AsyncCommand(const CommandPtr& command) {
Expand Down Expand Up @@ -445,17 +450,19 @@ void Redis::RedisImpl::Detach() {
}

void Redis::RedisImpl::Connect(const ConnectionInfo::HostVector& host_addrs,
int port, const Password& password) {
int port, const Password& password,
size_t database_index) {
for (const auto& host : host_addrs)
if (Connect(host, port, password)) return;
if (Connect(host, port, password, database_index)) return;

LOG_ERROR() << "error async connect to Redis server (host addrs ="
<< host_addrs << ", port=" << port << ")";
SetState(State::kInitError);
}

bool Redis::RedisImpl::Connect(const std::string& host, int port,
const Password& password) {
const Password& password,
size_t database_index) {
UASSERT(context_ == nullptr);
UASSERT(state_ == State::kInit);

Expand All @@ -466,6 +473,7 @@ bool Redis::RedisImpl::Connect(const std::string& host, int port,
log_extra_.Extend("redis_server", GetServer());
log_extra_.Extend("server_id", GetServerId().GetId());
password_ = password;
database_index_ = database_index;
LOG_INFO() << log_extra_ << "Async connect to Redis server=" << GetServer();
context_ = redisAsyncConnect(host.c_str(), port);

Expand Down Expand Up @@ -1040,19 +1048,13 @@ bool Redis::RedisImpl::InitSecureConnection() {

void Redis::RedisImpl::Authenticate() {
if (password_.GetUnderlying().empty()) {
if (send_readonly_)
SendReadOnly();
else
SetState(State::kConnected);
SendReadOnly();
} else {
ProcessCommand(PrepareCommand(
CmdArgs{"AUTH", password_.GetUnderlying()},
[this](const CommandPtr&, ReplyPtr reply) {
if (*reply && reply->data.IsStatus()) {
if (send_readonly_)
SendReadOnly();
else
SetState(State::kConnected);
SendReadOnly();
} else {
if (*reply) {
if (reply->IsUnknownCommandError()) {
Expand All @@ -1079,12 +1081,17 @@ void Redis::RedisImpl::Authenticate() {
}

void Redis::RedisImpl::SendReadOnly() {
if (!send_readonly_) {
SelectDatabase();
return;
}

LOG_DEBUG() << "Send READONLY command to slave "
<< GetServerId().GetDescription() << " in cluster mode";
ProcessCommand(PrepareCommand(CmdArgs{"READONLY"}, [this](const CommandPtr&,
ReplyPtr reply) {
if (*reply && reply->data.IsStatus()) {
SetState(State::kConnected);
SelectDatabase();
} else {
if (*reply) {
LOG_LIMITED_ERROR()
Expand All @@ -1101,6 +1108,49 @@ void Redis::RedisImpl::SendReadOnly() {
}));
}

void Redis::RedisImpl::SelectDatabase() {
// To get rid of the redundant `SELECT 0` command
// since 0 is the default database index, and it will be set automatically
if (database_index_ == 0) {
SetState(RedisState::kConnected);
return;
}

ProcessCommand(PrepareCommand(
CmdArgs{"SELECT", database_index_},
[this](const CommandPtr&, ReplyPtr reply) {
if (*reply && reply->data.IsStatus()) {
SetState(RedisState::kConnected);
LOG_INFO() << log_extra_
<< "Selected redis logical database with index "
<< database_index_;
return;
}

const utils::ScopeGuard auto_disconnect([this]() { Disconnect(); });

if (!*reply) {
LOG_LIMITED_ERROR()
<< "SELECT failed with status " << reply->status << " ("
<< reply->status_string << ") " << log_extra_;
return;
}

if (reply->IsUnknownCommandError()) {
LOG_WARNING() << log_extra_
<< "SELECT failed: unknown command `SELECT` - "
"possible when connecting to Sentinel instead "
"of Redis master or slave instance";
return;
}

LOG_LIMITED_ERROR()
<< log_extra_
<< "SELECT failed: response type=" << reply->data.GetTypeString()
<< " msg=" << reply->data.ToDebugString();
}));
}

void Redis::RedisImpl::OnRedisReply(redisAsyncContext* c, void* r,
void* privdata) noexcept {
auto* impl = static_cast<Redis::RedisImpl*>(c->data);
Expand Down
3 changes: 2 additions & 1 deletion redis/src/storages/redis/impl/redis.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class Redis {
Redis(Redis&& o) = delete;

void Connect(const ConnectionInfo::HostVector& host_addrs, int port,
const Password& password);
const Password& password,
size_t database_index = 0);

bool AsyncCommand(const CommandPtr& command);
size_t GetRunningCommands() const;
Expand Down
1 change: 1 addition & 0 deletions redis/src/storages/redis/impl/redis_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ const std::string_view kCommandTypes[] = {
"scan",
"scard",
"script",
"select",
"sentinel",
"set",
"setex",
Expand Down
26 changes: 15 additions & 11 deletions redis/src/storages/redis/impl/sentinel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,18 @@ void OnSubscribeImpl(std::string_view message_type,

} // namespace

Sentinel::Sentinel(
const std::shared_ptr<ThreadPools>& thread_pools,
const std::vector<std::string>& shards,
const std::vector<ConnectionInfo>& conns, std::string shard_group_name,
const std::string& client_name, const Password& password,
ConnectionSecurity connection_security, ReadyChangeCallback ready_callback,
dynamic_config::Source dynamic_config_source,
std::unique_ptr<KeyShard>&& key_shard, CommandControl command_control,
const testsuite::RedisControl& testsuite_redis_control, ConnectionMode mode)
Sentinel::Sentinel(const std::shared_ptr<ThreadPools>& thread_pools,
const std::vector<std::string>& shards,
const std::vector<ConnectionInfo>& conns,
std::string shard_group_name, const std::string& client_name,
const Password& password,
ConnectionSecurity connection_security,
ReadyChangeCallback ready_callback,
dynamic_config::Source dynamic_config_source,
std::unique_ptr<KeyShard>&& key_shard,
CommandControl command_control,
const testsuite::RedisControl& testsuite_redis_control,
ConnectionMode mode, size_t database_index)
: thread_pools_(thread_pools),
secdist_default_command_control_(command_control),
testsuite_redis_control_(testsuite_redis_control) {
Expand All @@ -96,7 +99,7 @@ Sentinel::Sentinel(
*sentinel_thread_control_, thread_pools_->GetRedisThreadPool(), *this,
shards, conns, std::move(shard_group_name), client_name, password,
connection_security, std::move(ready_callback), std::move(key_shard),
dynamic_config_source, mode);
dynamic_config_source, mode, database_index);
}
});
}
Expand Down Expand Up @@ -176,7 +179,8 @@ std::shared_ptr<Sentinel> Sentinel::CreateSentinel(
thread_pools, shards, conns, std::move(shard_group_name), client_name,
password, settings.secure_connection, std::move(ready_callback),
dynamic_config_source, std::move(key_shard), command_control,
testsuite_redis_control);
testsuite_redis_control, ConnectionMode::kCommands,
settings.database_index);
client->Start();
}

Expand Down
3 changes: 2 additions & 1 deletion redis/src/storages/redis/impl/sentinel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ class Sentinel {
std::unique_ptr<KeyShard>&& key_shard = nullptr,
CommandControl command_control = {},
const testsuite::RedisControl& testsuite_redis_control = {},
ConnectionMode mode = ConnectionMode::kCommands);
ConnectionMode mode = ConnectionMode::kCommands,
size_t database_index = 0);
virtual ~Sentinel();

void Start();
Expand Down
8 changes: 6 additions & 2 deletions redis/src/storages/redis/impl/sentinel_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ SentinelImpl::SentinelImpl(
const std::string& client_name, const Password& password,
ConnectionSecurity connection_security, ReadyChangeCallback ready_callback,
std::unique_ptr<KeyShard>&& key_shard,
dynamic_config::Source dynamic_config_source, ConnectionMode mode)
dynamic_config::Source dynamic_config_source, ConnectionMode mode,
size_t database_index)
: sentinel_obj_(sentinel),
ev_thread_(sentinel_thread_control),
shard_group_name_(std::move(shard_group_name)),
Expand All @@ -79,7 +80,8 @@ SentinelImpl::SentinelImpl(
key_shard_(std::move(key_shard)),
connection_mode_(mode),
slot_info_(IsInClusterMode() ? std::make_unique<SlotInfo>() : nullptr),
dynamic_config_source_(dynamic_config_source) {
dynamic_config_source_(dynamic_config_source),
database_index_(database_index) {
for (size_t i = 0; i < init_shards_->size(); ++i) {
shards_[(*init_shards_)[i]] = i;
connected_statuses_.push_back(std::make_unique<ConnectedStatus>());
Expand Down Expand Up @@ -637,6 +639,7 @@ void SentinelImpl::ReadSentinels() {
for (auto shard_conn : info) {
if (shards_.find(shard_conn.Name()) != shards_.end()) {
shard_conn.SetConnectionSecurity(connection_security_);
shard_conn.SetDatabaseIndex(database_index_);
shard_found[shards_[shard_conn.Name()]] = true;
watcher->host_port_to_shard[shard_conn.HostPort()] =
shards_[shard_conn.Name()];
Expand Down Expand Up @@ -675,6 +678,7 @@ void SentinelImpl::ReadSentinels() {
shard_conn.SetName(shard);
shard_conn.SetReadOnly(true);
shard_conn.SetConnectionSecurity(connection_security_);
shard_conn.SetDatabaseIndex(database_index_);
if (shards_.find(shard_conn.Name()) != shards_.end())
watcher->host_port_to_shard[shard_conn.HostPort()] =
shards_[shard_conn.Name()];
Expand Down
4 changes: 3 additions & 1 deletion redis/src/storages/redis/impl/sentinel_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ class SentinelImpl : public SentinelImplBase {
ReadyChangeCallback ready_callback,
std::unique_ptr<KeyShard>&& key_shard,
dynamic_config::Source dynamic_config_source,
ConnectionMode mode = ConnectionMode::kCommands);
ConnectionMode mode = ConnectionMode::kCommands,
size_t database_index = 0);
~SentinelImpl() override;

std::unordered_map<ServerId, size_t, ServerIdHasher>
Expand Down Expand Up @@ -285,6 +286,7 @@ class SentinelImpl : public SentinelImplBase {
std::optional<CommandsBufferingSettings> commands_buffering_settings_;
dynamic_config::Source dynamic_config_source_;
std::atomic<int> publish_shard_{0};
std::size_t database_index_{0};
};

} // namespace redis
Expand Down
Loading
Loading