Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ekzhu committed Oct 31, 2024
1 parent 6a222c9 commit 305ae5e
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ async def on_messages_stream(
self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken
) -> AsyncGenerator[InnerMessage | ChatMessage | Response, None]:
"""Handles incoming messages and returns a stream of messages and
and the final item is the response."""
and the final item is the response. The base implementation in :class:`BaseChatAgent`
simply calls :meth:`on_messages` and yields the messages in the response."""
response = await self.on_messages(messages, cancellation_token)
for inner_message in response.inner_messages or []:
yield inner_message
Expand All @@ -67,3 +68,22 @@ async def run(
messages += response.inner_messages
messages.append(response.chat_message)
return TaskResult(messages=messages)

async def run_stream(
self,
task: str,
*,
cancellation_token: CancellationToken | None = None,
) -> AsyncGenerator[InnerMessage | ChatMessage | TaskResult, None]:
"""Run the agent with the given task and return a stream of messages
and the final task result as the last item in the stream."""
if cancellation_token is None:
cancellation_token = CancellationToken()
first_message = TextMessage(content=task, source="user")
messages: List[InnerMessage | ChatMessage] = [first_message]
async for message in self.on_messages_stream([first_message], cancellation_token):
if isinstance(message, Response):
yield TaskResult(messages=messages)
else:
messages.append(message)
yield message
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import AsyncIterator, Protocol, Sequence
from typing import AsyncGenerator, Protocol, Sequence

from autogen_core.base import CancellationToken

Expand Down Expand Up @@ -33,7 +33,7 @@ def run_stream(
task: str,
*,
cancellation_token: CancellationToken | None = None,
) -> AsyncIterator[InnerMessage | ChatMessage | TaskResult]:
) -> AsyncGenerator[InnerMessage | ChatMessage | TaskResult, None]:
"""Run the task and produces a stream of messages and the final result
as the last item in the stream."""
...
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import uuid
from abc import ABC, abstractmethod
from typing import Callable, List
from typing import AsyncGenerator, Callable, List

from autogen_core.application import SingleThreadedAgentRuntime
from autogen_core.base import (
Expand Down Expand Up @@ -169,3 +169,11 @@ async def collect_output_messages(

# Return the result.
return TaskResult(messages=output_messages)

async def run_stream(
self,
task: str,
*,
cancellation_token: CancellationToken | None = None,
) -> AsyncGenerator[InnerMessage | ChatMessage | TaskResult, None]:
pass

0 comments on commit 305ae5e

Please sign in to comment.