diff --git a/plugins/wasm-go/extensions/ai-statistics/README.md b/plugins/wasm-go/extensions/ai-statistics/README.md index 211201be26..5409f2c7c4 100644 --- a/plugins/wasm-go/extensions/ai-statistics/README.md +++ b/plugins/wasm-go/extensions/ai-statistics/README.md @@ -1,69 +1,173 @@ -# 介绍 -提供AI可观测基础能力,其后需接ai-proxy插件,如果不接ai-proxy插件的话,则只支持openai协议。 +--- +title: AI可观测 +keywords: [higress, AI, observability] +description: AI可观测配置参考 +--- -# 配置说明 +## 介绍 +提供AI可观测基础能力,包括 metric, log, trace,其后需接ai-proxy插件,如果不接ai-proxy插件的话,则需要用户进行相应配置才可生效。 + +## 配置说明 +插件默认请求符合openai协议格式,并提供了以下基础可观测值,用户无需特殊配置: + +- metric:提供了输入token、输出token、首个token的rt(流式请求)、请求总rt等指标,支持在网关、路由、服务、模型四个维度上进行观测 +- log:提供了 input_token, output_token, model, llm_service_duration, llm_first_token_duration 等字段 + +用户还可以通过配置的方式对可观测的值进行扩展: | 名称 | 数据类型 | 填写要求 | 默认值 | 描述 | |----------------|-------|------|-----|------------------------| -| `enable` | bool | 必填 | - | 是否开启ai统计功能 | -| `tracing_span` | array | 非必填 | - | 自定义tracing span tag 配置 | +| `attributes` | []Attribute | 非必填 | - | 用户希望记录在log/span中的信息 | + +Attribute 配置说明: -## tracing_span 配置说明 | 名称 | 数据类型 | 填写要求 | 默认值 | 描述 | |----------------|-------|-----|-----|------------------------| -| `key` | string | 必填 | - | tracing tag 名称 | -| `value_source` | string | 必填 | - | tag 取值来源 | -| `value` | string | 必填 | - | tag 取值 key value/path | +| `key` | string | 必填 | - | attrribute 名称 | +| `value_source` | string | 必填 | - | attrribute 取值来源,可选值为 `fixed_value`, `request_header`, `request_body`, `response_header`, `response_body`, `response_streaming_body` | +| `value` | string | 必填 | - | attrribute 取值 key value/path | +| `rule` | string | 非必填 | - | 从流式响应中提取 attrribute 的规则,可选值为 `first`, `replace`, `append`| +| `apply_to_log` | bool | 非必填 | false | 是否将提取的信息记录在日志中 | +| `apply_to_span` | bool | 非必填 | false | 是否将提取的信息记录在链路追踪span中 | -value_source为 tag 值的取值来源,可选配置值有 4 个: -- property : tag 值通过proxywasm.GetProperty()方法获取,value配置GetProperty()方法要提取的key名 -- requeset_header : tag 
值通过http请求头获取,value配置为header key -- request_body :tag 值通过请求body获取,value配置格式为 gjson的 GJSON PATH 语法 -- response_header : tag 值通过http响应头获取,value配置为header key +`value_source` 的各种取值含义如下: + +- `fixed_value`:固定值 +- `request_header` : attribute 值通过 http 请求头获取,value 配置为 header key +- `request_body` :attribute 值通过请求 body 获取,value 配置格式为 gjson 的 jsonpath +- `response_header` :attribute 值通过 http 响应头获取,value 配置为 header key +- `response_body` :attribute 值通过响应 body 获取,value 配置格式为 gjson 的 jsonpath +- `response_streaming_body` :attribute 值通过流式响应 body 获取,value 配置格式为 gjson 的 jsonpath + + +当 `value_source` 为 `response_streaming_body` 时,应当配置 `rule`,用于指定如何从流式body中获取指定值,取值含义如下: + +- `first`:多个chunk中取第一个有效chunk的值 +- `replace`:多个chunk中取最后一个有效chunk的值 +- `append`:拼接多个有效chunk中的值,可用于获取回答内容 + +## 配置示例 +如果希望在网关访问日志中记录ai-statistic相关的统计值,需要修改log_format,在原log_format基础上添加一个新字段,示例如下: -举例如下: ```yaml -tracing_label: -- key: "session_id" - value_source: "requeset_header" - value: "session_id" -- key: "user_content" - value_source: "request_body" - value: "input.messages.1.content" +'{"ai_log":"%FILTER_STATE(wasm.ai_log:PLAIN)%"}' ``` -开启后 metrics 示例: +### 空配置 +#### 监控 ``` -route_upstream_model_input_token{ai_route="openai",ai_cluster="qwen",ai_model="qwen-max"} 21 -route_upstream_model_output_token{ai_route="openai",ai_cluster="qwen",ai_model="qwen-max"} 17 +route_upstream_model_metric_input_token{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 10 +route_upstream_model_metric_llm_duration_count{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 1 +route_upstream_model_metric_llm_first_token_duration{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 309 +route_upstream_model_metric_llm_service_duration{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 1955 +route_upstream_model_metric_output_token{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 69 ``` -日志示例: +#### 日志 +```json +{ + 
"ai_log":"{\"model\":\"qwen-turbo\",\"input_token\":\"10\",\"output_token\":\"69\",\"llm_first_token_duration\":\"309\",\"llm_service_duration\":\"1955\"}" +} +``` + +#### 链路追踪 +配置为空时,不会在span中添加额外的attribute +### 从非openai协议提取token使用信息 +在ai-proxy中设置协议为original时,以百炼为例,可作如下配置指定如何提取model, input_token, output_token + +```yaml +attributes: + - key: model + value_source: response_body + value: usage.models.0.model_id + apply_to_log: true + apply_to_span: false + - key: input_token + value_source: response_body + value: usage.models.0.input_tokens + apply_to_log: true + apply_to_span: false + - key: output_token + value_source: response_body + value: usage.models.0.output_tokens + apply_to_log: true + apply_to_span: false +``` +#### 监控 +``` +route_upstream_model_metric_input_token{ai_route="bailian",ai_cluster="qwen",ai_model="qwen-max"} 343 +route_upstream_model_metric_output_token{ai_route="bailian",ai_cluster="qwen",ai_model="qwen-max"} 153 +route_upstream_model_metric_llm_service_duration{ai_route="bailian",ai_cluster="qwen",ai_model="qwen-max"} 3725 +route_upstream_model_metric_llm_duration_count{ai_route="bailian",ai_cluster="qwen",ai_model="qwen-max"} 1 +``` + +#### 日志 +此配置下日志效果如下: ```json { - "model": "qwen-max", - "input_token": "21", - "output_token": "17", - "authority": "dashscope.aliyuncs.com", - "bytes_received": "336", - "bytes_sent": "1675", - "duration": "1590", - "istio_policy_status": "-", - "method": "POST", - "path": "/v1/chat/completions", - "protocol": "HTTP/1.1", - "request_id": "5895f5a9-e4e3-425b-98db-6c6a926195b7", - "requested_server_name": "-", - "response_code": "200", - "response_flags": "-", - "route_name": "openai", - "start_time": "2024-06-18T09:37:14.078Z", - "trace_id": "-", - "upstream_cluster": "qwen", - "upstream_service_time": "496", - "upstream_transport_failure_reason": "-", - "user_agent": "PostmanRuntime/7.37.3", - "x_forwarded_for": "-" + "ai_log": 
"{\"model\":\"qwen-max\",\"input_token\":\"343\",\"output_token\":\"153\",\"llm_service_duration\":\"19110\"}" } +``` + +#### 链路追踪 +链路追踪的 span 中可以看到 model, input_token, output_token 三个额外的 attribute + +### 配合认证鉴权记录consumer +举例如下: +```yaml +attributes: + - key: consumer # 配合认证鉴权记录consumer + value_source: request_header + value: x-mse-consumer + apply_to_log: true +``` + +### 记录问题与回答 +```yaml +attributes: + - key: question # 记录问题 + value_source: request_body + value: messages.@reverse.0.content + apply_to_log: true + - key: answer # 在流式响应中提取大模型的回答 + value_source: response_streaming_body + value: choices.0.delta.content + rule: append + apply_to_log: true + - key: answer # 在非流式响应中提取大模型的回答 + value_source: response_body + value: choices.0.message.content + apply_to_log: true +``` + +## 进阶 +配合阿里云SLS数据加工,可以将ai相关的字段进行提取加工,例如原始日志为: + +``` +ai_log:{"question":"用python计算2的3次方","answer":"你可以使用 Python 的乘方运算符 `**` 来计算一个数的次方。计算2的3次方,即2乘以自己2次,可以用以下代码表示:\n\n```python\nresult = 2 ** 3\nprint(result)\n```\n\n运行这段代码,你会得到输出结果为8,因为2乘以自己两次等于8。","model":"qwen-max","input_token":"16","output_token":"76","llm_service_duration":"5913"} +``` + +使用如下数据加工脚本,可以提取出question和answer: + +``` +e_regex("ai_log", grok("%{EXTRACTJSON}")) +e_set("question", json_select(v("json"), "question", default="-")) +e_set("answer", json_select(v("json"), "answer", default="-")) +``` + +提取后,SLS中会添加question和answer两个字段,示例如下: + +``` +ai_log:{"question":"用python计算2的3次方","answer":"你可以使用 Python 的乘方运算符 `**` 来计算一个数的次方。计算2的3次方,即2乘以自己2次,可以用以下代码表示:\n\n```python\nresult = 2 ** 3\nprint(result)\n```\n\n运行这段代码,你会得到输出结果为8,因为2乘以自己两次等于8。","model":"qwen-max","input_token":"16","output_token":"76","llm_service_duration":"5913"} + +question:用python计算2的3次方 + +answer:你可以使用 Python 的乘方运算符 `**` 来计算一个数的次方。计算2的3次方,即2乘以自己2次,可以用以下代码表示: + +result = 2 ** 3 +print(result) + +运行这段代码,你会得到输出结果为8,因为2乘以自己两次等于8。 + ``` \ No newline at end of file diff --git a/plugins/wasm-go/extensions/ai-statistics/README_EN.md 
b/plugins/wasm-go/extensions/ai-statistics/README_EN.md new file mode 100644 index 0000000000..1eccffcacc --- /dev/null +++ b/plugins/wasm-go/extensions/ai-statistics/README_EN.md @@ -0,0 +1,140 @@ +--- +title: AI Statistics +keywords: [higress, AI, observability] +description: AI Statistics plugin configuration reference +--- + +## Introduction +Provides basic AI observability capabilities, including metric, log, and trace. The ai-proxy plug-in needs to be connected afterwards. If the ai-proxy plug-in is not connected, the user needs to configure it accordingly to take effect. + +## Configuration instructions +The default request of the plug-in conforms to the openai protocol format and provides the following basic observable values. Users do not need special configuration: + +- metric: It provides indicators such as input token, output token, rt of the first token (streaming request), total request rt, etc., and supports observation in the four dimensions of gateway, routing, service, and model. 
+- log: Provides input_token, output_token, model, llm_service_duration, llm_first_token_duration and other fields + +Users can also expand observable values through configuration: + +| Name | Type | Required | Default | Description | +|----------------|-------|------|-----|------------------------| +| `attributes` | []Attribute | optional | - | Information that the user wants to record in log/span | + +Attribute Configuration instructions: + +| Name | Type | Required | Default | Description | +|----------------|-------|-----|-----|------------------------| +| `key` | string | required | - | attribute key | +| `value_source` | string | required | - | attribute value source, optional values are `fixed_value`, `request_header`, `request_body`, `response_header`, `response_body`, `response_streaming_body` | +| `value` | string | required | - | how to get attribute value | +| `rule` | string | optional | - | Rule to extract attribute from streaming response, optional values are `first`, `replace`, `append`| +| `apply_to_log` | bool | optional | false | Whether to record the extracted information in the log | +| `apply_to_span` | bool | optional | false | Whether to record the extracted information in the trace span | + +The meanings of various values for `value_source` are as follows: + +- `fixed_value`: fixed value +- `request_header`: The attribute is obtained through the http request header +- `request_body`: The attribute is obtained through the http request body +- `response_header`: The attribute is obtained through the http response header +- `response_body`: The attribute is obtained through the http response body +- `response_streaming_body`: The attribute is obtained through the http streaming response body + + +When `value_source` is `response_streaming_body`, `rule` should be configured to specify how to obtain the specified value from the streaming body. 
The meaning of the value is as follows: + +- `first`: extract value from the first valid chunk +- `replace`: extract value from the last valid chunk +- `append`: join value pieces from all valid chunks + +## Configuration example +If you want to record ai-statistic related statistical values ​​​​in the gateway access log, you need to modify log_format and add a new field based on the original log_format. The example is as follows: + +```yaml +'{"ai_log":"%FILTER_STATE(wasm.ai_log:PLAIN)%"}' +``` + +### Empty +#### Metric +``` +route_upstream_model_metric_input_token{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 10 +route_upstream_model_metric_llm_duration_count{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 1 +route_upstream_model_metric_llm_first_token_duration{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 309 +route_upstream_model_metric_llm_service_duration{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 1955 +route_upstream_model_metric_output_token{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 69 +``` + +#### Log +```json +{ + "ai_log":"{\"model\":\"qwen-turbo\",\"input_token\":\"10\",\"output_token\":\"69\",\"llm_first_token_duration\":\"309\",\"llm_service_duration\":\"1955\"}" +} +``` + +#### Trace +When the configuration is empty, no additional attributes will be added to the span. 
+ +### Extract token usage information from non-openai protocols +When setting the protocol to original in ai-proxy, taking Alibaba Cloud Bailian as an example, you can make the following configuration to specify how to extract `model`, `input_token`, `output_token` + +```yaml +attributes: + - key: model + value_source: response_body + value: usage.models.0.model_id + apply_to_log: true + apply_to_span: false + - key: input_token + value_source: response_body + value: usage.models.0.input_tokens + apply_to_log: true + apply_to_span: false + - key: output_token + value_source: response_body + value: usage.models.0.output_tokens + apply_to_log: true + apply_to_span: false +``` +#### Metric +``` +route_upstream_model_metric_input_token{ai_route="bailian",ai_cluster="qwen",ai_model="qwen-max"} 343 +route_upstream_model_metric_output_token{ai_route="bailian",ai_cluster="qwen",ai_model="qwen-max"} 153 +route_upstream_model_metric_llm_service_duration{ai_route="bailian",ai_cluster="qwen",ai_model="qwen-max"} 3725 +route_upstream_model_metric_llm_duration_count{ai_route="bailian",ai_cluster="qwen",ai_model="qwen-max"} 1 +``` + +#### Log +```json +{ + "ai_log": "{\"model\":\"qwen-max\",\"input_token\":\"343\",\"output_token\":\"153\",\"llm_service_duration\":\"19110\"}" +} +``` + +#### Trace +Three additional attributes `model`, `input_token`, and `output_token` can be seen in the trace spans. 
+ +### Cooperate with authentication and authentication record consumer +```yaml +attributes: + - key: consumer + value_source: request_header + value: x-mse-consumer + apply_to_log: true +``` + +### Record questions and answers +```yaml +attributes: + - key: question + value_source: request_body + value: messages.@reverse.0.content + apply_to_log: true + - key: answer + value_source: response_streaming_body + value: choices.0.delta.content + rule: append + apply_to_log: true + - key: answer + value_source: response_body + value: choices.0.message.content + apply_to_log: true +``` \ No newline at end of file diff --git a/plugins/wasm-go/extensions/ai-statistics/go.mod b/plugins/wasm-go/extensions/ai-statistics/go.mod index 8d0f87c062..a5c87ef617 100644 --- a/plugins/wasm-go/extensions/ai-statistics/go.mod +++ b/plugins/wasm-go/extensions/ai-statistics/go.mod @@ -10,8 +10,6 @@ require ( github.com/tidwall/gjson v1.14.3 ) -require github.com/tetratelabs/wazero v1.7.1 // indirect - require ( github.com/google/uuid v1.3.0 // indirect github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520 // indirect @@ -19,5 +17,4 @@ require ( github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect github.com/tidwall/resp v0.1.1 // indirect - github.com/wasilibs/go-re2 v1.5.3 ) diff --git a/plugins/wasm-go/extensions/ai-statistics/go.sum b/plugins/wasm-go/extensions/ai-statistics/go.sum index b0732f4e65..f473e12b2d 100644 --- a/plugins/wasm-go/extensions/ai-statistics/go.sum +++ b/plugins/wasm-go/extensions/ai-statistics/go.sum @@ -1,19 +1,14 @@ -github.com/alibaba/higress/plugins/wasm-go v1.3.6-0.20240522012622-fc6a6aad8906 h1:RhEmB+ApLKsClZD7joTC4ifmsVgOVz4pFLdPR3xhNaE= -github.com/alibaba/higress/plugins/wasm-go v1.3.6-0.20240522012622-fc6a6aad8906/go.mod h1:10jQXKsYFUF7djs+Oy7t82f4dbie9pISfP9FJwpPLuk= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/google/uuid v1.3.0 
h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520 h1:IHDghbGQ2DTIXHBHxWfqCYQW1fKjyJ/I7W1pMyUDeEA= github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520/go.mod h1:Nz8ORLaFiLWotg6GeKlJMhv8cci8mM43uEnLA5t8iew= -github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240327114451-d6b7174a84fc h1:t2AT8zb6N/59Y78lyRWedVoVWHNRSCBh0oWCC+bluTQ= -github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240327114451-d6b7174a84fc/go.mod h1:hNFjhrLUIq+kJ9bOcs8QtiplSQ61GZXtd2xHKx4BYRo= +github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240711023527-ba358c48772f h1:ZIiIBRvIw62gA5MJhuwp1+2wWbqL9IGElQ499rUsYYg= github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240711023527-ba358c48772f/go.mod h1:hNFjhrLUIq+kJ9bOcs8QtiplSQ61GZXtd2xHKx4BYRo= github.com/magefile/mage v1.14.0 h1:6QDX3g6z1YvJ4olPhT1wksUcSa/V0a1B+pJb73fBjyo= github.com/magefile/mage v1.14.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/tetratelabs/wazero v1.7.1 h1:QtSfd6KLc41DIMpDYlJdoMc6k7QTN246DM2+n2Y/Dx8= -github.com/tetratelabs/wazero v1.7.1/go.mod h1:ytl6Zuh20R/eROuyDaGPkp82O9C/DJfXAwJfQ3X6/7Y= github.com/tidwall/gjson v1.14.3 h1:9jvXn7olKEHU1S9vwoMGliaT8jq1vJ7IH/n9zD9Dnlw= github.com/tidwall/gjson v1.14.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= @@ -22,6 +17,4 @@ github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/resp v0.1.1 h1:Ly20wkhqKTmDUPlyM1S7pWo5kk0tDu8OoC/vFArXmwE= github.com/tidwall/resp v0.1.1/go.mod 
h1:3/FrruOBAxPTPtundW0VXgmsQ4ZBA0Aw714lVYgwFa0= -github.com/wasilibs/go-re2 v1.5.3 h1:wiuTcgDZdLhu8NG8oqF5sF5Q3yIU14lPAvXqeYzDK3g= -github.com/wasilibs/go-re2 v1.5.3/go.mod h1:PzpVPsBdFC7vM8QJbbEnOeTmwA0DGE783d/Gex8eCV8= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/plugins/wasm-go/extensions/ai-statistics/main.go b/plugins/wasm-go/extensions/ai-statistics/main.go index e7396160f4..14fcc4d2ab 100644 --- a/plugins/wasm-go/extensions/ai-statistics/main.go +++ b/plugins/wasm-go/extensions/ai-statistics/main.go @@ -3,19 +3,16 @@ package main import ( "bytes" "encoding/json" + "errors" "fmt" - "github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper" - "github.com/higress-group/proxy-wasm-go-sdk/proxywasm" - "github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types" - "github.com/tidwall/gjson" "strconv" "strings" "time" -) -const ( - StatisticsRequestStartTime = "ai-statistics-request-start-time" - StatisticsFirstTokenTime = "ai-statistics-first-token-time" + "github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper" + "github.com/higress-group/proxy-wasm-go-sdk/proxywasm" + "github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types" + "github.com/tidwall/gjson" ) func main() { @@ -30,146 +27,243 @@ func main() { ) } +const ( + // Trace span prefix + TracePrefix = "trace_span_tag." 
+ // Context consts + StatisticsRequestStartTime = "ai-statistics-request-start-time" + StatisticsFirstTokenTime = "ai-statistics-first-token-time" + CtxGeneralAtrribute = "attributes" + CtxLogAtrribute = "logAttributes" + CtxStreamingBodyBuffer = "streamingBodyBuffer" + + // Source Type + FixedValue = "fixed_value" + RequestHeader = "request_header" + RequestBody = "request_body" + ResponseHeader = "response_header" + ResponseStreamingBody = "response_streaming_body" + ResponseBody = "response_body" + + // Inner metric & log attributes name + Model = "model" + InputToken = "input_token" + OutputToken = "output_token" + LLMFirstTokenDuration = "llm_first_token_duration" + LLMServiceDuration = "llm_service_duration" + LLMDurationCount = "llm_duration_count" + + // Extract Rule + RuleFirst = "first" + RuleReplace = "replace" + RuleAppend = "append" +) + // TracingSpan is the tracing span configuration. -type TracingSpan struct { - Key string `required:"true" yaml:"key" json:"key"` - ValueSource string `required:"true" yaml:"valueSource" json:"valueSource"` - Value string `required:"true" yaml:"value" json:"value"` +type Attribute struct { + Key string `json:"key"` + ValueSource string `json:"value_source"` + Value string `json:"value"` + Rule string `json:"rule,omitempty"` + ApplyToLog bool `json:"apply_to_log,omitempty"` + ApplyToSpan bool `json:"apply_to_span,omitempty"` } type AIStatisticsConfig struct { - Enable bool `required:"true" yaml:"enable" json:"enable"` - // TracingSpan array define the tracing span. 
- TracingSpan []TracingSpan `required:"true" yaml:"tracingSpan" json:"tracingSpan"` - Metrics map[string]proxywasm.MetricCounter `required:"true" yaml:"metrics" json:"metrics"` + // Metrics + // TODO: add more metrics in Gauge and Histogram format + counterMetrics map[string]proxywasm.MetricCounter + // Attributes to be recorded in log & span + attributes []Attribute + // If there exist attributes extracted from streaming body, chunks should be buffered + shouldBufferStreamingBody bool +} + +func generateMetricName(route, cluster, model, metricName string) string { + return fmt.Sprintf("route.%s.upstream.%s.model.%s.metric.%s", route, cluster, model, metricName) +} + +func getRouteName() (string, error) { + if raw, err := proxywasm.GetProperty([]string{"route_name"}); err != nil { + return "-", err + } else { + return string(raw), nil + } } -func (config *AIStatisticsConfig) incrementCounter(metricName string, inc uint64, log wrapper.Log) { - counter, ok := config.Metrics[metricName] +func getClusterName() (string, error) { + if raw, err := proxywasm.GetProperty([]string{"cluster_name"}); err != nil { + return "-", err + } else { + return string(raw), nil + } +} + +func (config *AIStatisticsConfig) incrementCounter(metricName string, inc uint64) { + counter, ok := config.counterMetrics[metricName] if !ok { counter = proxywasm.DefineCounterMetric(metricName) - config.Metrics[metricName] = counter + config.counterMetrics[metricName] = counter } counter.Increment(inc) } func parseConfig(configJson gjson.Result, config *AIStatisticsConfig, log wrapper.Log) error { - config.Enable = configJson.Get("enable").Bool() - - // Parse tracing span. 
- tracingSpanConfigArray := configJson.Get("tracing_span").Array() - config.TracingSpan = make([]TracingSpan, len(tracingSpanConfigArray)) - for i, tracingSpanConfig := range tracingSpanConfigArray { - tracingSpan := TracingSpan{ - Key: tracingSpanConfig.Get("key").String(), - ValueSource: tracingSpanConfig.Get("value_source").String(), - Value: tracingSpanConfig.Get("value").String(), + // Parse tracing span attributes setting. + attributeConfigs := configJson.Get("attributes").Array() + config.attributes = make([]Attribute, len(attributeConfigs)) + for i, attributeConfig := range attributeConfigs { + attribute := Attribute{} + err := json.Unmarshal([]byte(attributeConfig.Raw), &attribute) + if err != nil { + log.Errorf("parse config failed, %v", err) + return err } - config.TracingSpan[i] = tracingSpan + if attribute.ValueSource == ResponseStreamingBody { + config.shouldBufferStreamingBody = true + } + if attribute.Rule != "" && attribute.Rule != RuleFirst && attribute.Rule != RuleReplace && attribute.Rule != RuleAppend { + return errors.New("value of rule must be one of [nil, first, replace, append]") + } + config.attributes[i] = attribute } - - config.Metrics = make(map[string]proxywasm.MetricCounter) - - configStr, _ := json.Marshal(config) - log.Infof("Init ai-statistics config success, config: %s.", configStr) + // Metric settings + config.counterMetrics = make(map[string]proxywasm.MetricCounter) return nil } func onHttpRequestHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig, log wrapper.Log) types.Action { - - if !config.Enable { - ctx.DontReadRequestBody() - return types.ActionContinue - } - - // Fetch request header tracing span value. - setTracingSpanValueBySource(config, "request_header", nil, log) - // Fetch request process proxy wasm property. - // Warn: The property may be modified by response process , so the value of the property may be overwritten. 
- setTracingSpanValueBySource(config, "property", nil, log) - + ctx.SetContext(CtxGeneralAtrribute, map[string]string{}) + ctx.SetContext(CtxLogAtrribute, map[string]string{}) + ctx.SetContext(StatisticsRequestStartTime, time.Now().UnixMilli()) + + // Set user defined log & span attributes which type is fixed_value + setAttributeBySource(ctx, config, FixedValue, nil, log) + // Set user defined log & span attributes which type is request_header + setAttributeBySource(ctx, config, RequestHeader, nil, log) // Set request start time. - ctx.SetContext(StatisticsRequestStartTime, strconv.FormatUint(uint64(time.Now().UnixMilli()), 10)) - // The request has a body and requires delaying the header transmission until a cache miss occurs, - // at which point the header should be sent. return types.ActionContinue } func onHttpRequestBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body []byte, log wrapper.Log) types.Action { - // Set request body tracing span value. - setTracingSpanValueBySource(config, "request_body", body, log) + // Set user defined log & span attributes. + setAttributeBySource(ctx, config, RequestBody, body, log) return types.ActionContinue } func onHttpResponseHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig, log wrapper.Log) types.Action { - if !config.Enable { - ctx.DontReadResponseBody() - return types.ActionContinue - } contentType, _ := proxywasm.GetHttpResponseHeader("content-type") if !strings.Contains(contentType, "text/event-stream") { ctx.BufferResponseBody() } - // Set response header tracing span value. - setTracingSpanValueBySource(config, "response_header", nil, log) + // Set user defined log & span attributes. 
+ setAttributeBySource(ctx, config, ResponseHeader, nil, log) return types.ActionContinue } func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, data []byte, endOfStream bool, log wrapper.Log) []byte { - - // If the end of the stream is reached, calculate the total time and set tracing span tag total_time. - // Otherwise, set tracing span tag first_token_time. - if endOfStream { - requestStartTimeStr := ctx.GetContext(StatisticsRequestStartTime).(string) - requestStartTime, _ := strconv.ParseInt(requestStartTimeStr, 10, 64) - responseEndTime := time.Now().UnixMilli() - setTracingSpanValue("total_time", fmt.Sprintf("%d", responseEndTime-requestStartTime), log) - } else { - firstTokenTime := ctx.GetContext(StatisticsFirstTokenTime) - if firstTokenTime == nil { - firstTokenTimeStr := strconv.FormatInt(time.Now().UnixMilli(), 10) - ctx.SetContext(StatisticsFirstTokenTime, firstTokenTimeStr) - setTracingSpanValue("first_token_time", firstTokenTimeStr, log) + // Buffer stream body for record log & span attributes + if config.shouldBufferStreamingBody { + var streamingBodyBuffer []byte + streamingBodyBuffer, ok := ctx.GetContext(CtxStreamingBodyBuffer).([]byte) + if !ok { + streamingBodyBuffer = data + } else { + streamingBodyBuffer = append(streamingBodyBuffer, data...) } + ctx.SetContext(CtxStreamingBodyBuffer, streamingBodyBuffer) } - model, inputToken, outputToken, ok := getUsage(data) + // Get requestStartTime from http context + requestStartTime, ok := ctx.GetContext(StatisticsRequestStartTime).(int64) if !ok { + log.Error("failed to get requestStartTime from http context") return data } - setFilterStateData(model, inputToken, outputToken, log) - incrementCounter(config, model, inputToken, outputToken, log) - // Set tracing span tag input_tokens and output_tokens. 
- setTracingSpanValue("input_tokens", strconv.FormatInt(inputToken, 10), log) - setTracingSpanValue("output_tokens", strconv.FormatInt(outputToken, 10), log) - // Set response process proxy wasm property. - setTracingSpanValueBySource(config, "property", nil, log) + // If this is the first chunk, record first token duration metric and span attribute + if ctx.GetContext(StatisticsFirstTokenTime) == nil { + firstTokenTime := time.Now().UnixMilli() + ctx.SetContext(StatisticsFirstTokenTime, firstTokenTime) + attributes, _ := ctx.GetContext(CtxGeneralAtrribute).(map[string]string) + attributes[LLMFirstTokenDuration] = fmt.Sprint(firstTokenTime - requestStartTime) + ctx.SetContext(CtxGeneralAtrribute, attributes) + } + + // Set information about this request + + if model, inputToken, outputToken, ok := getUsage(data); ok { + attributes, _ := ctx.GetContext(CtxGeneralAtrribute).(map[string]string) + // Record Log Attributes + attributes[Model] = model + attributes[InputToken] = fmt.Sprint(inputToken) + attributes[OutputToken] = fmt.Sprint(outputToken) + // Set attributes to http context + ctx.SetContext(CtxGeneralAtrribute, attributes) + } + // If the end of the stream is reached, record metrics/logs/spans. + if endOfStream { + responseEndTime := time.Now().UnixMilli() + attributes, _ := ctx.GetContext(CtxGeneralAtrribute).(map[string]string) + attributes[LLMServiceDuration] = fmt.Sprint(responseEndTime - requestStartTime) + ctx.SetContext(CtxGeneralAtrribute, attributes) + + // Set user defined log & span attributes. 
+ if config.shouldBufferStreamingBody { + streamingBodyBuffer, ok := ctx.GetContext(CtxStreamingBodyBuffer).([]byte) + if !ok { + return data + } + setAttributeBySource(ctx, config, ResponseStreamingBody, streamingBodyBuffer, log) + } + + // Write inner filter states which can be used by other plugins such as ai-token-ratelimit + writeFilterStates(ctx, log) + + // Write log + writeLog(ctx, log) + + // Write metrics + writeMetric(ctx, config, log) + } return data } func onHttpResponseBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body []byte, log wrapper.Log) types.Action { + // Get attributes from http context + attributes, _ := ctx.GetContext(CtxGeneralAtrribute).(map[string]string) + + // Get requestStartTime from http context + requestStartTime, _ := ctx.GetContext(StatisticsRequestStartTime).(int64) - // Calculate the total time and set tracing span tag total_time. - requestStartTimeStr := ctx.GetContext(StatisticsRequestStartTime).(string) - requestStartTime, _ := strconv.ParseInt(requestStartTimeStr, 10, 64) responseEndTime := time.Now().UnixMilli() - setTracingSpanValue("total_time", fmt.Sprintf("%d", responseEndTime-requestStartTime), log) + attributes[LLMServiceDuration] = fmt.Sprint(responseEndTime - requestStartTime) + // Set information about this request model, inputToken, outputToken, ok := getUsage(body) - if !ok { - return types.ActionContinue + if ok { + attributes[Model] = model + attributes[InputToken] = fmt.Sprint(inputToken) + attributes[OutputToken] = fmt.Sprint(outputToken) + // Update attributes + ctx.SetContext(CtxGeneralAtrribute, attributes) } - setFilterStateData(model, inputToken, outputToken, log) - incrementCounter(config, model, inputToken, outputToken, log) - // Set tracing span tag input_tokens and output_tokens. - setTracingSpanValue("input_tokens", strconv.FormatInt(inputToken, 10), log) - setTracingSpanValue("output_tokens", strconv.FormatInt(outputToken, 10), log) - // Set response process proxy wasm property. 
- setTracingSpanValueBySource(config, "property", nil, log) + + // Set user defined log & span attributes. + setAttributeBySource(ctx, config, ResponseBody, body, log) + + // Write inner filter states which can be used by other plugins such as ai-token-ratelimit + writeFilterStates(ctx, log) + + // Write log + writeLog(ctx, log) + + // Write metrics + writeMetric(ctx, config, log) + return types.ActionContinue } @@ -198,92 +292,210 @@ func getUsage(data []byte) (model string, inputTokenUsage int64, outputTokenUsag return } -// setFilterData sets the input_token and output_token in the filter state. -// ai-token-ratelimit will use these values to calculate the total token usage. -func setFilterStateData(model string, inputToken int64, outputToken int64, log wrapper.Log) { - if e := proxywasm.SetProperty([]string{"model"}, []byte(model)); e != nil { - log.Errorf("failed to set model in filter state: %v", e) - } - if e := proxywasm.SetProperty([]string{"input_token"}, []byte(fmt.Sprintf("%d", inputToken))); e != nil { - log.Errorf("failed to set input_token in filter state: %v", e) - } - if e := proxywasm.SetProperty([]string{"output_token"}, []byte(fmt.Sprintf("%d", outputToken))); e != nil { - log.Errorf("failed to set output_token in filter state: %v", e) - } -} - -func incrementCounter(config AIStatisticsConfig, model string, inputToken int64, outputToken int64, log wrapper.Log) { - var route, cluster string - if raw, err := proxywasm.GetProperty([]string{"route_name"}); err == nil { - route = string(raw) - } - if raw, err := proxywasm.GetProperty([]string{"cluster_name"}); err == nil { - cluster = string(raw) - } - config.incrementCounter("route."+route+".upstream."+cluster+".model."+model+".input_token", uint64(inputToken), log) - config.incrementCounter("route."+route+".upstream."+cluster+".model."+model+".output_token", uint64(outputToken), log) -} - // fetches the tracing span value from the specified source. 
-func setTracingSpanValueBySource(config AIStatisticsConfig, tracingSource string, body []byte, log wrapper.Log) {
-	for _, tracingSpanEle := range config.TracingSpan {
-		if tracingSource == tracingSpanEle.ValueSource {
-			switch tracingSource {
-			case "response_header":
-				if value, err := proxywasm.GetHttpResponseHeader(tracingSpanEle.Value); err == nil {
-					setTracingSpanValue(tracingSpanEle.Key, value, log)
+func setAttributeBySource(ctx wrapper.HttpContext, config AIStatisticsConfig, source string, body []byte, log wrapper.Log) {
+	attributes, ok := ctx.GetContext(CtxGeneralAtrribute).(map[string]string)
+	if !ok {
+		log.Error("failed to get attributes from http context")
+		return
+	}
+	for _, attribute := range config.attributes {
+		if source == attribute.ValueSource {
+			switch source {
+			case FixedValue:
+				log.Debugf("[attribute] source type: %s, key: %s, value: %s", source, attribute.Key, attribute.Value)
+				attributes[attribute.Key] = attribute.Value
+			case RequestHeader:
+				if value, err := proxywasm.GetHttpRequestHeader(attribute.Value); err == nil {
+					log.Debugf("[attribute] source type: %s, key: %s, value: %s", source, attribute.Key, value)
+					attributes[attribute.Key] = value
 				}
-			case "request_body":
-				bodyJson := gjson.ParseBytes(body)
-				value := trimQuote(bodyJson.Get(tracingSpanEle.Value).String())
-				setTracingSpanValue(tracingSpanEle.Key, value, log)
-			case "request_header":
-				if value, err := proxywasm.GetHttpRequestHeader(tracingSpanEle.Value); err == nil {
-					setTracingSpanValue(tracingSpanEle.Key, value, log)
+			case RequestBody:
+				raw := gjson.GetBytes(body, attribute.Value).Raw
+				var value string
+				if len(raw) > 2 && raw[0] == '"' && raw[len(raw)-1] == '"' {
+					value = raw[1 : len(raw)-1]
 				}
-			case "property":
-				if raw, err := proxywasm.GetProperty([]string{tracingSpanEle.Value}); err == nil {
-					setTracingSpanValue(tracingSpanEle.Key, string(raw), log)
+				log.Debugf("[attribute] source type: %s, key: %s, value: %s", source, attribute.Key, value)
+				attributes[attribute.Key] = value
+			case 
ResponseHeader: + if value, err := proxywasm.GetHttpResponseHeader(attribute.Value); err == nil { + log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, attribute.Key, value) + attributes[attribute.Key] = value } + case ResponseStreamingBody: + value := extractStreamingBodyByJsonPath(body, attribute.Value, attribute.Rule, log) + log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, attribute.Key, value) + attributes[attribute.Key] = value + case ResponseBody: + value := gjson.GetBytes(body, attribute.Value).Raw + if len(value) > 2 && value[0] == '"' && value[len(value)-1] == '"' { + value = value[1 : len(value)-1] + } + log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, attribute.Key, value) + attributes[attribute.Key] = value default: - } } + if attribute.ApplyToLog { + setLogAttribute(ctx, attribute.Key, attributes[attribute.Key], log) + } + if attribute.ApplyToSpan { + setSpanAttribute(attribute.Key, attributes[attribute.Key], log) + } } + ctx.SetContext(CtxGeneralAtrribute, attributes) } -// Set the tracing span with value. -func setTracingSpanValue(tracingKey, tracingValue string, log wrapper.Log) { - log.Debugf("try to set trace span [%s] with value [%s].", tracingKey, tracingValue) - - if tracingValue != "" { - traceSpanTag := "trace_span_tag." 
+ tracingKey
-
-		if raw, err := proxywasm.GetProperty([]string{traceSpanTag}); err == nil {
-			if raw != nil {
-				log.Warnf("trace span [%s] already exists, value will be overwrite, orign value: %s.", traceSpanTag, string(raw))
+func extractStreamingBodyByJsonPath(data []byte, jsonPath string, rule string, log wrapper.Log) string {
+	chunks := bytes.Split(bytes.TrimSpace(data), []byte("\n\n"))
+	var value string
+	if rule == RuleFirst {
+		for _, chunk := range chunks {
+			jsonObj := gjson.GetBytes(chunk, jsonPath)
+			if jsonObj.Exists() {
+				value = jsonObj.String()
+				break
+			}
+		}
+	} else if rule == RuleReplace {
+		for _, chunk := range chunks {
+			jsonObj := gjson.GetBytes(chunk, jsonPath)
+			if jsonObj.Exists() {
+				value = jsonObj.String()
+			}
+		}
+	} else if rule == RuleAppend {
+		// extract llm response
+		for _, chunk := range chunks {
+			raw := gjson.GetBytes(chunk, jsonPath).Raw
+			if len(raw) > 2 && raw[0] == '"' && raw[len(raw)-1] == '"' {
+				value += raw[1 : len(raw)-1]
 			}
 		}
+	} else {
+		log.Errorf("unsupported rule type: %s", rule)
+	}
+	return value
+}
+
+func setFilterState(key, value string, log wrapper.Log) {
+	if value != "" {
+		if e := proxywasm.SetProperty([]string{key}, []byte(fmt.Sprint(value))); e != nil {
+			log.Errorf("failed to set %s in filter state: %v", key, e)
+		}
+	} else {
+		log.Debugf("failed to write filter state [%s], because its value is empty", key)
+	}
+}
-
-		if e := proxywasm.SetProperty([]string{traceSpanTag}, []byte(tracingValue)); e != nil {
+
+// Set the tracing span with value. 
+func setSpanAttribute(key, value string, log wrapper.Log) {
+	if value != "" {
+		traceSpanTag := TracePrefix + key
+		if e := proxywasm.SetProperty([]string{traceSpanTag}, []byte(value)); e != nil {
 			log.Errorf("failed to set %s in filter state: %v", traceSpanTag, e)
 		}
-		log.Debugf("successed to set trace span [%s] with value [%s].", traceSpanTag, tracingValue)
+	} else {
+		log.Debugf("failed to write span attribute [%s], because its value is empty", key)
+	}
+}
+
+// setLogAttribute stores a key-value pair into the log attribute map kept in the HTTP context.
+func setLogAttribute(ctx wrapper.HttpContext, key string, value interface{}, log wrapper.Log) {
+	logAttributes, ok := ctx.GetContext(CtxLogAtrribute).(map[string]string)
+	if !ok {
+		log.Error("failed to get logAttributes from http context")
+		return
 	}
+	logAttributes[key] = fmt.Sprint(value)
+	ctx.SetContext(CtxLogAtrribute, logAttributes)
 }
 
-// trims the quote from the source string.
-func trimQuote(source string) string {
-	TempKey := strings.Trim(source, `"`)
-	Key, _ := zhToUnicode([]byte(TempKey))
-	return string(Key)
+func writeFilterStates(ctx wrapper.HttpContext, log wrapper.Log) {
+	attributes, _ := ctx.GetContext(CtxGeneralAtrribute).(map[string]string)
+	setFilterState(Model, attributes[Model], log)
+	setFilterState(InputToken, attributes[InputToken], log)
+	setFilterState(OutputToken, attributes[OutputToken], log)
+}
+
+func writeMetric(ctx wrapper.HttpContext, config AIStatisticsConfig, log wrapper.Log) {
+	attributes, _ := ctx.GetContext(CtxGeneralAtrribute).(map[string]string)
+	route, _ := getRouteName()
+	cluster, _ := getClusterName()
+	model, ok := attributes[Model]
+	if !ok {
+		log.Errorf("Get model failed")
+		return
+	}
+	if inputToken, ok := attributes[InputToken]; ok {
+		inputTokenUint64, err := strconv.ParseUint(inputToken, 10, 0)
+		if err != nil || inputTokenUint64 == 0 {
+			log.Errorf("inputToken convert failed, value is %d, err msg is [%v]", inputTokenUint64, err)
+			return
+		}
+		
config.incrementCounter(generateMetricName(route, cluster, model, InputToken), inputTokenUint64) + } + if outputToken, ok := attributes[OutputToken]; ok { + outputTokenUint64, err := strconv.ParseUint(outputToken, 10, 0) + if err != nil || outputTokenUint64 == 0 { + log.Errorf("outputToken convert failed, value is %d, err msg is [%v]", outputTokenUint64, err) + return + } + config.incrementCounter(generateMetricName(route, cluster, model, OutputToken), outputTokenUint64) + } + if llmFirstTokenDuration, ok := attributes[LLMFirstTokenDuration]; ok { + llmFirstTokenDurationUint64, err := strconv.ParseUint(llmFirstTokenDuration, 10, 0) + if err != nil || llmFirstTokenDurationUint64 == 0 { + log.Errorf("llmFirstTokenDuration convert failed, value is %d, err msg is [%v]", llmFirstTokenDurationUint64, err) + return + } + config.incrementCounter(generateMetricName(route, cluster, model, LLMFirstTokenDuration), llmFirstTokenDurationUint64) + } + if llmServiceDuration, ok := attributes[LLMServiceDuration]; ok { + llmServiceDurationUint64, err := strconv.ParseUint(llmServiceDuration, 10, 0) + if err != nil || llmServiceDurationUint64 == 0 { + log.Errorf("llmServiceDuration convert failed, value is %d, err msg is [%v]", llmServiceDurationUint64, err) + return + } + config.incrementCounter(generateMetricName(route, cluster, model, LLMServiceDuration), llmServiceDurationUint64) + } + config.incrementCounter(generateMetricName(route, cluster, model, LLMDurationCount), 1) } -// converts the zh string to Unicode. 
-func zhToUnicode(raw []byte) ([]byte, error) {
-	str, err := strconv.Unquote(strings.Replace(strconv.Quote(string(raw)), `\\u`, `\u`, -1))
-	if err != nil {
-		return nil, err
+func writeLog(ctx wrapper.HttpContext, log wrapper.Log) {
+	attributes, _ := ctx.GetContext(CtxGeneralAtrribute).(map[string]string)
+	logAttributes, _ := ctx.GetContext(CtxLogAtrribute).(map[string]string)
+	// Set inner log fields
+	if attributes[Model] != "" {
+		logAttributes[Model] = attributes[Model]
+	}
+	if attributes[InputToken] != "" {
+		logAttributes[InputToken] = attributes[InputToken]
+	}
+	if attributes[OutputToken] != "" {
+		logAttributes[OutputToken] = attributes[OutputToken]
+	}
+	if attributes[LLMFirstTokenDuration] != "" {
+		logAttributes[LLMFirstTokenDuration] = attributes[LLMFirstTokenDuration]
+	}
+	if attributes[LLMServiceDuration] != "" {
+		logAttributes[LLMServiceDuration] = attributes[LLMServiceDuration]
+	}
+	// Traverse log fields
+	items := []string{}
+	for k, v := range logAttributes {
+		items = append(items, fmt.Sprintf("%q:%q", k, v))
+	}
+	aiLogField := fmt.Sprintf(`{%s}`, strings.Join(items, ","))
+	// log.Infof("ai request json log: %s", aiLogField)
+	jsonMap := map[string]string{
+		"ai_log": aiLogField,
+	}
+	serialized, _ := json.Marshal(jsonMap)
+	jsonLogRaw := gjson.GetBytes(serialized, "ai_log").Raw
+	jsonLog := jsonLogRaw[1 : len(jsonLogRaw)-1]
+	if err := proxywasm.SetProperty([]string{"ai_log"}, []byte(jsonLog)); err != nil {
+		log.Errorf("failed to set ai_log in filter state: %v", err)
 	}
-	return []byte(str), nil
 }