read_repair_test.py
import time

from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement

from dtest import PRINT_DEBUG, Tester, debug, create_ks
from tools.assertions import assert_one
from tools.data import rows_to_list
from tools.decorators import since


class TestReadRepair(Tester):

    def setUp(self):
        Tester.setUp(self)
        # Disable hinted handoff so that diverging replicas can only be reconciled by read repair
        self.cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
        self.cluster.populate(3).start(wait_for_binary_proto=True)
    @since('3.0')
    def alter_rf_and_run_read_repair_test(self):
        """
        @jira_ticket CASSANDRA-10655
        @jira_ticket CASSANDRA-10657

        Test that querying only a subset of a row's columns does not confuse read repair,
        to avoid the problem described in CASSANDRA-10655.
        """
        session = self.patient_cql_connection(self.cluster.nodelist()[0])
        session.execute("""CREATE KEYSPACE alter_rf_test
                           WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};""")
        session.execute("CREATE TABLE alter_rf_test.t1 (k int PRIMARY KEY, a int, b int);")
        session.execute("INSERT INTO alter_rf_test.t1 (k, a, b) VALUES (1, 1, 1);")
        cl_one_stmt = SimpleStatement("SELECT * FROM alter_rf_test.t1 WHERE k=1",
                                      consistency_level=ConsistencyLevel.ONE)

        # Identify the initial replica and trigger a flush to ensure reads come from sstables
        initial_replica, non_replicas = self.identify_initial_placement('alter_rf_test', 't1', 1)
        debug("At RF=1 replica for data is " + initial_replica.name)
        initial_replica.flush()

        # At RF=1 it shouldn't matter which node we query, as the actual data should always come
        # from the initial replica when reading at CL ONE
        for n in self.cluster.nodelist():
            debug("Checking " + n.name)
            session = self.patient_exclusive_cql_connection(n)
            assert_one(session, "SELECT * FROM alter_rf_test.t1 WHERE k=1", [1, 1, 1], cl=ConsistencyLevel.ONE)

        # Alter the keyspace to RF=3 but don't repair, then execute a query which selects only a
        # subset of the columns. Run it at CL ALL on one of the nodes which doesn't currently have
        # the data, triggering a read repair. The expectation is that every replica will have been
        # repaired for that column (but we make no assumptions about the other columns).
        debug("Changing RF from 1 to 3")
        session.execute("""ALTER KEYSPACE alter_rf_test
                           WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};""")
        cl_all_stmt = SimpleStatement("SELECT a FROM alter_rf_test.t1 WHERE k=1",
                                      consistency_level=ConsistencyLevel.ALL)
        debug("Executing SELECT on non-initial replica to trigger read repair " + non_replicas[0].name)
        read_repair_session = self.patient_exclusive_cql_connection(non_replicas[0])
        # The result of the CL ALL query contains only the selected column
        assert_one(read_repair_session, "SELECT a FROM alter_rf_test.t1 WHERE k=1", [1], cl=ConsistencyLevel.ALL)

        # Check the results of the read repair by querying each replica again at CL ONE
        debug("Re-running SELECTs at CL ONE to verify read repair")
        for n in self.cluster.nodelist():
            debug("Checking " + n.name)
            session = self.patient_exclusive_cql_connection(n)
            res = rows_to_list(session.execute(cl_one_stmt))
            # Column a must be 1 everywhere, and column b must be either 1 or None everywhere
            self.assertIn(res[0][1:], [[1, 1], [1, None]])

        # Now query at CL ALL, but selecting all columns
        query = "SELECT * FROM alter_rf_test.t1 WHERE k=1"
        debug("Executing SELECT on non-initial replica to trigger read repair " + non_replicas[0].name)
        read_repair_session = self.patient_exclusive_cql_connection(non_replicas[0])
        assert_one(read_repair_session, query, [1, 1, 1], cl=ConsistencyLevel.ALL)

        # Check that every replica is now fully up to date
        debug("Re-running SELECTs at CL ONE to verify read repair")
        for n in self.cluster.nodelist():
            debug("Checking " + n.name)
            session = self.patient_exclusive_cql_connection(n)
            assert_one(session, query, [1, 1, 1], cl=ConsistencyLevel.ONE)
    def identify_initial_placement(self, keyspace, table, key):
        nodes = self.cluster.nodelist()
        # getendpoints prints one replica address per line and ends with a newline, so the
        # (single, at RF=1) address is the second-to-last element of the split
        out, _, _ = nodes[0].nodetool("getendpoints {} {} {}".format(keyspace, table, key))
        address = out.split('\n')[-2]
        initial_replica = None
        non_replicas = []
        for node in nodes:
            if node.address() == address:
                initial_replica = node
            else:
                non_replicas.append(node)
        self.assertIsNotNone(initial_replica, "Couldn't identify initial replica")
        return initial_replica, non_replicas
    @since('2.0')
    def range_slice_query_with_tombstones_test(self):
        """
        @jira_ticket CASSANDRA-8989
        @jira_ticket CASSANDRA-9502

        Range-slice queries with CL > ONE used to perform unnecessary read repairs. As reported in
        CASSANDRA-9502, reading with CL > ONE from a table containing a collection type, using the
        token function, caused overwhelming writes to the replicas.
        The behaviour can be verified with tracing, by pattern matching on system_traces.events.activity.
        """
        node1 = self.cluster.nodelist()[0]
        session1 = self.patient_exclusive_cql_connection(node1)
        session1.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': 2}")
        session1.execute("""
            CREATE TABLE ks.cf (
                key int primary key,
                value double,
                txt text
            );
        """)
        for n in range(1, 2500):
            txt = "foo bar %d iuhiu iuhiu ihi" % n
            session1.execute("INSERT INTO ks.cf (key, value, txt) VALUES (%d, %d, '%s')" % (n, n, txt))

        self.cluster.flush()
        self.cluster.stop()
        self.cluster.start(wait_for_binary_proto=True)
        session1 = self.patient_exclusive_cql_connection(node1)

        for n in range(1, 1000):
            session1.execute("DELETE FROM ks.cf WHERE key = %d" % (n))

        time.sleep(1)
        node1.flush()
        time.sleep(1)

        query = SimpleStatement("SELECT * FROM ks.cf LIMIT 100", consistency_level=ConsistencyLevel.LOCAL_QUORUM)
        future = session1.execute_async(query, trace=True)
        future.result()
        trace = future.get_query_trace(max_wait=120)
        self.pprint_trace(trace)
        for trace_event in trace.events:
            # No replica should hit the write path as a result of this read, so the trace must not
            # contain any commitlog or memtable activity
            activity = trace_event.description
            self.assertNotIn("Appending to commitlog", activity)
            self.assertNotIn("Adding to cf memtable", activity)
            self.assertNotIn("Acquiring switchLock read lock", activity)
    @since('3.0')
    def test_gcable_tombstone_resurrection_on_range_slice_query(self):
        """
        @jira_ticket CASSANDRA-11427

        Before CASSANDRA-11427, range queries would trigger read repairs for purgeable tombstones
        on hosts that had already compacted those tombstones away. This resulted in constant
        streaming and compaction activity, sourced by the few nodes still seeding purgeable
        tombstones and triggered e.g. by periodic jobs scanning the data range-wise.
        """
        node1, node2, _ = self.cluster.nodelist()
        session1 = self.patient_cql_connection(node1)
        create_ks(session1, 'gcts', 3)
        query = """
            CREATE TABLE gcts.cf1 (
                key text,
                c1 text,
                PRIMARY KEY (key, c1)
            )
            WITH gc_grace_seconds=0
            AND compaction = {'class': 'SizeTieredCompactionStrategy', 'enabled': 'false'};
        """
        session1.execute(query)

        # Create a partition-level deletion tombstone
        delete_stmt = SimpleStatement("DELETE FROM gcts.cf1 WHERE key = 'a'", consistency_level=ConsistencyLevel.ALL)
        session1.execute(delete_stmt)

        # Flush a single sstable containing the tombstone on each node
        node1.flush()
        node2.flush()

        # Purge the tombstone from node2 (gc_grace_seconds is 0)
        node2.compact()

        # Execute a range slice query, which should not trigger a read repair for the purged tombstone
        future = session1.execute_async(SimpleStatement("SELECT * FROM gcts.cf1", consistency_level=ConsistencyLevel.ALL), trace=True)
        future.result()
        trace = future.get_query_trace(max_wait=120)
        self.pprint_trace(trace)
        for trace_event in trace.events:
            activity = trace_event.description
            self.assertNotIn("Sending READ_REPAIR message", activity)
    def pprint_trace(self, trace):
        """Pretty print a trace"""
        if PRINT_DEBUG:
            print("-" * 40)
            for t in trace.events:
                print("%s\t%s\t%s\t%s" % (t.source, t.source_elapsed, t.description, t.thread_name))
            print("-" * 40)