Skip to content

🚏 Add support for meilisearch functions #2019

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion assets/svelte/consumers/dynamicRoutingDocs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ export const routedSinkDocs: Record<RoutedSinkType, RoutedSinkDocs> = {
meilisearch: {
fields: {
action: {
description: "Meilisearch action to perform",
description:
"Meilisearch action to perform: 'index', 'delete', or 'function'",
staticValue:
"'index' for insert, update, and read, 'delete' for delete",
dynamicDefault:
Expand All @@ -140,6 +141,24 @@ export const routedSinkDocs: Record<RoutedSinkType, RoutedSinkDocs> = {
staticFormField: "index_name",
dynamicDefault: "sequin.<table_schema>.<table_name>",
},
function: {
description:
"RHAI Function to apply (required when action is 'function')",
staticValue: "<empty>",
dynamicDefault: "<empty>",
},
filter: {
description:
"Filter expression for function updates (usable when action is 'function')",
staticValue: "<empty>",
dynamicDefault: "<empty>",
},
context: {
description:
"Object for function updates (usable when action is 'function')",
staticValue: "<empty>",
dynamicDefault: "<empty>",
},
},
},
elasticsearch: {
Expand Down
20 changes: 20 additions & 0 deletions assets/svelte/sinks/meilisearch/MeilisearchSinkForm.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,26 @@
</li>
</ul>
</div>
<div class="mb-2">
<strong>Advanced:</strong> You can use routing functions to perform
partial updates using Meilisearch's function capability:
<ul class="ml-6 list-disc">
<li>
Return <code>action: :function</code> from your routing function
</li>
<li>
Specify a filter expression and a <a
class="underline font-medium"
href="https://rhai.rs/book/engine/hello-world.html"
target="_blank">Rhai</a
> (a JavaScript-like language) function to update specific fields.
</li>
<li>
This is useful for updating nested arrays or specific document
properties
</li>
</ul>
</div>
</Alert.Description>
</Alert.Root>

Expand Down
52 changes: 48 additions & 4 deletions docs/how-to/stream-postgres-to-meilisearch.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,14 @@ For large-scale applications, you may consider sharding indexes—e.g., sharding

You can use Sequin's [filters](/reference/filters) to accomplish this. For example, to shard `users` by region, create a sink per region with filters to route the correct rows to the corresponding Meilisearch index.

<Note>
Sequin will soon support [Routing functions](/reference/transforms) for dynamically routing rows from one table to multiple Meilisearch indexes within a single sink.
</Note>
### Advanced: Using routing functions

Sequin supports [routing functions](/reference/routing) for dynamically routing rows and performing advanced operations. With routing functions, you can:
- Route rows from one table to multiple Meilisearch indexes
- Perform function-based updates (increments, array modifications, etc.)
- Apply conditional logic for complex routing scenarios

See the [Meilisearch sink reference](/reference/sinks/meilisearch#routing) for detailed examples.

## Create Meilisearch sink

Expand Down Expand Up @@ -150,6 +155,45 @@ If documents don't seem to be flowing:
2. Click any failed message.
3. Check the delivery logs for error details, including the response from Meilisearch.

## Advanced use cases

### Using function updates for counters and aggregations

Meilisearch's function update feature, combined with Sequin's routing functions, enables powerful use cases like maintaining counters or aggregated values without re-indexing entire documents.

For example, if you have a `products` index and want to update view counts or inventory levels:

```elixir
def route(action, record, changes, metadata) do
case metadata.table_name do
# When a product is viewed, increment its view counter
"product_views" ->
%{
action: :function,
index_name: "products",
filter: "id = #{record["product_id"]}",
function: "doc.view_count = doc.view_count + 1"
}

# When inventory changes, update the stock quantity
"inventory_transactions" ->
%{
action: :function,
index_name: "products",
filter: "sku = '#{record["sku"]}'",
function: "doc.stock_quantity = doc.stock_quantity + context.quantity_change",
context: %{quantity_change: record["quantity_change"]}
}

# Default behavior for products table
"products" ->
%{index_name: "products", action: if(record["deleted_at"], do: :delete, else: :index)}
end
end
```

This approach is much more efficient than re-indexing the entire product document for simple counter updates.

## Next steps

Assuming you've followed the steps above for your local environment, "[How to deploy to production](/how-to/deploy-to-production)" will show you how to deploy your implementation to production.
Assuming you've followed the steps above for your local environment, "[How to deploy to production](/how-to/deploy-to-production)" will show you how to deploy your implementation to production.
50 changes: 49 additions & 1 deletion docs/reference/sinks/meilisearch.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,14 @@ Sequin does not perform any searches.

The Meilisearch sink supports dynamic routing of the `action` and `index_name` with [routing functions](/reference/routing).

Example routing function:
### Routing Actions

The Meilisearch sink supports three actions:
- `index` - Create or update a document
- `delete` - Delete a document
- `function` - Apply a function-based update using Meilisearch's [documents edit API](https://www.meilisearch.com/docs/reference/api/documents#edit-documents-with-a-function)

### Basic Example

```elixir
def route(action, record, changes, metadata) do
Expand All @@ -80,6 +87,47 @@ def route(action, record, changes, metadata) do
end
```

### Function Updates

The `function` action allows you to update documents using Meilisearch's function expressions. This is useful for:
- Incrementing/decrementing values
- Modifying arrays
- Conditional updates
- Complex transformations that would be difficult with regular indexing

When using `action: "function"`, you must also provide:
- `filter` - A Meilisearch filter expression to select documents to update
- `function` - A function expression using [Rhai (a JavaScript-like language)](https://docs.meilisearch.com/reference/api/documents#edit-documents-with-rhai) to apply to the matched documents
- `context` (optional) - Additional context data for the function

Example routing function with function updates:

```elixir
def route(action, record, changes, metadata) do
# Increment view count when a page is viewed
if metadata.table_name == "page_views" do
%{
action: :function,
index_name: "pages",
filter: "id = #{record["page_id"]}",
function: "doc.view_count += 1"
}
# Update inventory count for products
elsif metadata.table_name == "inventory_updates" do
%{
action: :function,
index_name: "products",
filter: "sku = '#{record["sku"]}'",
function: "doc.stock_quantity += context.quantity_change",
context: %{quantity_change: record["quantity_change"]}
}
else
# Default indexing behavior
%{index_name: metadata.table_name, action: "index"}
end
end
```

When not using a routing function, documents will be written to the index specified in the sink configuration.

## Error handling
Expand Down
86 changes: 64 additions & 22 deletions lib/sequin/runtime/meilisearch_pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,43 +23,40 @@ defmodule Sequin.Runtime.MeilisearchPipeline do
concurrency: concurrency,
batch_size: consumer.sink.batch_size || 100,
batch_timeout: 5
],
delete: [
concurrency: concurrency,
batch_size: consumer.sink.batch_size || 100,
batch_timeout: 5
]
]
end

@impl SinkPipeline
def handle_message(message, context) do
%Routing.Consumers.Meilisearch{action: action, index_name: index_name} =
Routing.route_message(context.consumer, message.data)

batcher =
case action do
:index -> :default
:delete -> :delete
routing_info = Routing.route_message(context.consumer, message.data)
%Routing.Consumers.Meilisearch{action: action, index_name: index_name} = routing_info

action =
if is_atom(action) do
# Backwards compatibility with existing routing functions with atom actions
to_string(action)
else
action
end

message =
message
|> Broadway.Message.put_batcher(batcher)
|> Broadway.Message.put_batch_key(index_name)
|> Broadway.Message.put_batch_key({action, index_name})
|> Broadway.Message.update_data(fn data ->
Map.put(data, :routing_info, routing_info)
end)

{:ok, message, context}
end

@impl SinkPipeline
def handle_batch(:default, messages, batch_info, context) do
def handle_batch(:default, messages, %{batch_key: {"index", index_name}}, context) do
%{
consumer: %SinkConsumer{sink: sink, transform: transform} = consumer,
test_pid: test_pid
} = context

index_name = batch_info.batch_key

setup_allowances(test_pid)

records =
Expand All @@ -70,7 +67,7 @@ defmodule Sequin.Runtime.MeilisearchPipeline do
end)

case Client.import_documents(sink, index_name, records) do
{:ok} ->
:ok ->
Trace.info(consumer.id, %Trace.Event{
message: "Imported documents to \"#{index_name}\" index"
})
Expand All @@ -88,21 +85,19 @@ defmodule Sequin.Runtime.MeilisearchPipeline do
end

@impl SinkPipeline
def handle_batch(:delete, messages, batch_info, context) do
def handle_batch(:default, messages, %{batch_key: {"delete", index_name}}, context) do
%{
consumer: %SinkConsumer{sink: sink} = consumer,
test_pid: test_pid
} = context

index_name = batch_info.batch_key

setup_allowances(test_pid)

document_ids =
Enum.flat_map(messages, fn %{data: message} -> message.record_pks end)

case Client.delete_documents(sink, index_name, document_ids) do
{:ok} ->
:ok ->
Trace.info(consumer.id, %Trace.Event{
message: "Deleted documents from \"#{index_name}\" index",
extra: %{document_ids: document_ids}
Expand All @@ -121,6 +116,53 @@ defmodule Sequin.Runtime.MeilisearchPipeline do
end
end

@impl SinkPipeline
def handle_batch(:default, messages, %{batch_key: {"function", index_name}}, context) do
%{
consumer: %SinkConsumer{sink: sink} = consumer,
test_pid: test_pid
} = context

setup_allowances(test_pid)

# Process each message individually
results =
Enum.map(messages, fn message ->
%{routing_info: routing_info} = message.data

case Client.update_documents_with_function(
sink,
index_name,
routing_info.filter,
routing_info.function,
routing_info.context || %{}
) do
:ok ->
Trace.info(consumer.id, %Trace.Event{
message: "Applied function update to \"#{index_name}\" index",
extra: %{filter: routing_info.filter, function: routing_info.function}
})

:ok

{:error, error} ->
Trace.error(consumer.id, %Trace.Event{
message: "Failed to apply function update to \"#{index_name}\" index",
error: error,
extra: %{filter: routing_info.filter, function: routing_info.function}
})

{:error, error}
end
end)

# If any update failed, return the first error
case Enum.find(results, &match?({:error, _}, &1)) do
{:error, error} -> {:error, error}
nil -> {:ok, messages, context}
end
end

# Helper functions

# If the consumer does not have a transform, attempt to fill in missing primary key
Expand Down
33 changes: 25 additions & 8 deletions lib/sequin/runtime/routing/consumers/meilisearch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,47 @@ defmodule Sequin.Runtime.Routing.Consumers.Meilisearch do
use Sequin.Runtime.Routing.RoutedConsumer

@primary_key false
@derive {Jason.Encoder, only: [:index_name]}
@derive {Jason.Encoder, only: [:index_name, :filter, :function, :context]}
typed_embedded_schema do
field :action, Ecto.Enum, values: [:index, :delete]
field :action, Ecto.Enum, values: [:index, :delete, :function]
field :index_name, :string
field :filter, :string
field :function, :string
field :context, :map
end

def changeset(struct, params) do
allowed_keys = [:action, :index_name]
allowed_keys = [:action, :index_name, :filter, :function, :context]

struct
|> cast(params, allowed_keys, empty_values: [])
|> Routing.Helpers.validate_no_extra_keys(params, allowed_keys)
|> validate_required([:action, :index_name])
|> validate_inclusion(:action, [:index, :delete])
|> validate_inclusion(:action, [:index, :delete, :function])
|> validate_length(:index_name, min: 1, max: 1024)
|> validate_function_fields()
end

defp validate_function_fields(changeset) do
action = get_field(changeset, :action)

if action == :function do
changeset
|> validate_required([:filter, :function])
|> validate_length(:filter, min: 1, max: 10_000)
|> validate_length(:function, min: 1, max: 10_000)
else
changeset
end
end

def route(action, _record, _changes, _metadata) do
meilisearch_action =
case action do
"insert" -> :index
"update" -> :index
"delete" -> :delete
"read" -> :index
"insert" -> "index"
"update" -> "index"
"delete" -> "delete"
"read" -> "index"
end

%{action: meilisearch_action}
Expand Down
Loading
Loading