Skip to content

Commit

Permalink
feat: support network IO traffic monitoring
Browse files Browse the repository at this point in the history
Support network IO traffic monitoring. Including IO bytes and kps of Redis requests and master-slave replication.

Fixes: OpenAtomFoundation#1732

Signed-off-by: yaoyinnan <[email protected]>
  • Loading branch information
yaoyinnan committed Jul 15, 2023
1 parent 355bb10 commit 413c6d6
Show file tree
Hide file tree
Showing 27 changed files with 3,910 additions and 2,192 deletions.
41 changes: 41 additions & 0 deletions include/pika_instant.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef PIKA_PIKA_INSTANT_H
#define PIKA_PIKA_INSTANT_H

#include <string>
#include <unordered_map>

#define STATS_METRIC_SAMPLES 16 /* Number of samples per metric. */
inline const std::string STATS_METRIC_NET_INPUT = "stats_metric_net_input";
inline const std::string STATS_METRIC_NET_OUTPUT = "stats_metric_net_output";
inline const std::string STATS_METRIC_NET_INPUT_REPLICATION = "stats_metric_net_input_replication";
inline const std::string STATS_METRIC_NET_OUTPUT_REPLICATION = "stats_metric_net_output_replication";

#define run_with_period(_ms_) if (((_ms_) <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))

/* The following two are used to track instantaneous metrics, like
* number of operations per second, network traffic. */
struct inst_metric{
uint64_t last_sample_base; /* The divisor of last sample window */
uint64_t last_sample_value; /* The dividend of last sample window */
uint64_t samples[STATS_METRIC_SAMPLES];
int idx;
};

class Instant {
public:
Instant() = default;
~Instant() = default;

void trackInstantaneousMetric(std::string metric, uint64_t current_value, uint64_t current_base, uint64_t factor);
uint64_t getInstantaneousMetric(std::string metric);

private:
std::unordered_map<std::string, inst_metric> inst_metrics_;
};

#endif // PIKA_PIKA_INSTANT_H
22 changes: 22 additions & 0 deletions include/pika_monotonic_time.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef PIKA_MONOTONIC_TIME_H
#define PIKA_MONOTONIC_TIME_H

#include <cstdint>

/* A counter in micro-seconds. The 'monotime' type is provided for variables
* holding a monotonic time. This will help distinguish & document that the
* variable is associated with the monotonic clock and should not be confused
* with other types of time.*/
typedef uint64_t monotime;

// Get monotonic time in microseconds
monotime getMonotonicUs();

#endif // PIKA_MONOTONIC_TIME_H


