Skip to content
Open
Show file tree
Hide file tree
Changes from 93 commits
Commits
Show all changes
95 commits
Select commit Hold shift + click to select a range
c153122
feat(pool): add Pool struct to manage goroutine
geunwoonoh Nov 24, 2025
8300895
feat(agent): add ContextManager to manage context
geunwoonoh Nov 24, 2025
b862cd2
feat(pool): add test cases for Pool
geunwoonoh Nov 24, 2025
fc9c1aa
feat(agent): add test cases for ContextManager
geunwoonoh Nov 24, 2025
19a78c1
refactor: refactor due to the addition of Pool and ContextManager
geunwoonoh Nov 24, 2025
123366c
fix(pty): fix race condition in PtyClient
geunwoonoh Nov 24, 2025
0ee4daa
feat(config): add pool section to alpamon.conf
geunwoonoh Nov 24, 2025
a35efa3
refactor(config): update due to apply changes in alpamon.conf
geunwoonoh Nov 24, 2025
6633f4b
chore(agent): Delete unused function
geunwoonoh Nov 24, 2025
261d75b
fix(pool): adjust golint to pool_test.go
geunwoonoh Nov 24, 2025
52eb1a6
feat(scheduler): add ReportManager to manage reporter goroutine
geunwoonoh Nov 24, 2025
d3bb4d7
fix(scheduler): fix context leak
geunwoonoh Nov 24, 2025
01adca2
fix(command): fix context leaks in specific cases
geunwoonoh Nov 24, 2025
41eead2
fix: fix context leaks in specific component
geunwoonoh Nov 24, 2025
94bb1ef
refactor(commit): use time.After for delayed execution in CommitAsync
geunwoonoh Nov 24, 2025
c750986
fix(config): allow explicit zero timeout in pool configuration
geunwoonoh Nov 24, 2025
bf7506c
Merge pull request #138 from alpacax/132-alpamon-agent-refactoring-ph…
geunwoonoh Nov 24, 2025
bde6d2b
Merge remote-tracking branch 'origin/main' into 131-alpamon-agent-ref…
geunwoonoh Nov 26, 2025
727feb1
feat(executor): add executor components
geunwoonoh Dec 2, 2025
9817ffa
feat(executor): add Registry component
geunwoonoh Dec 2, 2025
1dd19d7
feat(executor): add CommandDispatcher component
geunwoonoh Dec 2, 2025
eed68e6
feat(handlers): Add BaseHandler
geunwoonoh Dec 2, 2025
10dcc7a
feat(executor): add CommandArgs to provide type safety
geunwoonoh Dec 2, 2025
f44e207
refactor(executor): update CommandDispatcher to apply changes
geunwoonoh Dec 2, 2025
ac3802e
refactor(executor): update Interfaces to apply changes
geunwoonoh Dec 2, 2025
e0ee9f6
chore(handler): Delete unused function
geunwoonoh Dec 2, 2025
f4049a4
refactor(command): update following the addition of Executor
geunwoonoh Dec 3, 2025
6a69b91
refactor(client): update following the addition of Executor
geunwoonoh Dec 3, 2025
9597127
chore(executor): minor fix
geunwoonoh Dec 3, 2025
648de3a
feat(handler): add SystemHandler
geunwoonoh Dec 3, 2025
a60ed90
feat(executor): define types for commands and handler kinds
geunwoonoh Dec 3, 2025
c5ed085
feat(handler): add GroupHandler
geunwoonoh Dec 4, 2025
95b0da9
refactor(executor): delete unused methods
geunwoonoh Dec 4, 2025
698f585
feat(handler): add InfoHandler and SystemInfoAdapter
geunwoonoh Dec 4, 2025
87bd3ad
feat(handler): add ShellHandler
geunwoonoh Dec 4, 2025
7043c5e
chore(commit): change the access scope of the functions
geunwoonoh Dec 4, 2025
e313ed3
feat(handler): add UserHandler
geunwoonoh Dec 4, 2025
81d66fa
feat(service): define GroupService interface
geunwoonoh Dec 4, 2025
40dd838
refactor(executor): move SystemInfoAdapter to executor package
geunwoonoh Dec 4, 2025
f42d4fc
feat(handler): add FirewallHandler
geunwoonoh Dec 4, 2025
7de9449
chore(utils): change functions' visibility
geunwoonoh Dec 5, 2025
4c8ffdf
feat(handler): add FileHandler
geunwoonoh Dec 5, 2025
7f2fed5
feat(utils): add utility functions for use within the FileHandler
geunwoonoh Dec 5, 2025
081818b
feat(utils): add Demote() to eliminate duplicate code
geunwoonoh Dec 5, 2025
e7d3e65
refactor: update due to apply changes
geunwoonoh Dec 5, 2025
b3513cd
chore(config): align the lines
geunwoonoh Dec 5, 2025
8e96b59
feat(handler): add TerminalHandler
geunwoonoh Dec 5, 2025
2b129cb
feat(pty): add functions to resize PtyClient
geunwoonoh Dec 5, 2025
908d510
feat(executor): add HandlerFactory
geunwoonoh Dec 5, 2025
07511c9
feat(executor): add InitDispatcher
geunwoonoh Dec 5, 2025
c2ad2dd
feat: add logic for initializing the dispatcher
geunwoonoh Dec 5, 2025
cfc67c7
feat: add test cases for the executor system
geunwoonoh Dec 5, 2025
44b08ef
Merge remote-tracking branch 'origin/main' into 131-alpamon-agent-ref…
geunwoonoh Dec 5, 2025
fbeb9a4
Merge remote-tracking branch 'origin/131-alpamon-agent-refactoring' i…
geunwoonoh Dec 5, 2025
c81c2bf
chore(executor): delete unused functions and unnecessary log
geunwoonoh Dec 5, 2025
a7badcb
chore(executor): update logs
geunwoonoh Dec 5, 2025
d75be9c
fix: apply golint
geunwoonoh Dec 5, 2025
2e02150
fix: apply golint
geunwoonoh Dec 5, 2025
c16370e
fix(handlers): remove redundant logic in handleDelUser()
geunwoonoh Dec 5, 2025
6de7180
fix(handlers): add exception handling to statFileTransfer()
geunwoonoh Dec 5, 2025
3089d58
fix(handlers): fix context leak in SystemHandler
geunwoonoh Dec 5, 2025
ac55acb
chore(handlers): add log to statFileTransfer()
geunwoonoh Dec 5, 2025
08ee185
Merge pull request #144 from alpacax/133-alpamon-agent-refactoring-ph…
geunwoonoh Dec 5, 2025
2d42e7a
feat(handlers): add FirewallDetector
geunwoonoh Dec 10, 2025
f99a748
feat(handlers): add FirewallBackend and its implementations
geunwoonoh Dec 10, 2025
a6fc5d3
fix(handlers): rename FirewallBackend to BackendType
geunwoonoh Dec 10, 2025
6c5d7f1
feat(handlers): add BackupManager
geunwoonoh Dec 10, 2025
74b18c2
feat(handlers): add Validator for firewall rules
geunwoonoh Dec 10, 2025
01ac2b5
refactor(handlers): apply changes to the FirewallHandler
geunwoonoh Dec 10, 2025
cd40597
refactor(runner): remove firewall-related logic
geunwoonoh Dec 10, 2025
9d0c556
chore: apply golint and minor fix
geunwoonoh Dec 10, 2025
7b2eab6
fix(lint): resolve golangci-lint errors
geunwoonoh Dec 10, 2025
aa916ea
fix(handlers): remove unused code
geunwoonoh Dec 10, 2025
3b99093
Merge pull request #147 from alpacax/134-alpamon-agent-refactoring-ph…
junho226 Dec 11, 2025
7ad8e0c
refactor: move command related types to internal
geunwoonoh Dec 16, 2025
2254dfa
refactor: update to apply changes
geunwoonoh Dec 16, 2025
1c9120d
Merge pull request #153 from geunwoonoh/135-alpamon-agent-refactoring…
geunwoonoh Dec 16, 2025
a0e30ec
test(handlers): add unit tests for info, shell, and system handlers
geunwoonoh Dec 18, 2025
9d79f9f
test: add goroutine leak detection tests
geunwoonoh Dec 18, 2025
d0553a0
test: add performance benchmarks for core components
geunwoonoh Dec 18, 2025
04fc496
test(executor): add integration, regression, and resource tests
geunwoonoh Dec 18, 2025
0511174
chore: apply golint
geunwoonoh Dec 18, 2025
c1f5a54
chore: apply golint
geunwoonoh Dec 18, 2025
d5882f5
chore: apply golint
geunwoonoh Dec 18, 2025
2f46385
Merge pull request #155 from alpacax/136-alpamon-agent-refactoring-ph…
geunwoonoh Dec 19, 2025
cd8ada4
Merge remote-tracking branch 'origin/main' into 162-alpamon-agent-ref…
geunwoonoh Dec 29, 2025
ea4d9be
feat(handlers): implement TunnelHandler
geunwoonoh Dec 29, 2025
469b2ca
feat(protocol): add tunnel specific field
geunwoonoh Dec 29, 2025
0052267
feat(executor): update RegisterAll() to include TunnelHandler registr…
geunwoonoh Dec 29, 2025
4cf6ff7
Merge pull request #163 from alpacax/162-alpamon-agent-refactoring-in…
geunwoonoh Dec 29, 2025
3764b18
feat(ci): add CodeQL scanning with inline false positive suppression
geunwoonoh Dec 29, 2025
d0c97c9
feat(ci): add CodeQL security scanning workflow and configuration
geunwoonoh Dec 29, 2025
182f511
revert(ci): remove custom CodeQL workflow in favor of Default Setup
geunwoonoh Dec 29, 2025
f65f8d7
fix(security): add LGTM suppression comments for CodeQL false positives
geunwoonoh Dec 29, 2025
ab43e08
fix(tunnel): add duplicate session check to prevent process leak
geunwoonoh Dec 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 63 additions & 21 deletions cmd/alpamon/command/root.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
package command

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/alpacax/alpamon/cmd/alpamon/command/ftp"
"github.com/alpacax/alpamon/cmd/alpamon/command/setup"
"github.com/alpacax/alpamon/cmd/alpamon/command/tunnel"
"github.com/alpacax/alpamon/internal/pool"
"github.com/alpacax/alpamon/pkg/agent"
"github.com/alpacax/alpamon/pkg/collector"
"github.com/alpacax/alpamon/pkg/config"
"github.com/alpacax/alpamon/pkg/db"
"github.com/alpacax/alpamon/pkg/executor"
"github.com/alpacax/alpamon/pkg/logger"
"github.com/alpacax/alpamon/pkg/pidfile"
"github.com/alpacax/alpamon/pkg/runner"
Expand Down Expand Up @@ -42,15 +45,16 @@ func init() {
}

