Skip to content

Commit

Permalink
Update PHP language syntax and remove legacy workarounds
Browse files Browse the repository at this point in the history
  • Loading branch information
WyriHaximus committed Feb 24, 2024
1 parent 1f68392 commit d23f22a
Show file tree
Hide file tree
Showing 14 changed files with 51 additions and 111 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ By default, this will call `end()` on the destination stream once the
source stream emits an `end` event. This can be disabled like this:

```php
$source->pipe($dest, array('end' => false));
$source->pipe($dest, ['end' => false]);
```

Note that this only applies to the `end` event.
Expand Down Expand Up @@ -1126,7 +1126,7 @@ $through = new ThroughStream(function ($data) {
});
$through->on('data', $this->expectCallableOnceWith("[2, true]\n"));

$through->write(array(2, true));
$through->write([2, true]);
```

The callback function is allowed to throw an `Exception`. In this case,
Expand Down
10 changes: 5 additions & 5 deletions src/CompositeStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ public function __construct(ReadableStreamInterface $readable, WritableStreamInt
return;
}

Util::forwardEvents($this->readable, $this, array('data', 'end', 'error'));
Util::forwardEvents($this->writable, $this, array('drain', 'error', 'pipe'));
Util::forwardEvents($this->readable, $this, ['data', 'end', 'error']);
Util::forwardEvents($this->writable, $this, ['drain', 'error', 'pipe']);

$this->readable->on('close', array($this, 'close'));
$this->writable->on('close', array($this, 'close'));
$this->readable->on('close', [$this, 'close']);
$this->writable->on('close', [$this, 'close']);
}

public function isReadable()
Expand All @@ -46,7 +46,7 @@ public function resume()
$this->readable->resume();
}

public function pipe(WritableStreamInterface $dest, array $options = array())
public function pipe(WritableStreamInterface $dest, array $options = [])
{
return Util::pipe($this, $dest, $options);
}
Expand Down
55 changes: 12 additions & 43 deletions src/DuplexResourceStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,9 @@ public function __construct($stream, LoopInterface $loop = null, $readChunkSize
// Use unbuffered read operations on the underlying stream resource.
// Reading chunks from the stream may otherwise leave unread bytes in
// PHP's stream buffers which some event loop implementations do not
// trigger events on (edge triggered).
// This does not affect the default event loop implementation (level
// triggered), so we can ignore platforms not supporting this (HHVM).
// Pipe streams (such as STDIN) do not seem to require this and legacy
// PHP versions cause SEGFAULTs on unbuffered pipe streams, so skip this.
if (\function_exists('stream_set_read_buffer') && !$this->isLegacyPipe($stream)) {
\stream_set_read_buffer($stream, 0);
}
// trigger events on (edge triggered). This does not affect the default
// event loop implementation (level triggered).
\stream_set_read_buffer($stream, 0);

if ($buffer === null) {
$buffer = new WritableResourceStream($stream, $loop);
Expand All @@ -77,16 +72,14 @@ public function __construct($stream, LoopInterface $loop = null, $readChunkSize
$this->bufferSize = ($readChunkSize === null) ? 65536 : (int)$readChunkSize;
$this->buffer = $buffer;

$that = $this;

$this->buffer->on('error', function ($error) use ($that) {
$that->emit('error', array($error));
$this->buffer->on('error', function ($error) {
$this->emit('error', [$error]);
});

$this->buffer->on('close', array($this, 'close'));
$this->buffer->on('close', [$this, 'close']);

$this->buffer->on('drain', function () use ($that) {
$that->emit('drain');
$this->buffer->on('drain', function () {
$this->emit('drain');
});

$this->resume();
Expand All @@ -113,7 +106,7 @@ public function pause()
public function resume()
{
if (!$this->listening && $this->readable) {
$this->loop->addReadStream($this->stream, array($this, 'handleData'));
$this->loop->addReadStream($this->stream, [$this, 'handleData']);
$this->listening = true;
}
}
Expand Down Expand Up @@ -163,7 +156,7 @@ public function end($data = null)
$this->buffer->end($data);
}

public function pipe(WritableStreamInterface $dest, array $options = array())
public function pipe(WritableStreamInterface $dest, array $options = [])
{
return Util::pipe($this, $dest, $options);
}
Expand All @@ -187,41 +180,17 @@ public function handleData($stream)
\restore_error_handler();

if ($error !== null) {
$this->emit('error', array(new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error)));
$this->emit('error', [new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error)]);
$this->close();
return;
}

if ($data !== '') {
$this->emit('data', array($data));
$this->emit('data', [$data]);
} elseif (\feof($this->stream)) {
// no data read => we reached the end and close the stream
$this->emit('end');
$this->close();
}
}

/**
* Returns whether this is a pipe resource in a legacy environment
*
* This works around a legacy PHP bug (#61019) that was fixed in PHP 5.4.28+
* and PHP 5.5.12+ and newer.
*
* @param resource $resource
* @return bool
* @link https://github.com/reactphp/child-process/issues/40
*
* @codeCoverageIgnore
*/
private function isLegacyPipe($resource)
{
if (\PHP_VERSION_ID < 50428 || (\PHP_VERSION_ID >= 50500 && \PHP_VERSION_ID < 50512)) {
$meta = \stream_get_meta_data($resource);

if (isset($meta['stream_type']) && $meta['stream_type'] === 'STDIO') {
return true;
}
}
return false;
}
}
43 changes: 7 additions & 36 deletions src/ReadableResourceStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,9 @@ public function __construct($stream, LoopInterface $loop = null, $readChunkSize
// Use unbuffered read operations on the underlying stream resource.
// Reading chunks from the stream may otherwise leave unread bytes in
// PHP's stream buffers which some event loop implementations do not
// trigger events on (edge triggered).
// This does not affect the default event loop implementation (level
// triggered), so we can ignore platforms not supporting this (HHVM).
// Pipe streams (such as STDIN) do not seem to require this and legacy
// PHP versions cause SEGFAULTs on unbuffered pipe streams, so skip this.
if (\function_exists('stream_set_read_buffer') && !$this->isLegacyPipe($stream)) {
\stream_set_read_buffer($stream, 0);
}
// trigger events on (edge triggered). This does not affect the default
// event loop implementation (level triggered).
\stream_set_read_buffer($stream, 0);

$this->stream = $stream;
$this->loop = $loop ?: Loop::get();
Expand All @@ -93,12 +88,12 @@ public function pause()
public function resume()
{
if (!$this->listening && !$this->closed) {
$this->loop->addReadStream($this->stream, array($this, 'handleData'));
$this->loop->addReadStream($this->stream, [$this, 'handleData']);
$this->listening = true;
}
}

public function pipe(WritableStreamInterface $dest, array $options = array())
public function pipe(WritableStreamInterface $dest, array $options = [])
{
return Util::pipe($this, $dest, $options);
}
Expand Down Expand Up @@ -139,41 +134,17 @@ public function handleData()
\restore_error_handler();

if ($error !== null) {
$this->emit('error', array(new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error)));
$this->emit('error', [new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error)]);
$this->close();
return;
}

if ($data !== '') {
$this->emit('data', array($data));
$this->emit('data', [$data]);
} elseif (\feof($this->stream)) {
// no data read => we reached the end and close the stream
$this->emit('end');
$this->close();
}
}

/**
* Returns whether this is a pipe resource in a legacy environment
*
* This works around a legacy PHP bug (#61019) that was fixed in PHP 5.4.28+
* and PHP 5.5.12+ and newer.
*
* @param resource $resource
* @return bool
* @link https://github.com/reactphp/child-process/issues/40
*
* @codeCoverageIgnore
*/
private function isLegacyPipe($resource)
{
if (\PHP_VERSION_ID < 50428 || (\PHP_VERSION_ID >= 50500 && \PHP_VERSION_ID < 50512)) {
$meta = \stream_get_meta_data($resource);

if (isset($meta['stream_type']) && $meta['stream_type'] === 'STDIO') {
return true;
}
}
return false;
}
}
4 changes: 2 additions & 2 deletions src/ReadableStreamInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ public function resume();
* source stream emits an `end` event. This can be disabled like this:
*
* ```php
* $source->pipe($dest, array('end' => false));
* $source->pipe($dest, ['end' => false]);
* ```
*
* Note that this only applies to the `end` event.
Expand Down Expand Up @@ -322,7 +322,7 @@ public function resume();
* @param array $options
* @return WritableStreamInterface $dest stream as-is
*/
public function pipe(WritableStreamInterface $dest, array $options = array());
public function pipe(WritableStreamInterface $dest, array $options = []);

/**
* Closes the stream (forcefully).
Expand Down
8 changes: 4 additions & 4 deletions src/ThroughStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
* });
* $through->on('data', $this->expectCallableOnceWith("[2, true]\n"));
*
* $through->write(array(2, true));
* $through->write([2, true]);
* ```
*
* The callback function is allowed to throw an `Exception`. In this case,
Expand Down Expand Up @@ -108,7 +108,7 @@ public function resume()
}
}

