Skip to content

Commit d0c3b3a

Browse files
authored
Merge pull request #14255 from rabbitmq/stream-filtering-yield
Support concurrent links with stream filtering
2 parents 0183885 + 04009b8 commit d0c3b3a

File tree

5 files changed

+278
-84
lines changed

5 files changed

+278
-84
lines changed

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,15 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ContainerId,
409409
outgoing_window = ?UINT(RemoteOutgoingWindow),
410410
handle_max = ClientHandleMax}}) ->
411411
process_flag(trap_exit, true),
412+
case application:get_env(rabbit, session_min_heap_size) of
413+
{ok, MinHeapSize} ->
414+
%% Increasing min_heap_size to e.g. 987 words can greatly speed up
415+
%% stream filtering due to less minor garbage collections.
416+
process_flag(min_heap_size, MinHeapSize),
417+
ok;
418+
undefined ->
419+
ok
420+
end,
412421
rabbit_process_flag:adjust_for_message_handling_proc(),
413422
logger:update_process_metadata(#{channel_number => ChannelNum,
414423
amqp_container => ContainerId,

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 109 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -77,13 +77,17 @@
7777
policy, operator_policy, effective_policy_definition, type, memory,
7878
consumers, segments]).
7979

80+
-define(UNMATCHED_THRESHOLD, 200).
81+
8082
-type appender_seq() :: non_neg_integer().
8183

8284
-type msg() :: term(). %% TODO: refine
8385

8486
-record(stream, {mode :: rabbit_queue_type:consume_mode(),
8587
delivery_count :: none | rabbit_queue_type:delivery_count(),
8688
credit :: rabbit_queue_type:credit(),
89+
drain = false :: boolean(),
90+
credit_reply_outstanding = false :: boolean(),
8791
ack :: boolean(),
8892
start_offset = 0 :: non_neg_integer(),
8993
listening_offset = 0 :: non_neg_integer(),
@@ -95,6 +99,9 @@
9599
%% reversed order until the consumer has more credits to consume them.
96100
buffer_msgs_rev = [] :: [rabbit_amqqueue:qmsg()],
97101
filter :: rabbit_amqp_filter:expression(),
102+
%% Number of consecutive messages for which the filter evaluated to false
103+
unmatched = 0 :: non_neg_integer(),
104+
filtering_paused = false :: boolean(),
98105
reader_options :: map()}).
99106

100107
-record(stream_client, {stream_id :: string(),
@@ -513,39 +520,22 @@ credit_v1(_, _, _, _, _) ->
513520
credit(QName, CTag, DeliveryCountRcv, LinkCreditRcv, Drain,
514521
#stream_client{readers = Readers,
515522
name = Name,
516-
local_pid = LocalPid} = State0) ->
523+
local_pid = LocalPid} = State) ->
517524
case Readers of
518525
#{CTag := Str0 = #stream{delivery_count = DeliveryCountSnd}} ->
519526
LinkCreditSnd = amqp10_util:link_credit_snd(
520527
DeliveryCountRcv, LinkCreditRcv, DeliveryCountSnd),
521-
Str1 = Str0#stream{credit = LinkCreditSnd},
522-
{Str2 = #stream{delivery_count = DeliveryCount,
523-
credit = Credit,
524-
ack = Ack}, Msgs} = stream_entries(QName, Name, LocalPid, Str1),
525-
Str = case Drain andalso Credit > 0 of
526-
true ->
527-
Str2#stream{delivery_count = serial_number:add(DeliveryCount, Credit),
528-
credit = 0};
529-
false ->
530-
Str2
531-
end,
532-
State = State0#stream_client{readers = maps:update(CTag, Str, Readers)},
533-
Actions = deliver_actions(CTag, Ack, Msgs) ++ [{credit_reply,
534-
CTag,
535-
Str#stream.delivery_count,
536-
Str#stream.credit,
537-
available_messages(Str),
538-
Drain}],
539-
{State, Actions};
528+
Str1 = Str0#stream{credit = LinkCreditSnd,
529+
drain = Drain,
530+
credit_reply_outstanding = true},
531+
{Str2, Msgs} = stream_entries(QName, Name, CTag, LocalPid, Str1),
532+
{Str, Actions} = actions(CTag, Msgs, Str2),
533+
{State#stream_client{readers = maps:update(CTag, Str, Readers)},
534+
Actions};
540535
_ ->
541-
{State0, []}
536+
{State, []}
542537
end.
543538

