Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
CH3CHO authored Dec 27, 2024
2 parents 298585f + 6dc4d43 commit 70ea2c3
Show file tree
Hide file tree
Showing 22 changed files with 343 additions and 237 deletions.
4 changes: 2 additions & 2 deletions Makefile.core.mk
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ kube-load-image: $(tools/kind) ## Install the Higress image to a kind cluster us
tools/hack/docker-pull-image.sh higress-registry.cn-hangzhou.cr.aliyuncs.com/higress/echo-server 1.3.0
tools/hack/docker-pull-image.sh higress-registry.cn-hangzhou.cr.aliyuncs.com/higress/echo-server v1.0
tools/hack/docker-pull-image.sh higress-registry.cn-hangzhou.cr.aliyuncs.com/higress/echo-body 1.0.0
tools/hack/docker-pull-image.sh openpolicyagent/opa latest
tools/hack/docker-pull-image.sh openpolicyagent/opa 0.61.0
tools/hack/docker-pull-image.sh curlimages/curl latest
tools/hack/docker-pull-image.sh registry.cn-hangzhou.aliyuncs.com/2456868764/httpbin 1.0.2
tools/hack/docker-pull-image.sh registry.cn-hangzhou.aliyuncs.com/hinsteny/nacos-standlone-rc3 1.0.0-RC3
Expand All @@ -312,7 +312,7 @@ kube-load-image: $(tools/kind) ## Install the Higress image to a kind cluster us
tools/hack/kind-load-image.sh higress-registry.cn-hangzhou.cr.aliyuncs.com/higress/echo-server 1.3.0
tools/hack/kind-load-image.sh higress-registry.cn-hangzhou.cr.aliyuncs.com/higress/echo-server v1.0
tools/hack/kind-load-image.sh higress-registry.cn-hangzhou.cr.aliyuncs.com/higress/echo-body 1.0.0
tools/hack/kind-load-image.sh openpolicyagent/opa latest
tools/hack/kind-load-image.sh openpolicyagent/opa 0.61.0
tools/hack/kind-load-image.sh curlimages/curl latest
tools/hack/kind-load-image.sh registry.cn-hangzhou.aliyuncs.com/2456868764/httpbin 1.0.2
tools/hack/kind-load-image.sh registry.cn-hangzhou.aliyuncs.com/hinsteny/nacos-standlone-rc3 1.0.0-RC3
Expand Down
14 changes: 9 additions & 5 deletions plugins/wasm-go/extensions/ai-cache/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,22 +164,26 @@ func onHttpResponseBody(ctx wrapper.HttpContext, c config.PluginConfig, chunk []
return chunk
}

