Skip to content

Commit 78afa29

Browse files
committed
checkpoint: simplify batchers, fix action type, handle errors better
1 parent 61e96aa commit 78afa29

File tree

5 files changed

+73
-38
lines changed

5 files changed

+73
-38
lines changed

assets/svelte/sinks/meilisearch/MeilisearchSinkForm.svelte

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,11 @@
7272
Return <code>action: :function</code> from your routing function
7373
</li>
7474
<li>
75-
Specify a filter expression and JavaScript-like function to update
76-
specific fields
75+
Specify a filter expression and a <a
76+
class="underline font-medium"
77+
href="https://rhai.rs/book/engine/hello-world.html"
78+
target="_blank">Rhai</a> (a JavaScript-like language) function to update
79+
specific fields.
7780
</li>
7881
<li>
7982
This is useful for updating nested arrays or specific document

docs/reference/sinks/meilisearch.mdx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ The `function` action allows you to update documents using Meilisearch's functio
9797

9898
When using `action: "function"`, you must also provide:
9999
- `filter` - A Meilisearch filter expression to select documents to update
100-
- `function` - A Rhai function expression to apply to the matched documents
100+
- `function` - A function expression using [Rhai (a JavaScript-like language)](https://docs.meilisearch.com/reference/api/documents#edit-documents-with-rhai) to apply to the matched documents
101101
- `context` (optional) - Additional context data for the function
102102

103103
Example routing function with function updates:

lib/sequin/runtime/meilisearch_pipeline.ex

Lines changed: 10 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,6 @@ defmodule Sequin.Runtime.MeilisearchPipeline do
2323
concurrency: concurrency,
2424
batch_size: consumer.sink.batch_size || 100,
2525
batch_timeout: 5
26-
],
27-
delete: [
28-
concurrency: concurrency,
29-
batch_size: consumer.sink.batch_size || 100,
30-
batch_timeout: 5
31-
],
32-
function: [
33-
concurrency: concurrency,
34-
batch_size: consumer.sink.batch_size || 100,
35-
batch_timeout: 5
3626
]
3727
]
3828
end
@@ -42,17 +32,17 @@ defmodule Sequin.Runtime.MeilisearchPipeline do
4232
routing_info = Routing.route_message(context.consumer, message.data)
4333
%Routing.Consumers.Meilisearch{action: action, index_name: index_name} = routing_info
4434

45-
batcher =
46-
case action do
47-
:index -> :default
48-
:delete -> :delete
49-
:function -> :function
35+
action =
36+
if is_atom(action) do
37+
# Backwards compatibility with existing routing functions with atom actions
38+
to_string(action)
39+
else
40+
action
5041
end
5142

5243
message =
5344
message
54-
|> Broadway.Message.put_batcher(batcher)
55-
|> Broadway.Message.put_batch_key(index_name)
45+
|> Broadway.Message.put_batch_key({action, index_name})
5646
|> Broadway.Message.update_data(fn data ->
5747
Map.put(data, :routing_info, routing_info)
5848
end)
@@ -61,14 +51,12 @@ defmodule Sequin.Runtime.MeilisearchPipeline do
6151
end
6252

6353
@impl SinkPipeline
64-
def handle_batch(:default, messages, batch_info, context) do
54+
def handle_batch(:default, messages, %{batch_key: {"index", index_name}}, context) do
6555
%{
6656
consumer: %SinkConsumer{sink: sink, transform: transform} = consumer,
6757
test_pid: test_pid
6858
} = context
6959

70-
index_name = batch_info.batch_key
71-
7260
setup_allowances(test_pid)
7361

7462
records =
@@ -97,14 +85,12 @@ defmodule Sequin.Runtime.MeilisearchPipeline do
9785
end
9886

9987
@impl SinkPipeline
100-
def handle_batch(:delete, messages, batch_info, context) do
88+
def handle_batch(:default, messages, %{batch_key: {"delete", index_name}}, context) do
10189
%{
10290
consumer: %SinkConsumer{sink: sink} = consumer,
10391
test_pid: test_pid
10492
} = context
10593

106-
index_name = batch_info.batch_key
107-
10894
setup_allowances(test_pid)
10995

11096
document_ids =
@@ -131,14 +117,12 @@ defmodule Sequin.Runtime.MeilisearchPipeline do
131117
end
132118

133119
@impl SinkPipeline
134-
def handle_batch(:function, messages, batch_info, context) do
120+
def handle_batch(:default, messages, %{batch_key: {"function", index_name}}, context) do
135121
%{
136122
consumer: %SinkConsumer{sink: sink} = consumer,
137123
test_pid: test_pid
138124
} = context
139125

