Skip to content

Commit 6fdecdb

Browse files
authored
Merge pull request #90 from george0st/change
Unit test extension (test error states also)
2 parents d05b15e + 9cee4be commit 6fdecdb

File tree

4 files changed

+146
-51
lines changed

4 files changed

+146
-51
lines changed

nifi/cql-processor/nifi-cql-processors/src/main/java/org/george0st/processors/cql/PutCQL.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.george0st.processors.cql;
1818

1919
import com.datastax.oss.driver.api.core.CqlSession;
20+
import com.datastax.oss.driver.api.core.servererrors.InvalidQueryException;
2021
import org.apache.nifi.annotation.behavior.*;
2122
import org.apache.nifi.components.AllowableValue;
2223
import org.apache.nifi.components.PropertyDescriptor;
@@ -204,8 +205,13 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
204205
session.getProvenanceReporter().send(flowFile, clientService.getURI());
205206
session.transfer(flowFile, REL_SUCCESS);
206207
}
207-
} catch (Exception ex) {
208-
getLogger().error("PutCQL, OnTrigger error", ex);
208+
}
209+
catch (InvalidQueryException ex){
210+
getLogger().error("PutCQL, OnTrigger: InvalidQuery error", ex);
211+
session.transfer(flowFile, REL_FAILURE);
212+
}
213+
catch (Exception ex) {
214+
getLogger().error("PutCQL, OnTrigger: Error", ex);
209215
session.transfer(flowFile, REL_FAILURE);
210216
}
211217
}

nifi/cql-processor/nifi-cql-processors/src/test/java/org/george0st/processors/cql/PutCQLBase.java

Lines changed: 41 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -97,19 +97,19 @@ public void Close() throws InterruptedException {
9797
}
9898
}
9999

