-
Notifications
You must be signed in to change notification settings - Fork 983
fix(ingress): support WebSocket over HTTP/2 (replace gorilla/websocket) #1117
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
4c45038
3706dd1
89857ac
7f9996f
713f21d
45f13d6
6b1fb2c
6bb5df3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,31 +15,16 @@ | |
| package proxy | ||
|
|
||
| import ( | ||
| "fmt" | ||
| "context" | ||
| "errors" | ||
| "io" | ||
| "net" | ||
| "net/http" | ||
| "net/url" | ||
| "strings" | ||
|
|
||
| slogger "github.com/alibaba/opensandbox/internal/logger" | ||
| "github.com/gorilla/websocket" | ||
| ) | ||
|
|
||
| var ( | ||
| // defaultWebSocketDialer is a dialer with all fields set to the default zero values. | ||
| defaultWebSocketDialer = websocket.DefaultDialer | ||
|
|
||
| // defaultUpgrader specifies the parameters for upgrading an HTTP | ||
| // connection to a WebSocket connection. | ||
| defaultUpgrader = &websocket.Upgrader{ | ||
| ReadBufferSize: 1024, | ||
| WriteBufferSize: 1024, | ||
| // Allow any Origin: ingress sits behind trusted gateways where Host/Origin | ||
| // often diverge (e.g. browser UI vs internal target). gorilla's default | ||
| // same-origin check rejects those upgrades. | ||
| CheckOrigin: func(_ *http.Request) bool { return true }, | ||
| } | ||
| "github.com/coder/websocket" | ||
| ) | ||
|
|
||
| // WebSocketProxy is an HTTP Handler that takes an incoming WebSocket | ||
|
|
@@ -54,14 +39,6 @@ type WebSocketProxy struct { | |
| // the incoming WebSocket connection. Request is the initial incoming and | ||
| // unmodified request. | ||
| backend func(*http.Request) *url.URL | ||
|
|
||
| // dialer contains options for connecting to the backend WebSocket server. | ||
| // If nil, DefaultDialer is used. | ||
| dialer *websocket.Dialer | ||
|
|
||
| // upgrader specifies the parameters for upgrading a incoming HTTP | ||
| // connection to a WebSocket connection. If nil, DefaultUpgrader is used. | ||
| upgrader *websocket.Upgrader | ||
| } | ||
|
|
||
| // ProxyHandler returns a new http.Handler interface that reverse proxies the | ||
|
|
@@ -95,11 +72,6 @@ func (w *WebSocketProxy) ServeHTTP(rw http.ResponseWriter, r *http.Request) { | |
| return | ||
| } | ||
|
|
||
| dialer := w.dialer | ||
| if w.dialer == nil { | ||
| dialer = defaultWebSocketDialer | ||
| } | ||
|
|
||
| // Forward all incoming headers to the backend except hop-by-hop headers | ||
| // (RFC 7230 §6.1) and WebSocket handshake headers managed by the dialer. | ||
| // Per RFC 7230, also strip any header named by Connection tokens. | ||
|
|
@@ -157,88 +129,131 @@ func (w *WebSocketProxy) ServeHTTP(rw http.ResponseWriter, r *http.Request) { | |
| w.director(r, requestHeader) | ||
| } | ||
|
|
||
| // Connect to the backend URL, also pass the headers we get from the requst | ||
| // together with the Forwarded headers we prepared above. | ||
| connBackend, resp, err := dialer.Dial(backendURL.String(), requestHeader) | ||
| // HTTP/2 Extended CONNECT (RFC 8441) — raw bidirectional tunnel. | ||
| if r.ProtoMajor >= 2 && r.Method == http.MethodConnect { | ||
| w.serveH2Tunnel(rw, r, backendURL, requestHeader) | ||
| return | ||
| } | ||
|
|
||
| w.serveH1(rw, r, backendURL, requestHeader) | ||
| } | ||
|
|
||
| // serveH1 handles the traditional HTTP/1.1 WebSocket upgrade path. | ||
| func (w *WebSocketProxy) serveH1(rw http.ResponseWriter, r *http.Request, backendURL *url.URL, requestHeader http.Header) { | ||
| ctx := r.Context() | ||
|
|
||
| // Dial the backend first so we can relay errors before upgrading the client. | ||
| connBackend, resp, err := websocket.Dial(ctx, backendURL.String(), &websocket.DialOptions{ | ||
| HTTPHeader: requestHeader, | ||
|
Pangjiping marked this conversation as resolved.
Outdated
|
||
| }) | ||
|
Pangjiping marked this conversation as resolved.
Outdated
Comment on lines
+179
to
+183
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
In the HTTP/1 path, this Useful? React with 👍 / 👎. |
||
| if err != nil { | ||
| Logger.With(slogger.Field{Key: "error", Value: err}).Errorf("WebSocketProxy: couldn't dial to remote backend") | ||
| if resp != nil { | ||
| // If the WebSocket handshake fails, ErrBadHandshake is returned | ||
| // along with a non-nil *http.Response so that callers can handle | ||
| // redirects, authentication, etcetera. | ||
| if err := copyResponse(rw, resp); err != nil { | ||
| Logger.With(slogger.Field{Key: "error", Value: err}).Errorf("WebSocketProxy: couldn't write response after failed remote backend handshake") | ||
| if copyErr := copyResponse(rw, resp); copyErr != nil { | ||
| Logger.With(slogger.Field{Key: "error", Value: copyErr}).Errorf("WebSocketProxy: couldn't write response after failed remote backend handshake") | ||
| } | ||
| } else { | ||
| http.Error(rw, http.StatusText(http.StatusServiceUnavailable), http.StatusServiceUnavailable) | ||
| } | ||
| return | ||
| } | ||
| defer connBackend.Close() | ||
|
|
||
| upgrader := w.upgrader | ||
| if w.upgrader == nil { | ||
| upgrader = defaultUpgrader | ||
| } | ||
| defer connBackend.CloseNow() | ||
|
|
||
| // Only pass those headers to the upgrader. | ||
| upgradeHeader := http.Header{} | ||
| if hdr := resp.Header.Get(SecWebSocketProtocol); hdr != "" { | ||
| upgradeHeader.Set(SecWebSocketProtocol, hdr) | ||
| } | ||
| if hdr := resp.Header.Get(SetCookie); hdr != "" { | ||
| upgradeHeader.Set(SetCookie, hdr) | ||
| } | ||
|
|
||
| // Now upgrade the existing incoming request to a WebSocket connection. | ||
| // Also pass the header that we gathered from the Dial handshake. | ||
| connPub, err := upgrader.Upgrade(rw, r, upgradeHeader) | ||
| // Accept the client-side WebSocket upgrade. | ||
| connPub, err := websocket.Accept(rw, r, &websocket.AcceptOptions{ | ||
| InsecureSkipVerify: true, | ||
| Subprotocols: subprotocolsFromResponse(resp), | ||
| }) | ||
|
Pangjiping marked this conversation as resolved.
|
||
| if err != nil { | ||
| Logger.With(slogger.Field{Key: "error", Value: err}).Errorf("WebSocketProxy: couldn't upgrade websocket connection") | ||
| return | ||
| } | ||
| defer connPub.Close() | ||
| defer connPub.CloseNow() | ||
|
|
||
| // Bidirectional relay. | ||
| errClient := make(chan error, 1) | ||
| errBackend := make(chan error, 1) | ||
| replicateWebsocketConn := func(dst, src *websocket.Conn, errc chan error) { | ||
| for { | ||
| msgType, msg, err := src.ReadMessage() | ||
| if err != nil { | ||
| m := websocket.FormatCloseMessage(websocket.CloseNormalClosure, fmt.Sprintf("%v", err)) | ||
| if e, ok := err.(*websocket.CloseError); ok { //nolint:errorlint | ||
| if e.Code != websocket.CloseNoStatusReceived { | ||
| m = websocket.FormatCloseMessage(e.Code, e.Text) | ||
| } | ||
| } | ||
| errc <- err | ||
| _ = dst.WriteMessage(websocket.CloseMessage, m) | ||
| break | ||
| } | ||
| err = dst.WriteMessage(msgType, msg) | ||
| if err != nil { | ||
| errc <- err | ||
| break | ||
| } | ||
| } | ||
| } | ||
|
|
||
| go replicateWebsocketConn(connPub, connBackend, errClient) | ||
| go replicateWebsocketConn(connBackend, connPub, errBackend) | ||
| go replicateConn(ctx, connPub, connBackend, errClient) | ||
| go replicateConn(ctx, connBackend, connPub, errBackend) | ||
|
|
||
| var message string | ||
| select { | ||
| case err = <-errClient: | ||
| message = "WebSocketProxy: Error when copying from backend to client: %v" | ||
| case err = <-errBackend: | ||
| message = "WebSocketProxy: Error when copying from client to backend: %v" | ||
|
|
||
| } | ||
| if e, ok := err.(*websocket.CloseError); !ok || e.Code == websocket.CloseAbnormalClosure { //nolint:errorlint | ||
|
|
||
| var closeErr websocket.CloseError | ||
| if !errors.As(err, &closeErr) || closeErr.Code == websocket.StatusAbnormalClosure { | ||
| Logger.With(slogger.Field{Key: "error", Value: err}).Errorf(message, err) | ||
| } | ||
| } | ||
|
|
||
| // serveH2Tunnel handles HTTP/2 Extended CONNECT (RFC 8441) by creating | ||
| // a raw bidirectional tunnel between the h2 stream and a backend h1 WebSocket. | ||
| func (w *WebSocketProxy) serveH2Tunnel(rw http.ResponseWriter, r *http.Request, backendURL *url.URL, requestHeader http.Header) { | ||
| ctx := r.Context() | ||
|
|
||
| connBackend, _, err := websocket.Dial(ctx, backendURL.String(), &websocket.DialOptions{ | ||
| HTTPHeader: requestHeader, | ||
|
Pangjiping marked this conversation as resolved.
Outdated
|
||
| }) | ||
| if err != nil { | ||
| Logger.With(slogger.Field{Key: "error", Value: err}).Errorf("WebSocketProxy: couldn't dial to remote backend (h2 tunnel)") | ||
| http.Error(rw, http.StatusText(http.StatusServiceUnavailable), http.StatusServiceUnavailable) | ||
|
Pangjiping marked this conversation as resolved.
Outdated
|
||
| return | ||
| } | ||
| backendNetConn := websocket.NetConn(ctx, connBackend, websocket.MessageBinary) | ||
|
Pangjiping marked this conversation as resolved.
Outdated
|
||
| defer backendNetConn.Close() | ||
|
|
||
| rc := http.NewResponseController(rw) | ||
| if err := rc.EnableFullDuplex(); err != nil { | ||
| Logger.With(slogger.Field{Key: "error", Value: err}).Errorf("WebSocketProxy: EnableFullDuplex failed") | ||
| http.Error(rw, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) | ||
| return | ||
| } | ||
| rw.WriteHeader(http.StatusOK) | ||
|
Pangjiping marked this conversation as resolved.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When an h2 Extended CONNECT client requests a WebSocket subprotocol and the backend accepts one, this success branch sends the client a bare Useful? React with 👍 / 👎. |
||
| if err := rc.Flush(); err != nil { | ||
| Logger.With(slogger.Field{Key: "error", Value: err}).Errorf("WebSocketProxy: flush failed") | ||
| return | ||
| } | ||
|
|
||
| done := make(chan struct{}) | ||
| go func() { | ||
| defer close(done) | ||
| io.Copy(backendNetConn, r.Body) | ||
| }() | ||
| io.Copy(rw, backendNetConn) | ||
| <-done | ||
|
Pangjiping marked this conversation as resolved.
|
||
| } | ||
|
|
||
| func replicateConn(ctx context.Context, dst, src *websocket.Conn, errc chan error) { | ||
| for { | ||
| msgType, msg, err := src.Read(ctx) | ||
|
Pangjiping marked this conversation as resolved.
|
||
| if err != nil { | ||
| errc <- err | ||
| dst.Close(websocket.StatusNormalClosure, "") | ||
|
Pangjiping marked this conversation as resolved.
Outdated
|
||
| break | ||
| } | ||
| err = dst.Write(ctx, msgType, msg) | ||
| if err != nil { | ||
| errc <- err | ||
| break | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func subprotocolsFromResponse(resp *http.Response) []string { | ||
| if resp == nil { | ||
| return nil | ||
| } | ||
| if proto := resp.Header.Get(SecWebSocketProtocol); proto != "" { | ||
| return []string{proto} | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func copyResponse(rw http.ResponseWriter, resp *http.Response) error { | ||
| copyHeader(rw.Header(), resp.Header) | ||
| rw.WriteHeader(resp.StatusCode) | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.