|
| 1 | +--- |
| 2 | +title: Manage Flows Dynamically |
| 3 | +description: "Learn how to dynamically manage multiple flow instances in CocoIndex. Create parameterized data indexing pipelines, handle persistent resources, perform updates, and implement memory-efficient caching for scalable applications." |
| 4 | +--- |
| 5 | + |
| 6 | +# Manage Flows Dynamically |
| 7 | + |
| 8 | +You write a function, a.k.a. *flow definition*, to define indexing logic. |
| 9 | +Sometimes you want to reuse the same flow definition for multiple *flow instances* (a.k.a. *flow*), e.g. each takes input from different sources, exports to different targets, and even with slightly different parameters for transformation logic. |
| 10 | + |
| 11 | +## States of a flow instance |
| 12 | + |
| 13 | +A flow instance has states from two aspects: |
| 14 | + |
| 15 | +* *In-process object*, of type `cocoindex.Flow`. |
| 16 | +* *Persistent resource*, including states in the [internal storage](/docs/core/basics#internal-storage) and backend resources that are owned by the flow instance. |
| 17 | + |
| 18 | +A flow instance is ultimately a persistent resource. Its in-process object is a handle to operate on it. Consider file handles and database connections. |
| 19 | +CocoIndex provides APIs to *open* and *close* flow instances, and *setup* and *drop* the persistent resource. |
| 20 | + |
| 21 | +## Parameterize the flow definition |
| 22 | + |
| 23 | +In the example from the [Quickstart Guide](/docs/getting_started/quickstart), we decorate the flow definition function with a `@cocoindex.flow_def(name="DemoFlow")` decorator: |
| 24 | + |
| 25 | +```python title="Example in Quickstart Guide" |
| 26 | +@cocoindex.flow_def(name="TextEmbedding") |
| 27 | +def text_embedding_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope): |
| 28 | + ... |
| 29 | +``` |
| 30 | + |
| 31 | +This immediately creates the in-process object of the flow instance, using the given function as the flow definition. |
| 32 | +This is a shortcut of: |
| 33 | + |
| 34 | +```python |
| 35 | +def _text_embedding_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope): |
| 36 | + ... |
| 37 | + |
| 38 | +text_embedding_flow = cocoindex.open_flow("TextEmbedding", _text_embedding_flow) |
| 39 | +``` |
| 40 | + |
| 41 | +Here, `cocoindex.open_flow()` is the function that creates the in-process object of the flow instance, with the given name and flow definition function. |
| 42 | +You can directly call it dynamically with flow name created programmatically. |
| 43 | + |
| 44 | +Oftentimes, you also want to parameterize the flow definition function. |
| 45 | +For example, we may have a dataclass like this to hold the parameters of the flow: |
| 46 | + |
| 47 | +```python |
| 48 | +@dataclass |
| 49 | +class TextEmbeddingFlowParameters: |
| 50 | + source_path: str |
| 51 | + target_table_name: str |
| 52 | +``` |
| 53 | + |
| 54 | +And consider we have a registry of parameters for all flow instances somewhere. |
| 55 | +For simplicity, we use a hardcoded `dict` here, and provide a simple function to get the parameters for a given flow name. |
| 56 | +In reality, the source of truth may come from a configuration file, a database, etc., and the function can be replaced by your own implementation. |
| 57 | + |
| 58 | +```python |
| 59 | +FLOW_PARAMETERS: dict[str, TextEmbeddingFlowParameters] = { |
| 60 | + "foo": TextEmbeddingFlowParameters(source_path="/path/to/foo", target_table_name="foo_embeddings"), |
| 61 | + "bar": TextEmbeddingFlowParameters(source_path="/path/to/bar", target_table_name="bar_embeddings"), |
| 62 | +} |
| 63 | + |
| 64 | +def get_flow_parameters(name: str) -> TextEmbeddingFlowParameters: |
| 65 | + return FLOW_PARAMETERS[name] |
| 66 | +``` |
| 67 | + |
| 68 | +Then you can have a function that returns the flow definition function for the given parameters: |
| 69 | + |
| 70 | +```python |
| 71 | +def text_embedding_flow_def(params: TextEmbeddingFlowParameters): |
| 72 | + def _flow_def(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope): |
| 73 | + # Add a data source to read files from the specified directory |
| 74 | + data_scope["documents"] = flow_builder.add_source( |
| 75 | + cocoindex.sources.LocalFile(path=params.source_path)) |
| 76 | + |
| 77 | + doc_embeddings = data_scope.add_collector() |
| 78 | + ... |
| 79 | + |
| 80 | + # Export the collected data to a Postgres table, with the specified table name. |
| 81 | + doc_embeddings.export( |
| 82 | + "doc_embeddings", |
| 83 | + cocoindex.targets.Postgres(table_name=params.target_table_name), |
| 84 | + primary_key_fields=["filename", "location"], |
| 85 | + ) |
| 86 | + |
| 87 | + return _flow_def |
| 88 | +``` |
| 89 | + |
| 90 | +With this, you can open flow instances dynamically with its parameters: |
| 91 | + |
| 92 | +```python |
| 93 | +text_embedding_flows: dict[str, cocoindex.Flow] = {} |
| 94 | + |
| 95 | +def get_text_embedding_flow(name: str) -> cocoindex.Flow: |
| 96 | + flow = text_embedding_flows.get(name) |
| 97 | + |
| 98 | + if flow is None: |
| 99 | + params = get_flow_parameters(name) |
| 100 | + flow = text_embedding_flows[name] = cocoindex.open_flow(f"TextEmbedding_{name}", text_embedding_flow_def(params)) |
| 101 | + |
| 102 | + return flow |
| 103 | +``` |
| 104 | + |
| 105 | +## Operate on the flow instances |
| 106 | + |
| 107 | +### Setup the persistent resource |
| 108 | + |
| 109 | +After you instantiated and open flow instances dynamically, before you can perform any data updates, you need to make sure the persistent resource is ready. |
| 110 | +You can use the `setup()` method, e.g. modify the above code to: |
| 111 | + |
| 112 | +```python |
| 113 | +text_embedding_flows: dict[str, cocoindex.Flow] = {} |
| 114 | + |
| 115 | +def get_text_embedding_flow(name: str) -> cocoindex.Flow: |
| 116 | + flow = text_embedding_flows.get(name) |
| 117 | + |
| 118 | + if flow is None: |
| 119 | + params = get_flow_parameters(name) |
| 120 | + flow = text_embedding_flows[name] = cocoindex.open_flow(f"TextEmbedding_{name}", text_embedding_flow_def(params)) |
| 121 | + flow.setup(report_to_stdout=True) |
| 122 | + |
| 123 | + return flow |
| 124 | +``` |
| 125 | + |
| 126 | +`setup()` method synchronizes the persistent resource to a state that is consistent with the in-process object. For example, |
| 127 | +* If the persistent resource is not there yet, it will create the backend resources for new targets. |
| 128 | +* If your flow definition changed and a new target has been added since the last time of setup, it will create the backend resources for new targets. |
| 129 | +* If an existing target is removed from the flow definition, it will drop the backend resources for the removed target. |
| 130 | +* If nothing changed since the last time of setup, it will be a no-op. i.e. the `setup()` method is idempotent. |
| 131 | + |
| 132 | +`setup()` takes a `report_to_stdout` parameter to control whether to print the setup progress to the standard output. |
| 133 | + |
| 134 | +`setup()` takes care of all scenarios and makes sure the persistent resource is in the right state. |
| 135 | +It's generally safe to call it after you open a flow instance, even if you don't know whether the persistent resource already exists. |
| 136 | + |
| 137 | + |
| 138 | +### Perform data updates |
| 139 | + |
| 140 | +After you make sure the persistent resource is ready, you can perform data updates using the flow. |
| 141 | + |
| 142 | +The `update()` method updates the target defined by the flow. |
| 143 | + |
| 144 | +```python |
| 145 | +flow.update() |
| 146 | +``` |
| 147 | + |
| 148 | +This performs a one-time data update. After the function returns, the target is up-to-date as of the moment when the function is called. For example, we can call `update()` to update the target after the flow is setup: |
| 149 | + |
| 150 | +```python |
| 151 | +def update_text_embedding_index(name: str): |
| 152 | + flow = get_text_embedding_flow(name) |
| 153 | + flow.update() |
| 154 | +``` |
| 155 | + |
| 156 | +You can also do a live update. |
| 157 | +See the [Live Updates](/docs/tutorials/live_updates) tutorial for more details. |
| 158 | + |
| 159 | + |
| 160 | +### Close the flow object |
| 161 | + |
| 162 | +Sometimes you don't want to hold the in-process object forever. |
| 163 | +You can free up the memory resources by closing the flow instances with the `close()` method. |
| 164 | + |
| 165 | +For example, the `dict` we managed above behaves like a cache to hold the flow instances. |
| 166 | +If a specific flow isn't used for a while, we may close it. |
| 167 | +The `TTLCache` from [`cachetools`](https://pypi.org/project/cachetools/) package provides exactly this functionality. |
| 168 | +We can rewrite the above code a little bit. |
| 169 | +First, we bring in necessary imports: |
| 170 | + |
| 171 | +```python |
| 172 | +from cachetools import cached, TTLCache |
| 173 | +``` |
| 174 | + |
| 175 | +Then we define our own version of `TTLCache` to make it call the `close()` method when the flow instance is evicted from the cache: |
| 176 | + |
| 177 | +```python |
| 178 | +class MyTTLCache(TTLCache): |
| 179 | + def popitem(self): |
| 180 | + # Close the flow instance when it is evicted from the cache |
| 181 | + key, flow = super().popitem() |
| 182 | + flow.close() |
| 183 | + return key, flow |
| 184 | +``` |
| 185 | + |
| 186 | +With this, we can modify our `get_text_embedding_flow()` function to use `MyTTLCache` to cache the flow instances, instead of managing our own `dict`: |
| 187 | + |
| 188 | +```python |
| 189 | +@cached(cache=MyTTLCache(maxsize=20, ttl=600)) |
| 190 | +def get_text_embedding_flow(name: str) -> cocoindex.Flow: |
| 191 | + params = get_flow_parameters(name) |
| 192 | + flow = cocoindex.open_flow(f"TextEmbedding_{name}", text_embedding_flow_def(params)) |
| 193 | + flow.setup(report_to_stdout=True) |
| 194 | + return flow |
| 195 | +``` |
| 196 | + |
| 197 | +The `@cached()` decorator from `cachetools` package automatically manages the cache for us (and it also offers thread safety!). |
| 198 | +Once a flow is not touched for 10 minutes, it will call the `popitem()` method, which will close the in-memory flow object. |
| 199 | + |
| 200 | + |
| 201 | +### Drop the persistent resource |
| 202 | + |
| 203 | +Occasionally, you may want to drop the persistent resource of a flow. |
| 204 | +The `drop()` method is for this purpose. |
| 205 | + |
| 206 | +```python |
| 207 | +def drop_text_embedding_index(name: str): |
| 208 | + flow = get_text_embedding_flow(name) |
| 209 | + flow.drop() |
| 210 | +``` |
| 211 | + |
| 212 | +This will drop the persistent resource of the flow. |
| 213 | +The in-memory flow object is still alive, and can be reused until it's closed. |
| 214 | +For example, you can still call `setup()` again. |
| 215 | + |
| 216 | +## Put it all together |
| 217 | + |
| 218 | +```python |
| 219 | +import cocoindex |
| 220 | +from cachetools import cached, TTLCache |
| 221 | +from dataclasses import dataclass |
| 222 | + |
| 223 | +@dataclass |
| 224 | +class TextEmbeddingFlowParameters: |
| 225 | + source_path: str |
| 226 | + target_table_name: str |
| 227 | + |
| 228 | +FLOW_PARAMETERS: dict[str, TextEmbeddingFlowParameters] = { |
| 229 | + "foo": TextEmbeddingFlowParameters(source_path="/path/to/foo", target_table_name="foo_embeddings"), |
| 230 | + "bar": TextEmbeddingFlowParameters(source_path="/path/to/bar", target_table_name="bar_embeddings"), |
| 231 | +} |
| 232 | + |
| 233 | +# Placeholder to get the parameters for a given flow name. You can replace this with your own implementation. |
| 234 | +def get_flow_parameters(name: str) -> TextEmbeddingFlowParameters: |
| 235 | + return FLOW_PARAMETERS[name] |
| 236 | + |
| 237 | + |
| 238 | +def text_embedding_flow_def(params: TextEmbeddingFlowParameters): |
| 239 | + def _flow_def(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope): |
| 240 | + # Add a data source to read files from a directory |
| 241 | + data_scope["documents"] = flow_builder.add_source( |
| 242 | + cocoindex.sources.LocalFile(path=params.source_path)) |
| 243 | + |
| 244 | + doc_embeddings = data_scope.add_collector() |
| 245 | + ... |
| 246 | + |
| 247 | + # Export the collected data to a Postgres table, with the specified table name. |
| 248 | + doc_embeddings.export( |
| 249 | + "doc_embeddings", |
| 250 | + cocoindex.targets.Postgres(table_name=params.target_table_name), |
| 251 | + primary_key_fields=["filename", "location"], |
| 252 | + ) |
| 253 | + |
| 254 | + return _flow_def |
| 255 | + |
| 256 | +class MyTTLCache(TTLCache): |
| 257 | + def popitem(self): |
| 258 | + # Close the flow instance when it is evicted from the cache |
| 259 | + key, flow = super().popitem() |
| 260 | + flow.close() |
| 261 | + return key, flow |
| 262 | + |
| 263 | +@cached(cache=MyTTLCache(maxsize=20, ttl=600)) |
| 264 | +def get_text_embedding_flow(name: str) -> cocoindex.Flow: |
| 265 | + params = get_flow_parameters(name) |
| 266 | + flow = cocoindex.open_flow(f"TextEmbedding_{name}", text_embedding_flow_def(params)) |
| 267 | + flow.setup(report_to_stdout=True) |
| 268 | + return flow |
| 269 | + |
| 270 | +def update_text_embedding_index(name: str): |
| 271 | + flow = get_text_embedding_flow(name) |
| 272 | + flow.update() |
| 273 | + |
| 274 | +def drop_text_embedding_index(name: str): |
| 275 | + flow = get_text_embedding_flow(name) |
| 276 | + flow.drop() |
| 277 | +``` |
| 278 | + |
| 279 | +This provides a skeleton. |
| 280 | +With this, you can trigger `update_text_embedding_index()` and `drop_text_embedding_index()` from your application, e.g. a web server API. |
| 281 | + |
| 282 | +## Takeaways |
| 283 | + |
| 284 | +From this tutorial, we walked through major flow management / operation APIs provided by CocoIndex. These APIs can be categorized into three aspects: |
| 285 | + |
| 286 | +| Aspect | APIs | Description | |
| 287 | +|--------|------|-------------| |
| 288 | +| Life of in-process flow object | `open_flow()`, `Flow.close()` | Create and destroy the in-memory handle to operate on flow instances | |
| 289 | +| Life of persistent resource | `Flow.setup()`, `Flow.drop()` | Create and destroy the backend resources and internal storage | |
| 290 | +| Data updates | `Flow.update()`, `FlowLiveUpdater` | Execute the indexing logic to update targets. *Requires persistent resource to be up-to-date first.* | |
| 291 | + |
| 292 | + |
| 293 | +For simplicity, we use an in-memory `dict` as source of truth for the flow parameters. |
| 294 | +You can replace it with your own mechanism, e.g. table from a database, a configuration file, etc. |
| 295 | +You can trigger these APIs from your applications specific to your use case, e.g. from a specific API endpoint of a web server. |
| 296 | + |
| 297 | +## Further readings |
| 298 | + |
| 299 | +You can see the following documents for more details: |
| 300 | + |
| 301 | +* [CocoIndex Flow Definition: Entry Point](/docs/core/flow_def#entry-point) |
| 302 | +* [Operate a Flow](/docs/core/flow_methods) |
0 commit comments