
Bug Report: astream_events fails when tool returns a dictionary #26763

Open
5 tasks done
sharrajesh opened this issue Sep 23, 2024 · 1 comment
Labels: 🤖:bug (Related to a bug, vulnerability, unexpected error with an existing feature), investigate

Comments

@sharrajesh
Contributor

sharrajesh commented Sep 23, 2024

Checked other resources

  • I added a very descriptive title to this issue.
  • I searched the LangChain documentation with the integrated search.
  • I used the GitHub search to find a similar question and didn't find it.
  • I am sure that this is a bug in LangChain rather than my code.
  • The bug is not resolved by updating to the latest stable version of LangChain (or the specific integration package).

Description

When using astream_events with an agent that has multiple tools, if one of the tools returns a dictionary containing an answer and source documents, the streaming process fails. However, the same setup works correctly in non-streaming mode.

Steps to Reproduce

  1. Set up an agent with multiple tools, including one that returns a dictionary.
  2. Use astream_events to stream the agent's output.
  3. Observe the error when the tool returning a dictionary is called.

Code to Reproduce

import asyncio
import json
import os
import random
import warnings
from typing import Type, Dict, Any

import boto3
from dotenv import find_dotenv, load_dotenv
from langchain.agents import create_react_agent, AgentExecutor
from langchain.prompts import PromptTemplate
from langchain.tools import BaseTool
from langchain_aws import ChatBedrock
from langsmith import Client
from pydantic import BaseModel, Field

# Suppress warnings
warnings.filterwarnings("ignore")

load_dotenv(find_dotenv())

reproduce_problem = True

boto3_session = boto3.Session(
    aws_access_key_id=os.environ["BEDROCK_AWS_ACCESS_KEY"],
    aws_secret_access_key=os.environ["BEDROCK_AWS_ACCESS_SECRET"],
    region_name=os.environ["BEDROCK_AWS_REGION"],
)
model = ChatBedrock(
    model_id="anthropic.claude-3-5-sonnet-20240620-v1:0",
    client=boto3_session.client("bedrock-runtime"),
    streaming=True,
    model_kwargs={
        "temperature": 0,
    },
    tags=["agent"],
    metadata={"streaming": True},
    callbacks=None,
)


class CatHidingInput(BaseModel):
    query: str = Field(default="", description="Query about where the cat is hiding")


class CatHidingTool(BaseTool):
    name: str = "where_cat_is_hiding"
    description: str = "Use this tool to find out where the cat is hiding right now."
    args_schema: Type[BaseModel] = CatHidingInput

    async def _arun(self, query: str) -> str:
        return random.choice(["under the bed", "on the shelf"])

    def _run(self, query: str) -> str:
        return random.choice(["under the bed", "on the shelf"])


class GetItemsInput(BaseModel):
    place: str = Field(..., description="The place to look for items")


class GetItemsTool(BaseTool):
    name: str = "get_items"
    description: str = "Use this tool to look up which items are in the given place."
    args_schema: Type[BaseModel] = GetItemsInput

    async def _arun(self, place: str) -> Dict[str, Any]:
        return self._get_items(place)

    def _run(self, place: str) -> Dict[str, Any]:
        return self._get_items(place)

    def _get_items(self, place: str):
        items = ""
        if "bed" in place:
            items = "socks, shoes and dust bunnies"
        elif "shelf" in place:
            items = "books, pencils and pictures"
        else:
            items = "cat snacks"
        answer = f"The items in the {place} are: {items}"
        source_documents = [
            {
                "page_content": f"Items found in {place}: {items}",
                "metadata": {"source": "GetItemsTool", "place": place},
            }
        ]
        # NOTE: this is the issue. Returning a dictionary fails in
        # streaming mode, although it works in non-streaming mode.
        if reproduce_problem:
            return {"answer": answer, "source_documents": source_documents}
        else:
            # Workaround: returning a single JSON string works in both modes.
            return json.dumps({"answer": answer, "source_documents": source_documents})


client = Client()

REACT_PROMPT = PromptTemplate.from_template(
    """You are an AI assistant helping to find a cat and items in its location.
Human: {input}
AI: To solve this task, I have access to the following tools:

{tools}

The available tool names are: {tool_names}

Let's approach this step-by-step:

Always use the following format:

Thought: Consider what to do next
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question

{agent_scratchpad}"""
)


async def run_agent_streaming():
    tools = [GetItemsTool(), CatHidingTool()]
    agent = create_react_agent(model, tools, REACT_PROMPT)
    agent_executor = AgentExecutor(
        agent=agent,
        tools=tools,
        return_intermediate_steps=True,
        return_source_documents=True,
        handle_parsing_errors=True,
    ).with_config({"run_name": "Agent"})
    async for event in agent_executor.astream_events(
        {"input": "where is the cat hiding? what items are in that location?"},
        version="v2",
    ):
        kind = event["event"]
        if kind == "on_chain_start":
            if event["name"] == "Agent":
                print(
                    f"Starting agent: {event['name']} with input: {event['data'].get('input')}"
                )
        elif kind == "on_chain_end":
            if event["name"] == "Agent":
                print()
                print("--")
                print(
                    f"Done agent: {event['name']} with output: {event['data'].get('output')['output']}"
                )
        elif kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                print(content, end="|")
        elif kind == "on_tool_start":
            print("--")
            print(
                f"Starting tool: {event['name']} with inputs: {event['data'].get('input')}"
            )
        elif kind == "on_tool_end":
            print(f"Done tool: {event['name']}")
            print(f"Tool output was: {event['data'].get('output')}")
            print("--")


async def run_agent_non_streaming():
    tools = [GetItemsTool(), CatHidingTool()]
    agent = create_react_agent(model, tools, REACT_PROMPT)
    agent_executor = AgentExecutor(
        agent=agent,
        tools=tools,
        return_intermediate_steps=True,
        return_source_documents=True,
        handle_parsing_errors=True,
    ).with_config({"run_name": "Agent"})
    result = await agent_executor.ainvoke(
        {"input": "where is the cat hiding? what items are in that location?"}
    )
    print(f"Agent output: {result['output']}")
    print("Intermediate steps:")
    for step in result.get("intermediate_steps", []):
        print(f"- Action: {step[0].tool}")
        print(f"  Input: {step[0].tool_input}")
        print(f"  Output: {step[1]}")


if __name__ == "__main__":
    print("Running streaming version:")
    asyncio.run(run_agent_streaming())
    print("\nRunning non-streaming version:")
    asyncio.run(run_agent_non_streaming())

Error Message

pydantic.v1.error_wrappers.ValidationError: 2 validation errors for HumanMessage
content
  str type expected (type=type_error.str)
content
  value is not a valid list (type=type_error.list)

Expected Behavior

The astream_events method should handle tool outputs that return dictionaries, just as it does in non-streaming mode.

Actual Behavior

The streaming process fails with a ValidationError when a tool returns a dictionary.

Workaround

Returning a JSON string instead of a dictionary from the tool allows the streaming to work:

return json.dumps({"answer": answer, "source_documents": source_documents})
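The workaround can be generalized into a small helper. This is a hedged sketch (the helper name `ensure_string_output` is mine, not part of LangChain): coerce any non-string tool output to a JSON string before it leaves the tool, so downstream message construction always receives `str` content.

```python
import json
from typing import Any


def ensure_string_output(output: Any) -> str:
    """Coerce a tool's return value to a string.

    ReAct-style agents interpolate tool observations into a text
    prompt, so message construction expects string content.
    JSON-encode anything that is not already a string; default=str
    guards against non-serializable values such as datetimes.
    """
    if isinstance(output, str):
        return output
    return json.dumps(output, default=str)
```

A tool's `_run`/`_arun` can then wrap its result in `ensure_string_output(...)` instead of calling `json.dumps` at every return site.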

Environment

  • LangChain version: 0.3.0
  • LangChain Core version: 0.3.2
  • Python version: 3.11
  • Pydantic version: 2.9.2
  • Pydantic Core version: 2.23.4
  • langchain-aws: (please specify version)
  • boto3: (please specify version)

Model:

  • AWS Bedrock
  • Model ID: anthropic.claude-3-5-sonnet-20240620-v1:0

Operating System: (Please specify your OS)

Additional Context

This issue only occurs in streaming mode when using astream_events. The same code works correctly in non-streaming mode. It appears that the astream_events method is not properly handling dictionary outputs from tools, possibly due to an issue in the event conversion process.
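A rough illustration of why the ReAct path is sensitive to dict outputs (this is a simplified sketch of a ReAct-style scratchpad, not LangChain's actual implementation): each intermediate step is concatenated into a single prompt string, so a dict observation must be stringified before interpolation, whereas tools-style agents can pass structured content through tool messages.

```python
import json
from typing import Any, List, Tuple


def format_react_scratchpad(steps: List[Tuple[str, Any]]) -> str:
    """Sketch of ReAct scratchpad formatting: each (log, observation)
    pair is flattened into one prompt string, so any non-string
    observation has to be stringified first."""
    lines = []
    for log, observation in steps:
        if not isinstance(observation, str):
            observation = json.dumps(observation, default=str)
        lines.append(f"{log}\nObservation: {observation}")
    return "\n".join(lines)
```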

The problem is reproducible with AWS Bedrock using the Claude 3.5 Sonnet model, but it may also affect other LLM providers and models.

@langcarl langcarl bot added the investigate label Sep 23, 2024
@dosubot dosubot bot added the 🤖:bug Related to a bug, vulnerability, unexpected error with an existing feature label Sep 23, 2024
@sharrajesh
Contributor Author

sharrajesh commented Sep 23, 2024

To add more context:

This gist shows working streaming and non-streaming code for my LangChain agent with multiple tools returning dicts, using create_openai_tools_agent:
https://gist.github.com/sharrajesh/1080af5a95ae9d7b83a8da46597b68e1

This gist shows the non-working streaming code for my LangChain agent with multiple tools returning dicts, using create_react_agent:
https://gist.github.com/sharrajesh/765c0b6edfe991363675f45d467e3c93
