35
35
namespace Ripple \Driver \Workerman ;
36
36
37
37
use Closure ;
38
- use Co \System ;
39
- use Ripple \Stream ;
40
38
use Ripple \Kernel ;
39
+ use Ripple \Process ;
40
+ use Ripple \Stream ;
41
41
use Throwable ;
42
42
use Workerman \Events \EventInterface ;
43
43
use Workerman \Worker ;
@@ -73,7 +73,10 @@ class Driver4 implements EventInterface
73
73
protected array $ _timer = [];
74
74
75
75
/*** @var array */
76
- protected array $ _fd2ids = [];
76
+ protected array $ _fd2RIDs = [];
77
+
78
+ /*** @var array */
79
+ protected array $ _fd2WIDs = [];
77
80
78
81
/*** @var array */
79
82
protected array $ _signal2ids = [];
@@ -113,12 +116,10 @@ public function add($fd, $flag, $func, $args = []): bool|int
113
116
}
114
117
}
115
118
116
- // 未找到回调
117
119
if (!isset ($ closure )) {
118
120
return false ;
119
121
}
120
122
121
- // 注册信号处理器
122
123
$ id = onSignal ($ fd , $ closure );
123
124
$ this ->_signal2ids [$ fd ] = string2int ($ id );
124
125
return string2int ($ id );
@@ -127,47 +128,41 @@ public function add($fd, $flag, $func, $args = []): bool|int
127
128
}
128
129
129
130
case EventInterface::EV_TIMER :
130
- // 定时器
131
131
$ this ->_timer [] = $ timerId = repeat (function () use ($ func , $ args ) {
132
132
try {
133
133
call_user_func_array ($ func , $ args );
134
134
} catch (Throwable $ e ) {
135
135
Worker::stopAll (250 , $ e );
136
136
}
137
137
}, $ fd );
138
-
139
138
return string2int ($ timerId );
140
139
141
140
case EventInterface::EV_TIMER_ONCE :
142
- // 一次性定时器
143
141
$ this ->_timer [] = $ timerId = delay (function () use ($ func , $ args ) {
144
142
try {
145
143
call_user_func_array ($ func , $ args );
146
144
} catch (Throwable $ e ) {
147
145
Worker::stopAll (250 , $ e );
148
146
}
149
147
}, $ fd );
150
-
151
148
return string2int ($ timerId );
152
149
153
150
case EventInterface::EV_READ :
154
- // 读事件
155
151
$ stream = new Stream ($ fd );
156
152
$ eventId = $ stream ->onReadable (function (Stream $ stream ) use ($ func ) {
157
153
$ func ($ stream ->stream );
158
154
});
159
155
160
- $ this ->_fd2ids [$ stream ->id ][] = string2int ($ eventId );
156
+ $ this ->_fd2RIDs [$ stream ->id ][] = string2int ($ eventId );
161
157
return string2int ($ eventId );
162
158
163
159
case EventInterface::EV_WRITE :
164
- // 写事件
165
160
$ stream = new Stream ($ fd );
166
- $ eventId = $ stream ->onWritable (function (Stream $ stream ) use ($ func ) {
161
+ $ eventId = $ stream ->onWriteable (function (Stream $ stream ) use ($ func ) {
167
162
$ func ($ stream ->stream );
168
163
});
169
164
170
- $ this ->_fd2ids [$ stream ->id ][] = string2int ($ eventId );
165
+ $ this ->_fd2WIDs [$ stream ->id ][] = string2int ($ eventId );
171
166
return string2int ($ eventId );
172
167
}
173
168
return false ;
@@ -192,19 +187,26 @@ public function del($fd, $flag): void
192
187
}
193
188
194
189
if ($ flag === EventInterface::EV_READ || $ flag === EventInterface::EV_WRITE ) {
195
- // 取消读写事件监听
190
+ if (!$ fd ) {
191
+ return ;
192
+ }
193
+
196
194
$ streamId = get_resource_id ($ fd );
197
- if (isset ($ this ->_fd2ids [$ streamId ])) {
198
- foreach ($ this ->_fd2ids [$ streamId ] as $ id ) {
199
- $ this ->cancel ($ id );
195
+ if ($ flag === EventInterface::EV_READ ) {
196
+ foreach ($ this ->_fd2RIDs [$ streamId ] ?? [] as $ eventId ) {
197
+ cancel (int2string ($ eventId ));
198
+ }
199
+ unset($ this ->_fd2RIDs [$ streamId ]);
200
+ } else {
201
+ foreach ($ this ->_fd2WIDs [$ streamId ] ?? [] as $ eventId ) {
202
+ cancel (int2string ($ eventId ));
200
203
}
201
- unset($ this ->_fd2ids [$ streamId ]);
204
+ unset($ this ->_fd2WIDs [$ streamId ]);
202
205
}
203
206
return ;
204
207
}
205
208
206
209
if ($ flag === EventInterface::EV_SIGNAL ) {
207
- // 取消信号监听
208
210
$ signalId = $ this ->_signal2ids [$ fd ] ?? null ;
209
211
if ($ signalId ) {
210
212
$ this ->cancel ($ signalId );
@@ -247,8 +249,9 @@ public function loop(): void
247
249
Driver4::$ baseProcessId = (Kernel::getInstance ()->supportProcessControl () ? getmypid () : posix_getpid ());
248
250
} elseif (Driver4::$ baseProcessId !== (Kernel::getInstance ()->supportProcessControl () ? getmypid () : posix_getpid ())) {
249
251
Driver4::$ baseProcessId = (Kernel::getInstance ()->supportProcessControl () ? getmypid () : posix_getpid ());
250
- cancelAll ();
251
- System::Process ()->forkedTick ();
252
+ Process::getInstance ()->processedInMain (static function () {
253
+ Process::getInstance ()->forgetEvents ();
254
+ });
252
255
}
253
256
wait ();
254
257
0 commit comments