Streaming Support for Intermediate Nodes #358
-
First of all, thank you for developing such an amazing framework! I'm currently working on a complex system using Burr, where actions (nodes) and their transitions (edges) are dynamically generated. This setup was working flawlessly when the system was a simple chatbot, but I'm now transitioning to a Voice Bot using VAPI, and I'm running into some challenges.

Context: The goal is to have OpenAI-compatible event streams, as low latency is a priority for me. The current flow works like this:

Here's a simplified version of the initial code:

```python
while True:
    _action, result, state = await app.arun(
        halt_before=["process_user_query"], halt_after=[], inputs=inputs
    )
    if _action and _action.name == "process_user_query":
        break
return result, state
```

Transition to Streaming: When I implemented streaming, I modified the function as follows:

```python
async def get_response_stream(self, user: User, chat_id, query):
    """
    Get response for a query
    """
    app = await self.build_or_get_app(user, chat_id, sequence_number=None, stream=True)
    inputs = {"query": query}
    streaming_container = None
    while True:
        action, streaming_container = await app.astream_result(
            halt_before=["process_user_query"], halt_after=[], inputs=inputs
        )
        async for result in streaming_container:
            yield result, None
        if action and action.name == "process_user_query":
            break
    result, state = await streaming_container.get()
    yield result, state
```

The Problem: Although I've switched my actions to

Question: Is there a way to allow results to be streamed continuously while still processing actions before control is passed back to the
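For clarity, here's the behavior I'm after, sketched framework-agnostically in plain asyncio (all names here are hypothetical stand-ins, not Burr's API): every intermediate node's deltas should reach the caller immediately, with the complete result and state yielded last.

```python
import asyncio

async def generate_answer(query: str):
    # Hypothetical intermediate node: streams partial tokens as they arrive.
    for token in ["Hello", ", ", "world"]:
        await asyncio.sleep(0)  # stand-in for real model latency
        yield {"delta": token}

async def run_pipeline(query: str):
    # Desired behavior: forward every delta to the caller immediately,
    # then yield the complete result once the node finishes.
    parts = []
    async for chunk in generate_answer(query):
        parts.append(chunk["delta"])
        yield chunk, None  # (delta, no state yet), mirroring my loop above
    yield {"content": "".join(parts)}, {"next": "process_user_query"}

async def collect():
    # Gather everything the pipeline yields: deltas first, result last.
    return [event async for event in run_pipeline("hi")]

events = asyncio.run(collect())
```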
Replies: 4 comments 5 replies
-
I'm facing the same issue and it's stopping me from using Burr for call applications.
-
Success with Streaming Solution and a Potential Burr Improvement

I finally managed to get the streaming functionality working using the second proposed solution. However, I ran into an issue where the machine wasn't halting execution as expected. This was because I had implicit (default) transitions to the

Current Solution: For now, I've resolved the issue by checking whether the

```python
async def get_response_stream(self, user: User, chat_id, query):
    """
    Get response for a query
    """
    app = await self.build_or_get_app(user, chat_id, sequence_number=None, stream=True)
    inputs = {"query": query}
    streaming_container = None
    while True:
        action, streaming_container = await app.astream_result(
            halt_after=[
                action.name for action in app.graph.actions if action.streaming
            ],
            halt_before=["process_user_query"],
            inputs=inputs,
        )
        async for result in streaming_container:
            yield result, None
        result, state = await streaming_container.get()
        if (action and action.name == "process_user_query") or result.get("content") is not None:
            break
    result, state = await streaming_container.get()
    yield result, state
```

Possible Improvement in Burr: While this workaround solves the issue for now, I feel that Burr could potentially handle this more elegantly. Perhaps Burr could introduce a more explicit mechanism to stop execution when a complete result is received, eliminating the need for manual checks like this. I'm happy with the current progress, but I'd love to hear any thoughts or suggestions on how Burr might handle this more natively!
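Stripped of the Burr specifics, the manual check reduces to this pattern (plain asyncio, hypothetical step names): keep looping over streaming steps and halt as soon as a chunk carries a complete `content` field, rather than relying only on reaching a named halt node.

```python
import asyncio

async def stream_step(step: int):
    # Hypothetical streaming step: early steps yield only deltas;
    # the final step also emits a complete result with a 'content' key.
    yield {"delta": f"partial-{step}"}
    if step == 2:
        yield {"content": "final answer"}

async def run_until_complete_result():
    step = 0
    while True:
        last_chunk = {}
        async for chunk in stream_step(step):
            last_chunk = chunk  # stands in for streaming_container.get()
        # The workaround: break on a complete result, not only on
        # reaching a named halt node.
        if last_chunk.get("content") is not None:
            return last_chunk, step
        step += 1

final, steps_taken = asyncio.run(run_until_complete_result())
```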
-
FYI @meeran03 I sent you an email about Burr cloud -- let me know if that went to spam.
Hey @faisalrehmankhattak and @meeran03 -- thanks for the issue! I think there are a few issues here, but this is a supported use-case and we'll make sure we get to a solution.

Two cases:

(1) you want to use `halt_before` + `astream_result(...)`

This assumes that the only node you want to stream back is `process_user_query` -- see (2) otherwise -- @meeran03 I think this is describing your case, but I'm not 100% sure.

This is a bit of unclear documentation, but I think there is an easy fix. `halt_before` doesn't actually stream (to transition between actions, we need full state to complete so we know where to transition to) -- if `process_user_query` is the node that streams, it'll stop before that, …
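To make the `halt_before` vs `halt_after` distinction concrete, here's a toy model (not Burr's internals; all names hypothetical): choosing the next transition requires a node's complete output, so a node before a `halt_before` target is fully drained internally, whereas halting *after* a streaming node lets its deltas flow straight to the caller.

```python
import asyncio

async def streaming_node():
    # A streaming action: yields deltas, then its complete result.
    for token in ["a", "b", "c"]:
        yield {"delta": token}
    yield {"result": "abc"}

async def run(halt_after_streaming_node: bool):
    events_seen_by_caller = []
    if halt_after_streaming_node:
        # halt_after: the run ends at this node, so every delta can be
        # forwarded to the caller as it is produced.
        async for event in streaming_node():
            events_seen_by_caller.append(event)
    else:
        # halt_before a later node: the framework must drain this node's
        # full output to decide where to transition next -- the caller
        # only ever sees the complete result.
        complete = None
        async for event in streaming_node():
            complete = event
        events_seen_by_caller.append(complete)
    return events_seen_by_caller

streamed = asyncio.run(run(True))
drained = asyncio.run(run(False))
```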