Skip to content

Commit 6923a53

Browse files
committed
🚏 Add support for meilisearch functions
1 parent 7ba4aa7 commit 6923a53

File tree

9 files changed

+553
-43
lines changed

9 files changed

+553
-43
lines changed

assets/svelte/consumers/dynamicRoutingDocs.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,8 @@ export const routedSinkDocs: Record<RoutedSinkType, RoutedSinkDocs> = {
128128
meilisearch: {
129129
fields: {
130130
action: {
131-
description: "Meilisearch action to perform",
131+
description:
132+
"Meilisearch action to perform: 'index', 'delete', or 'function'",
132133
staticValue:
133134
"'index' for insert, update, and read, 'delete' for delete",
134135
dynamicDefault:
@@ -140,6 +141,24 @@ export const routedSinkDocs: Record<RoutedSinkType, RoutedSinkDocs> = {
140141
staticFormField: "index_name",
141142
dynamicDefault: "sequin.<table_schema>.<table_name>",
142143
},
144+
function: {
145+
description:
146+
"RHAI Function to apply (required when action is 'function')",
147+
staticValue: "<empty>",
148+
dynamicDefault: "<empty>",
149+
},
150+
filter: {
151+
description:
152+
"Filter expression for function updates (usable when action is 'function')",
153+
staticValue: "<empty>",
154+
dynamicDefault: "<empty>",
155+
},
156+
context: {
157+
description:
158+
"Object for function updates (usable when action is 'function')",
159+
staticValue: "<empty>",
160+
dynamicDefault: "<empty>",
161+
},
143162
},
144163
},
145164
elasticsearch: {

assets/svelte/sinks/meilisearch/MeilisearchSinkForm.svelte

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,26 @@
6464
</li>
6565
</ul>
6666
</div>
67+
<div class="mb-2">
68+
<strong>Advanced:</strong> You can use routing functions to perform
69+
partial updates using Meilisearch's function capability:
70+
<ul class="ml-6 list-disc">
71+
<li>
72+
Return <code>action: :function</code> from your routing function
73+
</li>
74+
<li>
75+
Specify a filter expression and a <a
76+
class="underline font-medium"
77+
href="https://rhai.rs/book/engine/hello-world.html"
78+
target="_blank">Rhai</a> (a JavaScript-like language) function to update
79+
specific fields.
80+
</li>
81+
<li>
82+
This is useful for updating nested arrays or specific document
83+
properties
84+
</li>
85+
</ul>
86+
</div>
6787
</Alert.Description>
6888
</Alert.Root>
6989

docs/how-to/stream-postgres-to-meilisearch.mdx

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,14 @@ For large-scale applications, you may consider sharding indexes—e.g., sharding
5050

5151
You can use Sequin's [filters](/reference/filters) to accomplish this. For example, to shard `users` by region, create a sink per region with filters to route the correct rows to the corresponding Meilisearch index.
5252

53-
<Note>
54-
Sequin will soon support [Routing functions](/reference/transforms) for dynamically routing rows from one table to multiple Meilisearch indexes within a single sink.
55-
</Note>
53+
### Advanced: Using routing functions
54+
55+
Sequin supports [routing functions](/reference/routing) for dynamically routing rows and performing advanced operations. With routing functions, you can:
56+
- Route rows from one table to multiple Meilisearch indexes
57+
- Perform function-based updates (increments, array modifications, etc.)
58+
- Apply conditional logic for complex routing scenarios
59+
60+
See the [Meilisearch sink reference](/reference/sinks/meilisearch#routing) for detailed examples.
5661

5762
## Create Meilisearch sink
5863

@@ -150,6 +155,45 @@ If documents don't seem to be flowing:
150155
2. Click any failed message.
151156
3. Check the delivery logs for error details, including the response from Meilisearch.
152157

158+
## Advanced use cases
159+
160+
### Using function updates for counters and aggregations
161+
162+
Meilisearch's function update feature, combined with Sequin's routing functions, enables powerful use cases like maintaining counters or aggregated values without re-indexing entire documents.
163+
164+
For example, if you have a `products` index and want to update view counts or inventory levels:
165+
166+
```elixir
167+
def route(action, record, changes, metadata) do
168+
case metadata.table_name do
169+
# When a product is viewed, increment its view counter
170+
"product_views" ->
171+
%{
172+
action: :function,
173+
index_name: "products",
174+
filter: "id = #{record["product_id"]}",
175+
function: "doc.view_count = doc.view_count + 1"
176+
}
177+
178+
# When inventory changes, update the stock quantity
179+
"inventory_transactions" ->
180+
%{
181+
action: :function,
182+
index_name: "products",
183+
filter: "sku = '#{record["sku"]}'",
184+
function: "doc.stock_quantity = doc.stock_quantity + context.quantity_change",
185+
context: %{quantity_change: record["quantity_change"]}
186+
}
187+
188+
# Default behavior for products table
189+
"products" ->
190+
%{index_name: "products", action: if(record["deleted_at"], do: :delete, else: :index)}
191+
end
192+
end
193+
```
194+
195+
This approach is much more efficient than re-indexing the entire product document for simple counter updates.
196+
153197
## Next steps
154198

155-
Assuming you've followed the steps above for your local environment, "[How to deploy to production](/how-to/deploy-to-production)" will show you how to deploy your implementation to production.
199+
Assuming you've followed the steps above for your local environment, "[How to deploy to production](/how-to/deploy-to-production)" will show you how to deploy your implementation to production.

docs/reference/sinks/meilisearch.mdx

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,14 @@ Sequin does not perform any searches.
6868

6969
The Meilisearch sink supports dynamic routing of the `action` and `index_name` with [routing functions](/reference/routing).
7070

71-
Example routing function:
71+
### Routing Actions
72+
73+
The Meilisearch sink supports three actions:
74+
- `index` - Create or update a document
75+
- `delete` - Delete a document
76+
- `function` - Apply a function-based update using Meilisearch's [documents edit API](https://www.meilisearch.com/docs/reference/api/documents#edit-documents-with-a-function)
77+
78+
### Basic Example
7279

7380
```elixir
7481
def route(action, record, changes, metadata) do
@@ -80,6 +87,47 @@ def route(action, record, changes, metadata) do
8087
end
8188
```
8289

90+
### Function Updates
91+
92+
The `function` action allows you to update documents using Meilisearch's function expressions. This is useful for:
93+
- Incrementing/decrementing values
94+
- Modifying arrays
95+
- Conditional updates
96+
- Complex transformations that would be difficult with regular indexing
97+
98+
When using `action: "function"`, you must also provide:
99+
- `filter` - A Meilisearch filter expression to select documents to update
100+
- `function` - A function expression using [Rhai (a JavaScript-like language)](https://docs.meilisearch.com/reference/api/documents#edit-documents-with-rhai) to apply to the matched documents
101+
- `context` (optional) - Additional context data for the function
102+
103+
Example routing function with function updates:
104+
105+
```elixir
106+
def route(action, record, changes, metadata) do
107+
# Increment view count when a page is viewed
108+
if metadata.table_name == "page_views" do
109+
%{
110+
action: :function,
111+
index_name: "pages",
112+
filter: "id = #{record["page_id"]}",
113+
function: "doc.view_count += 1"
114+
}
115+
# Update inventory count for products
116+
elsif metadata.table_name == "inventory_updates" do
117+
%{
118+
action: :function,
119+
index_name: "products",
120+
filter: "sku = '#{record["sku"]}'",
121+
function: "doc.stock_quantity += context.quantity_change",
122+
context: %{quantity_change: record["quantity_change"]}
123+
}
124+
else
125+
# Default indexing behavior
126+
%{index_name: metadata.table_name, action: "index"}
127+
end
128+
end
129+
```
130+
83131
When not using a routing function, documents will be written to the index specified in the sink configuration.
84132

85133
## Error handling

lib/sequin/runtime/meilisearch_pipeline.ex

Lines changed: 64 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,43 +23,40 @@ defmodule Sequin.Runtime.MeilisearchPipeline do
2323
concurrency: concurrency,
2424
batch_size: consumer.sink.batch_size || 100,
2525
batch_timeout: 5
26-
],
27-
delete: [
28-
concurrency: concurrency,
29-
batch_size: consumer.sink.batch_size || 100,
30-
batch_timeout: 5
3126
]
3227
]
3328
end
3429

3530
@impl SinkPipeline
3631
def handle_message(message, context) do
37-
%Routing.Consumers.Meilisearch{action: action, index_name: index_name} =
38-
Routing.route_message(context.consumer, message.data)
39-
40-
batcher =
41-
case action do
42-
:index -> :default
43-
:delete -> :delete
32+
routing_info = Routing.route_message(context.consumer, message.data)
33+
%Routing.Consumers.Meilisearch{action: action, index_name: index_name} = routing_info
34+
35+
action =
36+
if is_atom(action) do
37+
# Backwards compatibility with existing routing functions with atom actions
38+
to_string(action)
39+
else
40+
action
4441
end
4542

4643
message =
4744
message
48-
|> Broadway.Message.put_batcher(batcher)
49-
|> Broadway.Message.put_batch_key(index_name)
45+
|> Broadway.Message.put_batch_key({action, index_name})
46+
|> Broadway.Message.update_data(fn data ->
47+
Map.put(data, :routing_info, routing_info)
48+
end)
5049

5150
{:ok, message, context}
5251
end
5352

5453
@impl SinkPipeline
55-
def handle_batch(:default, messages, batch_info, context) do
54+
def handle_batch(:default, messages, %{batch_key: {"index", index_name}}, context) do
5655
%{
5756
consumer: %SinkConsumer{sink: sink, transform: transform} = consumer,
5857
test_pid: test_pid
5958
} = context
6059

61-
index_name = batch_info.batch_key
62-
6360
setup_allowances(test_pid)
6461

6562
records =
@@ -70,7 +67,7 @@ defmodule Sequin.Runtime.MeilisearchPipeline do
7067
end)
7168

7269
case Client.import_documents(sink, index_name, records) do
73-
{:ok} ->
70+
:ok ->
7471
Trace.info(consumer.id, %Trace.Event{
7572
message: "Imported documents to \"#{index_name}\" index"
7673
})
@@ -88,21 +85,19 @@ defmodule Sequin.Runtime.MeilisearchPipeline do
8885
end
8986

9087
@impl SinkPipeline
91-
def handle_batch(:delete, messages, batch_info, context) do
88+
def handle_batch(:default, messages, %{batch_key: {"delete", index_name}}, context) do
9289
%{
9390
consumer: %SinkConsumer{sink: sink} = consumer,
9491
test_pid: test_pid
9592
} = context
9693

97-
index_name = batch_info.batch_key
98-
9994
setup_allowances(test_pid)
10095

10196
document_ids =
10297
Enum.flat_map(messages, fn %{data: message} -> message.record_pks end)
10398

10499
case Client.delete_documents(sink, index_name, document_ids) do
105-
{:ok} ->
100+
:ok ->
106101
Trace.info(consumer.id, %Trace.Event{
107102
message: "Deleted documents from \"#{index_name}\" index",
108103
extra: %{document_ids: document_ids}
@@ -121,6 +116,53 @@ defmodule Sequin.Runtime.MeilisearchPipeline do
121116
end
122117
end
123118

119+
@impl SinkPipeline
120+
def handle_batch(:default, messages, %{batch_key: {"function", index_name}}, context) do
121+
%{
122+
consumer: %SinkConsumer{sink: sink} = consumer,
123+
test_pid: test_pid
124+
} = context
125+
126+
setup_allowances(test_pid)
127+
128+
# Process each message individually
129+
results =
130+
Enum.map(messages, fn message ->
131+
%{routing_info: routing_info} = message.data
132+
133+
case Client.update_documents_with_function(
134+
sink,
135+
index_name,
136+
routing_info.filter,
137+
routing_info.function,
138+
routing_info.context || %{}
139+
) do
140+
:ok ->
141+
Trace.info(consumer.id, %Trace.Event{
142+
message: "Applied function update to \"#{index_name}\" index",
143+
extra: %{filter: routing_info.filter, function: routing_info.function}
144+
})
145+
146+
:ok
147+
148+
{:error, error} ->
149+
Trace.error(consumer.id, %Trace.Event{
150+
message: "Failed to apply function update to \"#{index_name}\" index",
151+
error: error,
152+
extra: %{filter: routing_info.filter, function: routing_info.function}
153+
})
154+
155+
{:error, error}
156+
end
157+
end)
158+
159+
# If any update failed, return the first error
160+
case Enum.find(results, &match?({:error, _}, &1)) do
161+
{:error, error} -> {:error, error}
162+
nil -> {:ok, messages, context}
163+
end
164+
end
165+
124166
# Helper functions
125167

126168
# If the consumer does not have a transform, attempt to fill in missing primary key

lib/sequin/runtime/routing/consumers/meilisearch.ex

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,30 +3,47 @@ defmodule Sequin.Runtime.Routing.Consumers.Meilisearch do
33
use Sequin.Runtime.Routing.RoutedConsumer
44

55
@primary_key false
6-
@derive {Jason.Encoder, only: [:index_name]}
6+
@derive {Jason.Encoder, only: [:index_name, :filter, :function, :context]}
77
typed_embedded_schema do
8-
field :action, Ecto.Enum, values: [:index, :delete]
8+
field :action, Ecto.Enum, values: [:index, :delete, :function]
99
field :index_name, :string
10+
field :filter, :string
11+
field :function, :string
12+
field :context, :map
1013
end
1114

1215
def changeset(struct, params) do
13-
allowed_keys = [:action, :index_name]
16+
allowed_keys = [:action, :index_name, :filter, :function, :context]
1417

1518
struct
1619
|> cast(params, allowed_keys, empty_values: [])
1720
|> Routing.Helpers.validate_no_extra_keys(params, allowed_keys)
1821
|> validate_required([:action, :index_name])
19-
|> validate_inclusion(:action, [:index, :delete])
22+
|> validate_inclusion(:action, [:index, :delete, :function])
2023
|> validate_length(:index_name, min: 1, max: 1024)
24+
|> validate_function_fields()
25+
end
26+
27+
defp validate_function_fields(changeset) do
28+
action = get_field(changeset, :action)
29+
30+
if action == :function do
31+
changeset
32+
|> validate_required([:filter, :function])
33+
|> validate_length(:filter, min: 1, max: 10_000)
34+
|> validate_length(:function, min: 1, max: 10_000)
35+
else
36+
changeset
37+
end
2138
end
2239

2340
def route(action, _record, _changes, _metadata) do
2441
meilisearch_action =
2542
case action do
26-
"insert" -> :index
27-
"update" -> :index
28-
"delete" -> :delete
29-
"read" -> :index
43+
"insert" -> "index"
44+
"update" -> "index"
45+
"delete" -> "delete"
46+
"read" -> "index"
3047
end
3148

3249
%{action: meilisearch_action}

0 commit comments

Comments
 (0)