Skip to content
Merged
1 change: 1 addition & 0 deletions changelog.d/18944.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Introduce `Clock.call_when_running(...)` to wrap startup code in a logcontext, ensuring we can identify which server generated the logs.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR can be reviewed commit-by-commit. There are lots of changes because I split out JSON and Clock utilities to avoid circular imports.

40 changes: 40 additions & 0 deletions scripts-dev/mypy_synapse_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@
category="per-homeserver-tenant-metrics",
)

PREFER_SYNAPSE_CLOCK_CALL_WHEN_RUNNING = ErrorCode(
"prefer-synapse-clock-call-when-running",
"`synapse.util.Clock.call_when_running` should be used instead of `reactor.callWhenRunning`",
category="synapse-reactor-clock",
)


class Sentinel(enum.Enum):
# defining a sentinel in this way allows mypy to correctly handle the
Expand Down Expand Up @@ -229,9 +235,43 @@ def get_method_signature_hook(
):
return check_is_cacheable_wrapper

if fullname in (
"twisted.internet.interfaces.IReactorCore.callWhenRunning",
"synapse.types.ISynapseThreadlessReactor.callWhenRunning",
"synapse.types.ISynapseReactor.callWhenRunning",
):
return check_call_when_running

return None


def check_call_when_running(ctx: MethodSigContext) -> CallableType:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This lint pattern is coming from #18828 where we're also going to lint call_later and looping_call in the future.

"""
Ensure that the `reactor.callWhenRunning` callsites aren't used.

`synapse.util.Clock.call_when_running` should always be used instead of
`reactor.callWhenRunning`.

Since `reactor.callWhenRunning` is a reactor callback, the callback will start out
with the sentinel logcontext. `synapse.util.Clock` starts a default logcontext as we
want to know which server the logs came from.

Args:
ctx: The `FunctionSigContext` from mypy.
"""
signature: CallableType = ctx.default_signature
ctx.api.fail(
(
"Expected all `reactor.callWhenRunning` calls to use `synapse.util.Clock.call_when_running` instead. "
"This is so all Synapse code runs with a logcontext as we want to know which server the logs came from."
),
ctx.context,
code=PREFER_SYNAPSE_CLOCK_CALL_WHEN_RUNNING,
)

return signature


