forked from thepaul/cassandra-dtest
-
Notifications
You must be signed in to change notification settings - Fork 0
/
user_functions_test.py
282 lines (219 loc) · 13.6 KB
/
user_functions_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
import math
import time
from distutils.version import LooseVersion
from cassandra import FunctionFailure
from dtest import CASSANDRA_VERSION_FROM_BUILD, Tester, debug, create_ks
from tools.assertions import assert_invalid, assert_none, assert_one
from tools.decorators import since
from tools.misc import ImmutableMapping
@since('2.2')
class TestUserFunctions(Tester):
if CASSANDRA_VERSION_FROM_BUILD >= '3.0':
cluster_options = ImmutableMapping({'enable_user_defined_functions': 'true',
'enable_scripted_user_defined_functions': 'true'})
else:
cluster_options = ImmutableMapping({'enable_user_defined_functions': 'true'})
def prepare(self, create_keyspace=True, nodes=1, rf=1):
cluster = self.cluster
cluster.populate(nodes).start()
node1 = cluster.nodelist()[0]
time.sleep(0.2)
session = self.patient_cql_connection(node1)
if create_keyspace:
create_ks(session, 'ks', rf)
return session
def test_migration(self):
""" Test migration of user functions """
cluster = self.cluster
# Uses 3 nodes just to make sure function mutations are correctly serialized
cluster.populate(3).start()
node1 = cluster.nodelist()[0]
node2 = cluster.nodelist()[1]
node3 = cluster.nodelist()[2]
time.sleep(0.2)
# The latter three sessions use a whitelist policy, and then don't wait for schema agreement
# So we create `schema_wait_session` to use for schema agreement blocking, and DDL changes
schema_wait_session = self.patient_cql_connection(node1)
create_ks(schema_wait_session, 'ks', 1)
schema_wait_session.cluster.control_connection.wait_for_schema_agreement()
node1_session = self.patient_exclusive_cql_connection(node1, keyspace='ks')
node2_session = self.patient_exclusive_cql_connection(node2, keyspace='ks')
node3_session = self.patient_exclusive_cql_connection(node3, keyspace='ks')
schema_wait_session.execute("""
CREATE TABLE udf_kv (
key int primary key,
value double
);
""")
schema_wait_session.cluster.control_connection.wait_for_schema_agreement()
node1_session.execute("INSERT INTO udf_kv (key, value) VALUES ({}, {})".format(1, 1))
node1_session.execute("INSERT INTO udf_kv (key, value) VALUES ({}, {})".format(2, 2))
node1_session.execute("INSERT INTO udf_kv (key, value) VALUES ({}, {})".format(3, 3))
schema_wait_session.execute("""
create or replace function x_sin ( input double ) called on null input
returns double language java as 'if (input==null) return null;
return Double.valueOf(Math.sin(input.doubleValue()));'
""")
schema_wait_session.execute("""
create or replace function x_cos ( input double ) called on null input
returns double language java as 'if (input==null) return null;
return Double.valueOf(Math.cos(input.doubleValue()));'
""")
schema_wait_session.execute("""
create or replace function x_tan ( input double ) called on null input
returns double language java as 'if (input==null) return null;
return Double.valueOf(Math.tan(input.doubleValue()));'
""")
schema_wait_session.cluster.control_connection.wait_for_schema_agreement()
assert_one(node1_session,
"SELECT key, value, x_sin(value), x_cos(value), x_tan(value) FROM ks.udf_kv where key = %d" % 1,
[1, 1.0, 0.8414709848078965, 0.5403023058681398, 1.5574077246549023])
assert_one(node2_session,
"SELECT key, value, x_sin(value), x_cos(value), x_tan(value) FROM ks.udf_kv where key = %d" % 2,
[2, 2.0, math.sin(2.0), math.cos(2.0), math.tan(2.0)])
assert_one(node3_session,
"SELECT key, value, x_sin(value), x_cos(value), x_tan(value) FROM ks.udf_kv where key = %d" % 3,
[3, 3.0, math.sin(3.0), math.cos(3.0), math.tan(3.0)])
session4 = self.patient_cql_connection(node1)
# check that functions are correctly confined to namespaces
assert_invalid(session4,
"SELECT key, value, sin(value), cos(value), tan(value) FROM ks.udf_kv where key = 4",
"Unknown function 'sin'")
# try giving existing function bad input, should error
assert_invalid(node1_session,
"SELECT key, value, x_sin(key), foo_cos(KEYy), foo_tan(key) FROM ks.udf_kv where key = 1",
"Type error: key cannot be passed as argument 0 of function ks.x_sin of type double")
node2_session.execute("drop function x_sin")
node3_session.execute("drop function x_cos")
node1_session.execute("drop function x_tan")
schema_wait_session.cluster.control_connection.wait_for_schema_agreement()
assert_invalid(node1_session, "SELECT key, value, sin(value), cos(value), tan(value) FROM udf_kv where key = 1")
assert_invalid(node2_session, "SELECT key, value, sin(value), cos(value), tan(value) FROM udf_kv where key = 1")
assert_invalid(node3_session, "SELECT key, value, sin(value), cos(value), tan(value) FROM udf_kv where key = 1")
# try creating function returning the wrong type, should error
assert_invalid(node1_session,
"CREATE FUNCTION bad_sin ( input double ) CALLED ON NULL INPUT RETURNS uuid LANGUAGE java AS 'return Math.sin(input);';",
"Type mismatch: cannot convert from double to UUID")
def udf_overload_test(self):
session = self.prepare(nodes=3)
session.execute("CREATE TABLE tab (k text PRIMARY KEY, v int)")
session.execute("INSERT INTO tab (k, v) VALUES ('foo' , 1);")
# create overloaded udfs
session.execute("CREATE FUNCTION overloaded(v varchar) called on null input RETURNS text LANGUAGE java AS 'return \"f1\";'")
session.execute("CREATE OR REPLACE FUNCTION overloaded(i int) called on null input RETURNS text LANGUAGE java AS 'return \"f2\";'")
session.execute("CREATE OR REPLACE FUNCTION overloaded(v1 text, v2 text) called on null input RETURNS text LANGUAGE java AS 'return \"f3\";'")
session.execute("CREATE OR REPLACE FUNCTION overloaded(v ascii) called on null input RETURNS text LANGUAGE java AS 'return \"f1\";'")
# ensure that works with correct specificity
assert_invalid(session, "SELECT v FROM tab WHERE k = overloaded('foo')")
assert_none(session, "SELECT v FROM tab WHERE k = overloaded((text) 'foo')")
assert_none(session, "SELECT v FROM tab WHERE k = overloaded((ascii) 'foo')")
assert_none(session, "SELECT v FROM tab WHERE k = overloaded((varchar) 'foo')")
# try non-existent functions
assert_invalid(session, "DROP FUNCTION overloaded(boolean)")
assert_invalid(session, "DROP FUNCTION overloaded(bigint)")
# try dropping overloaded - should fail because ambiguous
assert_invalid(session, "DROP FUNCTION overloaded")
session.execute("DROP FUNCTION overloaded(varchar)")
assert_invalid(session, "SELECT v FROM tab WHERE k = overloaded((text)'foo')")
session.execute("DROP FUNCTION overloaded(text, text)")
assert_invalid(session, "SELECT v FROM tab WHERE k = overloaded((text)'foo',(text)'bar')")
session.execute("DROP FUNCTION overloaded(ascii)")
assert_invalid(session, "SELECT v FROM tab WHERE k = overloaded((ascii)'foo')")
# should now work - unambiguous
session.execute("DROP FUNCTION overloaded")
def udf_scripting_test(self):
session = self.prepare()
session.execute("create table nums (key int primary key, val double);")
for x in range(1, 4):
session.execute("INSERT INTO nums (key, val) VALUES (%d, %d)" % (x, float(x)))
session.execute("CREATE FUNCTION x_sin(val double) called on null input returns double language javascript as 'Math.sin(val)'")
assert_one(session, "SELECT key, val, x_sin(val) FROM nums where key = %d" % 1, [1, 1.0, math.sin(1.0)])
assert_one(session, "SELECT key, val, x_sin(val) FROM nums where key = %d" % 2, [2, 2.0, math.sin(2.0)])
assert_one(session, "SELECT key, val, x_sin(val) FROM nums where key = %d" % 3, [3, 3.0, math.sin(3.0)])
session.execute("create function y_sin(val double) called on null input returns double language javascript as 'Math.sin(val).toString()'")
assert_invalid(session, "select y_sin(val) from nums where key = 1", expected=FunctionFailure)
assert_invalid(session, "create function compilefail(key int) called on null input returns double language javascript as 'foo bar';")
session.execute("create function plustwo(key int) called on null input returns double language javascript as 'key+2'")
assert_one(session, "select plustwo(key) from nums where key = 3", [5])
def default_aggregate_test(self):
session = self.prepare()
session.execute("create table nums (key int primary key, val double);")
for x in range(1, 10):
session.execute("INSERT INTO nums (key, val) VALUES (%d, %d)" % (x, float(x)))
assert_one(session, "SELECT min(key) FROM nums", [1])
assert_one(session, "SELECT max(val) FROM nums", [9.0])
assert_one(session, "SELECT sum(key) FROM nums", [45])
assert_one(session, "SELECT avg(val) FROM nums", [5.0])
assert_one(session, "SELECT count(*) FROM nums", [9])
def aggregate_udf_test(self):
session = self.prepare()
session.execute("create table nums (key int primary key, val int);")
for x in range(1, 4):
session.execute("INSERT INTO nums (key, val) VALUES (%d, %d)" % (x, x))
session.execute("create function plus(key int, val int) called on null input returns int language java as 'return Integer.valueOf(key.intValue() + val.intValue());'")
session.execute("create function stri(key int) called on null input returns text language java as 'return key.toString();'")
session.execute("create aggregate suma (int) sfunc plus stype int finalfunc stri initcond 10")
assert_one(session, "select suma(val) from nums", ["16"])
session.execute("create function test(a int, b double) called on null input returns int language javascript as 'a + b;'")
session.execute("create aggregate aggy(double) sfunc test stype int")
assert_invalid(session, "create aggregate aggtwo(int) sfunc aggy stype int")
assert_invalid(session, "create aggregate aggthree(int) sfunc test stype int finalfunc aggtwo")
def udf_with_udt_test(self):
"""
Test UDFs that operate on non-frozen UDTs.
@jira_ticket CASSANDRA-7423
@since 3.6
"""
session = self.prepare()
session.execute("create type test (a text, b int);")
session.execute("create function funk(udt test) called on null input returns int language java as 'return Integer.valueOf(udt.getInt(\"b\"));';")
if self.cluster.version() >= LooseVersion('3.6'):
frozen_vals = (False, True)
else:
frozen_vals = (True,)
for frozen in frozen_vals:
debug("Using {} UDTs".format("frozen" if frozen else "non-frozen"))
table_name = "tab_frozen" if frozen else "tab"
column_type = "frozen<test>" if frozen else "test"
session.execute("create table {} (key int primary key, udt {});".format(table_name, column_type))
session.execute("insert into %s (key, udt) values (1, {a: 'un', b:1});" % (table_name,))
session.execute("insert into %s (key, udt) values (2, {a: 'deux', b:2});" % (table_name,))
session.execute("insert into %s (key, udt) values (3, {a: 'trois', b:3});" % (table_name,))
assert_one(session, "select sum(funk(udt)) from {}".format(table_name), [6])
assert_invalid(session, "drop type test;")
@since('2.2')
def udf_with_udt_keyspace_isolation_test(self):
"""
Ensure functions dont allow a UDT from another keyspace
@jira_ticket CASSANDRA-9409
@since 2.2
"""
session = self.prepare()
session.execute("create type udt (a text, b int);")
create_ks(session, 'user_ks', 1)
# ensure we cannot use a udt from another keyspace as function argument
assert_invalid(
session,
"CREATE FUNCTION overloaded(v ks.udt) called on null input RETURNS text LANGUAGE java AS 'return \"f1\";'",
"Statement on keyspace user_ks cannot refer to a user type in keyspace ks"
)
# ensure we cannot use a udt from another keyspace as return value
assert_invalid(
session,
("CREATE FUNCTION test(v text) called on null input RETURNS ks.udt "
"LANGUAGE java AS 'return null;';"),
"Statement on keyspace user_ks cannot refer to a user type in keyspace ks"
)
def aggregate_with_udt_keyspace_isolation_test(self):
"""
Ensure aggregates dont allow a UDT from another keyspace
@jira_ticket CASSANDRA-9409
"""
session = self.prepare()
session.execute("create type udt (a int);")
create_ks(session, 'user_ks', 1)
assert_invalid(
session,
"create aggregate suma (ks.udt) sfunc plus stype int finalfunc stri initcond 10",
"Statement on keyspace user_ks cannot refer to a user type in keyspace ks"
)