Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable streaming in GoogleAIGemini #1016

Merged
merged 8 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import logging
from typing import Any, Dict, List, Optional, Union
from typing import Any, Callable, Dict, List, Optional, Union

import google.generativeai as genai
from google.ai.generativelanguage import Content, Part
from google.ai.generativelanguage import Tool as ToolProto
from google.generativeai import GenerationConfig, GenerativeModel
from google.generativeai.types import HarmBlockThreshold, HarmCategory, Tool
from google.generativeai.types import GenerateContentResponse, HarmBlockThreshold, HarmCategory, Tool
from haystack.core.component import component
from haystack.core.serialization import default_from_dict, default_to_dict
from haystack.dataclasses.byte_stream import ByteStream
from haystack.dataclasses import ByteStream, StreamingChunk
from haystack.dataclasses.chat_message import ChatMessage, ChatRole
from haystack.utils import Secret, deserialize_secrets_inplace
from haystack.utils import Secret, deserialize_callable, deserialize_secrets_inplace, serialize_callable

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -107,6 +107,7 @@ def __init__(
generation_config: Optional[Union[GenerationConfig, Dict[str, Any]]] = None,
safety_settings: Optional[Dict[HarmCategory, HarmBlockThreshold]] = None,
tools: Optional[List[Tool]] = None,
streaming_callback: Optional[Callable[[StreamingChunk], None]] = None,
):
"""
Initializes a `GoogleAIGeminiChatGenerator` instance.
Expand All @@ -132,6 +133,8 @@ def __init__(
A dictionary with `HarmCategory` as keys and `HarmBlockThreshold` as values.
For more information, see [the API reference](https://ai.google.dev/api)
:param tools: A list of Tool objects that can be used for [Function calling](https://ai.google.dev/docs/function_calling).
:param streaming_callback: A callback function that is called when a new token is received from the stream.
The callback function accepts StreamingChunk as an argument.
"""

