Skip to content

Commit 85ca6f1

Browse files
authored
PYTHON-4579 Stop gossiping $clusterTime on SDAM connections (#1925)
1 parent 61fecca commit 85ca6f1

11 files changed

+113
-55
lines changed

pymongo/asynchronous/monitor.py

+4-12
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@
2121
import logging
2222
import time
2323
import weakref
24-
from typing import TYPE_CHECKING, Any, Mapping, Optional, cast
24+
from typing import TYPE_CHECKING, Any, Optional
2525

2626
from pymongo import common, periodic_executor
2727
from pymongo._csot import MovingMinimum
28-
from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled
28+
from pymongo.errors import NetworkTimeout, _OperationCancelled
2929
from pymongo.hello import Hello
3030
from pymongo.lock import _async_create_lock
3131
from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage
@@ -255,13 +255,7 @@ async def _check_server(self) -> ServerDescription:
255255
self._conn_id = None
256256
start = time.monotonic()
257257
try:
258-
try:
259-
return await self._check_once()
260-
except (OperationFailure, NotPrimaryError) as exc:
261-
# Update max cluster time even when hello fails.
262-
details = cast(Mapping[str, Any], exc.details)
263-
await self._topology.receive_cluster_time(details.get("$clusterTime"))
264-
raise
258+
return await self._check_once()
265259
except ReferenceError:
266260
raise
267261
except Exception as error:
@@ -358,7 +352,6 @@ async def _check_with_socket(self, conn: AsyncConnection) -> tuple[Hello, float]
358352
359353
Can raise ConnectionFailure or OperationFailure.
360354
"""
361-
cluster_time = self._topology.max_cluster_time()
362355
start = time.monotonic()
363356
if conn.more_to_come:
364357
# Read the next streaming hello (MongoDB 4.4+).
@@ -368,13 +361,12 @@ async def _check_with_socket(self, conn: AsyncConnection) -> tuple[Hello, float]
368361
):
369362
# Initiate streaming hello (MongoDB 4.4+).
370363
response = await conn._hello(
371-
cluster_time,
372364
self._server_description.topology_version,
373365
self._settings.heartbeat_frequency,
374366
)
375367
else:
376368
# New connection handshake or polling hello (MongoDB <4.4).
377-
response = await conn._hello(cluster_time, None, None)
369+
response = await conn._hello(None, None)
378370
duration = _monotonic_duration(start)
379371
return response, duration
380372

pymongo/asynchronous/network.py

+4
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,10 @@ async def command(
207207
)
208208

209209
response_doc = unpacked_docs[0]
210+
if not conn.ready:
211+
cluster_time = response_doc.get("$clusterTime")
212+
if cluster_time:
213+
conn._cluster_time = cluster_time
210214
if client:
211215
await client._process_response(response_doc, session)
212216
if check:

pymongo/asynchronous/pool.py

+7-6
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@
102102
from pymongo.pyopenssl_context import _sslConn
103103
from pymongo.read_concern import ReadConcern
104104
from pymongo.read_preferences import _ServerMode
105-
from pymongo.typings import ClusterTime, _Address, _CollationIn
105+
from pymongo.typings import _Address, _CollationIn
106106
from pymongo.write_concern import WriteConcern
107107

108108
try:
@@ -310,6 +310,8 @@ def __init__(
310310
self.connect_rtt = 0.0
311311
self._client_id = pool._client_id
312312
self.creation_time = time.monotonic()
313+
# For gossiping $clusterTime from the connection handshake to the client.
314+
self._cluster_time = None
313315

314316
def set_conn_timeout(self, timeout: Optional[float]) -> None:
315317
"""Cache last timeout to avoid duplicate calls to conn.settimeout."""
@@ -374,11 +376,10 @@ def hello_cmd(self) -> dict[str, Any]:
374376
return {HelloCompat.LEGACY_CMD: 1, "helloOk": True}
375377

376378
async def hello(self) -> Hello:
377-
return await self._hello(None, None, None)
379+
return await self._hello(None, None)
378380

379381
async def _hello(
380382
self,
381-
cluster_time: Optional[ClusterTime],
382383
topology_version: Optional[Any],
383384
heartbeat_frequency: Optional[int],
384385
) -> Hello[dict[str, Any]]:
@@ -401,9 +402,6 @@ async def _hello(
401402
if self.opts.connect_timeout:
402403
self.set_conn_timeout(self.opts.connect_timeout + heartbeat_frequency)
403404

404-
if not performing_handshake and cluster_time is not None:
405-
cmd["$clusterTime"] = cluster_time
406-
407405
creds = self.opts._credentials
408406
if creds:
409407
if creds.mechanism == "DEFAULT" and creds.username:
@@ -1316,6 +1314,9 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
13161314
conn.close_conn(ConnectionClosedReason.ERROR)
13171315
raise
13181316

1317+
if handler:
1318+
await handler.client._topology.receive_cluster_time(conn._cluster_time)
1319+
13191320
return conn
13201321

13211322
@contextlib.asynccontextmanager

pymongo/asynchronous/topology.py

-1
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,6 @@ async def _process_change(
501501

502502
self._description = new_td
503503
await self._update_servers()
504-
self._receive_cluster_time_no_lock(server_description.cluster_time)
505504

506505
if self._publish_tp and not suppress_event:
507506
assert self._events is not None

pymongo/synchronous/monitor.py

+4-12
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@
2121
import logging
2222
import time
2323
import weakref
24-
from typing import TYPE_CHECKING, Any, Mapping, Optional, cast
24+
from typing import TYPE_CHECKING, Any, Optional
2525

2626
from pymongo import common, periodic_executor
2727
from pymongo._csot import MovingMinimum
28-
from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled
28+
from pymongo.errors import NetworkTimeout, _OperationCancelled
2929
from pymongo.hello import Hello
3030
from pymongo.lock import _create_lock
3131
from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage
@@ -253,13 +253,7 @@ def _check_server(self) -> ServerDescription:
253253
self._conn_id = None
254254
start = time.monotonic()
255255
try:
256-
try:
257-
return self._check_once()
258-
except (OperationFailure, NotPrimaryError) as exc:
259-
# Update max cluster time even when hello fails.
260-
details = cast(Mapping[str, Any], exc.details)
261-
self._topology.receive_cluster_time(details.get("$clusterTime"))
262-
raise
256+
return self._check_once()
263257
except ReferenceError:
264258
raise
265259
except Exception as error:
@@ -356,7 +350,6 @@ def _check_with_socket(self, conn: Connection) -> tuple[Hello, float]:
356350
357351
Can raise ConnectionFailure or OperationFailure.
358352
"""
359-
cluster_time = self._topology.max_cluster_time()
360353
start = time.monotonic()
361354
if conn.more_to_come:
362355
# Read the next streaming hello (MongoDB 4.4+).
@@ -366,13 +359,12 @@ def _check_with_socket(self, conn: Connection) -> tuple[Hello, float]:
366359
):
367360
# Initiate streaming hello (MongoDB 4.4+).
368361
response = conn._hello(
369-
cluster_time,
370362
self._server_description.topology_version,
371363
self._settings.heartbeat_frequency,
372364
)
373365
else:
374366
# New connection handshake or polling hello (MongoDB <4.4).
375-
response = conn._hello(cluster_time, None, None)
367+
response = conn._hello(None, None)
376368
duration = _monotonic_duration(start)
377369
return response, duration
378370

