diff --git a/big_tests/default.spec b/big_tests/default.spec
index a7371fa207..35b7903a2f 100644
--- a/big_tests/default.spec
+++ b/big_tests/default.spec
@@ -37,6 +37,7 @@
 {suites, "tests", graphql_stanza_SUITE}.
 {suites, "tests", graphql_vcard_SUITE}.
 {suites, "tests", graphql_http_upload_SUITE}.
+{suites, "tests", graphql_metric_SUITE}.
 {suites, "tests", inbox_SUITE}.
 {suites, "tests", inbox_extensions_SUITE}.
 {suites, "tests", jingle_SUITE}.
diff --git a/big_tests/dynamic_domains.spec b/big_tests/dynamic_domains.spec
index 1f3db9a14a..09207261d5 100644
--- a/big_tests/dynamic_domains.spec
+++ b/big_tests/dynamic_domains.spec
@@ -53,6 +53,7 @@
 {suites, "tests", graphql_stanza_SUITE}.
 {suites, "tests", graphql_vcard_SUITE}.
 {suites, "tests", graphql_http_upload_SUITE}.
+{suites, "tests", graphql_metric_SUITE}.
 {suites, "tests", inbox_SUITE}.
diff --git a/big_tests/tests/graphql_metric_SUITE.erl b/big_tests/tests/graphql_metric_SUITE.erl
new file mode 100644
index 0000000000..635494b6b6
--- /dev/null
+++ b/big_tests/tests/graphql_metric_SUITE.erl
@@ -0,0 +1,316 @@
+-module(graphql_metric_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("exml/include/exml.hrl").
+
+-compile([export_all, nowarn_export_all]).
+
+-import(distributed_helper, [require_rpc_nodes/1, rpc/4]).
+-import(graphql_helper, [execute_auth/2, init_admin_handler/1]).
+
+suite() ->
+    MIM2NodeName = maps:get(node, distributed_helper:mim2()),
+    %% Ensure nodes are connected
+    mongoose_helper:successful_rpc(net_kernel, connect_node, [MIM2NodeName]),
+    require_rpc_nodes([mim, mim2]) ++ escalus:suite().
+
+all() ->
+    [{group, metrics}].
+
+groups() ->
+    [{metrics, [], metrics_handler()}].
+
+metrics_handler() ->
+    [get_all_metrics,
+     get_all_metrics_check_by_type,
+     get_by_name_global_erlang_metrics,
+     get_process_queue_length,
+     get_inet_stats,
+     get_vm_stats_memory,
+     get_metrics_as_dicts,
+     get_by_name_metrics_as_dicts,
+     get_metrics_as_dicts_with_key_one,
+     get_cluster_metrics,
+     get_by_name_cluster_metrics_as_dicts,
+     get_mim2_cluster_metrics].
+
+init_per_suite(Config) ->
+    escalus:init_per_suite(init_admin_handler(Config)).
+
+end_per_suite(Config) ->
+    escalus_fresh:clean(),
+    escalus:end_per_suite(Config).
+
+init_per_testcase(CaseName, Config) ->
+    escalus:init_per_testcase(CaseName, Config).
+
+end_per_testcase(CaseName, Config) ->
+    escalus:end_per_testcase(CaseName, Config).
+
+get_all_metrics(Config) ->
+    %% Get all metrics
+    Result = execute_auth(#{query => get_all_metrics_call(),
+                            variables => #{}, operationName => <<"Q1">>}, Config),
+    ParsedResult = ok_result(<<"metric">>, <<"getMetrics">>, Result),
+    Map = maps:from_list([{Name, X} || X = #{<<"name">> := Name} <- ParsedResult]),
+    ReadsKey = [<<"global">>, <<"backends">>, <<"mod_roster">>, <<"read_roster_version">>],
+    Reads = maps:get(ReadsKey, Map),
+    %% Histogram integer keys have p prefix
+    check_histogram_p(Reads),
+    %% HistogramMetric type
+    #{<<"type">> := <<"histogram">>} = Reads.
+
+get_all_metrics_check_by_type(Config) ->
+    %% Get all metrics
+    Result = execute_auth(#{query => get_all_metrics_call(),
+                            variables => #{}, operationName => <<"Q1">>}, Config),
+    ParsedResult = ok_result(<<"metric">>, <<"getMetrics">>, Result),
+    lists:foreach(fun check_metric_by_type/1, ParsedResult).
+
+check_metric_by_type(#{<<"type">> := Type} = Map) ->
+    values_are_integers(Map, type_to_keys(Type)).
+
+type_to_keys(<<"histogram">>) ->
+    [<<"n">>, <<"mean">>, <<"min">>, <<"max">>, <<"median">>,
+     <<"p50">>, <<"p75">>, <<"p90">>, <<"p95">>, <<"p99">>, <<"p999">>];
+type_to_keys(<<"counter">>) ->
+    [<<"value">>, <<"ms_since_reset">>];
+type_to_keys(<<"spiral">>) ->
+    [<<"one">>, <<"count">>];
+type_to_keys(<<"gauge">>) ->
+    [<<"value">>];
+type_to_keys(<<"merged_inet_stats">>) ->
+    [<<"connections">>, <<"recv_cnt">>, <<"recv_max">>, <<"recv_oct">>,
+     <<"send_cnt">>, <<"send_max">>, <<"send_oct">>, <<"send_pend">>];
+type_to_keys(<<"rdbms_stats">>) ->
+    [<<"workers">>, <<"recv_cnt">>, <<"recv_max">>, <<"recv_oct">>,
+     <<"send_cnt">>, <<"send_max">>, <<"send_oct">>, <<"send_pend">>];
+type_to_keys(<<"vm_stats_memory">>) ->
+    [<<"atom_used">>, <<"binary">>, <<"ets">>,
+     <<"processes_used">>, <<"system">>, <<"total">>];
+type_to_keys(<<"vm_system_info">>) ->
+    [<<"ets_limit">>, <<"port_count">>, <<"port_limit">>,
+     <<"process_count">>, <<"process_limit">>];
+type_to_keys(<<"probe_queues">>) ->
+    [<<"fsm">>, <<"regular">>, <<"total">>].
+
+get_by_name_global_erlang_metrics(Config) ->
+    %% Filter by name works
+    Result = execute_auth(#{query => get_metrics_call_with_args(<<"(name: [\"global\", \"erlang\"])">>),
+                            variables => #{}, operationName => <<"Q1">>}, Config),
+    ParsedResult = ok_result(<<"metric">>, <<"getMetrics">>, Result),
+    Map = maps:from_list([{Name, X} || X = #{<<"name">> := Name} <- ParsedResult]),
+    Info = maps:get([<<"global">>, <<"erlang">>, <<"system_info">>], Map),
+    %% VMSystemInfoMetric type
+    #{<<"type">> := <<"vm_system_info">>} = Info,
+    check_metric_by_type(Info),
+    ReadsKey = [<<"global">>, <<"backends">>, <<"mod_roster">>, <<"read_roster_version">>],
+    %% Other metrics are filtered out
+    undef = maps:get(ReadsKey, Map, undef).
+
+get_process_queue_length(Config) ->
+    Result = execute_auth(#{query => get_metrics_call_with_args(
+                                       <<"(name: [\"global\", \"processQueueLengths\"])">>),
+                            variables => #{}, operationName => <<"Q1">>}, Config),
+    ParsedResult = ok_result(<<"metric">>, <<"getMetrics">>, Result),
+    Map = maps:from_list([{Name, X} || X = #{<<"name">> := Name} <- ParsedResult]),
+    Lens = maps:get([<<"global">>, <<"processQueueLengths">>], Map),
+    %% ProbeQueuesMetric type
+    #{<<"type">> := <<"probe_queues">>} = Lens,
+    check_metric_by_type(Lens).
+
+get_inet_stats(Config) ->
+    Result = execute_auth(#{query => get_metrics_call_with_args(
+                                       <<"(name: [\"global\", \"data\", \"dist\"])">>),
+                            variables => #{}, operationName => <<"Q1">>}, Config),
+    ParsedResult = ok_result(<<"metric">>, <<"getMetrics">>, Result),
+    Map = maps:from_list([{Name, X} || X = #{<<"name">> := Name} <- ParsedResult]),
+    Stats = maps:get([<<"global">>, <<"data">>, <<"dist">>], Map),
+    %% MergedInetStatsMetric type
+    #{<<"type">> := <<"merged_inet_stats">>} = Stats,
+    check_metric_by_type(Stats).
+
+get_vm_stats_memory(Config) ->
+    Result = execute_auth(#{query => get_metrics_call_with_args(<<"(name: [\"global\"])">>),
+                            variables => #{}, operationName => <<"Q1">>}, Config),
+    ParsedResult = ok_result(<<"metric">>, <<"getMetrics">>, Result),
+    Map = maps:from_list([{Name, X} || X = #{<<"name">> := Name} <- ParsedResult]),
+    Mem = maps:get([<<"global">>, <<"erlang">>, <<"memory">>], Map),
+    %% VMStatsMemoryMetric type
+    #{<<"type">> := <<"vm_stats_memory">>} = Mem,
+    check_metric_by_type(Mem).
+
+get_metrics_as_dicts(Config) ->
+    Result = execute_auth(#{query => get_all_metrics_as_dicts_call(), variables => #{},
+                            operationName => <<"Q1">>}, Config),
+    ParsedResult = ok_result(<<"metric">>, <<"getMetricsAsDicts">>, Result),
+    check_node_result_is_valid(ParsedResult, false).
+
+get_by_name_metrics_as_dicts(Config) ->
+    Args = <<"(name: [\"_\", \"xmppStanzaSent\"])">>,
+    Result = execute_auth(#{query => get_by_args_metrics_as_dicts_call(Args),
+                            variables => #{}, operationName => <<"Q1">>}, Config),
+    ParsedResult = ok_result(<<"metric">>, <<"getMetricsAsDicts">>, Result),
+    [_|_] = ParsedResult,
+    %% Only xmppStanzaSent type
+    lists:foreach(fun(#{<<"dict">> := Dict, <<"name">> := [_, <<"xmppStanzaSent">>]}) ->
+                          check_spiral_dict(Dict)
+                  end, ParsedResult).
+
+get_metrics_as_dicts_with_key_one(Config) ->
+    Result = execute_auth(#{query => get_all_metrics_as_dicts_with_key_one_call(),
+                            variables => #{},
+                            operationName => <<"Q1">>}, Config),
+    ParsedResult = ok_result(<<"metric">>, <<"getMetricsAsDicts">>, Result),
+    Map = dict_objects_to_map(ParsedResult),
+    SentName = [metric_host_type(), <<"xmppStanzaSent">>],
+    [#{<<"key">> := <<"one">>, <<"value">> := One}] = maps:get(SentName, Map),
+    true = is_integer(One).
+
+get_cluster_metrics(Config) ->
+    %% We will have at least these two nodes
+    Node1 = atom_to_binary(maps:get(node, distributed_helper:mim())),
+    Node2 = atom_to_binary(maps:get(node, distributed_helper:mim2())),
+    Result = execute_auth(#{query => get_all_cluster_metrics_as_dicts_call(),
+                            variables => #{},
+                            operationName => <<"Q1">>}, Config),
+    ParsedResult = ok_result(<<"metric">>, <<"getClusterMetricsAsDicts">>, Result),
+    #{Node1 := Res1, Node2 := Res2} = node_objects_to_map(ParsedResult),
+    check_node_result_is_valid(Res1, false),
+    check_node_result_is_valid(Res2, true).
+
+get_by_name_cluster_metrics_as_dicts(Config) ->
+    Args = <<"(name: [\"_\", \"xmppStanzaSent\"])">>,
+    Result = execute_auth(#{query => get_by_args_cluster_metrics_as_dicts_call(Args),
+                            variables => #{}, operationName => <<"Q1">>}, Config),
+    NodeResult = ok_result(<<"metric">>, <<"getClusterMetricsAsDicts">>, Result),
+    Map = node_objects_to_map(NodeResult),
+    %% Contains data for at least two nodes
+    true = maps:size(Map) > 1,
+    %% Only xmppStanzaSent type
+    maps:map(fun(_Node, [_|_] = NodeRes) ->
+                     lists:foreach(fun(#{<<"dict">> := Dict,
+                                         <<"name">> := [_, <<"xmppStanzaSent">>]}) ->
+                                           check_spiral_dict(Dict)
+                                   end, NodeRes) end, Map).
+
+get_mim2_cluster_metrics(Config) ->
+    Node = atom_to_binary(maps:get(node, distributed_helper:mim2())),
+    Result = execute_auth(#{query => get_node_cluster_metrics_as_dicts_call(Node),
+                            variables => #{},
+                            operationName => <<"Q1">>}, Config),
+    ParsedResult = ok_result(<<"metric">>, <<"getClusterMetricsAsDicts">>, Result),
+    [#{<<"node">> := Node, <<"result">> := ResList}] = ParsedResult,
+    check_node_result_is_valid(ResList, true).
+
+check_node_result_is_valid(ResList, MetricsAreGlobal) ->
+    %% Check that result contains something
+    Map = dict_objects_to_map(ResList),
+    SentName = case MetricsAreGlobal of
+                   true -> [<<"global">>, <<"xmppStanzaSent">>];
+                   false -> [metric_host_type(), <<"xmppStanzaSent">>]
+               end,
+    check_spiral_dict(maps:get(SentName, Map)),
+    [#{<<"key">> := <<"value">>,<<"value">> := V}] =
+        maps:get([<<"global">>,<<"uniqueSessionCount">>], Map),
+    true = is_integer(V),
+    HistObjects = maps:get([<<"global">>, <<"data">>, <<"xmpp">>,
+                            <<"sent">>, <<"compressed_size">>], Map),
+    check_histogram(kv_objects_to_map(HistObjects)).
+
+check_histogram(Map) ->
+    Keys = [<<"n">>, <<"mean">>, <<"min">>, <<"max">>, <<"median">>,
+            <<"50">>, <<"75">>, <<"90">>, <<"95">>, <<"99">>, <<"999">>],
+    values_are_integers(Map, Keys).
+
+check_histogram_p(Map) ->
+    Keys = type_to_keys(<<"histogram">>),
+    values_are_integers(Map, Keys).
+
+dict_objects_to_map(List) ->
+    KV = [{Name, Dict} || #{<<"name">> := Name, <<"dict">> := Dict} <- List],
+    maps:from_list(KV).
+
+node_objects_to_map(List) ->
+    KV = [{Name, Value} || #{<<"node">> := Name, <<"result">> := Value} <- List],
+    maps:from_list(KV).
+
+kv_objects_to_map(List) ->
+    KV = [{Key, Value} || #{<<"key">> := Key, <<"value">> := Value} <- List],
+    maps:from_list(KV).
+
+get_all_metrics_call() ->
+    get_metrics_call_with_args(<<>>).
+
+get_metrics_call_with_args(Args) ->
+    <<"query Q1
+       {metric
+         {getMetrics", Args/binary, " {
+           ... on HistogramMetric
+           { name type n mean min max median p50 p75 p90 p95 p99 p999 }
+           ... on CounterMetric
+           { name type value ms_since_reset }
+           ... on SpiralMetric
+           { name type one count }
+           ... on GaugeMetric
+           { name type value }
+           ... on MergedInetStatsMetric
+           { name type connections recv_cnt recv_max recv_oct
+             send_cnt send_max send_oct send_pend }
+           ... on RDBMSStatsMetric
+           { name type workers recv_cnt recv_max recv_oct
+             send_cnt send_max send_oct send_pend }
+           ... on VMStatsMemoryMetric
+           { name type total processes_used atom_used binary ets system }
+           ... on VMSystemInfoMetric
+           { name type port_count port_limit process_count process_limit ets_limit }
+           ... on ProbeQueuesMetric
+           { name type fsm regular total }
+         }
+       }
+     }">>.
+
+get_all_metrics_as_dicts_call() ->
+    get_by_args_metrics_as_dicts_call(<<>>).
+
+get_by_args_metrics_as_dicts_call(Args) ->
+    <<"query Q1
+       {metric
+         {getMetricsAsDicts", Args/binary, " { name dict { key value }}}}">>.
+
+get_all_metrics_as_dicts_with_key_one_call() ->
+    <<"query Q1
+       {metric
+         {getMetricsAsDicts(keys: [\"one\"]) { name dict { key value }}}}">>.
+
+get_all_cluster_metrics_as_dicts_call() ->
+    get_by_args_cluster_metrics_as_dicts_call(<<>>).
+
+get_by_args_cluster_metrics_as_dicts_call(Args) ->
+    <<"query Q1
+       {metric
+         {getClusterMetricsAsDicts", Args/binary,
+      " {node result { name dict { key value }}}}}">>.
+
+get_node_cluster_metrics_as_dicts_call(NodeBin) ->
+    get_by_args_cluster_metrics_as_dicts_call(<<"(nodes: [\"", NodeBin/binary, "\"])">>).
+
+%% Helpers
+ok_result(What1, What2, {{<<"200">>, <<"OK">>}, #{<<"data">> := Data}}) ->
+    maps:get(What2, maps:get(What1, Data)).
+
+error_result(ErrorNumber, {{<<"200">>, <<"OK">>}, #{<<"errors">> := Errors}}) ->
+    lists:nth(ErrorNumber, Errors).
+
+check_spiral_dict(Dict) ->
+    [#{<<"key">> := <<"count">>, <<"value">> := Count},
+     #{<<"key">> := <<"one">>, <<"value">> := One}] = Dict,
+    true = is_integer(Count),
+    true = is_integer(One).
+
+values_are_integers(Map, Keys) ->
+    lists:foreach(fun(Key) -> true = is_integer(maps:get(Key, Map)) end, Keys).
+
+metric_host_type() ->
+    binary:replace(domain_helper:host_type(), <<" ">>, <<"_">>, [global]).
diff --git a/big_tests/tests/mam_helper.erl b/big_tests/tests/mam_helper.erl
index 10ae4f2892..4b450fa0b5 100644
--- a/big_tests/tests/mam_helper.erl
+++ b/big_tests/tests/mam_helper.erl
@@ -655,9 +655,9 @@
 send_muc_rsm_messages(Config) ->
     escalus:wait_for_stanzas(Alice, 3),
     %% Alice sends messages to Bob.
-    lists:foreach(fun(N) ->
+    lists:foreach(fun(NN) ->
                        escalus:send(Alice, escalus_stanza:groupchat_to(
                                              RoomAddr, generate_message_text(N)))
+                                             RoomAddr, generate_message_text(NN)))
                    end, lists:seq(1, N)),
     assert_list_size(N, escalus:wait_for_stanzas(Bob, N)),
     assert_list_size(N, escalus:wait_for_stanzas(Alice, N)),
diff --git a/priv/graphql/schemas/admin/admin_schema.gql b/priv/graphql/schemas/admin/admin_schema.gql
index 08f258006d..cc9016f5a7 100644
--- a/priv/graphql/schemas/admin/admin_schema.gql
+++ b/priv/graphql/schemas/admin/admin_schema.gql
@@ -30,6 +30,8 @@ type AdminQuery{
     vcard: VcardAdminQuery
     "Private storage management"
     private: PrivateAdminQuery
+    "Metrics management"
+    metric: MetricAdminQuery
 }

 """
diff --git a/priv/graphql/schemas/admin/metric.gql b/priv/graphql/schemas/admin/metric.gql
new file mode 100644
index 0000000000..4f439aa79c
--- /dev/null
+++ b/priv/graphql/schemas/admin/metric.gql
@@ -0,0 +1,218 @@
+"""
+Result of a metric
+"""
+
+enum MetricType {
+    histogram
+    counter
+    spiral
+    gauge
+    merged_inet_stats
+    rdbms_stats
+    vm_stats_memory
+    vm_system_info
+    probe_queues
+}
+
+union MetricResult = HistogramMetric | CounterMetric | SpiralMetric
+                   | GaugeMetric | MergedInetStatsMetric | RDBMSStatsMetric
+                   | VMStatsMemoryMetric | VMSystemInfoMetric
+                   | ProbeQueuesMetric
+
+type HistogramMetric {
+    "Metric name"
+    name: [String]
+    "Metric type"
+    type: MetricType
+    "The number of values used in the calculation"
+    n: Int
+    "Mean value"
+    mean: Int
+    "Min value"
+    min: Int
+    "Max value"
+    max: Int
+    "Median value"
+    median: Int
+    "50th percentile"
+    p50: Int
+    "75th percentile"
+    p75: Int
+    "90th percentile"
+    p90: Int
+    "95th percentile"
+    p95: Int
+    "99th percentile"
+    p99: Int
+    "99.9th percentile"
+    p999: Int
+}
+
+type CounterMetric {
+    "Metric name"
+    name: [String]
+    "Metric type"
+    type: MetricType
+    "The metric value"
+    value: Int
+    "Time in milliseconds since the last reset"
+    ms_since_reset: Int
+}
+
+type GaugeMetric {
+    "Metric name"
+    name: [String]
+    "Metric type"
+    type: MetricType
+    "The metric value"
+    value: Int
+}
+
+type SpiralMetric {
+    "Metric name"
+    name: [String]
+    "Metric type"
+    type: MetricType
+    "One minute value"
+    one: Int
+    "Total value"
+    count: Int
+}
+
+type MergedInetStatsMetric {
+    "Metric name"
+    name: [String]
+    "Metric type"
+    type: MetricType
+    "Number of connections"
+    connections: Int
+    "Number of packets received by the socket"
+    recv_cnt: Int
+    "Size of the largest packet, in bytes, received by the socket"
+    recv_max: Int
+    "Number of bytes received by the socket"
+    recv_oct: Int
+    "Number of packets sent from the socket"
+    send_cnt: Int
+    "Size of the largest packet, in bytes, sent from the socket"
+    send_max: Int
+    "Number of bytes sent from the socket"
+    send_oct: Int
+    "Number of bytes waiting to be sent by the socket"
+    send_pend: Int
+}
+
+type RDBMSStatsMetric {
+    "Metric name"
+    name: [String]
+    "Metric type"
+    type: MetricType
+    "Number of workers"
+    workers: Int
+    "Number of packets received by the socket"
+    recv_cnt: Int
+    "Size of the largest packet, in bytes, received by the socket"
+    recv_max: Int
+    "Number of bytes received by the socket"
+    recv_oct: Int
+    "Number of packets sent from the socket"
+    send_cnt: Int
+    "Size of the largest packet, in bytes, sent from the socket"
+    send_max: Int
+    "Number of bytes sent from the socket"
+    send_oct: Int
+    "Number of bytes waiting to be sent by the socket"
+    send_pend: Int
+}
+
+type VMStatsMemoryMetric {
+    "Metric name"
+    name: [String]
+    "Metric type"
+    type: MetricType
+    "The total amount of memory in bytes currently allocated (processes_used + system)"
+    total: Int
+    "The total amount of memory in bytes currently used by the Erlang processes"
+    processes_used: Int
+    "The total amount of memory in bytes currently used for atoms"
+    atom_used: Int
+    "The total amount of memory in bytes allocated for binaries"
+    binary: Int
+    "The total amount of memory in bytes allocated for ETS tables"
+    ets: Int
+    "The total amount of memory in bytes allocated for the emulator"
+    system: Int
+}
+
+type VMSystemInfoMetric {
+    "Metric name"
+    name: [String]
+    "Metric type"
+    type: MetricType
+    "Current number of open Erlang ports"
+    port_count: Int
+    "Maximum allowed number of open Erlang ports"
+    port_limit: Int
+    "Current number of Erlang processes"
+    process_count: Int
+    "Maximum allowed number of Erlang processes"
+    process_limit: Int
+    "Maximum number of ETS tables"
+    ets_limit: Int
+}
+
+type ProbeQueuesMetric {
+    "Metric name"
+    name: [String]
+    "Metric type"
+    type: MetricType
+    "Number of messages in p1_fsm queue"
+    fsm: Int
+    "Number of messages in the Erlang process message queues"
+    regular: Int
+    "Total number of messages (fsm + regular)"
+    total: Int
+}
+
+type MetricDictEntry {
+    "The name of the metric key (e.g. one, count, value)"
+    key: String
+    "Metric value"
+    value: Int
+}
+
+type MetricDictResult {
+    "Metric name"
+    name: [String]
+    "A list of keys and values"
+    dict: [MetricDictEntry]
+}
+
+type MetricNodeResult {
+    node: String
+    result: [MetricDictResult]
+}
+
+"""
+Allow admin to get the metric values
+"""
+type MetricAdminQuery @protected{
+    """
+    Match metrics using a name pattern and return the metric values.
+    Return all metrics if the name is not provided.
+    The name is a list of name segments (a path); an underscore matches any segment.
+    """
+    getMetrics(name: [String]): [MetricResult]
+    """
+    Get metrics without using GraphQL unions.
+    Optionally return only the specified keys
+    (e.g. keys: ["one"] returns only the key "one", but not the key "count")
+    """
+    getMetricsAsDicts(name: [String], keys: [String]): [MetricDictResult]
+
+    """
+    Gather metrics from multiple nodes.
+    Optionally return results only from the specified nodes.
+    """
+    getClusterMetricsAsDicts(name: [String], keys: [String], nodes: [String]): [MetricNodeResult]
+}
diff --git a/src/graphql/admin/mongoose_graphql_admin_query.erl b/src/graphql/admin/mongoose_graphql_admin_query.erl
index 4543b86c5b..5ce01ef7e5 100644
--- a/src/graphql/admin/mongoose_graphql_admin_query.erl
+++ b/src/graphql/admin/mongoose_graphql_admin_query.erl
@@ -28,4 +28,6 @@ execute(_Ctx, _Obj, <<"session">>, _Args) ->
 execute(_Ctx, _Obj, <<"stanza">>, _Args) ->
     {ok, #{}};
 execute(_Ctx, _Obj, <<"vcard">>, _Args) ->
-    {ok, vcard}.
+    {ok, vcard};
+execute(_Ctx, _Obj, <<"metric">>, _Args) ->
+    {ok, metric}.
diff --git a/src/graphql/admin/mongoose_graphql_metric_admin_query.erl b/src/graphql/admin/mongoose_graphql_metric_admin_query.erl
new file mode 100644
index 0000000000..c2fdebaf76
--- /dev/null
+++ b/src/graphql/admin/mongoose_graphql_metric_admin_query.erl
@@ -0,0 +1,45 @@
+-module(mongoose_graphql_metric_admin_query).
+-behaviour(mongoose_graphql).
+
+-export([execute/4]).
+
+-ignore_xref([execute/4]).
+
+-include("mongoose_logger.hrl").
+
+execute(_Ctx, _Obj, <<"getMetrics">>, Args) ->
+    Name = get_name(Args),
+    mongoose_metrics_api:get_metrics(Name);
+execute(_Ctx, _Obj, <<"getMetricsAsDicts">>, Args) ->
+    Name = get_name(Args),
+    Keys = get_keys2(Args),
+    mongoose_metrics_api:get_metrics_as_dicts(Name, Keys);
+execute(_Ctx, _Obj, <<"getClusterMetricsAsDicts">>, Args) ->
+    Name = get_name(Args),
+    Keys = get_keys2(Args),
+    Nodes = get_nodes(Args),
+    mongoose_metrics_api:get_cluster_metrics_as_dicts(Name, Keys, Nodes).
+
+%% get_keys is a BIF, so we have a name conflict
+get_keys2(Args) ->
+    Keys = get_list(<<"keys">>, Args),
+    lists:map(fun prepare_key/1, Keys).
+
+prepare_key(X) when is_binary(X) ->
+    binary_to_atom(X);
+prepare_key(X) when is_integer(X) -> %% For percentiles
+    X.
+
+get_name(Args) ->
+    Segments = get_list(<<"name">>, Args),
+    lists:map(fun binary_to_atom/1, Segments).
+
+get_nodes(Args) ->
+    Nodes = get_list(<<"nodes">>, Args),
+    lists:map(fun binary_to_atom/1, Nodes).
+
+get_list(Key, Map) ->
+    null_as_empty(maps:get(Key, Map, [])).
+
+null_as_empty(null) -> [];
+null_as_empty(X) -> X.
diff --git a/src/graphql/mongoose_graphql.erl b/src/graphql/mongoose_graphql.erl
index 1931c6bbcf..f93cbcaed0 100644
--- a/src/graphql/mongoose_graphql.erl
+++ b/src/graphql/mongoose_graphql.erl
@@ -152,6 +152,7 @@ admin_mapping_rules() ->
                    'HttpUploadAdminMutation' => mongoose_graphql_http_upload_admin_mutation,
                    'RosterAdminMutation' => mongoose_graphql_roster_admin_mutation,
                    'Domain' => mongoose_graphql_domain,
+                   'MetricAdminQuery' => mongoose_graphql_metric_admin_query,
                    default => mongoose_graphql_default},
      interfaces => #{default => mongoose_graphql_default},
      scalars => #{default => mongoose_graphql_scalar},
diff --git a/src/graphql/mongoose_graphql_enum.erl b/src/graphql/mongoose_graphql_enum.erl
index d1e73a792d..948184bada 100644
--- a/src/graphql/mongoose_graphql_enum.erl
+++ b/src/graphql/mongoose_graphql_enum.erl
@@ -32,7 +32,8 @@ input(<<"MUCAffiliation">>, <<"OUTCAST">>) -> {ok, outcast};
 input(<<"MUCAffiliation">>, <<"ADMIN">>) -> {ok, admin};
 input(<<"MUCAffiliation">>, <<"OWNER">>) -> {ok, owner};
 input(<<"PrivacyClassificationTags">>, Name) -> {ok, Name};
-input(<<"TelephoneTags">>, Name) -> {ok, Name}.
+input(<<"TelephoneTags">>, Name) -> {ok, Name};
+input(<<"MetricType">>, Name) -> {ok, Name}.

 output(<<"PresenceShow">>, Show) ->
     {ok, list_to_binary(string:to_upper(binary_to_list(Show)))};
@@ -67,4 +68,5 @@ output(<<"MUCAffiliation">>, Aff) ->
 output(<<"AddressTags">>, Name) -> {ok, Name};
 output(<<"EmailTags">>, Name) -> {ok, Name};
 output(<<"PrivacyClassificationTags">>, Name) -> {ok, Name};
-output(<<"TelephoneTags">>, Name) -> {ok, Name}.
+output(<<"TelephoneTags">>, Name) -> {ok, Name};
+output(<<"MetricType">>, Type) -> {ok, Type}.
diff --git a/src/graphql/mongoose_graphql_union.erl b/src/graphql/mongoose_graphql_union.erl
index 357e830160..acce89eb71 100644
--- a/src/graphql/mongoose_graphql_union.erl
+++ b/src/graphql/mongoose_graphql_union.erl
@@ -1,12 +1,30 @@
 -module(mongoose_graphql_union).
-
 -export([execute/1]).
 -ignore_xref([execute/1]).
+-include("mongoose_logger.hrl").
+
 execute(#{<<"type">> := _, <<"binValue">> := _}) -> {ok, <<"ImageData">>};
 execute(#{<<"extValue">> := _}) -> {ok, <<"External">>};
 execute(#{<<"phonetic">> := _}) -> {ok, <<"Phonetic">>};
 execute(#{<<"binValue">> := _}) -> {ok, <<"BinValue">>};
 execute(#{<<"vcard">> := _}) -> {ok, <<"AgentVcard">>};
-execute(_Otherwise) -> {error, unknown_type}.
+execute(#{<<"type">> := <<"histogram">>, <<"name">> := _, <<"p50">> := _}) ->
+    {ok, <<"HistogramMetric">>};
+execute(#{<<"type">> := <<"spiral">>, <<"name">> := _, <<"one">> := _}) ->
+    {ok, <<"SpiralMetric">>};
+execute(#{<<"type">> := <<"counter">>, <<"name">> := _, <<"ms_since_reset">> := _}) ->
+    {ok, <<"CounterMetric">>};
+execute(#{<<"type">> := <<"gauge">>, <<"name">> := _, <<"value">> := _}) ->
+    {ok, <<"GaugeMetric">>};
+execute(#{<<"type">> := <<"merged_inet_stats">>, <<"connections">> := _}) ->
+    {ok, <<"MergedInetStatsMetric">>};
+execute(#{<<"type">> := <<"vm_stats_memory">>, <<"processes_used">> := _}) ->
+    {ok, <<"VMStatsMemoryMetric">>};
+execute(#{<<"type">> := <<"vm_system_info">>, <<"port_count">> := _}) ->
+    {ok, <<"VMSystemInfoMetric">>};
+execute(#{<<"type">> := <<"probe_queues">>, <<"fsm">> := _}) ->
+    {ok, <<"ProbeQueuesMetric">>};
+execute(#{<<"type">> := <<"rdbms_stats">>, <<"workers">> := _}) ->
+    {ok, <<"RDBMSStatsMetric">>}.
diff --git a/src/metrics/mongoose_metrics_api.erl b/src/metrics/mongoose_metrics_api.erl
new file mode 100644
index 0000000000..53fa08941a
--- /dev/null
+++ b/src/metrics/mongoose_metrics_api.erl
@@ -0,0 +1,156 @@
+-module(mongoose_metrics_api).
+-export([get_metrics/1,
+         get_metrics_as_dicts/2,
+         get_cluster_metrics_as_dicts/3]).
+
+-include("mongoose_logger.hrl").
+
+-type name() :: [atom() | integer()].
+-type key() :: atom().
+-type metric_result() ::
+    {ok, #{binary() => binary() | non_neg_integer()}}.
+-type dict_result() :: #{binary() => binary() | non_neg_integer()}.
+-type metric_dict_result() ::
+    {ok, #{binary() => binary() | [dict_result()]}}.
+-type metric_node_dict_result() ::
+    {ok, #{binary() => binary() | [metric_dict_result()]}}
+    | {error, binary()}.
+
+-spec get_metrics(Name :: name()) -> {ok, [metric_result()]}.
+get_metrics(Name) ->
+    Values = exometer:get_values(Name),
+    {ok, lists:map(fun make_metric_result/1, Values)}.
+
+-spec get_metrics_as_dicts(Name :: name(), Keys :: [key()]) ->
+    {ok, [metric_dict_result()]}.
+get_metrics_as_dicts(Name, Keys) ->
+    Values = exometer:get_values(Name),
+    {ok, [make_metric_dict_result(V, Keys) || V <- Values]}.
+
+-spec get_cluster_metrics_as_dicts(Name :: name(), Keys :: [key()],
+                                   Nodes :: [node()]) ->
+    {ok, [metric_node_dict_result()]}.
+get_cluster_metrics_as_dicts(Name, Keys, Nodes) ->
+    Nodes2 = existing_nodes(Nodes),
+    F = fun(Node) -> rpc:call(Node, exometer, get_values, [Name]) end,
+    Results = mongoose_lib:pmap(F, Nodes2),
+    {ok, [make_node_result(Node, Result, Keys)
+          || {Node, Result} <- lists:zip(Nodes2, Results)]}.
+
+make_node_result(Node, {ok, Values}, Keys) ->
+    {ok, #{<<"node">> => Node,
+           <<"result">> => [make_metric_dict_result(V, Keys) || V <- Values]}};
+make_node_result(Node, Other, _Keys) ->
+    ?LOG_ERROR(#{what => metric_get_failed,
+                 remote_node => Node, reason => Other}),
+    {error, <<"Failed to get metrics">>}.
+
+filter_keys(Dict, []) ->
+    Dict;
+filter_keys(Dict, Keys) ->
+    [KV || KV = {Key, _} <- Dict, lists:member(Key, Keys)].
+
+existing_nodes(Nodes) ->
+    AllNodes = [node()|nodes()],
+    filter_nodes(AllNodes, Nodes).
+
+filter_nodes(AllNodes, []) ->
+    AllNodes;
+filter_nodes(AllNodes, AllowedNodes) ->
+    [Node || Node <- AllNodes, lists:member(Node, AllowedNodes)].
+
+make_metric_result({Name, Dict}) ->
+    PreparedName = format_name(Name),
+    Map = format_dict(Dict),
+    {ok, Map#{<<"name">> => PreparedName}}.
+
+make_metric_dict_result({Name, Dict}, Keys) ->
+    PreparedName = format_name(Name),
+    {ok, #{<<"name">> => PreparedName, <<"dict">> => format_dict_entries(Dict, Keys)}}.
+
+format_dict_entries(Dict, Keys) ->
+    [{ok, #{<<"key">> => Key, <<"value">> => Value}}
+     || {Key, Value} <- filter_keys(Dict, Keys)].
+
+format_name(Name) ->
+    lists:map(fun format_name_segment/1, Name).
+
+format_name_segment(Segment) when is_atom(Segment) ->
+    {ok, atom_to_binary(Segment)};
+format_name_segment(Segment) when is_binary(Segment) ->
+    {ok, Segment}.
+
+format_dict(Dict) ->
+    format_dict2(maps:from_list(Dict)).
+
+format_dict2(#{one := _} = Dict) ->
+    format_spiral(Dict);
+format_dict2(#{ms_since_reset := _} = Dict) ->
+    format_counter(Dict);
+format_dict2(#{value := _} = Dict) ->
+    format_gauge(Dict);
+format_dict2(#{median := _} = Dict) ->
+    format_histogram(Dict);
+format_dict2(#{connections := _, recv_cnt := _} = Dict) ->
+    format_merged_inet_stats(Dict);
+format_dict2(#{processes_used := _} = Dict) ->
+    format_vm_stats_memory(Dict);
+format_dict2(#{port_count := _} = Dict) ->
+    format_vm_system_info(Dict);
+format_dict2(#{fsm := _, regular := _} = Dict) ->
+    format_probe_queues(Dict);
+format_dict2(#{recv_cnt := _, workers := _} = Dict) ->
+    format_rdbms_stats(Dict).
+
+format_spiral(#{one := One, count := Count}) ->
+    #{<<"type">> => <<"spiral">>, <<"one">> => One, <<"count">> => Count}.
+
+format_counter(#{value := Value, ms_since_reset := MS}) ->
+    #{<<"type">> => <<"counter">>, <<"value">> => Value, <<"ms_since_reset">> => MS}.
+
+format_gauge(#{value := Value}) ->
+    #{<<"type">> => <<"gauge">>, <<"value">> => Value}.
+
+format_histogram(#{n := N, mean := Mean, min := Min, max := Max, median := Median,
+                   50 := P50, 75 := P75, 90 := P90, 95 := P95,
+                   99 := P99, 999 := P999}) ->
+    #{<<"type">> => <<"histogram">>, <<"n">> => N, <<"mean">> => Mean,
+      <<"min">> => Min, <<"max">> => Max, <<"median">> => Median,
+      <<"p50">> => P50, <<"p75">> => P75, <<"p90">> => P90, <<"p95">> => P95,
+      <<"p99">> => P99, <<"p999">> => P999}.
+
+format_merged_inet_stats(#{connections := Cons,
+                           recv_cnt := RCnt, recv_max := RMax, recv_oct := ROct,
+                           send_cnt := SCnt, send_max := SMax, send_oct := SOct,
+                           send_pend := SPend}) ->
+    %% Metrics from a pool of connections
+    #{<<"type">> => <<"merged_inet_stats">>, <<"connections">> => Cons,
+      <<"recv_cnt">> => RCnt, <<"recv_max">> => RMax, <<"recv_oct">> => ROct,
+      <<"send_cnt">> => SCnt, <<"send_max">> => SMax, <<"send_oct">> => SOct,
+      <<"send_pend">> => SPend}.
+
+format_rdbms_stats(#{recv_cnt := RCnt, recv_max := RMax, recv_oct := ROct,
+                     send_cnt := SCnt, send_max := SMax, send_oct := SOct,
+                     send_pend := SPend, workers := Workers}) ->
+    #{<<"type">> => <<"rdbms_stats">>, <<"workers">> => Workers,
+      <<"recv_cnt">> => RCnt, <<"recv_max">> => RMax, <<"recv_oct">> => ROct,
+      <<"send_cnt">> => SCnt, <<"send_max">> => SMax, <<"send_oct">> => SOct,
+      <<"send_pend">> => SPend}.
+
+format_vm_stats_memory(#{total := Total, processes_used := P,
+                         atom_used := A, binary := B, ets := E, system := S}) ->
+    #{<<"type">> => <<"vm_stats_memory">>,
+      <<"total">> => Total, <<"processes_used">> => P, <<"atom_used">> => A,
+      <<"binary">> => B, <<"ets">> => E, <<"system">> => S}.
+
+format_vm_system_info(#{port_count := PortCount, port_limit := PortLimit,
+                        process_count := ProcessCount, process_limit := ProcessLimit,
+                        ets_limit := EtsLimit}) ->
+    #{<<"type">> => <<"vm_system_info">>,
+      <<"port_count">> => PortCount, <<"port_limit">> => PortLimit,
+      <<"process_count">> => ProcessCount, <<"process_limit">> => ProcessLimit,
+      <<"ets_limit">> => EtsLimit}.
+
+format_probe_queues(#{fsm := FSM, regular := Regular, total := Total}) ->
+    #{<<"type">> => <<"probe_queues">>,
+      <<"fsm">> => FSM, <<"regular">> => Regular, <<"total">> => Total}.
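
For reference, the new MetricAdminQuery field can be exercised with a query like the one below. This is an illustrative sketch and not part of the patch: the name path ["_", "xmppStanzaSent"] and the keys filter mirror the arguments used by get_by_name_cluster_metrics_as_dicts and get_metrics_as_dicts_with_key_one in graphql_metric_SUITE.

    query Q1 {
      metric {
        # Collect the xmppStanzaSent spiral from every node in the cluster,
        # keeping only the "one" and "count" entries of each dict.
        getClusterMetricsAsDicts(name: ["_", "xmppStanzaSent"], keys: ["one", "count"]) {
          node
          result {
            name
            dict {
              key
              value
            }
          }
        }
      }
    }

The leading "_" segment matches any first part of the metric path (for example the host type), which is the same pattern the test suite relies on when filtering metrics by name.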