Skip to content

Commit

Permalink
feat(go): Add support for SSE (#4772)
Browse files Browse the repository at this point in the history
  • Loading branch information
amckinney authored Sep 29, 2024
1 parent 3197402 commit d3885df
Show file tree
Hide file tree
Showing 47 changed files with 3,819 additions and 149 deletions.
3 changes: 3 additions & 0 deletions fern/pages/changelogs/go-sdk/2024-09-27.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## 0.27.0
**`(feat):`** Add support for SSE (Server-Sent Events) streaming responses. The user-facing interface for streaming responses remains the same between standard HTTP streaming and SSE.

9 changes: 9 additions & 0 deletions generators/go/internal/generator/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1547,3 +1547,12 @@ var pointerFunctionNames = map[string]struct{}{
"Uintptr": struct{}{},
"Time": struct{}{},
}

// valueOf dereferences the given value, or returns the zero value if nil.
func valueOf[T any](value *T) T {
var result T
if value == nil {
return result
}
return *value
}
91 changes: 73 additions & 18 deletions generators/go/internal/generator/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,9 @@ func (f *fileWriter) WriteClient(
}
}
}
if endpoint.Accept != "" {
f.P(fmt.Sprintf(`%s.Set("Accept", %q)`, headersParameter, endpoint.Accept))
}
if endpoint.ContentType != "" {
f.P(fmt.Sprintf(`%s.Set("Content-Type", %q)`, headersParameter, endpoint.ContentType))
}
Expand Down Expand Up @@ -1114,7 +1117,7 @@ func (f *fileWriter) WriteClient(
}

