diff --git a/src/Client/Capture.php b/src/Client/Capture.php new file mode 100644 index 0000000..c4a8083 --- /dev/null +++ b/src/Client/Capture.php @@ -0,0 +1,64 @@ +onEvent = $handler; + $this->status = 'inject'; } /** - * @return Closure + * @param Throwable|Exception $exception + * + * @return void */ - public function getWriteCapture(): Closure + public function onFail(Throwable|Exception $exception): void { - return function (string $content, Closure $pass) { - return $pass($content); - }; - } + $this->status = 'fail'; - /** - * @return Closure - */ - public function getReadCapture(): Closure - { - return function (string|false $content, Closure $pass) { - if ($content === false) { - return $pass($content); - } - - $this->buffer .= $content; - $this->processBuffer(); - return $pass($content); - }; + foreach ($this->iterators as $iterator) { + $iterator->onError($exception); + } } /** + * @param Throwable|Exception $exception + * * @return void */ - private function processBuffer(): void + public function onError(Throwable|Exception $exception): void { - if (!$this->isSSE) { - $headerEnd = strpos($this->buffer, "\r\n\r\n"); - if ($headerEnd !== false) { - $header = substr($this->buffer, 0, $headerEnd); - $this->buffer = substr($this->buffer, $headerEnd + 4); - $this->parseHeaders($header); - - if (!$this->isSSE) { - throw new RuntimeException('Response is not SSE'); - } + $this->status = 'error'; - $this->response = new Response(200, $this->headers, ''); - $this->isChunked = isset($this->headers['TRANSFER-ENCODING']) && strtolower($this->headers['TRANSFER-ENCODING']) === 'chunked'; - } + foreach ($this->iterators as $iterator) { + $iterator->onError($exception); } - $this->isChunked ? $this->processChunkedBuffer() : $this->parseEvents(); } /** - * @param string $header + * @param \GuzzleHttp\Psr7\Response $response * * @return void */ - private function parseHeaders(string $header): void + public function onComplete(Response $response): void { - $lines = explode("\r\n", $header); + $this->status = 'complete'; - $firstLine = array_shift($lines); - if (!$firstLine || count(explode(' ', $firstLine)) < 3) { - throw new RuntimeException('Header parsing failed'); + if ($this->onComplete instanceof Closure) { + ($this->onComplete)($response); } - foreach ($lines as $line) { - if (str_contains($line, ': ')) { - [$key, $value] = explode(': ', $line, 2); - $this->headers[strtoupper($key)] = $value; - } + foreach ($this->iterators as $iterator) { + $iterator->onEvent(null); } + } - $this->isSSE = isset($this->headers['CONTENT-TYPE']) && str_contains($this->headers['CONTENT-TYPE'], 'text/event-stream'); + /** + * @param array $headers + * + * @return void + */ + public function processHeader(array $headers): void + { } /** + * @param string $content + * * @return void */ - private function processChunkedBuffer(): void + public function processContent(string $content): void { - while (true) { - $sizeEnd = strpos($this->buffer, "\r\n"); - if ($sizeEnd === false) { - return; - } + $this->buffer .= $content; + while (($eventEnd = strpos($this->buffer, "\n\n")) !== false) { + $eventString = substr($this->buffer, 0, $eventEnd); + $this->buffer = substr($this->buffer, $eventEnd + 2); - $sizeHex = trim(substr($this->buffer, 0, $sizeEnd)); - if (!ctype_xdigit($sizeHex)) { - $this->parseEvents(); - return; + // Split the data by lines + $eventData = []; + $lines = explode("\n", $eventString); + foreach ($lines as $line) { + $keyValue = explode(':', $line, 2); + if (count($keyValue) === 2) { + $eventData[trim($keyValue[0])] = trim($keyValue[1]); + } else { + $eventData[] = $line; + } } - $size = hexdec($sizeHex); - if ($size === 0) { - $this->buffer = substr($this->buffer, $sizeEnd + 2); - break; + if ($this->onEvent instanceof Closure) { + ($this->onEvent)($eventData); } - $chunkStart = $sizeEnd + 2; - - if (strlen($this->buffer) < $chunkStart + $size + 2) { - return; + foreach ($this->iterators as $iterator) { + $iterator->onEvent($eventData); } - - $chunkData = substr($this->buffer, $chunkStart, $size); - $this->buffer = substr($this->buffer, $chunkStart + $size + 2); - $this->buffer .= $chunkData; } + } - $this->parseEvents(); + /*** @return string */ + public function getStatus(): string + { + return $this->status; } + /*** @var array */ + protected array $iterators = []; /** - * @return void + * @return iterable */ - private function parseEvents(): void + public function getIterator(): iterable { - while (($eventEnd = strpos($this->buffer, "\n\n")) !== false) { - $eventData = substr($this->buffer, 0, $eventEnd); - $this->buffer = substr($this->buffer, $eventEnd + 2); + return $this->iterators[] = new class ($this) implements Iterator { + /*** @var \Revolt\EventLoop\Suspension[] */ + protected array $waiters = []; + + /** + * @param ServerSentEvents $capture + */ + public function __construct(protected readonly ServerSentEvents $capture) + { + } - $event = $this->parseEvent($eventData); + /** + * @param array|null $event + * + * @return void + */ + public function onEvent(array|null $event): void + { + while ($suspension = array_shift($this->waiters)) { + Coroutine::resume($suspension, $event); + } + } - if (isset($this->onEvent)) { - ($this->onEvent)($event); + /*** @return void */ + public function onComplete(): void + { + while ($suspension = array_shift($this->waiters)) { + Coroutine::resume($suspension); + } } - } - } - /** - * @param string $eventData - * - * @return array - */ - private function parseEvent(string $eventData): array - { - $event = []; - $lines = explode("\n", $eventData); + /** + * @param Throwable $exception + * + * @return void + */ + public function onError(Throwable $exception): void + { + while ($suspension = array_shift($this->waiters)) { + Coroutine::throw($suspension, $exception); + } + } - foreach ($lines as $line) { - if (!str_contains($line, ':')) { - continue; + /** + * @return array|null + */ + public function current(): array|null + { + $this->waiters[] = $suspension = getSuspension(); + return Coroutine::suspend($suspension); } - [$field, $value] = explode(':', $line, 2); - $event[trim($field)] = trim($value); - } - return $event; + /** + * @return mixed + */ + public function key(): mixed + { + return null; + } + + /** + * @return bool + */ + public function valid(): bool + { + return in_array($this->capture->getStatus(), ['pending', 'inject'], true); + } + + /** + * @return void + */ + public function next(): void + { + // nothing happens + } + + /** + * @return void + */ + public function rewind(): void + { + // nothing happens + } + }; } + + /*** @var \Closure|null */ + public Closure|null $onEvent = null; + + /*** @var \Closure|null */ + public Closure|null $onComplete = null; }