func runAgent() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create global context manager for the entire application
ctxManager := agent.NewContextManager()
ctx := ctxManager.Root()

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

go func() {
<-sigChan
cancel()
ctxManager.Shutdown()
}()

// Logger
Expand All @@ -72,65 +76,90 @@ func runAgent() {
settings := config.LoadConfig(config.Files(name), wsPath)
config.InitSettings(settings)

// Create global worker pool for the entire application using config settings
workerPool := pool.NewPool(settings.PoolMaxWorkers, settings.PoolQueueSize)
log.Info().Msgf("Initialized global worker pool with %d workers and queue capacity %d",
workerPool.MaxWorkers(), workerPool.QueueCapacity())

// Session
session := scheduler.InitSession()
commissioned := session.CheckSession(ctx)

// Reporter
scheduler.StartReporters(session)
// Reporter - pass context manager for centralized context management
reporters := scheduler.StartReporters(session, ctxManager)

// Log server
logServer := logger.NewLogServer()
// Log server - pass worker pool and context manager for connection handling
logServer := logger.NewLogServer(workerPool, ctxManager)
if logServer != nil {
go logServer.StartLogServer()
}

log.Info().Msgf("%s initialized and running.", name)

// Commit
runner.CommitAsync(session, commissioned)
// Commit - pass context manager for coordinated lifecycle management
runner.CommitAsync(session, commissioned, ctxManager)

// DB
client := db.InitDB()

// Collector
metricCollector := collector.InitCollector(session, client)
// Collector - pass context manager for centralized context management
metricCollector := collector.InitCollector(session, client, ctxManager)
if metricCollector != nil {
metricCollector.Start()
}

// Websocket Client (Backhaul - commands, sessions)
wsClient := runner.NewWebsocketClient(session)
// Websocket Client - pass context manager and worker pool for centralized management
wsClient := runner.NewWebsocketClient(session, ctxManager, workerPool)

// Initialize dispatcher system with callbacks
dispatcher, err := executor.InitDispatcher(
workerPool,
ctxManager,
session,
wsClient,
executor.SystemInfoCallbacks{
CommitFunc: runner.CommitSystemInfo,
SyncFunc: runner.SyncSystemInfo,
},
)
if err != nil {
log.Fatal().Err(err).Msg("Failed to initialize dispatcher system")
}

wsClient.SetDispatcher(dispatcher)
log.Info().Msg("Dispatcher system initialized successfully")

go wsClient.RunForever(ctx)

// Control Client (Control - sudo approval)
controlClient := runner.NewControlClient()
go controlClient.RunForever(ctx)

// Auth Manager for sudo approval workflow
authManager := runner.GetAuthManager(controlClient)
go authManager.Start(ctx)

for {
select {
case <-ctx.Done():
log.Info().Msg("Received termination signal. Shutting down...")
gracefulShutdown(metricCollector, wsClient, controlClient, authManager, logServer, pidFilePath)
gracefulShutdown(metricCollector, wsClient, controlClient, authManager, workerPool, logServer, reporters, pidFilePath)
return
case <-wsClient.ShutDownChan:
log.Info().Msg("Shutdown command received. Shutting down...")
cancel()
gracefulShutdown(metricCollector, wsClient, controlClient, authManager, logServer, pidFilePath)
ctxManager.Shutdown()
gracefulShutdown(metricCollector, wsClient, controlClient, authManager, workerPool, logServer, reporters, pidFilePath)
return
case <-wsClient.RestartChan:
log.Info().Msg("Restart command received. Restarting...")
cancel()
gracefulShutdown(metricCollector, wsClient, controlClient, authManager, logServer, pidFilePath)
ctxManager.Shutdown()
gracefulShutdown(metricCollector, wsClient, controlClient, authManager, workerPool, logServer, reporters, pidFilePath)
restartAgent()
return
case <-wsClient.CollectorRestartChan:
log.Info().Msg("Collector restart command received. Restarting Collector...")
metricCollector.Stop()
metricCollector = collector.InitCollector(session, client)
metricCollector = collector.InitCollector(session, client, ctxManager)
metricCollector.Start()
}
}
Expand All @@ -149,7 +178,7 @@ func restartAgent() {
}
}

