Skip to content

Commit b16f15c

Browse files
authored
Merge pull request #1 from driftphp/feature/using-console-bridge
Using console bridge for messages
2 parents 4ec2243 + d831819 commit b16f15c

File tree

9 files changed

+229
-73
lines changed

9 files changed

+229
-73
lines changed

Async/AMQPAdapter.php

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
use Bunny\Message;
2020
use Drift\Bus\Bus\CommandBus;
2121
use Drift\Bus\Exception\InvalidCommandException;
22+
use Drift\Console\OutputPrinter;
2223
use React\EventLoop\LoopInterface;
2324
use React\Promise\PromiseInterface;
24-
use Symfony\Component\Console\Output\OutputInterface;
2525

2626
/**
2727
* Class AMQPAdapter.
@@ -87,34 +87,34 @@ public function enqueue($command): PromiseInterface
8787
/**
8888
* Consume.
8989
*
90-
* @param CommandBus $bus
91-
* @param int $limit
92-
* @param OutputInterface $output
90+
* @param CommandBus $bus
91+
* @param int $limit
92+
* @param OutputPrinter $outputPrinter
9393
*
9494
* @throws InvalidCommandException
9595
*/
9696
public function consume(
9797
CommandBus $bus,
9898
int $limit,
99-
OutputInterface $output
99+
OutputPrinter $outputPrinter
100100
) {
101101
$this->resetIterations($limit);
102102

103103
$this
104104
->prepare()
105-
->then(function () use ($bus, $output) {
105+
->then(function () use ($bus, $outputPrinter) {
106106
return $this
107107
->channel
108108
->qos(0, 1, true)
109-
->then(function () use ($bus, $output) {
109+
->then(function () use ($bus, $outputPrinter) {
110110
return $this
111111
->channel
112-
->consume(function (Message $message, Channel $channel) use ($bus, $output) {
112+
->consume(function (Message $message, Channel $channel) use ($bus, $outputPrinter) {
113113
return $this
114114
->executeCommand(
115115
$bus,
116116
unserialize($message->content),
117-
$output,
117+
$outputPrinter,
118118
function () use ($message, $channel) {
119119
return $channel->ack($message);
120120
},

Async/AsyncAdapter.php

Lines changed: 34 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@
1616
namespace Drift\Bus\Async;
1717

1818
use Drift\Bus\Bus\CommandBus;
19+
use Drift\Bus\Console\ConsumerLineMessage;
1920
use Drift\Bus\Exception\InvalidCommandException;
2021
use Drift\Bus\Exception\MissingHandlerException;
22+
use Drift\Console\OutputPrinter;
23+
use Drift\Console\TimeFormatter;
2124
use React\Promise\FulfilledPromise;
2225
use React\Promise\PromiseInterface;
23-
use Symfony\Component\Console\Output\OutputInterface;
2426

2527
/**
2628
* Interface AsyncAdapter.
@@ -64,43 +66,51 @@ abstract public function enqueue($command): PromiseInterface;
6466
/**
6567
* Consume.
6668
*
67-
* @param CommandBus $bus
68-
* @param int $limit
69-
* @param OutputInterface $output
69+
* @param CommandBus $bus
70+
* @param int $limit
71+
* @param OutputPrinter $outputPrinter
7072
*
7173
* @throws InvalidCommandException
7274
*/
7375
abstract public function consume(
7476
CommandBus $bus,
7577
int $limit,
76-
OutputInterface $output
78+
OutputPrinter $outputPrinter
7779
);
7880

7981
/**
8082
* Execute command.
8183
*
82-
* @param CommandBus $bus
83-
* @param object $command
84-
* @param OutputInterface $output
85-
* @param callable $ok
86-
* @param callable $ko
87-
* @param callable $finish
84+
* @param CommandBus $bus
85+
* @param object $command
86+
* @param OutputPrinter $outputPrinter
87+
* @param callable $ok
88+
* @param callable $ko
89+
* @param callable $finish
8890
*
8991
* @return PromiseInterface
9092
*/
9193
protected function executeCommand(
9294
CommandBus $bus,
9395
$command,
94-
OutputInterface $output,
96+
OutputPrinter $outputPrinter,
9597

9698
callable $ok,
9799
callable $ko,
98100
callable $finish
99101
): PromiseInterface {
102+
$from = microtime(true);
103+
100104
return $bus
101105
->execute($command)
102-
->then(function () use ($output, $command, $ok, $finish) {
103-
$this->printCommandMessage($command, $output, 'consumed');
106+
->then(function () use ($from, $outputPrinter, $command, $ok, $finish) {
107+
$to = microtime(true);
108+
109+
(new ConsumerLineMessage(
110+
$command,
111+
TimeFormatter::formatTime($to - $from),
112+
ConsumerLineMessage::CONSUMED
113+
))->print($outputPrinter);
104114

105115
return (new FulfilledPromise())
106116
->then(function () use ($ok) {
@@ -119,11 +129,17 @@ protected function executeCommand(
119129

120130
return false;
121131
});
122-
}, function (\Exception $exception) use ($output, $command, $ok, $ko) {
132+
}, function (\Exception $exception) use ($from, $outputPrinter, $command, $ok, $ko) {
133+
$to = microtime(true);
123134
$ignorable = $exception instanceof MissingHandlerException;
124-
$ignorable
125-
? $this->printCommandMessage($command, $output, 'ignored')
126-
: $this->printCommandMessage($command, $output, 'failed');
135+
136+
(new ConsumerLineMessage(
137+
$command,
138+
TimeFormatter::formatTime($to - $from),
139+
$ignorable
140+
? ConsumerLineMessage::IGNORED
141+
: ConsumerLineMessage::REJECTED
142+
))->print($outputPrinter);
127143

128144
return (
129145
$ignorable
@@ -171,22 +187,4 @@ public function canConsumeAnotherOne(): bool
171187

172188
return true;
173189
}
174-
175-
/**
176-
* Print command consumed.
177-
*
178-
* @param object $command
179-
* @param OutputInterface $output
180-
* @param string $status
181-
*/
182-
private function printCommandMessage(
183-
$command,
184-
OutputInterface $output,
185-
string $status
186-
) {
187-
$commandNamespace = get_class($command);
188-
$commandParts = explode('\\', $commandNamespace);
189-
$commandClass = end($commandParts);
190-
$output->writeln(sprintf('Command <%s> %s', $commandClass, $status));
191-
}
192190
}

Async/InMemoryAdapter.php

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717

1818
use Drift\Bus\Bus\CommandBus;
1919
use Drift\Bus\Exception\InvalidCommandException;
20+
use Drift\Console\OutputPrinter;
2021
use function Clue\React\Block\await;
2122
use React\EventLoop\LoopInterface;
2223
use React\Promise\FulfilledPromise;
2324
use React\Promise\PromiseInterface;
24-
use Symfony\Component\Console\Output\OutputInterface;
2525

2626
/**
2727
* Class DummyAdapter.
@@ -69,16 +69,16 @@ public function enqueue($command): PromiseInterface
6969
/**
7070
* Consume.
7171
*
72-
* @param CommandBus $bus
73-
* @param int $limit
74-
* @param OutputInterface $output
72+
* @param CommandBus $bus
73+
* @param int $limit
74+
* @param OutputPrinter $outputPrinter
7575
*
7676
* @throws InvalidCommandException
7777
*/
7878
public function consume(
7979
CommandBus $bus,
8080
int $limit,
81-
OutputInterface $output
81+
OutputPrinter $outputPrinter
8282
) {
8383
$this->resetIterations($limit);
8484

@@ -87,7 +87,7 @@ public function consume(
8787
->executeCommand(
8888
$bus,
8989
$command,
90-
$output,
90+
$outputPrinter,
9191
function () use ($key) {
9292
unset($this->queue[$key]);
9393
},

Async/RedisAdapter.php

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818
use Clue\React\Redis\Client;
1919
use Drift\Bus\Bus\CommandBus;
2020
use Drift\Bus\Exception\InvalidCommandException;
21+
use Drift\Console\OutputPrinter;
2122
use function Clue\React\Block\await;
2223
use React\EventLoop\LoopInterface;
2324
use React\Promise\PromiseInterface;
24-
use Symfony\Component\Console\Output\OutputInterface;
2525

2626
/**
2727
* Class RedisAdapter.
@@ -73,35 +73,36 @@ public function enqueue($command): PromiseInterface
7373
/**
7474
* Consume.
7575
*
76-
* @param CommandBus $bus
77-
* @param int $limit
78-
* @param OutputInterface $output
76+
* @param CommandBus $bus
77+
* @param int $limit
78+
* @param OutputPrinter $outputPrinter
7979
*
8080
* @throws InvalidCommandException
8181
*/
8282
public function consume(
8383
CommandBus $bus,
8484
int $limit,
85-
OutputInterface $output
85+
OutputPrinter $outputPrinter
8686
) {
8787
$this->resetIterations($limit);
8888

8989
while (true) {
9090
$promise = $this
9191
->redis
9292
->blPop($this->key, 0)
93-
->then(function (array $job) use ($bus, $output) {
93+
->then(function (array $job) use ($bus, $outputPrinter) {
9494
return $this->executeCommand(
9595
$bus,
9696
unserialize($job[1]),
97-
$output,
97+
$outputPrinter,
9898
function () {},
9999
function () {},
100100
function () {}
101101
);
102102
});
103103

104104
$wasLastOne = await($promise, $this->loop);
105+
105106
if ($wasLastOne) {
106107
return;
107108
}

Console/CommandConsumer.php

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use Drift\Bus\Async\AsyncAdapter;
1919
use Drift\Bus\Bus\CommandBus;
20+
use Drift\Console\OutputPrinter;
2021
use React\EventLoop\LoopInterface;
2122
use Symfony\Component\Console\Command\Command;
2223
use Symfony\Component\Console\Input\InputInterface;
@@ -88,12 +89,16 @@ protected function configure()
8889
*/
8990
protected function execute(InputInterface $input, OutputInterface $output)
9091
{
92+
$outputPrinter = new OutputPrinter($output);
93+
(new ConsumerHeaderMessage('', 'Consumer built'))->print($outputPrinter);
94+
(new ConsumerHeaderMessage('', 'Started listening...'))->print($outputPrinter);
95+
9196
$this
9297
->asyncAdapter
9398
->consume(
9499
$this->commandBus,
95100
\intval($input->getOption('limit')),
96-
$output
101+
$outputPrinter
97102
);
98103

99104
return 0;

Console/ConsumerHeaderMessage.php

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the DriftPHP Project
5+
*
6+
* For the full copyright and license information, please view the LICENSE
7+
* file that was distributed with this source code.
8+
*
9+
* Feel free to edit as you please, and have fun.
10+
*
11+
* @author Marc Morera <[email protected]>
12+
*/
13+
14+
declare(strict_types=1);
15+
16+
namespace Drift\Bus\Console;
17+
18+
use Drift\Console\OutputPrinter;
19+
20+
/**
21+
* Class ConsumerHeaderMessage.
22+
*/
23+
final class ConsumerHeaderMessage
24+
{
25+
private $elapsedTime;
26+
private $message;
27+
28+
/**
29+
* ConsumerMessage constructor.
30+
*
31+
* @param string $elapsedTime
32+
* @param string $message
33+
*/
34+
public function __construct(
35+
string $elapsedTime,
36+
string $message
37+
) {
38+
$this->elapsedTime = $elapsedTime;
39+
$this->message = $message;
40+
}
41+
42+
/**
43+
* Print.
44+
*
45+
* @param OutputPrinter $outputPrinter
46+
*/
47+
public function print(OutputPrinter $outputPrinter)
48+
{
49+
$color = '32';
50+
51+
$outputPrinter->print("\033[01;{$color}mCONSM\033[0m ");
52+
$outputPrinter->print("(\e[00;37m".$this->elapsedTime.' | '.((int) (memory_get_usage() / 1000000))." MB\e[0m)");
53+
$outputPrinter->print(" {$this->message}");
54+
$outputPrinter->printLine();
55+
}
56+
}

0 commit comments

Comments
 (0)