Skip to content

Commit 24a6337

Browse files
committed
feat: update featurs
1 parent 94d9900 commit 24a6337

File tree

5 files changed

+71
-6
lines changed

5 files changed

+71
-6
lines changed

drive_flow/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from .core import EventEngineCls
2-
from .types import EventInput
2+
from .types import EventInput, ReturnBehavior
33

4-
__version__ = "0.0.1.alpha3"
4+
__version__ = "0.0.1.alpha4"
55
__author__ = "Jianbai Ye"
66
__url__ = "https://github.com/memodb-io/drive-flow"
77

drive_flow/types.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ class ReturnBehavior(Enum):
1616
DISPATCH = "dispatch"
1717
GOTO = "goto"
1818
ABORT = "abort"
19+
INPUT = "input"
1920

2021

2122
class TaskStatus(Enum):
@@ -44,6 +45,12 @@ class EventGroupInput:
4445
class EventInput(EventGroupInput):
4546
task_id: str = field(default_factory=generate_uuid)
4647

48+
@classmethod
49+
def from_input(cls: "EventInput", input_data: dict[str, Any]) -> "EventInput":
50+
return cls(
51+
group_name="user_input", results=input_data, behavior=ReturnBehavior.INPUT
52+
)
53+
4754

4855
@dataclass
4956
class _SpecialEventReturn:

examples/6_llm_agent_ReAct.py

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,45 @@
1-
# TODO
1+
import asyncio
2+
from openai import AsyncOpenAI
3+
from drive_flow import default_drive, EventInput, ReturnBehavior
4+
from drive_flow.dynamic import goto_events, abort_this
5+
6+
openai_client = AsyncOpenAI()
7+
8+
9+
@default_drive.make_event
10+
async def plan(event: EventInput, global_ctx):
11+
print("Planning")
12+
if event.behavior == ReturnBehavior.INPUT:
13+
query = event.results["query"]
14+
global_ctx["user_query"] = query
15+
16+
17+
@default_drive.listen_group([plan])
18+
async def action(event: EventInput, global_ctx):
19+
print("Executing")
20+
21+
22+
@default_drive.listen_group([action])
23+
async def observate(event: EventInput, global_ctx):
24+
print("Observing")
25+
26+
27+
# make it a loop
28+
plan = default_drive.listen_group([observate])(plan)
29+
30+
if __name__ == "__main__":
31+
question = "What is the answer to the ultimate question of life, the universe, and everything?"
32+
storage_results = {}
33+
print(observate.debug_string())
34+
# asyncio.run(
35+
# default_drive.invoke_event(
36+
# plan,
37+
# event_input=EventInput.from_input({"query": question}),
38+
# global_ctx=storage_results,
39+
# )
40+
# )
41+
42+
# if "answer" not in storage_results:
43+
# print(f"Failed to get answer {question}")
44+
# exit(1)
45+
# print(storage_results)

readme.md

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,15 @@ async def hello(event: EventInput, global_ctx):
6262
async def world(event: EventInput, global_ctx):
6363
print("world")
6464

65+
# display the dependencies of 'world' event
66+
print(world.debug_string())
6567
asyncio.run(default_drive.invoke_event(hello))
6668
```
6769

6870
In this example, The return of `hello` event will trigger `world` event.
6971

72+
### Break-down
73+
7074
To make an event function, there are few elements:
7175

7276
* Input Signature: must be `(event: EventInput, global_ctx)`. `EventInput` is the returns of the listening groups. `global_ctx` is set by you when invoking events, it can be anything and default to `None`.
@@ -116,9 +120,13 @@ async def adding(event: EventInput, global_ctx):
116120
results = asyncio.run(default_drive.invoke_event(start))
117121
assert results[adding.id] == 3
118122
```
123+
124+
`adding` will be triggered at first time as long as `hello` and `world` are done.
119125
</details>
120126

121-
`drive_flow` suppports different behaviors for multi-event triggering:
127+
#### Re-trigger the event
128+
129+
`drive_flow` suppports different behaviors for multi-event retriggering:
122130

123131
- `all`: retrigger this event only when all the listening events are updated.
124132
- `any`: retrigger this event as long as one of the listening events is updated.
@@ -150,7 +158,6 @@ async def world(event: EventInput, global_ctx):
150158
await asyncio.sleep(0.2)
151159
print(datetime.now(), "world done")
152160

153-
154161
asyncio.run(default_drive.invoke_event(start))
155162
```
156163

tests/test_types.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,11 @@
1-
from drive_flow.types import BaseEvent, EventGroup
1+
from drive_flow.types import BaseEvent, EventGroup, EventInput, ReturnBehavior
2+
3+
4+
def test_user_input():
5+
fake_input = {"query": "Hello World"}
6+
a = EventInput.from_input(fake_input)
7+
assert a.results == fake_input
8+
assert a.behavior == ReturnBehavior.INPUT
29

310

411
def test_node_hash():

0 commit comments

Comments
 (0)