// Prepare a response variable.
if endpoint.ResponseType != "" && !endpoint.IsStreaming && endpoint.PaginationInfo == nil {
if endpoint.ResponseType != "" && endpoint.StreamingInfo == nil && endpoint.PaginationInfo == nil {
f.P(fmt.Sprintf(endpoint.ResponseInitializerFormat, endpoint.ResponseType))
}

Expand Down Expand Up @@ -1208,13 +1211,23 @@ func (f *fileWriter) WriteClient(
}

// Issue the request.
if endpoint.IsStreaming {
if endpoint.StreamingInfo != nil {
streamingInfo := endpoint.StreamingInfo
f.P("streamer := core.NewStreamer[", endpoint.ResponseType, "](", receiver, ".caller)")
f.P("return streamer.Stream(")
f.P("ctx,")
f.P("&core.StreamParams{")
f.P("URL: endpointURL, ")
f.P("Method:", endpoint.Method, ",")
if streamingInfo.Delimiter != "" {
f.P("Delimiter: ", streamingInfo.Delimiter, ",")
}
if streamingInfo.Prefix != "" {
f.P("Prefix:", streamingInfo.Prefix, ",")
}
if streamingInfo.Terminator != "" {
f.P("Terminator:", streamingInfo.Terminator, ",")
}
f.P("MaxAttempts: options.MaxAttempts,")
f.P("BodyProperties: options.BodyProperties,")
f.P("QueryParameters: options.QueryParameters,")
Expand All @@ -1226,9 +1239,6 @@ func (f *fileWriter) WriteClient(
if endpoint.ErrorDecoderParameterName != "" {
f.P("ErrorDecoder:", endpoint.ErrorDecoderParameterName, ",")
}
if endpoint.StreamDelimiter != "" {
f.P("Delimiter: ", endpoint.StreamDelimiter, ",")
}
f.P("},")
f.P(")")
f.P("}")
Expand Down Expand Up @@ -1390,6 +1400,48 @@ func (f *fileWriter) WriteClient(
)
}

type streamingInfo struct {
Delimiter string
Prefix string
Terminator string
AcceptHeader string
}

func getStreamingInfo(
irEndpoint *ir.HttpEndpoint,
) (*streamingInfo, error) {
if irEndpoint == nil || irEndpoint.Response == nil || irEndpoint.Response.Streaming == nil {
return nil, nil
}
streamingResponse := irEndpoint.Response.Streaming
switch streamingResponse.Type {
case "text":
return &streamingInfo{}, nil
case "json":
var terminator string
if value := valueOf(streamingResponse.Json.Terminator); value != "" {
terminator = fmt.Sprintf("%q", value)
}
return &streamingInfo{
Terminator: terminator,
}, nil
case "sse":
terminator := valueOf(streamingResponse.Sse.Terminator)
if terminator != "" {
terminator = fmt.Sprintf("%q", terminator)
} else {
terminator = "core.DefaultSSETerminator"
}
return &streamingInfo{
Prefix: "core.DefaultSSEDataPrefix",
Terminator: terminator,
AcceptHeader: "text/event-stream",
}, nil
default:
return nil, fmt.Errorf("stream response type %q is not supported", streamingResponse.Type)
}
}

type paginationInfo struct {
Type string
Page *ir.QueryParameter
Expand Down Expand Up @@ -1996,10 +2048,9 @@ type endpoint struct {
OptionConstructor string
PathSuffix string
Method string
IsStreaming bool
StreamDelimiter string
ErrorDecoderParameterName string
Idempotent bool
Accept string
ContentType string
Errors ir.ResponseErrors
QueryParameters []*ir.QueryParameter
Expand All @@ -2008,6 +2059,7 @@ type endpoint struct {
FilePropertyInfo *filePropertyInfo
FileProperties []*ir.FileProperty
FileBodyProperties []*ir.InlinedRequestBodyProperty
StreamingInfo *streamingInfo
PaginationInfo *paginationInfo
}

Expand Down Expand Up @@ -2172,6 +2224,11 @@ func (f *fileWriter) endpointFromIR(
},
)

streamingInfo, err := getStreamingInfo(irEndpoint)
if err != nil {
return nil, err
}

paginationInfo, err := f.getPaginationInfo(irEndpoint, scope, requestParameterName)
if err != nil {
return nil, err
Expand All @@ -2185,8 +2242,6 @@ func (f *fileWriter) endpointFromIR(
signatureReturnValues string
successfulReturnValues string
errorReturnValues string
streamDelimiter string
isStreaming bool
)
var responseIsOptionalParameter bool
if irEndpoint.Response != nil {
Expand Down Expand Up @@ -2234,12 +2289,6 @@ func (f *fileWriter) endpointFromIR(
successfulReturnValues = "response.String(), nil"
errorReturnValues = `"", err`
case "streaming":
if irEndpoint.Response.Streaming.Json == nil && irEndpoint.Response.Streaming.Text == nil {
return nil, fmt.Errorf("unsupported streaming response type: %s", irEndpoint.Response.Streaming.Type)
}
if irEndpoint.Response.Streaming.Json != nil && irEndpoint.Response.Streaming.Json.Terminator != nil {
streamDelimiter = *irEndpoint.Response.Streaming.Json.Terminator
}
typeReference, err := typeReferenceFromStreamingResponse(irEndpoint.Response.Streaming)
if err != nil {
return nil, err
Expand All @@ -2248,7 +2297,6 @@ func (f *fileWriter) endpointFromIR(
responseParameterName = "response"
signatureReturnValues = fmt.Sprintf("(*core.Stream[%s], error)", responseType)
errorReturnValues = "nil, err"
isStreaming = true
default:
return nil, fmt.Errorf("%s requests are not supported yet", irEndpoint.Response.Type)
}
Expand Down Expand Up @@ -2308,6 +2356,11 @@ func (f *fileWriter) endpointFromIR(
optionConstructor = "core.NewIdempotentRequestOptions(opts...)"
}

var accept string
if streamingInfo != nil && streamingInfo.AcceptHeader != "" {
accept = streamingInfo.AcceptHeader
}

contentTypeOverride := contentTypeFromRequestBody(irEndpoint.RequestBody)
if contentTypeOverride != "" {
contentType = contentTypeOverride
Expand Down Expand Up @@ -2336,9 +2389,8 @@ func (f *fileWriter) endpointFromIR(
BaseURL: baseURL,
PathSuffix: pathSuffix,
Method: irMethodToMethodEnum(irEndpoint.Method),
IsStreaming: isStreaming,
StreamDelimiter: streamDelimiter,
ErrorDecoderParameterName: errorDecoderParameterName,
Accept: accept,
ContentType: contentType,
Idempotent: irEndpoint.Idempotent,
Errors: irEndpoint.Errors,
Expand All @@ -2348,6 +2400,7 @@ func (f *fileWriter) endpointFromIR(
FilePropertyInfo: filePropertyInfo,
FileProperties: fileProperties,
FileBodyProperties: fileBodyProperties,
StreamingInfo: streamingInfo,
PaginationInfo: paginationInfo,
}, nil
}
Expand Down Expand Up @@ -3109,6 +3162,8 @@ func typeReferenceFromStreamingResponse(
switch streamingResponse.Type {
case "json":
return streamingResponse.Json.Payload, nil
case "sse":
return streamingResponse.Sse.Payload, nil
case "text":
return ir.NewTypeReferenceFromPrimitive(ir.PrimitiveTypeString), nil
}
Expand Down
Loading

0 comments on commit d3885df

Please sign in to comment.