Skip to content

Commit 0b9973f

Browse files
committed
Return stream frame header binary in dispatch chunk callback
This saves a system call by sending the frame header and the chunk header at the same time. References rabbitmq/osiris#192
1 parent a5106c6 commit 0b9973f

File tree

2 files changed

+10
-18
lines changed

2 files changed

+10
-18
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3568,12 +3568,9 @@ subscription_exists(StreamSubscriptions, SubscriptionId) ->
35683568
lists:any(fun(Id) -> Id =:= SubscriptionId end, SubscriptionIds).
35693569

35703570
send_file_callback(?VERSION_1,
3571-
Transport,
35723571
_Log,
35733572
#consumer{configuration =
3574-
#consumer_configuration{socket = S,
3575-
subscription_id =
3576-
SubscriptionId,
3573+
#consumer_configuration{subscription_id = SubId,
35773574
counters = Counters}},
35783575
Counter) ->
35793576
fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries},
@@ -3584,19 +3581,16 @@ send_file_callback(?VERSION_1,
35843581
?REQUEST:1,
35853582
?COMMAND_DELIVER:15,
35863583
?VERSION_1:16,
3587-
SubscriptionId:8/unsigned>>,
3588-
Transport:send(S, FrameBeginning),
3584+
SubId:8/unsigned>>,
35893585
atomics:add(Counter, 1, Size),
35903586
increase_messages_consumed(Counters, NumEntries),
3591-
set_consumer_offset(Counters, FirstOffsetInChunk)
3587+
set_consumer_offset(Counters, FirstOffsetInChunk),
3588+
FrameBeginning
35923589
end;
35933590
send_file_callback(?VERSION_2,
3594-
Transport,
35953591
Log,
35963592
#consumer{configuration =
3597-
#consumer_configuration{socket = S,
3598-
subscription_id =
3599-
SubscriptionId,
3593+
#consumer_configuration{subscription_id = SubId,
36003594
counters = Counters}},
36013595
Counter) ->
36023596
fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries},
@@ -3608,12 +3602,12 @@ send_file_callback(?VERSION_2,
36083602
?REQUEST:1,
36093603
?COMMAND_DELIVER:15,
36103604
?VERSION_2:16,
3611-
SubscriptionId:8/unsigned,
3605+
SubId:8/unsigned,
36123606
CommittedChunkId:64>>,
3613-
Transport:send(S, FrameBeginning),
36143607
atomics:add(Counter, 1, Size),
36153608
increase_messages_consumed(Counters, NumEntries),
3616-
set_consumer_offset(Counters, FirstOffsetInChunk)
3609+
set_consumer_offset(Counters, FirstOffsetInChunk),
3610+
FrameBeginning
36173611
end.
36183612

36193613
send_chunks(DeliverVersion,
@@ -3683,9 +3677,7 @@ send_chunks(DeliverVersion,
36833677
Retry,
36843678
Counter) ->
36853679
case osiris_log:send_file(Socket, Log,
3686-
send_file_callback(DeliverVersion,
3687-
Transport,
3688-
Log,
3680+
send_file_callback(DeliverVersion, Log,
36893681
Consumer,
36903682
Counter))
36913683
of

rabbitmq-components.mk

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ dep_jose = hex 1.11.10
4949
dep_khepri = hex 0.17.1
5050
dep_khepri_mnesia_migration = hex 0.8.0
5151
dep_meck = hex 1.0.0
52-
dep_osiris = git https://github.com/rabbitmq/osiris v1.9.0
52+
dep_osiris = git https://github.com/rabbitmq/osiris send-file-improvements
5353
dep_prometheus = hex 5.1.1
5454
dep_ra = hex 2.17.0
5555
dep_ranch = hex 2.2.0

0 commit comments

Comments
 (0)