forked from avocado-framework/avocado-misc-tests
-
Notifications
You must be signed in to change notification settings - Fork 0
/
rcutorture.py
137 lines (115 loc) · 4.48 KB
/
rcutorture.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/env python
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: 2016 IBM
# Author:Abdul Haleem <[email protected]>
# Praveen K Pandey <[email protected]>
#
import os
import re
import time
import multiprocessing
from avocado import Test
from avocado import main
from avocado.utils import process
from avocado.utils import linux_modules
class Rcutorture(Test):
"""
CONFIG_RCU_TORTURE_TEST enables an intense torture test of the RCU
infratructure. It creates an rcutorture kernel module that can be
loaded to run a torture test.
"""
def setUp(self):
"""
Verifies if CONFIG_RCU_TORTURE_TEST is enabled
"""
self.results = []
self.log.info("Check if CONFIG_RCU_TORTURE_TEST is enabled\n")
ret = linux_modules.check_kernel_config('CONFIG_RCU_TORTURE_TEST')
if ret == 0:
self.fail("CONFIG_RCU_TORTURE_TEST is not set in .config !!\n")
self.log.info("Check rcutorture module is already loaded\n")
if linux_modules.module_is_loaded('rcutorture'):
linux_modules.unload_module('rcutorture')
def cpus_toggle(self):
"""
Toggle CPUS online and offline
"""
totalcpus = multiprocessing.cpu_count()
full_count = int(totalcpus) - 1
half_count = int(totalcpus) / 2 - 1
shalf_count = int(totalcpus) / 2
fcpu = "0 - " "%s" % half_count
scpu = "%s - %s" % (shalf_count, full_count)
self.log.info("Online all cpus %s", totalcpus)
for cpu in range(0, full_count):
online = 'echo 1 > /sys/devices/system/cpu/cpu%s/online' % cpu
process.system(online)
time.sleep(10)
self.log.info("Offline all cpus 0 - %s\n", full_count)
for cpu in range(0, full_count):
offline = 'echo 0 > /sys/devices/system/cpu/cpu%s/online' % cpu
process.system(offline)
time.sleep(10)
self.log.info("Online all cpus 0 - %s\n", full_count)
for cpu in range(0, full_count):
online = 'echo 1 > /sys/devices/system/cpu/cpu%s/online' % cpu
process.system(online)
self.log.info(
"Offline and online first half cpus %s\n", fcpu)
for cpu in range(0, half_count):
offline = 'echo 0 > /sys/devices/system/cpu/cpu%s/online' % cpu
process.system(offline)
time.sleep(10)
online = 'echo 1 > /sys/devices/system/cpu/cpu%s/online' % cpu
process.system(online)
self.log.info("Offline and online second half cpus %s\n", scpu)
for cpu in range(shalf_count, full_count):
offline = 'echo 0 > /sys/devices/system/cpu/cpu%s/online' % cpu
process.system(offline)
time.sleep(10)
online = 'echo 1 > /sys/devices/system/cpu/cpu%s/online' % cpu
process.system(online)
def test(self):
"""
Runs rcutorture test for specified time.
"""
seconds = 15
os.chdir(self.logdir)
if linux_modules.load_module('rcutorture'):
self.cpus_toggle()
time.sleep(seconds)
self.cpus_toggle()
linux_modules.unload_module('rcutorture')
dmesg = process.system_output('dmesg')
res = re.search(r'rcu-torture: Reader', dmesg, re.M | re.I)
self.results = str(res).splitlines()
"""
Runs log ananlysis on the dmesg logs
Checks for know bugs
"""
pipe1 = [r for r in self.results if "!!! Reader Pipe:" in r]
if len(pipe1) != 0:
self.error('\nBUG: grace-period failure !')
pipe2 = [r for r in self.results if "Reader Pipe" in r]
for p in pipe2:
nmiss = p.split(" ")[7]
if int(nmiss):
self.error('\nBUG: rcutorture tests failed !')
batch = [s for s in self.results if "Reader Batch" in s]
for b in batch:
nmiss = b.split(" ")[7]
if int(nmiss):
self.log.info("\nWarning: near mis failure !!")
if __name__ == "__main__":
main()