feat: Add TTFT support in OpenAI chat generator #8444

Open
wants to merge 1 commit into base: main

Conversation

LastRemote
Contributor

Related Issues

Proposed Changes:

  • adds TTFT support for OpenAI generators

How did you test it?

  • unit tests

Notes for the reviewer

  • I am not exactly sure how to set stream_options.include_usage in the OpenAI SDK, so there is still no usage data for streaming LLMs (see the sketch below).
  • The timestamp is stored in ISO format so it can be safely dumped when the generator is deployed as a service.
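
For reference, a rough sketch (not the code in this PR) of how usage for streamed completions is typically requested in the current OpenAI Python SDK, and how a TTFT timestamp could be captured and stored in ISO format; the model name and variable names are placeholders:

from datetime import datetime, timezone

from openai import OpenAI

client = OpenAI()

# stream_options={"include_usage": True} asks the API to send a final chunk
# that carries token usage for the whole streamed completion.
stream = client.chat.completions.create(
    model="gpt-4o-mini",  # placeholder model name
    messages=[{"role": "user", "content": "Hello"}],
    stream=True,
    stream_options={"include_usage": True},
)

completion_start_time = None
streamed_chunks = []
for chunk in stream:
    if completion_start_time is None:
        # time to first token, stored in ISO format so it stays JSON-serializable
        completion_start_time = datetime.now(timezone.utc).isoformat()
    streamed_chunks.append(chunk)  # the final chunk carries `usage` when include_usage is set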

Checklist

@LastRemote LastRemote requested review from a team as code owners October 8, 2024 10:00
@LastRemote LastRemote requested review from dfokina and davidsbatista and removed request for a team October 8, 2024 10:00
@coveralls
Collaborator

coveralls commented Oct 8, 2024

Pull Request Test Coverage Report for Build 11496599945

Details

  • 0 of 0 changed or added relevant lines in 0 files are covered.
  • 13 unchanged lines in 1 file lost coverage.
  • Overall coverage increased (+0.01%) to 90.482%

Files with Coverage Reduction:
  • components/generators/chat/openai.py: 13 new missed lines (80.99% file coverage)

Totals:
  • Change from base Build 11463116725: 0.01%
  • Covered Lines: 7558
  • Relevant Lines: 8353

💛 - Coveralls

@vblagoje
Member

vblagoje commented Oct 8, 2024

Ok @LastRemote, regarding the streaming response, this approach worked for me. It is slightly different from yours; perhaps have a look at it and consider some of these changes.

@LastRemote
Contributor Author

LastRemote commented Oct 11, 2024

Ok @LastRemote, regarding the streaming response, this approach worked for me. It is slightly different from yours; perhaps have a look at it and consider some of these changes.

@vblagoje Hello, I reviewed your approach and still have some concerns regarding the meta structure and the potential integration with Langfuse. Specifically, when a chunk is built, the current behavior captures fields like index, model, tool_calls, and stop_reason. However, I believe we should avoid including this information in the meta["usage"] field when merging meta across chunks. Instead, I would prefer the usage meta to reflect only the information that comes directly from the LLM response (or to be left empty if no usage data is captured). This would help prevent potential mismatches when passing usage data to third-party tracers or other downstream services.
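
For illustration only, the kind of reply meta shape this argues for (values are made up, not the PR's actual output):

meta = {
    "model": "gpt-4o-mini",
    "index": 0,
    "finish_reason": "stop",
    "usage": {},  # left empty unless the LLM response actually reports usage
    "completion_start_time": "2024-10-11T10:00:00+00:00",
}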

Additionally, I converted the TTFT timestamp to ISO format. This change is intended to make it easier to log the entire ChatMessage object or include it as part of a service response. It also requires a slightly different update in the Langfuse integration (which I already implemented in my current project):

        ...
        if tags.get("haystack.component.type") in _SUPPORTED_CHAT_GENERATORS:
            replies = span._data.get("haystack.component.output", {}).get("replies")
            if replies:
                meta = replies[0].meta
                completion_start_time = meta.get("completion_start_time")
                if completion_start_time:
                    completion_start_time = datetime.fromisoformat(completion_start_time)
                span._span.update(
                    usage=meta.get("usage") or None,
                    model=meta.get("model"),
                    completion_start_time=completion_start_time,
                )

@vblagoje
Member

Ok thanks @LastRemote - I'll take a look at this again soon; I got sidetracked with some other higher-priority tasks, but I will get to it this week for sure.

@vblagoje
Member

@LastRemote I took a closer look and I noticed that you implemented this feature in OpenAIGenerator. As we are going to deprecate all generators in the near future let's focus on supporting this feature in OpenAIChatGenerator only. 🙏

@LastRemote
Contributor Author

@LastRemote As we are going to deprecate all generators in the near future let's focus on supporting this feature in OpenAIChatGenerator only. 🙏

Sure thing, we can focus on OpenAIChatGenerator for now. I personally find the generator implementations a bit cumbersome too. When playing around with Haystack, I actually wrote an extra ChatModelService abstraction that has an .infer() method to make the corresponding calls to LLMs and cast the response as List[ChatMessage]. This way I can have a single ChatGenerator component that uses a ChatModelService, and I am able to switch between different models without needing to change my pipeline architecture. Not sure if this aligns with what you have in mind, but I hope it helps.

@vblagoje
Member

@LastRemote As we are going to deprecate all generators in the near future let's focus on supporting this feature in OpenAIChatGenerator only. 🙏

Sure thing, we can focus on OpenAIChatGenerator for now. I personally find the generator implementations a bit cumbersome too. When playing around with Haystack, I actually wrote an extra ChatModelService abstraction that has an .infer() method to make the corresponding calls to LLMs and cast the response as List[ChatMessage]. This way I can have a single ChatGenerator component that uses a ChatModelService, and I am able to switch between different models without needing to change my pipeline architecture. Not sure if this aligns with what you have in mind, but I hope it helps.

Awesome 💪
Hmm, that's interesting, but how do you switch between models? One needs all these different packages installed (say, cohere and anthropic), each with its own peculiar way of invoking chat completions.

@LastRemote
Contributor Author

LastRemote commented Oct 17, 2024

@LastRemote As we are going to deprecate all generators in the near future let's focus on supporting this feature in OpenAIChatGenerator only. 🙏

Sure thing, we can focus on OpenAIChatGenerator for now. I personally find the generator implementations a bit cumbersome too. When playing around with Haystack, I actually wrote an extra ChatModelService abstraction that has an .infer() method to make the corresponding calls to LLMs and cast the response as List[ChatMessage]. This way I can have a single ChatGenerator component that uses a ChatModelService, and I am able to switch between different models without needing to change my pipeline architecture. Not sure if this aligns with what you have in mind, but I hope it helps.

Awesome 💪 Hmm, that's interesting, but how do you switch between models? One needs all these different packages installed (say, cohere and anthropic), each with its own peculiar way of invoking chat completions.

So I extend the abstract ChatModelService class to support different invocation methods. I have implemented OpenAIChatModelService, BedrockAnthropicChatModelService, GeminiChatModelService, and AnthropicChatModelService (I never really used the latter two in practice, but included them for the sake of completeness). Each type of model service has its own implementation of the .infer() method, which may use another SDK or simply an httpx client. Request building and response parsing are also handled inside infer(), covering all the variations among calling formats, so that I can call every service the same way inside the pipeline component.
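
A minimal sketch of what such a base class might look like (the real ChatModelService lives in my own project and is not part of this PR; the signature simply mirrors how it is called in the component below):

from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List, Optional

from haystack.dataclasses import ChatMessage, StreamingChunk


class ChatModelService(ABC):
    """Hides provider-specific request building and response parsing behind one interface."""

    @abstractmethod
    def infer(
        self,
        messages: List[ChatMessage],
        model: str = "",
        timeout: Optional[float] = None,
        stream: bool = False,
        streaming_callbacks: Optional[List[Callable[[StreamingChunk], None]]] = None,
        inference_kwargs: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> List[ChatMessage]:
        """Call the underlying LLM and return its replies as a list of ChatMessage."""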

This is the actual code I have for ChatGenerator:

# ChatModelService, AssetReference, and ModelServiceFactory come from my own
# project modules; their imports are omitted here.
from typing import Any, Callable, Dict, List, Optional

from typing_extensions import Self

from haystack import component, default_from_dict, default_to_dict
from haystack.dataclasses import ChatMessage, StreamingChunk
from haystack.utils import deserialize_callable, serialize_callable


@component
class ChatGenerator:
    """
    A class used to generate chat responses from a given chat model service.

    :param model_service: An instance of the ChatModelService to interact with the model.
    :param model: The name of the model to use for generating responses, if provided.
    :param stream: Whether to use streaming when calling the chat model service, defaults to False
    :param streaming_callbacks: A list of callback functions to handle streaming responses, defaults to None
    :param timeout: The timeout duration for the HTTP client, defaults to 20.0
    :param generation_kwargs: Additional keyword arguments to pass to the model for generation, defaults to None
    """

    def __init__(
        self,
        model_service: ChatModelService,
        model: Optional[str] = None,
        stream: bool = False,
        streaming_callbacks: Optional[List[Callable[[StreamingChunk], None]]] = None,
        timeout: Optional[float] = 20.0,
        generation_kwargs: Optional[Dict[str, Any]] = None,
    ):
        self.model_service = model_service
        self.model = model or ""
        self.stream = stream
        self.streaming_callbacks = streaming_callbacks
        self.generation_kwargs = generation_kwargs or {}
        self.timeout = timeout

    def to_dict(self) -> Dict[str, Any]:
        """
        Returns a serialized dictionary representation of the component.

        :return: A dictionary representation of the component.
        """
        # Create a local asset reference for the model service
        model_service_reference = self.model_service.create_local_reference()
        # Serialize the streaming callbacks
        callback_names = (
            [serialize_callable(callback) for callback in self.streaming_callbacks]
            if self.streaming_callbacks
            else None
        )
        return default_to_dict(
            self,
            model_service=model_service_reference.to_dict(),
            model=self.model,
            streaming_callbacks=callback_names,
            timeout=self.timeout,
            generation_kwargs=self.generation_kwargs,
        )

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> Self:
        """
        Loads a ChatGenerator component from its serialized dictionary representation.

        :param data: The serialized dictionary representation of the component.
        :return: The deserialized ChatGenerator component.
        """
        # Make a shallow copy of init_parameters to avoid modifying the original data
        init_parameters_data = data.get("init_parameters", {}).copy()
        # Load the chat model service from reference
        model_service_reference = AssetReference.from_dict(init_parameters_data["model_service"])
        model_service = ModelServiceFactory.load_from_reference(model_service_reference)
        init_parameters_data["model_service"] = model_service

        # Deserialize the streaming callbacks into the copied init parameters,
        # keeping the original data dict untouched
        serialized_callbacks = init_parameters_data.get("streaming_callbacks")
        if serialized_callbacks and isinstance(serialized_callbacks, list):
            init_parameters_data["streaming_callbacks"] = [
                deserialize_callable(callback) for callback in serialized_callbacks
            ]

        # Load the component
        return default_from_dict(cls, {"type": data["type"], "init_parameters": init_parameters_data})

    @component.output_types(replies=List[ChatMessage])
    def run(
        self,
        messages: List[ChatMessage],
        generation_kwargs: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        """
        Generates responses for the given list of messages.

        :param messages: The list of messages to generate responses for.
        :param generation_kwargs: Additional keyword arguments to pass to the model for generation, defaults to None
        :param kwargs: Additional keyword arguments to pass to the httpx client.
        :return: A dictionary containing the generated responses.
        """
        # update generation kwargs by merging with the generation kwargs passed to the run method
        generation_kwargs = {**self.generation_kwargs, **(generation_kwargs or {})}

        completions = self.model_service.infer(
            messages,
            model=self.model,
            timeout=self.timeout,
            stream=self.stream,
            streaming_callbacks=self.streaming_callbacks,
            inference_kwargs=generation_kwargs,
            **kwargs,
        )

        return {"replies": completions}

@LastRemote
Contributor Author

Hey @vblagoje, can I ask for an update on the PR status? Trying to keep things moving, though I am still happy to discuss the future deprecation of generators further :)

@vblagoje
Member

Hey @vblagoje, can I ask for an update on the PR status? Trying to keep things moving, though I am still happy to discuss the future deprecation of generators further :)

Hey @LastRemote, in general your approach with a generic ChatGenerator seems nice, but there are many peculiarities with each chat generator in terms of parameters and how streaming is handled. We used this approach in Haystack 1.x for PromptNode and found that it turned into a maintenance nightmare. But YMMV, and in your use case it might be a perfect solution.

Regarding the PR - I was awaiting the chat generator code; maybe we didn't understand each other?

@LastRemote
Contributor Author

Hey @vblagoje, can I ask for an update on the PR status? Trying to keep things moving, though I am still happy to discuss the future deprecation of generators further :)

Hey @LastRemote, in general your approach with a generic ChatGenerator seems nice, but there are many peculiarities with each chat generator in terms of parameters and how streaming is handled. We used this approach in Haystack 1.x for PromptNode and found that it turned into a maintenance nightmare. But YMMV, and in your use case it might be a perfect solution.

Regarding the PR - I was awaiting the chat generator code; maybe we didn't understand each other?

Oh, there is some misunderstanding here. I am still not exactly sure what you have in mind after deprecating the current generators, and I am just offering my two cents here hoping they could be useful. Nonetheless, regarding the scope of this PR, I'd prefer that we pursue a smaller change in OpenAIChatGenerator for now. The whole ChatGenerator and ModelService implementation is somewhat complicated, and I'd suggest that we move that topic to another thread if you are still interested in looking at my code. That could be a big change that needs a lot more consideration.

@vblagoje
Member

Nonetheless, regarding the scope of this PR, I'd prefer that we pursue a smaller change in OpenAIChatGenerator for now.

Yes, exactly - we do understand each other - let's implement TTFT in OpenAIChatGenerator 💪

The whole ChatGenerator and ModelService implementation is somewhat complicated, and I'd suggest that we move that topic to another thread if you are still interested in looking at my code. That could be a big change that needs a lot more consideration.

Exactly, that's what I meant as well.

@LastRemote
Contributor Author

LastRemote commented Oct 23, 2024

Awesome, thanks! Any additional comments on the current code change? I described the intention of the current changes in a previous post. I will also create a PR for the Langfuse integration part in the integrations repository.

@vblagoje Hello, I reviewed your approach and still have some concerns regarding the meta structure and the potential integration with Langfuse. Specifically, when a chunk is built, the current behavior captures fields like index, model, tool_calls, and stop_reason. However, I believe we should avoid including this information in the meta["usage"] field when merging meta across chunks. Instead, I would prefer the usage meta to reflect only the information that comes directly from the LLM response (or to be left empty if no usage data is captured). This would help prevent potential mismatches when passing usage data to third-party tracers or other downstream services.

Additionally, I converted the TTFT timestamp to ISO format. This change is intended to make it easier to log the entire ChatMessage object or include it as part of a service response. It also requires a slightly different update in the Langfuse integration (which I already implemented in my current project):

        ...
        if tags.get("haystack.component.type") in _SUPPORTED_CHAT_GENERATORS:
            replies = span._data.get("haystack.component.output", {}).get("replies")
            if replies:
                meta = replies[0].meta
                completion_start_time = meta.get("completion_start_time")
                if completion_start_time:
                    completion_start_time = datetime.fromisoformat(completion_start_time)
                span._span.update(
                    usage=meta.get("usage") or None,
                    model=meta.get("model"),
                    completion_start_time=completion_start_time,
                )

@vblagoje
Member

@LastRemote thanks for your effort and patience! To clarify, let's focus on adding TTFT to OpenAIChatGenerator, not OpenAIGenerator, in this PR to keep it simple and aligned with our current architecture and objectives. Let me know if you have any questions!

@LastRemote
Contributor Author

@LastRemote thanks for your effort and patience! To clarify, let's focus on adding TTFT to OpenAIChatGenerator, not OpenAIGenerator, in this PR to keep it simple and aligned with our current architecture and objectives. Let me know if you have any questions!

Oh, I did mistakenly update the wrong generator then. Thanks for pointing that out; I will make another update today.

@LastRemote LastRemote changed the title feat: Add TTFT support in OpenAI generators feat: Add TTFT support in OpenAI chat generator Oct 24, 2024
@github-actions github-actions bot added the type:documentation Improvements on the docs label Oct 24, 2024
@LastRemote
Contributor Author

Updated commit and cleaned up git commit history