140-
index_name = batch_info.batch_key
141-
142126
setup_allowances(test_pid)
143127

144128
# Process each message individually

lib/sequin/runtime/routing/consumers/meilisearch.ex

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,10 @@ defmodule Sequin.Runtime.Routing.Consumers.Meilisearch do
4040
def route(action, _record, _changes, _metadata) do
4141
meilisearch_action =
4242
case action do
43-
"insert" -> :index
44-
"update" -> :index
45-
"delete" -> :delete
46-
"read" -> :index
43+
"insert" -> "index"
44+
"update" -> "index"
45+
"delete" -> "delete"
46+
"read" -> "index"
4747
end
4848

4949
%{action: meilisearch_action}

lib/sequin/sinks/meilisearch/client.ex

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ defmodule Sequin.Sinks.Meilisearch.Client do
2020
req = base_request(sink)
2121

2222
case Req.get(req, url: "/tasks/#{task_id}") do
23-
{:ok, %{body: body}} ->
23+
{:ok, %{status: status, body: body}} when status in 200..299 ->
2424
case body do
2525
%{"status" => status} when status in ["enqueued", "processing"] ->
2626
timeout = Sequin.Time.exponential_backoff(200, retries, 10_000)
@@ -42,6 +42,16 @@ defmodule Sequin.Sinks.Meilisearch.Client do
4242
{:ok}
4343
end
4444

45+
{:ok, %{status: status, body: body}} ->
46+
message = extract_error_message(body) || "Request failed with status #{status}"
47+
48+
{:error,
49+
Error.service(
50+
service: :meilisearch,
51+
message: message,
52+
details: %{status: status, body: body}
53+
)}
54+
4555
{:error, reason} ->
4656
{:error, Error.service(service: :meilisearch, message: "Unknown error", details: reason)}
4757
end
@@ -64,9 +74,19 @@ defmodule Sequin.Sinks.Meilisearch.Client do
6474
)
6575

6676
case Req.put(req) do
67-
{:ok, %{body: body}} ->
77+
{:ok, %{status: status, body: body}} when status in 200..299 ->
6878
verify_task_by_id(sink, body["taskUid"], 0)
6979

80+
{:ok, %{status: status, body: body}} ->
81+
message = extract_error_message(body) || "Request failed with status #{status}"
82+
83+
{:error,
84+
Error.service(
85+
service: :meilisearch,
86+
message: message,
87+
details: %{status: status, body: body}
88+
)}
89+
7090
{:error, %Req.TransportError{} = error} ->
7191
{:error,
7292
Error.service(
@@ -93,9 +113,19 @@ defmodule Sequin.Sinks.Meilisearch.Client do
93113
)
94114

95115
case Req.post(req) do
96-
{:ok, %{body: body}} ->
116+
{:ok, %{status: status, body: body}} when status in 200..299 ->
97117
verify_task_by_id(sink, body["taskUid"], 0)
98118

119+
{:ok, %{status: status, body: body}} ->
120+
message = extract_error_message(body) || "Request failed with status #{status}"
121+
122+
{:error,
123+
Error.service(
124+
service: :meilisearch,
125+
message: message,
126+
details: %{status: status, body: body}
127+
)}
128+
99129
{:error, %Req.TransportError{} = error} ->
100130
{:error,
101131
Error.service(
@@ -129,9 +159,19 @@ defmodule Sequin.Sinks.Meilisearch.Client do
129159
)
130160

131161
case Req.post(req) do
132-
{:ok, %{body: body}} ->
162+
{:ok, %{status: status, body: body}} when status in 200..299 ->
133163
verify_task_by_id(sink, body["taskUid"], 0)
134164

165+
{:ok, %{status: status, body: body}} ->
166+
message = extract_error_message(body) || "Request failed with status #{status}"
167+
168+
{:error,
169+
Error.service(
170+
service: :meilisearch,
171+
message: message,
172+
details: %{status: status, body: body}
173+
)}
174+
135175
{:error, %Req.TransportError{} = error} ->
136176
{:error,
137177
Error.service(
@@ -170,9 +210,17 @@ defmodule Sequin.Sinks.Meilisearch.Client do
170210
req = base_request(sink)
171211

172212
case Req.get(req, url: "/health") do
173-
{:ok, %{status: status}} when status == 200 ->
213+
{:ok, %{status: status}} when status in 200..299 ->
174214
:ok
175215

216+
{:ok, %{status: status, body: body}} ->
217+
{:error,
218+
Error.service(
219+
service: :meilisearch,
220+
message: "Health check failed with status #{status}",
221+
details: %{status: status, body: body}
222+
)}
223+
176224
{:error, reason} ->
177225
{:error,
178226
Error.service(

0 commit comments

Comments
 (0)