Skip to content

Commit 1620e01

Browse files
authored
Merge pull request #104 from george0st/change
Extend unit test & Add parallel run for unit tests
2 parents b077993 + c1a4c10 commit 1620e01

File tree

3 files changed

+167
-9
lines changed

3 files changed

+167
-9
lines changed

nifi/cql-processor/nifi-cql-processors/src/test/java/org/george0st/processors/cql/GetCQLBase.java

Lines changed: 66 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,18 @@ protected MockFlowFile runTest(TestSetupRead setup) throws Exception {
7070
return runTestWithProperty(setup, null, null, null, false);
7171
}
7272

73+
protected List<MockFlowFile> runTestParallel(TestSetupRead setup, int parallel) throws Exception {
74+
return runTestParallelWithProperty(setup, null, null, null, false, parallel);
75+
}
76+
7377
protected MockFlowFile runTest(TestSetupRead setup, String content) throws Exception {
7478
return runTestWithProperty(setup, content, null, null, false);
7579
}
7680

81+
protected List<MockFlowFile> runTestParallel(TestSetupRead setup, String content, int parallel) throws Exception {
82+
return runTestParallelWithProperty(setup, content, null, null, false, parallel);
83+
}
84+
7785
protected MockFlowFile runTest(TestSetupRead setup, String content, boolean validate) throws Exception {
7886
return runTestWithProperty(setup, content, null, null, validate);
7987
}
@@ -96,6 +104,63 @@ protected MockFlowFile runTestWithProperty(TestSetupRead setup, String content,
96104
return result;
97105
}
98106

