Skip to content

Commit 41e0aa5

Browse files
authored
Merge pull request #77 from george0st/change
Finish AstraDB tests
2 parents c74519c + 613ddcc commit 41e0aa5

File tree

9 files changed

+90
-29
lines changed

9 files changed

+90
-29
lines changed

nifi/cql-processor/nifi-cql-processors/src/main/java/org/george0st/processors/cql/helper/Setup.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ public class Setup {
1717

1818
public void setBatchSize(long batchSize) { this.batchSize = batchSize; }
1919
public long getBatchSize() { return batchSize > 0 ? batchSize : 200; }
20+
public String getOnlyTable() { return table!=null ? table.split("\\.")[1] : null; }
21+
public String getOnlyKeyspace() { return table!=null ? table.split("\\.")[0] : null; }
2022

2123
public Setup(){
2224
}

nifi/cql-processor/nifi-cql-processors/src/test/.test-properties.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,7 @@
1616
"batchType": "<batch type, possible values UNLOGGED, LOGGED>",
1717
"table": "<add 'keyspace.table' in CQL>",
1818
"dryRun": false,
19+
20+
"replication": "<setting for schema replication e.g. {'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}>",
1921
"compaction": "<setting for compaction e.g. {'class':'SizeTieredCompactionStrategy'}>"
2022
}

nifi/cql-processor/nifi-cql-processors/src/test/java/org/george0st/processors/cql/helper/CqlCreateSchema.java

Lines changed: 69 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,17 @@
11
package org.george0st.processors.cql.helper;
22

33
import com.datastax.oss.driver.api.core.CqlSession;
4+
import com.datastax.oss.driver.api.core.cql.ResultSet;
5+
import com.datastax.oss.driver.api.core.cql.Row;
46
import org.apache.commons.csv.CSVFormat;
57
import org.apache.commons.csv.CSVPrinter;
6-
import org.george0st.processors.cql.helper.RndGenerator;
7-
import org.george0st.processors.cql.helper.TestSetup;
88
import org.george0st.processors.cql.processor.CqlProcessor;
9-
import org.junit.jupiter.api.Test;
109

1110
import java.io.File;
1211
import java.io.FileWriter;
1312
import java.io.IOException;
1413
import java.time.format.DateTimeFormatter;
1514
import java.util.ArrayList;
16-
import java.util.Arrays;
1715
import java.util.List;
1816

1917

@@ -51,14 +49,14 @@ public String[] getPrimaryKeys(){
5149
return primaryKeys;
5250
}
5351