21 changes: 21 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "include/pika_db.h"
#include "include/pika_define.h"
#include "include/pika_dispatch_thread.h"
#include "include/pika_instant.h"
#include "include/pika_repl_client.h"
#include "include/pika_repl_server.h"
#include "include/pika_rsync_service.h"
Expand Down Expand Up @@ -306,6 +307,19 @@ class PikaServer : public pstd::noncopyable {
std::unordered_map<std::string, uint64_t> ServerExecCountDB();
QpsStatistic ServerDBStat(const std::string& db_name);
std::unordered_map<std::string, QpsStatistic> ServerAllDBStat();

/*
* Network Statistic used
*/
uint64_t NetInputBytes();
uint64_t NetOutputBytes();
uint64_t NetReplInputBytes();
uint64_t NetReplOutputBytes();
float InstantaneousInputKbps();
float InstantaneousOutputKbps();
float InstantaneousInputReplKbps();
float InstantaneousOutputReplKbps();

/*
* Slave to Master communication used
*/
Expand Down Expand Up @@ -474,6 +488,12 @@ class PikaServer : public pstd::noncopyable {
*/
std::unordered_map<std::string, CommandStatistics>& GetCommandStatMap();


/*
* Instantaneous Metric used
*/
Instant instant_;

friend class Cmd;
friend class InfoCmd;
friend class PikaReplClientConn;
Expand All @@ -488,6 +508,7 @@ class PikaServer : public pstd::noncopyable {
void AutoPurge();
void AutoDeleteExpiredDump();
void AutoKeepAliveRSync();
void AutoInstantaneousMetric();

std::string host_;
int port_ = 0;
Expand Down
4 changes: 2 additions & 2 deletions include/pika_statistic.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ class QpsStatistic {
};

struct ServerStatistic {
ServerStatistic();
~ServerStatistic();
ServerStatistic() = default;
~ServerStatistic() = default;

std::atomic<uint64_t> accumulative_connections;
std::unordered_map<std::string, std::atomic<uint64_t>> exec_count_db;
Expand Down
36 changes: 36 additions & 0 deletions src/net/include/net_stats.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#ifndef NET_INCLUDE_REDIS_STSTS_H_
#define NET_INCLUDE_REDIS_STSTS_H_

#include <atomic>

namespace net {

class NetworkStatistic {
public:
NetworkStatistic() = default;
~NetworkStatistic() = default;

uint64_t NetInputBytes();
uint64_t NetOutputBytes();
uint64_t NetReplInputBytes();
uint64_t NetReplOutputBytes();
void IncrInputBytes(uint64_t bytes);
void IncrOutputBytes(uint64_t bytes);
void IncrReplInputBytes(uint64_t bytes);
void IncrReplOutputBytes(uint64_t bytes);

private:
std::atomic<uint64_t> stat_net_input_bytes = {0}; /* Bytes read from network. */
std::atomic<uint64_t> stat_net_output_bytes = {0}; /* Bytes written to network. */
std::atomic<uint64_t> stat_net_repl_input_bytes = {0}; /* Bytes read during replication, added to stat_net_input_bytes in 'info'. */
std::atomic<uint64_t> stat_net_repl_output_bytes = {0}; /* Bytes written during replication, added to stat_net_output_bytes in 'info'. */
};

}

#endif // NET_INCLUDE_REDIS_STSTS_H_
30 changes: 30 additions & 0 deletions src/net/src/net_stats.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include <atomic>
#include <string>
#include "net/include/net_stats.h"

std::unique_ptr<net::NetworkStatistic> g_network_statistic;

namespace net {

uint64_t NetworkStatistic::NetInputBytes() { return stat_net_input_bytes.load(std::memory_order_relaxed); }

uint64_t NetworkStatistic::NetOutputBytes() { return stat_net_output_bytes.load(std::memory_order_relaxed); }

uint64_t NetworkStatistic::NetReplInputBytes() { return stat_net_repl_input_bytes.load(std::memory_order_relaxed); }

uint64_t NetworkStatistic::NetReplOutputBytes() { return stat_net_repl_output_bytes.load(std::memory_order_relaxed); }

void NetworkStatistic::IncrInputBytes(uint64_t bytes) { stat_net_input_bytes.fetch_add(bytes, std::memory_order_relaxed); }

void NetworkStatistic::IncrOutputBytes(uint64_t bytes) { stat_net_output_bytes.fetch_add(bytes, std::memory_order_relaxed); }

void NetworkStatistic::IncrReplInputBytes(uint64_t bytes) { stat_net_repl_input_bytes.fetch_add(bytes, std::memory_order_relaxed); }

void NetworkStatistic::IncrReplOutputBytes(uint64_t bytes) { stat_net_repl_output_bytes.fetch_add(bytes, std::memory_order_relaxed); }

}
6 changes: 6 additions & 0 deletions src/net/src/pb_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
#include <glog/logging.h>

#include "net/include/net_define.h"
#include "net/include/net_stats.h"
#include "pstd/include/xdebug.h"

extern std::unique_ptr<net::NetworkStatistic> g_network_statistic;

namespace net {

PbConn::PbConn(const int fd, const std::string& ip_port, Thread* thread, NetMultiplexer* mpx)
Expand All @@ -34,6 +37,7 @@ ReadStatus PbConn::GetRequest() {
switch (connStatus_) {
case kHeader: {
ssize_t nread = read(fd(), rbuf_ + cur_pos_, COMMAND_HEADER_LENGTH - cur_pos_);
g_network_statistic->IncrReplInputBytes(nread);
if (nread == -1) {
if (errno == EAGAIN) {
return kReadHalf;
Expand Down Expand Up @@ -71,6 +75,7 @@ ReadStatus PbConn::GetRequest() {
}
// read msg body
ssize_t nread = read(fd(), rbuf_ + cur_pos_, remain_packet_len_);
g_network_statistic->IncrReplInputBytes(nread);
if (nread == -1) {
if (errno == EAGAIN) {
return kReadHalf;
Expand Down Expand Up @@ -117,6 +122,7 @@ WriteStatus PbConn::SendReply() {
item_len = item.size();
while (item_len - write_buf_.item_pos_ > 0) {
nwritten = write(fd(), item.data() + write_buf_.item_pos_, item_len - write_buf_.item_pos_);
g_network_statistic->IncrReplOutputBytes(nwritten);
if (nwritten <= 0) {
break;
}
Expand Down
8 changes: 5 additions & 3 deletions src/net/src/redis_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@

#include "net/include/redis_conn.h"

#include <climits>
#include <cstdlib>

#include <sstream>
#include <string>

#include <glog/logging.h>

#include "net/include/net_stats.h"
#include "pstd/include/pstd_string.h"
#include "pstd/include/xdebug.h"

extern std::unique_ptr<net::NetworkStatistic> g_network_statistic;

namespace net {

RedisConn::RedisConn(const int fd, const std::string& ip_port, Thread* thread, NetMultiplexer* net_mpx,
Expand Down Expand Up @@ -87,6 +87,7 @@ ReadStatus RedisConn::GetRequest() {
}

nread = read(fd(), rbuf_ + next_read_pos, remain);
g_network_statistic->IncrInputBytes(nread);
if (nread == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
nread = 0;
Expand Down Expand Up @@ -129,6 +130,7 @@ WriteStatus RedisConn::SendReply() {
size_t wbuf_len = response_.size();
while (wbuf_len > 0) {
nwritten = write(fd(), response_.data() + wbuf_pos_, wbuf_len - wbuf_pos_);
g_network_statistic->IncrOutputBytes(nwritten);
if (nwritten <= 0) {
break;
}
Expand Down
5 changes: 5 additions & 0 deletions src/pika.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <sys/resource.h>
#include <csignal>

#include "net/include/net_stats.h"
#include "include/build_version.h"
#include "include/pika_cmd_table_manager.h"
#include "include/pika_command.h"
Expand All @@ -26,6 +27,8 @@ std::unique_ptr<PikaReplicaManager> g_pika_rm;

std::unique_ptr<PikaCmdTableManager> g_pika_cmd_table_manager;

extern std::unique_ptr<net::NetworkStatistic> g_network_statistic;

static void version() {
char version[32];
snprintf(version, sizeof(version), "%d.%d.%d", PIKA_MAJOR, PIKA_MINOR, PIKA_PATCH);
Expand Down Expand Up @@ -192,6 +195,7 @@ int main(int argc, char* argv[]) {
g_pika_cmd_table_manager = std::make_unique<PikaCmdTableManager>();
g_pika_server = new PikaServer();
g_pika_rm = std::make_unique<PikaReplicaManager>();
g_network_statistic = std::make_unique<net::NetworkStatistic>();

if (g_pika_conf->daemonize()) {
close_std();
Expand All @@ -202,6 +206,7 @@ int main(int argc, char* argv[]) {
g_pika_server = nullptr;
g_pika_rm.reset();
g_pika_cmd_table_manager.reset();
g_network_statistic.reset();
::google::ShutdownGoogleLogging();
g_pika_conf.reset();
};
Expand Down
11 changes: 11 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,17 @@ void InfoCmd::InfoStats(std::string& info) {
tmp_stream << "total_connections_received:" << g_pika_server->accumulative_connections() << "\r\n";
tmp_stream << "instantaneous_ops_per_sec:" << g_pika_server->ServerCurrentQps() << "\r\n";
tmp_stream << "total_commands_processed:" << g_pika_server->ServerQueryNum() << "\r\n";

// Network stats
tmp_stream << "total_net_input_bytes:" << g_pika_server->NetInputBytes() + g_pika_server->NetReplInputBytes() << "\r\n";
tmp_stream << "total_net_output_bytes:" << g_pika_server->NetOutputBytes() + g_pika_server->NetReplOutputBytes() << "\r\n";
tmp_stream << "total_net_repl_input_bytes:" << g_pika_server->NetReplInputBytes() << "\r\n";
tmp_stream << "total_net_repl_output_bytes:" << g_pika_server->NetReplOutputBytes() << "\r\n";
tmp_stream << "instantaneous_input_kbps:" << g_pika_server->InstantaneousInputKbps() << "\r\n";
tmp_stream << "instantaneous_output_kbps:" << g_pika_server->InstantaneousOutputKbps() << "\r\n";
tmp_stream << "instantaneous_input_repl_kbps:" << g_pika_server->InstantaneousInputReplKbps() << "\r\n";
tmp_stream << "instantaneous_output_repl_kbps:" << g_pika_server->InstantaneousOutputReplKbps() << "\r\n";

tmp_stream << "is_bgsaving:" << (g_pika_server->IsBgSaving() ? "Yes" : "No") << "\r\n";
tmp_stream << "is_scaning_keyspace:" << (g_pika_server->IsKeyScaning() ? "Yes" : "No") << "\r\n";
tmp_stream << "is_compact:" << (g_pika_server->IsCompacting() ? "Yes" : "No") << "\r\n";
Expand Down
39 changes: 39 additions & 0 deletions src/pika_instant.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include <string>
#include "../include/pika_instant.h"

/* Return the mean of all the samples. */
uint64_t Instant::getInstantaneousMetric(std::string metric) {
int j;
size_t sum = 0;

for (j = 0; j < STATS_METRIC_SAMPLES; j++)
sum += inst_metrics_[metric].samples[j];
return sum / STATS_METRIC_SAMPLES;
}

/* ======================= Cron: called every 100 ms ======================== */

/* Add a sample to the instantaneous metric. This function computes the quotient
* of the increment of value and base, which is useful to record operation count
* per second, or the average time consumption of an operation.
*
* current_value - The dividend
* current_base - The divisor
* */
void Instant::trackInstantaneousMetric(std::string metric, uint64_t current_value, uint64_t current_base, uint64_t factor) {
if (inst_metrics_[metric].last_sample_base > 0) {
uint64_t base = current_base - inst_metrics_[metric].last_sample_base;
uint64_t value = current_value - inst_metrics_[metric].last_sample_value;
uint64_t avg = base > 0 ? (value * factor / base) : 0;
inst_metrics_[metric].samples[inst_metrics_[metric].idx] = avg;
inst_metrics_[metric].idx++;
inst_metrics_[metric].idx %= STATS_METRIC_SAMPLES;
}
inst_metrics_[metric].last_sample_base = current_base;
inst_metrics_[metric].last_sample_value = current_value;
}
Loading

0 comments on commit 413c6d6

Please sign in to comment.