Skip to content

Commit

Permalink
Support recent version of coverage tool
Browse files Browse the repository at this point in the history
(coverage 7.2.5)

Without these changes many warns/errors are emitted:
- mocked objects in "setUp" method like "cleanup.IPCFlag" cannot be repatched
  at the level of the test functions, otherwise tests are aborted,
  this is the  behavior of coverage version 7.2.5

Signed-off-by: Ronan Abhamon <[email protected]>
  • Loading branch information
Wescoeur authored and MarkSymsCtx committed Jul 5, 2023
1 parent a82ff6c commit fdca39c
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 59 deletions.
12 changes: 3 additions & 9 deletions tests/test_LVHDSR.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,9 @@ def get_dummy_vhd(self, vdi_uuid, hidden):
test_vhdInfo.hidden = hidden
self.mock_vhdutil.getVHDInfo.return_value = test_vhdInfo

@mock.patch('LVHDSR.lvutil.LvmLockContext', autospec=True)
@mock.patch('LVHDSR.Lock', autospec=True)
@mock.patch('SR.XenAPI')
def test_clone_success(self, mock_xenapi, mock_lock,
mock_lock_context):
def test_clone_success(self, mock_xenapi, mock_lock):
"""
Successfully create clone
"""
Expand Down Expand Up @@ -319,11 +317,9 @@ def test_clone_success(self, mock_xenapi, mock_lock,
# Assert
self.assertIsNotNone(clone)

@mock.patch('LVHDSR.lvutil.LvmLockContext', autospec=True)
@mock.patch('LVHDSR.Lock', autospec=True)
@mock.patch('SR.XenAPI')
def test_snapshot_attached_success(
self, mock_xenapi, mock_lock, mock_lock_context):
def test_snapshot_attached_success(self, mock_xenapi, mock_lock):
"""
LVHDSR.snapshot, attached on host, no CBT
"""
Expand Down Expand Up @@ -358,11 +354,9 @@ def test_snapshot_attached_success(
# Assert
self.assertIsNotNone(snap)

@mock.patch('LVHDSR.lvutil.LvmLockContext', autospec=True)
@mock.patch('LVHDSR.Lock', autospec=True)
@mock.patch('SR.XenAPI')
def test_snapshot_attached_cbt_success(
self, mock_xenapi, mock_lock, mock_lock_context):
def test_snapshot_attached_cbt_success(self, mock_xenapi, mock_lock):
"""
LVHDSR.snapshot, attached on host, with CBT
"""
Expand Down
75 changes: 25 additions & 50 deletions tests/test_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ def create_cleanup_sr(xapi, uuid=None):

class TestSR(unittest.TestCase):
def setUp(self):
self.sleep_patcher = mock.patch('cleanup.time.sleep')
self.sleep_patcher.start()
time_sleep_patcher = mock.patch('cleanup.time.sleep')
self.mock_time_sleep = time_sleep_patcher.start()

updateBlockInfo_patcher = mock.patch('cleanup.VDI.updateBlockInfo')
self.mock_updateBlockInfo = updateBlockInfo_patcher.start()
Expand Down Expand Up @@ -116,59 +116,47 @@ def test_lock_if_no_locking_is_used(self):

self.assertEqual(0, sr._locked)

@mock.patch('cleanup.IPCFlag', autospec=True)
def test_lock_succeeds_if_lock_is_acquired(
self,
mock_ipc_flag):
def test_lock_succeeds_if_lock_is_acquired(self):
"""
After performing a lock, the counter equals to 1
"""

self.setup_abort_flag(mock_ipc_flag)
self.setup_abort_flag(self.mock_IPCFlag)
sr = create_cleanup_sr(self.xapi_mock)
sr._srLock = AlwaysFreeLock()

sr.lock()

self.assertEqual(1, sr._locked)

@mock.patch('cleanup.IPCFlag', autospec=True)
def test_lock_raises_exception_if_abort_requested(
self,
mock_ipc_flag):
def test_lock_raises_exception_if_abort_requested(self):
"""
If IPC abort was requested, lock raises AbortException
"""

self.setup_abort_flag(mock_ipc_flag, should_abort=True)
self.setup_abort_flag(self.mock_IPCFlag, should_abort=True)
sr = create_cleanup_sr(self.xapi_mock)
sr._srLock = AlwaysLockedLock()

self.assertRaises(cleanup.AbortException, sr.lock)

@mock.patch('cleanup.IPCFlag', autospec=True)
def test_lock_raises_exception_if_unable_to_acquire_lock(
self,
mock_ipc_flag):
def test_lock_raises_exception_if_unable_to_acquire_lock(self):
"""
If the lock is busy, SMException is raised
"""

self.setup_abort_flag(mock_ipc_flag)
self.setup_abort_flag(self.mock_IPCFlag)
sr = create_cleanup_sr(self.xapi_mock)
sr._srLock = AlwaysLockedLock()

self.assertRaises(util.SMException, sr.lock)

@mock.patch('cleanup.IPCFlag', autospec=True)
def test_lock_leaves_sr_consistent_if_unable_to_acquire_lock(
self,
mock_ipc_flag):
def test_lock_leaves_sr_consistent_if_unable_to_acquire_lock(self):
"""
If the lock is busy, the lock counter is not incremented
"""

self.setup_abort_flag(mock_ipc_flag)
self.setup_abort_flag(self.mock_IPCFlag)
sr = create_cleanup_sr(self.xapi_mock)
sr._srLock = AlwaysLockedLock()

Expand Down Expand Up @@ -398,62 +386,55 @@ def test__abort_returns_true_when_get_lock(
ret = cleanup._abort(None)
self.assertEqual(ret, True)

@mock.patch('cleanup.IPCFlag', autospec=True)
@mock.patch('cleanup.init')
def test__abort_return_false_if_flag_not_set(
self,
mock_init,
mock_ipcflag):
mock_init):
"""
If flag not set return False.
"""
mock_init.return_value = None

# Fake the flag returning False.
mock_ipcflag.return_value.set.return_value = False
self.mock_IPCFlag.return_value.set.return_value = False

# Not important for this test but we call it so mock it.
cleanup.lockActive = AlwaysLockedLock()

ret = cleanup._abort(None)

self.assertEqual(mock_ipcflag.return_value.set.call_count, 1)
self.assertEqual(self.mock_IPCFlag.return_value.set.call_count, 1)
self.assertEqual(ret, False)

@mock.patch('cleanup.IPCFlag', autospec=True)
@mock.patch('cleanup.init')
def test__abort_should_raise_if_cant_get_lock(
self,
mock_init,
mock_ipcflag):
def test__abort_should_raise_if_cant_get_lock(self, mock_init):
"""
_abort should raise an exception if it completely
fails to get lockActive.
"""
mock_init.return_value = None

# Fake return true so we don't bomb out straight away.
mock_ipcflag.return_value.set.return_value = True
self.mock_IPCFlag.return_value.set.return_value = True

# Fake never getting the lock.
cleanup.lockActive = AlwaysLockedLock()

with self.assertRaises(util.CommandException):
cleanup._abort(None)

@mock.patch('cleanup.IPCFlag', autospec=True)
@mock.patch('cleanup.init')
def test__abort_should_succeed_if_aquires_on_second_attempt(
self,
mock_init,
mock_ipcflag):
mock_init
):
"""
_abort should succeed if gets lock on second attempt
"""
mock_init.return_value = None

# Fake return true so we don't bomb out straight away.
mock_ipcflag.return_value.set.return_value = True
self.mock_IPCFlag.return_value.set.return_value = True

# Use side effect to fake failing to get the lock
# on the first call, succeeding on the second.
Expand All @@ -467,20 +448,19 @@ def test__abort_should_succeed_if_aquires_on_second_attempt(
self.assertEqual(mocked_lock.acquireNoblock.call_count, 2)
self.assertEqual(ret, True)

@mock.patch('cleanup.IPCFlag', autospec=True)
@mock.patch('cleanup.init')
def test__abort_should_fail_if_reaches_maximum_retries_for_lock(
self,
mock_init,
mock_ipcflag):
mock_init
):
"""
_abort should fail if we max out the number of attempts for
obtaining the lock.
"""
mock_init.return_value = None

# Fake return true so we don't bomb out straight away.
mock_ipcflag.return_value.set.return_value = True
self.mock_IPCFlag.return_value.set.return_value = True

# Fake a series of failed attempts to get the lock.
mocked_lock = AlwaysLockedLock()
Expand All @@ -506,17 +486,13 @@ def test__abort_should_fail_if_reaches_maximum_retries_for_lock(
self.assertEqual(mocked_lock.acquireNoblock.call_count,
cleanup.SR.LOCK_RETRY_ATTEMPTS + 1)

@mock.patch('cleanup.IPCFlag', autospec=True)
@mock.patch('cleanup.init')
def test__abort_succeeds_if_gets_lock_on_final_attempt(
self,
mock_init,
mock_ipcflag):
def test__abort_succeeds_if_gets_lock_on_final_attempt(self, mock_init):
"""
_abort succeeds if we get the lockActive on the final retry
"""
mock_init.return_value = None
mock_ipcflag.return_value.set.return_value = True
self.mock_IPCFlag.return_value.set.return_value = True
mocked_lock = AlwaysLockedLock()
mocked_lock.acquireNoblock = mock.Mock()

Expand Down Expand Up @@ -1587,8 +1563,7 @@ def test_tag_children_for_relink_activation_second_phase(self):
# Remove called 3 times, twice from set, once on failure
self.assertEqual(3, self.xapi_mock.removeFromConfigVDI.call_count)

@mock.patch('cleanup.time.sleep', autospec=True)
def test_tag_children_for_relink_blocked(self, mock_sleep):
def test_tag_children_for_relink_blocked(self):
"""
Cleanup: tag for relink, blocked - exception
"""
Expand All @@ -1605,7 +1580,7 @@ def test_tag_children_for_relink_blocked(self, mock_sleep):

self.assertIn('Failed to tag vdi', str(sme.exception))

self.assertGreater(mock_sleep.call_count, 5)
self.assertGreater(self.mock_time_sleep.call_count, 5)

@mock.patch('cleanup.util.get_this_host', autospec=True)
@mock.patch('cleanup._gcLoop', autospec=True)
Expand Down

0 comments on commit fdca39c

Please sign in to comment.