Skip to content

Commit f19532f

Browse files
committed
Add parallel run
1 parent 71144a1 commit f19532f

File tree

2 files changed

+116
-9
lines changed

2 files changed

+116
-9
lines changed

nifi/cql-processor/nifi-cql-processors/src/test/java/org/george0st/processors/cql/GetCQLBase.java

Lines changed: 85 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,18 @@ protected MockFlowFile runTest(TestSetupRead setup) throws Exception {
7070
return runTestWithProperty(setup, null, null, null, false);
7171
}
7272

73+
protected List<MockFlowFile> runTestParallel(TestSetupRead setup, int parallel) throws Exception {
74+
return runTestParallelWithProperty(setup, null, null, null, false, parallel);
75+
}
76+
7377
protected MockFlowFile runTest(TestSetupRead setup, String content) throws Exception {
7478
return runTestWithProperty(setup, content, null, null, false);
7579
}
7680

81+
protected List<MockFlowFile> runTestParallel(TestSetupRead setup, String content, int parallel) throws Exception {
82+
return runTestParallelWithProperty(setup, content, null, null, false, parallel);
83+
}
84+
7785
protected MockFlowFile runTest(TestSetupRead setup, String content, boolean validate) throws Exception {
7886
return runTestWithProperty(setup, content, null, null, validate);
7987
}
@@ -96,6 +104,73 @@ protected MockFlowFile runTestWithProperty(TestSetupRead setup, String content,
96104
return result;
97105
}
98106

