88from configdb import connection_factory
99
1010
11- def test_multi_statements_default_true ():
12- conn = connection_factory ()
11+ @pytest .fixture
12+ def conn ():
13+ connection = connection_factory ()
14+ try :
15+ yield connection
16+ finally :
17+ connection .close ()
18+
19+
20+ def test_multi_statements_default_true (conn ):
1321 cursor = conn .cursor ()
1422
1523 cursor .execute ("select 17; select 2" )
@@ -19,105 +27,98 @@ def test_multi_statements_default_true():
1927
2028def test_multi_statements_false ():
2129 conn = connection_factory (multi_statements = False )
22- cursor = conn .cursor ()
30+ try :
31+ cursor = conn .cursor ()
2332
24- with pytest .raises (ProgrammingError ):
25- cursor .execute ("select 17; select 2" )
33+ with pytest .raises (ProgrammingError ):
34+ cursor .execute ("select 17; select 2" )
2635
27- cursor .execute ("select 17" )
28- rows = cursor .fetchall ()
29- assert rows == ((17 ,),)
36+ cursor .execute ("select 17" )
37+ rows = cursor .fetchall ()
38+ assert rows == ((17 ,),)
39+ finally :
40+ conn .close ()
3041
3142
32- def test_ping_false_warns ():
33- conn = connection_factory ()
43+ def test_ping_false_warns (conn ):
3444 with pytest .warns (DeprecationWarning , match = "reconnect parameter" ):
3545 conn .ping (False )
36- conn .close ()
3746
3847
39- def test_connection_concurrent_use_raises ():
48+ def test_connection_concurrent_use_raises (conn ):
4049 """While a slow query holds the connection lock, any other access from
4150 a second thread must raise ProgrammingError immediately (not block)."""
42- conn = connection_factory ()
43- try :
44- thread_error = None
45- done = threading .Event ()
46-
47- def run_slow_query ():
48- nonlocal thread_error
49- try :
50- conn .query ("SELECT SLEEP(0.5)" )
51- result = conn .store_result ()
52- result .fetch_row ()
53- except Exception as exc : # pragma: no cover
54- thread_error = exc
55- finally :
56- done .set ()
57-
58- thread = threading .Thread (target = run_slow_query )
59- thread .start ()
60-
61- # Give the background thread time to acquire the lock and enter SLEEP.
62- time .sleep (0.1 )
63-
64- start = time .monotonic ()
65- with pytest .raises (ProgrammingError , match = "already in use" ):
66- conn .thread_id ()
67- # Should fail immediately, not wait for the SLEEP to finish.
68- assert time .monotonic () - start < 0.1
69-
70- done .wait ()
71- thread .join ()
72- assert thread_error is None
73- finally :
74- conn .close ()
75-
76-
77- def test_result_concurrent_use_raises ():
51+ thread_error = None
52+ done = threading .Event ()
53+
54+ def run_slow_query ():
55+ nonlocal thread_error
56+ try :
57+ conn .query ("SELECT SLEEP(0.5)" )
58+ result = conn .store_result ()
59+ result .fetch_row ()
60+ except Exception as exc : # pragma: no cover
61+ thread_error = exc
62+ finally :
63+ done .set ()
64+
65+ thread = threading .Thread (target = run_slow_query )
66+ thread .start ()
67+
68+ # Give the background thread time to acquire the lock and enter SLEEP.
69+ time .sleep (0.1 )
70+
71+ start = time .monotonic ()
72+ with pytest .raises (ProgrammingError , match = "already in use" ):
73+ conn .thread_id ()
74+ # Should fail immediately, not wait for the SLEEP to finish.
75+ assert time .monotonic () - start < 0.1
76+
77+ done .wait ()
78+ thread .join ()
79+ assert thread_error is None
80+
81+
82+ def test_result_concurrent_use_raises (conn ):
7883 """While fetch_row holds the connection lock streaming a slow result,
7984 any other access from a second thread must raise ProgrammingError immediately."""
80- conn = connection_factory ()
81- try :
82- conn .query (
83- "SELECT 1 FROM information_schema.tables a "
84- "CROSS JOIN information_schema.tables b LIMIT 20000"
85- )
86- result = conn .use_result ()
87-
88- thread_error = None
89- done = threading .Event ()
90- started = threading .Event ()
91-
92- def fetch_all_rows ():
93- nonlocal thread_error
94- try :
95- started .set ()
96- rows = result .fetch_row (maxrows = 0 )
97- assert rows
98- except Exception as exc : # pragma: no cover
99- thread_error = exc
100- finally :
101- done .set ()
102-
103- thread = threading .Thread (target = fetch_all_rows )
104- thread .start ()
105-
106- assert started .wait (timeout = 1.0 )
107-
108- deadline = time .monotonic () + 1.0
109- while time .monotonic () < deadline :
110- try :
111- conn .thread_id ()
112- except ProgrammingError as exc :
113- assert "already in use" in str (exc )
114- break
115- time .sleep (0.01 )
116- else : # pragma: no cover
117- pytest .fail ("expected ProgrammingError while result.fetch_row() is running" )
118-
119- done .wait ()
120- thread .join ()
121- assert thread_error is None
122- finally :
123- conn .close ()
85+ conn .query (
86+ "SELECT 1 FROM information_schema.tables a "
87+ "CROSS JOIN information_schema.tables b LIMIT 20000"
88+ )
89+ result = conn .use_result ()
90+
91+ thread_error = None
92+ done = threading .Event ()
93+ started = threading .Event ()
94+
95+ def fetch_all_rows ():
96+ nonlocal thread_error
97+ try :
98+ started .set ()
99+ rows = result .fetch_row (maxrows = 0 )
100+ assert rows
101+ except Exception as exc : # pragma: no cover
102+ thread_error = exc
103+ finally :
104+ done .set ()
105+
106+ thread = threading .Thread (target = fetch_all_rows )
107+ thread .start ()
108+
109+ assert started .wait (timeout = 1.0 )
110+
111+ deadline = time .monotonic () + 1.0
112+ while time .monotonic () < deadline :
113+ try :
114+ conn .thread_id ()
115+ except ProgrammingError as exc :
116+ assert "already in use" in str (exc )
117+ break
118+ time .sleep (0.01 )
119+ else : # pragma: no cover
120+ pytest .fail ("expected ProgrammingError while result.fetch_row() is running" )
121+
122+ done .wait ()
123+ thread .join ()
124+ assert thread_error is None
0 commit comments