88use React \EventLoop \LoopInterface ;
99use React \Http \Message \Response ;
1010use React \Http \Message \ServerRequest ;
11+ use React \Http \Middleware \InactiveConnectionTimeoutMiddleware ;
1112use React \Promise ;
1213use React \Promise \CancellablePromiseInterface ;
1314use React \Promise \PromiseInterface ;
@@ -87,6 +88,8 @@ final class StreamingServer extends EventEmitter
8788
8889 /** @var Clock */
8990 private $ clock ;
91+ private $ loop ;
92+ private $ idleConnectionTimeout ;
9093
9194 /**
9295 * Creates an HTTP server that invokes the given callback for each incoming HTTP request
@@ -98,14 +101,18 @@ final class StreamingServer extends EventEmitter
98101 *
99102 * @param LoopInterface $loop
100103 * @param callable $requestHandler
104+ * @param float $idleConnectTimeout
101105 * @see self::listen()
102106 */
103- public function __construct (LoopInterface $ loop , $ requestHandler )
107+ public function __construct (LoopInterface $ loop , $ requestHandler, $ idleConnectTimeout = InactiveConnectionTimeoutMiddleware:: DEFAULT_TIMEOUT )
104108 {
105109 if (!\is_callable ($ requestHandler )) {
106110 throw new \InvalidArgumentException ('Invalid request handler given ' );
107111 }
108112
113+ $ this ->loop = $ loop ;
114+ $ this ->idleConnectionTimeout = $ idleConnectTimeout ;
115+
109116 $ this ->callback = $ requestHandler ;
110117 $ this ->clock = new Clock ($ loop );
111118 $ this ->parser = new RequestHeaderParser ($ this ->clock );
@@ -135,7 +142,27 @@ public function __construct(LoopInterface $loop, $requestHandler)
135142 */
136143 public function listen (ServerInterface $ socket )
137144 {
138- $ socket ->on ('connection ' , array ($ this ->parser , 'handle ' ));
145+ $ socket ->on ('connection ' , array ($ this , 'handle ' ));
146+ }
147+
148+ /** @internal */
149+ public function handle (ConnectionInterface $ conn )
150+ {
151+ $ timer = $ this ->loop ->addTimer ($ this ->idleConnectionTimeout , function () use ($ conn ) {
152+ $ conn ->close ();
153+ });
154+ $ loop = $ this ->loop ;
155+ $ conn ->once ('data ' , function () use ($ loop , $ timer ) {
156+ $ loop ->cancelTimer ($ timer );
157+ });
158+ $ conn ->on ('end ' , function () use ($ loop , $ timer ) {
159+ $ loop ->cancelTimer ($ timer );
160+ });
161+ $ conn ->on ('close ' , function () use ($ loop , $ timer ) {
162+ $ loop ->cancelTimer ($ timer );
163+ });
164+
165+ $ this ->parser ->handle ($ conn );
139166 }
140167
141168 /** @internal */
@@ -353,7 +380,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
353380
354381 // either wait for next request over persistent connection or end connection
355382 if ($ persist ) {
356- $ this ->parser -> handle ($ connection );
383+ $ this ->handle ($ connection );
357384 } else {
358385 $ connection ->end ();
359386 }
@@ -374,10 +401,10 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
374401 // write streaming body and then wait for next request over persistent connection
375402 if ($ persist ) {
376403 $ body ->pipe ($ connection , array ('end ' => false ));
377- $ parser = $ this -> parser ;
378- $ body ->on ('end ' , function () use ($ connection , $ parser , $ body ) {
404+ $ that = $ this ;
405+ $ body ->on ('end ' , function () use ($ connection , $ that , $ body ) {
379406 $ connection ->removeListener ('close ' , array ($ body , 'close ' ));
380- $ parser ->handle ($ connection );
407+ $ that ->handle ($ connection );
381408 });
382409 } else {
383410 $ body ->pipe ($ connection );
0 commit comments