diff --git a/docs/sphinx_doc/en/source/tutorial/208-distribute.md b/docs/sphinx_doc/en/source/tutorial/208-distribute.md index 01c0a4bc8..6358c4dab 100644 --- a/docs/sphinx_doc/en/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/en/source/tutorial/208-distribute.md @@ -82,11 +82,16 @@ server.launch() server.wait_until_terminate() ``` -> For similarity, you can run the following command in your terminal rather than the above code: -> -> ```shell -> as_server --host ip_a --port 12001 --model-config-path model_config_path_a -> ``` +For simplicity, you can run the following command in your terminal rather than the above code: + +```shell +as_server --host ip_a --port 12001 --model-config-path model_config_path_a --agent-dir parent_dir_of_agent_a_and_b +``` + +> Note: +> The `--agent-dir` field is used to specify the directory where your customized agent classes are located. +> Please make sure that all custom Agent classes are located in `--agent-dir`, and that the custom modules they depend on are also located in the directory. +> Additionally, because the above command will load all Python files in the directory, please ensure that the directory does not contain any malicious files to avoid security risks. Then put your model config file accordingly in `model_config_path_b`, set environment variables, and run the following code on `Machine2`. @@ -112,7 +117,7 @@ server.wait_until_terminate() > Similarly, you can run the following command in your terminal to setup the agent server: > > ```shell -> as_server --host ip_b --port 12002 --model-config-path model_config_path_b +> as_server --host ip_b --port 12002 --model-config-path model_config_path_b --agent-dir parent_dir_of_agent_a_and_b > ``` Then, you can connect to the agent servers from the main process with the following code. @@ -139,6 +144,9 @@ And developers just need to write the application flow in a centralized way in t ### Step 2: Orchestrate Distributed Application Flow +> Note: +> Currently, distributed version of Agent only supports `__call__` method call (i.e. `agent(x)`), not support calling other methods or reading/writing properties. + In AgentScope, the orchestration of distributed application flow is exactly the same as non-distributed programs, and developers can write the entire application flow in a centralized way. At the same time, AgentScope allows the use of a mixture of locally and distributed deployed agents, and developers do not need to distinguish which agents are local and which are distributed. @@ -315,6 +323,65 @@ When running large-scale multi-agent applications, it's common to have multiple ok = client.delete_all_agent() ``` +#### Connecting to AgentScope Studio + +The agent server process can be connected to [AgentScope Studio](#209-gui-en) at startup, allowing the `to_dist` method in subsequent distributed applications to be assigned automatically by Studio without the need for any parameters. + +For scenarios where the agent server process is started using Python code, simply fill in the `studio_url` in the initialization parameters of `RpcAgentServerLauncher`. This requires that the URL is correct and accessible over the network, for example, the default URL for the Studio is `http://127.0.0.1:5000`. + +```python +# import some packages + +# register models which can be used in the server +agentscope.init( + model_configs=model_config_path_a, +) +# Create an agent service process +server = RpcAgentServerLauncher( + host="ip_a", + port=12001, # choose an available port + custom_agent_classes=[...], # register your customized agent classes + studio_url="http://studio_ip:studio_port", # connect to AgentScope Studio +) + +# Start the service +server.launch() +server.wait_until_terminate() +``` + +For scenarios using the command `as_server` in your command line, simply fill in the `--studio-url` parameter. + +```shell +as_server --host ip_a --port 12001 --model-config-path model_config_path_a --agent-dir parent_dir_of_agent_a_and_b --studio-url http://studio_ip:studio_port +``` + +After executing the above code or command, you can enter the Server Manager page of AgentScope Studio to check if the connection is successful. If the connection is successful, the agent server process will be displayed in the page table, and you can observe the running status and resource occupation of the process in the page, then you can use the advanced functions brought by AgentScope Studio. This section will focus on the impact of `to_dist` method brought by AgentScope Studio, and please refer to [AgentScope Studio](#209-gui-en) for the specific usage of the page. + +After the agent server process successfully connects to Studio, you only need to pass the `studio_url` of this Studio in the `agentscope.init` method, and then the `to_dist` method no longer needs to fill in the `host` and `port` fields, but automatically select an agent server process that has been connected to Studio. + +```python +# import some packages + +agentscope.init( + model_configs=model_config_path_a, + studio_url="http://studio_ip:studio_port", +) + +a = AgentA( + name="A" + # ... +).to_dist() # automatically select an agent server + +# your application code +``` + +> Note: +> +> - The Agent used in this method must be registered at the start of the agent server process through `custom_agent_classes` or `--agent-dir`. +> - When using this method, make sure that the agent server process connected to Studio is still running normally. + +After the application starts running, you can observe in the Server Manager page of Studio which agent server process this Agent is specifically running on, and after the application is completed, you can also delete this Agent through the Server Manager page. + ## Implementation ### Actor Model diff --git a/docs/sphinx_doc/en/source/tutorial/209-gui.md b/docs/sphinx_doc/en/source/tutorial/209-gui.md index 8e644efcf..2b62a8e83 100644 Binary files a/docs/sphinx_doc/en/source/tutorial/209-gui.md and b/docs/sphinx_doc/en/source/tutorial/209-gui.md differ diff --git a/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md b/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md index 11028fc41..d6fe031bc 100644 --- a/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md @@ -80,11 +80,16 @@ server.launch() server.wait_until_terminate() ``` -> 为了进一步简化使用,可以在命令行中输入如下指令来代替上述代码: -> -> ```shell -> as_server --host ip_a --port 12001 --model-config-path model_config_path_a -> ``` +为了进一步简化使用,可以在命令行中输入如下指令来代替上述代码: + +```shell +as_server --host ip_a --port 12001 --model-config-path model_config_path_a --agent-dir parent_dir_of_agent_a_and_b +``` + +> Note: +> `--agent-dir` 用来指定你的自定义 Agent 类所在的目录。 +> 请确保所有的自定义 Agent 类都位于 `--agent-dir` 指定的目录下,并且它们所依赖的自定义模块也都位于该目录下。 +> 另外,因为上述指令会加载目录下的所有 Python 文件,在运行前请确保指定的目录内没有恶意文件,以避免出现安全问题。 在 `Machine2` 上运行如下代码,这里同样要确保已经将模型配置文件放置在 `model_config_path_b` 位置并设置环境变量,从而确保运行在该机器上的 Agent 能够正常访问到模型。 @@ -110,7 +115,7 @@ server.wait_until_terminate() > 这里也同样可以用如下指令来代替上面的代码。 > > ```shell -> as_server --host ip_b --port 12002 --model-config-path model_config_path_b +> as_server --host ip_b --port 12002 --model-config-path model_config_path_b --agent-dir parent_dir_of_agent_a_and_b > ``` 接下来,就可以使用如下代码从主进程中连接这两个智能体服务器进程。 @@ -137,6 +142,9 @@ b = AgentB( ### 步骤2: 编排分布式应用流程 +> Note: +> 当前分布式版本的 Agent 仅支持 `__call__` 方法调用 (即 `agent(x)`),不支持调用其他方法或是属性读写。 + 在AgentScope中,分布式应用流程的编排和非分布式的程序完全一致,开发者可以用中心化的方式编写全部应用流程。 同时,AgentScope允许本地和分布式部署的智能体混合使用,开发者不用特意区分哪些智能体是本地的,哪些是分布式部署的。 @@ -312,6 +320,65 @@ b = AgentB( ok = client.delete_all_agent() ``` +#### 连接 AgentScope Studio + +智能体服务器进程可以在启动时连接 [AgentScope Studio](#209-gui-zh) ,从而让后续搭建的分布式应用中的 `to_dist` 方法不再需要填写任何参数,而是由 Stduio 为其自动分配智能体服务器进程。 + +对于使用 Python 代码启动智能体服务器进程的场景,只需要在 `RpcAgentServerLauncher` 的初始化参数中填入 `studio_url` 即可,这里需要确保填写正确且能够通过网络访问,例如默认情况下启动的 Studio 的 URL 为 `http://127.0.0.1:5000`。 + +```python +# import some packages + +# register models which can be used in the server +agentscope.init( + model_configs=model_config_path_a, +) +# Create an agent service process +server = RpcAgentServerLauncher( + host="ip_a", + port=12001, # choose an available port + custom_agent_classes=[...] # register your customized agent classes + studio_url="http://studio_ip:studio_port", # connect to AgentScope Studio +) + +# Start the service +server.launch() +server.wait_until_terminate() +``` + +对于使用命令行 `as_server` 的场景,也只需要在命令行中填入 `--studio-url` 参数。 + +```shell +as_server --host ip_a --port 12001 --model-config-path model_config_path_a --agent-dir parent_dir_of_agent_a_and_b --studio-url http://studio_ip:studio_port +``` + +执行上述代码或命令后可以进入 AgentScope Studio 的 Server Manager 页面查看是否连接成功。如果连接成功,该智能体服务器进程会显示在页面的表格中,并且可以在页面中观察到该进程的运行状态以及资源占用情况,之后就可以使用 AgentScope Studio 所带来的高级功能了。本节将聚焦于 AgentScope Studio 对 `to_dist` 方法带来的影响,而页面的具体用法请参考 [AgentScope Studio](#209-gui-zh)。 + +在智能体服务器进程成功连接 Studio 后,只需要在 `agentscope.init` 方法中传入该 Studio 的 `studio_url`,后续的 `to_dist` 方法就不再需要填写 `host` 和 `port` 域,而是自动选择一个已经连接到 Studio 的智能体服务器进程。 + +```python +# import some packages + +agentscope.init( + model_configs=model_config_path_a, + studio_url="http://studio_ip:studio_port", +) + +a = AgentA( + name="A" + # ... +).to_dist() # automatically select an agent server + +# your application code +``` + +> Note: +> +> - 该方法中使用的 Agent 必须在智能体服务器进程启动时就已经通过 `custom_agent_classes` 或 `--agent-dir` 注册。 +> - 使用该方法时需要确定连接到 Studio 的智能体服务器进程还在正常运行。 + +在应用开始运行后,可以在 Studio 的 Server Manager 页面中观察该 Agent 具体运行在哪个智能体服务器进程上,应用运行完成后也可以通过 Server Manager 页面删除该 Agent。 + ## 实现原理 ### Actor模式 diff --git a/docs/sphinx_doc/zh_CN/source/tutorial/209-gui.md b/docs/sphinx_doc/zh_CN/source/tutorial/209-gui.md index 6e911a95f..7feeccd29 100644 --- a/docs/sphinx_doc/zh_CN/source/tutorial/209-gui.md +++ b/docs/sphinx_doc/zh_CN/source/tutorial/209-gui.md @@ -6,9 +6,8 @@ AgentScope Studio 是一个开源的 Web UI 工具包,用于构建和监控多 - **Dashboard**:一个用户友好的界面,可以在其中监视正在运行的应用程序,并查看运行历史。 - **Workstation**:一个强大的界面,可通过**拖拽**的方式构建多智能体应用程序。 +- **Server Manager**:一个简单易用的监控与管理工具,用于管理大规模分布式的多智能体应用程序。 - **Gallery**:即将推出! -- **Server Management**:即将推出! - ## 启动 AgentScope Studio @@ -91,7 +90,7 @@ agentscope.studio.init( ) ``` -## About Workstation +## Workstation Workstation 是为零代码用户设计的,可以通过**拖拽**的方式构建多智能体应用程序。 @@ -117,6 +116,7 @@ AgentScope Studio中,拖过点击 workstation 图标进入 Workstation 界面 #### 构建应用程序 要构建应用程序,请按照以下步骤操作: + - **选择和拖动组件**:从侧边栏中选择您想要的组件,然后将其拖放到中央工作区。 - **连接节点**:大多数节点都有输入和输出点。单击一个组件的输出点,然后将其拖动到另一个组件的输入点,以创建消息流管道。这个过程允许不同的节点传递消息。 - **配置节点**:将节点拖放到工作区后,单击任何节点以填写其配置设置。可以自定义提示、参数和其他属性。 @@ -148,3 +148,59 @@ as_workflow config.json --compile ${YOUR_PYTHON_SCRIPT_NAME}.py - 必填字段验证:所有必填字段必须填充,以确保每个节点具有正确运行所需的参数。 - 一致的配置命名:Agent 节点使用的“Model config name”必须对应于 Model 节点中定义的“Config Name”。 - 节点嵌套正确:ReActAgent 等节点应仅包含工具节点。类似地,IfElsePipeline 等 Pipeline 节点应包含正确数量的元素(不超过 2 个),而 ForLoopPipeline、WhileLoopPipeline 和 MsgHub 应遵循一个元素的规则(必须是 SequentialPipeline 作为子节点)。 + +## Server Manager + +> 阅读本节内容需要先了解 AgentScope [分布式](#208-distribute-zh) 的基本概念及用法。 + +Server Manager 是一个用于监控和管理 AgentScope 智能体服务器进程(Server)以及大规模分布式应用的图形化界面。 + +### 注册 Server 进程 + +在初始化 `RpcAgentServerLauncher` 时传入 `studio_url` 参数即可实现注册。 + +```python +# import some packages +server = RpcAgentServerLauncher( + # ... + studio_url="http://studio_ip:studio_port", # connect to AgentScope Studio +) +``` + +更具体的注册方法请参考 [分布式](#208-distribute-zh) 中 *连接 AgentScope Studio* 部分。 + +### 管理 Server 进程 + +从 AgentScope Studio 主页面或侧边栏中的 Server Manager 按钮即可进入 Server Manager 页面。 +当前 Server Manager 页面由 Servers 列表, Agents 列表, Memory 列表三个部分构成。 + +

