From 2b48b24abf3b59c3699ca99e144deb5c0020208f Mon Sep 17 00:00:00 2001 From: David Grudl Date: Thu, 12 Dec 2024 04:12:03 +0100 Subject: [PATCH] added Process [WIP] --- src/Utils/Process.php | 445 +++++++++++++++++++ src/Utils/exceptions.php | 16 + tests/Utils/Process.basic.phpt | 154 +++++++ tests/Utils/Process.consume.phpt | 35 ++ tests/Utils/Process.environment.phpt | 26 ++ tests/Utils/Process.input.phpt | 56 +++ tests/Utils/Process.output.phpt | 67 +++ tests/Utils/Process.piping.phpt | 32 ++ tests/Utils/fixtures.process/incremental.php | 27 ++ tests/Utils/fixtures.process/rev.php | 7 + tests/Utils/fixtures.process/tick.php | 8 + 11 files changed, 873 insertions(+) create mode 100644 src/Utils/Process.php create mode 100644 tests/Utils/Process.basic.phpt create mode 100644 tests/Utils/Process.consume.phpt create mode 100644 tests/Utils/Process.environment.phpt create mode 100644 tests/Utils/Process.input.phpt create mode 100644 tests/Utils/Process.output.phpt create mode 100644 tests/Utils/Process.piping.phpt create mode 100644 tests/Utils/fixtures.process/incremental.php create mode 100644 tests/Utils/fixtures.process/rev.php create mode 100644 tests/Utils/fixtures.process/tick.php diff --git a/src/Utils/Process.php b/src/Utils/Process.php new file mode 100644 index 00000000..6e4291fe --- /dev/null +++ b/src/Utils/Process.php @@ -0,0 +1,445 @@ + true]; + + /** @var resource */ + private mixed $inputPipe; + + /** @var resource[] */ + private array $outputPipes = []; + + /** @var string[] */ + private array $outputBuffers = []; + + /** @var int[] Number of bytes already read from buffers. */ + private array $outputBufferOffsets = []; + private float $startTime; + + + /** + * Starts an executable with given arguments. + * @param string $executable Path to the executable binary. + * @param string[] $arguments Arguments passed to the executable. + * @param string[]|null $env Environment variables or null to use the same environment as the current process. + * @param array $options Additional options for proc_open(), uses bypass_shell = true by default + * @param mixed $stdin Input data (string, resource, Process, or null). + * @param mixed $stdout Output target (string filename, resource, false for discard, or null for capture). + * @param mixed $stderr Error output target (same as $output). + * @param string|null $directory Working directory. + * @param float|null $timeout Time limit in seconds. + */ + public static function runExecutable( + string $executable, + array $arguments = [], + ?array $env = null, + array $options = [], + mixed $stdin = '', + mixed $stdout = null, + mixed $stderr = null, + ?string $directory = null, + ?float $timeout = self::DefaultTimeout, + ): self + { + return new self([$executable, ...$arguments], $env, $options, $directory, $stdin, $stdout, $stderr, $timeout); + } + + + /** + * Starts a process from a command string. The command will be interpreted by the shell. + * @param string $command Shell command to run. + * @param string[]|null $env Environment variables or null to use the same environment as the current process. + * @param array $options Options for proc_open(). + * @param mixed $stdin Input data (string, resource, Process, or null). + * @param mixed $stdout Output target (string filename, resource, false for discard, or null for capture). + * @param mixed $stderr Error output target (same as $output). + * @param string|null $directory Working directory. + * @param float|null $timeout Time limit in seconds. + */ + public static function runCommand( + string $command, + ?array $env = null, + array $options = [], + mixed $stdin = '', + mixed $stdout = null, + mixed $stderr = null, + ?string $directory = null, + ?float $timeout = self::DefaultTimeout, + ): self + { + return new self($command, $env, $options, $directory, $stdin, $stdout, $stderr, $timeout); + } + + + private function __construct( + string|array $command, + ?array $env, + array $options, + ?string $directory, + mixed $stdin, + mixed $stdout, + mixed $stderr, + private ?float $timeout, + ) { + $descriptors = [ + self::StdIn => $this->createInputDescriptor($stdin), + self::StdOut => $this->createOutputDescriptor(self::StdOut, $stdout), + self::StdErr => $this->createOutputDescriptor(self::StdErr, $stderr), + ]; + + $this->process = @proc_open($command, $descriptors, $pipes, $directory, $env, $options); + if (!is_resource($this->process)) { + throw new ProcessFailedException('Failed to start process: ' . Helpers::getLastError()); + } + + [$this->inputPipe, $this->outputPipes[self::StdOut], $this->outputPipes[self::StdErr]] = $pipes + $descriptors; + $this->writeInitialInput($stdin); + $this->startTime = microtime(true); + } + + + public function __destruct() + { + $this->outputBuffers = []; + $this->terminate(); + } + + + /** + * Checks if the process is currently running. + */ + public function isRunning(): bool + { + if (!$this->status['running']) { + return false; + } + + $this->status = proc_get_status($this->process); + if (!$this->status['running']) { + $this->close(); + } + + return $this->status['running']; + } + + + /** + * Finishes the process by waiting for its completion. + * While waiting, periodically checks for output and can invoke a callback with new output chunks. + * + * @param (callable(string, string): void)|null $callback + */ + public function wait(?\Closure $callback = null): void + { + while ($this->isRunning()) { + $this->enforceTimeout(); + if ($callback) { + $this->dispatchCallback($callback); + } + usleep(self::PollInterval); + } + + if ($callback) { + $this->dispatchCallback($callback); + } + } + + + /** + * Terminates the running process if it is still running. + */ + public function terminate(): void + { + if (!$this->isRunning()) { + return; + } elseif (Helpers::IsWindows) { + exec("taskkill /F /T /PID {$this->getPid()} 2>&1"); + } else { + proc_terminate($this->process); + } + $this->status['running'] = false; + $this->close(); + } + + + /** + * Returns the process exit code. If the process is still running, waits until it finishes. + */ + public function getExitCode(): int + { + $this->wait(); + return $this->status['exitcode'] ?? -1; + } + + + /** + * Returns true if the process terminated with exit code 0. + */ + public function isSuccess(): bool + { + return $this->getExitCode() === 0; + } + + + /** + * Waits for the process to finish and throws ProcessFailedException if exit code is not zero. + */ + public function ensureSuccess(): void + { + $code = $this->getExitCode(); + if ($code !== 0) { + throw new ProcessFailedException("Process failed with non-zero exit code: $code"); + } + } + + + /** + * Returns the PID of the running process, or null if it is not running. + */ + public function getPid(): ?int + { + return $this->isRunning() ? $this->status['pid'] : null; + } + + + /** + * Reads all remaining output into memory and returns it after waiting for the process to finish. + * Output from STDOUT. + */ + public function getStdOutput(): string + { + $this->wait(); + return $this->outputBuffers[self::StdOut] ?? throw new \LogicException('Cannot read output: output capturing was not enabled'); + } + + + /** + * Reads all remaining error output into memory and returns it after waiting for the process to finish. + * Output from STDERR. + */ + public function getStdError(): string + { + $this->wait(); + return $this->outputBuffers[self::StdErr] ?? throw new \LogicException('Cannot read output: output capturing was not enabled'); + } + + + /** + * Returns newly available STDOUT data since the last consumeOutput() call. + */ + public function consumeStdOutput(): string + { + return $this->consumeBuffer(self::StdOut); + } + + + /** + * Returns newly available STDERR data since the last consumeErrorOutput() call. + */ + public function consumeStdError(): string + { + return $this->consumeBuffer(self::StdErr); + } + + + /** + * Returns newly available data from the specified buffer and advances the read pointer. + */ + private function consumeBuffer(int $id): string + { + if (!isset($this->outputBuffers[$id])) { + throw new \LogicException('Cannot read output: output capturing was not enabled'); + } elseif ($this->isRunning()) { + $this->enforceTimeout(); + $this->readFromPipe($id); + } + $res = substr($this->outputBuffers[$id], $this->outputBufferOffsets[$id]); + $this->outputBufferOffsets[$id] = strlen($this->outputBuffers[$id]); + return $res; + } + + + /** + * Writes data into the process' STDIN. If STDIN is closed, throws exception. + */ + public function writeStdInput(string $string): void + { + if (!is_resource($this->inputPipe)) { + throw new Nette\InvalidStateException('Cannot write to process: STDIN pipe is closed'); + } + fwrite($this->inputPipe, $string); + } + + + /** + * Closes the STDIN pipe, indicating no more data will be sent. + */ + public function closeStdInput(): void + { + if (is_resource($this->inputPipe)) { + fclose($this->inputPipe); + } + } + + + /** + * Called periodically while waiting for process completion to invoke callback with new output/error data. + */ + private function dispatchCallback(\Closure $callback): void + { + $output = isset($this->outputBuffers[self::StdOut]) ? $this->consumeStdOutput() : ''; + $error = isset($this->outputBuffers[self::StdErr]) ? $this->consumeStdError() : ''; + if ($output !== '' || $error !== '') { + $callback($output, $error); + } + } + + + /** + * Checks if the timeout has expired. If yes, terminates the process. + */ + private function enforceTimeout(): void + { + if ($this->timeout !== null && (microtime(true) - $this->startTime) >= $this->timeout) { + $this->terminate(); + throw new ProcessTimeoutException('Process exceeded the time limit of ' . $this->timeout . ' seconds'); + } + } + + + /** + * Reads any new data from the specified pipe and appends it to the buffer. + */ + private function readFromPipe(int $id): void + { + if (!isset($this->outputBuffers[$id])) { + return; + } elseif (Helpers::IsWindows) { + fseek($this->outputPipes[$id], strlen($this->outputBuffers[$id])); + } else { + stream_set_blocking($this->outputPipes[$id], false); + } + $this->outputBuffers[$id] .= stream_get_contents($this->outputPipes[$id]); + } + + + /** + * Writes initial input data to the process. If input is a string, writes and closes input. + * If input is a resource, copies it and closes input. If it is another Process, links outputs (not on Windows). + */ + private function writeInitialInput(mixed $input): void + { + if ($input === null || $input instanceof self) { + // keeps input open until closeInput() is called + + } elseif (is_string($input)) { + fwrite($this->inputPipe, $input); + $this->closeStdInput(); + + } elseif (is_resource($input)) { + stream_copy_to_stream($input, $this->inputPipe); + $this->closeStdInput(); + + } else { + throw new Nette\InvalidArgumentException('Input must be string, resource or null, ' . get_debug_type($input) . ' given.'); + } + } + + + /** + * Determines the STDIN descriptor based on the type of input. + */ + private function createInputDescriptor(mixed $input): mixed + { + if (!$input instanceof self) { + return ['pipe', 'r']; + } elseif (!Helpers::IsWindows) { + return $input->outputPipes[self::StdOut]; + } else { + throw new Nette\NotSupportedException('Process piping is not supported on Windows'); + } + } + + + /** + * Determines the descriptor for STDOUT or STDERR based on the specified output target. + */ + private function createOutputDescriptor(int $id, mixed $output): mixed + { + if (is_resource($output)) { + return $output; + + } elseif (is_string($output)) { + return fopen($output, 'w'); + + } elseif ($output === false) { + return ['file', Helpers::IsWindows ? 'NUL' : '/dev/null', 'w']; + + } elseif ($output === null) { + $this->outputBuffers[$id] = ''; + $this->outputBufferOffsets[$id] = 0; + // TODO: timeout lze zajisti na windows jedine s tmpfile() + return Helpers::IsWindows ? tmpfile() : ['pipe', 'w']; + + } else { + throw new Nette\InvalidArgumentException('Output must be string, resource, bool or null, ' . get_debug_type($output) . ' given.'); + } + } + + + /** + * Closes all pipes and the process resource. + */ + private function close(): void + { + foreach ($this->outputPipes as $id => $_) { + $this->readFromPipe($id); + } + $this->closeStdInput(); + $this->closeOutputPipes(); + proc_close($this->process); + } + + + /** + * Closes all pipes. On Windows, tries to remove temporary files associated with them. + */ + private function closeOutputPipes(): void + { + foreach ($this->outputPipes as $id => &$pipe) { + if (!is_resource($pipe) || !isset($this->outputBufferOffsets[$id])) { // TODO + // already closed or not initialized by createOutputDescriptor() + } elseif (Helpers::IsWindows) { + $file = stream_get_meta_data($pipe)['uri']; + fclose($pipe); + @unlink($file); + } else { + fclose($pipe); + } + } + } +} diff --git a/src/Utils/exceptions.php b/src/Utils/exceptions.php index 30805ea3..3fbe8d93 100644 --- a/src/Utils/exceptions.php +++ b/src/Utils/exceptions.php @@ -48,3 +48,19 @@ class RegexpException extends \Exception class AssertionException extends \Exception { } + + +/** + * The process failed to run successfully. + */ +class ProcessFailedException extends \RuntimeException +{ +} + + +/** + * The process execution exceeded its timeout limit. + */ +class ProcessTimeoutException extends \RuntimeException +{ +} diff --git a/tests/Utils/Process.basic.phpt b/tests/Utils/Process.basic.phpt new file mode 100644 index 00000000..801da3bf --- /dev/null +++ b/tests/Utils/Process.basic.phpt @@ -0,0 +1,154 @@ +isSuccess()); + Assert::same(0, $process->getExitCode()); + Assert::same('hello', $process->getStdOutput()); + Assert::same('', $process->getStdError()); +}); + +test('run command successfully', function () { + $process = Process::runCommand('echo hello'); + Assert::true($process->isSuccess()); + Assert::same(0, $process->getExitCode()); + Assert::same('hello' . PHP_EOL, $process->getStdOutput()); + Assert::same('', $process->getStdError()); +}); + + +// Process execution - errors + +test('run executable with error', function () { + $process = Process::runExecutable(PHP_BINARY, ['-r', 'exit(1);']); + Assert::false($process->isSuccess()); + Assert::same(1, $process->getExitCode()); +}); + +test('run executable ensure success throws exception on error', function () { + Assert::exception( + fn() => Process::runExecutable(PHP_BINARY, ['-r', 'exit(1);'])->ensureSuccess(), + ProcessFailedException::class, + 'Process failed with non-zero exit code: 1', + ); +}); + +test('run command with error', function () { + $process = Process::runCommand('"' . PHP_BINARY . '" -r "exit(1);"'); + Assert::false($process->isSuccess()); + Assert::same(1, $process->getExitCode()); +}); + +test('run command ensure success throws exception on error', function () { + Assert::exception( + fn() => Process::runCommand('"' . PHP_BINARY . '" -r "exit(1);"')->ensureSuccess(), + ProcessFailedException::class, + 'Process failed with non-zero exit code: 1', + ); +}); + + +// Process state monitoring + +test('is running', function () { + $process = Process::runExecutable(PHP_BINARY, ['-r', 'sleep(1);']); + Assert::true($process->isRunning()); + $process->wait(); + Assert::false($process->isRunning()); +}); + +test('get pid', function () { + $process = Process::runExecutable(PHP_BINARY, ['-r', 'sleep(1);']); + Assert::type('int', $process->getPid()); + $process->wait(); + Assert::null($process->getPid()); +}); + + +// Waiting for process + +test('wait', function () { + $process = Process::runExecutable(PHP_BINARY, ['-r', 'echo "hello";']); + $process->wait(); + $process->wait(); + Assert::false($process->isRunning()); + Assert::same(0, $process->getExitCode()); + Assert::same('hello', $process->getStdOutput()); +}); + +test('wait with callback', function () { + $output = ''; + $error = ''; + $process = Process::runExecutable(PHP_BINARY, ['-r', 'echo "hello"; fwrite(STDERR, "error");']); + $process->wait(function ($stdOut, $stdErr) use (&$output, &$error) { + $output .= $stdOut; + $error .= $stdErr; + }); + Assert::same('hello', $output); + Assert::same('error', $error); +}); + + +// Automatically call wait() + +test('getStdOutput() automatically call wait()', function () { + $process = Process::runExecutable(PHP_BINARY, ['-r', 'echo "hello";']); + Assert::same('hello', $process->getStdOutput()); + Assert::false($process->isRunning()); +}); + +test('getExitCode() automatically call wait()', function () { + $process = Process::runExecutable(PHP_BINARY, ['-r', 'echo exit(2);']); + Assert::same(2, $process->getExitCode()); + Assert::false($process->isRunning()); +}); + + +// Terminating process + +test('terminate', function () { + $process = Process::runExecutable(PHP_BINARY, ['-r', 'sleep(5);']); + $process->terminate(); + Assert::false($process->isRunning()); +}); + +test('terminate() and then wait()', function () { + $process = Process::runExecutable(PHP_BINARY, ['-f', 'sleep(5);']); + $process->terminate(); + $process->wait(); + Assert::false($process->isRunning()); +}); + + +// Timeout + +test('timeout', function () { + Assert::exception( + fn() => Process::runExecutable(PHP_BINARY, ['-r', 'sleep(5);'], timeout: 0.1)->wait(), + ProcessTimeoutException::class, + 'Process exceeded the time limit of 0.1 seconds', + ); +}); + + +// bypass_shell + +if (Helpers::IsWindows) { + test('bypass_shell = false', function () { + $process = Process::runCommand('"' . PHP_BINARY . '" -r "echo 123;"', options: ['bypass_shell' => false]); + Assert::same('123', $process->getStdOutput()); + }); +} diff --git a/tests/Utils/Process.consume.phpt b/tests/Utils/Process.consume.phpt new file mode 100644 index 00000000..865b750f --- /dev/null +++ b/tests/Utils/Process.consume.phpt @@ -0,0 +1,35 @@ +consumeStdOutput()); + usleep(50_000); + Assert::same('lo', $process->consumeStdOutput()); + usleep(50_000); + Assert::same('wor', $process->consumeStdOutput()); + usleep(50_000); + Assert::same('ld', $process->consumeStdOutput()); + $process->wait(); + Assert::same('', $process->consumeStdOutput()); + Assert::same('helloworld', $process->getStdOutput()); +}); + +test('incremental error output consumption', function () { + $process = Process::runExecutable(PHP_BINARY, ['-f', __DIR__ . '/fixtures.process/incremental.php', 'stderr']); + usleep(50_000); + Assert::same('hello' . PHP_EOL, $process->consumeStdError()); + usleep(50_000); + Assert::same('world' . PHP_EOL, $process->consumeStdError()); + usleep(50_000); + Assert::same('', $process->consumeStdError()); + Assert::same('hello' . PHP_EOL . 'world' . PHP_EOL, $process->getStdError()); +}); diff --git a/tests/Utils/Process.environment.phpt b/tests/Utils/Process.environment.phpt new file mode 100644 index 00000000..25cfe838 --- /dev/null +++ b/tests/Utils/Process.environment.phpt @@ -0,0 +1,26 @@ + '123']); + Assert::same('123', $process->getStdOutput()); +}); + +test('no environment variables', function () { + $process = Process::runExecutable(PHP_BINARY, ['-r', 'echo !getenv("PATH") ? "ok" : "no";'], env: []); + Assert::same('ok', $process->getStdOutput()); +}); + +test('parent environment variables', function () { + $process = Process::runExecutable(PHP_BINARY, ['-r', 'echo getenv("PATH") ? "ok" : "no";']); + Assert::same('ok', $process->getStdOutput()); +}); diff --git a/tests/Utils/Process.input.phpt b/tests/Utils/Process.input.phpt new file mode 100644 index 00000000..ac007d0c --- /dev/null +++ b/tests/Utils/Process.input.phpt @@ -0,0 +1,56 @@ +getStdOutput()); +}); + +test('stream as input', function () { + $input = fopen('php://memory', 'r+'); + fwrite($input, 'Hello Input'); + rewind($input); + $process = Process::runExecutable(PHP_BINARY, ['-r', 'echo fgets(STDIN);'], stdin: $input); + Assert::same('Hello Input', $process->getStdOutput()); +}); + + +// Writing input + +test('write input', function () { + $process = Process::runExecutable(PHP_BINARY, ['-r', 'echo fgets(STDIN);'], stdin: null); + $process->writeStdInput('hello' . PHP_EOL); + $process->writeStdInput('world' . PHP_EOL); + $process->closeStdInput(); + Assert::same('hello' . PHP_EOL, $process->getStdOutput()); +}); + +test('writeStdInput() after closeStdInput() throws exception', function () { + $process = Process::runExecutable(PHP_BINARY, ['-r', 'echo fgets(STDIN);'], stdin: null); + $process->writeStdInput('hello' . PHP_EOL); + $process->closeStdInput(); + Assert::exception( + fn() => $process->writeStdInput('world' . PHP_EOL), + Nette\InvalidStateException::class, + 'Cannot write to process: STDIN pipe is closed', + ); +}); + +test('writeStdInput() throws exception when stdin is not null', function () { + $process = Process::runExecutable(PHP_BINARY, ['-r', 'echo fgets(STDIN);']); + Assert::exception( + fn() => $process->writeStdInput('hello' . PHP_EOL), + Nette\InvalidStateException::class, + 'Cannot write to process: STDIN pipe is closed', + ); +}); diff --git a/tests/Utils/Process.output.phpt b/tests/Utils/Process.output.phpt new file mode 100644 index 00000000..824fa979 --- /dev/null +++ b/tests/Utils/Process.output.phpt @@ -0,0 +1,67 @@ +wait(); + Assert::same('hello', file_get_contents($tempFile)); + unlink($tempFile); +}); + +test('setting stderr to false prevents reading from getStdError() or consumeStdError()', function () { + $process = Process::runExecutable(PHP_BINARY, ['-r', 'echo fwrite(STDERR, "hello");'], stderr: false); + $process->wait(); + Assert::exception( + fn() => $process->getStdError(), + LogicException::class, + 'Cannot read output: output capturing was not enabled', + ); + Assert::exception( + fn() => $process->consumeStdError(), + LogicException::class, + 'Cannot read output: output capturing was not enabled', + ); +}); + +test('stream as output', function () { + $tempFile = tempnam(sys_get_temp_dir(), 'process_test_'); + $output = fopen($tempFile, 'w'); + $process = Process::runExecutable(PHP_BINARY, ['-r', 'echo "hello";'], stdout: $output); + $process->wait(); + fclose($output); + Assert::same('hello', file_get_contents($tempFile)); + unlink($tempFile); +}); + +test('stream as error output', function () { + $tempFile = tempnam(sys_get_temp_dir(), 'process_test_'); + $output = fopen($tempFile, 'w'); + $process = Process::runExecutable(PHP_BINARY, ['-r', 'echo fwrite(STDERR, "hello");'], stderr: $output); + $process->wait(); + fclose($output); + Assert::same('hello', file_get_contents($tempFile)); + unlink($tempFile); +}); + +test('changing both stdout and stderr does not trigger callbacks in wait()', function () { + $tempFile = tempnam(sys_get_temp_dir(), 'process_test_'); + $output = fopen($tempFile, 'w'); + $wasCalled = false; + $process = Process::runExecutable(PHP_BINARY, ['-r', 'echo "hello";'], stdout: $output, stderr: $output); + $process->wait(function () use (&$wasCalled) { + $wasCalled = true; + }); + fclose($output); + Assert::false($wasCalled); + unlink($tempFile); +}); diff --git a/tests/Utils/Process.piping.phpt b/tests/Utils/Process.piping.phpt new file mode 100644 index 00000000..6cbc1ffa --- /dev/null +++ b/tests/Utils/Process.piping.phpt @@ -0,0 +1,32 @@ +wait(function ($stdOut, $stdErr) use (&$output) { + $output .= $stdOut; +}); + +Assert::same('kcit' . PHP_EOL . 'kcit' . PHP_EOL . 'kcit' . PHP_EOL, $output); diff --git a/tests/Utils/fixtures.process/incremental.php b/tests/Utils/fixtures.process/incremental.php new file mode 100644 index 00000000..d777e97d --- /dev/null +++ b/tests/Utils/fixtures.process/incremental.php @@ -0,0 +1,27 @@ +