From bcfae006805576509c9808cf0638e09cb1441ed2 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 19 Jun 2017 08:02:25 +0300 Subject: [PATCH] rpcc: Allow the compression level to be changed --- rpcc/conn.go | 25 +++++++++++++++++++++---- rpcc/conn_test.go | 9 ++++++--- 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/rpcc/conn.go b/rpcc/conn.go index c4b2407..5b21938 100644 --- a/rpcc/conn.go +++ b/rpcc/conn.go @@ -60,7 +60,8 @@ func WithWriteBufferSize(n int) DialOption { } // WithCompression returns a DialOption that enables compression for the -// underlying websocket connection. +// underlying websocket connection. Use SetCompressionLevel on Conn to +// change the default compression level for subsequent writes. func WithCompression() DialOption { return func(o *dialOptions) { o.wsDialer.EnableCompression = true @@ -140,6 +141,11 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * if err != nil { return nil, err } + + if ws.EnableCompression { + c.compressionLevel = wsConn.SetCompressionLevel + } + return &wsNetConn{conn: wsConn}, nil } } @@ -224,9 +230,7 @@ type Conn struct { dialOpts dialOptions conn net.Conn - // Codec encodes and decodes JSON onto conn. There is only one - // active decoder (recv) and encoder (guaranteed via reqMu). - codec Codec + compressionLevel func(level int) error mu sync.Mutex // Protects following. closed bool @@ -235,6 +239,9 @@ type Conn struct { reqMu sync.Mutex // Protects following. req Request + // Encodes and decodes JSON onto conn. Encoding is + // guarded by mutex and decoding is done by recv. + codec Codec streamMu sync.Mutex // Protects following. streams map[string]*streamClients @@ -463,6 +470,16 @@ func (c *Conn) close(err error) error { return err } +// SetCompressionLevel sets the flate compressions level for writes. Valid level +// range is [-2, 9]. Returns error if compression is not eanbled for Conn. See +// package compress/flate for a description of compression levels. +func (c *Conn) SetCompressionLevel(level int) error { + if c.compressionLevel == nil { + return errors.New("rpcc: compression is not enabled for Conn") + } + return c.compressionLevel(level) +} + // Close closes the connection. func (c *Conn) Close() error { return c.close(nil) diff --git a/rpcc/conn_test.go b/rpcc/conn_test.go index 9064dc0..8a68960 100644 --- a/rpcc/conn_test.go +++ b/rpcc/conn_test.go @@ -1,6 +1,7 @@ package rpcc import ( + "compress/flate" "context" "encoding/json" "errors" @@ -33,12 +34,13 @@ func newTestServer(t testing.TB, respond func(*websocket.Conn, *Request) error) var err error ts := &testServer{} upgrader := &websocket.Upgrader{ - HandshakeTimeout: timeout, + HandshakeTimeout: timeout, + EnableCompression: true, } setupDone := make(chan struct{}) ts.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - conn, err := upgrader.Upgrade(w, r, r.Header) + conn, err := upgrader.Upgrade(w, r, nil) if err != nil { t.Fatal(err) } @@ -63,10 +65,11 @@ func newTestServer(t testing.TB, respond func(*websocket.Conn, *Request) error) } })) - ts.conn, err = Dial("ws" + strings.TrimPrefix(ts.srv.URL, "http")) + ts.conn, err = Dial("ws"+strings.TrimPrefix(ts.srv.URL, "http"), WithCompression()) if err != nil { t.Fatal(err) } + ts.conn.SetCompressionLevel(flate.BestSpeed) <-setupDone return ts