def analyze_prometheus_metric_classes(ctx: ClassDefContext) -> None:
"""
Cross-check the list of Prometheus metric classes against the
Expand Down
2 changes: 1 addition & 1 deletion scripts-dev/sign_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.util import json_encoder
from synapse.util.json import json_encoder


def main() -> None:
Expand Down
64 changes: 24 additions & 40 deletions synapse/_scripts/synapse_port_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@
from synapse.config.database import DatabaseConnectionConfig
from synapse.config.homeserver import HomeServerConfig
from synapse.logging.context import (
LoggingContext,
make_deferred_yieldable,
run_in_background,
)
from synapse.notifier import ReplicationNotifier
from synapse.server import HomeServer
from synapse.storage import DataStore
from synapse.storage.database import DatabasePool, LoggingTransaction, make_conn
from synapse.storage.databases.main import FilteringWorkerStore
from synapse.storage.databases.main.account_data import AccountDataWorkerStore
Expand Down Expand Up @@ -98,8 +98,7 @@
from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import prepare_database
from synapse.types import ISynapseReactor
from synapse.util import SYNAPSE_VERSION, Clock
from synapse.util.stringutils import random_string
from synapse.util import SYNAPSE_VERSION

# Cast safety: Twisted does some naughty magic which replaces the
# twisted.internet.reactor module with a Reactor instance at runtime.
Expand Down Expand Up @@ -318,31 +317,16 @@ def set_room_is_public(self, room_id: str, is_public: bool) -> NoReturn:
)


class MockHomeserver:
def __init__(self, config: HomeServerConfig):
self.clock = Clock(reactor)
self.config = config
self.hostname = config.server.server_name
self.version_string = SYNAPSE_VERSION
self.instance_id = random_string(5)

def get_clock(self) -> Clock:
return self.clock

def get_reactor(self) -> ISynapseReactor:
return reactor

def get_instance_id(self) -> str:
return self.instance_id

def get_instance_name(self) -> str:
return "master"
class MockHomeserver(HomeServer):
DATASTORE_CLASS = DataStore

def should_send_federation(self) -> bool:
return False

def get_replication_notifier(self) -> ReplicationNotifier:
return ReplicationNotifier()
def __init__(self, config: HomeServerConfig):
super().__init__(
hostname=config.server.server_name,
config=config,
reactor=reactor,
version_string=f"Synapse/{SYNAPSE_VERSION}",
)


class Porter:
Expand All @@ -351,12 +335,12 @@ def __init__(
sqlite_config: Dict[str, Any],
progress: "Progress",
batch_size: int,
hs_config: HomeServerConfig,
hs: HomeServer,
):
self.sqlite_config = sqlite_config
self.progress = progress
self.batch_size = batch_size
self.hs_config = hs_config
self.hs = hs

async def setup_table(self, table: str) -> Tuple[str, int, int, int, int]:
if table in APPEND_ONLY_TABLES:
Expand Down Expand Up @@ -676,8 +660,7 @@ def build_db_store(

engine = create_engine(db_config.config)

hs = MockHomeserver(self.hs_config)
server_name = hs.hostname
server_name = self.hs.hostname

with make_conn(
db_config=db_config,
Expand All @@ -688,16 +671,16 @@ def build_db_store(
engine.check_database(
db_conn, allow_outdated_version=allow_outdated_version
)
prepare_database(db_conn, engine, config=self.hs_config)
prepare_database(db_conn, engine, config=self.hs.config)
# Type safety: ignore that we're using Mock homeservers here.
store = Store(
DatabasePool(
hs, # type: ignore[arg-type]
self.hs,
db_config,
engine,
),
db_conn,
hs, # type: ignore[arg-type]
self.hs,
)
db_conn.commit()

Expand Down Expand Up @@ -795,7 +778,7 @@ async def run(self) -> None:
return

self.postgres_store = self.build_db_store(
self.hs_config.database.get_single_database()
self.hs.config.database.get_single_database()
)

await self.remove_ignored_background_updates_from_database()
Expand Down Expand Up @@ -1584,6 +1567,8 @@ def main() -> None:
config = HomeServerConfig()
config.parse_config_dict(hs_config, "", "")

hs = MockHomeserver(config)

def start(stdscr: Optional["curses.window"] = None) -> None:
progress: Progress
if stdscr:
Expand All @@ -1595,15 +1580,14 @@ def start(stdscr: Optional["curses.window"] = None) -> None:
sqlite_config=sqlite_config,
progress=progress,
batch_size=args.batch_size,
hs_config=config,
hs=hs,
)

@defer.inlineCallbacks
def run() -> Generator["defer.Deferred[Any]", Any, None]:
with LoggingContext("synapse_port_db_run"):
yield defer.ensureDeferred(porter.run())
yield defer.ensureDeferred(porter.run())

reactor.callWhenRunning(run)
hs.get_clock().call_when_running(run)

reactor.run()

Expand Down
2 changes: 1 addition & 1 deletion synapse/_scripts/update_synapse_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def run() -> None:
)
)

reactor.callWhenRunning(run)
hs.get_clock().call_when_running(run)

reactor.run()

Expand Down
2 changes: 1 addition & 1 deletion synapse/api/auth/mas.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@
from synapse.metrics import SERVER_NAME_LABEL
from synapse.synapse_rust.http_client import HttpClient
from synapse.types import JsonDict, Requester, UserID, create_requester
from synapse.util import json_decoder
from synapse.util.caches.cached_call import RetryOnExceptionCachedCall
from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext
from synapse.util.json import json_decoder

from . import introspection_response_timer

Expand Down
2 changes: 1 addition & 1 deletion synapse/api/auth/msc3861_delegated.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@
from synapse.metrics import SERVER_NAME_LABEL
from synapse.synapse_rust.http_client import HttpClient
from synapse.types import Requester, UserID, create_requester
from synapse.util import json_decoder
from synapse.util.caches.cached_call import RetryOnExceptionCachedCall
from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext
from synapse.util.json import json_decoder

from . import introspection_response_timer

Expand Down
2 changes: 1 addition & 1 deletion synapse/api/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

from twisted.web import http

from synapse.util import json_decoder
from synapse.util.json import json_decoder

if typing.TYPE_CHECKING:
from synapse.config.homeserver import HomeServerConfig
Expand Down
2 changes: 1 addition & 1 deletion synapse/api/ratelimiting.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from synapse.config.ratelimiting import RatelimitSettings
from synapse.storage.databases.main import DataStore
from synapse.types import Requester
from synapse.util import Clock
from synapse.util.clock import Clock

if TYPE_CHECKING:
# To avoid circular imports:
Expand Down
5 changes: 3 additions & 2 deletions synapse/app/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ def redirect_stdio_to_logs() -> None:


def register_start(
cb: Callable[P, Awaitable], *args: P.args, **kwargs: P.kwargs
hs: "HomeServer", cb: Callable[P, Awaitable], *args: P.args, **kwargs: P.kwargs
) -> None:
"""Register a callback with the reactor, to be called once it is running