107+
protected List<MockFlowFile> runTestParallelWithProperty(TestSetupRead setup, String content, PropertyDescriptor property, String propertyValue, boolean validate, int parallel) throws Exception {
108+
List<MockFlowFile> results;
109+
110+
if (content!=null)
111+
for (int i=0;i<parallel;i++)
112+
testRunner.enqueue(content);
113+
setup.setProperty(testRunner, testService);
114+
if (property != null)
115+
setup.setProperty(testRunner, property, propertyValue);
116+
testRunner.enableControllerService(testService);
117+
if (parallel>1)
118+
testRunner.setThreadCount(parallel);
119+
results = coreTestParallel(setup, content, validate, parallel);
120+
testRunner.disableControllerService(testService);
121+
return results;
122+
}
123+
124+
private void printResult(MockFlowFile result, TestSetupRead setup, long start, long finish){
125+
long countWrite;
126+
127+
countWrite = Long.parseLong(result.getAttribute(CQLAttributes.READ_COUNT));
128+
System.out.printf("Source: '%s'; READ; '%s': %s (%d ms); Items: %d; Perf: %.1f [calls/sec]%s",
129+
setup.name,
130+
"FlowFile",
131+
ReadableValue.fromMillisecond(finish - start),
132+
finish - start,
133+
countWrite,
134+
countWrite / ((finish - start) / 1000.0),
135+
System.lineSeparator());
136+
}
137+
138+
private List<MockFlowFile> coreTestParallel(TestSetupRead setup, String content, boolean validate, int parallel) throws Exception {
139+
try {
140+
long finish, start, countWrite;
141+
List<MockFlowFile> results;
142+
boolean ok;
143+
144+
start = System.currentTimeMillis();
145+
testRunner.run(parallel);
146+
if (!testRunner.getFlowFilesForRelationship(PutCQL.REL_SUCCESS).isEmpty()) {
147+
results = testRunner.getFlowFilesForRelationship(PutCQL.REL_SUCCESS);
148+
ok = testRunner.getFlowFilesForRelationship(PutCQL.REL_FAILURE).isEmpty();
149+
finish = System.currentTimeMillis();
150+
151+
if (ok) {
152+
for (MockFlowFile result: results)
153+
printResult(result, setup, start, finish);
154+
return results;
155+
}
156+
}
157+
}
158+
catch (Exception ex) {
159+
throw new Exception("Error in PROCESSOR");
160+
}
161+
return null;
162+
}
163+
99164
private MockFlowFile coreTest(TestSetupRead setup, String content, boolean validate) throws Exception {
100165
try {
101166
long finish, start, countWrite;
@@ -110,15 +175,7 @@ private MockFlowFile coreTest(TestSetupRead setup, String content, boolean valid
110175
finish = System.currentTimeMillis();
111176

112177
if (ok) {
113-
countWrite = Long.parseLong(result.getAttribute(CQLAttributes.READ_COUNT));
114-
System.out.printf("Source: '%s'; READ; '%s': %s (%d ms); Items: %d; Perf: %.1f [calls/sec]%s",
115-
setup.name,
116-
"FlowFile",
117-
ReadableValue.fromMillisecond(finish - start),
118-
finish - start,
119-
countWrite,
120-
countWrite / ((finish - start) / 1000.0),
121-
System.lineSeparator());
178+
printResult(result, setup, start, finish);
122179
return result;
123180
}
124181
}

nifi/cql-processor/nifi-cql-processors/src/test/java/org/george0st/processors/cql/GetCQLPerformance.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import org.apache.nifi.reporting.InitializationException;
44
import org.apache.nifi.util.MockFlowFile;
5+
import org.apache.nifi.util.TestRunner;
6+
import org.apache.nifi.util.TestRunners;
57
import org.george0st.processors.cql.helper.CqlTestSchema;
68
import org.george0st.processors.cql.helper.TestSetupRead;
79
import org.george0st.processors.cql.helper.TestSetupWrite;
@@ -10,6 +12,7 @@
1012
import org.junit.jupiter.api.Test;
1113

1214
import java.io.IOException;
15+
import java.util.List;
1316

1417
import static org.junit.jupiter.api.Assertions.*;
1518

@@ -92,4 +95,57 @@ public void testRead100K() throws Exception {
9295
}
9396
}
9497

98+
@Test
99+
public void testRead150K() throws Exception {
100+
MockFlowFile result;
101+
102+
// Prepare data for performance test
103+
PutCQLPerformance putCQL = new PutCQLPerformance();
104+
putCQL.init();
105+
putCQL.csvRandomWrite150K();
106+
107+
// sleep before read (2 seconds)
108+
Thread.sleep(2000);
109+
110+
// Read data
111+
for (TestSetupRead setup : setups) {
112+
setup.columnNames="colbigint, colint";
113+
114+
result = runTest(setup);
115+
116+
// check amount of read items
117+
assertNotNull(result, String.format("Issue with processing in '%s'", setup.name));
118+
assertEquals(150_000, Long.parseLong(result.getAttribute(CQLAttributes.READ_COUNT)));
119+
}
120+
}
121+
122+
@Test
123+
public void testRead10KMultiple() throws Exception {
124+
String content = "\"colbigint\",\"colint\",\"coltext\",\"colfloat\",\"coldouble\",\"coldate\",\"coltime\",\"coltimestamp\",\"colboolean\",\"coluuid\",\"colsmallint\",\"coltinyint\",\"coltimeuuid\",\"colvarchar\"\n" +
125+
"\"0\",\"1064\",\"zeVOKGnORq\",\"627.6811\",\"395.8522407512559\",\"1971-11-12\",\"03:37:15\",\"2000-09-25T22:18:45Z\",\"false\",\"6080071f-4dd1-4ea5-b711-9ad0716e242a\",\"8966\",\"55\",\"f45e58f5-c3b7-11ef-8d19-97ae87be7c54\",\"Tzxsw\"\n" +
126+
"\"1\",\"1709\",\"7By0z5QEXh\",\"652.03955\",\"326.9081263857284\",\"2013-12-17\",\"08:43:09\",\"2010-04-27T07:02:27Z\",\"false\",\"7d511666-2f81-41c4-9d5c-a5fa87f7d1c3\",\"24399\",\"38\",\"f45e8006-c3b7-11ef-8d19-172ff8d0d752\",\"exAbN\"\n";
127+
List<MockFlowFile> results;
128+
129+
// Prepare data for performance test
130+
PutCQLPerformance putCQL = new PutCQLPerformance();
131+
putCQL.init();
132+
putCQL.csvRandomWrite10K();
133+
134+
// sleep before read (2 seconds)
135+
Thread.sleep(2000);
136+
137+
// Read data
138+
for (TestSetupRead setup : setups) {
139+
setup.columnNames="colbigint, colint";
140+
141+
//results = runTestParallel(setup, content, 5);
142+
results = runTestParallel(setup, 5);
143+
144+
// check amount of read items
145+
for (MockFlowFile result: results) {
146+
assertNotNull(result, String.format("Issue with processing in '%s'", setup.name));
147+
assertEquals(10_000, Long.parseLong(result.getAttribute(CQLAttributes.READ_COUNT)));
148+
}
149+
}
150+
}
95151
}

