Skip to content

Commit de29b53

Browse files
committed
Return stream frame header binary in dispatch chunk callback
This saves a system call by sending the frame header and the chunk header at the same time. References rabbitmq/osiris#192
1 parent ee652cb commit de29b53

File tree

2 files changed

+10
-18
lines changed

2 files changed

+10
-18
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3565,12 +3565,9 @@ subscription_exists(StreamSubscriptions, SubscriptionId) ->
35653565
lists:any(fun(Id) -> Id =:= SubscriptionId end, SubscriptionIds).
35663566

35673567
send_file_callback(?VERSION_1,
3568-
Transport,
35693568
_Log,
35703569
#consumer{configuration =
3571-
#consumer_configuration{socket = S,
3572-
subscription_id =
3573-
SubscriptionId,
3570+
#consumer_configuration{subscription_id = SubId,
35743571
counters = Counters}},
35753572
Counter) ->
35763573
fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries},
@@ -3581,19 +3578,16 @@ send_file_callback(?VERSION_1,
35813578
?REQUEST:1,
35823579
?COMMAND_DELIVER:15,
35833580
?VERSION_1:16,
3584-
SubscriptionId:8/unsigned>>,
3585-
Transport:send(S, FrameBeginning),
3581+
SubId:8/unsigned>>,
35863582
atomics:add(Counter, 1, Size),
35873583
increase_messages_consumed(Counters, NumEntries),
3588-
set_consumer_offset(Counters, FirstOffsetInChunk)
3584+
set_consumer_offset(Counters, FirstOffsetInChunk),
3585+
FrameBeginning
35893586
end;
35903587
send_file_callback(?VERSION_2,
3591-
Transport,
35923588
Log,
35933589
#consumer{configuration =
3594-
#consumer_configuration{socket = S,
3595-
subscription_id =
3596-
SubscriptionId,
3590+
#consumer_configuration{subscription_id = SubId,
35973591
counters = Counters}},
35983592
Counter) ->
35993593
fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries},
@@ -3605,12 +3599,12 @@ send_file_callback(?VERSION_2,
36053599
?REQUEST:1,
36063600
?COMMAND_DELIVER:15,
36073601
?VERSION_2:16,
3608-
SubscriptionId:8/unsigned,
3602+
SubId:8/unsigned,
36093603
CommittedChunkId:64>>,
3610-
Transport:send(S, FrameBeginning),
36113604
atomics:add(Counter, 1, Size),
36123605
increase_messages_consumed(Counters, NumEntries),
3613-
set_consumer_offset(Counters, FirstOffsetInChunk)
3606+
set_consumer_offset(Counters, FirstOffsetInChunk),
3607+
FrameBeginning
36143608
end.
36153609

36163610
send_chunks(DeliverVersion,
@@ -3680,9 +3674,7 @@ send_chunks(DeliverVersion,
36803674
Retry,
36813675
Counter) ->
36823676
case osiris_log:send_file(Socket, Log,
3683-
send_file_callback(DeliverVersion,
3684-
Transport,
3685-
Log,
3677+
send_file_callback(DeliverVersion, Log,
36863678
Consumer,
36873679
Counter))
36883680
of

rabbitmq-components.mk

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ dep_jose = hex 1.11.10
4949
dep_khepri = hex 0.17.1
5050
dep_khepri_mnesia_migration = hex 0.8.0
5151
dep_meck = hex 1.0.0
52-
dep_osiris = git https://github.com/rabbitmq/osiris v1.8.8
52+
dep_osiris = git https://github.com/rabbitmq/osiris send-file-improvements
5353
dep_prometheus = hex 4.11.0
5454
dep_ra = hex 2.16.11
5555
dep_ranch = hex 2.2.0

0 commit comments

Comments
 (0)