544-
%% Returns only an approximation.
545-
available_messages(#stream{log = Log,
546-
last_consumed_offset = LastConsumedOffset}) ->
547-
max(0, osiris_log:committed_offset(Log) - LastConsumedOffset).
548-
549539
deliver(QSs, Msg, Options) ->
550540
lists:foldl(
551541
fun({Q, stateless}, {Qs, Actions}) ->
@@ -624,17 +614,34 @@ handle_event(_QName, {osiris_written, From, _WriterId, Corrs},
624614
slow = Slow},
625615
{ok, State, Actions};
626616
handle_event(QName, {osiris_offset, _From, _Offs},
627-
State = #stream_client{local_pid = LocalPid,
628-
readers = Readers0,
629-
name = Name}) ->
617+
State0 = #stream_client{local_pid = LocalPid,
618+
readers = Readers0,
619+
name = Name}) ->
630620
%% offset isn't actually needed as we use the atomic to read the
631621
%% current committed
632622
{Readers, Actions} = maps:fold(
633623
fun (Tag, Str0, {Rds, As}) ->
634-
{Str, Msgs} = stream_entries(QName, Name, LocalPid, Str0),
635-
{Rds#{Tag => Str}, deliver_actions(Tag, Str#stream.ack, Msgs) ++ As}
636-
end, {#{}, []}, Readers0),
637-
{ok, State#stream_client{readers = Readers}, Actions};
624+
{Str1, Msgs} = stream_entries(QName, Name, Tag, LocalPid, Str0),
625+
{Str, As1} = actions(Tag, Msgs, Str1),
626+
{[{Tag, Str} | Rds], As1 ++ As}
627+
end, {[], []}, Readers0),
628+
State = State0#stream_client{readers = maps:from_list(Readers)},
629+
{ok, State, Actions};
630+
handle_event(QName, {resume_filtering, CTag},
631+
#stream_client{name = Name,
632+
local_pid = LocalPid,
633+
readers = Readers0} = State) ->
634+
case Readers0 of
635+
#{CTag := Str0} ->
636+
Str1 = Str0#stream{unmatched = 0,
637+
filtering_paused = false},
638+
{Str2, Msgs} = stream_entries(QName, Name, CTag, LocalPid, Str1),
639+
{Str, Actions} = actions(CTag, Msgs, Str2),
640+
Readers = maps:update(CTag, Str, Readers0),
641+
{ok, State#stream_client{readers = Readers}, Actions};
642+
_ ->
643+
{ok, State, []}
644+
end;
638645
handle_event(_QName, {stream_leader_change, Pid}, State) ->
639646
{ok, update_leader_pid(Pid, State), []};
640647
handle_event(_QName, {stream_local_member_change, Pid},
@@ -690,7 +697,7 @@ settle(QName, _, CTag, MsgIds, #stream_client{readers = Readers0,
690697
%% all settle reasons will "give credit" to the stream queue
691698
Credit = length(MsgIds),
692699
Str1 = Str0#stream{credit = Credit0 + Credit},
693-
{Str, Msgs} = stream_entries(QName, Name, LocalPid, Str1),
700+
{Str, Msgs} = stream_entries(QName, Name, CTag, LocalPid, Str1),
694701
Readers = maps:update(CTag, Str, Readers0),
695702
{State#stream_client{readers = Readers},
696703
deliver_actions(CTag, Ack, Msgs)};
@@ -1132,7 +1139,10 @@ add_if_defined(Key, Value, Map) ->
11321139
maps:put(Key, Value, Map).
11331140

11341141
format_osiris_event(Evt, QRef) ->
1135-
{'$gen_cast', {queue_event, QRef, Evt}}.
1142+
{'$gen_cast', queue_event(QRef, Evt)}.
1143+
1144+
queue_event(QRef, Evt) ->
1145+
{queue_event, QRef, Evt}.
11361146

11371147
max_age(undefined) ->
11381148
undefined;
@@ -1159,21 +1169,21 @@ recover(Q) ->
11591169
maybe_send_reply(_ChPid, undefined) -> ok;
11601170
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
11611171

1162-
stream_entries(QName, Name, LocalPid,
1172+
stream_entries(QName, Name, CTag, LocalPid,
11631173
#stream{chunk_iterator = undefined,
11641174
credit = Credit} = Str0) ->
11651175
case Credit > 0 of
11661176
true ->
11671177
case chunk_iterator(Str0, LocalPid) of
11681178
{ok, Str} ->
1169-
stream_entries(QName, Name, LocalPid, Str);
1179+
stream_entries(QName, Name, CTag, LocalPid, Str);
11701180
{end_of_stream, Str} ->
11711181
{Str, []}
11721182
end;
11731183
false ->
11741184
{Str0, []}
11751185
end;
1176-
stream_entries(QName, Name, LocalPid,
1186+
stream_entries(QName, Name, CTag, LocalPid,
11771187
#stream{delivery_count = DC,
11781188
credit = Credit,
11791189
buffer_msgs_rev = Buf0,
@@ -1194,40 +1204,49 @@ stream_entries(QName, Name, LocalPid,
11941204
credit = Credit - BufLen,
11951205
buffer_msgs_rev = [],
11961206
last_consumed_offset = LastOff + BufLen},
1197-
stream_entries(QName, Name, LocalPid, Str, Buf0)
1207+
stream_entries(QName, Name, CTag, LocalPid, Str, Buf0)
11981208
end;
1199-
stream_entries(QName, Name, LocalPid, Str) ->
1200-
stream_entries(QName, Name, LocalPid, Str, []).
1209+
stream_entries(QName, Name, CTag, LocalPid, Str) ->
1210+
stream_entries(QName, Name, CTag, LocalPid, Str, []).
12011211

1202-
stream_entries(_, _, _, #stream{credit = Credit} = Str, Acc)
1212+
stream_entries(_, _, _, _, #stream{credit = Credit} = Str, Acc)
12031213
when Credit < 1 ->
12041214
{Str, lists:reverse(Acc)};
1205-
stream_entries(QName, Name, LocalPid,
1215+
stream_entries(QName, Name, CTag, LocalPid,
12061216
#stream{chunk_iterator = Iter0,
12071217
delivery_count = DC,
12081218
credit = Credit,
12091219
start_offset = StartOffset,
1210-
filter = Filter} = Str0, Acc0) ->
1220+
filter = Filter,
1221+
unmatched = Unmatched} = Str0, Acc0) ->
12111222
case osiris_log:iterator_next(Iter0) of
1223+
end_of_chunk when Unmatched > ?UNMATCHED_THRESHOLD ->
1224+
%% Pause filtering temporarily for two reasons:
1225+
%% 1. Process Erlang messages in our mailbox to avoid blocking other links
1226+
%% 2. Send matched messages to the receiver as soon as possible
1227+
gen_server:cast(self(), queue_event(QName, {resume_filtering, CTag})),
1228+
{Str0#stream{filtering_paused = true}, lists:reverse(Acc0)};
12121229
end_of_chunk ->
12131230
case chunk_iterator(Str0, LocalPid) of
12141231
{ok, Str} ->
1215-
stream_entries(QName, Name, LocalPid, Str, Acc0);
1232+
stream_entries(QName, Name, CTag, LocalPid, Str, Acc0);
12161233
{end_of_stream, Str} ->
12171234
{Str, lists:reverse(Acc0)}
12181235
end;
12191236
{{Offset, Entry}, Iter} ->
12201237
{Str, Acc} = case Entry of
12211238
{batch, _NumRecords, 0, _Len, BatchedEntries} ->
1222-
{MsgsRev, NumMsgs} = parse_uncompressed_subbatch(
1223-
BatchedEntries, Offset, StartOffset,
1224-
QName, Name, LocalPid, Filter, {[], 0}),
1239+
{MsgsRev, NumMsgs, U} = parse_uncompressed_subbatch(
1240+
BatchedEntries, Offset, StartOffset,
1241+
QName, Name, LocalPid, Filter,
1242+
{[], 0, Unmatched}),
12251243
case Credit >= NumMsgs of
12261244
true ->
12271245
{Str0#stream{chunk_iterator = Iter,
12281246
delivery_count = delivery_count_add(DC, NumMsgs),
12291247
credit = Credit - NumMsgs,
1230-
last_consumed_offset = Offset + NumMsgs - 1},
1248+
last_consumed_offset = Offset + NumMsgs - 1,
1249+
unmatched = U},
12311250
MsgsRev ++ Acc0};
12321251
false ->
12331252
%% Consumer doesn't have sufficient credit.
@@ -1238,7 +1257,8 @@ stream_entries(QName, Name, LocalPid,
12381257
delivery_count = delivery_count_add(DC, Credit),
12391258
credit = 0,
12401259
buffer_msgs_rev = Buf,
1241-
last_consumed_offset = Offset + Credit - 1},
1260+
last_consumed_offset = Offset + Credit - 1,
1261+
unmatched = U},
12421262
MsgsRev1 ++ Acc0}
12431263
end;
12441264
{batch, _, _CompressionType, _, _} ->
@@ -1252,20 +1272,22 @@ stream_entries(QName, Name, LocalPid,
12521272
Name, LocalPid, Filter) of
12531273
none ->
12541274
{Str0#stream{chunk_iterator = Iter,
1255-
last_consumed_offset = Offset},
1275+
last_consumed_offset = Offset,
1276+
unmatched = Unmatched + 1},
12561277
Acc0};
12571278
Msg ->
12581279
{Str0#stream{chunk_iterator = Iter,
12591280
delivery_count = delivery_count_add(DC, 1),
12601281
credit = Credit - 1,
1261-
last_consumed_offset = Offset},
1282+
last_consumed_offset = Offset,
1283+
unmatched = 0},
12621284
[Msg | Acc0]}
12631285
end;
12641286
false ->
12651287
{Str0#stream{chunk_iterator = Iter}, Acc0}
12661288
end
12671289
end,
1268-
stream_entries(QName, Name, LocalPid, Str, Acc)
1290+
stream_entries(QName, Name, CTag, LocalPid, Str, Acc)
12691291
end.
12701292

12711293
chunk_iterator(#stream{credit = Credit,
@@ -1300,14 +1322,14 @@ parse_uncompressed_subbatch(
13001322
Len:31/unsigned,
13011323
Entry:Len/binary,
13021324
Rem/binary>>,
1303-
Offset, StartOffset, QName, Name, LocalPid, Filter, Acc0 = {AccList, AccCount}) ->
1325+
Offset, StartOffset, QName, Name, LocalPid, Filter, Acc0 = {AccList, AccCount, Unmatched}) ->
13041326
Acc = case Offset >= StartOffset of
13051327
true ->
13061328
case entry_to_msg(Entry, Offset, QName, Name, LocalPid, Filter) of
13071329
none ->
1308-
Acc0;
1330+
setelement(3, Acc0, Unmatched + 1);
13091331
Msg ->
1310-
{[Msg | AccList], AccCount + 1}
1332+
{[Msg | AccList], AccCount + 1, 0}
13111333
end;
13121334
false ->
13131335
Acc0
@@ -1418,6 +1440,37 @@ is_minority(All, Up) ->
14181440
MinQuorum = length(All) div 2 + 1,
14191441
length(Up) < MinQuorum.
14201442

1443+
actions(CTag, Msgs, #stream{ack = Ack} = Str0) ->
1444+
Str1 = maybe_drain(Str0),
1445+
{Str, Actions} = credit_reply(CTag, Str1),
1446+
{Str, deliver_actions(CTag, Ack, Msgs) ++ Actions}.
1447+
1448+
maybe_drain(#stream{delivery_count = DeliveryCount,
1449+
credit = Credit,
1450+
drain = true,
1451+
filtering_paused = false} = Str)
1452+
when Credit > 0 ->
1453+
Str#stream{delivery_count = serial_number:add(DeliveryCount, Credit),
1454+
credit = 0};
1455+
maybe_drain(Str) ->
1456+
Str.
1457+
1458+
credit_reply(CTag, #stream{delivery_count = DeliveryCount,
1459+
credit = Credit,
1460+
drain = Drain,
1461+
credit_reply_outstanding = true,
1462+
filtering_paused = false} = Str) ->
1463+
{Str#stream{credit_reply_outstanding = false},
1464+
[{credit_reply, CTag, DeliveryCount, Credit,
1465+
available_messages(Str), Drain}]};
1466+
credit_reply(_, Str) ->
1467+
{Str, []}.
1468+
1469+
%% Returns only an approximation.
1470+
available_messages(#stream{log = Log,
1471+
last_consumed_offset = LastConsumedOffset}) ->
1472+
max(0, osiris_log:committed_offset(Log) - LastConsumedOffset).
1473+
14211474
deliver_actions(_, _, []) ->
14221475
[];
14231476
deliver_actions(CTag, Ack, Msgs) ->

0 commit comments

Comments
 (0)