Skip to content

Commit 8dc7efa

Browse files
authored
PYTHON-5821 - Fix ordering issue between event publish and logging for Pool monitoring tests (#2796)
1 parent f4219bd commit 8dc7efa

2 files changed

Lines changed: 38 additions & 36 deletions

File tree

pymongo/asynchronous/pool.py

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -760,11 +760,7 @@ def __init__(
760760
self._pending = 0
761761
self._max_connecting = self.opts.max_connecting
762762
self._client_id = client_id
763-
if self.enabled_for_cmap:
764-
assert self.opts._event_listeners is not None
765-
self.opts._event_listeners.publish_pool_created(
766-
self.address, self.opts.non_default_options
767-
)
763+
# Log before publishing event to prevent potential listener preemption in tests
768764
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
769765
_debug_log(
770766
_CONNECTION_LOGGER,
@@ -774,6 +770,11 @@ def __init__(
774770
serverPort=self.address[1],
775771
**self.opts.non_default_options,
776772
)
773+
if self.enabled_for_cmap:
774+
assert self.opts._event_listeners is not None
775+
self.opts._event_listeners.publish_pool_created(
776+
self.address, self.opts.non_default_options
777+
)
777778
# Similar to active_sockets but includes threads in the wait queue.
778779
self.operation_count: int = 0
779780
# Retain references to pinned connections to prevent the CPython GC
@@ -788,9 +789,6 @@ async def ready(self) -> None:
788789
async with self.lock:
789790
if self.state != PoolState.READY:
790791
self.state = PoolState.READY
791-
if self.enabled_for_cmap:
792-
assert self.opts._event_listeners is not None
793-
self.opts._event_listeners.publish_pool_ready(self.address)
794792
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
795793
_debug_log(
796794
_CONNECTION_LOGGER,
@@ -799,6 +797,9 @@ async def ready(self) -> None:
799797
serverHost=self.address[0],
800798
serverPort=self.address[1],
801799
)
800+
if self.enabled_for_cmap:
801+
assert self.opts._event_listeners is not None
802+
self.opts._event_listeners.publish_pool_ready(self.address)
802803

803804
@property
804805
def closed(self) -> bool:
@@ -859,9 +860,6 @@ async def _reset(
859860
else:
860861
for conn in sockets:
861862
await conn.close_conn(ConnectionClosedReason.POOL_CLOSED)
862-
if self.enabled_for_cmap:
863-
assert listeners is not None
864-
listeners.publish_pool_closed(self.address)
865863
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
866864
_debug_log(
867865
_CONNECTION_LOGGER,
@@ -870,15 +868,11 @@ async def _reset(
870868
serverHost=self.address[0],
871869
serverPort=self.address[1],
872870
)
871+
if self.enabled_for_cmap:
872+
assert listeners is not None
873+
listeners.publish_pool_closed(self.address)
873874
else:
874875
if old_state != PoolState.PAUSED:
875-
if self.enabled_for_cmap:
876-
assert listeners is not None
877-
listeners.publish_pool_cleared(
878-
self.address,
879-
service_id=service_id,
880-
interrupt_connections=interrupt_connections,
881-
)
882876
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
883877
_debug_log(
884878
_CONNECTION_LOGGER,
@@ -888,6 +882,13 @@ async def _reset(
888882
serverPort=self.address[1],
889883
serviceId=service_id,
890884
)
885+
if self.enabled_for_cmap:
886+
assert listeners is not None
887+
listeners.publish_pool_cleared(
888+
self.address,
889+
service_id=service_id,
890+
interrupt_connections=interrupt_connections,
891+
)
891892
if not _IS_SYNC:
892893
await asyncio.gather(
893894
*[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets], # type: ignore[func-returns-value]

pymongo/synchronous/pool.py

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -758,11 +758,7 @@ def __init__(
758758
self._pending = 0
759759
self._max_connecting = self.opts.max_connecting
760760
self._client_id = client_id
761-
if self.enabled_for_cmap:
762-
assert self.opts._event_listeners is not None
763-
self.opts._event_listeners.publish_pool_created(
764-
self.address, self.opts.non_default_options
765-
)
761+
# Log before publishing event to prevent potential listener preemption in tests
766762
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
767763
_debug_log(
768764
_CONNECTION_LOGGER,
@@ -772,6 +768,11 @@ def __init__(
772768
serverPort=self.address[1],
773769
**self.opts.non_default_options,
774770
)
771+
if self.enabled_for_cmap:
772+
assert self.opts._event_listeners is not None
773+
self.opts._event_listeners.publish_pool_created(
774+
self.address, self.opts.non_default_options
775+
)
775776
# Similar to active_sockets but includes threads in the wait queue.
776777
self.operation_count: int = 0
777778
# Retain references to pinned connections to prevent the CPython GC
@@ -786,9 +787,6 @@ def ready(self) -> None:
786787
with self.lock:
787788
if self.state != PoolState.READY:
788789
self.state = PoolState.READY
789-
if self.enabled_for_cmap:
790-
assert self.opts._event_listeners is not None
791-
self.opts._event_listeners.publish_pool_ready(self.address)
792790
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
793791
_debug_log(
794792
_CONNECTION_LOGGER,
@@ -797,6 +795,9 @@ def ready(self) -> None:
797795
serverHost=self.address[0],
798796
serverPort=self.address[1],
799797
)
798+
if self.enabled_for_cmap:
799+
assert self.opts._event_listeners is not None
800+
self.opts._event_listeners.publish_pool_ready(self.address)
800801

801802
@property
802803
def closed(self) -> bool:
@@ -857,9 +858,6 @@ def _reset(
857858
else:
858859
for conn in sockets:
859860
conn.close_conn(ConnectionClosedReason.POOL_CLOSED)
860-
if self.enabled_for_cmap:
861-
assert listeners is not None
862-
listeners.publish_pool_closed(self.address)
863861
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
864862
_debug_log(
865863
_CONNECTION_LOGGER,
@@ -868,15 +866,11 @@ def _reset(
868866
serverHost=self.address[0],
869867
serverPort=self.address[1],
870868
)
869+
if self.enabled_for_cmap:
870+
assert listeners is not None
871+
listeners.publish_pool_closed(self.address)
871872
else:
872873
if old_state != PoolState.PAUSED:
873-
if self.enabled_for_cmap:
874-
assert listeners is not None
875-
listeners.publish_pool_cleared(
876-
self.address,
877-
service_id=service_id,
878-
interrupt_connections=interrupt_connections,
879-
)
880874
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
881875
_debug_log(
882876
_CONNECTION_LOGGER,
@@ -886,6 +880,13 @@ def _reset(
886880
serverPort=self.address[1],
887881
serviceId=service_id,
888882
)
883+
if self.enabled_for_cmap:
884+
assert listeners is not None
885+
listeners.publish_pool_cleared(
886+
self.address,
887+
service_id=service_id,
888+
interrupt_connections=interrupt_connections,
889+
)
889890
if not _IS_SYNC:
890891
asyncio.gather(
891892
*[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets], # type: ignore[func-returns-value]

0 commit comments

Comments
 (0)