Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
347667a
feat: TCP timeout flush
tobyxdd Apr 8, 2024
5f447d4
Merge pull request #124 from apernet/wip-tcp-flush
tobyxdd Apr 9, 2024
107e29e
fix: do not reload geoip/geosite when reloading ruleset to prevent le…
tobyxdd Apr 11, 2024
245ac46
Merge pull request #130 from apernet/fix-geo-leak
tobyxdd Apr 11, 2024
d750626
fix: provide correct timestamp for TCP reassembler
tobyxdd May 6, 2024
5723490
Merge pull request #133 from apernet/fix-timestamp
tobyxdd May 6, 2024
9438745
feat: add support for pcap replay
eddc005 May 6, 2024
f01b79e
rebase and remove replayDelay
eddc005 May 6, 2024
abd7725
close pcap properly and implement ProtectedDialContext
eddc005 May 7, 2024
70fee14
chore: format
tobyxdd May 8, 2024
76c0f47
chore: do not default replay.realtime to true
tobyxdd May 8, 2024
5e15fd6
ci: install pcap for build
tobyxdd May 8, 2024
0daaa32
ci: install pcap for build 2
tobyxdd May 8, 2024
c453020
Merge pull request #132 from eddc005/feat-pcap
tobyxdd May 8, 2024
dabcc95
ci: enable cgo
tobyxdd May 8, 2024
5014523
Merge pull request #134 from apernet/ci-cgo
tobyxdd May 8, 2024
2ac8783
Revert "Merge pull request #132 from eddc005/feat-pcap"
tobyxdd May 8, 2024
b51ea5f
Revert "Merge pull request #134 from apernet/ci-cgo"
tobyxdd May 8, 2024
3ec5456
Merge pull request #135 from apernet/revert-pcap
tobyxdd May 8, 2024
8cab86b
Reapply "Merge pull request #132 from eddc005/feat-pcap"
haruue May 8, 2024
7456e59
refactor(pcap): switch to pcapgo
haruue May 8, 2024
cb0427b
Revert "ci: install pcap for build"
haruue May 8, 2024
301f9af
Revert "ci: install pcap for build 2"
haruue May 8, 2024
1934c06
feat(pcap): impl realtime wait() with time offset
haruue May 8, 2024
1de95ed
Merge pull request #136 from apernet/wip-pcapgo
tobyxdd May 9, 2024
d3f1785
feat: netlink queueNum/table config options
kpetku Aug 8, 2024
d8d7c5b
chore: allow set nfqueue num to 0
haruue Oct 27, 2024
5f4df7e
chore: rm nfqueueNum parameter in setupNft()
haruue Oct 27, 2024
0e97c9f
feat: connmark accept/drop config options
haruue Oct 28, 2024
278d731
Merge pull request #147 from kpetku/feat-expose-netlink-config-options
tobyxdd Oct 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 86 additions & 32 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os/signal"
"strings"
"syscall"
"time"

"github.com/apernet/OpenGFW/analyzer"
"github.com/apernet/OpenGFW/analyzer/tcp"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/apernet/OpenGFW/modifier"
modUDP "github.com/apernet/OpenGFW/modifier/udp"
"github.com/apernet/OpenGFW/ruleset"
"github.com/apernet/OpenGFW/ruleset/builtins/geo"