func gracefulShutdown(collector *collector.Collector, wsClient *runner.WebsocketClient, controlClient *runner.ControlClient, authManager *runner.AuthManager, logServer *logger.LogServer, pidPath string) {
func gracefulShutdown(collector *collector.Collector, wsClient *runner.WebsocketClient, controlClient *runner.ControlClient, authManager *runner.AuthManager, workerPool *pool.Pool, logServer *logger.LogServer, reporters *scheduler.ReporterManager, pidPath string) {
if collector != nil {
collector.Stop()
}
Expand All @@ -162,6 +191,19 @@ func gracefulShutdown(collector *collector.Collector, wsClient *runner.Websocket
if authManager != nil {
authManager.Stop()
}
// Shutdown reporters before worker pool
if reporters != nil {
if err := reporters.Shutdown(1 * time.Second); err != nil {
log.Error().Err(err).Msg("Failed to shutdown reporters gracefully")
}
}
// Shutdown the global worker pool
if workerPool != nil {
log.Info().Msg("Shutting down global worker pool...")
if err := workerPool.Shutdown(1 * time.Second); err != nil {
log.Error().Err(err).Msg("Failed to shutdown worker pool gracefully")
}
}
if logServer != nil {
logServer.Stop()
}
Expand Down
17 changes: 16 additions & 1 deletion configs/alpamon.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,19 @@ verify = {{.Verify}}
ca_cert = {{.CACert}}

[logging]
debug = {{.Debug}}
debug = {{.Debug}}

[pool]
# Maximum number of concurrent workers in the global worker pool
# Default: 20
# max_workers = 20

# Size of the job queue for the global worker pool
# Default: 200
# queue_size = 200

# Default timeout in seconds for pool tasks
# If not set or commented out, the default value of 30 seconds will be used
# Set to a positive number to specify timeout in seconds
# Default: 30
# default_timeout = 30
176 changes: 176 additions & 0 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// Package pool provides a true worker pool implementation with job queue and context support.
package pool

import (
"context"
"fmt"
"log"
"runtime/debug"
"sync"
"time"
)

// Defaults applied by NewPool when the caller passes non-positive sizes.
const (
	defaultMaxWorkers  = 10
	defaultQueueFactor = 10 // default queue capacity = workers * factor
)

// Job represents a unit of work to be executed by the pool.
type Job struct {
	fn func() error
}

// Pool is a fixed-size set of worker goroutines fed by a bounded job queue.
//
// A Pool must be created with NewPool; the zero value is not usable. All
// methods are safe for concurrent use.
type Pool struct {
	maxWorkers int
	jobQueue   chan *Job
	ctx        context.Context
	cancel     context.CancelFunc
	wg         sync.WaitGroup

	// mu orders Submit against Shutdown: Submit enqueues under the read lock
	// and Shutdown cancels under the write lock, so a job is never enqueued
	// after the workers have started their final drain.
	mu sync.RWMutex

	// closeOnce guarantees the job queue is closed at most once, no matter
	// how many times Shutdown is called.
	closeOnce sync.Once
}

// NewPool creates a worker pool and starts its workers.
//
// A non-positive maxWorkers falls back to 10 workers. A non-positive
// queueSize falls back to ten times the (possibly defaulted) worker count.
func NewPool(maxWorkers int, queueSize int) *Pool {
	if maxWorkers <= 0 {
		maxWorkers = defaultMaxWorkers
	}
	if queueSize <= 0 {
		queueSize = maxWorkers * defaultQueueFactor
	}

	ctx, cancel := context.WithCancel(context.Background())

	p := &Pool{
		maxWorkers: maxWorkers,
		jobQueue:   make(chan *Job, queueSize),
		ctx:        ctx,
		cancel:     cancel,
	}

	p.startWorkers()

	return p
}

// startWorkers launches maxWorkers worker goroutines, each tracked by p.wg.
func (p *Pool) startWorkers() {
	for i := 0; i < p.maxWorkers; i++ {
		p.wg.Add(1)
		go p.worker()
	}
}

// worker is the main loop of a single worker goroutine. It executes queued
// jobs until the pool is shut down, then drains whatever is still queued and
// exits.
func (p *Pool) worker() {
	defer p.wg.Done()

	for {
		select {
		case job, ok := <-p.jobQueue:
			if !ok {
				// Job queue is closed.
				return
			}
			p.executeJob(job)
		case <-p.ctx.Done():
			// Pool is shutting down: finish the already accepted jobs first.
			p.drain()
			return
		}
	}
}

// drain executes queued jobs without blocking and returns as soon as the
// queue is empty or closed.
func (p *Pool) drain() {
	for {
		select {
		case job, ok := <-p.jobQueue:
			if !ok {
				return
			}
			p.executeJob(job)
		default:
			// No more jobs in queue.
			return
		}
	}
}

// executeJob runs a single job. A panic raised by the job is recovered and
// logged so that it cannot kill the worker; a returned error is logged.
func (p *Pool) executeJob(job *Job) {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("recovered from panic in pool worker: %v\nstack: %s", r, debug.Stack())
		}
	}()

	if err := job.fn(); err != nil {
		log.Printf("pool worker error: %v", err)
	}
}

// Submit adds a job to the queue without blocking.
//
// ctx is only consulted at submission time; it is not passed to fn. Checks
// are applied in a fixed order, so the result is deterministic:
//
//  1. fn is nil: an error is returned,
//  2. the pool is shutting down: "pool is shutting down",
//  3. ctx is already done: ctx.Err(),
//  4. the queue has no free slot: "job queue is full".
//
// A nil return means the job was accepted and will be executed, even if
// Shutdown is called immediately afterwards.
func (p *Pool) Submit(ctx context.Context, fn func() error) error {
	if fn == nil {
		return fmt.Errorf("job function must not be nil")
	}

	// Hold the read lock across the check and the send so Shutdown cannot
	// cancel (and workers cannot finish draining) in between.
	p.mu.RLock()
	defer p.mu.RUnlock()

	if p.ctx.Err() != nil {
		return fmt.Errorf("pool is shutting down")
	}
	if err := ctx.Err(); err != nil {
		return err
	}

	select {
	case p.jobQueue <- &Job{fn: fn}:
		return nil
	default:
		// Queue is full.
		return fmt.Errorf("job queue is full")
	}
}

// QueueSize returns the current number of jobs waiting in the queue.
func (p *Pool) QueueSize() int {
	return len(p.jobQueue)
}

// QueueCapacity returns the maximum number of jobs the queue can hold.
func (p *Pool) QueueCapacity() int {
	return cap(p.jobQueue)
}

// MaxWorkers returns the number of worker goroutines in the pool.
func (p *Pool) MaxWorkers() int {
	return p.maxWorkers
}

// Shutdown stops accepting new jobs, lets the workers finish every job that
// was already accepted, and waits up to timeout for them to exit.
//
// It returns nil once all workers have exited, or an error if the timeout
// elapses first; in that case the workers keep draining in the background.
// Shutdown may be called more than once and from multiple goroutines.
func (p *Pool) Shutdown(timeout time.Duration) error {
	// Cancel under the write lock: every Submit that returned nil has
	// completed its send before workers observe the cancellation.
	p.mu.Lock()
	p.cancel()
	p.mu.Unlock()

	done := make(chan struct{})
	go func() {
		p.wg.Wait()
		// Close the queue only after all workers have exited, and only once.
		p.closeOnce.Do(func() { close(p.jobQueue) })
		close(done)
	}()

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-done:
		return nil
	case <-timer.C:
		return fmt.Errorf("shutdown timeout after %v", timeout)
	}
}

// IsShuttingDown reports whether Shutdown has been called.
func (p *Pool) IsShuttingDown() bool {
	return p.ctx.Err() != nil
}
Loading
Loading