2525use Drift \CommandBus \Tests \CommandHandler \ChangeYetAnotherThingHandler ;
2626use Drift \CommandBus \Tests \Context ;
2727use Drift \CommandBus \Tests \Middleware \Middleware1 ;
28+ use Symfony \Component \Process \Process ;
2829use function Clue \React \Block \await ;
2930use function Clue \React \Block \awaitAll ;
3031
@@ -105,8 +106,6 @@ public function testInfrastructure()
105106
106107 /**
107108 * Test by reading only 1 command.
108- *
109- * @group async1
110109 */
111110 public function test1Command ()
112111 {
@@ -129,22 +128,21 @@ public function test1Command()
129128 await ($ promise2 , $ this ->getLoop ());
130129 await ($ promise3 , $ this ->getLoop ());
131130
132- $ this ->assertNull ($ this ->getContextValue ('middleware1 ' ));
131+ if ($ this instanceof InMemoryAsyncTest) {
132+ $ this ->assertNull ($ this ->getContextValue ('middleware1 ' ));
133+ }
134+
133135 $ this ->assertFalse (file_exists ('/tmp/a.thing ' ));
134- $ output = $ this ->runCommand ([
135- 'command-bus:consume-commands ' ,
136- '--limit ' => 1 ,
137- ]);
136+ $ output = $ this ->consumeCommands (1 );
138137
139138 $ this ->assertContains ("\033[01;32mConsumed \033[0m ChangeAThing " , $ output );
140139 $ this ->assertNotContains ("\033[01;32mConsumed \033[0m ChangeAnotherThing " , $ output );
141- $ this ->assertTrue ($ this ->getContextValue ('middleware1 ' ));
142- $ this ->assertTrue (file_exists ('/tmp/a.thing ' ));
140+ if ($ this instanceof InMemoryAsyncTest) {
141+ $ this ->assertTrue ($ this ->getContextValue ('middleware1 ' ));
142+ }
143143
144- $ output2 = $ this ->runCommand ([
145- 'command-bus:consume-commands ' ,
146- '--limit ' => 1 ,
147- ]);
144+ $ this ->assertTrue (file_exists ('/tmp/a.thing ' ));
145+ $ output2 = $ this ->consumeCommands (1 );
148146
149147 $ this ->assertNotContains ("\033[01;32mConsumed \033[0m ChangeAThing " , $ output2 );
150148 $ this ->assertContains ("\033[01;36mIgnored \033[0m ChangeBThing " , $ output2 );
@@ -153,8 +151,6 @@ public function test1Command()
153151
154152 /**
155153 * Test by reading 2 commands.
156- *
157- * @group async2
158154 */
159155 public function test2Commands ()
160156 {
@@ -174,27 +170,60 @@ public function test2Commands()
174170
175171 awaitAll ($ promises , $ this ->getLoop ());
176172
177- $ output = $ this ->runCommand ([
178- 'command-bus:consume-commands ' ,
179- '--limit ' => 2 ,
180- ]);
173+ $ output = $ this ->consumeCommands (2 );
181174
182175 $ this ->assertContains ("\033[01;32mConsumed \033[0m ChangeAThing " , $ output );
183176 $ this ->assertContains ("\033[01;32mConsumed \033[0m ChangeAnotherThing " , $ output );
184177 $ this ->assertNotContains ("\033[01;32mConsumed \033[0m ChangeYetAnotherThing " , $ output );
185-
186- $ output = $ this ->runCommand ([
187- 'command-bus:consume-commands ' ,
188- '--limit ' => 1 ,
189- ]);
178+ $ output = $ this ->consumeCommands (1 );
190179
191180 $ this ->assertNotContains ("\033[01;32mConsumed \033[0m ChangeAThing " , $ output );
192181 $ this ->assertNotContains ("\033[01;32mConsumed \033[0m ChangeAnotherThing " , $ output );
193182 $ this ->assertContains ("\033[01;32mConsumed \033[0m ChangeYetAnotherThing " , $ output );
194183 }
195184
185+ /**
186+ * Test async commands
187+ */
188+ public function testAsyncCommands ()
189+ {
190+ $ this ->resetInfrastructure ();
191+
192+ $ process = $ this ->runAsyncCommand ([
193+ 'command-bus:consume-commands '
194+ ]);
195+
196+ usleep (200000 );
197+
198+ $ promise1 = $ this
199+ ->getCommandBus ()
200+ ->execute (new ChangeAThing ('thing ' ));
201+
202+ await ($ promise1 , $ this ->getLoop ());
203+ usleep (200000 );
204+
205+ $ promises = [];
206+ $ promises [] = $ this
207+ ->getCommandBus ()
208+ ->execute (new ChangeAnotherThing ('thing ' ));
209+
210+ $ promises [] = $ this
211+ ->getCommandBus ()
212+ ->execute (new ChangeYetAnotherThing ('thing ' ));
213+
214+ awaitAll ($ promises , $ this ->getLoop ());
215+ usleep (200000 );
216+ $ output = $ process ->getOutput ();
217+ $ this ->assertContains ("\033[01;32mConsumed \033[0m ChangeAThing " , $ output );
218+ $ this ->assertContains ("\033[01;32mConsumed \033[0m ChangeAnotherThing " , $ output );
219+ $ this ->assertContains ("\033[01;32mConsumed \033[0m ChangeYetAnotherThing " , $ output );
220+ $ process ->stop ();
221+ }
222+
196223 /**
197224 * Reset infrastructure.
225+ *
226+ * We wait .1 second to sure that the infrastructure is properly created
198227 */
199228 private function resetInfrastructure ()
200229 {
@@ -228,6 +257,27 @@ private function createInfrastructure(): string
228257 ]);
229258 }
230259
260+ /**
261+ * Consume commands
262+ *
263+ * @param int $limit
264+ *
265+ * @return string
266+ */
267+ protected function consumeCommands (int $ limit ): string
268+ {
269+ $ process = $ this ->runAsyncCommand ([
270+ 'command-bus:consume-commands ' ,
271+ "--limit= $ limit " ,
272+ ]);
273+
274+ while ($ process ->isRunning ()) {
275+ usleep (100000 );
276+ }
277+
278+ return $ process ->getOutput ();
279+ }
280+
231281 /**
232282 * Check infrastructure.
233283 *
0 commit comments