Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
# plato
一款纯go编写的,支持亿级通信的IM系统。
https://hardcore.feishu.cn/wiki/wikcnRfpMp8DUAxp8AtKAEF7Hng
一款纯go编写的,高性能通信的IM系统。

# 项目亮点
1. 单机百万长连接热重启实现
2. 万人群聊消息通信延迟优化
3. 架构可用性4个9建设
4. 超大规模消息推送优化
5. 直播/聊天室/弹幕多场景消息中台扩展
5. 直播/聊天室/弹幕多场景消息中台扩展
51 changes: 29 additions & 22 deletions gateway/epoll.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"log"
"net"
"reflect"
"runtime"
"sync"
"sync/atomic"
"syscall"
Expand Down Expand Up @@ -46,30 +45,38 @@ func newEPool(ln *net.TCPListener, cb func(c *connection, ep *epoller)) *ePool {
}
}

// 创建一个专门处理 accept 事件的协程,与当前cpu的核数对应,能够发挥最大功效
func (e *ePool) createAcceptProcess() {
for i := 0; i < runtime.NumCPU(); i++ {
go func() {
for {
conn, e := e.ln.AcceptTCP()
// 限流熔断
if !checkTcp() {
_ = conn.Close()
continue
}
setTcpConifg(conn)
if e != nil {
if ne, ok := e.(net.Error); ok && ne.Temporary() {
fmt.Errorf("accept temp err: %v", ne)
go func() { // 单goroutine负责accept
for {
conn, err := e.ln.AcceptTCP()
if err != nil {
if ne, ok := err.(net.Error); ok {
if ne.Temporary() {
log.Printf("temp accept err: %v", ne)
continue
}
fmt.Errorf("accept err: %v", e)
log.Printf("permanent accept err: %v", err)
return // 停止循环
}
c := NewConnection(conn)
ep.addTask(c)
log.Printf("accept err: %v", err)
continue
}
}()
}
if !checkConnRate() { // 熔断检查
conn.Close()
log.Printf("connection rejected by rate limiter")
continue
}
// 设置TCP配置
setTcpConfig(conn)
// 分发任务(确保非阻塞)
select {
case e.eChan <- NewConnection(conn):
default:
conn.Close()
log.Printf("task queue full, connection closed")
}
}
}()
}

func (e *ePool) startEPool() {
Expand Down Expand Up @@ -213,12 +220,12 @@ func subTcpNum() {
atomic.AddInt32(&tcpNum, -1)
}

func checkTcp() bool {
func checkConnRate() bool {
num := getTcpNum()
maxTcpNum := config.GetGatewayMaxTcpNum()
return num <= maxTcpNum
}

func setTcpConifg(c *net.TCPConn) {
func setTcpConfig(c *net.TCPConn) {
_ = c.SetKeepAlive(true)
}