feat: sync chat_ctx for openai RealtimeModel from and to the remote realtime session #1015
Conversation
🦋 Changeset detected. Latest commit: dd74f0f. The changes in this PR will be included in the next version bump. This PR includes changesets to release 2 packages.
Force-pushed from c23d7a9 to 03fd0ee.
Some known issues to be solved:
For this issue, could we address it by ignoring user input during the sync? How long does that typically take in your tests?
Yes, I think so. The sync takes just 1-2 seconds on my end.

```python
@self._session.on("input_speech_started")
def _input_speech_started():
    self.emit("user_started_speaking")
    self._update_state("listening")
    if self._playing_handle is not None and not self._playing_handle.done():
        # interrupt the current playout and truncate the server-side item
        # at the interruption point (audio is 24 kHz)
        self._playing_handle.interrupt()
        self._session.conversation.item.truncate(
            item_id=self._playing_handle.item_id,
            content_index=self._playing_handle.content_index,
            audio_end_ms=int(self._playing_handle.audio_samples / 24000 * 1000),
        )
```
The first problem I found, with the agent hanging, was that the realtime API responded with text instead of audio in this case. In some cases the API will output only text if a text chat context is set. Some messages I received when this happened:

I disabled the audio playout in (this commit) if the response is text, to avoid waiting forever. It seems that this is a common issue with the realtime API: https://community.openai.com/t/realtime-api-no-response-audio-or-audio-deltas-despite-modalities-being-set-to-audio-text/991062
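For context, the output modalities are configured on the session, and explicitly requesting both is the usual first thing to check. A sketch of the raw event shape (per the forum thread above, this still does not reliably prevent text-only responses):

```python
# raw session.update event shape from the OpenAI Realtime API;
# requesting both modalities does not guarantee audio output
session_update = {
    "type": "session.update",
    "session": {
        "modalities": ["text", "audio"],
    },
}
```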
```
@@ -77,9 +94,19 @@ async def get_weather(
        ),
    ),
    fnc_ctx=fnc_ctx,
    chat_ctx=chat_ctx,
```
Looks nice!
tests/test_message_change.py (outdated)
🙌
```
@@ -767,6 +798,35 @@ def session_update(
        }
    )

    def _sync_chat_ctx_to_session(
```
Do you think this can be an async function that waits for the OAI answers? We're currently emitting `conversation_item_deleted` and `conversation_item_created` instead.
I guess `conversation.item.create` needs to be async?
sounds good!
Added an async version of item create.
```python
@chat_ctx.setter
def chat_ctx(self, chat_ctx: llm.ChatContext) -> None:
    """Sync the ctx to the session and reset the chat context."""
    self._sync_chat_ctx_to_session(self._chat_ctx, chat_ctx)
    self._chat_ctx = chat_ctx
```
I don't think we should add a setter. Instead we should have a public `sync_ctx` function. I guess RealtimeModel will need to have two `llm.ChatContext` objects (one with the current server state, and one that the user can edit).
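Roughly this shape, as a sketch (method names here are illustrative, not final):

```python
# sketch of the two-context idea; names are illustrative
class RealtimeSession:
    def __init__(self) -> None:
        # mirrors the server's conversation state; internal only
        self._remote_chat_ctx = llm.ChatContext()

    @property
    def chat_ctx(self) -> llm.ChatContext:
        # hand out a copy so user edits can't mutate the mirror
        return self._remote_chat_ctx.copy()

    async def sync_chat_ctx(self, new_ctx: llm.ChatContext) -> None:
        # diff new_ctx against the mirror, then create/delete items remotely
        ...
```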
Yeah, I'm thinking something similar: we should keep a "copy" of the server state in the RealtimeModel that the user cannot edit. I'll add that.
Created a `_conversation_items` (maybe give it another name later) in the realtime session to track the item created and deleted events from the API. The content of each item, like user and agent transcriptions and function call outputs, will be written to the item by the multimodal agent. The `_session.chat_ctx` will be created from the tracked conversation items, and the user can sync a `chat_ctx` to the session as well.
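The tracking works roughly like this (a simplified sketch; `insert_after`/`delete` are assumed helper names):

```python
# simplified sketch: keep the server-side mirror updated from API events
@self._session.on("conversation_item_created")
def _on_item_created(item):
    # honor previous_item_id so local ordering matches the server
    self._conversation_items.insert_after(item.previous_item_id, item)


@self._session.on("conversation_item_deleted")
def _on_item_deleted(item_id: str):
    self._conversation_items.delete(item_id)
```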
The chat_ctx does support function tool messages, e.g. see how we handle it for the PipelineAgent.
Not sure I understand what you mean by this? Is there no audio being generated?
So you mean to append the function tool messages to the chat_ctx, right? That makes sense.
No, there is no audio generated in that case; the API responds in text only. The main branch currently doesn't check the part type of the response content. But the main problem is how to make sure the API always returns audio instead of text.
If the API doesn't return audio for any reason, can we make it so the agent doesn't wait for playout?
Yes, I already added a check in this commit so that it will skip the playout if it's a text response. But from what I saw, if the realtime API responded in text at the start, it will always respond in text for the rest of the session. Here are some very recent discussions about this issue on OAI's community; I'll try to find out if there is any workaround: How can I switch from text generation to audio generation? - API - OpenAI Developer Forum
```python
_next: Optional[ConversationItem] = field(default=None, repr=False)


class ConversationItems:
```
This is a doubly linked list to mimic the item operations in the Realtime API. Let me know if this is necessary or if there are other tools for easily deleting and inserting items in the list. And this may need a rename to avoid confusion with the Conversation Items in the API.
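For reference, the core operations look roughly like this (a sketch; it assumes a `_prev` pointer alongside the `_next` field shown above, and the method names are illustrative):

```python
from typing import Optional

# sketch of the doubly linked list: O(1) insert-after and delete by item id;
# assumes ConversationItem carries `id`, `_prev`, and `_next` fields
class RemoteConversationItems:
    def __init__(self) -> None:
        self._head: Optional[ConversationItem] = None
        self._id_to_item: dict[str, ConversationItem] = {}

    def insert_after(self, prev_id: Optional[str], item: ConversationItem) -> None:
        if prev_id is None:
            # insert at the head, like the API's previous_item_id=None
            item._prev, item._next = None, self._head
            if self._head is not None:
                self._head._prev = item
            self._head = item
        else:
            prev = self._id_to_item[prev_id]
            item._prev, item._next = prev, prev._next
            if prev._next is not None:
                prev._next._prev = item
            prev._next = item
        self._id_to_item[item.id] = item

    def delete(self, item_id: str) -> None:
        item = self._id_to_item.pop(item_id)
        if item._prev is not None:
            item._prev._next = item._next
        else:
            self._head = item._next
        if item._next is not None:
            item._next._prev = item._prev
```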
I think this is OK since this is internal. Maybe `RemoteConversationItems`? Let's prepend `_` to the file name and classes to explicitly mark them as internal.
Sounds good.
```python
async def aitem_create(
    self,
    message: llm.ChatMessage,
    previous_item_id: str | None = None,
    _on_create_callback: Callable[[], None] | None = None,
) -> None:
    # send the create event, then wait until the server acks the new item
    fut = asyncio.Future[None]()
    self._item_created_futs[message.id] = fut
    self.conversation.item.create(message, previous_item_id)
    if _on_create_callback:
        _on_create_callback()
    await fut
    del self._item_created_futs[message.id]


async def aitem_delete(self, item_id: str) -> None:
    # same pattern: resolve once the server confirms the deletion
    fut = asyncio.Future[None]()
    self._item_deleted_futs[item_id] = fut
    self.conversation.item.delete(item_id)
    await fut
    del self._item_deleted_futs[item_id]


async def async_chat_ctx(self, new_ctx: llm.ChatContext) -> None:
```
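The other half of this pattern is resolving the futures from the server's ack events, roughly like this (a sketch; the handler wiring is assumed):

```python
# sketch: resolve pending futures when the server confirms the operation
def _handle_item_created(self, event: dict) -> None:
    item_id = event["item"]["id"]
    if (fut := self._item_created_futs.get(item_id)) is not None:
        fut.set_result(None)


def _handle_item_deleted(self, event: dict) -> None:
    if (fut := self._item_deleted_futs.get(event["item_id"])) is not None:
        fut.set_result(None)
```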
I was thinking more about making the current API async. Feel free to break the API; the RealtimeModel API is still beta :)
Do you mean to make `self.conversation.item.create` and `self.conversation.item.delete` async? Maybe I can move `acreate` and `adelete` to `conversation.item` but keep the original versions of create and delete, since for some cases like `sync_chat_ctx` we may want to add all the create and delete messages to the event queue at once and wait at the end?
> Do you mean to make `self.conversation.item.create` and `self.conversation.item.delete` async?

Yep. I don't think we need two functions (sync and async). What we can do is just use `asyncio.gather` when using `sync_chat_ctx`.
My concern is that when we call `gather(item.acreate(item1), item.acreate(item2))`, is there a guarantee that the two items are pushed to the message queue in the correct order?
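One way to sidestep the ordering question: since `create` sends synchronously and only the ack is awaited (as in the `aitem_create` internals above), we can issue all the sends in call order first and then gather the futures. A sketch (`messages_to_create` is an illustrative name):

```python
# sketch: enqueue all create events synchronously (preserving call order),
# then await all server acks together
futs: list[asyncio.Future[None]] = []
for msg in messages_to_create:
    fut = asyncio.Future[None]()
    self._item_created_futs[msg.id] = fut
    self.conversation.item.create(msg)  # sent now, in loop order
    futs.append(fut)
await asyncio.gather(*futs)
```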
What do you think of an API looking like:

If you want to replace the whole chat_ctx, we could do something like:
See message on Slack, otherwise lgtm!
The following is the current API; the user always needs to grab a copy of the current chat_ctx and then modify it:

```python
updated_ctx = model.chat_ctx  # maybe make this a method to emphasize this returns a copy of the ctx
updated_ctx.append(...)
await model.sync_chat_ctx(updated_ctx)
```

I think your version is more straightforward, but there might be a problem: the internally managed ctx and the user-modifiable ctx might be out of sync when the user modifies the latter without an immediate sync. For example, if the user calls ...
It'd be great to explicitly call out that you are getting a copy of the context, i.e.

```python
ctx_copy = model.chat_ctx_copy()
...
await model.set_chat_ctx(ctx_copy)
```

If we are simply replacing, instead of modifying in-place, then I think `set_chat_ctx` is clearer.
@davidzhao I have updated the API as you suggested.
examples/multimodal_agent.py (outdated)
```python
# Add some test context to verify that sync_chat_ctx works
# FIXME: the OAI realtime API does not handle this properly when the chat context is too long;
# it may answer with text-only responses in some cases
```
Seems like this comment is now outdated?
```python
if new_ctx.messages and all(
    isinstance(msg.content, str) for msg in new_ctx.messages
):
```
Can you check if `audio` is inside the `modality`?
Changed to:

```python
if new_ctx.messages and not any(
    isinstance(msg.content, llm.ChatAudio) for msg in new_ctx.messages
):
```
This makes sense, but it would be great to still allow in-place edits. Since people have access to the ... So adding ...
Awesome! Only small nits related to the API are remaining. Can you also fix the type-check CI?
```python
from livekit.plugins.openai import realtime


@self._session.on("response_content_added")
def _on_content_added(message: realtime.RealtimeContent):
    if message.content_type == "text":
        logger.warning(
```
During my testing there is still a chance the agent responds in text mode. Should we make this an error log with more details, and emit an event so people can decide to restart the agent in their script?
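Something like this, as a sketch (`response_text_only` is a hypothetical event name):

```python
if message.content_type == "text":
    logger.error("realtime API returned a text-only response; skipping audio playout")
    # hypothetical event so user code can decide how to recover
    self.emit("response_text_only", message)
```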
Actually it has only ...
Force-pushed from 2e1f551 to eb8ac4e.
Force-pushed from c27ba3e to dd74f0f.
Fixed.
It's more likely to race when users edit in place while the realtime API may also add to it; that can lead to a bit of unpredictability in terms of what is in the chat history.
```python
# Create a task to wait for initialization and start the main task
async def _init_and_start():
    try:
        await self._session._init_sync_task
```
Weirdly, when I was testing out this branch using the virtual assistant sandbox playground, I had an issue in which the agent wouldn't start while waiting for this task to complete. Does that make sense at all? I removed it temporarily from my local venv and the agent works fine, albeit after a 2-second waiting period to sync the initial context.
Can you share the package and the version you installed with the issue? I can take a look and try to reproduce it.
Using the latest commit actually helped fix the issue, apologies! Thanks for the tip that it could've been an issue with the commit I was on.
Stuff can't race if the whole "edit code" path is sync and the first async function to be called is `model.sync_chat_ctx()`.
It's more that there may be content added to the context that's unexpected by the dev. Imagine the following: ...

The overall point is that if two separate processes could modify the chat_context, the order and content of the message history would end up being unpredictable.
Agreed, it could create more confusion. Let's see if people ask for direct access.
`sync_chat_ctx` to sync the local chat context to the OAI realtime session