stream := ctx.GetContext(STREAM_CONTEXT_KEY)
var err error
if !isLastChunk {
if err := handleNonLastChunk(ctx, c, chunk, log); err != nil {
if stream == nil {
err = handleNonStreamChunk(ctx, c, chunk, log)
} else {
err = handleStreamChunk(ctx, c, unifySSEChunk(chunk), log)
}
if err != nil {
log.Errorf("[onHttpResponseBody] handle non last chunk failed, error: %v", err)
// Set an empty struct in the context to indicate an error in processing the partial message
ctx.SetContext(ERROR_PARTIAL_MESSAGE_KEY, struct{}{})
}
return chunk
}

stream := ctx.GetContext(STREAM_CONTEXT_KEY)
var value string
var err error
if stream == nil {
value, err = processNonStreamLastChunk(ctx, c, chunk, log)
} else {
value, err = processStreamLastChunk(ctx, c, chunk, log)
value, err = processStreamLastChunk(ctx, c, unifySSEChunk(chunk), log)
}

if err != nil {
Expand Down
40 changes: 16 additions & 24 deletions plugins/wasm-go/extensions/ai-cache/util.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"bytes"
"fmt"
"strings"

Expand All @@ -9,17 +10,6 @@ import (
"github.com/tidwall/gjson"
)

func handleNonLastChunk(ctx wrapper.HttpContext, c config.PluginConfig, chunk []byte, log wrapper.Log) error {
stream := ctx.GetContext(STREAM_CONTEXT_KEY)
err := error(nil)
if stream == nil {
err = handleNonStreamChunk(ctx, c, chunk, log)
} else {
err = handleStreamChunk(ctx, c, chunk, log)
}
return err
}

func handleNonStreamChunk(ctx wrapper.HttpContext, c config.PluginConfig, chunk []byte, log wrapper.Log) error {
tempContentI := ctx.GetContext(CACHE_CONTENT_CONTEXT_KEY)
if tempContentI == nil {
Expand All @@ -32,6 +22,12 @@ func handleNonStreamChunk(ctx wrapper.HttpContext, c config.PluginConfig, chunk
return nil
}

func unifySSEChunk(data []byte) []byte {
data = bytes.ReplaceAll(data, []byte("\r\n"), []byte("\n"))
data = bytes.ReplaceAll(data, []byte("\r"), []byte("\n"))
return data
}

func handleStreamChunk(ctx wrapper.HttpContext, c config.PluginConfig, chunk []byte, log wrapper.Log) error {
var partialMessage []byte
partialMessageI := ctx.GetContext(PARTIAL_MESSAGE_CONTEXT_KEY)
Expand Down Expand Up @@ -103,7 +99,7 @@ func processStreamLastChunk(ctx wrapper.HttpContext, c config.PluginConfig, chun
func processSSEMessage(ctx wrapper.HttpContext, c config.PluginConfig, sseMessage string, log wrapper.Log) (string, error) {
content := ""
for _, chunk := range strings.Split(sseMessage, "\n\n") {
log.Infof("chunk _ : %s", chunk)
log.Debugf("single sse message: %s", chunk)
subMessages := strings.Split(chunk, "\n")
var message string
for _, msg := range subMessages {
Expand Down Expand Up @@ -140,19 +136,15 @@ func processSSEMessage(ctx wrapper.HttpContext, c config.PluginConfig, sseMessag
}
return content, fmt.Errorf("[processSSEMessage] unable to extract content from message; cache content is nil: %s", message)
} else {
tempContentI := ctx.GetContext(CACHE_CONTENT_CONTEXT_KEY)

// If there is no content in the cache, initialize and set the content
if tempContentI == nil {
content = responseBody.String()
ctx.SetContext(CACHE_CONTENT_CONTEXT_KEY, content)
} else {
// Update the content in the cache
appendMsg := responseBody.String()
content = tempContentI.(string) + appendMsg
ctx.SetContext(CACHE_CONTENT_CONTEXT_KEY, content)
}
content += responseBody.String()
}
}
tempContentI := ctx.GetContext(CACHE_CONTENT_CONTEXT_KEY)
// If there is no content in the cache, initialize and set the content
if tempContentI == nil {
ctx.SetContext(CACHE_CONTENT_CONTEXT_KEY, content)
} else {
ctx.SetContext(CACHE_CONTENT_CONTEXT_KEY, tempContentI.(string)+content)
}
return content, nil
}
25 changes: 17 additions & 8 deletions plugins/wasm-go/extensions/ai-proxy/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ description: AI 代理插件配置参考
| `context` | object | 非必填 | - | 配置 AI 对话上下文信息 |
| `customSettings` | array of customSetting | 非必填 | - | 为AI请求指定覆盖或者填充参数 |
| `failover` | object | 非必填 | - | 配置 apiToken 的 failover 策略,当 apiToken 不可用时,将其移出 apiToken 列表,待健康检测通过后重新添加回 apiToken 列表 |
| `retryOnFailure` | object | 非必填 | - | 当请求失败时立即进行重试 |

`context`的配置字段说明如下:

Expand Down Expand Up @@ -78,14 +79,22 @@ custom-setting会遵循如下表格,根据`name`和协议来替换对应的字

`failover` 的配置字段说明如下:

| 名称 | 数据类型 | 填写要求 | 默认值 | 描述 |
|------------------|--------|------|-------|-----------------------------|
| enabled | bool | 非必填 | false | 是否启用 apiToken 的 failover 机制 |
| failureThreshold | int | 非必填 | 3 | 触发 failover 连续请求失败的阈值(次数) |
| successThreshold | int | 非必填 | 1 | 健康检测的成功阈值(次数) |
| healthCheckInterval | int | 非必填 | 5000 | 健康检测的间隔时间,单位毫秒 |
| healthCheckTimeout | int | 非必填 | 5000 | 健康检测的超时时间,单位毫秒 |
| healthCheckModel | string | 必填 | | 健康检测使用的模型 |
| 名称 | 数据类型 | 填写要求 | 默认值 | 描述 |
|------------------|--------|-----------------|-------|-----------------------------|
| enabled | bool | 非必填 | false | 是否启用 apiToken 的 failover 机制 |
| failureThreshold | int | 非必填 | 3 | 触发 failover 连续请求失败的阈值(次数) |
| successThreshold | int | 非必填 | 1 | 健康检测的成功阈值(次数) |
| healthCheckInterval | int | 非必填 | 5000 | 健康检测的间隔时间,单位毫秒 |
| healthCheckTimeout | int | 非必填 | 5000 | 健康检测的超时时间,单位毫秒 |
| healthCheckModel | string | 启用 failover 时必填 | | 健康检测使用的模型 |

`retryOnFailure` 的配置字段说明如下:

| 名称 | 数据类型 | 填写要求 | 默认值 | 描述 |
|------------------|--------|-----------------|-------|-------------|
| enabled | bool | 非必填 | false | 是否启用失败请求重试 |
| maxRetries | int | 非必填 | 1 | 最大重试次数 |
| retryTimeout | int | 非必填 | 5000 | 重试超时时间,单位毫秒 |

### 提供商特有配置

Expand Down
55 changes: 23 additions & 32 deletions plugins/wasm-go/extensions/ai-proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
const (
pluginName = "ai-proxy"

ctxKeyApiName = "apiName"

defaultMaxBodyBytes uint32 = 10 * 1024 * 1024
)

Expand Down Expand Up @@ -92,14 +90,13 @@ func onHttpRequestHeader(ctx wrapper.HttpContext, pluginConfig config.PluginConf
log.Warnf("[onHttpRequestHeader] unsupported path: %s", path.Path)
return types.ActionContinue
}

ctx.SetContext(provider.CtxKeyApiName, apiName)
// Disable the route re-calculation since the plugin may modify some headers related to the chosen route.
ctx.DisableReroute()

ctx.SetContext(ctxKeyApiName, apiName)

_, needHandleBody := activeProvider.(provider.ResponseBodyHandler)
_, needHandleStreamingBody := activeProvider.(provider.StreamingResponseBodyHandler)
if needHandleBody || needHandleStreamingBody {
if needHandleStreamingBody {
proxywasm.RemoveHttpRequestHeader("Accept-Encoding")
}

Expand Down Expand Up @@ -138,7 +135,7 @@ func onHttpRequestBody(ctx wrapper.HttpContext, pluginConfig config.PluginConfig
log.Debugf("[onHttpRequestBody] provider=%s", activeProvider.GetProviderType())

if handler, ok := activeProvider.(provider.RequestBodyHandler); ok {
apiName, _ := ctx.GetContext(ctxKeyApiName).(provider.ApiName)
apiName, _ := ctx.GetContext(provider.CtxKeyApiName).(provider.ApiName)

newBody, settingErr := pluginConfig.GetProviderConfig().ReplaceByCustomSettings(body)
if settingErr != nil {
Expand Down Expand Up @@ -186,32 +183,25 @@ func onHttpResponseHeaders(ctx wrapper.HttpContext, pluginConfig config.PluginCo
log.Errorf("unable to load :status header from response: %v", err)
}
ctx.DontReadResponseBody()
providerConfig.OnRequestFailed(ctx, apiTokenInUse, log)

return types.ActionContinue
return providerConfig.OnRequestFailed(activeProvider, ctx, apiTokenInUse, log)
}

// Reset ctxApiTokenRequestFailureCount if the request is successful,
// the apiToken is removed only when the number of consecutive request failures exceeds the threshold.
providerConfig.ResetApiTokenRequestFailureCount(apiTokenInUse, log)

if handler, ok := activeProvider.(provider.ResponseHeadersHandler); ok {
apiName, _ := ctx.GetContext(ctxKeyApiName).(provider.ApiName)
action, err := handler.OnResponseHeaders(ctx, apiName, log)
if err == nil {
checkStream(&ctx, log)
return action
}
util.ErrorHandler("ai-proxy.proc_resp_headers_failed", fmt.Errorf("failed to process response headers: %v", err))
return types.ActionContinue
headers := util.GetOriginalResponseHeaders()
if handler, ok := activeProvider.(provider.TransformResponseHeadersHandler); ok {
apiName, _ := ctx.GetContext(provider.CtxKeyApiName).(provider.ApiName)
handler.TransformResponseHeaders(ctx, apiName, headers, log)
} else {
providerConfig.DefaultTransformResponseHeaders(ctx, headers)
}
util.ReplaceResponseHeaders(headers)

checkStream(&ctx, log)
_, needHandleBody := activeProvider.(provider.ResponseBodyHandler)
_, needHandleStreamingBody := activeProvider.(provider.StreamingResponseBodyHandler)
if !needHandleBody && !needHandleStreamingBody {
ctx.DontReadResponseBody()
} else if !needHandleStreamingBody {
if !needHandleStreamingBody {
ctx.BufferResponseBody()
}

Expand All @@ -230,7 +220,7 @@ func onStreamingResponseBody(ctx wrapper.HttpContext, pluginConfig config.Plugin
log.Debugf("isLastChunk=%v chunk: %s", isLastChunk, string(chunk))

if handler, ok := activeProvider.(provider.StreamingResponseBodyHandler); ok {
apiName, _ := ctx.GetContext(ctxKeyApiName).(provider.ApiName)
apiName, _ := ctx.GetContext(provider.CtxKeyApiName).(provider.ApiName)
modifiedChunk, err := handler.OnStreamingResponseBody(ctx, apiName, chunk, isLastChunk, log)
if err == nil && modifiedChunk != nil {
return modifiedChunk
Expand All @@ -249,16 +239,17 @@ func onHttpResponseBody(ctx wrapper.HttpContext, pluginConfig config.PluginConfi
}

log.Debugf("[onHttpResponseBody] provider=%s", activeProvider.GetProviderType())
//log.Debugf("response body: %s", string(body))

if handler, ok := activeProvider.(provider.ResponseBodyHandler); ok {
apiName, _ := ctx.GetContext(ctxKeyApiName).(provider.ApiName)
action, err := handler.OnResponseBody(ctx, apiName, body, log)
if err == nil {
return action
if handler, ok := activeProvider.(provider.TransformResponseBodyHandler); ok {
apiName, _ := ctx.GetContext(provider.CtxKeyApiName).(provider.ApiName)
body, err := handler.TransformResponseBody(ctx, apiName, body, log)
if err != nil {
util.ErrorHandler("ai-proxy.proc_resp_body_failed", fmt.Errorf("failed to process response body: %v", err))
return types.ActionContinue
}
if err = provider.ReplaceResponseBody(body, log); err != nil {
util.ErrorHandler("ai-proxy.replace_resp_body_failed", fmt.Errorf("failed to replace response body: %v", err))
}
util.ErrorHandler("ai-proxy.proc_resp_body_failed", fmt.Errorf("failed to process response body: %v", err))
return types.ActionContinue
}
return types.ActionContinue
}
Expand Down
20 changes: 4 additions & 16 deletions plugins/wasm-go/extensions/ai-proxy/provider/claude.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/alibaba/higress/plugins/wasm-go/extensions/ai-proxy/util"
"github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper"
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm"
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types"
)

Expand Down Expand Up @@ -139,27 +138,16 @@ func (c *claudeProvider) TransformRequestBody(ctx wrapper.HttpContext, apiName A
return json.Marshal(claudeRequest)
}

func (c *claudeProvider) OnResponseBody(ctx wrapper.HttpContext, apiName ApiName, body []byte, log wrapper.Log) (types.Action, error) {
func (c *claudeProvider) TransformResponseBody(ctx wrapper.HttpContext, apiName ApiName, body []byte, log wrapper.Log) ([]byte, error) {
claudeResponse := &claudeTextGenResponse{}
if err := json.Unmarshal(body, claudeResponse); err != nil {
return types.ActionContinue, fmt.Errorf("unable to unmarshal claude response: %v", err)
return nil, fmt.Errorf("unable to unmarshal claude response: %v", err)
}
if claudeResponse.Error != nil {
return types.ActionContinue, fmt.Errorf("claude response error, error_type: %s, error_message: %s", claudeResponse.Error.Type, claudeResponse.Error.Message)
return nil, fmt.Errorf("claude response error, error_type: %s, error_message: %s", claudeResponse.Error.Type, claudeResponse.Error.Message)
}
response := c.responseClaude2OpenAI(ctx, claudeResponse)
return types.ActionContinue, replaceJsonResponseBody(response, log)
}

func (c *claudeProvider) OnResponseHeaders(ctx wrapper.HttpContext, apiName ApiName, log wrapper.Log) (types.Action, error) {
// use original protocol, skip OnStreamingResponseBody() and OnResponseBody()
if c.config.protocol == protocolOriginal {
ctx.DontReadResponseBody()
return types.ActionContinue, nil
}

_ = proxywasm.RemoveHttpResponseHeader("Content-Length")
return types.ActionContinue, nil
return json.Marshal(response)
}

func (c *claudeProvider) OnStreamingResponseBody(ctx wrapper.HttpContext, name ApiName, chunk []byte, isLastChunk bool, log wrapper.Log) ([]byte, error) {
Expand Down
2 changes: 1 addition & 1 deletion plugins/wasm-go/extensions/ai-proxy/provider/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func insertContext(provider Provider, content string, err error, body []byte, lo
if err != nil {
util.ErrorHandler(fmt.Sprintf("ai-proxy.%s.insert_ctx_failed", typ), fmt.Errorf("failed to insert context message: %v", err))
}
if err := replaceHttpJsonRequestBody(body, log); err != nil {
if err := replaceRequestBody(body, log); err != nil {
util.ErrorHandler(fmt.Sprintf("ai-proxy.%s.replace_request_body_failed", typ), fmt.Errorf("failed to replace request body: %v", err))
}
}
Expand Down
12 changes: 3 additions & 9 deletions plugins/wasm-go/extensions/ai-proxy/provider/deepl.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/alibaba/higress/plugins/wasm-go/extensions/ai-proxy/util"
"github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper"
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm"
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types"
)

Expand Down Expand Up @@ -112,18 +111,13 @@ func (d *deeplProvider) TransformRequestBodyHeaders(ctx wrapper.HttpContext, api
return json.Marshal(baiduRequest)
}

func (d *deeplProvider) OnResponseHeaders(ctx wrapper.HttpContext, apiName ApiName, log wrapper.Log) (types.Action, error) {
_ = proxywasm.RemoveHttpResponseHeader("Content-Length")
return types.ActionContinue, nil
}

func (d *deeplProvider) OnResponseBody(ctx wrapper.HttpContext, apiName ApiName, body []byte, log wrapper.Log) (types.Action, error) {
func (d *deeplProvider) TransformResponseBody(ctx wrapper.HttpContext, apiName ApiName, body []byte, log wrapper.Log) ([]byte, error) {
deeplResponse := &deeplResponse{}
if err := json.Unmarshal(body, deeplResponse); err != nil {
return types.ActionContinue, fmt.Errorf("unable to unmarshal deepl response: %v", err)
return nil, fmt.Errorf("unable to unmarshal deepl response: %v", err)
}
response := d.responseDeepl2OpenAI(ctx, deeplResponse)
return types.ActionContinue, replaceJsonResponseBody(response, log)
return json.Marshal(response)
}

func (d *deeplProvider) responseDeepl2OpenAI(ctx wrapper.HttpContext, deeplResponse *deeplResponse) *chatCompletionResponse {
Expand Down
Loading

0 comments on commit 70ea2c3

Please sign in to comment.