54-
public String getColumnDefinitions(){
52+
private String getColumnDefinitions(){
5553
StringBuilder bld=new StringBuilder();
5654
for (int i=0;i<columns.length/2;i++)
5755
bld.append(String.format("%s %s,", columns[i*2],columns[i*2+1]));
5856
return bld.toString();
5957
}
6058

61-
public String[] getColumns(){
59+
private String[] getColumns(){
6260
List<String> bld=new ArrayList<String>();
6361
for (int i=0;i<columns.length/2;i++)
6462
bld.add(columns[i*2]);
@@ -70,25 +68,71 @@ protected File getRandomFile(){
7068
}
7169

7270
public void Create() {
73-
// // Drop key space
74-
// session.execute(f"DROP KEYSPACE IF EXISTS {self._run_setup['keyspace']};");
75-
76-
// // Create key space
77-
// session.execute(f"CREATE KEYSPACE IF NOT EXISTS {self._run_setup['keyspace']} " +
78-
// "WITH replication = {" +
79-
// f"'class':'{self._run_setup['keyspace_replication_class']}', " +
80-
// f"'replication_factor' : {self._run_setup['keyspace_replication_factor']}" +
81-
// "};");
82-
83-
// DROP TABLE
84-
session.execute(String.format("DROP TABLE IF EXISTS %s;", setup.table));
85-
86-
// CREATE TABLE
87-
String createTable = String.format("CREATE TABLE IF NOT EXISTS %s ", setup.table) +
88-
String.format("(%s ", getColumnDefinitions()) +
89-
String.format("PRIMARY KEY (%s)) ", String.join(",", primaryKeys)) +
90-
String.format("WITH compaction = %s;", ((TestSetup) setup).compaction);
91-
session.execute(createTable);
71+
if (!requestedKeyspace(setup.getOnlyKeyspace())) {
72+
// Create key space, if not exist
73+
session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = %s;",
74+
setup.getOnlyKeyspace(),
75+
((TestSetup) setup).replication));
76+
}
77+
78+
if (!requestedTable(setup.getOnlyKeyspace(), setup.getOnlyTable())) {
79+
// DROP TABLE
80+
//session.execute(String.format("DROP TABLE IF EXISTS %s;", setup.table));
81+
82+
// CREATE TABLE
83+
String createTable = String.format("CREATE TABLE IF NOT EXISTS %s ", setup.table) +
84+
String.format("(%s ", getColumnDefinitions()) +
85+
String.format("PRIMARY KEY (%s)) ", String.join(",", primaryKeys)) +
86+
String.format("WITH compaction = %s;", ((TestSetup) setup).compaction);
87+
session.execute(createTable);
88+
}
89+
}
90+
91+
/**
92+
* Check, if keyspace has the requested structure
93+
*
94+
* @param keyspaceName tested key space
95+
* @return true - the requested content, false - different content
96+
*/
97+
private boolean requestedKeyspace(String keyspaceName){
98+
try {
99+
return session.execute(String.format("SELECT keyspace_name FROM system_schema.keyspaces "+
100+
"WHERE keyspace_name='%s';", keyspaceName)).one() != null;
101+
}
102+
catch(Exception ex){
103+
}
104+
return false;
105+
}
106+
107+
/**
108+
* Check, if table has the requested structure
109+
*
110+
* @param keyspaceName tested key space
111+
* @param tableName tested table
112+
* @return true - the requested content, false - different content
113+
*/
114+
private boolean requestedTable(String keyspaceName, String tableName){
115+
boolean result=false;
116+
117+
try {
118+
ResultSet rs = session.execute(String.format("SELECT column_name, kind, type " +
119+
"FROM system_schema.columns " +
120+
"WHERE keyspace_name = '%s' AND table_name = '%s'", keyspaceName, tableName));
121+
122+
List<String> dbColumns=new ArrayList<>();
123+
for (Row row: rs) dbColumns.add(row.getString("column_name"));
124+
125+
result=true;
126+
for (String column: getColumns()) {
127+
if (!dbColumns.contains(column)) {
128+
result = false;
129+
break;
130+
}
131+
}
132+
}
133+
catch(Exception ex){
134+
}
135+
return result;
92136
}
93137

94138
public File generateRndCSVFile(int csvItems, boolean sequenceID) throws IOException {

nifi/cql-processor/nifi-cql-processors/src/test/java/org/george0st/processors/cql/helper/TestSetup.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public class TestSetup extends Setup {
2424
public long connectionTimeout;
2525
public long requestTimeout;
2626
public String consistencyLevel;
27+
public String replication;
2728
public String compaction;
2829

2930
private TestSetup(){
@@ -47,6 +48,8 @@ public static TestSetup getInstance(String propertyFile) throws IOException {
4748
// default setting
4849
if (setup.compaction == null)
4950
setup.compaction = "{'class':'SizeTieredCompactionStrategy'}";
51+
if (setup.replication == null)
52+
setup.replication = "{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}";
5053
return setup;
5154
}
5255
}

nifi/cql-processor/nifi-cql-processors/src/test/test-astra.json

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@
77
"pwd": "secret, c:/Java/[email protected]",
88
"connectionTimeout": "900",
99
"requestTimeout": "60",
10-
"consistencyLevel": "LOCAL_ONE",
10+
"consistencyLevel": "LOCAL_QUORUM",
1111

12-
"writeConsistencyLevel": "LOCAL_ONE",
12+
"writeConsistencyLevel": "LOCAL_QUORUM",
1313
"batchSize": 200,
1414
"batchType": "UNLOGGED",
1515
"table": "prftest.csv2cql_test3",
1616
"dryRun": false,
17+
18+
"replication": "{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}",
1719
"compaction": "{'class': 'UnifiedCompactionStrategy', 'scaling_parameters': 'L10, T10'}"
1820
}
1921

nifi/cql-processor/nifi-cql-processors/src/test/test-cassandra.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,7 @@
1515
"batchType": "UNLOGGED",
1616
"table": "prftest.csv2cql_test3",
1717
"dryRun": false,
18+
19+
"replication": "{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}",
1820
"compaction": "{'class': 'UnifiedCompactionStrategy', 'scaling_parameters': 'L10, T10'}"
1921
}

nifi/cql-processor/nifi-cql-processors/src/test/test-scylla.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,7 @@
1313
"batchType": "UNLOGGED",
1414
"table": "prftest.csv2cql_test3",
1515
"dryRun": false,
16+
17+
"replication": "{'class' : 'SimpleStrategy', 'replication_factor' : 1}",
1618
"compaction": "{'class':'SizeTieredCompactionStrategy'}"
1719
}

nifi/cql-processor/nifi-cql-processors/src/test/test-yugabyte.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,6 @@
1212
"batchType": "UNLOGGED",
1313
"table": "prftest.csv2cql_test3",
1414
"dryRun": false,
15+
1516
"compaction": "{'class':'SizeTieredCompactionStrategy'}"
1617
}

nifi/cql-processor/nifi-cql/src/main/java/org/george0st/cql/CQLControllerService.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,15 +203,18 @@ public void onEnabled(final ConfigurationContext context) throws InitializationE
203203
}
204204

205205
protected String getURI(final ConfigurationContext context) {
206-
final String ip = context.getProperty(IP_ADDRESSES).evaluateAttributeExpressions().getValue();
206+
//final String ip = context.getProperty(IP_ADDRESSES).evaluateAttributeExpressions().getValue();
207+
final String ip = context.getProperty(IP_ADDRESSES).getValue();
208+
final String secureConnectionBundl = context.getProperty(SECURE_CONNECTION_BUNDLE).getValue();
209+
207210
// final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
208211
// final String user = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
209212
// if (!ip.contains("@") && user != null && passw != null) {
210213
// return uri.replaceFirst("://", "://" + URLEncoder.encode(user, StandardCharsets.UTF_8) + ":" + URLEncoder.encode(passw, StandardCharsets.UTF_8) + "@");
211214
// } else {
212215
// return uri;
213216
// }
214-
return ip;
217+
return ip != null ? ip: secureConnectionBundl;
215218
}
216219

217220
@Override

0 commit comments

Comments
 (0)