diff --git a/caddy/app.go b/caddy/app.go index ad648efd7..4020631ec 100644 --- a/caddy/app.go +++ b/caddy/app.go @@ -33,6 +33,8 @@ type FrankenPHPApp struct { MaxThreads int `json:"max_threads,omitempty"` // Workers configures the worker scripts to start. Workers []workerConfig `json:"workers,omitempty"` + // TaskWorkers configures the task worker scripts to start. + TaskWorkers []workerConfig `json:"task_workers,omitempty"` // Overwrites the default php ini configuration PhpIni map[string]string `json:"php_ini,omitempty"` // The maximum amount of time a request may be stalled waiting for a thread @@ -127,6 +129,15 @@ func (f *FrankenPHPApp) Start() error { opts = append(opts, frankenphp.WithWorkers(w.Name, repl.ReplaceKnown(w.FileName, ""), w.Num, workerOpts...)) } + for _, tw := range f.TaskWorkers { + workerOpts := []frankenphp.WorkerOption{ + frankenphp.WithWorkerEnv(tw.Env), + frankenphp.WithWorkerWatchMode(tw.Watch), + frankenphp.AsTaskWorker(true, 0), // TODO: maxQueueLen configurable here? + } + + opts = append(opts, frankenphp.WithWorkers(tw.Name, repl.ReplaceKnown(tw.FileName, ""), tw.Num, workerOpts...)) + } frankenphp.Shutdown() if err := frankenphp.Init(opts...); err != nil { @@ -234,6 +245,13 @@ func (f *FrankenPHPApp) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { } } + case "task_worker": + twc, err := parseWorkerConfig(d) + if err != nil { + return err + } + f.TaskWorkers = append(f.TaskWorkers, twc) + case "worker": wc, err := parseWorkerConfig(d) if err != nil { diff --git a/caddy/caddy_test.go b/caddy/caddy_test.go index efd9e30aa..ae7ad7bbb 100644 --- a/caddy/caddy_test.go +++ b/caddy/caddy_test.go @@ -1423,3 +1423,31 @@ func TestWorkerMatchDirectiveWithoutFileServer(t *testing.T) { // the request should completely fall through the php_server module tester.AssertGetResponse("http://localhost:"+testPort+"/static.txt", http.StatusNotFound, "Request falls through") } + +func TestServerWithTaskWorker(t *testing.T) { + tester := caddytest.NewTester(t) + taskWorker, err := fastabs.FastAbs("../testdata/tasks/task-worker.php") + require.NoError(t, err) + tester.InitServer(` + { + skip_install_trust + admin localhost:2999 + + frankenphp { + num_threads 2 + task_worker `+taskWorker+` { + num 1 + } + } + } + `, "caddyfile") + + debugState := getDebugState(t, tester) + require.Len(t, debugState.ThreadDebugStates, 2, "there should be 3 threads") + require.Equal( + t, + debugState.ThreadDebugStates[1].Name, + "Task Worker PHP Thread - "+taskWorker, + "the second spawned thread should be the task worker", + ) +} diff --git a/cgi.go b/cgi.go index 4c11a285a..6c36cf698 100644 --- a/cgi.go +++ b/cgi.go @@ -277,13 +277,13 @@ func splitPos(path string, splitPath []string) int { // See: https://github.com/php/php-src/blob/345e04b619c3bc11ea17ee02cdecad6ae8ce5891/main/SAPI.h#L72 // //export go_update_request_info -func go_update_request_info(threadIndex C.uintptr_t, info *C.sapi_request_info) C.bool { +func go_update_request_info(threadIndex C.uintptr_t, info *C.sapi_request_info) { thread := phpThreads[threadIndex] fc := thread.getRequestContext() request := fc.request if request == nil { - return C.bool(fc.worker != nil) + return } authUser, authPassword, ok := request.BasicAuth() @@ -311,8 +311,6 @@ func go_update_request_info(threadIndex C.uintptr_t, info *C.sapi_request_info) info.request_uri = thread.pinCString(request.URL.RequestURI()) info.proto_num = C.int(request.ProtoMajor*1000 + request.ProtoMinor) - - return C.bool(fc.worker != nil) } // SanitizedPathJoin performs filepath.Join(root, reqPath) that diff --git a/frankenphp.c b/frankenphp.c index 69e29ed9d..999318c71 100644 --- a/frankenphp.c +++ b/frankenphp.c @@ -73,6 +73,7 @@ frankenphp_config frankenphp_get_config() { bool should_filter_var = 0; __thread uintptr_t thread_index; __thread bool is_worker_thread = false; +__thread bool is_task_worker_thread = false; __thread zval *os_environment = NULL; static void frankenphp_update_request_context() { @@ -82,7 +83,12 @@ static void frankenphp_update_request_context() { /* status It is not reset by zend engine, set it to 200. */ SG(sapi_headers).http_response_code = 200; - is_worker_thread = go_update_request_info(thread_index, &SG(request_info)); + go_update_request_info(thread_index, &SG(request_info)); +} + +void frankenphp_update_thread_context(bool is_worker, bool is_task_worker) { + is_worker_thread = is_worker; + is_task_worker_thread = is_task_worker; } static void frankenphp_free_request_context() { @@ -411,6 +417,49 @@ PHP_FUNCTION(frankenphp_response_headers) /* {{{ */ } /* }}} */ +/* Handle a message in task worker mode */ +static bool frankenphp_handle_message(zend_fcall_info fci, + zend_fcall_info_cache fcc) { + zval *arg = go_frankenphp_worker_handle_task(thread_index); + if (arg == NULL) { + return false; + } + + /* Call the PHP func passed to frankenphp_handle_request() */ + zval retval = {0}; + fci.size = sizeof fci; + fci.retval = &retval; + fci.params = arg; + fci.param_count = 1; + zend_bool status = zend_call_function(&fci, &fcc) == SUCCESS; + + if (!status || Z_TYPE(retval) == IS_UNDEF) { + go_frankenphp_finish_task(thread_index, NULL); + zval_ptr_dtor(arg); + } else { + go_frankenphp_finish_task(thread_index, &retval); + } + + zval_ptr_dtor(&retval); + + /* + * If an exception occurred, print the message to the client before + * exiting + */ + if (EG(exception) && !zend_is_unwind_exit(EG(exception)) && + !zend_is_graceful_exit(EG(exception))) { + zend_exception_error(EG(exception), E_ERROR); + zend_bailout(); + } + + zend_try { php_output_end_all(); } + zend_end_try(); + + zval_ptr_dtor(arg); + + return true; +} + PHP_FUNCTION(frankenphp_handle_request) { zend_fcall_info fci; zend_fcall_info_cache fcc; @@ -420,6 +469,13 @@ PHP_FUNCTION(frankenphp_handle_request) { ZEND_PARSE_PARAMETERS_END(); if (!is_worker_thread) { + + /* thread is a task worker + * handle the message and do not reset globals */ + if (is_task_worker_thread) { + bool keep_running = frankenphp_handle_message(fci, fcc); + RETURN_BOOL(keep_running); + } /* not a worker, throw an error */ zend_throw_exception( spl_ce_RuntimeException, @@ -484,6 +540,25 @@ PHP_FUNCTION(frankenphp_handle_request) { RETURN_TRUE; } +PHP_FUNCTION(frankenphp_send_request) { + zval *zv; + char *worker_name = NULL; + size_t worker_name_len = 0; + + ZEND_PARSE_PARAMETERS_START(1, 2); + Z_PARAM_ZVAL(zv); + Z_PARAM_OPTIONAL + Z_PARAM_STRING(worker_name, worker_name_len); + ZEND_PARSE_PARAMETERS_END(); + + char *error = go_frankenphp_send_request(thread_index, zv, worker_name, + worker_name_len); + if (error) { + zend_throw_exception(spl_ce_RuntimeException, error, 0); + RETURN_THROWS(); + } +} + PHP_FUNCTION(headers_send) { zend_long response_code = 200; diff --git a/frankenphp.go b/frankenphp.go index 6e6f41c62..02cf73a28 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -37,6 +37,8 @@ import ( "unsafe" // debug on Linux //_ "github.com/ianlancetaylor/cgosymbolizer" + + "github.com/dunglas/frankenphp/internal/watcher" ) type contextKeyStruct struct{} @@ -52,7 +54,8 @@ var ( ErrScriptExecution = errors.New("error during PHP script execution") ErrNotRunning = errors.New("FrankenPHP is not running. For proper configuration visit: https://frankenphp.dev/docs/config/#caddyfile-config") - isRunning bool + isRunning bool + watcherIsEnabled bool loggerMu sync.RWMutex logger *slog.Logger @@ -151,6 +154,10 @@ func calculateMaxThreads(opt *opt) (int, int, int, error) { numWorkers += opt.workers[i].num } + for _, tw := range opt.taskWorkers { + numWorkers += tw.num + } + numThreadsIsSet := opt.numThreads > 0 maxThreadsIsSet := opt.maxThreads != 0 maxThreadsIsAuto := opt.maxThreads < 0 // maxthreads < 0 signifies auto mode (see phpmaintread.go) @@ -279,10 +286,23 @@ func Init(options ...Option) error { convertToRegularThread(getInactivePHPThread()) } + directoriesToWatch := getDirectoriesToWatch(append(opt.workers, opt.taskWorkers...)) + watcherIsEnabled = len(directoriesToWatch) > 0 // watcherIsEnabled needs to be set before initWorkers() + if err := initWorkers(opt.workers); err != nil { return err } + if err := initTaskWorkers(opt.taskWorkers); err != nil { + return err + } + + if watcherIsEnabled { + if err := watcher.InitWatcher(directoriesToWatch, RestartWorkers, logger); err != nil { + return err + } + } + initAutoScaling(mainThread) ctx := context.Background() @@ -300,8 +320,11 @@ func Shutdown() { return } - drainWatcher() + if watcherIsEnabled { + watcher.DrainWatcher() + } drainAutoScaling() + drainTaskWorkers() drainPHPThreads() metrics.Shutdown() @@ -629,3 +652,12 @@ func timeoutChan(timeout time.Duration) <-chan time.Time { return time.After(timeout) } + +func getDirectoriesToWatch(workerOpts []workerOpt) []string { + directoriesToWatch := []string{} + for _, w := range workerOpts { + directoriesToWatch = append(directoriesToWatch, w.watch...) + } + + return directoriesToWatch +} diff --git a/frankenphp.h b/frankenphp.h index c17df6061..cab6efb90 100644 --- a/frankenphp.h +++ b/frankenphp.h @@ -86,4 +86,6 @@ void frankenphp_register_bulk( void register_extensions(zend_module_entry *m, int len); +void frankenphp_update_thread_context(bool is_worker, bool is_task_worker); + #endif diff --git a/frankenphp.stub.php b/frankenphp.stub.php index 6c5a71cb5..7f239b994 100644 --- a/frankenphp.stub.php +++ b/frankenphp.stub.php @@ -4,6 +4,8 @@ function frankenphp_handle_request(callable $callback): bool {} +function frankenphp_send_request(mixed $task, string $workerName = ''): void {} + function headers_send(int $status = 200): int {} function frankenphp_finish_request(): bool {} diff --git a/frankenphp_arginfo.h b/frankenphp_arginfo.h index c1bd7b550..fdcb08fa5 100644 --- a/frankenphp_arginfo.h +++ b/frankenphp_arginfo.h @@ -6,6 +6,12 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_handle_request, 0, 1, ZEND_ARG_TYPE_INFO(0, callback, IS_CALLABLE, 0) ZEND_END_ARG_INFO() +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_send_request, 0, 1, + IS_VOID, 0) +ZEND_ARG_TYPE_INFO(0, task, IS_MIXED, 0) +ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, worker_name, IS_STRING, 0, "\"\"") +ZEND_END_ARG_INFO() + ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_headers_send, 0, 0, IS_LONG, 0) ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, status, IS_LONG, 0, "200") ZEND_END_ARG_INFO() @@ -31,6 +37,7 @@ ZEND_END_ARG_INFO() #define arginfo_apache_response_headers arginfo_frankenphp_response_headers ZEND_FUNCTION(frankenphp_handle_request); +ZEND_FUNCTION(frankenphp_send_request); ZEND_FUNCTION(headers_send); ZEND_FUNCTION(frankenphp_finish_request); ZEND_FUNCTION(frankenphp_request_headers); @@ -39,6 +46,7 @@ ZEND_FUNCTION(frankenphp_response_headers); // clang-format off static const zend_function_entry ext_functions[] = { ZEND_FE(frankenphp_handle_request, arginfo_frankenphp_handle_request) + ZEND_FE(frankenphp_send_request, arginfo_frankenphp_send_request) ZEND_FE(headers_send, arginfo_headers_send) ZEND_FE(frankenphp_finish_request, arginfo_frankenphp_finish_request) ZEND_FALIAS(fastcgi_finish_request, frankenphp_finish_request, arginfo_fastcgi_finish_request) diff --git a/options.go b/options.go index 18c5ba20f..a891059d3 100644 --- a/options.go +++ b/options.go @@ -22,6 +22,7 @@ type opt struct { numThreads int maxThreads int workers []workerOpt + taskWorkers []workerOpt logger *slog.Logger metrics Metrics phpIni map[string]string @@ -35,6 +36,8 @@ type workerOpt struct { env PreparedEnv watch []string maxConsecutiveFailures int + isTaskWorker bool + maxQueueLen int } // WithNumThreads configures the number of PHP threads to start. @@ -80,7 +83,11 @@ func WithWorkers(name string, fileName string, num int, options ...WorkerOption) } } - o.workers = append(o.workers, worker) + if worker.isTaskWorker { + o.taskWorkers = append(o.taskWorkers, worker) + } else { + o.workers = append(o.workers, worker) + } return nil } @@ -141,3 +148,14 @@ func WithMaxWaitTime(maxWaitTime time.Duration) Option { return nil } } + +// EXPERIMENTAL: AsTaskWorker configures the worker as a task worker. +// no http requests will be handled. +// no globals resetting will be performed between tasks. +func AsTaskWorker(isTaskWorker bool, maxQueueLen int) WorkerOption { + return func(w *workerOpt) error { + w.isTaskWorker = isTaskWorker + w.maxQueueLen = maxQueueLen + return nil + } +} diff --git a/phpthread.go b/phpthread.go index a60aa8f0a..2f88e31fb 100644 --- a/phpthread.go +++ b/phpthread.go @@ -132,6 +132,10 @@ func (thread *phpThread) pinCString(s string) *C.char { return thread.pinString(s + "\x00") } +func (thread *phpThread) updateContext(isWorker bool, isTaskWorker bool) { + C.frankenphp_update_thread_context(C.bool(isWorker), C.bool(isTaskWorker)) +} + //export go_frankenphp_before_script_execution func go_frankenphp_before_script_execution(threadIndex C.uintptr_t) *C.char { thread := phpThreads[threadIndex] diff --git a/testdata/tasks/task-dispatcher-array.php b/testdata/tasks/task-dispatcher-array.php new file mode 100644 index 000000000..48938c78a --- /dev/null +++ b/testdata/tasks/task-dispatcher-array.php @@ -0,0 +1,16 @@ + "array task$i", + 'worker' => $workerName, + 'index' => $i, + ]); + } + echo "dispatched $taskCount tasks\n"; +}; diff --git a/testdata/tasks/task-dispatcher-string.php b/testdata/tasks/task-dispatcher-string.php new file mode 100644 index 000000000..49df56ec3 --- /dev/null +++ b/testdata/tasks/task-dispatcher-string.php @@ -0,0 +1,12 @@ + +import "C" +import ( + "errors" + "github.com/dunglas/frankenphp/internal/fastabs" + "sync" + "unsafe" +) + +type taskWorker struct { + threads []*phpThread + threadMutex sync.RWMutex + fileName string + taskChan chan *pendingTask + name string + num int + env PreparedEnv +} + +// representation of a thread that handles tasks directly assigned by go or via frankenphp_send_request() +// can also just execute a script in a loop +// implements the threadHandler interface +type taskWorkerThread struct { + thread *phpThread + taskWorker *taskWorker + dummyContext *frankenPHPContext + currentTask *pendingTask +} + +var taskWorkers []*taskWorker + +// EXPERIMENTAL: a task dispatched to a task worker +type pendingTask struct { + message any // the argument passed to frankenphp_send_request() or the return value of frankenphp_handle_request() + done sync.RWMutex + callback func() // optional callback for direct execution (tests) +} + +func initTaskWorkers(opts []workerOpt) error { + taskWorkers = make([]*taskWorker, 0, len(opts)) + ready := sync.WaitGroup{} + for _, opt := range opts { + fileName, err := fastabs.FastAbs(opt.fileName) + if err != nil { + return err + } + + if opt.maxQueueLen <= 0 { + opt.maxQueueLen = 10000 // default queue len, TODO: unlimited? + } + + tw := &taskWorker{ + threads: make([]*phpThread, 0, opt.num), + fileName: fileName, + taskChan: make(chan *pendingTask, opt.maxQueueLen), + name: opt.name, + num: opt.num, + env: opt.env, + } + taskWorkers = append(taskWorkers, tw) + + // start the actual PHP threads + ready.Add(tw.num) + for i := 0; i < tw.num; i++ { + thread := getInactivePHPThread() + convertToTaskWorkerThread(thread, tw) + go func(thread *phpThread) { + thread.state.waitFor(stateReady) + ready.Done() + }(thread) + } + } + ready.Wait() + + return nil +} + +func drainTaskWorkers() { + for _, tw := range taskWorkers { + tw.drainQueue() + } +} + +func convertToTaskWorkerThread(thread *phpThread, tw *taskWorker) *taskWorkerThread { + handler := &taskWorkerThread{ + thread: thread, + taskWorker: tw, + } + thread.setHandler(handler) + + return handler +} + +func (handler *taskWorkerThread) beforeScriptExecution() string { + thread := handler.thread + + switch thread.state.get() { + case stateTransitionRequested: + handler.taskWorker.detach(thread) + + return thread.transitionToNewHandler() + case stateBooting, stateTransitionComplete: + tw := handler.taskWorker + tw.threadMutex.Lock() + tw.threads = append(tw.threads, thread) + tw.threadMutex.Unlock() + thread.state.set(stateReady) + thread.updateContext(false, true) + + return handler.setupWorkerScript() + case stateReady: + + return handler.setupWorkerScript() + case stateRestarting: + thread.state.set(stateYielding) + thread.state.waitFor(stateReady, stateShuttingDown) + + return handler.beforeScriptExecution() + case stateShuttingDown: + handler.taskWorker.detach(thread) + // signal to stop + return "" + } + panic("unexpected state: " + thread.state.name()) +} + +func (handler *taskWorkerThread) setupWorkerScript() string { + fc, err := newDummyContext(handler.taskWorker.fileName, WithRequestPreparedEnv(handler.taskWorker.env)) + + if err != nil { + panic(err) + } + + handler.dummyContext = fc + clearSandboxedEnv(handler.thread) + + return handler.taskWorker.fileName +} + +func (handler *taskWorkerThread) afterScriptExecution(int) { + // restart the script +} + +func (handler *taskWorkerThread) getRequestContext() *frankenPHPContext { + return handler.dummyContext +} + +func (handler *taskWorkerThread) name() string { + return "Task Worker PHP Thread - " + handler.taskWorker.fileName +} + +func (tw *taskWorker) detach(thread *phpThread) { + tw.threadMutex.Lock() + defer tw.threadMutex.Unlock() + for i, t := range tw.threads { + if t == thread { + tw.threads = append(tw.threads[:i], tw.threads[i+1:]...) + return + } + } +} + +// make sure all tasks are done by re-queuing them until the channel is empty +func (tw *taskWorker) drainQueue() { + for { + select { + case pt := <-tw.taskChan: + tw.taskChan <- pt + pt.done.RLock() // wait for completion + default: + return + } + } +} + +func (tw *taskWorker) dispatch(t *pendingTask) error { + t.done.Lock() + select { + case tw.taskChan <- t: + return nil + default: + return errors.New("Task worker queue is full, cannot dispatch task: " + tw.name) + } +} + +func getTaskWorkerByName(name string) *taskWorker { + for _, w := range taskWorkers { + if w.name == name { + return w + } + } + + return nil +} + +//export go_frankenphp_worker_handle_task +func go_frankenphp_worker_handle_task(threadIndex C.uintptr_t) *C.zval { + thread := phpThreads[threadIndex] + handler, _ := thread.handler.(*taskWorkerThread) + thread.Unpin() + thread.state.markAsWaiting(true) + + select { + case task := <-handler.taskWorker.taskChan: + handler.currentTask = task + thread.state.markAsWaiting(false) + + // if the task has a callback, execute it (see types_test.go) + if task.callback != nil { + task.callback() + go_frankenphp_finish_task(threadIndex, nil) + + return go_frankenphp_worker_handle_task(threadIndex) + } + + zval := phpValue(task.message) + task.message = nil // free memory + thread.Pin(unsafe.Pointer(zval)) // TODO: refactor types.go so no pinning is required + + return zval + case <-handler.thread.drainChan: + thread.state.markAsWaiting(false) + // send an empty task to drain the thread + return nil + } +} + +//export go_frankenphp_finish_task +func go_frankenphp_finish_task(threadIndex C.uintptr_t, zv *C.zval) { + thread := phpThreads[threadIndex] + handler, ok := thread.handler.(*taskWorkerThread) + if !ok { + panic("thread is not a task thread: " + thread.handler.name()) + } + + if zv != nil { + result, err := goValue[any](zv) + if err != nil { + panic("failed to convert go_frankenphp_finish_task() return value: " + err.Error()) + } + handler.currentTask.message = result + } + handler.currentTask.done.Unlock() + handler.currentTask = nil +} + +//export go_frankenphp_send_request +func go_frankenphp_send_request(threadIndex C.uintptr_t, zv *C.zval, name *C.char, nameLen C.size_t) *C.char { + if zv == nil { + return phpThreads[threadIndex].pinCString("Task argument cannot be null") + } + + var tw *taskWorker + if nameLen != 0 { + tw = getTaskWorkerByName(C.GoStringN(name, C.int(nameLen))) + } else if len(taskWorkers) != 0 { + tw = taskWorkers[0] + } + + if tw == nil { + return phpThreads[threadIndex].pinCString("No worker found to handle this task: " + C.GoStringN(name, C.int(nameLen))) + } + + // convert the argument of frankenphp_send_request() to a Go value + goArg, err := goValue[any](zv) + if err != nil { + return phpThreads[threadIndex].pinCString("Failed to convert frankenphp_send_request() argument: " + err.Error()) + } + + err = tw.dispatch(&pendingTask{message: goArg}) + + if err != nil { + return phpThreads[threadIndex].pinCString(err.Error()) + } + + return nil +} diff --git a/threadtaskworker_test.go b/threadtaskworker_test.go new file mode 100644 index 000000000..c233e894e --- /dev/null +++ b/threadtaskworker_test.go @@ -0,0 +1,89 @@ +package frankenphp + +import ( + "bytes" + "log/slog" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func assertGetRequest(t *testing.T, url string, expectedBodyContains string, opts ...RequestOption) { + t.Helper() + r := httptest.NewRequest("GET", url, nil) + w := httptest.NewRecorder() + req, err := NewRequestWithContext(r, opts...) + assert.NoError(t, err) + assert.NoError(t, ServeHTTP(w, req)) + assert.Contains(t, w.Body.String(), expectedBodyContains) +} + +func TestDispatchToTaskWorkerFromWorker(t *testing.T) { + var buf bytes.Buffer + handler := slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug}) + logger := slog.New(handler) + + assert.NoError(t, Init( + WithWorkers("taskworker", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true, 0)), + WithWorkers("worker1", "./testdata/tasks/task-dispatcher-string.php", 1), + WithNumThreads(3), // regular thread, task worker thread, dispatcher threads + WithLogger(logger), + )) + + assertGetRequest(t, "http://example.com/testdata/tasks/task-dispatcher-string.php?count=4", "dispatched 4 tasks") + + // wait and shutdown to ensure all logs are flushed + time.Sleep(10 * time.Millisecond) + Shutdown() + + // task output appears in logs at info level + logOutput := buf.String() + assert.Contains(t, logOutput, "task0") + assert.Contains(t, logOutput, "task1") + assert.Contains(t, logOutput, "task2") + assert.Contains(t, logOutput, "task3") +} + +func TestDispatchArrayToTaskWorker(t *testing.T) { + var buf bytes.Buffer + handler := slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug}) + logger := slog.New(handler) + + assert.NoError(t, Init( + WithWorkers("taskworker", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true, 0)), + WithWorkers("worker2", "./testdata/tasks/task-dispatcher-array.php", 1), + WithNumThreads(3), // regular thread, task worker thread, dispatcher thread + WithLogger(logger), + )) + + assertGetRequest(t, "http://example.com/testdata/tasks/task-dispatcher-array.php?count=1", "dispatched 1 tasks") + + // wait and shutdown to ensure all logs are flushed + time.Sleep(10 * time.Millisecond) + Shutdown() + + // task output appears in logs at info level + logOutput := buf.String() + assert.Contains(t, logOutput, "array task0") +} + +func TestDispatchToMultipleWorkers(t *testing.T) { + var buf bytes.Buffer + handler := slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug}) + logger := slog.New(handler) + + assert.NoError(t, Init( + WithWorkers("worker1", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true, 0)), + WithWorkers("worker2", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true, 0)), + WithNumThreads(4), + WithLogger(logger), + )) + defer Shutdown() + + script := "http://example.com/testdata/tasks/task-dispatcher-string.php" + assertGetRequest(t, script+"?count=1&worker=worker1", "dispatched 1 tasks") + assertGetRequest(t, script+"?count=1&worker=worker2", "dispatched 1 tasks") + assertGetRequest(t, script+"?count=1&worker=worker3", "No worker found to handle this task") // fail +} diff --git a/threadworker.go b/threadworker.go index fd9c2867f..c8f3ad7eb 100644 --- a/threadworker.go +++ b/threadworker.go @@ -62,6 +62,7 @@ func (handler *workerThread) beforeScriptExecution() string { if handler.externalWorker != nil { handler.externalWorker.ThreadActivatedNotification(handler.thread.threadIndex) } + handler.thread.updateContext(true, false) setupWorkerScript(handler, handler.worker) return handler.worker.fileName case stateShuttingDown: diff --git a/types.c b/types.c index 9c4887b23..ce3835fb3 100644 --- a/types.c +++ b/types.c @@ -31,6 +31,8 @@ void __zval_double__(zval *zv, double val) { ZVAL_DOUBLE(zv, val); } void __zval_string__(zval *zv, zend_string *str) { ZVAL_STR(zv, str); } +void __zval_empty_string__(zval *zv) { ZVAL_EMPTY_STRING(zv); } + void __zval_arr__(zval *zv, zend_array *arr) { ZVAL_ARR(zv, arr); } zend_array *__zend_new_array__(uint32_t size) { return zend_new_array(size); } diff --git a/types.go b/types.go index 4e4bbbbed..a98313e66 100644 --- a/types.go +++ b/types.go @@ -263,7 +263,6 @@ func goValue[T any](zval *C.zval) (res T, err error) { resZero T ) t := C.zval_get_type(zval) - switch t { case C.IS_NULL: resAny = any(nil) @@ -382,6 +381,10 @@ func phpValue(value any) *C.zval { case float64: C.__zval_double__(&zval, C.double(v)) case string: + if v == "" { + C.__zval_empty_string__(&zval) + break + } str := (*C.zend_string)(PHPString(v, false)) C.__zval_string__(&zval, str) case map[string]any: @@ -435,3 +438,13 @@ func extractZvalValue(zval *C.zval, expectedType C.uint8_t) (unsafe.Pointer, err return nil, fmt.Errorf("unsupported zval type %d", expectedType) } + +// used for cleanup in tests +func zvalPtrDtor(p unsafe.Pointer) { + C.zval_ptr_dtor((*C.zval)(p)) +} + +// used for cleanup in tests +func zendStringRelease(p unsafe.Pointer) { + C.zend_string_release((*C.zend_string)(p)) +} diff --git a/types.h b/types.h index c82f479d4..72442cf30 100644 --- a/types.h +++ b/types.h @@ -19,6 +19,7 @@ void __zval_bool__(zval *zv, bool val); void __zval_long__(zval *zv, zend_long val); void __zval_double__(zval *zv, double val); void __zval_string__(zval *zv, zend_string *str); +void __zval_empty_string__(zval *zv); void __zval_arr__(zval *zv, zend_array *arr); zend_array *__zend_new_array__(uint32_t size); diff --git a/types_test.go b/types_test.go index 122fe930d..033dc2eab 100644 --- a/types_test.go +++ b/types_test.go @@ -1,37 +1,50 @@ package frankenphp import ( - "io" + "errors" "log/slog" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap/exp/zapslog" + "go.uber.org/zap/zaptest" ) // execute the function on a PHP thread directly // this is necessary if tests make use of PHP's internal allocation -func testOnDummyPHPThread(t *testing.T, test func()) { +func testOnDummyPHPThread(t *testing.T, cb func()) { t.Helper() - logger = slog.New(slog.NewTextHandler(io.Discard, nil)) - _, err := initPHPThreads(1, 1, nil) // boot 1 thread - assert.NoError(t, err) - handler := convertToTaskThread(phpThreads[0]) + logger = slog.New(zapslog.NewHandler(zaptest.NewLogger(t).Core())) + assert.NoError(t, Init( + WithWorkers("tw", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true, 0)), + WithNumThreads(2), + WithLogger(logger), + )) + defer Shutdown() + + assert.NoError(t, executeOnPHPThread(cb, "tw")) +} - task := newTask(test) - handler.execute(task) - task.waitForCompletion() +// executeOnPHPThread executes the callback func() directly on a task worker thread +// useful for testing purposes when dealing with PHP allocations +func executeOnPHPThread(callback func(), taskWorkerName string) error { + tw := getTaskWorkerByName(taskWorkerName) + if tw == nil { + return errors.New("no task worker found with name " + taskWorkerName) + } - drainPHPThreads() + return tw.dispatch(&pendingTask{callback: callback}) } func TestGoString(t *testing.T) { testOnDummyPHPThread(t, func() { originalString := "Hello, World!" - convertedString := GoString(PHPString(originalString, false)) + phpString := PHPString(originalString, false) + defer zendStringRelease(phpString) - assert.Equal(t, originalString, convertedString, "string -> zend_string -> string should yield an equal string") + assert.Equal(t, originalString, GoString(phpString), "string -> zend_string -> string should yield an equal string") }) } @@ -42,7 +55,9 @@ func TestPHPMap(t *testing.T) { "foo2": "bar2", } - convertedMap, err := GoMap[string](PHPMap(originalMap)) + phpArray := PHPMap(originalMap) + defer zvalPtrDtor(phpArray) + convertedMap, err := GoMap[string](phpArray) require.NoError(t, err) assert.Equal(t, originalMap, convertedMap, "associative array should be equal after conversion") @@ -59,7 +74,9 @@ func TestOrderedPHPAssociativeArray(t *testing.T) { Order: []string{"foo2", "foo1"}, } - convertedArray, err := GoAssociativeArray[string](PHPAssociativeArray(originalArray)) + phpArray := PHPAssociativeArray(originalArray) + defer zvalPtrDtor(phpArray) + convertedArray, err := GoAssociativeArray[string](phpArray) require.NoError(t, err) assert.Equal(t, originalArray, convertedArray, "associative array should be equal after conversion") @@ -70,7 +87,9 @@ func TestPHPPackedArray(t *testing.T) { testOnDummyPHPThread(t, func() { originalSlice := []string{"bar1", "bar2"} - convertedSlice, err := GoPackedArray[string](PHPPackedArray(originalSlice)) + phpArray := PHPPackedArray(originalSlice) + defer zvalPtrDtor(phpArray) + convertedSlice, err := GoPackedArray[string](phpArray) require.NoError(t, err) assert.Equal(t, originalSlice, convertedSlice, "slice should be equal after conversion") @@ -85,7 +104,9 @@ func TestPHPPackedArrayToGoMap(t *testing.T) { "1": "bar2", } - convertedMap, err := GoMap[string](PHPPackedArray(originalSlice)) + phpArray := PHPPackedArray(originalSlice) + defer zvalPtrDtor(phpArray) + convertedMap, err := GoMap[string](phpArray) require.NoError(t, err) assert.Equal(t, expectedMap, convertedMap, "convert a packed to an associative array") @@ -103,7 +124,9 @@ func TestPHPAssociativeArrayToPacked(t *testing.T) { } expectedSlice := []string{"bar1", "bar2"} - convertedSlice, err := GoPackedArray[string](PHPAssociativeArray(originalArray)) + phpArray := PHPAssociativeArray(originalArray) + defer zvalPtrDtor(phpArray) + convertedSlice, err := GoPackedArray[string](phpArray) require.NoError(t, err) assert.Equal(t, expectedSlice, convertedSlice, "convert an associative array to a slice") @@ -126,7 +149,9 @@ func TestNestedMixedArray(t *testing.T) { }, } - convertedArray, err := GoMap[any](PHPMap(originalArray)) + phpArray := PHPMap(originalArray) + defer zvalPtrDtor(phpArray) + convertedArray, err := GoMap[any](phpArray) require.NoError(t, err) assert.Equal(t, originalArray, convertedArray, "nested mixed array should be equal after conversion") diff --git a/worker.go b/worker.go index 3f2049ecd..ec66a9181 100644 --- a/worker.go +++ b/worker.go @@ -9,7 +9,6 @@ import ( "time" "github.com/dunglas/frankenphp/internal/fastabs" - "github.com/dunglas/frankenphp/internal/watcher" ) // represents a worker script and can have many threads assigned to it @@ -26,15 +25,12 @@ type worker struct { } var ( - workers []*worker - watcherIsEnabled bool + workers []*worker ) func initWorkers(opt []workerOpt) error { workers = make([]*worker, 0, len(opt)) workersReady := sync.WaitGroup{} - directoriesToWatch := getDirectoriesToWatch(opt) - watcherIsEnabled = len(directoriesToWatch) > 0 for _, o := range opt { w, err := newWorker(o) @@ -64,15 +60,6 @@ func initWorkers(opt []workerOpt) error { workersReady.Wait() - if !watcherIsEnabled { - return nil - } - - watcherIsEnabled = true - if err := watcher.InitWatcher(directoriesToWatch, RestartWorkers, logger); err != nil { - return err - } - return nil } @@ -144,34 +131,36 @@ func DrainWorkers() { func drainWorkerThreads() []*phpThread { ready := sync.WaitGroup{} drainedThreads := make([]*phpThread, 0) + threadsToDrain := make([]*phpThread, 0) for _, worker := range workers { worker.threadMutex.RLock() - ready.Add(len(worker.threads)) - for _, thread := range worker.threads { - if !thread.state.requestSafeStateChange(stateRestarting) { - ready.Done() - // no state change allowed == thread is shutting down - // we'll proceed to restart all other threads anyways - continue - } - close(thread.drainChan) - drainedThreads = append(drainedThreads, thread) - go func(thread *phpThread) { - thread.state.waitFor(stateYielding) - ready.Done() - }(thread) - } + threadsToDrain = append(threadsToDrain, worker.threads...) worker.threadMutex.RUnlock() } - ready.Wait() - return drainedThreads -} + for _, taskWorker := range taskWorkers { + taskWorker.threadMutex.RLock() + threadsToDrain = append(threadsToDrain, taskWorker.threads...) + taskWorker.threadMutex.RUnlock() + } -func drainWatcher() { - if watcherIsEnabled { - watcher.DrainWatcher() + for _, thread := range threadsToDrain { + if !thread.state.requestSafeStateChange(stateRestarting) { + // no state change allowed == thread is shutting down + // we'll proceed to restart all other threads anyways + continue + } + ready.Add(1) + close(thread.drainChan) + drainedThreads = append(drainedThreads, thread) + go func(thread *phpThread) { + thread.state.waitFor(stateYielding) + ready.Done() + }(thread) } + ready.Wait() + + return drainedThreads } // RestartWorkers attempts to restart all workers gracefully @@ -188,14 +177,6 @@ func RestartWorkers() { } } -func getDirectoriesToWatch(workerOpts []workerOpt) []string { - directoriesToWatch := []string{} - for _, w := range workerOpts { - directoriesToWatch = append(directoriesToWatch, w.watch...) - } - return directoriesToWatch -} - func (worker *worker) attachThread(thread *phpThread) { worker.threadMutex.Lock() worker.threads = append(worker.threads, thread) @@ -213,14 +194,6 @@ func (worker *worker) detachThread(thread *phpThread) { worker.threadMutex.Unlock() } -func (worker *worker) countThreads() int { - worker.threadMutex.RLock() - l := len(worker.threads) - worker.threadMutex.RUnlock() - - return l -} - func (worker *worker) handleRequest(fc *frankenPHPContext) { metrics.StartWorkerRequest(worker.name)