From 0ef077f68806cd7b2931ebd5c87c823950fc9975 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Dunglas?= Date: Thu, 30 Oct 2025 16:02:48 +0100 Subject: [PATCH 01/13] refactor: improve ExtensionWorkers API --- caddy/app.go | 23 ++++++++++ frankenphp.go | 5 --- options.go | 75 ++++++++++++++++++++++--------- worker.go | 4 ++ workerextension.go | 99 ++++++++++------------------------------- workerextension_test.go | 55 +++++++---------------- 6 files changed, 119 insertions(+), 142 deletions(-) diff --git a/caddy/app.go b/caddy/app.go index ad648efd7..330313bc7 100644 --- a/caddy/app.go +++ b/caddy/app.go @@ -7,6 +7,7 @@ import ( "path/filepath" "strconv" "strings" + "sync" "time" "github.com/caddyserver/caddy/v2" @@ -18,6 +19,22 @@ import ( "github.com/dunglas/frankenphp/internal/fastabs" ) +var ( + options []frankenphp.Option + optionsMU sync.Mutex +) + +// EXPERIMENTAL: RegisterWorkers provides a way for extension to register frankenphp.Workers +func RegisterWorkers(name, fileName string, num int, wo ...frankenphp.WorkerOption) frankenphp.Workers { + w, opt := frankenphp.WithExtensionWorkers(name, fileName, num, wo...) + + optionsMU.Lock() + options = append(options, opt) + optionsMU.Unlock() + + return w +} + // FrankenPHPApp represents the global "frankenphp" directive in the Caddyfile // it's responsible for starting up the global PHP instance and all threads // @@ -118,6 +135,8 @@ func (f *FrankenPHPApp) Start() error { frankenphp.WithPhpIni(f.PhpIni), frankenphp.WithMaxWaitTime(f.MaxWaitTime), } + opts = append(opts, options...) + for _, w := range append(f.Workers) { workerOpts := []frankenphp.WorkerOption{ frankenphp.WithWorkerEnv(w.Env), @@ -151,6 +170,10 @@ func (f *FrankenPHPApp) Stop() error { f.NumThreads = 0 f.MaxWaitTime = 0 + optionsMU.Lock() + options = nil + optionsMU.Unlock() + return nil } diff --git a/frankenphp.go b/frankenphp.go index ae11ec102..2e9b568fe 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -210,11 +210,6 @@ func Init(options ...Option) error { registerExtensions() - // add registered external workers - for _, ew := range extensionWorkers { - options = append(options, WithWorkers(ew.name, ew.fileName, ew.num, ew.options...)) - } - opt := &opt{} for _, o := range options { if err := o(opt); err != nil { diff --git a/options.go b/options.go index befe3a7fb..80a17e983 100644 --- a/options.go +++ b/options.go @@ -35,6 +35,7 @@ type workerOpt struct { env PreparedEnv watch []string maxConsecutiveFailures int + extensionWorkers *extensionWorkers onThreadReady func(int) onThreadShutdown func(int) onServerStartup func() @@ -67,7 +68,7 @@ func WithMetrics(m Metrics) Option { } // WithWorkers configures the PHP workers to start -func WithWorkers(name string, fileName string, num int, options ...WorkerOption) Option { +func WithWorkers(name, fileName string, num int, options ...WorkerOption) Option { return func(o *opt) error { worker := workerOpt{ name: name, @@ -90,6 +91,54 @@ func WithWorkers(name string, fileName string, num int, options ...WorkerOption) } } +// EXPERIMENTAL: WithExtensionWorkers allow extensions to create workers. +// +// A worker script with the provided name, fileName and thread count will be registered, along with additional +// configuration through WorkerOptions. +// +// Workers are designed to run indefinitely and will be gracefully shut down when FrankenPHP shuts down. +// +// Extension workers receive the lowest priority when determining thread allocations. If the requested number of threads +//cannot be allocated, then FrankenPHP will panic and provide this information to the user (who will need to allocate more +// total threads). Don't be greedy. +func WithExtensionWorkers(name, fileName string, numThreads int, options ...WorkerOption) (Workers, Option) { + w := &extensionWorkers{ + name: name, + fileName: fileName, + num: numThreads, + } + + w.options = append(options, withExtensionWorkers(w)) + + return w, WithWorkers(w.name, w.fileName, w.num, w.options...) +} + +// WithLogger configures the global logger to use. +func WithLogger(l *slog.Logger) Option { + return func(o *opt) error { + o.logger = l + + return nil + } +} + +// WithPhpIni configures user defined PHP ini settings. +func WithPhpIni(overrides map[string]string) Option { + return func(o *opt) error { + o.phpIni = overrides + return nil + } +} + +// WithMaxWaitTime configures the max time a request may be stalled waiting for a thread. +func WithMaxWaitTime(maxWaitTime time.Duration) Option { + return func(o *opt) error { + o.maxWaitTime = maxWaitTime + + return nil + } +} + // WithWorkerEnv sets environment variables for the worker func WithWorkerEnv(env map[string]string) WorkerOption { return func(w *workerOpt) error { @@ -154,27 +203,9 @@ func WithWorkerOnServerShutdown(f func()) WorkerOption { } } -// WithLogger configures the global logger to use. -func WithLogger(l *slog.Logger) Option { - return func(o *opt) error { - o.logger = l - - return nil - } -} - -// WithPhpIni configures user defined PHP ini settings. -func WithPhpIni(overrides map[string]string) Option { - return func(o *opt) error { - o.phpIni = overrides - return nil - } -} - -// WithMaxWaitTime configures the max time a request may be stalled waiting for a thread. -func WithMaxWaitTime(maxWaitTime time.Duration) Option { - return func(o *opt) error { - o.maxWaitTime = maxWaitTime +func withExtensionWorkers(w *extensionWorkers) WorkerOption { + return func(wo *workerOpt) error { + wo.extensionWorkers = w return nil } diff --git a/worker.go b/worker.go index ee468bd6c..9b618dce7 100644 --- a/worker.go +++ b/worker.go @@ -131,6 +131,10 @@ func newWorker(o workerOpt) (*worker, error) { onThreadShutdown: o.onThreadShutdown, } + if o.extensionWorkers != nil { + o.extensionWorkers.internalWorker = w + } + return w, nil } diff --git a/workerextension.go b/workerextension.go index 743cc7f1d..d10a2d4e7 100644 --- a/workerextension.go +++ b/workerextension.go @@ -1,56 +1,30 @@ package frankenphp import ( - "errors" "net/http" ) -// EXPERIMENTAL: Worker allows you to register a worker where, instead of calling FrankenPHP handlers on -// frankenphp_handle_request(), the GetRequest method is called. -// -// You may provide an http.Request that will be conferred to the underlying worker script, -// or custom parameters that will be passed to frankenphp_handle_request(). -// -// After the execution of frankenphp_handle_request(), the return value WorkerRequest.AfterFunc will be called, -// with the optional return value of the callback passed as parameter. -// -// A worker script with the provided name, fileName and thread count will be registered, along with additional -// configuration through WorkerOptions. -// -// Workers are designed to run indefinitely and will be gracefully shut down when FrankenPHP shuts down. -// -// Extension workers receive the lowest priority when determining thread allocations. If MinThreads cannot be -// allocated, then FrankenPHP will panic and provide this information to the user (who will need to allocate more -// total threads). Don't be greedy. -type Worker struct { - name string - fileName string - num int - options []WorkerOption +// EXPERIMENTAL: Workers allows you to register a worker. +type Workers interface { + // SendRequest calls the closure passed to frankenphp_handle_request() and update the context. + // The generated HTTP response will be written through the provided writer + SendRequest(rw http.ResponseWriter, r *http.Request) error + // SendMessage calls the closure passed to frankenphp_handle_request() and pass message as parameter, if the value returned by the closure is returned by the function. + SendMessage(message any, rw http.ResponseWriter) any + // NumThreads returns the number of available threads + NumThreads() int } -var extensionWorkers = make(map[string]Worker) - -// EXPERIMENTAL: RegisterWorker registers an external worker. -// external workers are booted together with regular workers on server startup. -func RegisterWorker(worker Worker) error { - if _, exists := extensionWorkers[worker.name]; exists { - return errors.New("worker with this name is already registered: " + worker.name) - } - - extensionWorkers[worker.name] = worker - - return nil +type extensionWorkers struct { + name string + fileName string + num int + options []WorkerOption + internalWorker *worker } // EXPERIMENTAL: SendRequest sends an HTTP request to the worker and writes the response to the provided ResponseWriter. -func (w Worker) SendRequest(rw http.ResponseWriter, r *http.Request) error { - worker := getWorkerByName(w.name) - - if worker == nil { - return errors.New("worker not found: " + w.name) - } - +func (w *extensionWorkers) SendRequest(rw http.ResponseWriter, r *http.Request) error { fr, err := NewRequestWithContext( r, WithOriginalRequest(r), @@ -61,50 +35,23 @@ func (w Worker) SendRequest(rw http.ResponseWriter, r *http.Request) error { return err } - err = ServeHTTP(rw, fr) - - if err != nil { - return err - } + return ServeHTTP(rw, fr) - return nil } -func (w Worker) NumThreads() int { - worker := getWorkerByName(w.name) - - if worker == nil { - return 0 - } - - return worker.countThreads() +func (w *extensionWorkers) NumThreads() int { + return w.internalWorker.countThreads() } // EXPERIMENTAL: SendMessage sends a message to the worker and waits for a response. -func (w Worker) SendMessage(message any, rw http.ResponseWriter) (any, error) { - internalWorker := getWorkerByName(w.name) - - if internalWorker == nil { - return nil, errors.New("worker not found: " + w.name) - } - +func (w *extensionWorkers) SendMessage(message any, rw http.ResponseWriter) any { fc := newFrankenPHPContext() fc.logger = logger - fc.worker = internalWorker + fc.worker = w.internalWorker fc.responseWriter = rw fc.handlerParameters = message - internalWorker.handleRequest(fc) + w.internalWorker.handleRequest(fc) - return fc.handlerReturn, nil -} - -// EXPERIMENTAL: NewWorker registers an external worker with the given options -func NewWorker(name string, fileName string, num int, options ...WorkerOption) Worker { - return Worker{ - name: name, - fileName: fileName, - num: num, - options: options, - } + return fc.handlerReturn } diff --git a/workerextension_test.go b/workerextension_test.go index cddbb854e..c18de126f 100644 --- a/workerextension_test.go +++ b/workerextension_test.go @@ -9,14 +9,14 @@ import ( "github.com/stretchr/testify/require" ) -func TestWorkerExtension(t *testing.T) { +func TestWorkersExtension(t *testing.T) { readyWorkers := 0 shutdownWorkers := 0 serverStarts := 0 serverShutDowns := 0 - externalWorker := NewWorker( - "externalWorker", + externalWorkers, o := WithExtensionWorkers( + "extensionWorkers", "testdata/worker.php", 1, WithWorkerOnReady(func(id int) { @@ -33,20 +33,16 @@ func TestWorkerExtension(t *testing.T) { }), ) - assert.NoError(t, RegisterWorker(externalWorker)) - - require.NoError(t, Init()) - defer func() { - // Clean up external workers after test to avoid interfering with other tests - delete(extensionWorkers, externalWorker.name) + require.NoError(t, Init(o)) + t.Cleanup(func() { Shutdown() assert.Equal(t, 1, shutdownWorkers, "Worker shutdown hook should have been called") assert.Equal(t, 1, serverShutDowns, "Server shutdown hook should have been called") - }() + }) assert.Equal(t, readyWorkers, 1, "Worker thread should have called onReady()") assert.Equal(t, serverStarts, 1, "Server start hook should have been called") - assert.Equal(t, externalWorker.NumThreads(), 1, "NumThreads() should report 1 thread") + assert.Equal(t, externalWorkers.NumThreads(), 1, "NumThreads() should report 1 thread") // Create a test request req := httptest.NewRequest("GET", "https://example.com/test/?foo=bar", nil) @@ -54,7 +50,7 @@ func TestWorkerExtension(t *testing.T) { w := httptest.NewRecorder() // Inject the request into the worker through the extension - err := externalWorker.SendRequest(w, req) + err := externalWorkers.SendRequest(w, req) assert.NoError(t, err, "Sending request should not produce an error") resp := w.Result() @@ -67,38 +63,19 @@ func TestWorkerExtension(t *testing.T) { } func TestWorkerExtensionSendMessage(t *testing.T) { - externalWorker := NewWorker("externalWorker", "testdata/message-worker.php", 1) - assert.NoError(t, RegisterWorker(externalWorker)) - - // Clean up external workers after test to avoid interfering with other tests - defer func() { - delete(extensionWorkers, externalWorker.name) - }() + externalWorker, o := WithExtensionWorkers("extensionWorkers", "testdata/message-worker.php", 1) - err := Init() + err := Init(o) require.NoError(t, err) - defer Shutdown() + t.Cleanup(Shutdown) - result, err := externalWorker.SendMessage("Hello Worker", nil) - assert.NoError(t, err, "Sending message should not produce an error") - - switch v := result.(type) { - case string: - assert.Equal(t, "received message: Hello Worker", v) - default: - t.Fatalf("Expected result to be string, got %T", v) - } + assert.Equal(t, "received message: Hello Workers", externalWorker.SendMessage("Hello Workers", nil)) } func TestErrorIf2WorkersHaveSameName(t *testing.T) { - w := NewWorker("duplicateWorker", "testdata/worker.php", 1) - w2 := NewWorker("duplicateWorker", "testdata/worker2.php", 1) - - err := RegisterWorker(w) - require.NoError(t, err, "First registration should succeed") + _, o1 := WithExtensionWorkers("duplicateWorker", "testdata/worker.php", 1) + _, o2 := WithExtensionWorkers("duplicateWorker", "testdata/worker2.php", 1) - err = RegisterWorker(w2) - require.Error(t, err, "Second registration with duplicate name should fail") - // Clean up external workers after test to avoid interfering with other tests - extensionWorkers = make(map[string]Worker) + t.Cleanup(Shutdown) + require.Error(t, Init(o1, o2)) } From 1b309526a2e42df8996ae1bdb1aa7adb0e420d0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Dunglas?= Date: Thu, 30 Oct 2025 16:37:52 +0100 Subject: [PATCH 02/13] Update workerextension.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- workerextension.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workerextension.go b/workerextension.go index d10a2d4e7..d40ccb1b6 100644 --- a/workerextension.go +++ b/workerextension.go @@ -6,7 +6,7 @@ import ( // EXPERIMENTAL: Workers allows you to register a worker. type Workers interface { - // SendRequest calls the closure passed to frankenphp_handle_request() and update the context. + // SendRequest calls the closure passed to frankenphp_handle_request() and updates the context. // The generated HTTP response will be written through the provided writer SendRequest(rw http.ResponseWriter, r *http.Request) error // SendMessage calls the closure passed to frankenphp_handle_request() and pass message as parameter, if the value returned by the closure is returned by the function. From 3db847f52997ff351d4f28d04cff2813feabebd1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Dunglas?= Date: Thu, 30 Oct 2025 16:38:15 +0100 Subject: [PATCH 03/13] Update workerextension.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- workerextension.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workerextension.go b/workerextension.go index d40ccb1b6..c2612a05b 100644 --- a/workerextension.go +++ b/workerextension.go @@ -9,7 +9,7 @@ type Workers interface { // SendRequest calls the closure passed to frankenphp_handle_request() and updates the context. // The generated HTTP response will be written through the provided writer SendRequest(rw http.ResponseWriter, r *http.Request) error - // SendMessage calls the closure passed to frankenphp_handle_request() and pass message as parameter, if the value returned by the closure is returned by the function. + // SendMessage calls the closure passed to frankenphp_handle_request(), passes message as a parameter, and returns the value produced by the closure. SendMessage(message any, rw http.ResponseWriter) any // NumThreads returns the number of available threads NumThreads() int From 3c9012a337d1479ce749c69e7301cb54d321456c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Dunglas?= Date: Thu, 30 Oct 2025 16:38:40 +0100 Subject: [PATCH 04/13] Update caddy/app.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- caddy/app.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/caddy/app.go b/caddy/app.go index 330313bc7..afa13e876 100644 --- a/caddy/app.go +++ b/caddy/app.go @@ -24,7 +24,7 @@ var ( optionsMU sync.Mutex ) -// EXPERIMENTAL: RegisterWorkers provides a way for extension to register frankenphp.Workers +// EXPERIMENTAL: RegisterWorkers provides a way for extensions to register frankenphp.Workers func RegisterWorkers(name, fileName string, num int, wo ...frankenphp.WorkerOption) frankenphp.Workers { w, opt := frankenphp.WithExtensionWorkers(name, fileName, num, wo...) From 4fdc421bda8d93edd5f4c09e7de2adb3a5f01920 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Dunglas?= Date: Thu, 30 Oct 2025 16:39:22 +0100 Subject: [PATCH 05/13] Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- workerextension.go | 1 - 1 file changed, 1 deletion(-) diff --git a/workerextension.go b/workerextension.go index c2612a05b..d852a6521 100644 --- a/workerextension.go +++ b/workerextension.go @@ -36,7 +36,6 @@ func (w *extensionWorkers) SendRequest(rw http.ResponseWriter, r *http.Request) } return ServeHTTP(rw, fr) - } func (w *extensionWorkers) NumThreads() int { From f850ff3ea76d1c5f12ce4c7eb7f56617b25e3399 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Dunglas?= Date: Fri, 31 Oct 2025 14:17:01 +0100 Subject: [PATCH 06/13] review --- caddy/app.go | 5 ++++- options.go | 4 ++-- workerextension.go | 6 +++--- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/caddy/app.go b/caddy/app.go index afa13e876..02dcf1700 100644 --- a/caddy/app.go +++ b/caddy/app.go @@ -21,7 +21,7 @@ import ( var ( options []frankenphp.Option - optionsMU sync.Mutex + optionsMU sync.RWMutex ) // EXPERIMENTAL: RegisterWorkers provides a way for extensions to register frankenphp.Workers @@ -135,7 +135,10 @@ func (f *FrankenPHPApp) Start() error { frankenphp.WithPhpIni(f.PhpIni), frankenphp.WithMaxWaitTime(f.MaxWaitTime), } + + optionsMU.RLock() opts = append(opts, options...) + optionsMU.RUnlock() for _, w := range append(f.Workers) { workerOpts := []frankenphp.WorkerOption{ diff --git a/options.go b/options.go index 80a17e983..9d58125cc 100644 --- a/options.go +++ b/options.go @@ -99,8 +99,8 @@ func WithWorkers(name, fileName string, num int, options ...WorkerOption) Option // Workers are designed to run indefinitely and will be gracefully shut down when FrankenPHP shuts down. // // Extension workers receive the lowest priority when determining thread allocations. If the requested number of threads -//cannot be allocated, then FrankenPHP will panic and provide this information to the user (who will need to allocate more -// total threads). Don't be greedy. +// cannot be allocated, then FrankenPHP will panic and provide this information to the user (who will need to allocate +// more total threads). Don't be greedy. func WithExtensionWorkers(name, fileName string, numThreads int, options ...WorkerOption) (Workers, Option) { w := &extensionWorkers{ name: name, diff --git a/workerextension.go b/workerextension.go index d852a6521..3ee41bf8b 100644 --- a/workerextension.go +++ b/workerextension.go @@ -10,7 +10,7 @@ type Workers interface { // The generated HTTP response will be written through the provided writer SendRequest(rw http.ResponseWriter, r *http.Request) error // SendMessage calls the closure passed to frankenphp_handle_request(), passes message as a parameter, and returns the value produced by the closure. - SendMessage(message any, rw http.ResponseWriter) any + SendMessage(message any, rw http.ResponseWriter) (any, error) // NumThreads returns the number of available threads NumThreads() int } @@ -43,7 +43,7 @@ func (w *extensionWorkers) NumThreads() int { } // EXPERIMENTAL: SendMessage sends a message to the worker and waits for a response. -func (w *extensionWorkers) SendMessage(message any, rw http.ResponseWriter) any { +func (w *extensionWorkers) SendMessage(message any, rw http.ResponseWriter) (any, error) { fc := newFrankenPHPContext() fc.logger = logger fc.worker = w.internalWorker @@ -52,5 +52,5 @@ func (w *extensionWorkers) SendMessage(message any, rw http.ResponseWriter) any w.internalWorker.handleRequest(fc) - return fc.handlerReturn + return fc.handlerReturn, nil } From 3d5907ae4a37a2c489830720433eff9808429e0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Dunglas?= Date: Fri, 31 Oct 2025 14:34:41 +0100 Subject: [PATCH 07/13] fix tests --- workerextension_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/workerextension_test.go b/workerextension_test.go index c18de126f..1719cb036 100644 --- a/workerextension_test.go +++ b/workerextension_test.go @@ -69,7 +69,10 @@ func TestWorkerExtensionSendMessage(t *testing.T) { require.NoError(t, err) t.Cleanup(Shutdown) - assert.Equal(t, "received message: Hello Workers", externalWorker.SendMessage("Hello Workers", nil)) + ret, err := externalWorker.SendMessage("Hello Workers", nil) + require.NoError(t, err) + + assert.Equal(t, "received message: Hello Workers", ret) } func TestErrorIf2WorkersHaveSameName(t *testing.T) { From 99b66626a98a9a15d628fa26861c60009b855b87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Dunglas?= Date: Fri, 31 Oct 2025 14:37:05 +0100 Subject: [PATCH 08/13] docs --- workerextension.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/workerextension.go b/workerextension.go index 3ee41bf8b..566c2b4b8 100644 --- a/workerextension.go +++ b/workerextension.go @@ -6,12 +6,12 @@ import ( // EXPERIMENTAL: Workers allows you to register a worker. type Workers interface { - // SendRequest calls the closure passed to frankenphp_handle_request() and updates the context. - // The generated HTTP response will be written through the provided writer + // SendRequest calls the closure passed to frankenphp_handle_request() and updates the PHP context . + // The generated HTTP response will be written through the provided writer. SendRequest(rw http.ResponseWriter, r *http.Request) error // SendMessage calls the closure passed to frankenphp_handle_request(), passes message as a parameter, and returns the value produced by the closure. SendMessage(message any, rw http.ResponseWriter) (any, error) - // NumThreads returns the number of available threads + // NumThreads returns the number of available threads. NumThreads() int } From 194bfd4e08142811acb2bc1afe3baedc211c6d0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Dunglas?= Date: Fri, 31 Oct 2025 16:50:31 +0100 Subject: [PATCH 09/13] errors --- context.go | 6 ++++-- frankenphp.go | 48 ++++++++++++++++++++++------------------------ threadregular.go | 12 +++++++----- worker.go | 14 ++++++++------ workerextension.go | 4 ++-- 5 files changed, 44 insertions(+), 40 deletions(-) diff --git a/context.go b/context.go index b039febaa..996b14d1f 100644 --- a/context.go +++ b/context.go @@ -150,9 +150,9 @@ func (fc *frankenPHPContext) clientHasClosed() bool { } // reject sends a response with the given status code and message -func (fc *frankenPHPContext) reject(statusCode int, message string) { +func (fc *frankenPHPContext) reject(statusCode int, message string) error { if fc.isDone { - return + return nil } rw := fc.responseWriter @@ -166,6 +166,8 @@ func (fc *frankenPHPContext) reject(statusCode int, message string) { } fc.closeContext() + + return ErrMaxTimeExceeded } func (fc *frankenPHPContext) rejectBadRequest(message string) { diff --git a/frankenphp.go b/frankenphp.go index 2e9b568fe..b4eab2755 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -51,6 +51,7 @@ var ( ErrRequestContextCreation = errors.New("error during request context creation") ErrScriptExecution = errors.New("error during PHP script execution") ErrNotRunning = errors.New("FrankenPHP is not running. For proper configuration visit: https://frankenphp.dev/docs/config/#caddyfile-config") + ErrMaxTimeExceeded = errors.New("max request handling time exceeded") isRunning bool onServerShutdown []func() @@ -66,31 +67,31 @@ var ( type syslogLevel int const ( - emerg syslogLevel = iota // system is unusable - alert // action must be taken immediately - crit // critical conditions - err // error conditions - warning // warning conditions - notice // normal but significant condition - info // informational - debug // debug-level messages + syslogLevelEmerg syslogLevel = iota // system is unusable + syslogLevelAlert // action must be taken immediately + syslogLevelCrit // critical conditions + syslogLevelErr // error conditions + syslogLevelWarn // warning conditions + syslogLevelNotice // normal but significant condition + syslogLevelInfo // informational + syslogLevelDebug // debug-level messages ) func (l syslogLevel) String() string { switch l { - case emerg: + case syslogLevelEmerg: return "emerg" - case alert: + case syslogLevelAlert: return "alert" - case crit: + case syslogLevelCrit: return "crit" - case err: + case syslogLevelErr: return "err" - case warning: + case syslogLevelWarn: return "warning" - case notice: + case syslogLevelNotice: return "notice" - case debug: + case syslogLevelDebug: return "debug" default: return "info" @@ -337,14 +338,11 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error // Detect if a worker is available to handle this request if fc.worker != nil { - fc.worker.handleRequest(fc) - - return nil + return fc.worker.handleRequest(fc) } // If no worker was available, send the request to non-worker threads - handleRequestWithRegularPHPThreads(fc) - return nil + return handleRequestWithRegularPHPThreads(fc) } //export go_ub_write @@ -561,19 +559,19 @@ func go_log(message *C.char, level C.int) { m := C.GoString(message) var le syslogLevel - if level < C.int(emerg) || level > C.int(debug) { - le = info + if level < C.int(syslogLevelEmerg) || level > C.int(syslogLevelDebug) { + le = syslogLevelInfo } else { le = syslogLevel(level) } switch le { - case emerg, alert, crit, err: + case syslogLevelEmerg, syslogLevelAlert, syslogLevelCrit, syslogLevelErr: logger.LogAttrs(context.Background(), slog.LevelError, m, slog.String("syslog_level", syslogLevel(level).String())) - case warning: + case syslogLevelWarn: logger.LogAttrs(context.Background(), slog.LevelWarn, m, slog.String("syslog_level", syslogLevel(level).String())) - case debug: + case syslogLevelDebug: logger.LogAttrs(context.Background(), slog.LevelDebug, m, slog.String("syslog_level", syslogLevel(level).String())) default: diff --git a/threadregular.go b/threadregular.go index 88cef7e79..0c07e3d73 100644 --- a/threadregular.go +++ b/threadregular.go @@ -84,14 +84,15 @@ func (handler *regularThread) afterRequest() { handler.requestContext = nil } -func handleRequestWithRegularPHPThreads(fc *frankenPHPContext) { +func handleRequestWithRegularPHPThreads(fc *frankenPHPContext) error { metrics.StartRequest() select { case regularRequestChan <- fc: // a thread was available to handle the request immediately <-fc.done metrics.StopRequest() - return + + return nil default: // no thread was available } @@ -104,14 +105,15 @@ func handleRequestWithRegularPHPThreads(fc *frankenPHPContext) { metrics.DequeuedRequest() <-fc.done metrics.StopRequest() - return + + return nil case scaleChan <- fc: // the request has triggered scaling, continue to wait for a thread case <-timeoutChan(maxWaitTime): // the request has timed out stalling metrics.DequeuedRequest() - fc.reject(504, "Gateway Timeout") - return + + return fc.reject(504, "Gateway Timeout") } } } diff --git a/worker.go b/worker.go index 9b618dce7..b2ddce061 100644 --- a/worker.go +++ b/worker.go @@ -223,7 +223,7 @@ func (worker *worker) countThreads() int { return l } -func (worker *worker) handleRequest(fc *frankenPHPContext) { +func (worker *worker) handleRequest(fc *frankenPHPContext) error { metrics.StartWorkerRequest(worker.name) // dispatch requests to all worker threads in order @@ -234,7 +234,8 @@ func (worker *worker) handleRequest(fc *frankenPHPContext) { worker.threadMutex.RUnlock() <-fc.done metrics.StopWorkerRequest(worker.name, time.Since(fc.startedAt)) - return + + return nil default: // thread is busy, continue } @@ -249,14 +250,15 @@ func (worker *worker) handleRequest(fc *frankenPHPContext) { metrics.DequeuedWorkerRequest(worker.name) <-fc.done metrics.StopWorkerRequest(worker.name, time.Since(fc.startedAt)) - return + + return nil case scaleChan <- fc: // the request has triggered scaling, continue to wait for a thread case <-timeoutChan(maxWaitTime): - metrics.DequeuedWorkerRequest(worker.name) // the request has timed out stalling - fc.reject(504, "Gateway Timeout") - return + metrics.DequeuedWorkerRequest(worker.name) + + return fc.reject(504, "Gateway Timeout") } } } diff --git a/workerextension.go b/workerextension.go index 566c2b4b8..493346852 100644 --- a/workerextension.go +++ b/workerextension.go @@ -50,7 +50,7 @@ func (w *extensionWorkers) SendMessage(message any, rw http.ResponseWriter) (any fc.responseWriter = rw fc.handlerParameters = message - w.internalWorker.handleRequest(fc) + err := w.internalWorker.handleRequest(fc) - return fc.handlerReturn, nil + return fc.handlerReturn, err } From abf36a038760538c05300b2074c245d6397755a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Dunglas?= Date: Fri, 31 Oct 2025 18:08:52 +0100 Subject: [PATCH 10/13] improved error handling --- caddy/caddy_test.go | 13 ++++++++----- caddy/module.go | 8 +++++++- context.go | 36 +++++++++++++++++++----------------- frankenphp.go | 18 +++++++++++++++--- frankenphp_test.go | 9 ++++++--- threadregular.go | 4 +++- worker.go | 4 +++- 7 files changed, 61 insertions(+), 31 deletions(-) diff --git a/caddy/caddy_test.go b/caddy/caddy_test.go index bbe192a30..7ca641fdd 100644 --- a/caddy/caddy_test.go +++ b/caddy/caddy_test.go @@ -965,7 +965,7 @@ func TestMaxWaitTime(t *testing.T) { for range 10 { go func() { statusCode := getStatusCode("http://localhost:"+testPort+"/sleep.php?sleep=10", t) - if statusCode == http.StatusGatewayTimeout { + if statusCode == http.StatusServiceUnavailable { success.Store(true) } wg.Done() @@ -973,7 +973,7 @@ func TestMaxWaitTime(t *testing.T) { } wg.Wait() - require.True(t, success.Load(), "At least one request should have failed with a 504 Gateway Timeout status") + require.True(t, success.Load(), "At least one request should have failed with a 503 Service Unavailable status") } func TestMaxWaitTimeWorker(t *testing.T) { @@ -1012,23 +1012,26 @@ func TestMaxWaitTimeWorker(t *testing.T) { for range 10 { go func() { statusCode := getStatusCode("http://localhost:"+testPort+"/sleep.php?sleep=10&iteration=1", t) - if statusCode == http.StatusGatewayTimeout { + if statusCode == http.StatusServiceUnavailable { success.Store(true) } wg.Done() }() } wg.Wait() - require.True(t, success.Load(), "At least one request should have failed with a 504 Gateway Timeout status") + require.True(t, success.Load(), "At least one request should have failed with a 503 Service Unavailable status") // Fetch metrics resp, err := http.Get("http://localhost:2999/metrics") require.NoError(t, err, "failed to fetch metrics") - defer resp.Body.Close() + t.Cleanup(func() { + require.NoError(t, resp.Body.Close()) + }) // Read and parse metrics metrics := new(bytes.Buffer) _, err = metrics.ReadFrom(resp.Body) + require.NoError(t, err) expectedMetrics := ` # TYPE frankenphp_worker_queue_depth gauge diff --git a/caddy/module.go b/caddy/module.go index 8e6ce80e1..05240eacf 100644 --- a/caddy/module.go +++ b/caddy/module.go @@ -2,6 +2,7 @@ package caddy import ( "encoding/json" + "errors" "fmt" "log/slog" "net/http" @@ -150,6 +151,8 @@ func needReplacement(s string) bool { return strings.ContainsAny(s, "{}") } +var errRejected = &frankenphp.ErrRejected{} + // ServeHTTP implements caddyhttp.MiddlewareHandler. func (f *FrankenPHPModule) ServeHTTP(w http.ResponseWriter, r *http.Request, _ caddyhttp.Handler) error { origReq := r.Context().Value(caddyhttp.OriginalRequestCtxKey).(http.Request) @@ -192,8 +195,11 @@ func (f *FrankenPHPModule) ServeHTTP(w http.ResponseWriter, r *http.Request, _ c frankenphp.WithOriginalRequest(&origReq), frankenphp.WithWorkerName(workerName), ) + if err != nil { + return caddyhttp.Error(http.StatusInternalServerError, err) + } - if err = frankenphp.ServeHTTP(w, fr); err != nil { + if err = frankenphp.ServeHTTP(w, fr); err != nil && !errors.As(err, errRejected) { return caddyhttp.Error(http.StatusInternalServerError, err) } diff --git a/context.go b/context.go index 996b14d1f..648e3ffc5 100644 --- a/context.go +++ b/context.go @@ -2,6 +2,8 @@ package frankenphp import ( "context" + "errors" + "fmt" "log/slog" "net/http" "os" @@ -117,23 +119,23 @@ func (fc *frankenPHPContext) closeContext() { } // validate checks if the request should be outright rejected -func (fc *frankenPHPContext) validate() bool { +func (fc *frankenPHPContext) validate() error { if strings.Contains(fc.request.URL.Path, "\x00") { - fc.rejectBadRequest("Invalid request path") + fc.reject(ErrInvalidRequestPath) - return false + return ErrInvalidRequestPath } contentLengthStr := fc.request.Header.Get("Content-Length") if contentLengthStr != "" { if contentLength, err := strconv.Atoi(contentLengthStr); err != nil || contentLength < 0 { - fc.rejectBadRequest("invalid Content-Length header: " + contentLengthStr) + e := fmt.Errorf("%w: %s", ErrInvalidContentLengthHeader, contentLengthStr) - return false + fc.reject(e) } } - return true + return nil } func (fc *frankenPHPContext) clientHasClosed() bool { @@ -149,16 +151,22 @@ func (fc *frankenPHPContext) clientHasClosed() bool { } } -// reject sends a response with the given status code and message -func (fc *frankenPHPContext) reject(statusCode int, message string) error { +// reject sends a response with the given status code and error +func (fc *frankenPHPContext) reject(err error) { if fc.isDone { - return nil + return + } + + re := &ErrRejected{} + if !errors.As(err, re) { + // Should never happen + panic("only instance of ErrRejected can be passed to reject") } rw := fc.responseWriter if rw != nil { - rw.WriteHeader(statusCode) - _, _ = rw.Write([]byte(message)) + rw.WriteHeader(re.status) + _, _ = rw.Write([]byte(err.Error())) if f, ok := rw.(http.Flusher); ok { f.Flush() @@ -166,10 +174,4 @@ func (fc *frankenPHPContext) reject(statusCode int, message string) error { } fc.closeContext() - - return ErrMaxTimeExceeded -} - -func (fc *frankenPHPContext) rejectBadRequest(message string) { - fc.reject(http.StatusBadRequest, message) } diff --git a/frankenphp.go b/frankenphp.go index b4eab2755..8d3263b87 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -51,7 +51,10 @@ var ( ErrRequestContextCreation = errors.New("error during request context creation") ErrScriptExecution = errors.New("error during PHP script execution") ErrNotRunning = errors.New("FrankenPHP is not running. For proper configuration visit: https://frankenphp.dev/docs/config/#caddyfile-config") - ErrMaxTimeExceeded = errors.New("max request handling time exceeded") + + ErrInvalidRequestPath = ErrRejected{"invalid request path", http.StatusBadRequest} + ErrInvalidContentLengthHeader = ErrRejected{"invalid Content-Length header", http.StatusBadRequest} + ErrMaxWaitTimeExceeded = ErrRejected{"maximum request handling time exceeded", http.StatusServiceUnavailable} isRunning bool onServerShutdown []func() @@ -64,6 +67,15 @@ var ( maxWaitTime time.Duration ) +type ErrRejected struct { + message string + status int +} + +func (e ErrRejected) Error() string { + return e.message +} + type syslogLevel int const ( @@ -332,8 +344,8 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error fc.responseWriter = responseWriter - if !fc.validate() { - return nil + if err := fc.validate(); err != nil { + return err } // Detect if a worker is available to handle this request diff --git a/frankenphp_test.go b/frankenphp_test.go index 7b4b44dbc..1434c4373 100644 --- a/frankenphp_test.go +++ b/frankenphp_test.go @@ -78,7 +78,7 @@ func runTest(t *testing.T, test func(func(http.ResponseWriter, *http.Request), * } err := frankenphp.Init(initOpts...) - require.Nil(t, err) + require.NoError(t, err) defer frankenphp.Shutdown() handler := func(w http.ResponseWriter, r *http.Request) { @@ -86,7 +86,9 @@ func runTest(t *testing.T, test func(func(http.ResponseWriter, *http.Request), * assert.NoError(t, err) err = frankenphp.ServeHTTP(w, req) - assert.NoError(t, err) + if err != nil && !errors.As(err, &frankenphp.ErrRejected{}) { + assert.Fail(t, fmt.Sprintf("Received unexpected error:\n%+v", err)) + } } var ts *httptest.Server @@ -109,6 +111,7 @@ func runTest(t *testing.T, test func(func(http.ResponseWriter, *http.Request), * func testRequest(req *http.Request, handler func(http.ResponseWriter, *http.Request), t *testing.T) (string, *http.Response) { t.Helper() + w := httptest.NewRecorder() handler(w, req) resp := w.Result() @@ -988,7 +991,7 @@ func FuzzRequest(f *testing.F) { // The response status must be 400 if the request path contains null bytes if strings.Contains(req.URL.Path, "\x00") { assert.Equal(t, 400, resp.StatusCode) - assert.Contains(t, body, "Invalid request path") + assert.Contains(t, body, "invalid request path") return } diff --git a/threadregular.go b/threadregular.go index 0c07e3d73..64accc60c 100644 --- a/threadregular.go +++ b/threadregular.go @@ -113,7 +113,9 @@ func handleRequestWithRegularPHPThreads(fc *frankenPHPContext) error { // the request has timed out stalling metrics.DequeuedRequest() - return fc.reject(504, "Gateway Timeout") + fc.reject(ErrMaxWaitTimeExceeded) + + return ErrMaxWaitTimeExceeded } } } diff --git a/worker.go b/worker.go index b2ddce061..3c495d4b3 100644 --- a/worker.go +++ b/worker.go @@ -258,7 +258,9 @@ func (worker *worker) handleRequest(fc *frankenPHPContext) error { // the request has timed out stalling metrics.DequeuedWorkerRequest(worker.name) - return fc.reject(504, "Gateway Timeout") + fc.reject(ErrMaxWaitTimeExceeded) + + return ErrMaxWaitTimeExceeded } } } From d2eadcdcb937d1381176b756d925619aceb86b93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Dunglas?= Date: Fri, 31 Oct 2025 18:13:40 +0100 Subject: [PATCH 11/13] fix race --- caddy/module.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/caddy/module.go b/caddy/module.go index 05240eacf..e4b2f9e83 100644 --- a/caddy/module.go +++ b/caddy/module.go @@ -151,8 +151,6 @@ func needReplacement(s string) bool { return strings.ContainsAny(s, "{}") } -var errRejected = &frankenphp.ErrRejected{} - // ServeHTTP implements caddyhttp.MiddlewareHandler. func (f *FrankenPHPModule) ServeHTTP(w http.ResponseWriter, r *http.Request, _ caddyhttp.Handler) error { origReq := r.Context().Value(caddyhttp.OriginalRequestCtxKey).(http.Request) @@ -199,7 +197,7 @@ func (f *FrankenPHPModule) ServeHTTP(w http.ResponseWriter, r *http.Request, _ c return caddyhttp.Error(http.StatusInternalServerError, err) } - if err = frankenphp.ServeHTTP(w, fr); err != nil && !errors.As(err, errRejected) { + if err = frankenphp.ServeHTTP(w, fr); err != nil && !errors.As(err, &frankenphp.ErrRejected{}) { return caddyhttp.Error(http.StatusInternalServerError, err) } From 52fde720996b38ed1714ec3f9cbb99955cbd7d32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Dunglas?= Date: Sun, 2 Nov 2025 15:00:56 +0100 Subject: [PATCH 12/13] add missing return --- context.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/context.go b/context.go index 648e3ffc5..983f9a0e1 100644 --- a/context.go +++ b/context.go @@ -132,6 +132,8 @@ func (fc *frankenPHPContext) validate() error { e := fmt.Errorf("%w: %s", ErrInvalidContentLengthHeader, contentLengthStr) fc.reject(e) + + return e } } From 7445cedf8a16c43455f35bdf5147793720a0d943 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Dunglas?= Date: Mon, 3 Nov 2025 13:36:52 +0100 Subject: [PATCH 13/13] use %q in Errorf --- context.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/context.go b/context.go index 983f9a0e1..543bb4a7b 100644 --- a/context.go +++ b/context.go @@ -129,7 +129,7 @@ func (fc *frankenPHPContext) validate() error { contentLengthStr := fc.request.Header.Get("Content-Length") if contentLengthStr != "" { if contentLength, err := strconv.Atoi(contentLengthStr); err != nil || contentLength < 0 { - e := fmt.Errorf("%w: %s", ErrInvalidContentLengthHeader, contentLengthStr) + e := fmt.Errorf("%w: %q", ErrInvalidContentLengthHeader, contentLengthStr) fc.reject(e)