Skip to content

Commit 1f75464

Browse files
authored
Feat: separate tool_call_item and tool_call_output_item in stream events (#974)
### Summary During my analysis of the streaming API (run_streamed) results, I noticed that tool_call_item and tool_call_output_item events are currently being emitted concurrently upon tool call completion (as evidenced in [#831](#831)). This implementation conflates what should logically be distinct events. The current PR addresses this by properly separating these event triggers to better reflect the actual workflow. ### Test plan The test file is created in `tests` named `test_stream_events.py`. Run the test script below to test. ```bash pytest -s test_stream_events.py ``` The test result is: ```text ======================================================================== test session starts ======================================================================== platform win32 -- Python 3.12.10, pytest-8.3.5, pluggy-1.5.0 rootdir: D:\moon\projects\openai-agents-python configfile: pyproject.toml plugins: anyio-4.9.0, inline-snapshot-0.22.3, asyncio-0.26.0, mock-3.14.0 asyncio: mode=Mode.AUTO, asyncio_default_fixture_loop_scope=session, asyncio_default_test_loop_scope=function collected 1 item tests\test_stream_events.py === Run starting === Agent updated: Joker -- Message output: a_message -- Tool was called at 1751271106971851300 -- Tool output: success! at 1751271109987313900 -- Message output: done === Run complete === . ========================================================================= 1 passed in 3.07s ========================================================================= ``` ### Issue number [#831](#831) ### Checks - [x] I've added new tests (if relevant) - [x] I've run `make lint` and `make format` - [x] I've made sure tests pass
1 parent fb68e77 commit 1f75464

File tree

4 files changed

+1943
-1835
lines changed

4 files changed

+1943
-1835
lines changed

src/agents/_run_impl.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -914,12 +914,12 @@ async def run_single_output_guardrail(
914914
return result
915915

916916
@classmethod
917-
def stream_step_result_to_queue(
917+
def stream_step_items_to_queue(
918918
cls,
919-
step_result: SingleStepResult,
919+
new_step_items: list[RunItem],
920920
queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel],
921921
):
922-
for item in step_result.new_step_items:
922+
for item in new_step_items:
923923
if isinstance(item, MessageOutputItem):
924924
event = RunItemStreamEvent(item=item, name="message_output_created")
925925
elif isinstance(item, HandoffCallItem):
@@ -944,6 +944,14 @@ def stream_step_result_to_queue(
944944
if event:
945945
queue.put_nowait(event)
946946

947+
@classmethod
def stream_step_result_to_queue(
    cls,
    step_result: SingleStepResult,
    queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel],
):
    """Emit every new item of `step_result` as a stream event on `queue`.

    Thin backward-compatible wrapper around `stream_step_items_to_queue`,
    kept so existing callers that hold a full `SingleStepResult` keep working.
    """
    items = step_result.new_step_items
    cls.stream_step_items_to_queue(items, queue)
954+
947955
@classmethod
948956
async def _check_for_final_output_from_tools(
949957
cls,

src/agents/run.py

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -904,10 +904,9 @@ async def _run_single_turn_streamed(
904904
raise ModelBehaviorError("Model did not produce a final response!")
905905

906906
# 3. Now, we can process the turn as we do in the non-streaming case
907-
single_step_result = await cls._get_single_step_result_from_response(
907+
return await cls._get_single_step_result_from_streamed_response(
908908
agent=agent,
909-
original_input=streamed_result.input,
910-
pre_step_items=streamed_result.new_items,
909+
streamed_result=streamed_result,
911910
new_response=final_response,
912911
output_schema=output_schema,
913912
all_tools=all_tools,
@@ -918,9 +917,6 @@ async def _run_single_turn_streamed(
918917
tool_use_tracker=tool_use_tracker,
919918
)
920919

921-
RunImpl.stream_step_result_to_queue(single_step_result, streamed_result._event_queue)
922-
return single_step_result
923-
924920
@classmethod
925921
async def _run_single_turn(
926922
cls,
@@ -1023,6 +1019,57 @@ async def _get_single_step_result_from_response(
10231019
run_config=run_config,
10241020
)
10251021

1022+
@classmethod
async def _get_single_step_result_from_streamed_response(
    cls,
    *,
    agent: Agent[TContext],
    all_tools: list[Tool],
    streamed_result: RunResultStreaming,
    new_response: ModelResponse,
    output_schema: AgentOutputSchemaBase | None,
    handoffs: list[Handoff],
    hooks: RunHooks[TContext],
    context_wrapper: RunContextWrapper[TContext],
    run_config: RunConfig,
    tool_use_tracker: AgentToolUseTracker,
) -> SingleStepResult:
    """Turn a streamed model response into a single-step result.

    Unlike the non-streaming path, stream events are emitted in two phases:
    items produced directly by the model response (e.g. `tool_call_item`) are
    queued *before* tools execute, and items produced by tool execution
    (e.g. `tool_call_output_item`) are queued only once they exist. This keeps
    the two event kinds temporally separated instead of arriving together.
    """
    original_input = streamed_result.input
    pre_step_items = streamed_result.new_items
    event_queue = streamed_result._event_queue

    processed_response = RunImpl.process_model_response(
        agent=agent,
        all_tools=all_tools,
        response=new_response,
        output_schema=output_schema,
        handoffs=handoffs,
    )
    new_items_processed_response = processed_response.new_items
    tool_use_tracker.add_tool_use(agent, processed_response.tools_used)
    # Phase 1: emit the model-produced items before any tool actually runs.
    RunImpl.stream_step_items_to_queue(new_items_processed_response, event_queue)

    single_step_result = await RunImpl.execute_tools_and_side_effects(
        agent=agent,
        original_input=original_input,
        pre_step_items=pre_step_items,
        new_response=new_response,
        processed_response=processed_response,
        output_schema=output_schema,
        hooks=hooks,
        context_wrapper=context_wrapper,
        run_config=run_config,
    )
    # Phase 2: emit only the items created by tool execution. Filter by object
    # identity (not `==`) so a new item that happens to compare equal to an
    # already-streamed one is never dropped, and use a set for O(1) lookups
    # instead of repeated O(n) list membership tests.
    already_streamed_ids = {id(item) for item in new_items_processed_response}
    new_step_items = [
        item
        for item in single_step_result.new_step_items
        if id(item) not in already_streamed_ids
    ]
    RunImpl.stream_step_items_to_queue(new_step_items, event_queue)

    return single_step_result
1072+
10261073
@classmethod
10271074
async def _run_input_guardrails(
10281075
cls,

tests/test_stream_events.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import asyncio
2+
import time
3+
4+
import pytest
5+
6+
from agents import Agent, Runner, function_tool
7+
8+
from .fake_model import FakeModel
9+
from .test_responses import get_function_tool_call, get_text_message
10+
11+
12+
@function_tool
async def foo() -> str:
    """Tool that yields control briefly before returning.

    The await guarantees the `tool_call_item` event is observed strictly
    before the `tool_call_output_item` event. A short sleep is ample for the
    nanosecond-resolution timestamps the test compares; the original 3 s
    sleep only made the test suite slow.
    """
    await asyncio.sleep(0.1)
    return "success!"
16+
17+
@pytest.mark.asyncio
async def test_stream_events_main():
    """Tool-call and tool-output stream events must arrive at distinct times,
    with the call event strictly before the output event."""
    model = FakeModel()
    agent = Agent(
        name="Joker",
        model=model,
        tools=[foo],
    )

    model.add_multiple_turn_outputs(
        [
            # Turn 1: the model emits a message and a tool call.
            [
                get_text_message("a_message"),
                get_function_tool_call("foo", ""),
            ],
            # Turn 2: the model emits the final text message.
            [get_text_message("done")],
        ]
    )

    result = Runner.run_streamed(
        agent,
        input="Hello",
    )

    # Timestamp (ns) at which each interesting item type was first observed.
    observed_at = {"tool_call_item": -1, "tool_call_output_item": -1}
    async for event in result.stream_events():
        if event.type != "run_item_stream_event":
            continue
        if event.item.type in observed_at:
            observed_at[event.item.type] = time.time_ns()

    tool_call_start_time = observed_at["tool_call_item"]
    tool_call_end_time = observed_at["tool_call_output_item"]
    assert tool_call_start_time > 0, "tool_call_item was not observed"
    assert tool_call_end_time > 0, "tool_call_output_item was not observed"
    assert tool_call_start_time < tool_call_end_time, "Tool call ended before or equals it started?"

0 commit comments

Comments
 (0)