Skip to content

Commit 83513b7

Browse files
authored
Speed up sliding sync by computing extensions in parallel (#17884)
The main change here is to add a helper function `gather_optional_coroutines`, which works in a similar way as `yieldable_gather_results` but takes a set of coroutines rather than a function
1 parent 58deef5 commit 83513b7

File tree

5 files changed

+285
-13
lines changed

5 files changed

+285
-13
lines changed

changelog.d/17884.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Minor speed-up of sliding sync by computing extensions results in parallel.

synapse/handlers/sliding_sync/extensions.py

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,10 @@
4949
SlidingSyncConfig,
5050
SlidingSyncResult,
5151
)
52-
from synapse.util.async_helpers import concurrently_execute
52+
from synapse.util.async_helpers import (
53+
concurrently_execute,
54+
gather_optional_coroutines,
55+
)
5356

5457
if TYPE_CHECKING:
5558
from synapse.server import HomeServer
@@ -97,26 +100,26 @@ async def get_extensions_response(
97100
if sync_config.extensions is None:
98101
return SlidingSyncResult.Extensions()
99102

100-
to_device_response = None
103+
to_device_coro = None
101104
if sync_config.extensions.to_device is not None:
102-
to_device_response = await self.get_to_device_extension_response(
105+
to_device_coro = self.get_to_device_extension_response(
103106
sync_config=sync_config,
104107
to_device_request=sync_config.extensions.to_device,
105108
to_token=to_token,
106109
)
107110

108-
e2ee_response = None
111+
e2ee_coro = None
109112
if sync_config.extensions.e2ee is not None:
110-
e2ee_response = await self.get_e2ee_extension_response(
113+
e2ee_coro = self.get_e2ee_extension_response(
111114
sync_config=sync_config,
112115
e2ee_request=sync_config.extensions.e2ee,
113116
to_token=to_token,
114117
from_token=from_token,
115118
)
116119

117-
account_data_response = None
120+
account_data_coro = None
118121
if sync_config.extensions.account_data is not None:
119-
account_data_response = await self.get_account_data_extension_response(
122+
account_data_coro = self.get_account_data_extension_response(
120123
sync_config=sync_config,
121124
previous_connection_state=previous_connection_state,
122125
new_connection_state=new_connection_state,
@@ -127,9 +130,9 @@ async def get_extensions_response(
127130
from_token=from_token,
128131
)
129132

130-
receipts_response = None
133+
receipts_coro = None
131134
if sync_config.extensions.receipts is not None:
132-
receipts_response = await self.get_receipts_extension_response(
135+
receipts_coro = self.get_receipts_extension_response(
133136
sync_config=sync_config,
134137
previous_connection_state=previous_connection_state,
135138
new_connection_state=new_connection_state,
@@ -141,9 +144,9 @@ async def get_extensions_response(
141144
from_token=from_token,
142145
)
143146

144-
typing_response = None
147+
typing_coro = None
145148
if sync_config.extensions.typing is not None:
146-
typing_response = await self.get_typing_extension_response(
149+
typing_coro = self.get_typing_extension_response(
147150
sync_config=sync_config,
148151
actual_lists=actual_lists,
149152
actual_room_ids=actual_room_ids,
@@ -153,6 +156,20 @@ async def get_extensions_response(
153156
from_token=from_token,
154157
)
155158

159+
(
160+
to_device_response,
161+
e2ee_response,
162+
account_data_response,
163+
receipts_response,
164+
typing_response,
165+
) = await gather_optional_coroutines(
166+
to_device_coro,
167+
e2ee_coro,
168+
account_data_coro,
169+
receipts_coro,
170+
typing_coro,
171+
)
172+
156173
return SlidingSyncResult.Extensions(
157174
to_device=to_device_response,
158175
e2ee=e2ee_response,

synapse/logging/context.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
from types import TracebackType
3838
from typing import (
3939
TYPE_CHECKING,
40+
Any,
4041
Awaitable,
4142
Callable,
4243
Optional,
@@ -850,6 +851,45 @@ def run_in_background(
850851
return d
851852

852853

854+
def run_coroutine_in_background(
855+
coroutine: typing.Coroutine[Any, Any, R],
856+
) -> "defer.Deferred[R]":
857+
"""Run the coroutine, ensuring that the current context is restored after
858+
return from the function, and that the sentinel context is set once the
859+
deferred returned by the function completes.
860+
861+
Useful for wrapping coroutines that you don't yield or await on (for
862+
instance because you want to pass it to deferred.gatherResults()).
863+
864+
This is a special case of `run_in_background` where we can accept a
865+
coroutine directly rather than a function. We can do this because coroutines
866+
do not run until called, and so calling an async function without awaiting
867+
cannot change the log contexts.
868+
"""
869+
870+
current = current_context()
871+
d = defer.ensureDeferred(coroutine)
872+
873+
# The function may have reset the context before returning, so
874+
# we need to restore it now.
875+
ctx = set_current_context(current)
876+
877+
# The original context will be restored when the deferred
878+
# completes, but there is nothing waiting for it, so it will
879+
# get leaked into the reactor or some other function which
880+
# wasn't expecting it. We therefore need to reset the context
881+
# here.
882+
#
883+
# (If this feels asymmetric, consider it this way: we are
884+
# effectively forking a new thread of execution. We are
885+
# probably currently within a ``with LoggingContext()`` block,
886+
# which is supposed to have a single entry and exit point. But
887+
# by spawning off another deferred, we are effectively
888+
# adding a new exit point.)
889+
d.addBoth(_set_context_cb, ctx)
890+
return d
891+
892+
853893
T = TypeVar("T")
854894

855895

synapse/util/async_helpers.py

Lines changed: 109 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
)
5252

5353
import attr
54-
from typing_extensions import Concatenate, Literal, ParamSpec
54+
from typing_extensions import Concatenate, Literal, ParamSpec, Unpack
5555

5656
from twisted.internet import defer
5757
from twisted.internet.defer import CancelledError
@@ -61,6 +61,7 @@
6161
from synapse.logging.context import (
6262
PreserveLoggingContext,
6363
make_deferred_yieldable,
64+
run_coroutine_in_background,
6465
run_in_background,
6566
)
6667
from synapse.util import Clock
@@ -344,6 +345,7 @@ async def yieldable_gather_results_delaying_cancellation(
344345
T2 = TypeVar("T2")
345346
T3 = TypeVar("T3")
346347
T4 = TypeVar("T4")
348+
T5 = TypeVar("T5")
347349

348350

349351
@overload
@@ -402,6 +404,112 @@ def gather_results( # type: ignore[misc]
402404
return deferred.addCallback(tuple)
403405

404406

407+
@overload
408+
async def gather_optional_coroutines(
409+
*coroutines: Unpack[Tuple[Optional[Coroutine[Any, Any, T1]]]],
410+
) -> Tuple[Optional[T1]]: ...
411+
412+
413+
@overload
414+
async def gather_optional_coroutines(
415+
*coroutines: Unpack[
416+
Tuple[
417+
Optional[Coroutine[Any, Any, T1]],
418+
Optional[Coroutine[Any, Any, T2]],
419+
]
420+
],
421+
) -> Tuple[Optional[T1], Optional[T2]]: ...
422+
423+
424+
@overload
425+
async def gather_optional_coroutines(
426+
*coroutines: Unpack[
427+
Tuple[
428+
Optional[Coroutine[Any, Any, T1]],
429+
Optional[Coroutine[Any, Any, T2]],
430+
Optional[Coroutine[Any, Any, T3]],
431+
]
432+
],
433+
) -> Tuple[Optional[T1], Optional[T2], Optional[T3]]: ...
434+
435+
436+
@overload
437+
async def gather_optional_coroutines(
438+
*coroutines: Unpack[
439+
Tuple[
440+
Optional[Coroutine[Any, Any, T1]],
441+
Optional[Coroutine[Any, Any, T2]],
442+
Optional[Coroutine[Any, Any, T3]],
443+
Optional[Coroutine[Any, Any, T4]],
444+
]
445+
],
446+
) -> Tuple[Optional[T1], Optional[T2], Optional[T3], Optional[T4]]: ...
447+
448+
449+
@overload
450+
async def gather_optional_coroutines(
451+
*coroutines: Unpack[
452+
Tuple[
453+
Optional[Coroutine[Any, Any, T1]],
454+
Optional[Coroutine[Any, Any, T2]],
455+
Optional[Coroutine[Any, Any, T3]],
456+
Optional[Coroutine[Any, Any, T4]],
457+
Optional[Coroutine[Any, Any, T5]],
458+
]
459+
],
460+
) -> Tuple[Optional[T1], Optional[T2], Optional[T3], Optional[T4], Optional[T5]]: ...
461+
462+
463+
async def gather_optional_coroutines(
464+
*coroutines: Unpack[Tuple[Optional[Coroutine[Any, Any, T1]], ...]],
465+
) -> Tuple[Optional[T1], ...]:
466+
"""Helper function that allows waiting on multiple coroutines at once.
467+
468+
The return value is a tuple of the return values of the coroutines in order.
469+
470+
If a `None` is passed instead of a coroutine, it will be ignored and a None
471+
is returned in the tuple.
472+
473+
Note: For typechecking we need to have an explicit overload for each
474+
distinct number of coroutines passed in. If you see type problems, it's
475+
likely because you're using many arguments and you need to add a new
476+
overload above.
477+
"""
478+
479+
try:
480+
results = await make_deferred_yieldable(
481+
defer.gatherResults(
482+
[
483+
run_coroutine_in_background(coroutine)
484+
for coroutine in coroutines
485+
if coroutine is not None
486+
],
487+
consumeErrors=True,
488+
)
489+
)
490+
491+
results_iter = iter(results)
492+
return tuple(
493+
next(results_iter) if coroutine is not None else None
494+
for coroutine in coroutines
495+
)
496+
except defer.FirstError as dfe:
497+
# unwrap the error from defer.gatherResults.
498+
499+
# The raised exception's traceback only includes func() etc if
500+
# the 'await' happens before the exception is thrown - ie if the failure
501+
# happens *asynchronously* - otherwise Twisted throws away the traceback as it
502+
# could be large.
503+
#
504+
# We could maybe reconstruct a fake traceback from Failure.frames. Or maybe
505+
# we could throw Twisted into the fires of Mordor.
506+
507+
# suppress exception chaining, because the FirstError doesn't tell us anything
508+
# very interesting.
509+
assert isinstance(dfe.subFailure.value, BaseException)
510+
raise dfe.subFailure.value from None
511+
512+
405513
@attr.s(slots=True, auto_attribs=True)
406514
class _LinearizerEntry:
407515
# The number of things executing.

0 commit comments

Comments
 (0)