"github.com/spf13/cobra"
"github.com/spf13/viper"
Expand All @@ -41,6 +43,7 @@ var logger *zap.Logger
// Flags
var (
cfgFile string
pcapFile string
logLevel string
logFormat string
)
Expand Down Expand Up @@ -116,6 +119,7 @@ func init() {

func initFlags() {
rootCmd.PersistentFlags().StringVarP(&cfgFile, "config", "c", "", "config file")
rootCmd.PersistentFlags().StringVarP(&pcapFile, "pcap", "p", "", "pcap file (optional)")
rootCmd.PersistentFlags().StringVarP(&logLevel, "log-level", "l", envOrDefaultString(appLogLevelEnv, "info"), "log level")
rootCmd.PersistentFlags().StringVarP(&logFormat, "log-format", "f", envOrDefaultString(appLogFormatEnv, "console"), "log format")
}
Expand Down Expand Up @@ -165,22 +169,33 @@ type cliConfig struct {
IO cliConfigIO `mapstructure:"io"`
Workers cliConfigWorkers `mapstructure:"workers"`
Ruleset cliConfigRuleset `mapstructure:"ruleset"`
Replay cliConfigReplay `mapstructure:"replay"`
}

type cliConfigIO struct {
QueueSize uint32 `mapstructure:"queueSize"`
ReadBuffer int `mapstructure:"rcvBuf"`
WriteBuffer int `mapstructure:"sndBuf"`
Local bool `mapstructure:"local"`
RST bool `mapstructure:"rst"`
QueueSize uint32 `mapstructure:"queueSize"`
QueueNum *uint16 `mapstructure:"queueNum"`
Table string `mapstructure:"table"`
ConnMarkAccept uint32 `mapstructure:"connMarkAccept"`
ConnMarkDrop uint32 `mapstructure:"connMarkDrop"`

ReadBuffer int `mapstructure:"rcvBuf"`
WriteBuffer int `mapstructure:"sndBuf"`
Local bool `mapstructure:"local"`
RST bool `mapstructure:"rst"`
}

type cliConfigReplay struct {
Realtime bool `mapstructure:"realtime"`
}

type cliConfigWorkers struct {
Count int `mapstructure:"count"`
QueueSize int `mapstructure:"queueSize"`
TCPMaxBufferedPagesTotal int `mapstructure:"tcpMaxBufferedPagesTotal"`
TCPMaxBufferedPagesPerConn int `mapstructure:"tcpMaxBufferedPagesPerConn"`
UDPMaxStreams int `mapstructure:"udpMaxStreams"`
Count int `mapstructure:"count"`
QueueSize int `mapstructure:"queueSize"`
TCPMaxBufferedPagesTotal int `mapstructure:"tcpMaxBufferedPagesTotal"`
TCPMaxBufferedPagesPerConn int `mapstructure:"tcpMaxBufferedPagesPerConn"`
TCPTimeout time.Duration `mapstructure:"tcpTimeout"`
UDPMaxStreams int `mapstructure:"udpMaxStreams"`
}

type cliConfigRuleset struct {
Expand All @@ -194,17 +209,35 @@ func (c *cliConfig) fillLogger(config *engine.Config) error {
}

func (c *cliConfig) fillIO(config *engine.Config) error {
nfio, err := io.NewNFQueuePacketIO(io.NFQueuePacketIOConfig{
QueueSize: c.IO.QueueSize,
ReadBuffer: c.IO.ReadBuffer,
WriteBuffer: c.IO.WriteBuffer,
Local: c.IO.Local,
RST: c.IO.RST,
})
var ioImpl io.PacketIO
var err error
if pcapFile != "" {
// Setup IO for pcap file replay
logger.Info("replaying from pcap file", zap.String("pcap file", pcapFile))
ioImpl, err = io.NewPcapPacketIO(io.PcapPacketIOConfig{
PcapFile: pcapFile,
Realtime: c.Replay.Realtime,
})
} else {
// Setup IO for nfqueue
ioImpl, err = io.NewNFQueuePacketIO(io.NFQueuePacketIOConfig{
QueueSize: c.IO.QueueSize,
QueueNum: c.IO.QueueNum,
Table: c.IO.Table,
ConnMarkAccept: c.IO.ConnMarkAccept,
ConnMarkDrop: c.IO.ConnMarkDrop,

ReadBuffer: c.IO.ReadBuffer,
WriteBuffer: c.IO.WriteBuffer,
Local: c.IO.Local,
RST: c.IO.RST,
})
}

if err != nil {
return configError{Field: "io", Err: err}
}
config.IO = nfio
config.IO = ioImpl
return nil
}

Expand All @@ -213,6 +246,7 @@ func (c *cliConfig) fillWorkers(config *engine.Config) error {
config.WorkerQueueSize = c.Workers.QueueSize
config.WorkerTCPMaxBufferedPagesTotal = c.Workers.TCPMaxBufferedPagesTotal
config.WorkerTCPMaxBufferedPagesPerConn = c.Workers.TCPMaxBufferedPagesPerConn
config.WorkerTCPTimeout = c.Workers.TCPTimeout
config.WorkerUDPMaxStreams = c.Workers.UDPMaxStreams
return nil
}
Expand Down Expand Up @@ -256,8 +290,7 @@ func runMain(cmd *cobra.Command, args []string) {
}
rsConfig := &ruleset.BuiltinConfig{
Logger: &rulesetLogger{},
GeoSiteFilename: config.Ruleset.GeoSite,
GeoIpFilename: config.Ruleset.GeoIp,
GeoMatcher: geo.NewGeoMatcher(config.Ruleset.GeoSite, config.Ruleset.GeoIp),
ProtectedDialContext: engineConfig.IO.ProtectedDialContext,
}
rs, err := ruleset.CompileExprRules(rawRs, analyzers, modifiers, rsConfig)
Expand Down Expand Up @@ -340,12 +373,26 @@ func (l *engineLogger) TCPStreamPropUpdate(info ruleset.StreamInfo, close bool)
}

func (l *engineLogger) TCPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool) {
logger.Info("TCP stream action",
zap.Int64("id", info.ID),
zap.String("src", info.SrcString()),
zap.String("dst", info.DstString()),
zap.String("action", action.String()),
zap.Bool("noMatch", noMatch))
if noMatch {
logger.Debug("TCP stream no match",
zap.Int64("id", info.ID),
zap.String("src", info.SrcString()),
zap.String("dst", info.DstString()),
zap.String("action", action.String()))
} else {
logger.Info("TCP stream action",
zap.Int64("id", info.ID),
zap.String("src", info.SrcString()),
zap.String("dst", info.DstString()),
zap.String("action", action.String()))
}
}

