diff --git a/plugins/wasm-go/extensions/ai-cache/main.go b/plugins/wasm-go/extensions/ai-cache/main.go index 62edb80dcb..650d3805ac 100644 --- a/plugins/wasm-go/extensions/ai-cache/main.go +++ b/plugins/wasm-go/extensions/ai-cache/main.go @@ -164,22 +164,26 @@ func onHttpResponseBody(ctx wrapper.HttpContext, c config.PluginConfig, chunk [] return chunk } + stream := ctx.GetContext(STREAM_CONTEXT_KEY) + var err error if !isLastChunk { - if err := handleNonLastChunk(ctx, c, chunk, log); err != nil { + if stream == nil { + err = handleNonStreamChunk(ctx, c, chunk, log) + } else { + err = handleStreamChunk(ctx, c, unifySSEChunk(chunk), log) + } + if err != nil { log.Errorf("[onHttpResponseBody] handle non last chunk failed, error: %v", err) // Set an empty struct in the context to indicate an error in processing the partial message ctx.SetContext(ERROR_PARTIAL_MESSAGE_KEY, struct{}{}) } return chunk } - - stream := ctx.GetContext(STREAM_CONTEXT_KEY) var value string - var err error if stream == nil { value, err = processNonStreamLastChunk(ctx, c, chunk, log) } else { - value, err = processStreamLastChunk(ctx, c, chunk, log) + value, err = processStreamLastChunk(ctx, c, unifySSEChunk(chunk), log) } if err != nil { diff --git a/plugins/wasm-go/extensions/ai-cache/util.go b/plugins/wasm-go/extensions/ai-cache/util.go index 7fbd4954e2..50a39c88c3 100644 --- a/plugins/wasm-go/extensions/ai-cache/util.go +++ b/plugins/wasm-go/extensions/ai-cache/util.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "fmt" "strings" @@ -9,17 +10,6 @@ import ( "github.com/tidwall/gjson" ) -func handleNonLastChunk(ctx wrapper.HttpContext, c config.PluginConfig, chunk []byte, log wrapper.Log) error { - stream := ctx.GetContext(STREAM_CONTEXT_KEY) - err := error(nil) - if stream == nil { - err = handleNonStreamChunk(ctx, c, chunk, log) - } else { - err = handleStreamChunk(ctx, c, chunk, log) - } - return err -} - func handleNonStreamChunk(ctx wrapper.HttpContext, c config.PluginConfig, chunk []byte, log wrapper.Log) error { tempContentI := ctx.GetContext(CACHE_CONTENT_CONTEXT_KEY) if tempContentI == nil { @@ -32,6 +22,12 @@ func handleNonStreamChunk(ctx wrapper.HttpContext, c config.PluginConfig, chunk return nil } +func unifySSEChunk(data []byte) []byte { + data = bytes.ReplaceAll(data, []byte("\r\n"), []byte("\n")) + data = bytes.ReplaceAll(data, []byte("\r"), []byte("\n")) + return data +} + func handleStreamChunk(ctx wrapper.HttpContext, c config.PluginConfig, chunk []byte, log wrapper.Log) error { var partialMessage []byte partialMessageI := ctx.GetContext(PARTIAL_MESSAGE_CONTEXT_KEY) @@ -103,7 +99,7 @@ func processStreamLastChunk(ctx wrapper.HttpContext, c config.PluginConfig, chun func processSSEMessage(ctx wrapper.HttpContext, c config.PluginConfig, sseMessage string, log wrapper.Log) (string, error) { content := "" for _, chunk := range strings.Split(sseMessage, "\n\n") { - log.Infof("chunk _ : %s", chunk) + log.Debugf("single sse message: %s", chunk) subMessages := strings.Split(chunk, "\n") var message string for _, msg := range subMessages { @@ -140,19 +136,15 @@ func processSSEMessage(ctx wrapper.HttpContext, c config.PluginConfig, sseMessag } return content, fmt.Errorf("[processSSEMessage] unable to extract content from message; cache content is nil: %s", message) } else { - tempContentI := ctx.GetContext(CACHE_CONTENT_CONTEXT_KEY) - - // If there is no content in the cache, initialize and set the content - if tempContentI == nil { - content = responseBody.String() - ctx.SetContext(CACHE_CONTENT_CONTEXT_KEY, content) - } else { - // Update the content in the cache - appendMsg := responseBody.String() - content = tempContentI.(string) + appendMsg - ctx.SetContext(CACHE_CONTENT_CONTEXT_KEY, content) - } + content += responseBody.String() } } + tempContentI := ctx.GetContext(CACHE_CONTENT_CONTEXT_KEY) + // If there is no content in the cache, initialize and set the content + if tempContentI == nil { + ctx.SetContext(CACHE_CONTENT_CONTEXT_KEY, content) + } else { + ctx.SetContext(CACHE_CONTENT_CONTEXT_KEY, tempContentI.(string)+content) + } return content, nil }