Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions backend/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,10 @@ type GatewaySchedulingConfig struct {

// 负载计算
LoadBatchEnabled bool `mapstructure:"load_batch_enabled"`
// 负载均衡时每次窗口化读取的候选账号数量。
CandidatePageSize int `mapstructure:"candidate_page_size"`
// 负载均衡时最多参与轮转采样的有序账号数量。
CandidateScanLimit int `mapstructure:"candidate_scan_limit"`

// 过期槽位清理周期(0 表示禁用)
SlotCleanupInterval time.Duration `mapstructure:"slot_cleanup_interval"`
Expand Down Expand Up @@ -1411,6 +1415,8 @@ func setDefaults() {
viper.SetDefault("gateway.scheduling.fallback_max_waiting", 100)
viper.SetDefault("gateway.scheduling.fallback_selection_mode", "last_used")
viper.SetDefault("gateway.scheduling.load_batch_enabled", true)
viper.SetDefault("gateway.scheduling.candidate_page_size", 256)
viper.SetDefault("gateway.scheduling.candidate_scan_limit", 8192)
viper.SetDefault("gateway.scheduling.slot_cleanup_interval", 30*time.Second)
viper.SetDefault("gateway.scheduling.db_fallback_enabled", true)
viper.SetDefault("gateway.scheduling.db_fallback_timeout_seconds", 0)
Expand Down Expand Up @@ -2185,6 +2191,15 @@ func (c *Config) Validate() error {
if c.Gateway.Scheduling.FallbackMaxWaiting <= 0 {
return fmt.Errorf("gateway.scheduling.fallback_max_waiting must be positive")
}
if c.Gateway.Scheduling.CandidatePageSize <= 0 {
return fmt.Errorf("gateway.scheduling.candidate_page_size must be positive")
}
if c.Gateway.Scheduling.CandidateScanLimit <= 0 {
return fmt.Errorf("gateway.scheduling.candidate_scan_limit must be positive")
}
if c.Gateway.Scheduling.CandidateScanLimit < c.Gateway.Scheduling.CandidatePageSize {
return fmt.Errorf("gateway.scheduling.candidate_scan_limit must be >= candidate_page_size")
}
if c.Gateway.Scheduling.SlotCleanupInterval < 0 {
return fmt.Errorf("gateway.scheduling.slot_cleanup_interval must be non-negative")
}
Expand Down
44 changes: 37 additions & 7 deletions backend/internal/pkg/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var (
sugar atomic.Pointer[zap.SugaredLogger]
atomicLevel zap.AtomicLevel
initOptions InitOptions
activeClosers []io.Closer
currentSink atomic.Value // sinkState
stdLogUndo func()
bootstrapOnce sync.Once
Expand All @@ -72,23 +73,26 @@ func Init(options InitOptions) error {

func initLocked(options InitOptions) error {
normalized := options.normalized()
zl, al, err := buildLogger(normalized)
zl, al, closers, err := buildLogger(normalized)
if err != nil {
return err
}

prev := global.Load()
prevClosers := activeClosers
global.Store(zl)
sugar.Store(zl.Sugar())
atomicLevel = al
initOptions = normalized
activeClosers = closers

bridgeSlogLocked()
bridgeStdLogLocked()

if prev != nil {
_ = prev.Sync()
}
closeClosers(prevClosers)
return nil
}

Expand Down Expand Up @@ -205,6 +209,19 @@ func Sync() {
}
}

func Close() {
mu.Lock()
defer mu.Unlock()

if l := global.Load(); l != nil {
_ = l.Sync()
}
closeClosers(activeClosers)
activeClosers = nil
global.Store(nil)
sugar.Store(nil)
}

func bridgeStdLogLocked() {
if stdLogUndo != nil {
stdLogUndo()
Expand Down Expand Up @@ -238,7 +255,7 @@ func bridgeSlogLocked() {
slog.SetDefault(slog.New(newSlogZapHandler(base.Named("slog"))))
}

func buildLogger(options InitOptions) (*zap.Logger, zap.AtomicLevel, error) {
func buildLogger(options InitOptions) (*zap.Logger, zap.AtomicLevel, []io.Closer, error) {
level, _ := parseLevel(options.Level)
atomic := zap.NewAtomicLevelAt(level)

Expand All @@ -265,6 +282,7 @@ func buildLogger(options InitOptions) (*zap.Logger, zap.AtomicLevel, error) {

sinkCore := newSinkCore()
cores := make([]zapcore.Core, 0, 3)
closers := make([]io.Closer, 0, 1)

if options.Output.ToStdout {
infoPriority := zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
Expand All @@ -278,7 +296,7 @@ func buildLogger(options InitOptions) (*zap.Logger, zap.AtomicLevel, error) {
}

if options.Output.ToFile {
fileCore, filePath, fileErr := buildFileCore(enc, atomic, options)
fileCore, fileCloser, filePath, fileErr := buildFileCore(enc, atomic, options)
if fileErr != nil {
_, _ = fmt.Fprintf(os.Stderr, "time=%s level=WARN msg=\"日志文件输出初始化失败,降级为仅标准输出\" path=%s err=%v\n",
time.Now().Format(time.RFC3339Nano),
Expand All @@ -287,6 +305,9 @@ func buildLogger(options InitOptions) (*zap.Logger, zap.AtomicLevel, error) {
)
} else {
cores = append(cores, fileCore)
if fileCloser != nil {
closers = append(closers, fileCloser)
}
}
}

Expand All @@ -313,18 +334,18 @@ func buildLogger(options InitOptions) (*zap.Logger, zap.AtomicLevel, error) {
zap.String("service", options.ServiceName),
zap.String("env", options.Environment),
)
return logger, atomic, nil
return logger, atomic, closers, nil
}

func buildFileCore(enc zapcore.Encoder, atomic zap.AtomicLevel, options InitOptions) (zapcore.Core, string, error) {
func buildFileCore(enc zapcore.Encoder, atomic zap.AtomicLevel, options InitOptions) (zapcore.Core, io.Closer, string, error) {
filePath := options.Output.FilePath
if strings.TrimSpace(filePath) == "" {
filePath = resolveLogFilePath("")
}

dir := filepath.Dir(filePath)
if err := os.MkdirAll(dir, 0o755); err != nil {
return nil, filePath, err
return nil, nil, filePath, err
}
lj := &lumberjack.Logger{
Filename: filePath,
Expand All @@ -334,7 +355,16 @@ func buildFileCore(enc zapcore.Encoder, atomic zap.AtomicLevel, options InitOpti
Compress: options.Rotation.Compress,
LocalTime: options.Rotation.LocalTime,
}
return zapcore.NewCore(enc, zapcore.AddSync(lj), atomic), filePath, nil
return zapcore.NewCore(enc, zapcore.AddSync(lj), atomic), lj, filePath, nil
}

func closeClosers(closers []io.Closer) {
for _, closer := range closers {
if closer == nil {
continue
}
_ = closer.Close()
}
}

type sinkCore struct {
Expand Down
15 changes: 13 additions & 2 deletions backend/internal/pkg/logger/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,18 @@ import (
"io"
"os"
"path/filepath"
"runtime"
"strings"
"testing"
)

func syncTestLogger() {
if runtime.GOOS == "windows" {
return
}
Sync()
}

func TestInit_DualOutput(t *testing.T) {
tmpDir := t.TempDir()
logPath := filepath.Join(tmpDir, "logs", "sub2api.log")
Expand Down Expand Up @@ -54,10 +62,11 @@ func TestInit_DualOutput(t *testing.T) {
if err != nil {
t.Fatalf("Init() error: %v", err)
}
t.Cleanup(Close)

L().Info("dual-output-info")
L().Warn("dual-output-warn")
Sync()
syncTestLogger()

_ = stdoutW.Close()
_ = stderrW.Close()
Expand Down Expand Up @@ -121,6 +130,7 @@ func TestInit_FileOutputFailureDowngrade(t *testing.T) {
if err != nil {
t.Fatalf("Init() should downgrade instead of failing, got: %v", err)
}
t.Cleanup(Close)

_ = stderrW.Close()
stderrBytes, _ := io.ReadAll(stderrR)
Expand Down Expand Up @@ -164,9 +174,10 @@ func TestInit_CallerShouldPointToCallsite(t *testing.T) {
}); err != nil {
t.Fatalf("Init() error: %v", err)
}
t.Cleanup(Close)

L().Info("caller-check")
Sync()
syncTestLogger()
_ = stdoutW.Close()
logBytes, _ := io.ReadAll(stdoutR)

Expand Down
2 changes: 1 addition & 1 deletion backend/internal/pkg/logger/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestBuildFileCore_InvalidPathFallback(t *testing.T) {
EncodeLevel: zapcore.CapitalLevelEncoder,
}
encoder := zapcore.NewJSONEncoder(encoderCfg)
_, _, err := buildFileCore(encoder, zap.NewAtomicLevel(), opts)
_, _, _, err := buildFileCore(encoder, zap.NewAtomicLevel(), opts)
if err == nil {
t.Fatalf("buildFileCore() expected error for invalid path")
}
Expand Down
6 changes: 4 additions & 2 deletions backend/internal/pkg/logger/stdlog_bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,12 @@ func TestStdLogBridgeRoutesLevels(t *testing.T) {
}); err != nil {
t.Fatalf("Init() error: %v", err)
}
t.Cleanup(Close)

log.Printf("service started")
log.Printf("Warning: queue full")
log.Printf("Forward request failed: timeout")
Sync()
syncTestLogger()

_ = stdoutW.Close()
_ = stderrW.Close()
Expand Down Expand Up @@ -135,11 +136,12 @@ func TestLegacyPrintfRoutesLevels(t *testing.T) {
}); err != nil {
t.Fatalf("Init() error: %v", err)
}
t.Cleanup(Close)

LegacyPrintf("service.test", "request started")
LegacyPrintf("service.test", "Warning: queue full")
LegacyPrintf("service.test", "forward failed: timeout")
Sync()
syncTestLogger()

_ = stdoutW.Close()
_ = stderrW.Close()
Expand Down
Loading