Skip to content

Commit e867ff8

Browse files
committed
🗑️ Remove storage param from SlotMessageStore
1 parent bfcc2eb commit e867ff8

11 files changed

+126
-431
lines changed

config/runtime.exs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,8 +232,7 @@ if config_env() == :prod and self_hosted do
232232
api_base_url: "http://#{server_host}:#{server_port}",
233233
release_version: System.get_env("RELEASE_VERSION"),
234234
backfill_max_pending_messages: backfill_max_pending_messages,
235-
max_memory_bytes: ConfigParser.max_memory_bytes(env_vars),
236-
default_max_storage_bytes: ConfigParser.default_max_storage_bytes(env_vars)
235+
max_memory_bytes: ConfigParser.max_memory_bytes(env_vars)
237236
end
238237

239238
if config_env() == :prod and not self_hosted do

lib/sequin/config_parser.ex

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ defmodule Sequin.ConfigParser do
8989

9090
defp validate_log_level(level, default_level) do
9191
Logger.warning("[ConfigParser] Invalid log level: #{inspect(level)}. Using default #{inspect(default_level)} level.")
92+
9293
default_level
9394
end
9495

@@ -181,18 +182,6 @@ defmodule Sequin.ConfigParser do
181182
end
182183
end
183184

184-
def default_max_storage_bytes(env) do
185-
case env["DEFAULT_MAX_STORAGE_MB"] do
186-
nil ->
187-
nil
188-
189-
mb_str ->
190-
mb_str
191-
|> String.to_integer()
192-
|> Sequin.Size.mb()
193-
end
194-
end
195-
196185
defp parse_buffer_percent(env) do
197186
env
198187
|> Map.get("MEMORY_BUFFER_PERCENT", "20")

lib/sequin/consumers/consumers.ex

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -284,17 +284,6 @@ defmodule Sequin.Consumers do
284284
round(Sequin.Size.mb(consumer.max_memory_mb) * 0.8)
285285
end
286286

287-
@doc """
288-
Calculates the maximum storage bytes allowed for a consumer.
289-
"""
290-
@spec max_storage_bytes_for_consumer(SinkConsumer.t()) ::
291-
non_neg_integer() | nil
292-
def max_storage_bytes_for_consumer(%SinkConsumer{max_storage_mb: nil}), do: nil
293-
294-
def max_storage_bytes_for_consumer(%SinkConsumer{max_storage_mb: max_storage_mb}) do
295-
round(Sequin.Size.mb(max_storage_mb) * 0.8)
296-
end
297-
298287
def earliest_sink_consumer_inserted_at_for_account(account_id) do
299288
account_id
300289
|> SinkConsumer.where_account_id()

lib/sequin/consumers/sink_consumer.ex

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ defmodule Sequin.Consumers.SinkConsumer do
4848
:status,
4949
:health,
5050
:max_memory_mb,
51-
:max_storage_mb,
5251
:legacy_transform,
5352
:timestamp_format,
5453
:batch_timeout_ms,
@@ -70,7 +69,6 @@ defmodule Sequin.Consumers.SinkConsumer do
7069
field :batch_timeout_ms, :integer, default: nil
7170
field :annotations, :map, default: %{}
7271
field :max_memory_mb, :integer, default: 128
73-
field :max_storage_mb, :integer, default: nil
7472
field :partition_count, :integer, default: 1
7573
field :legacy_transform, Ecto.Enum, values: [:none, :record_only], default: :none
7674
field :timestamp_format, Ecto.Enum, values: [:iso8601, :unix_microsecond], default: :iso8601
@@ -142,7 +140,6 @@ defmodule Sequin.Consumers.SinkConsumer do
142140
:sequence_id,
143141
:message_kind,
144142
:max_memory_mb,
145-
:max_storage_mb,
146143
:transform_id,
147144
:routing_id,
148145
:filter_id
@@ -186,7 +183,6 @@ defmodule Sequin.Consumers.SinkConsumer do
186183
:status,
187184
:annotations,
188185
:max_memory_mb,
189-
:max_storage_mb,
190186
:partition_count,
191187
:legacy_transform,
192188
:transform_id,
@@ -205,7 +201,6 @@ defmodule Sequin.Consumers.SinkConsumer do
205201
|> validate_number(:batch_size, less_than_or_equal_to: 1_000)
206202
|> validate_number(:batch_timeout_ms, greater_than: 0)
207203
|> validate_number(:max_memory_mb, greater_than_or_equal_to: 128)
208-
|> validate_number(:max_storage_mb, greater_than_or_equal_to: 256)
209204
|> validate_number(:partition_count, greater_than_or_equal_to: 1)
210205
|> validate_number(:max_retry_count, greater_than: 0)
211206
|> validate_inclusion(:legacy_transform, [:none, :record_only])
@@ -243,7 +238,6 @@ defmodule Sequin.Consumers.SinkConsumer do
243238
|> put_change(:max_waiting, get_field(changeset, :max_waiting) || 20)
244239
|> put_change(:max_ack_pending, get_field(changeset, :max_ack_pending) || 10_000)
245240
|> put_change(:max_memory_mb, get_field(changeset, :max_memory_mb) || 128)
246-
|> put_change(:max_storage_mb, get_field(changeset, :max_storage_mb) || default_max_storage_mb())
247241
|> put_change(:partition_count, get_field(changeset, :partition_count) || 1)
248242
|> put_change(:legacy_transform, get_field(changeset, :legacy_transform) || :none)
249243
|> put_change(:message_kind, get_field(changeset, :message_kind) || :event)
@@ -359,11 +353,4 @@ defmodule Sequin.Consumers.SinkConsumer do
359353
end
360354

361355
def preload_cached_http_endpoint(consumer), do: consumer
362-
363-
defp default_max_storage_mb do
364-
case Application.get_env(:sequin, :default_max_storage_bytes) do
365-
nil -> nil
366-
bytes -> round(bytes / 1024 / 1024)
367-
end
368-
end
369356
end

lib/sequin/runtime/consumer_lifecycle_event_worker.ex

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,11 @@ defmodule Sequin.Runtime.ConsumerLifecycleEventWorker do
7474
consumer = Repo.preload(consumer, :replication_slot)
7575
CheckSinkConfigurationWorker.enqueue(consumer.id)
7676

77-
# Restart the entire supervision tree for replication, including slot processor, smss, etc.
78-
# This is safest- later we can be a bit more intelligent about when to restart (ie. when name changes we don't have to restart)
79-
{:ok, _} = RuntimeSupervisor.restart_replication(consumer.replication_slot)
77+
unless consumer.status == :disabled do
78+
# Restart the entire supervision tree for replication, including slot processor, smss, etc.
79+
# This is safest- later we can be a bit more intelligent about when to restart (ie. when name changes we don't have to restart)
80+
{:ok, _} = RuntimeSupervisor.restart_replication(consumer.replication_slot)
81+
end
8082
end
8183

8284
"delete" ->

0 commit comments

Comments
 (0)