+agentscope-manager +

+ +#### Servers 列表 + +注册到 Studio 的智能体服务器进程(Server)都会显示在 Server Manager 页面的 Servers 列表中,列表中会不仅会显示每个 Server 的 `ID`, `Hostname`, `Port`, `Created Time`,还会显示每个 Server 的状态以及计算资源使用情况,包括 `Status`, `CPU Usage`, `Memory Usage`。 + +其中 `Status` 有以下几种: + - `running`:表示 Server 正在运行。 + - `dead`:表示 Server 已停止运行。 + - `unknown`:表示目前无法正常访问 Studio 服务。 + +只有在 `running` 状态的 Server 才会显示 CPU 和 Memory 的使用情况。用户可以点击 Servers 栏左边的刷新按钮来刷新 Servers 列表,同时也能够通过点击 Servers 栏右侧的删除按钮来一键删除所有已经处于 `dead` 状态的 Server。 + +Servers 列表每行的最后一列都提供了删除按钮,用于关闭并删除 Server,需要注意的是该操作是无法恢复的,因此需要谨慎使用。 + +#### Agents 列表 + +在点击任意处于 `running` 状态的 Server 行后,会在页面中展开 Agents 列表,该列表中会显示该 Server 下所有 Agent,列表中会显示每个 Agent 的 `ID`, `Name`, `Class`, `System Prompt` 以及 `Model`。 + +用户同样可以通过 Agents 列表栏左侧的刷新按钮来刷新 Agents 列表。并且用户也可以通过每个 Agent 行最右侧的删除按钮来删除该 Agent,并通过 Agents 列表栏右侧的删除按钮来批量删除 Server 中所有的 Agent。这里的删除操作都是不可恢复的,因此需要谨慎使用。 + +#### Memory 列表 + +在点击任意 Agent 行后,会在页面中展开 Memory 列表,该列表中会显示该 Agent 的 Memory 中的所有消息,每条消息会在左侧显示其 `Name` 和 `Role` 属性值,在点击后会在列表右侧显示该消息的具体内容。 +这里同样可以点击 Memory 列表栏左侧的刷新按钮来刷新当前的 Memory 列表。 + +[[回到顶部]](#209-gui-zh) diff --git a/src/agentscope/agents/agent.py b/src/agentscope/agents/agent.py index da7794f10..4b9399094 100644 --- a/src/agentscope/agents/agent.py +++ b/src/agentscope/agents/agent.py @@ -64,7 +64,7 @@ def __call__(cls, *args: tuple, **kwargs: dict) -> Any: ), max_timeout_seconds=to_dist.pop( # type: ignore[arg-type] "max_timeout_seconds", - 1800, + 7200, ), local_mode=to_dist.pop( # type: ignore[arg-type] "local_mode", @@ -100,9 +100,9 @@ def __init__( host: str = "localhost", port: int = None, max_pool_size: int = 8192, - max_timeout_seconds: int = 1800, + max_timeout_seconds: int = 7200, local_mode: bool = True, - lazy_launch: bool = True, + lazy_launch: bool = False, ): """Init the distributed configuration. @@ -113,12 +113,12 @@ def __init__( Port of the rpc agent server. max_pool_size (`int`, defaults to `8192`): Max number of task results that the server can accommodate. - max_timeout_seconds (`int`, defaults to `1800`): + max_timeout_seconds (`int`, defaults to `7200`): Timeout for task results. local_mode (`bool`, defaults to `True`): Whether the started rpc server only listens to local requests. - lazy_launch (`bool`, defaults to `True`): + lazy_launch (`bool`, defaults to `False`): Only launch the server when the agent is called. """ self["host"] = host @@ -424,9 +424,9 @@ def to_dist( host: str = "localhost", port: int = None, max_pool_size: int = 8192, - max_timeout_seconds: int = 1800, + max_timeout_seconds: int = 7200, local_mode: bool = True, - lazy_launch: bool = True, + lazy_launch: bool = False, launch_server: bool = None, ) -> AgentBase: """Convert current agent instance into a distributed version. @@ -441,7 +441,7 @@ def to_dist( The max number of agent reply messages that the started agent server can accommodate. Note that the oldest message will be deleted after exceeding the pool size. - max_timeout_seconds (`int`, defaults to `1800`): + max_timeout_seconds (`int`, defaults to `7200`): Only takes effect when `host` and `port` are not filled in. Maximum time for reply messages to be cached in the launched agent server. Note that expired messages will be deleted. @@ -449,7 +449,7 @@ def to_dist( Only takes effect when `host` and `port` are not filled in. Whether the started agent server only listens to local requests. - lazy_launch (`bool`, defaults to `True`): + lazy_launch (`bool`, defaults to `False`): Only takes effect when `host` and `port` are not filled in. If `True`, launch the agent server when the agent is called, otherwise, launch the agent server immediately. diff --git a/src/agentscope/agents/rpc_agent.py b/src/agentscope/agents/rpc_agent.py index 0b3cf245e..fb6a4eba2 100644 --- a/src/agentscope/agents/rpc_agent.py +++ b/src/agentscope/agents/rpc_agent.py @@ -24,9 +24,9 @@ def __init__( agent_class: Type[AgentBase] = None, agent_configs: Optional[dict] = None, max_pool_size: int = 8192, - max_timeout_seconds: int = 1800, + max_timeout_seconds: int = 7200, local_mode: bool = True, - lazy_launch: bool = True, + lazy_launch: bool = False, agent_id: str = None, connect_existing: bool = False, ) -> None: @@ -44,12 +44,12 @@ def __init__( init configs of the agent, generated by `_AgentMeta`. max_pool_size (`int`, defaults to `8192`): Max number of task results that the server can accommodate. - max_timeout_seconds (`int`, defaults to `1800`): + max_timeout_seconds (`int`, defaults to `7200`): Timeout for task results. local_mode (`bool`, defaults to `True`): Whether the started gRPC server only listens to local requests. - lazy_launch (`bool`, defaults to `True`): + lazy_launch (`bool`, defaults to `False`): Only launch the server when the agent is called. agent_id (`str`, defaults to `None`): The agent id of this instance. If `None`, it will @@ -69,8 +69,18 @@ def __init__( if agent_id is not None: self._agent_id = agent_id # if host and port are not provided, launch server locally - launch_server = port is None + if self.port is None and _studio_client.active: + server = _studio_client.alloc_server() + if "host" in server: + if RpcAgentClient( + host=server["host"], + port=server["port"], + ).is_alive(): + self.host = server["host"] + self.port = server["port"] + launch_server = self.port is None if launch_server: + # check studio first self.host = "localhost" studio_url = None if _studio_client.active: diff --git a/src/agentscope/server/launcher.py b/src/agentscope/server/launcher.py index 7303f0b12..f65ced242 100644 --- a/src/agentscope/server/launcher.py +++ b/src/agentscope/server/launcher.py @@ -1,13 +1,14 @@ # -*- coding: utf-8 -*- """ Server of distributed agent""" import os +import sys import asyncio import signal import argparse import time +import importlib from multiprocessing import Process, Event, Pipe from multiprocessing.synchronize import Event as EventClass -from typing import Type from concurrent import futures from loguru import logger @@ -45,6 +46,7 @@ def _setup_agent_server( max_timeout_seconds: int = 7200, studio_url: str = None, custom_agent_classes: list = None, + agent_dir: str = None, ) -> None: """Setup agent server. @@ -76,6 +78,9 @@ def _setup_agent_server( custom_agent_classes (`list`, defaults to `None`): A list of customized agent classes that are not in `agentscope.agents`. + agent_dir (`str`, defaults to `None`): + The abs path to the directory containing customized agent python + files. """ asyncio.run( _setup_agent_server_async( @@ -91,11 +96,12 @@ def _setup_agent_server( max_timeout_seconds=max_timeout_seconds, studio_url=studio_url, custom_agent_classes=custom_agent_classes, + agent_dir=agent_dir, ), ) -async def _setup_agent_server_async( +async def _setup_agent_server_async( # pylint: disable=R0912 host: str, port: int, server_id: str, @@ -108,6 +114,7 @@ async def _setup_agent_server_async( max_timeout_seconds: int = 7200, studio_url: str = None, custom_agent_classes: list = None, + agent_dir: str = None, ) -> None: """Setup agent server in an async way. @@ -140,6 +147,9 @@ async def _setup_agent_server_async( custom_agent_classes (`list`, defaults to `None`): A list of customized agent classes that are not in `agentscope.agents`. + agent_dir (`str`, defaults to `None`): + The abs path to the directory containing customized agent python + files. """ if init_settings is not None: @@ -154,10 +164,13 @@ async def _setup_agent_server_async( max_pool_size=max_pool_size, max_timeout_seconds=max_timeout_seconds, ) + if custom_agent_classes is None: + custom_agent_classes = [] + if agent_dir is not None: + custom_agent_classes.extend(load_agents_from_dir(agent_dir)) # update agent registry - if custom_agent_classes is not None: - for agent_class in custom_agent_classes: - AgentBase.register_agent_class(agent_class=agent_class) + for agent_class in custom_agent_classes: + AgentBase.register_agent_class(agent_class=agent_class) async def shutdown_signal_handler() -> None: logger.info( @@ -214,6 +227,68 @@ async def shutdown_signal_handler() -> None: ) +def load_agents_from_file(agent_file: str) -> list: + """Load AgentBase sub classes from a python file. + + Args: + agent_file (str): the path to the python file. + + Returns: + list: a list of agent classes + """ + module_path = agent_file.replace(os.sep, ".") + module_name = module_path[:-3] + spec = importlib.util.spec_from_file_location( + module_name, + agent_file, + ) + module = importlib.util.module_from_spec(spec) # type: ignore[arg-type] + spec.loader.exec_module(module) + custom_agent_classes = [] + for attr_name in dir(module): + attr = getattr(module, attr_name) + if ( + isinstance(attr, type) + and issubclass(attr, AgentBase) + and attr is not AgentBase + ): + custom_agent_classes.append(attr) + return custom_agent_classes + + +def load_agents_from_dir(agent_dir: str) -> list: + """Load customized agents from a directory. + + Args: + agent_dir (`str`): a directory contains customized agent python files. + + Returns: + list: a list of customized agent classes + """ + if agent_dir is None: + return [] + original_sys_path = sys.path.copy() + abs_agent_dir = os.path.abspath(agent_dir) + sys.path.insert(0, abs_agent_dir) + try: + custom_agent_classes = [] + for root, _, files in os.walk(agent_dir): + for file in files: + if file.endswith(".py"): + try: + module_path = os.path.join(root, file) + custom_agent_classes.extend( + load_agents_from_file(module_path), + ) + except Exception as e: + logger.error( + f"Failed to load agent class from [{file}]: {e}", + ) + return custom_agent_classes + finally: + sys.path = original_sys_path + + class RpcAgentServerLauncher: """The launcher of AgentServer.""" @@ -224,12 +299,10 @@ def __init__( max_pool_size: int = 8192, max_timeout_seconds: int = 7200, local_mode: bool = False, + agent_dir: str = None, custom_agent_classes: list = None, server_id: str = None, studio_url: str = None, - agent_class: Type[AgentBase] = None, - agent_args: tuple = (), - agent_kwargs: dict = None, ) -> None: """Init a launcher of agent server. @@ -248,6 +321,8 @@ def __init__( local_mode (`bool`, defaults to `False`): If `True`, only listen to requests from "localhost", otherwise, listen to requests from all hosts. + agent_dir (`str`, defaults to `None`): + The directory containing customized agent python files. custom_agent_classes (`list`, defaults to `None`): A list of customized agent classes that are not in `agentscope.agents`. @@ -256,12 +331,6 @@ def __init__( will be generated. studio_url (`Optional[str]`, defaults to `None`): The url of the agentscope studio. - agent_class (`Type[AgentBase]`, deprecated): - The AgentBase subclass encapsulated by this wrapper. - agent_args (`tuple`, deprecated): The args tuple used to - initialize the agent_class. - agent_kwargs (`dict`, deprecated): The args dict used to - initialize the agent_class. """ self.host = host self.port = check_port(port) @@ -272,21 +341,15 @@ def __init__( self.parent_con = None self.custom_agent_classes = custom_agent_classes self.stop_event = Event() + self.agent_dir = ( + os.path.abspath(agent_dir) if agent_dir is not None else None + ) self.server_id = ( RpcAgentServerLauncher.generate_server_id(self.host, self.port) if server_id is None else server_id ) self.studio_url = studio_url - if ( - agent_class is not None - or len(agent_args) > 0 - or agent_kwargs is not None - ): - logger.warning( - "`agent_class`, `agent_args` and `agent_kwargs` is deprecated" - " in `RpcAgentServerLauncher`", - ) @classmethod def generate_server_id(cls, host: str, port: int) -> str: @@ -308,6 +371,7 @@ def _launch_in_main(self) -> None: max_timeout_seconds=self.max_timeout_seconds, local_mode=self.local_mode, custom_agent_classes=self.custom_agent_classes, + agent_dir=self.agent_dir, studio_url=self.studio_url, ), ) @@ -333,6 +397,7 @@ def _launch_in_sub(self) -> None: "local_mode": self.local_mode, "studio_url": self.studio_url, "custom_agent_classes": self.custom_agent_classes, + "agent_dir": self.agent_dir, }, ) server_process.start() @@ -393,15 +458,20 @@ def as_server() -> None: * `--local-mode`: whether the started agent server only listens to local requests. * `--model-config-path`: the path to the model config json file + * `--agent-dir`: the directory containing your customized agent python + files + * `--studio-url`: the url of agentscope studio In most cases, you only need to specify the `--host`, `--port` and - `--model-config-path`. + `--model-config-path`, and `--agent-dir`. .. code-block:: shell - as_server --host localhost --port 12345 --model-config-path config.json - - """ # noqa + as_server --host localhost \ + --port 12345 \ + --model-config-path config.json \ + --agent-dir ./my_agents + """ parser = argparse.ArgumentParser() parser.add_argument( "--host", @@ -461,6 +531,12 @@ def as_server() -> None: default=None, help="the url of agentscope studio", ) + parser.add_argument( + "--agent-dir", + type=str, + default=None, + help="the directory containing customized agent python files", + ) parser.add_argument( "--no-log", action="store_true", diff --git a/src/agentscope/server/servicer.py b/src/agentscope/server/servicer.py index 94764da8e..e047b8fc4 100644 --- a/src/agentscope/server/servicer.py +++ b/src/agentscope/server/servicer.py @@ -54,7 +54,11 @@ def _register_server_to_studio( url = f"{studio_url}/api/servers/register" resp = requests.post( url, - json={"server_id": server_id, "host": host, "port": port}, + json={ + "server_id": server_id, + "host": host, + "port": port, + }, timeout=10, # todo: configurable timeout ) if resp.status_code != 200: @@ -84,7 +88,7 @@ def __init__( server_id: str = None, studio_url: str = None, max_pool_size: int = 8192, - max_timeout_seconds: int = 1800, + max_timeout_seconds: int = 7200, ): """Init the AgentServerServicer. @@ -102,7 +106,7 @@ def __init__( The max number of agent reply messages that the server can accommodate. Note that the oldest message will be deleted after exceeding the pool size. - max_timeout_seconds (`int`, defaults to `1800`): + max_timeout_seconds (`int`, defaults to `7200`): Maximum time for reply messages to be cached in the server. Note that expired messages will be deleted. """ @@ -373,6 +377,7 @@ def get_server_info( process = psutil.Process(self.pid) status["cpu"] = process.cpu_percent(interval=1) status["mem"] = process.memory_info().rss / (1024**2) + status["size"] = len(self.agent_pool) return agent_pb2.GeneralResponse(ok=True, message=json.dumps(status)) def set_model_configs( diff --git a/src/agentscope/studio/_app.py b/src/agentscope/studio/_app.py index dd8483e9a..1b4696db1 100644 --- a/src/agentscope/studio/_app.py +++ b/src/agentscope/studio/_app.py @@ -10,6 +10,7 @@ from datetime import datetime from typing import Tuple, Union, Any, Optional from pathlib import Path +from random import choice from flask import ( Flask, @@ -33,6 +34,7 @@ ) from ..rpc.rpc_agent_client import RpcAgentClient + _app = Flask(__name__) # Set the cache directory @@ -260,11 +262,7 @@ def _register_server() -> Response: abort(400, f"run_id [{server_id}] already exists") _db.session.add( - _ServerTable( - id=server_id, - host=host, - port=port, - ), + _ServerTable(id=server_id, host=host, port=port), ) _db.session.commit() @@ -307,6 +305,7 @@ def _get_server_status(server_id: str) -> Response: "status": "running", "cpu": status["cpu"], "mem": status["mem"], + "size": status["size"], }, ) @@ -365,6 +364,23 @@ def _agent_memory() -> Response: return jsonify(mem) +@_app.route("/api/servers/alloc", methods=["GET"]) +def _alloc_server() -> Response: + # TODO: check the server is still running + # TODO: support to alloc multiple servers in one call + # TODO: use hints to decide which server to allocate + # TODO: allocate based on server's cpu and memory usage + # currently random select a server + servers = _ServerTable.query.all() + server = choice(servers) + return jsonify( + { + "host": server.host, + "port": server.port, + }, + ) + + @_app.route("/api/messages/push", methods=["POST"]) def _push_message() -> Response: """Receive a message from the agentscope application, and display it on diff --git a/src/agentscope/studio/_client.py b/src/agentscope/studio/_client.py index a00617b31..e999b76b6 100644 --- a/src/agentscope/studio/_client.py +++ b/src/agentscope/studio/_client.py @@ -209,6 +209,26 @@ def get_run_detail_page_url(self) -> str: """Get the URL of the run detail page.""" return f"{self.studio_url}/?run_id={self.runtime_id}" + def alloc_server(self) -> dict: + """Allocate a list of servers. + + Returns: + `dict`: A dict with host and port field. + """ + send_url = f"{self.studio_url}/api/servers/alloc" + try: + response = requests.get( + send_url, + timeout=10, + ) + except Exception as e: + logger.error(f"Fail to allocate servers: {e}") + return {} + if response.status_code != 200: + logger.error(f"Fail to allocate servers: {response.text}") + return {} + return response.json() + def flush(self) -> None: """Flush the client.""" self.studio_url = None diff --git a/src/agentscope/studio/static/js/server.js b/src/agentscope/studio/static/js/server.js index 65c599f62..bdf1ede12 100644 --- a/src/agentscope/studio/static/js/server.js +++ b/src/agentscope/studio/static/js/server.js @@ -93,6 +93,7 @@ function getServerStatus(cell, formatterParams, onRendered) { status: data.status, cpu: data.cpu, mem: data.mem, + size: data.size, }); } else { cell.getElement().innerHTML = @@ -176,6 +177,11 @@ function initServerTable(data) { vertAlign: "middle", formatter: getServerStatus, }, + { + title: "Agent Number", + field: "size", + vertAlign: "middle", + }, { title: "CPU Usage", field: "cpu", @@ -441,6 +447,7 @@ function initAgentMemoryTable(agentId, memoryData) { minimap: { enabled: false, }, + wordWrap: "on", scrollBeyondLastLine: false, readOnly: true, } @@ -475,6 +482,8 @@ function initializeServerPage() { }; let deleteAllAgentBtn = document.getElementById("delete-all-agent-btn"); deleteAllAgentBtn.onclick = deleteAllAgent; + let memoryflushBtn = document.getElementById("flush-memory-btn"); + memoryflushBtn.onclick = flushAgentMemoryTable; window.addEventListener("resize", () => { if (messageEditor) { messageEditor.layout(); diff --git a/src/agentscope/utils/tools.py b/src/agentscope/utils/tools.py index 18e3a0392..fa7584051 100644 --- a/src/agentscope/utils/tools.py +++ b/src/agentscope/utils/tools.py @@ -84,7 +84,11 @@ def check_port(port: Optional[int] = None) -> int: new_port = find_available_port() return new_port with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - if s.connect_ex(("localhost", port)) == 0: + try: + code = s.connect_ex(("localhost", port)) + if code != 0: + raise RuntimeError("Port is occupied.") + except Exception: new_port = find_available_port() return new_port return port diff --git a/tests/custom/custom_agent.py b/tests/custom/custom_agent.py new file mode 100644 index 000000000..061366deb --- /dev/null +++ b/tests/custom/custom_agent.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +"""A python module contains AgentBase subclasses. +For testing the agent dir loading functionality. +""" +from utils import speak # pylint: disable=E0611 +from agentscope.agents import AgentBase +from agentscope.message import Msg + + +class CustomAgent(AgentBase): + """A customized agent class which import a function from another file""" + + def reply(self, x: Msg = None) -> Msg: + return Msg(name=self.name, role="assistant", content=speak()) diff --git a/tests/custom/utils.py b/tests/custom/utils.py new file mode 100644 index 000000000..6dfddb817 --- /dev/null +++ b/tests/custom/utils.py @@ -0,0 +1,7 @@ +# -*- coding: utf-8 -*- +"""A python file without agent classes""" + + +def speak() -> str: + """A function in a separate file.""" + return "Hello world" diff --git a/tests/rpc_agent_test.py b/tests/rpc_agent_test.py index bbce7c95d..ca8ba56da 100644 --- a/tests/rpc_agent_test.py +++ b/tests/rpc_agent_test.py @@ -7,6 +7,7 @@ import time import shutil from typing import Optional, Union, Sequence +from unittest.mock import MagicMock, PropertyMock, patch from loguru import logger @@ -20,6 +21,7 @@ from agentscope.msghub import msghub from agentscope.pipelines import sequentialpipeline from agentscope.rpc.rpc_agent_client import RpcAgentClient +from agentscope.agents import RpcAgent from agentscope.exception import AgentCallError, QuotaExceededError @@ -290,19 +292,13 @@ def test_multi_rpc_agent(self) -> None: """test setup multi rpc agent""" agent_a = DemoRpcAgentAdd( name="a", - ).to_dist( - lazy_launch=False, - ) + ).to_dist() agent_b = DemoRpcAgentAdd( name="b", - ).to_dist( - lazy_launch=False, - ) + ).to_dist() agent_c = DemoRpcAgentAdd( name="c", - ).to_dist( - lazy_launch=False, - ) + ).to_dist() # test sequential msg = Msg( @@ -346,9 +342,7 @@ def test_mix_rpc_agent_and_local_agent(self) -> None: """test to use local and rpc agent simultaneously""" agent_a = DemoRpcAgentAdd( name="a", - ).to_dist( - lazy_launch=False, - ) + ).to_dist() # local agent b agent_b = DemoLocalAgentAdd( name="b", @@ -356,9 +350,7 @@ def test_mix_rpc_agent_and_local_agent(self) -> None: # rpc agent c agent_c = DemoRpcAgentAdd( # pylint: disable=E1123 name="c", - to_dist=DistConf( - lazy_launch=False, - ), + to_dist=True, ) msg = Msg( name="System", @@ -412,7 +404,7 @@ def test_multi_agent_in_same_server(self) -> None: """test agent server with multi-agent""" launcher = RpcAgentServerLauncher( host="127.0.0.1", - port=12010, + port=-1, local_mode=False, custom_agent_classes=[DemoRpcAgentWithMemory], ) @@ -505,7 +497,7 @@ def test_clone_instances(self) -> None: """Test the clone_instances method of RpcAgent""" agent = DemoRpcAgentWithMemory( name="a", - ).to_dist() + ).to_dist(lazy_launch=True) # lazy launch will not init client self.assertIsNone(agent.client) # generate two agents (the first is it self) @@ -735,3 +727,78 @@ def test_agent_server_management_funcs(self) -> None: # time.sleep(1) # self.assertFalse(client.is_alive()) launcher.shutdown() + + @patch("agentscope.studio._client.StudioClient.alloc_server") + @patch( + "agentscope.studio._client.StudioClient.active", + new_callable=PropertyMock, + ) + def test_server_auto_alloc( + self, + mock_active: PropertyMock, + mock_alloc: MagicMock, + ) -> None: + """Test the auto allocation of server""" + mock_active.return_value = True + host = "localhost" + launcher = RpcAgentServerLauncher( + # choose port automatically + host=host, + local_mode=False, + custom_agent_classes=[DemoRpcAgentWithMemory], + agent_dir=os.path.abspath( + os.path.join( + os.path.abspath(os.path.dirname(__file__)), + "custom", + ), + ), + ) + launcher.launch() + port = launcher.port + mock_alloc.return_value = {"host": host, "port": port} + + # test auto allocation + a1 = DemoRpcAgentWithMemory(name="Auto1", to_dist=True) + a2 = DemoRpcAgentWithMemory(name="Auto2").to_dist() + self.assertEqual(a1.host, host) + self.assertEqual(a1.port, port) + self.assertEqual(a2.host, host) + self.assertEqual(a2.port, port) + client = RpcAgentClient(host=host, port=port) + al = client.get_agent_list() + self.assertEqual(len(al), 2) + + # test not alive server + mock_alloc.return_value = {"host": "not_exist", "port": 1234} + a3 = DemoRpcAgentWithMemory(name="Auto3", to_dist=True) + self.assertEqual(a3.host, "localhost") + nclient = RpcAgentClient(host=a3.host, port=a3.port) + nal = nclient.get_agent_list() + self.assertEqual(len(nal), 1) + + # test agent dir loading + custom_agent_id = "custom_test" + self.assertTrue( + client.create_agent( + agent_configs={ + "args": (), + "kwargs": {"name": "custom"}, + "class_name": "CustomAgent", + }, + agent_id=custom_agent_id, + ), + ) + ra = RpcAgent( + name="custom", + host=launcher.host, + port=launcher.port, + agent_id=custom_agent_id, + connect_existing=True, + ) + resp = ra(Msg(name="sys", role="user", content="Hello")) + self.assertEqual(resp.name, "custom") + self.assertEqual(resp.content, "Hello world") + al = client.get_agent_list() + self.assertEqual(len(al), 3) + + launcher.shutdown()