4
4
use Bunny \AbstractClient ;
5
5
use Bunny \ClientStateEnum ;
6
6
use Bunny \Exception \ClientException ;
7
+ use Bunny \Protocol \Buffer ;
7
8
use Bunny \Protocol \HeartbeatFrame ;
8
9
use Bunny \Protocol \MethodConnectionStartFrame ;
9
10
use Bunny \Protocol \MethodConnectionTuneFrame ;
@@ -20,7 +21,17 @@ class Client extends \Bunny\Async\Client
20
21
21
22
/**
22
23
* Client constructor.
23
- * @param array $options see {@link AbstractClient} for available options
24
+ * @param array $options = [
25
+ * "host" => "127.0.0.1",
26
+ * "port" => 5672,
27
+ * "vhost" => "/",
28
+ * "mechanism" => "AMQPLAIN"
29
+ * "user" => "guest",
30
+ * "password" => "guest",
31
+ * "timeout" => 10,
32
+ * "heartbeat" => 60,
33
+ * "heartbeat_callback" => function(){}
34
+ * ] {@see AbstractClient::__construct()} and {@see \Workerman\RabbitMQ\Client::authResponse()}
24
35
* @param LoggerInterface|null $logger
25
36
*/
26
37
public function __construct (array $ options = [], LoggerInterface $ logger = null )
@@ -68,6 +79,36 @@ protected function flushWriteBuffer()
68
79
}
69
80
}
70
81
82
+ /**
83
+ * Override to support PLAIN mechanism
84
+ * @param MethodConnectionStartFrame $start
85
+ * @return bool|Promise\PromiseInterface
86
+ */
87
+ protected function authResponse (MethodConnectionStartFrame $ start )
88
+ {
89
+ if (strpos ($ start ->mechanisms , ($ mechanism = $ this ->options ['mechanism ' ] ?? 'AMQPLAIN ' )) === false ) {
90
+ throw new ClientException ("Server does not support {$ this ->options ['mechanism ' ]} mechanism (supported: {$ start ->mechanisms }). " );
91
+ }
92
+
93
+ if ($ mechanism === 'PLAIN ' ){
94
+ return $ this ->connectionStartOk ([], $ mechanism , sprintf ("\0%s \0%s " , $ this ->options ["user " ], $ this ->options ["password " ]), "en_US " );
95
+ }elseif ($ mechanism === 'AMQPLAIN ' ){
96
+
97
+ $ responseBuffer = new Buffer ();
98
+ $ this ->writer ->appendTable ([
99
+ "LOGIN " => $ this ->options ["user " ],
100
+ "PASSWORD " => $ this ->options ["password " ],
101
+ ], $ responseBuffer );
102
+
103
+ $ responseBuffer ->discard (4 );
104
+
105
+ return $ this ->connectionStartOk ([], $ mechanism , $ responseBuffer ->read ($ responseBuffer ->getLength ()), "en_US " );
106
+ }else {
107
+
108
+ throw new ClientException ("Client does not support {$ mechanism } mechanism. " );
109
+ }
110
+ }
111
+
71
112
/**
72
113
* Connects to AMQP server.
73
114
*
0 commit comments