@@ -19,6 +19,9 @@ class Client extends \Bunny\Async\Client
19
19
/** @var LoggerInterface */
20
20
protected $ logger ;
21
21
22
+ /** @var null|callable 重启事件回调 */
23
+ protected $ restartCallback = null ;
24
+
22
25
/**
23
26
* Client constructor.
24
27
* @param array $options = [
@@ -43,6 +46,31 @@ public function __construct(array $options = [], LoggerInterface $logger = null)
43
46
$ this ->eventLoop = Worker::$ globalEvent ;
44
47
}
45
48
49
+ /**
50
+ * 注册重启回调
51
+ * - 回调函数的返回值应为 Promise\PromiseInterface|null
52
+ * - 入参为当前client实例、replyCode、replyText
53
+ *
54
+ * @param callable $callback = function (Client $client, $replyCode, $replyText): Promise\PromiseInterface|null {}
55
+ * @return $this
56
+ */
57
+ public function registerRestartCallback (callable $ callback ): Client
58
+ {
59
+ $ this ->restartCallback = $ callback ;
60
+ return $ this ;
61
+ }
62
+
63
+ /**
64
+ * 移除重启回调
65
+ *
66
+ * @return $this
67
+ */
68
+ public function unregisterRestartCallback (): Client
69
+ {
70
+ $ this ->restartCallback = null ;
71
+ return $ this ;
72
+ }
73
+
46
74
/**
47
75
* Asynchronously sends buffered data over the wire.
48
76
*
@@ -170,7 +198,7 @@ public function connect()
170
198
*
171
199
* @param int $replyCode
172
200
* @param string $replyText
173
- * @return Promise\PromiseInterface
201
+ * @return Promise\PromiseInterface|null
174
202
*/
175
203
public function disconnect ($ replyCode = 0 , $ replyText = "" )
176
204
{
@@ -215,18 +243,28 @@ public function disconnect($replyCode = 0, $replyText = "")
215
243
$ this ->closeStream ();
216
244
$ this ->init ();
217
245
if ($ replyCode !== 0 ) {
218
- if (($ restartInterval = $ this ->options ['restart_interval ' ] ?? 0 ) > 0 ) {
219
- Worker::log ("RabbitMQ client will restart in $ restartInterval seconds. " );
220
- $ this ->eventLoop ->add (
221
- $ restartInterval ,
222
- EventInterface::EV_TIMER_ONCE ,
223
- function () use ($ replyCode , $ replyText , $ restartInterval ) {
224
- Worker::stopAll (0 ,"RabbitMQ client disconnected: [ {$ replyCode }] {$ replyText }" );
225
- }
226
- );
227
- return null ;
228
- } else {
229
- Worker::stopAll (0 ,"RabbitMQ client disconnected: [ {$ replyCode }] {$ replyText }" );
246
+ // 触发重启事件回调
247
+ if ($ this ->restartCallback ) {
248
+ return call_user_func ($ this ->restartCallback , $ this , $ replyCode , $ replyText );
249
+ }
250
+ // 默认重启流程
251
+ else {
252
+ // 延迟重启
253
+ if (($ restartInterval = $ this ->options ['restart_interval ' ] ?? 0 ) > 0 ) {
254
+ Worker::log ("RabbitMQ client will restart in $ restartInterval seconds. " );
255
+ $ this ->eventLoop ->add (
256
+ $ restartInterval ,
257
+ EventInterface::EV_TIMER_ONCE ,
258
+ function () use ($ replyCode , $ replyText , $ restartInterval ) {
259
+ Worker::stopAll (0 ,"RabbitMQ client disconnected: [ {$ replyCode }] {$ replyText }" );
260
+ }
261
+ );
262
+ return null ;
263
+ }
264
+ // 立即重启
265
+ else {
266
+ Worker::stopAll (0 ,"RabbitMQ client disconnected: [ {$ replyCode }] {$ replyText }" );
267
+ }
230
268
}
231
269
}
232
270
return $ this ;
0 commit comments