pymongo/synchronous/network.py

+4
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,10 @@ def command(
207207
)
208208

209209
response_doc = unpacked_docs[0]
210+
if not conn.ready:
211+
cluster_time = response_doc.get("$clusterTime")
212+
if cluster_time:
213+
conn._cluster_time = cluster_time
210214
if client:
211215
client._process_response(response_doc, session)
212216
if check:

pymongo/synchronous/pool.py

+7-6
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@
102102
from pymongo.synchronous.auth import _AuthContext
103103
from pymongo.synchronous.client_session import ClientSession
104104
from pymongo.synchronous.mongo_client import MongoClient, _MongoClientErrorHandler
105-
from pymongo.typings import ClusterTime, _Address, _CollationIn
105+
from pymongo.typings import _Address, _CollationIn
106106
from pymongo.write_concern import WriteConcern
107107

108108
try:
@@ -310,6 +310,8 @@ def __init__(
310310
self.connect_rtt = 0.0
311311
self._client_id = pool._client_id
312312
self.creation_time = time.monotonic()
313+
# For gossiping $clusterTime from the connection handshake to the client.
314+
self._cluster_time = None
313315

314316
def set_conn_timeout(self, timeout: Optional[float]) -> None:
315317
"""Cache last timeout to avoid duplicate calls to conn.settimeout."""
@@ -374,11 +376,10 @@ def hello_cmd(self) -> dict[str, Any]:
374376
return {HelloCompat.LEGACY_CMD: 1, "helloOk": True}
375377

376378
def hello(self) -> Hello:
377-
return self._hello(None, None, None)
379+
return self._hello(None, None)
378380

379381
def _hello(
380382
self,
381-
cluster_time: Optional[ClusterTime],
382383
topology_version: Optional[Any],
383384
heartbeat_frequency: Optional[int],
384385
) -> Hello[dict[str, Any]]:
@@ -401,9 +402,6 @@ def _hello(
401402
if self.opts.connect_timeout:
402403
self.set_conn_timeout(self.opts.connect_timeout + heartbeat_frequency)
403404

404-
if not performing_handshake and cluster_time is not None:
405-
cmd["$clusterTime"] = cluster_time
406-
407405
creds = self.opts._credentials
408406
if creds:
409407
if creds.mechanism == "DEFAULT" and creds.username:
@@ -1310,6 +1308,9 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
13101308
conn.close_conn(ConnectionClosedReason.ERROR)
13111309
raise
13121310

1311+
if handler:
1312+
handler.client._topology.receive_cluster_time(conn._cluster_time)
1313+
13131314
return conn
13141315

13151316
@contextlib.contextmanager

pymongo/synchronous/topology.py

-1
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,6 @@ def _process_change(
501501

502502
self._description = new_td
503503
self._update_servers()
504-
self._receive_cluster_time_no_lock(server_description.cluster_time)
505504

506505
if self._publish_tp and not suppress_event:
507506
assert self._events is not None

test/asynchronous/test_session.py

+38-4
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@
3636
async_client_context,
3737
unittest,
3838
)
39+
from test.asynchronous.helpers import client_knobs
3940
from test.utils import (
4041
EventListener,
42+
HeartbeatEventListener,
4143
OvertCommandListener,
4244
async_wait_until,
4345
)
@@ -1135,12 +1137,10 @@ async def asyncSetUp(self):
11351137
if "$clusterTime" not in (await async_client_context.hello):
11361138
raise SkipTest("$clusterTime not supported")
11371139

1140+
# Sessions prose test: 3) $clusterTime in commands
11381141
async def test_cluster_time(self):
11391142
listener = SessionTestListener()
1140-
# Prevent heartbeats from updating $clusterTime between operations.
1141-
client = await self.async_rs_or_single_client(
1142-
event_listeners=[listener], heartbeatFrequencyMS=999999
1143-
)
1143+
client = await self.async_rs_or_single_client(event_listeners=[listener])
11441144
collection = client.pymongo_test.collection
11451145
# Prepare for tests of find() and aggregate().
11461146
await collection.insert_many([{} for _ in range(10)])
@@ -1219,6 +1219,40 @@ async def aggregate():
12191219
f"{f.__name__} sent wrong $clusterTime with {event.command_name}",
12201220
)
12211221

1222+
# Sessions prose test: 20) Drivers do not gossip `$clusterTime` on SDAM commands
1223+
async def test_cluster_time_not_used_by_sdam(self):
1224+
heartbeat_listener = HeartbeatEventListener()
1225+
cmd_listener = OvertCommandListener()
1226+
with client_knobs(min_heartbeat_interval=0.01):
1227+
c1 = await self.async_single_client(
1228+
event_listeners=[heartbeat_listener, cmd_listener], heartbeatFrequencyMS=10
1229+
)
1230+
cluster_time = (await c1.admin.command({"ping": 1}))["$clusterTime"]
1231+
self.assertEqual(c1._topology.max_cluster_time(), cluster_time)
1232+
1233+
# Advance the server's $clusterTime by performing an insert via another client.
1234+
await self.db.test.insert_one({"advance": "$clusterTime"})
1235+
# Wait until the client C1 processes the next pair of SDAM heartbeat started + succeeded events.
1236+
heartbeat_listener.reset()
1237+
1238+
async def next_heartbeat():
1239+
events = heartbeat_listener.events
1240+
for i in range(len(events) - 1):
1241+
if isinstance(events[i], monitoring.ServerHeartbeatStartedEvent):
1242+
if isinstance(events[i + 1], monitoring.ServerHeartbeatSucceededEvent):
1243+
return True
1244+
return False
1245+
1246+
await async_wait_until(
1247+
next_heartbeat, "never found pair of heartbeat started + succeeded events"
1248+
)
1249+
# Assert that C1's max $clusterTime is still the same and has not been updated by SDAM.
1250+
cmd_listener.reset()
1251+
await c1.admin.command({"ping": 1})
1252+
started = cmd_listener.started_events[0]
1253+
self.assertEqual(started.command_name, "ping")
1254+
self.assertEqual(started.command["$clusterTime"], cluster_time)
1255+
12221256

12231257
if __name__ == "__main__":
12241258
unittest.main()

test/test_discovery_and_monitoring.py

+9-11
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ class TestClusterTimeComparison(unittest.TestCase):
244244
def test_cluster_time_comparison(self):
245245
t = create_mock_topology("mongodb://host")
246246

247-
def send_cluster_time(time, inc, should_update):
247+
def send_cluster_time(time, inc):
248248
old = t.max_cluster_time()
249249
new = {"clusterTime": Timestamp(time, inc)}
250250
got_hello(
@@ -259,16 +259,14 @@ def send_cluster_time(time, inc, should_update):
259259
)
260260

261261
actual = t.max_cluster_time()
262-
if should_update:
263-
self.assertEqual(actual, new)
264-
else:
265-
self.assertEqual(actual, old)
266-
267-
send_cluster_time(0, 1, True)
268-
send_cluster_time(2, 2, True)
269-
send_cluster_time(2, 1, False)
270-
send_cluster_time(1, 3, False)
271-
send_cluster_time(2, 3, True)
262+
# We never update $clusterTime from monitoring connections.
263+
self.assertEqual(actual, old)
264+
265+
send_cluster_time(0, 1)
266+
send_cluster_time(2, 2)
267+
send_cluster_time(2, 1)
268+
send_cluster_time(1, 3)
269+
send_cluster_time(2, 3)
272270

273271

274272
class TestIgnoreStaleErrors(IntegrationTest):

0 commit comments

Comments
 (0)