88from configdb import connection_factory
99
1010
11- def test_multi_statements_default_true ():
12- conn = connection_factory ()
11+ @pytest .fixture
12+ def conn ():
13+ connection = connection_factory ()
14+ try :
15+ yield connection
16+ finally :
17+ connection .close ()
18+
19+
20+ def test_multi_statements_default_true (conn ):
1321 cursor = conn .cursor ()
1422
1523 cursor .execute ("select 17; select 2" )
@@ -19,98 +27,98 @@ def test_multi_statements_default_true():
1927
2028def test_multi_statements_false ():
2129 conn = connection_factory (multi_statements = False )
22- cursor = conn .cursor ()
23-
24- with pytest .raises (ProgrammingError ):
25- cursor .execute ("select 17; select 2" )
26-
27- cursor .execute ("select 17" )
28- rows = cursor .fetchall ()
29- assert rows == ((17 ,),)
30-
31-
32- def test_connection_concurrent_use_raises ():
33- """While a slow query holds the connection lock, any other access from
34- a second thread must raise ProgrammingError immediately (not block)."""
35- conn = connection_factory ()
3630 try :
37- thread_error = None
38- done = threading .Event ()
39-
40- def run_slow_query ():
41- nonlocal thread_error
42- try :
43- conn .query ("SELECT SLEEP(0.5)" )
44- result = conn .store_result ()
45- result .fetch_row ()
46- except Exception as exc : # pragma: no cover
47- thread_error = exc
48- finally :
49- done .set ()
50-
51- thread = threading .Thread (target = run_slow_query )
52- thread .start ()
53-
54- # Give the background thread time to acquire the lock and enter SLEEP.
55- time .sleep (0.1 )
56-
57- start = time .monotonic ()
58- with pytest .raises (ProgrammingError , match = "already in use" ):
59- conn .thread_id ()
60- # Should fail immediately, not wait for the SLEEP to finish.
61- assert time .monotonic () - start < 0.1
31+ cursor = conn .cursor ()
32+
33+ with pytest .raises (ProgrammingError ):
34+ cursor .execute ("select 17; select 2" )
6235
63- done . wait ( )
64- thread . join ()
65- assert thread_error is None
36+ cursor . execute ( "select 17" )
37+ rows = cursor . fetchall ()
38+ assert rows == (( 17 ,),)
6639 finally :
6740 conn .close ()
6841
6942
70- def test_result_concurrent_use_raises ():
43+ def test_ping_false_warns (conn ):
44+ with pytest .warns (DeprecationWarning , match = "reconnect parameter" ):
45+ conn .ping (False )
46+
47+
48+ def test_connection_concurrent_use_raises (conn ):
49+ """While a slow query holds the connection lock, any other access from
50+ a second thread must raise ProgrammingError immediately (not block)."""
51+ thread_error = None
52+ done = threading .Event ()
53+
54+ def run_slow_query ():
55+ nonlocal thread_error
56+ try :
57+ conn .query ("SELECT SLEEP(0.5)" )
58+ result = conn .store_result ()
59+ result .fetch_row ()
60+ except Exception as exc : # pragma: no cover
61+ thread_error = exc
62+ finally :
63+ done .set ()
64+
65+ thread = threading .Thread (target = run_slow_query )
66+ thread .start ()
67+
68+ # Give the background thread time to acquire the lock and enter SLEEP.
69+ time .sleep (0.1 )
70+
71+ start = time .monotonic ()
72+ with pytest .raises (ProgrammingError , match = "already in use" ):
73+ conn .thread_id ()
74+ # Should fail immediately, not wait for the SLEEP to finish.
75+ assert time .monotonic () - start < 0.1
76+
77+ done .wait ()
78+ thread .join ()
79+ assert thread_error is None
80+
81+
82+ def test_result_concurrent_use_raises (conn ):
7183 """While fetch_row holds the connection lock streaming a slow result,
7284 any other access from a second thread must raise ProgrammingError immediately."""
73- conn = connection_factory ()
74- try :
75- conn .query (
76- "SELECT 1 FROM information_schema.tables a "
77- "CROSS JOIN information_schema.tables b LIMIT 20000"
78- )
79- result = conn .use_result ()
80-
81- thread_error = None
82- done = threading .Event ()
83- started = threading .Event ()
84-
85- def fetch_all_rows ():
86- nonlocal thread_error
87- try :
88- started .set ()
89- rows = result .fetch_row (maxrows = 0 )
90- assert rows
91- except Exception as exc : # pragma: no cover
92- thread_error = exc
93- finally :
94- done .set ()
95-
96- thread = threading .Thread (target = fetch_all_rows )
97- thread .start ()
98-
99- assert started .wait (timeout = 1.0 )
100-
101- deadline = time .monotonic () + 1.0
102- while time .monotonic () < deadline :
103- try :
104- conn .thread_id ()
105- except ProgrammingError as exc :
106- assert "already in use" in str (exc )
107- break
108- time .sleep (0.01 )
109- else : # pragma: no cover
110- pytest .fail ("expected ProgrammingError while result.fetch_row() is running" )
111-
112- done .wait ()
113- thread .join ()
114- assert thread_error is None
115- finally :
116- conn .close ()
85+ conn .query (
86+ "SELECT 1 FROM information_schema.tables a "
87+ "CROSS JOIN information_schema.tables b LIMIT 20000"
88+ )
89+ result = conn .use_result ()
90+
91+ thread_error = None
92+ done = threading .Event ()
93+ started = threading .Event ()
94+
95+ def fetch_all_rows ():
96+ nonlocal thread_error
97+ try :
98+ started .set ()
99+ rows = result .fetch_row (maxrows = 0 )
100+ assert rows
101+ except Exception as exc : # pragma: no cover
102+ thread_error = exc
103+ finally :
104+ done .set ()
105+
106+ thread = threading .Thread (target = fetch_all_rows )
107+ thread .start ()
108+
109+ assert started .wait (timeout = 1.0 )
110+
111+ deadline = time .monotonic () + 1.0
112+ while time .monotonic () < deadline :
113+ try :
114+ conn .thread_id ()
115+ except ProgrammingError as exc :
116+ assert "already in use" in str (exc )
117+ break
118+ time .sleep (0.01 )
119+ else : # pragma: no cover
120+ pytest .fail ("expected ProgrammingError while result.fetch_row() is running" )
121+
122+ done .wait ()
123+ thread .join ()
124+ assert thread_error is None
0 commit comments