100-
protected FlowFile runTest(TestSetup setup, String content) {
100+
protected FlowFile runTest(TestSetup setup, String content) throws Exception {
101101
return runTestWithProperty(setup, content, null, null, false);
102102
}
103103

104-
protected FlowFile runTest(TestSetup setup, String content, boolean validate) {
104+
protected FlowFile runTest(TestSetup setup, String content, boolean validate) throws Exception {
105105
return runTestWithProperty(setup, content, null, null, validate);
106106
}
107107

108-
protected FlowFile runTestWithProperty(TestSetup setup, String content, PropertyDescriptor property, String propertyValue){
108+
protected FlowFile runTestWithProperty(TestSetup setup, String content, PropertyDescriptor property, String propertyValue) throws Exception {
109109
return runTestWithProperty(setup, content, property, propertyValue, false);
110110
}
111111

112-
protected FlowFile runTestWithProperty(TestSetup setup, String content, PropertyDescriptor property, String propertyValue, boolean validate){
112+
protected FlowFile runTestWithProperty(TestSetup setup, String content, PropertyDescriptor property, String propertyValue, boolean validate) throws Exception {
113113
HashMap<String, String> attributes = new HashMap<String, String>(Map.of("CQLName",setup.name));
114114
FlowFile result;
115115

@@ -123,58 +123,60 @@ protected FlowFile runTestWithProperty(TestSetup setup, String content, Property
123123
return result;
124124
}
125125

126-
private FlowFile coreTest(TestSetup setup, String content, boolean validate){
126+
private FlowFile coreTest(TestSetup setup, String content, boolean validate) throws Exception {
127127
try {
128128
long finish, start, countWrite, countRead;
129129
FlowFile result;
130130
boolean ok;
131131

132132
start = System.currentTimeMillis();
133133
testRunner.run();
134-
result = testRunner.getFlowFilesForRelationship(PutCQL.REL_SUCCESS).getLast();
135-
ok = testRunner.getFlowFilesForRelationship(PutCQL.REL_FAILURE).isEmpty();
136-
finish = System.currentTimeMillis();
137-
138-
if (ok) {
139-
countWrite = Long.parseLong(result.getAttribute(PutCQL.ATTRIBUTE_COUNT));
140-
System.out.printf("Source: '%s'; WRITE; '%s': %s (%d ms); Items: %d; Perf: %.1f [calls/sec]%s",
141-
result.getAttribute("CQLName"),
142-
"FlowFile",
143-
ReadableValue.fromMillisecond(finish - start),
144-
finish - start,
145-
countWrite,
146-
countWrite / ((finish - start) / 1000.0),
147-
System.lineSeparator());
148-
149-
if (validate) {
150-
// delay (before read for synch on CQL side)
151-
Thread.sleep(3000);
152-
153-
// validate (read value from CSV and from CQL and compare content)
154-
try (CqlSession session=testService.getSession()) {
155-
start = System.currentTimeMillis();
156-
countRead = (new CsvCqlValidate(session, setup, CqlTestSchema.primaryKeys)).executeContent(content);
157-
finish = System.currentTimeMillis();
158-
}
159-
System.out.printf("Source: '%s'; VALIDATE; '%s': %s (%d ms); Items: %d; Perf: %.1f [calls/sec]%s",
134+
if (!testRunner.getFlowFilesForRelationship(PutCQL.REL_SUCCESS).isEmpty()) {
135+
result = testRunner.getFlowFilesForRelationship(PutCQL.REL_SUCCESS).getLast();
136+
ok = testRunner.getFlowFilesForRelationship(PutCQL.REL_FAILURE).isEmpty();
137+
finish = System.currentTimeMillis();
138+
139+
if (ok) {
140+
countWrite = Long.parseLong(result.getAttribute(PutCQL.ATTRIBUTE_COUNT));
141+
System.out.printf("Source: '%s'; WRITE; '%s': %s (%d ms); Items: %d; Perf: %.1f [calls/sec]%s",
160142
result.getAttribute("CQLName"),
161143
"FlowFile",
162144
ReadableValue.fromMillisecond(finish - start),
163145
finish - start,
164-
countRead,
165-
countRead / ((finish - start) / 1000.0),
146+
countWrite,
147+
countWrite / ((finish - start) / 1000.0),
166148
System.lineSeparator());
167149

168-
if (countWrite != countRead)
169-
throw new Exception("The amount of Write and Read operations are different");
150+
if (validate) {
151+
// delay (before read for synch on CQL side)
152+
Thread.sleep(3000);
153+
154+
// validate (read value from CSV and from CQL and compare content)
155+
try (CqlSession session = testService.getSession()) {
156+
start = System.currentTimeMillis();
157+
countRead = (new CsvCqlValidate(session, setup, CqlTestSchema.primaryKeys)).executeContent(content);
158+
finish = System.currentTimeMillis();
159+
}
160+
System.out.printf("Source: '%s'; VALIDATE; '%s': %s (%d ms); Items: %d; Perf: %.1f [calls/sec]%s",
161+
result.getAttribute("CQLName"),
162+
"FlowFile",
163+
ReadableValue.fromMillisecond(finish - start),
164+
finish - start,
165+
countRead,
166+
countRead / ((finish - start) / 1000.0),
167+
System.lineSeparator());
168+
169+
if (countWrite != countRead)
170+
throw new Exception("The amount of Write and Read operations are different");
171+
}
172+
return result;
170173
}
171-
return result;
172174
}
173-
return null;
174175
}
175-
catch(Exception ex) {
176-
return null;
176+
catch (Exception ex) {
177+
throw new Exception("Error in PROCESSOR");
177178
}
179+
return null;
178180
}
179181

180182
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.george0st.processors.cql;
18+
19+
import org.apache.nifi.flowfile.FlowFile;
20+
import org.apache.nifi.reporting.InitializationException;
21+
import org.george0st.processors.cql.helper.TestSetup;
22+
import org.junit.jupiter.api.Test;
23+
24+
import java.io.IOException;
25+
26+
import static org.junit.jupiter.api.Assertions.*;
27+
28+
public class PutCQLErrorFunction extends PutCQLBase {
29+
// Helper
30+
// https://medium.com/@mr.sinchan.banerjee/nifi-custom-processor-series-part-3-junit-test-with-nifi-mock-a935a1a4e3e5
31+
public PutCQLErrorFunction() throws IOException, InterruptedException, InitializationException {
32+
super();
33+
}
34+
35+
@Test
36+
public void testErrorNonExistColumnInCSV() throws Exception {
37+
String content = "aaa\n" +
38+
"0\n" +
39+
"1\n";
40+
FlowFile result;
41+
42+
for (TestSetup setup: setups) {
43+
result = runTest(setup, content);
44+
assertNull(result, String.format("Expected null, besed on simulation error in '%s'", setup.name));
45+
}
46+
}
47+
48+
@Test
49+
public void testErrorMissingPrimaryKeyColumnInCSV() throws Exception {
50+
String content = "colbigint\n" +
51+
"0\n" +
52+
"1\n";
53+
FlowFile result;
54+
55+
for (TestSetup setup: setups) {
56+
result = runTest(setup, content);
57+
assertNull(result, String.format("Expected null, based on simulation error in '%s'", setup.name));
58+
}
59+
}
60+
61+
@Test
62+
public void testErrorInvalidIntTypeValueInCSV() throws Exception {
63+
String content = "colbigint,colint\n" +
64+
"0,Peter\n" +
65+
"1,John\n";
66+
FlowFile result;
67+
68+
for (TestSetup setup: setups) {
69+
result = runTest(setup, content);
70+
assertNull(result, String.format("Expected null, based on simulation error in '%s'", setup.name));
71+
}
72+
}
73+
74+
@Test
75+
public void testErrorInvalidFloatTypeValueInCSV() throws Exception {
76+
String content = "colbigint,colint,colfloat\n" +
77+
"0,1064,Peter\n" +
78+
"1,1709,John\n";
79+
FlowFile result;
80+
81+
for (TestSetup setup: setups) {
82+
result = runTest(setup, content);
83+
assertNull(result, String.format("Expected null, based on simulation error in '%s'", setup.name));
84+
}
85+
}
86+
87+
}

nifi/cql-processor/nifi-cql-processors/src/test/java/org/george0st/processors/cql/PutCQLFunction.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public PutCQLFunction() throws IOException, InterruptedException, Initialization
3434
}
3535

3636
@Test
37-
public void testBasic() {
37+
public void testBasic() throws Exception {
3838
String content = "\"colbigint\",\"colint\",\"coltext\",\"colfloat\",\"coldouble\",\"coldate\",\"coltime\",\"coltimestamp\",\"colboolean\",\"coluuid\",\"colsmallint\",\"coltinyint\",\"coltimeuuid\",\"colvarchar\"\n" +
3939
"\"0\",\"1064\",\"zeVOKGnORq\",\"627.6811\",\"395.8522407512559\",\"1971-11-12\",\"03:37:15\",\"2000-09-25T22:18:45Z\",\"false\",\"6080071f-4dd1-4ea5-b711-9ad0716e242a\",\"8966\",\"55\",\"f45e58f5-c3b7-11ef-8d19-97ae87be7c54\",\"Tzxsw\"\n" +
4040
"\"1\",\"1709\",\"7By0z5QEXh\",\"652.03955\",\"326.9081263857284\",\"2013-12-17\",\"08:43:09\",\"2010-04-27T07:02:27Z\",\"false\",\"7d511666-2f81-41c4-9d5c-a5fa87f7d1c3\",\"24399\",\"38\",\"f45e8006-c3b7-11ef-8d19-172ff8d0d752\",\"exAbN\"\n" +
@@ -51,7 +51,7 @@ public void testBasic() {
5151
}
5252

5353
@Test
54-
public void testBatchLoggedTypes() {
54+
public void testBatchLoggedTypes() throws Exception {
5555
String content = "\"colbigint\",\"colint\",\"coltext\",\"colfloat\",\"coldouble\",\"coldate\",\"coltime\",\"coltimestamp\",\"colboolean\",\"coluuid\",\"colsmallint\",\"coltinyint\",\"coltimeuuid\",\"colvarchar\"\n" +
5656
"\"0\",\"1064\",\"zeVOKGnORq\",\"627.6811\",\"395.8522407512559\",\"1971-11-12\",\"03:37:15\",\"2000-09-25T22:18:45Z\",\"false\",\"6080071f-4dd1-4ea5-b711-9ad0716e242a\",\"8966\",\"55\",\"f45e58f5-c3b7-11ef-8d19-97ae87be7c54\",\"Tzxsw\"\n" +
5757
"\"1\",\"1709\",\"7By0z5QEXh\",\"652.03955\",\"326.9081263857284\",\"2013-12-17\",\"08:43:09\",\"2010-04-27T07:02:27Z\",\"false\",\"7d511666-2f81-41c4-9d5c-a5fa87f7d1c3\",\"24399\",\"38\",\"f45e8006-c3b7-11ef-8d19-172ff8d0d752\",\"exAbN\"\n" +
@@ -68,7 +68,7 @@ public void testBatchLoggedTypes() {
6868
}
6969

7070
@Test
71-
public void testBatchUnLoggedTypes() {
71+
public void testBatchUnLoggedTypes() throws Exception {
7272
String content = "\"colbigint\",\"colint\",\"coltext\",\"colfloat\",\"coldouble\",\"coldate\",\"coltime\",\"coltimestamp\",\"colboolean\",\"coluuid\",\"colsmallint\",\"coltinyint\",\"coltimeuuid\",\"colvarchar\"\n" +
7373
"\"0\",\"1064\",\"zeVOKGnORq\",\"627.6811\",\"395.8522407512559\",\"1971-11-12\",\"03:37:15\",\"2000-09-25T22:18:45Z\",\"false\",\"6080071f-4dd1-4ea5-b711-9ad0716e242a\",\"8966\",\"55\",\"f45e58f5-c3b7-11ef-8d19-97ae87be7c54\",\"Tzxsw\"\n" +
7474
"\"1\",\"1709\",\"7By0z5QEXh\",\"652.03955\",\"326.9081263857284\",\"2013-12-17\",\"08:43:09\",\"2010-04-27T07:02:27Z\",\"false\",\"7d511666-2f81-41c4-9d5c-a5fa87f7d1c3\",\"24399\",\"38\",\"f45e8006-c3b7-11ef-8d19-172ff8d0d752\",\"exAbN\"\n" +
@@ -85,7 +85,7 @@ public void testBatchUnLoggedTypes() {
8585
}
8686

8787
@Test
88-
public void testMoreItems() {
88+
public void testMoreItems() throws Exception {
8989
String content = "\"colbigint\",\"colint\",\"coltext\",\"colfloat\",\"coldouble\",\"coldate\",\"coltime\",\"coltimestamp\",\"colboolean\",\"coluuid\",\"colsmallint\",\"coltinyint\",\"coltimeuuid\",\"colvarchar\"\n" +
9090
"\"0\",\"1064\",\"zeVOKGnORq\",\"627.6811\",\"395.8522407512559\",\"1971-11-12\",\"03:37:15\",\"2000-09-25T22:18:45Z\",\"false\",\"6080071f-4dd1-4ea5-b711-9ad0716e242a\",\"8966\",\"55\",\"f45e58f5-c3b7-11ef-8d19-97ae87be7c54\",\"Tzxsw\"\n" +
9191
"\"1\",\"1709\",\"7By0z5QEXh\",\"652.03955\",\"326.9081263857284\",\"2013-12-17\",\"08:43:09\",\"2010-04-27T07:02:27Z\",\"false\",\"7d511666-2f81-41c4-9d5c-a5fa87f7d1c3\",\"24399\",\"38\",\"f45e8006-c3b7-11ef-8d19-172ff8d0d752\",\"exAbN\"\n" +
@@ -115,7 +115,7 @@ public void testMoreItems() {
115115
}
116116

117117
@Test
118-
public void testNoQuotas() {
118+
public void testNoQuotas() throws Exception {
119119
String content = "colbigint,colint,coltext,colfloat,coldouble,coldate,coltime,coltimestamp,colboolean,coluuid,colsmallint,coltinyint,coltimeuuid,colvarchar\n" +
120120
"10,1064,zeVOKGnORq,627.6811,395.8522407512559,1971-11-12,03:37:15,2000-09-25T22:18:45Z,false,6080071f-4dd1-4ea5-b711-9ad0716e242a,8966,55,f45e58f5-c3b7-11ef-8d19-97ae87be7c54,Tzxsw\n" +
121121
"11,1709,7By0z5QEXh,652.03955,326.9081263857284,2013-12-17,08:43:09,2010-04-27T07:02:27Z,false,7d511666-2f81-41c4-9d5c-a5fa87f7d1c3,24399,38,f45e8006-c3b7-11ef-8d19-172ff8d0d752,exAbN\n" +
@@ -132,7 +132,7 @@ public void testNoQuotas() {
132132
}
133133

134134
@Test
135-
public void testEmptyInput() {
135+
public void testEmptyInput() throws Exception {
136136
String content = "";
137137
FlowFile result;
138138

@@ -145,7 +145,7 @@ public void testEmptyInput() {
145145
}
146146

147147
@Test
148-
public void testOnlyHeader() {
148+
public void testOnlyHeader() throws Exception {
149149
String content = "\"colbigint\",\"colint\",\"coltext\",\"colfloat\",\"coldouble\",\"coldate\",\"coltime\",\"coltimestamp\",\"colboolean\",\"coluuid\",\"colsmallint\",\"coltinyint\",\"coltimeuuid\",\"colvarchar\"\n";
150150
FlowFile result;
151151

@@ -158,7 +158,7 @@ public void testOnlyHeader() {
158158
}
159159

160160
@Test
161-
public void testOnlyHeaderNoQuotas() {
161+
public void testOnlyHeaderNoQuotas() throws Exception {
162162
String content = "colbigint,colint,coltext,colfloat,coldouble,coldate,coltime,coltimestamp,colboolean,coluuid,colsmallint,coltinyint,coltimeuuid,colvarchar\n";
163163
FlowFile result;
164164

@@ -171,7 +171,7 @@ public void testOnlyHeaderNoQuotas() {
171171
}
172172

173173
@Test
174-
public void testBasicRepeat5() {
174+
public void testBasicRepeat5() throws Exception {
175175
String content = "\"colbigint\",\"colint\",\"coltext\",\"colfloat\",\"coldouble\",\"coldate\",\"coltime\",\"coltimestamp\",\"colboolean\",\"coluuid\",\"colsmallint\",\"coltinyint\",\"coltimeuuid\",\"colvarchar\"\n" +
176176
"\"0\",\"1064\",\"zeVOKGnORq\",\"627.6811\",\"395.8522407512559\",\"1971-11-12\",\"03:37:15\",\"2000-09-25T22:18:45Z\",\"false\",\"6080071f-4dd1-4ea5-b711-9ad0716e242a\",\"8966\",\"55\",\"f45e58f5-c3b7-11ef-8d19-97ae87be7c54\",\"Tzxsw\"\n" +
177177
"\"1\",\"1709\",\"7By0z5QEXh\",\"652.03955\",\"326.9081263857284\",\"2013-12-17\",\"08:43:09\",\"2010-04-27T07:02:27Z\",\"false\",\"7d511666-2f81-41c4-9d5c-a5fa87f7d1c3\",\"24399\",\"38\",\"f45e8006-c3b7-11ef-8d19-172ff8d0d752\",\"exAbN\"\n" +
@@ -190,7 +190,7 @@ public void testBasicRepeat5() {
190190
}
191191

192192
@Test
193-
public void testNoQuotas2() {
193+
public void testNoQuotas2() throws Exception {
194194
String content = "colbigint,colint,coltext,colfloat,coldouble,coldate,coltime,coltimestamp,colboolean,coluuid,colsmallint,coltinyint,coltimeuuid,colvarchar\n" +
195195
"0,1064,zeVOKGnORq,627.6811,395.8522407512559,1971-11-12,03:37:15,2000-09-25T22:18:45Z,false,6080071f-4dd1-4ea5-b711-9ad0716e242a,8966,55,f45e58f5-c3b7-11ef-8d19-97ae87be7c54,Tzxswn\n" +
196196
"1,1709,7By0z5QEXh,652.03955,326.9081263857284,2013-12-17,08:43:09,2010-04-27T07:02:27Z,false,7d511666-2f81-41c4-9d5c-a5fa87f7d1c3,24399,38,f45e8006-c3b7-11ef-8d19-172ff8d0d752,exAbNn\n" +

0 commit comments

Comments
 (0)