107+
protected List<MockFlowFile> runTestParallelWithProperty(TestSetupRead setup, String content, PropertyDescriptor property, String propertyValue, boolean validate, int parallel) throws Exception {
108+
List<MockFlowFile> results;
109+
110+
if (content!=null)
111+
for (int i=0;i<parallel;i++)
112+
testRunner.enqueue(content);
113+
setup.setProperty(testRunner, testService);
114+
if (property != null)
115+
setup.setProperty(testRunner, property, propertyValue);
116+
testRunner.enableControllerService(testService);
117+
if (parallel>1)
118+
testRunner.setThreadCount(parallel);
119+
results = coreTestParallel(setup, content, validate);
120+
testRunner.disableControllerService(testService);
121+
return results;
122+
}
123+
124+
private void printResult(MockFlowFile result, TestSetupRead setup, long start, long finish){
125+
long countWrite;
126+
127+
countWrite = Long.parseLong(result.getAttribute(CQLAttributes.READ_COUNT));
128+
System.out.printf("Source: '%s'; READ; '%s': %s (%d ms); Items: %d; Perf: %.1f [calls/sec]%s",
129+
setup.name,
130+
"FlowFile",
131+
ReadableValue.fromMillisecond(finish - start),
132+
finish - start,
133+
countWrite,
134+
countWrite / ((finish - start) / 1000.0),
135+
System.lineSeparator());
136+
}
137+
138+
private List<MockFlowFile> coreTestParallel(TestSetupRead setup, String content, boolean validate) throws Exception {
139+
try {
140+
long finish, start, countWrite;
141+
List<MockFlowFile> results;
142+
boolean ok;
143+
144+
start = System.currentTimeMillis();
145+
testRunner.run(2);
146+
if (!testRunner.getFlowFilesForRelationship(PutCQL.REL_SUCCESS).isEmpty()) {
147+
results = testRunner.getFlowFilesForRelationship(PutCQL.REL_SUCCESS);
148+
ok = testRunner.getFlowFilesForRelationship(PutCQL.REL_FAILURE).isEmpty();
149+
finish = System.currentTimeMillis();
150+
151+
if (ok) {
152+
for (MockFlowFile result: results) {
153+
printResult(result, setup, start, finish);
154+
// countWrite = Long.parseLong(result.getAttribute(CQLAttributes.READ_COUNT));
155+
// System.out.printf("Source: '%s'; READ; '%s': %s (%d ms); Items: %d; Perf: %.1f [calls/sec]%s",
156+
// setup.name,
157+
// "FlowFile",
158+
// ReadableValue.fromMillisecond(finish - start),
159+
// finish - start,
160+
// countWrite,
161+
// countWrite / ((finish - start) / 1000.0),
162+
// System.lineSeparator());
163+
}
164+
return results;
165+
}
166+
}
167+
}
168+
catch (Exception ex) {
169+
throw new Exception("Error in PROCESSOR");
170+
}
171+
return null;
172+
}
173+
99174
private MockFlowFile coreTest(TestSetupRead setup, String content, boolean validate) throws Exception {
100175
try {
101176
long finish, start, countWrite;
@@ -110,15 +185,16 @@ private MockFlowFile coreTest(TestSetupRead setup, String content, boolean valid
110185
finish = System.currentTimeMillis();
111186

112187
if (ok) {
113-
countWrite = Long.parseLong(result.getAttribute(CQLAttributes.READ_COUNT));
114-
System.out.printf("Source: '%s'; READ; '%s': %s (%d ms); Items: %d; Perf: %.1f [calls/sec]%s",
115-
setup.name,
116-
"FlowFile",
117-
ReadableValue.fromMillisecond(finish - start),
118-
finish - start,
119-
countWrite,
120-
countWrite / ((finish - start) / 1000.0),
121-
System.lineSeparator());
188+
printResult(result, setup, start, finish);
189+
// countWrite = Long.parseLong(result.getAttribute(CQLAttributes.READ_COUNT));
190+
// System.out.printf("Source: '%s'; READ; '%s': %s (%d ms); Items: %d; Perf: %.1f [calls/sec]%s",
191+
// setup.name,
192+
// "FlowFile",
193+
// ReadableValue.fromMillisecond(finish - start),
194+
// finish - start,
195+
// countWrite,
196+
// countWrite / ((finish - start) / 1000.0),
197+
// System.lineSeparator());
122198
return result;
123199
}
124200
}

nifi/cql-processor/nifi-cql-processors/src/test/java/org/george0st/processors/cql/GetCQLPerformance.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import org.apache.nifi.reporting.InitializationException;
44
import org.apache.nifi.util.MockFlowFile;
5+
import org.apache.nifi.util.TestRunner;
6+
import org.apache.nifi.util.TestRunners;
57
import org.george0st.processors.cql.helper.CqlTestSchema;
68
import org.george0st.processors.cql.helper.TestSetupRead;
79
import org.george0st.processors.cql.helper.TestSetupWrite;
@@ -10,6 +12,7 @@
1012
import org.junit.jupiter.api.Test;
1113

1214
import java.io.IOException;
15+
import java.util.List;
1316

1417
import static org.junit.jupiter.api.Assertions.*;
1518

@@ -116,4 +119,32 @@ public void testRead150K() throws Exception {
116119
}
117120
}
118121

122+
@Test
123+
public void testRead10KMultiple() throws Exception {
124+
String content = "\"colbigint\",\"colint\",\"coltext\",\"colfloat\",\"coldouble\",\"coldate\",\"coltime\",\"coltimestamp\",\"colboolean\",\"coluuid\",\"colsmallint\",\"coltinyint\",\"coltimeuuid\",\"colvarchar\"\n" +
125+
"\"0\",\"1064\",\"zeVOKGnORq\",\"627.6811\",\"395.8522407512559\",\"1971-11-12\",\"03:37:15\",\"2000-09-25T22:18:45Z\",\"false\",\"6080071f-4dd1-4ea5-b711-9ad0716e242a\",\"8966\",\"55\",\"f45e58f5-c3b7-11ef-8d19-97ae87be7c54\",\"Tzxsw\"\n" +
126+
"\"1\",\"1709\",\"7By0z5QEXh\",\"652.03955\",\"326.9081263857284\",\"2013-12-17\",\"08:43:09\",\"2010-04-27T07:02:27Z\",\"false\",\"7d511666-2f81-41c4-9d5c-a5fa87f7d1c3\",\"24399\",\"38\",\"f45e8006-c3b7-11ef-8d19-172ff8d0d752\",\"exAbN\"\n";
127+
List<MockFlowFile> results;
128+
129+
// Prepare data for performance test
130+
PutCQLPerformance putCQL = new PutCQLPerformance();
131+
putCQL.init();
132+
putCQL.csvRandomWrite10K();
133+
134+
// sleep before read (2 seconds)
135+
Thread.sleep(2000);
136+
137+
// Read data
138+
for (TestSetupRead setup : setups) {
139+
setup.columnNames="colbigint, colint";
140+
141+
results = runTestParallel(setup, content, 2);
142+
143+
// check amount of read items
144+
for (MockFlowFile result: results) {
145+
assertNotNull(result, String.format("Issue with processing in '%s'", setup.name));
146+
assertEquals(10_000, Long.parseLong(result.getAttribute(CQLAttributes.READ_COUNT)));
147+
}
148+
}
149+
}
119150
}

0 commit comments

Comments
 (0)