diff --git a/pcpostprocess/hergQC.py b/pcpostprocess/hergQC.py index 2342151..f5c4e4a 100644 --- a/pcpostprocess/hergQC.py +++ b/pcpostprocess/hergQC.py @@ -7,6 +7,7 @@ import numpy as np import scipy.stats +NOISE_LEN = 200 class QCDict: @@ -309,10 +310,10 @@ def qc2(self, recording, method=3): # Not sure if this is good... snr = scipy.stats.signaltonoise(recording) elif method == 2: - noise = np.std(recording[:200]) + noise = np.std(recording[:NOISE_LEN]) snr = (np.max(recording) - np.min(recording) - 2 * noise) / noise elif method == 3: - noise = np.std(recording[:200]) + noise = np.std(recording[:NOISE_LEN]) snr = (np.std(recording) / noise) ** 2 if snr < self.snrc or not np.isfinite(snr): @@ -328,15 +329,15 @@ def qc3(self, recording1, recording2, method=3): if method == 1: rmsdc = 2 # A/F * F elif method == 2: - noise_1 = np.std(recording1[:200]) + noise_1 = np.std(recording1[:NOISE_LEN]) peak_1 = (np.max(recording1) - noise_1) - noise_2 = np.std(recording2[:200]) + noise_2 = np.std(recording2[:NOISE_LEN]) peak_2 = (np.max(recording2) - noise_2) rmsdc = max(np.mean([peak_1, peak_2]) * 0.1, np.mean([noise_1, noise_2]) * 5) elif method == 3: - noise_1 = np.std(recording1[:200]) - noise_2 = np.std(recording2[:200]) + noise_1 = np.std(recording1[:NOISE_LEN]) + noise_2 = np.std(recording2[:NOISE_LEN]) rmsd0_1 = np.sqrt(np.mean((recording1) ** 2)) rmsd0_2 = np.sqrt(np.mean((recording2) ** 2)) rmsdc = max(np.mean([rmsd0_1, rmsd0_2]) * self.rmsd0c, @@ -348,7 +349,7 @@ def qc3(self, recording1, recording2, method=3): else: result = True - return (result, rmsd) + return (result, rmsdc - rmsd) def qc4(self, rseals, cms, rseriess): # Check R_seal, C_m, R_series stability @@ -417,7 +418,7 @@ def qc5(self, recording1, recording2, win=None, label=''): else: result = True - return (result, max_diff) + return (result, max_diffc - max_diff) def qc5_1(self, recording1, recording2, win=None, label=''): # Check RMSD_0 drops after E-4031 application @@ -450,7 +451,7 @@ def qc5_1(self, recording1, recording2, win=None, label=''): else: result = True - return (result, rmsd0_diff) + return (result, rmsd0_diffc - rmsd0_diff) def qc6(self, recording1, win=None, label=''): # Check subtracted staircase +40mV step up is non negative @@ -458,7 +459,6 @@ def qc6(self, recording1, win=None, label=''): i, f = win else: i, f = 0, -1 - val = np.mean(recording1[i:f]) if self.plot_dir and self._debug: if win is not None: @@ -467,8 +467,10 @@ def qc6(self, recording1, win=None, label=''): plt.savefig(os.path.join(self.plot_dir, f"qc6_{label}")) plt.clf() + val = np.mean(recording1[i:f]) + # valc = -0.005 * np.abs(np.sqrt(np.mean((recording1) ** 2))) # or just 0 - valc = self.negative_tolc * np.std(recording1[:200]) + valc = self.negative_tolc * np.std(recording1[:NOISE_LEN]) if (val < valc) or not (np.isfinite(val) and np.isfinite(valc)): self.logger.debug(f"qc6_{label} val: {val}, valc: {valc}") @@ -476,7 +478,7 @@ def qc6(self, recording1, win=None, label=''): else: result = True - return (result, val) + return (result, valc - val) def filter_capacitive_spikes(self, current, times, voltage_step_times): """ diff --git a/tests/test_herg_qc.py b/tests/test_herg_qc.py index 4561257..aed37d5 100644 --- a/tests/test_herg_qc.py +++ b/tests/test_herg_qc.py @@ -6,7 +6,7 @@ import numpy as np from syncropatch_export.trace import Trace -from pcpostprocess.hergQC import hERGQC +from pcpostprocess.hergQC import hERGQC, NOISE_LEN class TestHergQC(unittest.TestCase): @@ -54,15 +54,14 @@ def passed(result): # qc1 checks that rseal, cm, rseries are within range rseal_lo, rseal_hi = 1e8, 1e12 rseal_mid = (rseal_lo + rseal_hi) / 2 + rseal_tol = 0.1 cm_lo, cm_hi = 1e-12, 1e-10 cm_mid = (cm_lo + cm_hi) / 2 + cm_tol = 1e-13 rseries_lo, rseries_hi = 1e6, 2.5e7 rseries_mid = (rseries_lo + rseries_hi) / 2 - - rseal_tol = 0.1 - cm_tol = 1e-13 rseries_tol = 0.1 test_matrix = [ @@ -105,14 +104,17 @@ def test_qc2(self): # qc2 checks that raw and subtracted SNR are above a minimum threshold test_matrix = [ - (10, True), # snr = 70.75 - (6.03125, True), # snr = 25.02 - (6.015625, False), # snr = 24.88 - (1, False), # snr = 1.0 + (40, True), # snr = 130286 + (10, True), # snr = 8082 + (1, True), # snr = 74 + (0.601, True), # snr = 25.07 + (0.6, False), # snr = 24.98 + (0.5, False), # snr = 17 + (0.1, False), # snr = 0.5 ] for i, expected in test_matrix: - recording = np.asarray([0, 1] * 500 + [0, i] * 500) # snr = 70.75 + recording = np.asarray([0, 0.1] * (NOISE_LEN//2) + [i] * 500) result = hergqc.qc2(recording) self.assertEqual(result[0], expected, f"({i}: {result[1]})") @@ -128,20 +130,37 @@ def test_qc3(self): voltage=self.voltage) # qc3 checks that rmsd of two sweeps are similar + + # Test with same noise, different signal test_matrix = [ - (0, True), - (1, True), - (2, True), - (3, True), - (4, False), - (5, False), - (6, False), + (-10, False), + (-9, False), + (-8, False), # rmsdc - rmsd = -0.6761186497920804 + (-7, True), # rmsdc - rmsd = 0.25355095037585507 + (0, True), # rmsdc - rmsd = 6.761238263598085 + (8, True), # rmsdc - rmsd = 0.6761272774054383 + (9, False), # rmsdc - rmsd = -0.08451158778363332 + (10, False), ] + recording1 = np.asarray([0, 0.1] * (NOISE_LEN//2) + [40] * 500) for i, expected in test_matrix: - recording0 = np.asarray([0, 1] * 1000) - recording1 = recording0 + i - result = hergqc.qc3(recording0, recording1) + recording2 = np.asarray([0, 0.1] * (NOISE_LEN//2) + [40 + i] * 500) + result = hergqc.qc3(recording1, recording2) + self.assertEqual(result[0], expected, f"({i}: {result[1]})") + + # Test with same signal, different noise + # TODO: Find failing example + test_matrix = [ + (10, True), + (100, True), + (1000, True), + ] + + recording1 = np.asarray([0, 0.1] * (NOISE_LEN//2) + [40] * 500) + for i, expected in test_matrix: + recording2 = np.asarray([0, 0.1 * i] * (NOISE_LEN//2) + [40] * 500) + result = hergqc.qc3(recording1, recording2) self.assertEqual(result[0], expected, f"({i}: {result[1]})") # TODO: Test on select data @@ -159,19 +178,85 @@ def passed(result): ) # qc4 checks that rseal, cm, rseries are similar before/after E-4031 change + r_lo, r_hi = 1e6, 3e7 + c_lo, c_hi = 1e-12, 1e-10 + + # Test rseals + cms = [c_lo, c_lo] + rseriess = [r_lo, r_lo] + + test_matrix = [ + (1.1, True), + (3.0, True), + (3.5, False), + (5.0, False), + ] + + for i, expected in test_matrix: + rseals = [r_lo, i * r_lo] + self.assertEqual( + passed(hergqc.qc4(rseals, cms, rseriess)), + expected, + f"({i}: {rseals}, {cms}, {rseriess})", + ) + + rseals = [r_hi, i * r_hi] + self.assertEqual( + passed(hergqc.qc4(rseals, cms, rseriess)), + expected, + f"({i}: {rseals}, {cms}, {rseriess})", + ) + + # Test cms + rseals = [r_lo, r_lo] + rseriess = [r_lo, r_lo] + test_matrix = [ - [([1, 1], [1, 1], [1, 1]), True], - [([1, 2], [1, 2], [1, 2]), True], - [([1, 3], [1, 3], [1, 3]), True], - [([1, 4], [1, 4], [1, 4]), False], - [([1, 5], [1, 5], [1, 5]), False], + (1.1, True), + (3.0, True), + (3.5, False), + (5.0, False), ] - for (rseals, cms, rseriess), expected in test_matrix: + for i, expected in test_matrix: + cms = [c_lo, i * c_lo] self.assertEqual( passed(hergqc.qc4(rseals, cms, rseriess)), expected, - f"({rseals}, {cms}, {rseriess})", + f"({i}: {rseals}, {cms}, {rseriess})", + ) + + cms = [c_hi, i * c_hi] + self.assertEqual( + passed(hergqc.qc4(rseals, cms, rseriess)), + expected, + f"({i}: {rseals}, {cms}, {rseriess})", + ) + + # Test rseriess + cms = [c_lo, c_lo] + rseals = [r_lo, r_lo] + + test_matrix = [ + (1.1, True), + (3.0, True), + (3.5, False), + (5.0, False), + ] + + for i, expected in test_matrix: + rseriess = [r_lo, i * r_lo] + self.assertEqual( + passed(hergqc.qc4(rseals, cms, rseriess)), + expected, + f"({i}: {rseals}, {cms}, {rseriess})", + ) + + rseriess = [r_hi, i * r_hi] + self.assertEqual( + passed(hergqc.qc4(rseals, cms, rseriess)), + expected, + f"({i}: {rseals}, {cms}, {rseriess})", ) # TODO: Test on select data @@ -185,30 +270,28 @@ def test_qc5(self): sampling_rate=self.sampling_rate, plot_dir=plot_dir, voltage=self.voltage ) - # qc5 checks that the maximum current during the second half of the + # qc5 checks that the maximum current during the second half of the # staircase changes by at least 75% of the raw trace after E-4031 addition test_matrix = [ - (-2.0, True), (-1.0, True), - (-0.75, True), - (-0.5, False), - (0.0, False), + (0.1, True), + (0.2, True), # max_diffc - max_diff = -0.5 + (0.25, True), # max_diffc - max_diff = 0.0 + (0.3, False), # max_diffc - max_diff = 0.5 (0.5, False), - (0.75, False), (1.0, False), - (2.0, False), ] + recording1 = np.asarray([0, 0.1] * (NOISE_LEN // 2) + [10] * 500) for i, expected in test_matrix: - recording0 = np.asarray([0, 1] * 1000) - recording1 = recording0 + i - result = hergqc.qc5(recording0, recording1) + recording2 = np.asarray([0, 0.1] * (NOISE_LEN // 2) + [10 * i] * 500) + result = hergqc.qc5(recording1, recording2) self.assertEqual(result[0], expected, f"({i}: {result[1]})") # TODO: Test on select data def test_qc5_1(self): - plot_dir = os.path.join(self.output_dir, "test_qc5") + plot_dir = os.path.join(self.output_dir, "test_qc5_1") if not os.path.exists(plot_dir): os.makedirs(plot_dir) @@ -216,34 +299,54 @@ def test_qc5_1(self): sampling_rate=self.sampling_rate, plot_dir=plot_dir, voltage=self.voltage ) - # qc5_1 checks that the RMSD to zero of staircase protocol changes + # qc5_1 checks that the RMSD to zero of staircase protocol changes # by at least 50% of the raw trace after E-4031 addition. test_matrix = [ - (0.1, True), - (0.2, True), - (0.3, True), - (0.4, True), - (0.49, True), - (0.5, True), - (0.51, False), - (0.6, False), - (0.7, False), - (0.8, False), - (0.9, False), - (1.0, False), + (-1.0, False), # rmsd0_diffc - rmsd0_diff = 4.2 + (-0.5, False), # rmsd0_diffc - rmsd0_diff = 0.0001 + (-0.4, True), # rmsd0_diffc - rmsd0_diff = -0.8 + (-0.1, True), # rmsd0_diffc - rmsd0_diff = -3.4 + (0.1, True), # rmsd0_diffc - rmsd0_diff = -3.4 + (0.4, True), # rmsd0_diffc - rmsd0_diff = -0.8 + (0.5, False), # rmsd0_diffc - rmsd0_diff = 0.0001 + (1.0, False), # rmsd0_diffc - rmsd0_diff = 4.2 ] + recording1 = np.asarray([0, 0.1] * (NOISE_LEN // 2) + [10] * 500) for i, expected in test_matrix: - recording0 = np.asarray([0, 1] * 1000) - recording1 = recording0 * i - result = hergqc.qc5_1(recording0, recording1) + recording2 = np.asarray([0, 0.1] * (NOISE_LEN // 2) + [10 * i] * 500) + result = hergqc.qc5_1(recording1, recording2) self.assertEqual(result[0], expected, f"({i}: {result[1]})") # TODO: Test on select data def test_qc6(self): + plot_dir = os.path.join(self.output_dir, "test_qc2") + if not os.path.exists(plot_dir): + os.makedirs(plot_dir) + + hergqc = hERGQC( + sampling_rate=self.sampling_rate, plot_dir=plot_dir, voltage=self.voltage + ) + + # qc6 checks that the first step up to +40 mV, before the staircase, in + # the subtracted trace is bigger than -2 x estimated noise level. + test_matrix = [ + (-1, False), # valc - val = 9.9 + (-0.1, False), # valc - val = 0.9 + (-0.02, False), # valc - val = 0.1 + (-0.01, True), # valc - val = -1.38 + (-0.001, True), # valc - val = -0.09 + (0.001, True), # valc - val = -0.11 + (0.1, True), # valc - val = -1.1 + (1, True), # valc - val = -10.1 + ] + for i, expected in test_matrix: + recording = np.asarray([0, 0.1] * (NOISE_LEN // 2) + [10 * i] * 500) + result = hergqc.qc6(recording, win=[NOISE_LEN, -1]) + self.assertEqual(result[0], expected, f"({i}: {result[1]})") + # TODO: Test on select data - pass def test_run_qc(self): self.assertTrue(np.all(np.isfinite(self.voltage)))