Skip to content

Commit dee1ab5

Browse files
committed
Add more validation around initing restart lsn
1 parent 96f01b1 commit dee1ab5

File tree

1 file changed

+5
-1
lines changed

1 file changed

+5
-1
lines changed

lib/sequin/runtime/slot_producer/slot_producer.ex

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,10 @@ defmodule Sequin.Runtime.SlotProducer do
547547
raise "New restart cursor is behind new restart cursor: #{inspect(restart_wal_cursor)} < #{inspect(state.restart_wal_cursor)}"
548548
end
549549

550+
if is_nil(restart_wal_cursor) or is_nil(restart_wal_cursor.commit_lsn) or is_nil(restart_wal_cursor.commit_idx) do
551+
raise "[SlotProducer] restart_wal_cursor is empty"
552+
end
553+
550554
Replication.put_restart_wal_cursor!(state.id, restart_wal_cursor)
551555

552556
%{state | restart_wal_cursor: restart_wal_cursor}
@@ -638,7 +642,7 @@ defmodule Sequin.Runtime.SlotProducer do
638642
case Replication.restart_wal_cursor(state.id) do
639643
{:error, %NotFoundError{}} ->
640644
case Protocol.handle_simple(query, [], protocol) do
641-
{:ok, [%Postgrex.Result{rows: [[lsn]]}], protocol} ->
645+
{:ok, [%Postgrex.Result{rows: [[lsn]]}], protocol} when not is_nil(lsn) ->
642646
cursor = %{commit_lsn: Postgres.lsn_to_int(lsn), commit_idx: 0}
643647
{:ok, %{state | restart_wal_cursor: cursor}, protocol}
644648

0 commit comments

Comments
 (0)