Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: write separation #875

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions server/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,7 @@ func (ctx *Context) Write(v interface{}) error {

var err error
if ctx.async {
go func() {
_, err = ctx.conn.Write(*respData)
protocol.PutData(respData)
}()
return wirteResp(ctx.ctx, res)
} else {
_, err = ctx.conn.Write(*respData)
protocol.PutData(respData)
Expand Down
7 changes: 7 additions & 0 deletions server/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,10 @@ func WithAsyncWrite() OptionFn {
s.AsyncWrite = true
}
}

// AsyncOutgoing sets AsyncWrite outgoing queue
func AsyncOutgoing(limit int) OptionFn {
return func(s *Server) {
s.AsyncOutgoing = limit
}
}
93 changes: 68 additions & 25 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ var (
TagContextKey = &contextKey{"service-tag"}
// HttpConnContextKey is used to store http connection.
HttpConnContextKey = &contextKey{"http-conn"}
// AsyncWriteCh
AsyncWriteCh = &contextKey{"async-write-ch"}
)

type Handler func(ctx *Context) error
Expand All @@ -87,6 +89,7 @@ type Server struct {
DisableHTTPGateway bool // disable http invoke or not.
DisableJSONRPC bool // disable json rpc or not.
AsyncWrite bool // set true if your server only serves few clients
AsyncOutgoing int // write message conn outgoing queue max
pool WorkerPool

serviceMapMu sync.RWMutex
Expand Down Expand Up @@ -133,14 +136,15 @@ type Server struct {
// NewServer returns a server.
func NewServer(options ...OptionFn) *Server {
s := &Server{
Plugins: &pluginContainer{},
options: make(map[string]interface{}),
activeConn: make(map[net.Conn]struct{}),
doneChan: make(chan struct{}),
serviceMap: make(map[string]*service),
router: make(map[string]Handler),
AsyncWrite: false, // 除非你想做进一步的优化测试,否则建议你设置为false
Started: make(chan struct{}),
Plugins: &pluginContainer{},
options: make(map[string]interface{}),
activeConn: make(map[net.Conn]struct{}),
doneChan: make(chan struct{}),
serviceMap: make(map[string]*service),
router: make(map[string]Handler),
AsyncWrite: false, // 除非你想做进一步的优化测试,否则建议你设置为false
AsyncOutgoing: 100000, //
Started: make(chan struct{}),
}

for _, op := range options {
Expand Down Expand Up @@ -364,23 +368,11 @@ func (s *Server) sendResponse(ctx *share.Context, conn net.Conn, err error, req,

data := res.EncodeSlicePointer()
if s.AsyncWrite {
if s.pool != nil {
s.pool.Submit(func() {
if s.writeTimeout != 0 {
conn.SetWriteDeadline(time.Now().Add(s.writeTimeout))
}
conn.Write(*data)
protocol.PutData(data)
})
} else {
go func() {
if s.writeTimeout != 0 {
conn.SetWriteDeadline(time.Now().Add(s.writeTimeout))
}
conn.Write(*data)
protocol.PutData(data)
}()
err := wirteResp(ctx, res)
if err != nil {
log.Errorf(err.Error())
}
return
} else {
if s.writeTimeout != 0 {
conn.SetWriteDeadline(time.Now().Add(s.writeTimeout))
Expand Down Expand Up @@ -433,9 +425,14 @@ func (s *Server) serveConn(conn net.Conn) {
return
}
}
var asyncWriteCh chan *protocol.Message
// async write
if s.AsyncWrite {
asyncWriteCh = make(chan *protocol.Message, s.AsyncOutgoing)
go s.asyncWrite(conn, asyncWriteCh)
}

r := bufio.NewReaderSize(conn, ReaderBuffsize)

// read requests and handle it
for {
if s.isShutdown() {
Expand All @@ -449,6 +446,9 @@ func (s *Server) serveConn(conn net.Conn) {

// create a rpcx Context
ctx := share.WithValue(context.Background(), RemoteConnContextKey, conn)
if s.AsyncWrite {
ctx = share.WithValue(ctx, AsyncWriteCh, asyncWriteCh)
}

// read a request from the underlying connection
req, err := s.readRequest(ctx, r)
Expand Down Expand Up @@ -522,6 +522,36 @@ func (s *Server) serveConn(conn net.Conn) {
}
}

// syncWrite
func (s *Server) asyncWrite(conn net.Conn, asyncWriteCh chan *protocol.Message) {
if err := recover(); err != nil {
const size = 64 << 10
buf := make([]byte, size)
ss := runtime.Stack(buf, false)
if ss > size {
ss = size
}
buf = buf[:ss]
log.Errorf("serving %s panic error: %s, stack:\n %s", conn.RemoteAddr(), err, string(buf))
}
for {
if s.isShutdown() {
return
}
select {
case <-s.doneChan:
return
case res := <-asyncWriteCh:
data := res.EncodeSlicePointer()
if s.writeTimeout != 0 {
conn.SetWriteDeadline(time.Now().Add(s.writeTimeout))
}
conn.Write(*data)
protocol.PutData(data)
}
}
}

func (s *Server) processOneRequest(ctx *share.Context, req *protocol.Message, conn net.Conn) {
defer func() {
if r := recover(); r != nil {
Expand Down Expand Up @@ -1073,6 +1103,19 @@ func (s *Server) closeDoneChanLocked() {
}
}

func wirteResp(ctx context.Context, res *protocol.Message) error {
ch, ok := ctx.Value(AsyncWriteCh).(chan *protocol.Message)
if !ok {
return fmt.Errorf("async write chan closed")
}
select {
case ch <- res:
default:
return fmt.Errorf("could not write message, conn outgoing queue full")
}
return nil
}

var ip4Reg = regexp.MustCompile(`^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$`)

func validIP4(ipAddress string) bool {
Expand Down
Loading