func (l *engineLogger) TCPFlush(workerID, flushed, closed int) {
logger.Debug("TCP flush",
zap.Int("workerID", workerID),
zap.Int("flushed", flushed),
zap.Int("closed", closed))
}

func (l *engineLogger) UDPStreamNew(workerID int, info ruleset.StreamInfo) {
Expand All @@ -366,12 +413,19 @@ func (l *engineLogger) UDPStreamPropUpdate(info ruleset.StreamInfo, close bool)
}

func (l *engineLogger) UDPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool) {
logger.Info("UDP stream action",
zap.Int64("id", info.ID),
zap.String("src", info.SrcString()),
zap.String("dst", info.DstString()),
zap.String("action", action.String()),
zap.Bool("noMatch", noMatch))
if noMatch {
logger.Debug("UDP stream no match",
zap.Int64("id", info.ID),
zap.String("src", info.SrcString()),
zap.String("dst", info.DstString()),
zap.String("action", action.String()))
} else {
logger.Info("UDP stream action",
zap.Int64("id", info.ID),
zap.String("src", info.SrcString()),
zap.String("dst", info.DstString()),
zap.String("action", action.String()))
}
}

func (l *engineLogger) ModifyError(info ruleset.StreamInfo, err error) {
Expand Down
16 changes: 13 additions & 3 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func NewEngine(config Config) (Engine, error) {
Ruleset: config.Ruleset,
TCPMaxBufferedPagesTotal: config.WorkerTCPMaxBufferedPagesTotal,
TCPMaxBufferedPagesPerConn: config.WorkerTCPMaxBufferedPagesPerConn,
TCPTimeout: config.WorkerTCPTimeout,
UDPMaxStreams: config.WorkerUDPMaxStreams,
})
if err != nil {
Expand All @@ -57,12 +58,17 @@ func (e *engine) UpdateRuleset(r ruleset.Ruleset) error {
}

func (e *engine) Run(ctx context.Context) error {
workerCtx, workerCancel := context.WithCancel(ctx)
defer workerCancel() // Stop workers

// Register IO shutdown
ioCtx, ioCancel := context.WithCancel(ctx)
defer ioCancel() // Stop workers & IO
e.io.SetCancelFunc(ioCancel)
defer ioCancel() // Stop IO

// Start workers
for _, w := range e.workers {
go w.Run(ioCtx)
go w.Run(workerCtx)
}

// Register IO callback
Expand All @@ -84,6 +90,8 @@ func (e *engine) Run(ctx context.Context) error {
return err
case <-ctx.Done():
return nil
case <-ioCtx.Done():
return nil
}
}

Expand All @@ -101,9 +109,11 @@ func (e *engine) dispatch(p io.Packet) bool {
_ = e.io.SetVerdict(p, io.VerdictAcceptStream, nil)
return true
}
// Convert to gopacket.Packet
packet := gopacket.NewPacket(data, layerType, gopacket.DecodeOptions{Lazy: true, NoCopy: true})
packet.Metadata().Timestamp = p.Timestamp()
// Load balance by stream ID
index := p.StreamID() % uint32(len(e.workers))
packet := gopacket.NewPacket(data, layerType, gopacket.DecodeOptions{Lazy: true, NoCopy: true})
e.workers[index].Feed(&workerPacket{
StreamID: p.StreamID(),
Packet: packet,
Expand Down
3 changes: 3 additions & 0 deletions engine/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package engine

import (
"context"
"time"

"github.com/apernet/OpenGFW/io"
"github.com/apernet/OpenGFW/ruleset"
Expand All @@ -25,6 +26,7 @@ type Config struct {
WorkerQueueSize int
WorkerTCPMaxBufferedPagesTotal int
WorkerTCPMaxBufferedPagesPerConn int
WorkerTCPTimeout time.Duration
WorkerUDPMaxStreams int
}

Expand All @@ -36,6 +38,7 @@ type Logger interface {
TCPStreamNew(workerID int, info ruleset.StreamInfo)
TCPStreamPropUpdate(info ruleset.StreamInfo, close bool)
TCPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool)
TCPFlush(workerID, flushed, closed int)

UDPStreamNew(workerID int, info ruleset.StreamInfo)
UDPStreamPropUpdate(info ruleset.StreamInfo, close bool)
Expand Down
25 changes: 23 additions & 2 deletions engine/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package engine

import (
"context"
"time"

"github.com/apernet/OpenGFW/io"
"github.com/apernet/OpenGFW/ruleset"
Expand All @@ -14,9 +15,12 @@ import (

const (
defaultChanSize = 64
defaultTCPMaxBufferedPagesTotal = 4096
defaultTCPMaxBufferedPagesPerConnection = 64
defaultTCPMaxBufferedPagesTotal = 65536
defaultTCPMaxBufferedPagesPerConnection = 16
defaultTCPTimeout = 10 * time.Minute
defaultUDPMaxStreams = 4096

tcpFlushInterval = 1 * time.Minute
)

type workerPacket struct {
Expand All @@ -33,6 +37,7 @@ type worker struct {
tcpStreamFactory *tcpStreamFactory
tcpStreamPool *reassembly.StreamPool
tcpAssembler *reassembly.Assembler
tcpTimeout time.Duration

udpStreamFactory *udpStreamFactory
udpStreamManager *udpStreamManager
Expand All @@ -47,6 +52,7 @@ type workerConfig struct {
Ruleset ruleset.Ruleset
TCPMaxBufferedPagesTotal int
TCPMaxBufferedPagesPerConn int
TCPTimeout time.Duration
UDPMaxStreams int
}

Expand All @@ -60,6 +66,9 @@ func (c *workerConfig) fillDefaults() {
if c.TCPMaxBufferedPagesPerConn <= 0 {
c.TCPMaxBufferedPagesPerConn = defaultTCPMaxBufferedPagesPerConnection
}
if c.TCPTimeout <= 0 {
c.TCPTimeout = defaultTCPTimeout
}
if c.UDPMaxStreams <= 0 {
c.UDPMaxStreams = defaultUDPMaxStreams
}
Expand Down Expand Up @@ -98,6 +107,7 @@ func newWorker(config workerConfig) (*worker, error) {
tcpStreamFactory: tcpSF,
tcpStreamPool: tcpStreamPool,
tcpAssembler: tcpAssembler,
tcpTimeout: config.TCPTimeout,
udpStreamFactory: udpSF,
udpStreamManager: udpSM,
modSerializeBuffer: gopacket.NewSerializeBuffer(),
Expand All @@ -111,6 +121,10 @@ func (w *worker) Feed(p *workerPacket) {
func (w *worker) Run(ctx context.Context) {
w.logger.WorkerStart(w.id)
defer w.logger.WorkerStop(w.id)

tcpFlushTicker := time.NewTicker(tcpFlushInterval)
defer tcpFlushTicker.Stop()

for {
select {
case <-ctx.Done():
Expand All @@ -122,6 +136,8 @@ func (w *worker) Run(ctx context.Context) {
}
v, b := w.handle(wPkt.StreamID, wPkt.Packet)
_ = wPkt.SetVerdict(v, b)
case <-tcpFlushTicker.C:
w.flushTCP(w.tcpTimeout)
}
}
}
Expand Down Expand Up @@ -176,6 +192,11 @@ func (w *worker) handleTCP(ipFlow gopacket.Flow, pMeta *gopacket.PacketMetadata,
return io.Verdict(ctx.Verdict)
}

func (w *worker) flushTCP(timeout time.Duration) {
flushed, closed := w.tcpAssembler.FlushCloseOlderThan(time.Now().Add(-timeout))
w.logger.TCPFlush(w.id, flushed, closed)
}

func (w *worker) handleUDP(streamID uint32, ipFlow gopacket.Flow, udp *layers.UDP) (io.Verdict, []byte) {
ctx := &udpContext{
Verdict: udpVerdictAccept,
Expand Down
6 changes: 6 additions & 0 deletions io/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io
import (
"context"
"net"
"time"
)

type Verdict int
Expand All @@ -24,6 +25,8 @@ const (
type Packet interface {
// StreamID is the ID of the stream the packet belongs to.
StreamID() uint32
// Timestamp is the time the packet was received.
Timestamp() time.Time
// Data is the raw packet data, starting with the IP header.
Data() []byte
}
Expand All @@ -45,6 +48,9 @@ type PacketIO interface {
ProtectedDialContext(ctx context.Context, network, address string) (net.Conn, error)
// Close closes the packet IO.
Close() error
// SetCancelFunc gives packet IO access to context cancel function, enabling it to
// trigger a shutdown
SetCancelFunc(cancelFunc context.CancelFunc) error
}

type ErrInvalidPacket struct {
Expand Down
Loading