Skip to content

Commit c9a85ad

Browse files
authored
PYTHON-5090 Convert test.test_monitor to async (#2106)
1 parent 85ca6f1 commit c9a85ad

File tree

3 files changed

+147
-4
lines changed

3 files changed

+147
-4
lines changed

test/asynchronous/test_monitor.py

+121
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
# Copyright 2014-present MongoDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Test the monitor module."""
16+
from __future__ import annotations
17+
18+
import asyncio
19+
import gc
20+
import subprocess
21+
import sys
22+
import warnings
23+
from functools import partial
24+
25+
sys.path[0:0] = [""]
26+
27+
from test.asynchronous import AsyncIntegrationTest, async_client_context, connected, unittest
28+
from test.utils import (
29+
ServerAndTopologyEventListener,
30+
async_wait_until,
31+
)
32+
33+
from pymongo.periodic_executor import _EXECUTORS
34+
35+
_IS_SYNC = False
36+
37+
38+
def unregistered(ref):
39+
gc.collect()
40+
return ref not in _EXECUTORS
41+
42+
43+
def get_executors(client):
44+
executors = []
45+
for server in client._topology._servers.values():
46+
executors.append(server._monitor._executor)
47+
executors.append(server._monitor._rtt_monitor._executor)
48+
executors.append(client._kill_cursors_executor)
49+
executors.append(client._topology._Topology__events_executor)
50+
return [e for e in executors if e is not None]
51+
52+
53+
class TestMonitor(AsyncIntegrationTest):
54+
async def create_client(self):
55+
listener = ServerAndTopologyEventListener()
56+
client = await self.unmanaged_async_single_client(event_listeners=[listener])
57+
await connected(client)
58+
return client
59+
60+
async def test_cleanup_executors_on_client_del(self):
61+
with warnings.catch_warnings(record=True) as w:
62+
warnings.simplefilter("always")
63+
client = await self.create_client()
64+
executors = get_executors(client)
65+
self.assertEqual(len(executors), 4)
66+
67+
# Each executor stores a weakref to itself in _EXECUTORS.
68+
executor_refs = [(r, r()._name) for r in _EXECUTORS.copy() if r() in executors]
69+
70+
del executors
71+
del client
72+
73+
for ref, name in executor_refs:
74+
await async_wait_until(
75+
partial(unregistered, ref), f"unregister executor: {name}", timeout=5
76+
)
77+
78+
def resource_warning_caught():
79+
gc.collect()
80+
for warning in w:
81+
if (
82+
issubclass(warning.category, ResourceWarning)
83+
and "Call AsyncMongoClient.close() to safely shut down your client and free up resources."
84+
in str(warning.message)
85+
):
86+
return True
87+
return False
88+
89+
await async_wait_until(resource_warning_caught, "catch resource warning")
90+
91+
async def test_cleanup_executors_on_client_close(self):
92+
client = await self.create_client()
93+
executors = get_executors(client)
94+
self.assertEqual(len(executors), 4)
95+
96+
await client.close()
97+
98+
for executor in executors:
99+
await async_wait_until(
100+
lambda: executor._stopped, f"closed executor: {executor._name}", timeout=5
101+
)
102+
103+
@async_client_context.require_sync
104+
def test_no_thread_start_runtime_err_on_shutdown(self):
105+
"""Test we silence noisy runtime errors fired when the AsyncMongoClient spawns a new thread
106+
on process shutdown."""
107+
command = [
108+
sys.executable,
109+
"-c",
110+
"from pymongo import AsyncMongoClient; c = AsyncMongoClient()",
111+
]
112+
completed_process: subprocess.CompletedProcess = subprocess.run(
113+
command, capture_output=True
114+
)
115+
116+
self.assertFalse(completed_process.stderr)
117+
self.assertFalse(completed_process.stdout)
118+
119+
120+
if __name__ == "__main__":
121+
unittest.main()

test/test_monitor.py

+25-4
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
"""Test the monitor module."""
1616
from __future__ import annotations
1717

18+
import asyncio
1819
import gc
1920
import subprocess
2021
import sys
@@ -23,14 +24,16 @@
2324

2425
sys.path[0:0] = [""]
2526

26-
from test import IntegrationTest, connected, unittest
27+
from test import IntegrationTest, client_context, connected, unittest
2728
from test.utils import (
2829
ServerAndTopologyEventListener,
2930
wait_until,
3031
)
3132

3233
from pymongo.periodic_executor import _EXECUTORS
3334

35+
_IS_SYNC = True
36+
3437

3538
def unregistered(ref):
3639
gc.collect()
@@ -55,8 +58,8 @@ def create_client(self):
5558
return client
5659

5760
def test_cleanup_executors_on_client_del(self):
58-
with warnings.catch_warnings():
59-
warnings.simplefilter("ignore")
61+
with warnings.catch_warnings(record=True) as w:
62+
warnings.simplefilter("always")
6063
client = self.create_client()
6164
executors = get_executors(client)
6265
self.assertEqual(len(executors), 4)
@@ -70,6 +73,19 @@ def test_cleanup_executors_on_client_del(self):
7073
for ref, name in executor_refs:
7174
wait_until(partial(unregistered, ref), f"unregister executor: {name}", timeout=5)
7275

76+
def resource_warning_caught():
77+
gc.collect()
78+
for warning in w:
79+
if (
80+
issubclass(warning.category, ResourceWarning)
81+
and "Call MongoClient.close() to safely shut down your client and free up resources."
82+
in str(warning.message)
83+
):
84+
return True
85+
return False
86+
87+
wait_until(resource_warning_caught, "catch resource warning")
88+
7389
def test_cleanup_executors_on_client_close(self):
7490
client = self.create_client()
7591
executors = get_executors(client)
@@ -80,10 +96,15 @@ def test_cleanup_executors_on_client_close(self):
8096
for executor in executors:
8197
wait_until(lambda: executor._stopped, f"closed executor: {executor._name}", timeout=5)
8298

99+
@client_context.require_sync
83100
def test_no_thread_start_runtime_err_on_shutdown(self):
84101
"""Test we silence noisy runtime errors fired when the MongoClient spawns a new thread
85102
on process shutdown."""
86-
command = [sys.executable, "-c", "from pymongo import MongoClient; c = MongoClient()"]
103+
command = [
104+
sys.executable,
105+
"-c",
106+
"from pymongo import MongoClient; c = MongoClient()",
107+
]
87108
completed_process: subprocess.CompletedProcess = subprocess.run(
88109
command, capture_output=True
89110
)

tools/synchro.py

+1
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ def async_only_test(f: str) -> bool:
226226
"test_load_balancer.py",
227227
"test_logger.py",
228228
"test_max_staleness.py",
229+
"test_monitor.py",
229230
"test_monitoring.py",
230231
"test_mongos_load_balancing.py",
231232
"test_on_demand_csfle.py",

0 commit comments

Comments
 (0)