Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 124 additions & 33 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
_ "net/http/pprof"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
)
Expand Down Expand Up @@ -47,6 +48,12 @@ func setupServer(tb testing.TB) (addr string, stopfunc func(), client net.Conn,
return ln.Addr().String(), func() { ln.Close() }, conn, nil
}

func setupServerPipe(tb testing.TB) (addr string, stopfunc func(), client net.Conn, err error) {
ln, conn := net.Pipe()
go handleConnection(ln)
return "", func() { ln.Close() }, conn, nil
}

func handleConnection(conn net.Conn) {
session, _ := Server(conn, nil)
for {
Expand Down Expand Up @@ -288,7 +295,7 @@ func TestWriteToV2(t *testing.T) {
}

func TestGetDieCh(t *testing.T) {
cs, ss, err := getSmuxStreamPair()
cs, ss, err := getSmuxStreamPair(nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -527,58 +534,53 @@ func TestIsClose(t *testing.T) {
}

func TestKeepAliveTimeout(t *testing.T) {
ln, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatal(err)
}
defer ln.Close()
go func() {
ln.Accept()
}()

cli, err := net.Dial("tcp", ln.Addr().String())
c1, c2, err := getTCPConnectionPair()
if err != nil {
t.Fatal(err)
}
defer cli.Close()
defer c1.Close()
defer c2.Close()

config := DefaultConfig()
config.KeepAliveInterval = time.Second
config.KeepAliveTimeout = 2 * time.Second
session, _ := Client(cli, config)
session, _ := Client(c1, config)
time.Sleep(3 * time.Second)
if !session.IsClosed() {
t.Fatal("keepalive-timeout failed")
}
}

type blockWriteConn struct {
type delayWriteConn struct {
net.Conn
Delay time.Duration
}

func (c *blockWriteConn) Write(b []byte) (n int, err error) {
forever := time.Hour * 24
time.Sleep(forever)
func (c *delayWriteConn) Write(b []byte) (n int, err error) {
time.Sleep(c.Delay)
return c.Conn.Write(b)
}

func TestKeepAliveBlockWriteTimeout(t *testing.T) {
ln, err := net.Listen("tcp", "localhost:0")
c1, c2, err := getTCPConnectionPair()
if err != nil {
t.Fatal(err)
}
defer ln.Close()
go func() {
ln.Accept()
}()
defer c1.Close()
defer c2.Close()
testKeepAliveBlockWriteTimeout(t, c1)
}

cli, err := net.Dial("tcp", ln.Addr().String())
if err != nil {
t.Fatal(err)
}
defer cli.Close()
func TestKeepAliveBlockWriteTimeoutPipe(t *testing.T) {
c1, c2 := net.Pipe()
defer c1.Close()
defer c2.Close()
testKeepAliveBlockWriteTimeout(t, c1)
}

func testKeepAliveBlockWriteTimeout(t *testing.T, cli net.Conn) {
//when writeFrame block, keepalive in old version never timeout
blockWriteCli := &blockWriteConn{cli}
blockWriteCli := &delayWriteConn{cli, 24 * time.Hour}

config := DefaultConfig()
config.KeepAliveInterval = time.Second
Expand Down Expand Up @@ -901,7 +903,7 @@ func TestWriteFrameInternal(t *testing.T) {
config := DefaultConfig()
config.KeepAliveInterval = time.Second
config.KeepAliveTimeout = 2 * time.Second
session, _ = Client(&blockWriteConn{cli}, config)
session, _ = Client(&delayWriteConn{cli, 24 * time.Hour}, config)
f := newFrame(1, byte(rand.Uint32()), rand.Uint32())
c := make(chan time.Time)
go func() {
Expand Down Expand Up @@ -967,12 +969,75 @@ func TestWriteDeadline(t *testing.T) {
session.Close()
}

func TestWriteOnlyExhaustedSessionBucket(t *testing.T) {
testWriteOnlyExhaustedSessionBucket(t, getSmuxStreamPair)
}

func TestWriteOnlyExhaustedSessionBucketPipe(t *testing.T) {
testWriteOnlyExhaustedSessionBucket(t, getSmuxStreamPairPipe)
}

func testWriteOnlyExhaustedSessionBucket(t *testing.T, getPairFn func(config *Config) (*Stream, *Stream, error)) {
config := DefaultConfig()
config.Version = 2
config.MaxReceiveBuffer = initialPeerWindow / 2 // less than initialPeerWindow (262144)

c1, c2, err := getPairFn(config)
if err != nil {
t.Fatal(err)
return
}
defer c1.Close()
defer c2.Close()

startNotify := make(chan bool, 1)
go func() {
buf := make([]byte, int(float64(config.MaxStreamBuffer)*0.9))
for {
c1.SetWriteDeadline(time.Now().Add(100 * time.Millisecond))
_, err := c1.Write(buf)
if err != nil {
break
}
}
startNotify <- true
}()
<-startNotify

// we never read out any data from c2
// so peerWindow will always be initialPeerWindow (262144)
// and c1 can keep writing until c2 session bucket exhausted

bucket1 := atomic.LoadInt32(&c1.sess.bucket)
bucket2 := atomic.LoadInt32(&c2.sess.bucket)
t.Log("[MaxReceiveBuffer]", config.MaxReceiveBuffer)
t.Log("[MaxStreamBuffer]", config.MaxStreamBuffer)
t.Log("[c1]", bucket1)
t.Log("[c2]", bucket2)
if bucket2 <= 0 {
t.Fatal("bucket exhausted!!", bucket2)
}
}

func BenchmarkAcceptClose(b *testing.B) {
_, stop, cli, err := setupServer(b)
if err != nil {
b.Fatal(err)
}
defer stop()
benchmarkAcceptClose(b, cli)
}

func BenchmarkAcceptClosePipe(b *testing.B) {
_, stop, cli, err := setupServerPipe(b)
if err != nil {
b.Fatal(err)
}
defer stop()
benchmarkAcceptClose(b, cli)
}

func benchmarkAcceptClose(b *testing.B, cli net.Conn) {
session, _ := Client(cli, nil)
for i := 0; i < b.N; i++ {
if stream, err := session.OpenStream(); err == nil {
Expand All @@ -982,8 +1047,19 @@ func BenchmarkAcceptClose(b *testing.B) {
}
}
}

func BenchmarkConnSmux(b *testing.B) {
cs, ss, err := getSmuxStreamPair()
cs, ss, err := getSmuxStreamPair(nil)
if err != nil {
b.Fatal(err)
}
defer cs.Close()
defer ss.Close()
bench(b, cs, ss)
}

func BenchmarkConnSmuxPipe(b *testing.B) {
cs, ss, err := getSmuxStreamPairPipe(nil)
if err != nil {
b.Fatal(err)
}
Expand All @@ -1002,17 +1078,32 @@ func BenchmarkConnTCP(b *testing.B) {
bench(b, cs, ss)
}

func getSmuxStreamPair() (*Stream, *Stream, error) {
func BenchmarkConnPipe(b *testing.B) {
cs, ss := net.Pipe()
defer cs.Close()
defer ss.Close()
bench(b, cs, ss)
}

func getSmuxStreamPair(config *Config) (*Stream, *Stream, error) {
c1, c2, err := getTCPConnectionPair()
if err != nil {
return nil, nil, err
}
return getSmuxStreamPairInternal(c1, c2, config)
}

func getSmuxStreamPairPipe(config *Config) (*Stream, *Stream, error) {
c1, c2 := net.Pipe()
return getSmuxStreamPairInternal(c1, c2, config)
}

s, err := Server(c2, nil)
func getSmuxStreamPairInternal(c1, c2 net.Conn, config *Config) (*Stream, *Stream, error) {
s, err := Server(c2, config)
if err != nil {
return nil, nil, err
}
c, err := Client(c1, nil)
c, err := Client(c1, config)
if err != nil {
return nil, nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ func newStream(id uint32, frameSize int, sess *Session) *Stream {
s.sess = sess
s.die = make(chan struct{})
s.chFinEvent = make(chan struct{})
s.peerWindow = initialPeerWindow // set to initial window size
s.peerWindow = initialPeerWindow // set to initial window size
if uint32(sess.config.MaxStreamBuffer) < initialPeerWindow { // avoid write only cause session bucket exhausted
s.peerWindow = uint32(sess.config.MaxStreamBuffer)
}
return s
}

Expand Down