@@ -22,6 +22,15 @@ class Client extends \Bunny\Async\Client
22
22
/** @var null|callable 重启事件回调 */
23
23
protected $ restartCallback = null ;
24
24
25
+ /**
26
+ * Version 5.x uses a new event interface
27
+ * @return bool
28
+ */
29
+ public static function isNewEventInterface (): bool
30
+ {
31
+ return version_compare (Worker::VERSION , '5.0.0 ' , '>= ' );
32
+ }
33
+
25
34
/**
26
35
* Client constructor.
27
36
* @param array $options = [
@@ -87,23 +96,44 @@ protected function flushWriteBuffer()
87
96
} else {
88
97
$ deferred = new Promise \Deferred ();
89
98
90
- $ this -> eventLoop -> add ( $ this -> getStream (), EventInterface:: EV_WRITE , function ($ stream ) use ($ deferred ) {
99
+ $ streamFunction = function ($ stream ) use ($ deferred ) {
91
100
try {
92
101
$ this ->write ();
93
102
94
103
if ($ this ->writeBuffer ->isEmpty ()) {
95
- $ this ->eventLoop ->del ($ stream , EventInterface::EV_WRITE );
104
+ // support workerman 5.x
105
+ if (method_exists ($ this ->eventLoop , 'offWritable ' )) {
106
+ $ this ->eventLoop ->offWritable ($ stream );
107
+ }
108
+ // ver earlier than 5.x
109
+ else {
110
+ $ this ->eventLoop ->del ($ stream , EventInterface::EV_WRITE );
111
+ }
96
112
$ this ->flushWriteBufferPromise = null ;
97
113
$ deferred ->resolve (true );
98
114
}
99
115
100
116
} catch (\Exception $ e ) {
101
- $ this ->eventLoop ->del ($ stream , EventInterface::EV_WRITE );
117
+ // support workerman 5.x
118
+ if (method_exists ($ this ->eventLoop , 'offWritable ' )) {
119
+ $ this ->eventLoop ->offWritable ($ stream );
120
+ }
121
+ // ver earlier than 5.x
122
+ else {
123
+ $ this ->eventLoop ->del ($ stream , EventInterface::EV_WRITE );
124
+ }
102
125
$ this ->flushWriteBufferPromise = null ;
103
126
$ deferred ->reject ($ e );
104
127
}
105
- });
106
-
128
+ };
129
+ // support workerman 5.x
130
+ if (method_exists ($ this ->eventLoop , 'onWritable ' )) {
131
+ $ this ->eventLoop ->onWritable ($ this ->getStream (), $ streamFunction );
132
+ }
133
+ // ver earlier than 5.x
134
+ else {
135
+ $ this ->eventLoop ->add ($ this ->getStream (), EventInterface::EV_WRITE , $ streamFunction );
136
+ }
107
137
return $ this ->flushWriteBufferPromise = $ deferred ->promise ();
108
138
}
109
139
}
@@ -155,7 +185,14 @@ public function connect()
155
185
$ this ->writer ->appendProtocolHeader ($ this ->writeBuffer );
156
186
157
187
try {
158
- $ this ->eventLoop ->add ($ this ->getStream (), EventInterface::EV_READ , [$ this , "onDataAvailable " ]);
188
+ // support workerman 5.x
189
+ if (method_exists ($ this ->eventLoop , 'onReadable ' )) {
190
+ $ this ->eventLoop ->onReadable ($ this ->getStream (), [$ this , "onDataAvailable " ]);
191
+ }
192
+ // ver earlier than 5.x
193
+ else {
194
+ $ this ->eventLoop ->add ($ this ->getStream (), EventInterface::EV_READ , [$ this , "onDataAvailable " ]);
195
+ }
159
196
} catch (\Exception $ e ) {
160
197
return Promise \reject ($ e );
161
198
}
@@ -239,7 +276,14 @@ public function disconnect($replyCode = 0, $replyText = "")
239
276
}
240
277
return $ this ->connectionClose ($ replyCode , $ replyText , 0 , 0 );
241
278
})->then (function () use ($ replyCode , $ replyText ){
242
- $ this ->eventLoop ->del ($ this ->getStream (), EventInterface::EV_READ );
279
+ // support workerman 5.x
280
+ if (method_exists ($ this ->eventLoop , 'offReadable ' )) {
281
+ $ this ->eventLoop ->offReadable ($ this ->getStream ());
282
+ }
283
+ // ver earlier than 5.x
284
+ else {
285
+ $ this ->eventLoop ->del ($ this ->getStream (), EventInterface::EV_READ );
286
+ }
243
287
$ this ->closeStream ();
244
288
$ this ->init ();
245
289
if ($ replyCode !== 0 ) {
@@ -252,13 +296,18 @@ public function disconnect($replyCode = 0, $replyText = "")
252
296
// 延迟重启
253
297
if (($ restartInterval = $ this ->options ['restart_interval ' ] ?? 0 ) > 0 ) {
254
298
Worker::log ("RabbitMQ client will restart in $ restartInterval seconds. " );
255
- $ this ->eventLoop ->add (
256
- $ restartInterval ,
257
- EventInterface::EV_TIMER_ONCE ,
258
- function () use ($ replyCode , $ replyText , $ restartInterval ) {
259
- Worker::stopAll (0 ,"RabbitMQ client disconnected: [ {$ replyCode }] {$ replyText }" );
260
- }
261
- );
299
+
300
+ $ timerFunction = function () use ($ replyCode , $ replyText , $ restartInterval ) {
301
+ Worker::stopAll (0 ,"RabbitMQ client disconnected: [ {$ replyCode }] {$ replyText }" );
302
+ };
303
+ // support workerman 5.x
304
+ if (method_exists ($ this ->eventLoop , 'delay ' )) {
305
+ $ this ->eventLoop ->delay ($ restartInterval , $ timerFunction );
306
+ }
307
+ // ver earlier than 5.x
308
+ else {
309
+ $ this ->eventLoop ->add ($ restartInterval , EventInterface::EV_TIMER_ONCE , $ timerFunction );
310
+ }
262
311
return null ;
263
312
}
264
313
// 立即重启
0 commit comments