nifi/cql-processor/nifi-cql-processors/src/test/java/org/george0st/processors/cql/PutCQLPerformance.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,21 @@ void csvSequenceWrite100K() throws Exception {
8888
assertEquals(100_000, Long.parseLong(result.getAttribute(CQLAttributes.WRITE_COUNT)));
8989
}
9090
}
91+
92+
@Test
93+
@Disabled
94+
@DisplayName("SEQ Write - 150K items")
95+
void csvSequenceWrite150K() throws Exception {
96+
String content=new CqlTestSchema().generateRndCSVString(150_000,true);
97+
MockFlowFile result;
98+
99+
for (TestSetupWrite setup : setups) {
100+
result = runTest(setup, content);
101+
assertNotNull(result, String.format("Issue with processing in '%s'", setup.name));
102+
assertEquals(150_000, Long.parseLong(result.getAttribute(CQLAttributes.WRITE_COUNT)));
103+
}
104+
}
105+
91106
// endregion SEQ Write
92107

93108
// region SEQ Write/Validate
@@ -157,6 +172,21 @@ void csvSequenceWriteValidate100K() throws Exception {
157172
assertEquals(100_000, Long.parseLong(result.getAttribute(CQLAttributes.WRITE_COUNT)));
158173
}
159174
}
175+
176+
@Test
177+
@Disabled
178+
@DisplayName("SEQ Write/Validate - 150K items")
179+
void csvSequenceWriteValidate150K() throws Exception {
180+
String content=new CqlTestSchema().generateRndCSVString(150_000,true);
181+
MockFlowFile result;
182+
183+
for (TestSetupWrite setup : setups) {
184+
result = runTest(setup, content, true);
185+
assertNotNull(result, String.format("Issue with processing in '%s'", setup.name));
186+
assertEquals(150_000, Long.parseLong(result.getAttribute(CQLAttributes.WRITE_COUNT)));
187+
}
188+
}
189+
160190
// endregion SEQ Write/Validate
161191

162192
// region RND Write
@@ -226,6 +256,21 @@ void csvRandomWrite100K() throws Exception {
226256
assertEquals(100_000, Long.parseLong(result.getAttribute(CQLAttributes.WRITE_COUNT)));
227257
}
228258
}
259+
260+
@Test
261+
@Disabled
262+
@DisplayName("RND Write - 150K items")
263+
void csvRandomWrite150K() throws Exception {
264+
String content=new CqlTestSchema().generateRndCSVString(150_000,false);
265+
MockFlowFile result;
266+
267+
for (TestSetupWrite setup : setups) {
268+
result = runTest(setup, content);
269+
assertNotNull(result, String.format("Issue with processing in '%s'", setup.name));
270+
assertEquals(150_000, Long.parseLong(result.getAttribute(CQLAttributes.WRITE_COUNT)));
271+
}
272+
}
273+
229274
// endregion RND Write
230275

231276
}

0 commit comments

Comments
 (0)