public function pipe(WritableStreamInterface $dest, array $options = array())
public function pipe(WritableStreamInterface $dest, array $options = [])
{
return Util::pipe($this, $dest, $options);
}
Expand All @@ -133,14 +133,14 @@ public function write($data)
try {
$data = \call_user_func($this->callback, $data);
} catch (\Exception $e) {
$this->emit('error', array($e));
$this->emit('error', [$e]);
$this->close();

return false;
}
}

$this->emit('data', array($data));
$this->emit('data', [$data]);

// emit drain event on next resume if currently paused (throttled)
if ($this->paused) {
Expand Down
4 changes: 2 additions & 2 deletions src/Util.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ final class Util
* @return WritableStreamInterface $dest stream as-is
* @see ReadableStreamInterface::pipe() for more details
*/
public static function pipe(ReadableStreamInterface $source, WritableStreamInterface $dest, array $options = array())
public static function pipe(ReadableStreamInterface $source, WritableStreamInterface $dest, array $options = [])
{
// source not readable => NO-OP
if (!$source->isReadable()) {
Expand All @@ -27,7 +27,7 @@ public static function pipe(ReadableStreamInterface $source, WritableStreamInter
return $dest;
}

$dest->emit('pipe', array($source));
$dest->emit('pipe', [$source]);

// forward all source data events as $dest->write()
$source->on('data', $dataer = function ($data) use ($source, $dest) {
Expand Down
4 changes: 2 additions & 2 deletions src/WritableResourceStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public function write($data)
if (!$this->listening && $this->data !== '') {
$this->listening = true;

$this->loop->addWriteStream($this->stream, array($this, 'handleWrite'));
$this->loop->addWriteStream($this->stream, [$this, 'handleWrite']);
}

return !isset($this->data[$this->softLimit - 1]);
Expand Down Expand Up @@ -137,7 +137,7 @@ public function handleWrite()
// Should this turn out to be a permanent error later, it will eventually
// send *nothing* and we can detect this.
if (($sent === 0 || $sent === false) && $error !== null) {
$this->emit('error', array(new \RuntimeException('Unable to write to stream: ' . $error)));
$this->emit('error', [new \RuntimeException('Unable to write to stream: ' . $error)]);
$this->close();

return;
Expand Down
6 changes: 3 additions & 3 deletions tests/CompositeStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public function itShouldReceiveForwardedEvents()
$composite->on('data', $this->expectCallableOnce());
$composite->on('drain', $this->expectCallableOnce());

$readable->emit('data', array('foo'));
$readable->emit('data', ['foo']);
$writable->emit('drain');
}

Expand All @@ -241,7 +241,7 @@ public function itShouldHandlePipingCorrectly()

$input = new ThroughStream();
$input->pipe($composite);
$input->emit('data', array('foo'));
$input->emit('data', ['foo']);
}

/** @test */
Expand All @@ -262,6 +262,6 @@ public function itShouldForwardPipeCallsToReadableStream()
->with('foo');

$composite->pipe($output);
$readable->emit('data', array('foo'));
$readable->emit('data', ['foo']);
}
}
2 changes: 1 addition & 1 deletion tests/DuplexResourceStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ public function testBufferEventsShouldBubbleUp()
$conn->on('error', $this->expectCallableOnce());

$buffer->emit('drain');
$buffer->emit('error', array(new \RuntimeException('Whoops')));
$buffer->emit('error', [new \RuntimeException('Whoops')]);
}

/**
Expand Down
8 changes: 4 additions & 4 deletions tests/Stub/ReadableStreamStub.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,19 @@ public function isReadable()
// trigger data event
public function write($data)
{
$this->emit('data', array($data));
$this->emit('data', [$data]);
}

// trigger error event
public function error($error)
{
$this->emit('error', array($error));
$this->emit('error', [$error]);
}

// trigger end event
public function end()
{
$this->emit('end', array());
$this->emit('end', []);
}

public function pause()
Expand All @@ -52,7 +52,7 @@ public function close()
$this->emit('close');
}

public function pipe(WritableStreamInterface $dest, array $options = array())
public function pipe(WritableStreamInterface $dest, array $options = [])
{
Util::pipe($this, $dest, $options);

Expand Down
4 changes: 2 additions & 2 deletions tests/ThroughStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public function pipingStuffIntoItShouldWork()
$through->on('data', $this->expectCallableOnceWith('foo'));

$readable->pipe($through);
$readable->emit('data', array('foo'));
$readable->emit('data', ['foo']);
}

/** @test */
Expand Down Expand Up @@ -243,7 +243,7 @@ public function writeAfterEndShouldReturnFalse()
public function writeDataWillCloseStreamShouldReturnFalse()
{
$through = new ThroughStream();
$through->on('data', array($through, 'close'));
$through->on('data', [$through, 'close']);

$this->assertFalse($through->write('foo'));
}
Expand Down
Loading

0 comments on commit d23f22a

Please sign in to comment.