genai.configure(api_key=api_key.resolve_value())
Expand All @@ -142,6 +145,7 @@ def __init__(
self._safety_settings = safety_settings
self._tools = tools
self._model = GenerativeModel(self._model_name, tools=self._tools)
self._streaming_callback = streaming_callback

def _generation_config_to_dict(self, config: Union[GenerationConfig, Dict[str, Any]]) -> Dict[str, Any]:
if isinstance(config, dict):
Expand All @@ -162,13 +166,16 @@ def to_dict(self) -> Dict[str, Any]:
:returns:
Dictionary with serialized data.
"""
callback_name = serialize_callable(self._streaming_callback) if self._streaming_callback else None

data = default_to_dict(
self,
api_key=self._api_key.to_dict(),
model=self._model_name,
generation_config=self._generation_config,
safety_settings=self._safety_settings,
tools=self._tools,
streaming_callback=callback_name,
)
if (tools := data["init_parameters"].get("tools")) is not None:
data["init_parameters"]["tools"] = []
Expand Down Expand Up @@ -213,6 +220,8 @@ def from_dict(cls, data: Dict[str, Any]) -> "GoogleAIGeminiChatGenerator":
data["init_parameters"]["safety_settings"] = {
HarmCategory(k): HarmBlockThreshold(v) for k, v in safety_settings.items()
}
if (serialized_callback_handler := data["init_parameters"].get("streaming_callback")) is not None:
data["init_parameters"]["streaming_callback"] = deserialize_callable(serialized_callback_handler)
return default_from_dict(cls, data)

def _convert_part(self, part: Union[str, ByteStream, Part]) -> Part:
Expand Down Expand Up @@ -274,16 +283,23 @@ def _message_to_content(self, message: ChatMessage) -> Content:
return Content(parts=[part], role=role)

@component.output_types(replies=List[ChatMessage])
def run(self, messages: List[ChatMessage]):
def run(
self,
messages: List[ChatMessage],
streaming_callback: Optional[Callable[[StreamingChunk], None]] = None,
):
"""
Generates text based on the provided messages.

:param messages:
A list of `ChatMessage` instances, representing the input messages.
:param streaming_callback:
A callback function that is called when a new token is received from the stream.
:returns:
A dictionary containing the following key:
- `replies`: A list containing the generated responses as `ChatMessage` instances.
"""
streaming_callback = streaming_callback or self._streaming_callback
history = [self._message_to_content(m) for m in messages[:-1]]
session = self._model.start_chat(history=history)

Expand All @@ -292,10 +308,22 @@ def run(self, messages: List[ChatMessage]):
content=new_message,
generation_config=self._generation_config,
safety_settings=self._safety_settings,
stream=streaming_callback is not None,
)

replies = self._get_stream_response(res, streaming_callback) if streaming_callback else self._get_response(res)

return {"replies": replies}

def _get_response(self, response_body: GenerateContentResponse) -> List[ChatMessage]:
"""
Extracts the responses from the Google AI response.

:param response_body: The response from Google AI request.
:returns: The extracted responses.
"""
replies = []
for candidate in res.candidates:
for candidate in response_body.candidates:
for part in candidate.content.parts:
if part.text != "":
replies.append(ChatMessage.from_system(part.text))
Expand All @@ -307,5 +335,23 @@ def run(self, messages: List[ChatMessage]):
name=part.function_call.name,
)
)
return replies

return {"replies": replies}
def _get_stream_response(
    self, stream: GenerateContentResponse, streaming_callback: Callable[[StreamingChunk], None]
) -> List[ChatMessage]:
    """
    Fold a Google AI streaming response into a single chat reply.

    Every streamed chunk is forwarded to ``streaming_callback`` wrapped in a
    ``StreamingChunk`` (with the chunk's dict form as metadata), and its text
    is accumulated; the concatenated text becomes one ``ChatMessage``.

    :param stream: The streaming response from the Google AI request.
    :param streaming_callback: The handler invoked once per streamed chunk.
    :returns: A single-element list holding the combined response message.
    """
    collected: List[str] = []
    for piece in stream:
        # Chunks without a leading text part (e.g. function-call parts)
        # contribute an empty string; see the guard below.
        if len(piece.parts) > 0 and "text" in piece.parts[0]:
            text = piece.text
        else:
            text = ""
        streaming_callback(StreamingChunk(content=text, meta=piece.to_dict()))
        collected.append(text)

    return [ChatMessage.from_system(content="".join(collected).lstrip())]
Amnah199 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import logging
from typing import Any, Dict, List, Optional, Union
from typing import Any, Callable, Dict, List, Optional, Union

import google.generativeai as genai
from google.ai.generativelanguage import Content, Part, Tool
from google.generativeai import GenerationConfig, GenerativeModel
from google.generativeai.types import HarmBlockThreshold, HarmCategory
from google.generativeai.types import GenerateContentResponse, HarmBlockThreshold, HarmCategory
from haystack.core.component import component
from haystack.core.component.types import Variadic
from haystack.core.serialization import default_from_dict, default_to_dict
from haystack.dataclasses.byte_stream import ByteStream
from haystack.utils import Secret, deserialize_secrets_inplace
from haystack.dataclasses import ByteStream, StreamingChunk
from haystack.utils import Secret, deserialize_callable, deserialize_secrets_inplace, serialize_callable

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -70,6 +70,7 @@ def __init__(
generation_config: Optional[Union[GenerationConfig, Dict[str, Any]]] = None,
safety_settings: Optional[Dict[HarmCategory, HarmBlockThreshold]] = None,
tools: Optional[List[Tool]] = None,
streaming_callback: Optional[Callable[[StreamingChunk], None]] = None,
):
"""
Initializes a `GoogleAIGeminiGenerator` instance.
Expand All @@ -91,6 +92,8 @@ def __init__(
A dictionary with `HarmCategory` as keys and `HarmBlockThreshold` as values.
For more information, see [the API reference](https://ai.google.dev/api)
:param tools: A list of Tool objects that can be used for [Function calling](https://ai.google.dev/docs/function_calling).
:param streaming_callback: A callback function that is called when a new token is received from the stream.
The callback function accepts StreamingChunk as an argument.
"""
genai.configure(api_key=api_key.resolve_value())

Expand All @@ -100,6 +103,7 @@ def __init__(
self._safety_settings = safety_settings
self._tools = tools
self._model = GenerativeModel(self._model_name, tools=self._tools)
self._streaming_callback = streaming_callback

def _generation_config_to_dict(self, config: Union[GenerationConfig, Dict[str, Any]]) -> Dict[str, Any]:
if isinstance(config, dict):
Expand All @@ -120,13 +124,15 @@ def to_dict(self) -> Dict[str, Any]:
:returns:
Dictionary with serialized data.
"""
callback_name = serialize_callable(self._streaming_callback) if self._streaming_callback else None
data = default_to_dict(
self,
api_key=self._api_key.to_dict(),
model=self._model_name,
generation_config=self._generation_config,
safety_settings=self._safety_settings,
tools=self._tools,
streaming_callback=callback_name,
)
if (tools := data["init_parameters"].get("tools")) is not None:
data["init_parameters"]["tools"] = [Tool.serialize(t) for t in tools]
Expand Down Expand Up @@ -156,6 +162,8 @@ def from_dict(cls, data: Dict[str, Any]) -> "GoogleAIGeminiGenerator":
data["init_parameters"]["safety_settings"] = {
HarmCategory(k): HarmBlockThreshold(v) for k, v in safety_settings.items()
}
if (serialized_callback_handler := data["init_parameters"].get("streaming_callback")) is not None:
data["init_parameters"]["streaming_callback"] = deserialize_callable(serialized_callback_handler)

return default_from_dict(cls, data)

Expand All @@ -176,28 +184,45 @@ def _convert_part(self, part: Union[str, ByteStream, Part]) -> Part:
raise ValueError(msg)

@component.output_types(replies=List[Union[str, Dict[str, str]]])
def run(self, parts: Variadic[Union[str, ByteStream, Part]]):
def run(
self,
parts: Variadic[Union[str, ByteStream, Part]],
streaming_callback: Optional[Callable[[StreamingChunk], None]] = None,
):
"""
Generates text based on the given input parts.

:param parts:
A heterogeneous list of strings, `ByteStream` or `Part` objects.
:param streaming_callback: A callback function that is called when a new token is received from the stream.
:returns:
A dictionary containing the following key:
- `replies`: A list of strings or dictionaries with function calls.
"""

# check if streaming_callback is passed
streaming_callback = streaming_callback or self._streaming_callback
converted_parts = [self._convert_part(p) for p in parts]

contents = [Content(parts=converted_parts, role="user")]
res = self._model.generate_content(
contents=contents,
generation_config=self._generation_config,
safety_settings=self._safety_settings,
stream=streaming_callback is not None,
)
self._model.start_chat()
replies = self._get_stream_response(res, streaming_callback) if streaming_callback else self._get_response(res)

return {"replies": replies}

def _get_response(self, response_body: GenerateContentResponse) -> List[str]:
"""
Extracts the responses from the Google AI request.
:param response_body: The response body from the Google AI request.
:returns: A list of string responses.
"""
replies = []
for candidate in res.candidates:
for candidate in response_body.candidates:
for part in candidate.content.parts:
if part.text != "":
replies.append(part.text)
Expand All @@ -207,5 +232,23 @@ def run(self, parts: Variadic[Union[str, ByteStream, Part]]):
"args": dict(part.function_call.args.items()),
}
replies.append(function_call)
return replies

return {"replies": replies}
def _get_stream_response(
    self, stream: GenerateContentResponse, streaming_callback: Callable[[StreamingChunk], None]
) -> List[str]:
    """
    Fold a Google AI streaming response into a single string reply.

    Every streamed chunk is forwarded to ``streaming_callback`` wrapped in a
    ``StreamingChunk`` (with the chunk's dict form as metadata), and its text
    is accumulated into one combined string.

    :param stream: The streaming response from the Google AI request.
    :param streaming_callback: The handler invoked once per streamed chunk.
    :returns: A single-element list holding the combined response text.
    """
    collected: List[str] = []
    for piece in stream:
        # Chunks without a leading text part contribute an empty string.
        if len(piece.parts) > 0 and "text" in piece.parts[0]:
            text = piece.text
        else:
            text = ""
        streaming_callback(StreamingChunk(content=text, meta=piece.to_dict()))
        collected.append(text)

    return ["".join(collected).lstrip()]
Loading