Skip to content

Commit a5106c6

Browse files
authored
Expose ra counters (#13895)
Switch from ra_metrics to ra_counters
* Expose many more metrics (they are also up to date)
* Bump Seshat, Ra, Osiris, Prometheus.erl
* Switch from proplists to maps
1 parent d0c3b3a commit a5106c6

25 files changed

+324
-159
lines changed

deps/rabbit/src/rabbit_global_counters.erl

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -132,46 +132,46 @@
132132
boot_step() ->
133133
[begin
134134
%% Protocol counters
135-
Protocol = {protocol, Proto},
136-
init([Protocol]),
135+
Protocol = #{protocol => Proto},
136+
init(Protocol),
137137
rabbit_msg_size_metrics:init(Proto),
138138

139139
%% Protocol & Queue Type counters
140-
init([Protocol, {queue_type, rabbit_classic_queue}]),
141-
init([Protocol, {queue_type, rabbit_quorum_queue}]),
142-
init([Protocol, {queue_type, rabbit_stream_queue}])
140+
init(Protocol#{queue_type => rabbit_classic_queue}),
141+
init(Protocol#{queue_type => rabbit_quorum_queue}),
142+
init(Protocol#{queue_type => rabbit_stream_queue})
143143
end || Proto <- [amqp091, amqp10]],
144144

145145
%% Dead Letter counters
146146
%%
147147
%% Streams never dead letter.
148148
%%
149149
%% Source classic queue dead letters.
150-
init([{queue_type, rabbit_classic_queue}, {dead_letter_strategy, disabled}],
150+
init(#{queue_type => rabbit_classic_queue, dead_letter_strategy => disabled},
151151
[?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER,
152152
?MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER,
153153
?MESSAGES_DEAD_LETTERED_REJECTED_COUNTER]),
154-
init([{queue_type, rabbit_classic_queue}, {dead_letter_strategy, at_most_once}],
154+
init(#{queue_type => rabbit_classic_queue, dead_letter_strategy => at_most_once},
155155
[?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER,
156156
?MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER,
157157
?MESSAGES_DEAD_LETTERED_REJECTED_COUNTER]),
158158
%%
159159
%% Source quorum queue dead letters.
160160
%% Only quorum queues can dead letter due to delivery-limit exceeded.
161161
%% Only quorum queues support dead letter strategy at-least-once.
162-
init([{queue_type, rabbit_quorum_queue}, {dead_letter_strategy, disabled}],
162+
init(#{queue_type => rabbit_quorum_queue, dead_letter_strategy => disabled},
163163
[?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER,
164164
?MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER,
165165
?MESSAGES_DEAD_LETTERED_REJECTED_COUNTER,
166166
?MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT_COUNTER
167167
]),
168-
init([{queue_type, rabbit_quorum_queue}, {dead_letter_strategy, at_most_once}],
168+
init(#{queue_type => rabbit_quorum_queue, dead_letter_strategy => at_most_once},
169169
[?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER,
170170
?MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER,
171171
?MESSAGES_DEAD_LETTERED_REJECTED_COUNTER,
172172
?MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT_COUNTER
173173
]),
174-
init([{queue_type, rabbit_quorum_queue}, {dead_letter_strategy, at_least_once}],
174+
init(#{queue_type => rabbit_quorum_queue, dead_letter_strategy => at_least_once},
175175
[?MESSAGES_DEAD_LETTERED_CONFIRMED_COUNTER,
176176
?MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER,
177177
?MESSAGES_DEAD_LETTERED_REJECTED_COUNTER,
@@ -181,21 +181,21 @@ boot_step() ->
181181
init(Labels) ->
182182
init(Labels, []).
183183

184-
init(Labels = [{protocol, Protocol}, {queue_type, QueueType}], Extra) ->
184+
init(Labels = #{protocol := Protocol, queue_type := QueueType}, Extra) ->
185185
_ = seshat:new_group(?MODULE),
186-
Counters = seshat:new(?MODULE, Labels, ?PROTOCOL_QUEUE_TYPE_COUNTERS ++ Extra),
186+
Counters = seshat:new(?MODULE, Labels, ?PROTOCOL_QUEUE_TYPE_COUNTERS ++ Extra, Labels),
187187
persistent_term:put({?MODULE, Protocol, QueueType}, Counters);
188-
init(Labels = [{protocol, Protocol}], Extra) ->
188+
init(Labels = #{protocol := Protocol}, Extra) ->
189189
_ = seshat:new_group(?MODULE),
190-
Counters = seshat:new(?MODULE, Labels, ?PROTOCOL_COUNTERS ++ Extra),
190+
Counters = seshat:new(?MODULE, Labels, ?PROTOCOL_COUNTERS ++ Extra, Labels),
191191
persistent_term:put({?MODULE, Protocol}, Counters);
192-
init(Labels = [{queue_type, QueueType}, {dead_letter_strategy, DLS}], DeadLetterCounters) ->
192+
init(Labels = #{queue_type := QueueType, dead_letter_strategy := DLS}, DeadLetterCounters) ->
193193
_ = seshat:new_group(?MODULE),
194-
Counters = seshat:new(?MODULE, Labels, DeadLetterCounters),
194+
Counters = seshat:new(?MODULE, Labels, DeadLetterCounters, Labels),
195195
persistent_term:put({?MODULE, QueueType, DLS}, Counters).
196196

197197
overview() ->
198-
seshat:overview(?MODULE).
198+
seshat:counters(?MODULE).
199199

200200
prometheus_format() ->
201201
seshat:format(?MODULE).

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,7 @@ setup(_Context) ->
276276
{default_ra_system, ?RA_SYSTEM}]}],
277277
[{persistent, true}]),
278278
RaServerConfig = #{cluster_name => ?RA_CLUSTER_NAME,
279+
metrics_labels => #{ra_system => ?RA_SYSTEM, module => ?MODULE},
279280
friendly_name => ?RA_FRIENDLY_NAME},
280281
case khepri:start(?RA_SYSTEM, RaServerConfig) of
281282
{ok, ?STORE_ID} ->

deps/rabbit/src/rabbit_observer_cli_quorum_queues.erl

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,11 @@ sheet_header() ->
123123
sheet_body(PrevState) ->
124124
{_, RaStates} = rabbit_quorum_queue:all_replica_states(),
125125
Body = [begin
126-
#resource{name = Name, virtual_host = Vhost} = R = amqqueue:get_name(Q),
126+
#resource{name = Name, virtual_host = Vhost} = amqqueue:get_name(Q),
127127
case rabbit_amqqueue:pid_of(Q) of
128128
none ->
129129
empty_row(Name);
130-
{QName, _QNode} = _QQ ->
130+
{QName, _QNode} = ServerId ->
131131
case whereis(QName) of
132132
undefined ->
133133
empty_row(Name);
@@ -139,7 +139,12 @@ sheet_body(PrevState) ->
139139
_ ->
140140
QQCounters = maps:get({QName, node()}, ra_counters:overview()),
141141
{ok, InternalName} = rabbit_queue_type_util:qname_to_internal_name(#resource{virtual_host = Vhost, name= Name}),
142-
[{_, CT, SnapIdx, LA, CI, LW, CL}] = ets:lookup(ra_metrics, R),
142+
#{snapshot_index := SnapIdx,
143+
last_written_index := LW,
144+
term := CT,
145+
commit_latency := CL,
146+
commit_index := CI,
147+
last_applied := LA} = ra:key_metrics(ServerId),
143148
[
144149
Pid,
145150
QName,

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1598,12 +1598,10 @@ transfer_leadership(Q, Destination) ->
15981598
end.
15991599

16001600
queue_length(Q) ->
1601-
Name = amqqueue:get_name(Q),
1602-
case ets:lookup(ra_metrics, Name) of
1603-
[] -> 0;
1604-
[{_, _, SnapIdx, _, _, LastIdx, _}] ->
1605-
LastIdx - SnapIdx
1606-
end.
1601+
ServerId = amqqueue:get_pid(Q),
1602+
#{snapshot_index := SnapIdx,
1603+
last_written_index := LastIdx} = key_metrics_rpc(ServerId),
1604+
LastIdx - SnapIdx.
16071605

16081606
get_replicas(Q) ->
16091607
get_nodes(Q).
@@ -1985,6 +1983,7 @@ make_ra_conf(Q, ServerId, TickTimeout,
19851983
SnapshotInterval, CheckpointInterval,
19861984
Membership, MacVersion) ->
19871985
QName = amqqueue:get_name(Q),
1986+
#resource{name = QNameBin} = QName,
19881987
RaMachine = ra_machine(Q),
19891988
[{ClusterName, _} | _] = Members = members(Q),
19901989
UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
@@ -2000,6 +1999,8 @@ make_ra_conf(Q, ServerId, TickTimeout,
20001999
uid => UId,
20012000
friendly_name => FName,
20022001
metrics_key => QName,
2002+
metrics_labels => #{vhost => amqqueue:get_vhost(Q),
2003+
queue => QNameBin},
20032004
initial_members => Members,
20042005
log_init_args => LogCfg,
20052006
tick_timeout => TickTimeout,

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1349,6 +1349,7 @@ make_ra_conf(Node, Nodes, MinMacVersion) ->
13491349
uid => UId,
13501350
friendly_name => atom_to_list(?MODULE),
13511351
metrics_key => ?MODULE,
1352+
metrics_labels => #{ra_system => ?RA_SYSTEM, module => ?MODULE},
13521353
initial_members => Members,
13531354
log_init_args => #{uid => UId},
13541355
tick_timeout => TickTimeout,

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6967,11 +6967,11 @@ formatted_state(Pid) ->
69676967
proplists:get_value("State", L2).
69686968

69696969
get_global_counters(Config) ->
6970-
get_global_counters0(Config, [{protocol, amqp10}]).
6970+
get_global_counters0(Config, #{protocol => amqp10}).
69716971

69726972
get_global_counters(Config, QType) ->
6973-
get_global_counters0(Config, [{protocol, amqp10},
6974-
{queue_type, QType}]).
6973+
get_global_counters0(Config, #{protocol => amqp10,
6974+
queue_type => QType}).
69756975

69766976
get_global_counters0(Config, Key) ->
69776977
Overview = rpc(Config, rabbit_global_counters, overview, []),

deps/rabbit/test/dead_lettering_SUITE.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1936,7 +1936,7 @@ counted(Metric, Config) ->
19361936
metric(QueueType, Strategy, Metric, OldCounters).
19371937

19381938
metric(QueueType, Strategy, Metric, Counters) ->
1939-
Metrics = maps:get([{queue_type, QueueType}, {dead_letter_strategy, Strategy}], Counters),
1939+
Metrics = maps:get(#{queue_type => QueueType, dead_letter_strategy => Strategy}, Counters),
19401940
maps:get(Metric, Metrics).
19411941

19421942
group_name(Config) ->

deps/rabbit/test/queue_type_SUITE.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ smoke(Config) ->
162162
ok = publish_and_confirm(Ch, <<"non-existent_queue">>, <<"msg4">>),
163163
ConsumerTag3 = <<"ctag3">>,
164164
ok = subscribe(Ch, QName, ConsumerTag3),
165-
ProtocolCounters = maps:get([{protocol, amqp091}], get_global_counters(Config)),
165+
ProtocolCounters = maps:get(#{protocol => amqp091}, get_global_counters(Config)),
166166
?assertEqual(#{
167167
messages_confirmed_total => 4,
168168
messages_received_confirm_total => 4,
@@ -177,7 +177,7 @@ smoke(Config) ->
177177
"rabbit_" ++
178178
binary_to_list(?config(queue_type, Config)) ++
179179
"_queue"),
180-
ProtocolQueueTypeCounters = maps:get([{protocol, amqp091}, {queue_type, QueueType}],
180+
ProtocolQueueTypeCounters = maps:get(#{protocol => amqp091, queue_type => QueueType},
181181
get_global_counters(Config)),
182182
?assertEqual(#{
183183
messages_acknowledged_total => 3,
@@ -196,7 +196,7 @@ smoke(Config) ->
196196
?assertMatch(
197197
#{consumers := 0,
198198
publishers := 0},
199-
maps:get([{protocol, amqp091}], get_global_counters(Config))),
199+
maps:get(#{protocol => amqp091}, get_global_counters(Config))),
200200

201201
ok.
202202

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3145,6 +3145,9 @@ reconnect_consumer_and_wait_channel_down(Config) ->
31453145
{#'basic.deliver'{redelivered = false}, _} ->
31463146
wait_for_messages_ready(Servers, RaName, 0),
31473147
wait_for_messages_pending_ack(Servers, RaName, 1)
3148+
after 30000 ->
3149+
flush(1),
3150+
exit(basic_deliver_timeout)
31483151
end,
31493152
Up = [Leader, F2],
31503153
rabbit_ct_broker_helpers:block_traffic_between(F1, Leader),

deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -991,5 +991,5 @@ counted(Metric, Config) ->
991991
metric(Metric, OldCounters).
992992

993993
metric(Metric, Counters) ->
994-
Metrics = maps:get([{queue_type, rabbit_quorum_queue}, {dead_letter_strategy, at_least_once}], Counters),
994+
Metrics = maps:get(#{queue_type => rabbit_quorum_queue, dead_letter_strategy => at_least_once}, Counters),
995995
maps:get(Metric, Metrics).

0 commit comments

Comments
 (0)