18
18
from twisted .internet .interfaces import IDelayedCall
19
19
20
20
from synapse .api .constants import EventTypes
21
- from synapse .api .errors import ShadowBanError
21
+ from synapse .api .errors import ShadowBanError , SynapseError
22
22
from synapse .api .ratelimiting import Ratelimiter
23
23
from synapse .config .workers import MAIN_PROCESS_INSTANCE_NAME
24
24
from synapse .logging .opentracing import set_tag
45
45
)
46
46
from synapse .util .events import generate_fake_event_id
47
47
from synapse .util .metrics import Measure
48
+ from synapse .util .sentinel import Sentinel
48
49
49
50
if TYPE_CHECKING :
50
51
from synapse .server import HomeServer
@@ -146,10 +147,37 @@ async def process() -> None:
146
147
)
147
148
148
149
async def _unsafe_process_new_event (self ) -> None :
150
+ # We purposefully fetch the current max room stream ordering before
151
+ # doing anything else, as it could increment during processing of state
152
+ # deltas. We want to avoid updating `delayed_events_stream_pos` past
153
+ # the stream ordering of the state deltas we've processed. Otherwise
154
+ # we'll leave gaps in our processing.
155
+ room_max_stream_ordering = self ._store .get_room_max_stream_ordering ()
156
+
157
+ # Check that there are actually any delayed events to process. If not, bail early.
158
+ delayed_events_count = await self ._store .get_count_of_delayed_events ()
159
+ if delayed_events_count == 0 :
160
+ # There are no delayed events to process. Update the
161
+ # `delayed_events_stream_pos` to the latest `events` stream pos and
162
+ # exit early.
163
+ self ._event_pos = room_max_stream_ordering
164
+
165
+ logger .debug (
166
+ "No delayed events to process. Updating `delayed_events_stream_pos` to max stream ordering (%s)" ,
167
+ room_max_stream_ordering ,
168
+ )
169
+
170
+ await self ._store .update_delayed_events_stream_pos (room_max_stream_ordering )
171
+
172
+ event_processing_positions .labels (
173
+ name = "delayed_events" , ** {SERVER_NAME_LABEL : self .server_name }
174
+ ).set (room_max_stream_ordering )
175
+
176
+ return
177
+
149
178
# If self._event_pos is None then it means we haven't fetched it from the DB yet
150
179
if self ._event_pos is None :
151
180
self ._event_pos = await self ._store .get_delayed_events_stream_pos ()
152
- room_max_stream_ordering = self ._store .get_room_max_stream_ordering ()
153
181
if self ._event_pos > room_max_stream_ordering :
154
182
# apparently, we've processed more events than exist in the database!
155
183
# this can happen if events are removed with history purge or similar.
@@ -167,7 +195,7 @@ async def _unsafe_process_new_event(self) -> None:
167
195
self ._clock , name = "delayed_events_delta" , server_name = self .server_name
168
196
):
169
197
room_max_stream_ordering = self ._store .get_room_max_stream_ordering ()
170
- if self ._event_pos = = room_max_stream_ordering :
198
+ if self ._event_pos > = room_max_stream_ordering :
171
199
return
172
200
173
201
logger .debug (
@@ -202,23 +230,81 @@ async def _handle_state_deltas(self, deltas: List[StateDelta]) -> None:
202
230
Process current state deltas to cancel other users' pending delayed events
203
231
that target the same state.
204
232
"""
233
+ # Get the senders of each delta's state event (as sender information is
234
+ # not currently stored in the `current_state_deltas` table).
235
+ event_id_and_sender_dict = await self ._store .get_senders_for_event_ids (
236
+ [delta .event_id for delta in deltas if delta .event_id is not None ]
237
+ )
238
+
239
+ # Note: No need to batch as `get_current_state_deltas` will only ever
240
+ # return 100 rows at a time.
205
241
for delta in deltas :
242
+ logger .debug (
243
+ "Handling: %r %r, %s" , delta .event_type , delta .state_key , delta .event_id
244
+ )
245
+
246
+ # `delta.event_id` and `delta.sender` can be `None` in a few valid
247
+ # cases (see the docstring of
248
+ # `get_current_state_delta_membership_changes_for_user` for details).
206
249
if delta .event_id is None :
207
- logger .debug (
208
- "Not handling delta for deleted state: %r %r" ,
250
+ # TODO: Differentiate between this being caused by a state reset
251
+ # which removed a user from a room, or the homeserver
252
+ # purposefully having left the room. We can do so by checking
253
+ # whether there are any local memberships still left in the
254
+ # room. If so, then this is the result of a state reset.
255
+ #
256
+ # If it is a state reset, we should avoid cancelling new,
257
+ # delayed state events due to old state resurfacing. So we
258
+ # should skip and log a warning in this case.
259
+ #
260
+ # If the homeserver has left the room, then we should cancel all
261
+ # delayed state events intended for this room, as there is no
262
+ # need to try and send a delayed event into a room we've left.
263
+ logger .warning (
264
+ "Skipping state delta (%r, %r) without corresponding event ID. "
265
+ "This can happen if the homeserver has left the room (in which "
266
+ "case this can be ignored), or if there has been a state reset "
267
+ "which has caused the sender to be kicked out of the room" ,
209
268
delta .event_type ,
210
269
delta .state_key ,
211
270
)
212
271
continue
213
272
214
- logger . debug (
215
- "Handling: %r %r, %s" , delta .event_type , delta . state_key , delta . event_id
273
+ sender_str = event_id_and_sender_dict . get (
274
+ delta .event_id , Sentinel . UNSET_SENTINEL
216
275
)
276
+ if sender_str is None :
277
+ # An event exists, but the `sender` field was "null" and Synapse
278
+ # incorrectly accepted the event. This is not expected.
279
+ logger .error (
280
+ "Skipping state delta with event ID '%s' as 'sender' was None. "
281
+ "This is unexpected - please report it as a bug!" ,
282
+ delta .event_id ,
283
+ )
284
+ continue
285
+ if sender_str is Sentinel .UNSET_SENTINEL :
286
+ # We have an event ID, but the event was not found in the
287
+ # datastore. This can happen if a room, or its history, is
288
+ # purged. State deltas related to the room are left behind, but
289
+ # the event no longer exists.
290
+ #
291
+ # As we cannot get the sender of this event, we can't calculate
292
+ # whether to cancel delayed events related to this one. So we skip.
293
+ logger .debug (
294
+ "Skipping state delta with event ID '%s' - the room, or its history, may have been purged" ,
295
+ delta .event_id ,
296
+ )
297
+ continue
217
298
218
- event = await self ._store .get_event (delta .event_id , allow_none = True )
219
- if not event :
299
+ try :
300
+ sender = UserID .from_string (sender_str )
301
+ except SynapseError as e :
302
+ logger .error (
303
+ "Skipping state delta with Matrix User ID '%s' that failed to parse: %s" ,
304
+ sender_str ,
305
+ e ,
306
+ )
220
307
continue
221
- sender = UserID .from_string (event .sender )
222
308
223
309
next_send_ts = await self ._store .cancel_delayed_state_events (
224
310
room_id = delta .room_id ,
0 commit comments