Expand Down Expand Up @@ -278,7 +278,8 @@ async def wrapper() -> None:
# on as normal.
os._exit(1)

reactor.callWhenRunning(lambda: defer.ensureDeferred(wrapper()))
clock = hs.get_clock()
clock.call_when_running(lambda: defer.ensureDeferred(wrapper()))


def listen_metrics(bind_addresses: StrCollection, port: int) -> None:
Expand Down
6 changes: 2 additions & 4 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,11 +356,9 @@ def start(config_options: List[str]) -> None:
handle_startup_exception(e)

async def start() -> None:
# Re-establish log context now that we're back from the reactor
with LoggingContext("start"):
await _base.start(hs)
await _base.start(hs)

register_start(start)
register_start(hs, start)

# redirect stdio to the logs, if configured.
if not hs.config.logging.no_redirect_stdio:
Expand Down
18 changes: 8 additions & 10 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,19 +377,17 @@ def setup(config_options: List[str]) -> SynapseHomeServer:
handle_startup_exception(e)

async def start() -> None:
# Re-establish log context now that we're back from the reactor
with LoggingContext("start"):
# Load the OIDC provider metadatas, if OIDC is enabled.
if hs.config.oidc.oidc_enabled:
oidc = hs.get_oidc_handler()
# Loading the provider metadata also ensures the provider config is valid.
await oidc.load_metadata()
# Load the OIDC provider metadatas, if OIDC is enabled.
if hs.config.oidc.oidc_enabled:
oidc = hs.get_oidc_handler()
# Loading the provider metadata also ensures the provider config is valid.
await oidc.load_metadata()

await _base.start(hs)
await _base.start(hs)

hs.get_datastores().main.db_pool.updates.start_doing_background_updates()
hs.get_datastores().main.db_pool.updates.start_doing_background_updates()

register_start(start)
register_start(hs, start)

return hs

Expand Down
2 changes: 1 addition & 1 deletion synapse/appservice/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.databases.main import DataStore
from synapse.types import DeviceListUpdates, JsonMapping
from synapse.util import Clock
from synapse.util.clock import Clock

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down
2 changes: 1 addition & 1 deletion synapse/events/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from synapse.synapse_rust.events import EventInternalMetadata
from synapse.types import EventID, JsonDict, StrCollection
from synapse.types.state import StateFilter
from synapse.util import Clock
from synapse.util.clock import Clock
from synapse.util.stringutils import random_string

if TYPE_CHECKING:
Expand Down
2 changes: 1 addition & 1 deletion synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@
StrCollection,
get_domain_from_id,
)
from synapse.util import Clock
from synapse.util.clock import Clock
from synapse.util.metrics import Measure
from synapse.util.retryutils import filter_destinations_by_retry_limiter

Expand Down
2 changes: 1 addition & 1 deletion synapse/federation/sender/transaction_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
)
from synapse.metrics import SERVER_NAME_LABEL
from synapse.types import JsonDict
from synapse.util import json_decoder
from synapse.util.json import json_decoder
from synapse.util.metrics import measure_func

if TYPE_CHECKING:
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/deactivate_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def __init__(self, hs: "HomeServer"):
# Start the user parter loop so it can resume parting users from rooms where
# it left off (if it has work left to do).
if hs.config.worker.worker_app is None:
hs.get_reactor().callWhenRunning(self._start_user_parting)
hs.get_clock().call_when_running(self._start_user_parting)
else:
self._notify_account_deactivated_client = (
ReplicationNotifyAccountDeactivatedServlet.make_client(hs)
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -1002,7 +1002,7 @@ def __init__(self, hs: "HomeServer"):
# rolling-restarting Synapse.
if self._is_main_device_list_writer:
# On start up check if there are any updates pending.
hs.get_reactor().callWhenRunning(self._handle_new_device_update_async)
hs.get_clock().call_when_running(self._handle_new_device_update_async)
self.device_list_updater = DeviceListUpdater(hs, self)
hs.get_federation_registry().register_edu_handler(
EduTypes.DEVICE_LIST_UPDATE,
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/devicemessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
set_tag,
)
from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id
from synapse.util import json_encoder
from synapse.util.json import json_encoder
from synapse.util.stringutils import random_string

if TYPE_CHECKING:
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/e2e_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@
get_domain_from_id,
get_verify_key_from_cross_signing_key,
)
from synapse.util import json_decoder
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.cancellation import cancellable
from synapse.util.json import json_decoder
from synapse.util.retryutils import (
NotRetryingDestination,
filter_destinations_by_retry_limiter,
Expand Down
Loading
Loading