Skip to content

Commit 7053f9e

Browse files
committed
added new connector for OpenSearch data source
1 parent 6451a04 commit 7053f9e

29 files changed

+3054
-8
lines changed

athena-federation-sdk/src/main/java/com/amazonaws/athena/connector/lambda/data/BlockUtils.java

+9
Original file line numberDiff line numberDiff line change
@@ -849,6 +849,12 @@ else if (value instanceof Integer) {
849849
if (value == null) {
850850
float4Writer.writeNull();
851851
}
852+
else if (value instanceof java.lang.Integer) {
853+
float4Writer.writeFloat4((int) value);
854+
}
855+
else if (value instanceof java.lang.Double) {
856+
float4Writer.writeFloat4(((Double) value).floatValue());
857+
}
852858
else {
853859
float4Writer.writeFloat4((float) value);
854860
}
@@ -926,6 +932,9 @@ else if (value != null && value instanceof Long) {
926932
if (value == null) {
927933
bigIntWriter.writeNull();
928934
}
935+
else if (value instanceof java.lang.Integer) {
936+
bigIntWriter.writeBigInt(((Integer) value).longValue());
937+
}
929938
else {
930939
bigIntWriter.writeBigInt((long) value);
931940
}

athena-federation-sdk/src/main/java/com/amazonaws/athena/connector/lambda/data/FieldBuilder.java

+20
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public class FieldBuilder
4242
private final boolean nullable;
4343
//Using LinkedHashMap because Apache Arrow makes field order important so honoring that contract here
4444
private final Map<String, Field> children = new LinkedHashMap<>();
45+
private final Map<String, FieldBuilder> nestedChildren = new LinkedHashMap<>();
4546

4647
/**
4748
* Creates a FieldBuilder for a Field with the given name and type.
@@ -96,6 +97,12 @@ public FieldBuilder addField(String fieldName, ArrowType type, List<Field> child
9697
return this;
9798
}
9899

100+
/**
 * Registers a nested FieldBuilder under the given name.
 * The builder is only stored in nestedChildren here; it is turned into a Field later,
 * when build() iterates nestedChildren. A builder previously registered under the same
 * name is replaced.
 *
 * @param fieldName The key under which the nested builder is stored.
 * @param child The FieldBuilder describing the nested field.
 * @return This FieldBuilder itself.
 */
public FieldBuilder addFieldBuilder(String fieldName, FieldBuilder child)
101+
{
102+
this.nestedChildren.put(fieldName, child);
103+
return this;
104+
}
105+
99106
public FieldBuilder addField(String fieldName, ArrowType type, boolean nullable, List<Field> children)
100107
{
101108
this.children.put(fieldName, new Field(fieldName, new FieldType(nullable, type, null), children));
@@ -263,18 +270,31 @@ public FieldBuilder addDateMilliField(String fieldName)
263270
return this;
264271
}
265272

273+
/**
 * Returns the name held by this builder, i.e. the name passed to the Field created by build().
 *
 * @return The value of this builder's name field.
 */
public String getName()
274+
{
275+
return this.name;
276+
}
277+
266278
/**
 * Looks up an already-materialized child Field by name.
 * Builders registered through addFieldBuilder live in nestedChildren and only appear in
 * this lookup after build() has copied them into children.
 *
 * @param fieldName The key of the child to look up.
 * @return The matching Field, or null if children has no entry for that key.
 */
public Field getChild(String fieldName)
267279
{
268280
return children.get(fieldName);
269281
}
270282

283+
/**
 * Looks up a nested FieldBuilder previously registered with addFieldBuilder.
 *
 * @param fieldName The key the nested builder was registered under.
 * @return The matching FieldBuilder, or null if nestedChildren has no entry for that key.
 */
public FieldBuilder getNestedChild(String fieldName)
284+
{
285+
return nestedChildren.get(fieldName);
286+
}
287+
271288
/**
272289
* Builds the Field described by this builder, including any nested FieldBuilders.
* Every entry of nestedChildren is built first and stored into children under its key,
* so calling this method modifies the builder's children map.
273290
*
274291
* @return The newly constructed Field.
275292
*/
276293
public Field build()
277294
{
295+
// Materialize nested builders before assembling the parent; put() overwrites any
// child Field already stored under the same key.
for (Map.Entry<String, FieldBuilder> next : nestedChildren.entrySet()) {
296+
children.put(next.getKey(), next.getValue().build());
297+
}
278298
// Copy the values so the returned Field does not share the builder's live map view.
return new Field(name, new FieldType(nullable, type, null), new ArrayList<>(children.values()));
279299
}
280300
}

athena-federation-sdk/src/main/java/com/amazonaws/athena/connector/lambda/data/SchemaBuilder.java

+12
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,18 @@ public SchemaBuilder addStructField(String fieldName)
8989
return this;
9090
}
9191

92+
/**
93+
* Registers a FieldBuilder for a nested STRUCT Field under the given name.
* The builder is stored in nestedFieldBuilderMap, replacing any builder already stored
* under that name.
* NOTE(review): the code that turns nestedFieldBuilderMap entries into top-level schema
* Fields is not visible in this excerpt - confirm it exists and runs at build time.
94+
*
95+
* @param fieldName The name of the field to add.
* @param newField The FieldBuilder describing the nested field.
96+
* @return This SchemaBuilder itself.
97+
*/
98+
public SchemaBuilder addNestedField(String fieldName, FieldBuilder newField)
99+
{
100+
nestedFieldBuilderMap.put(fieldName, newField);
101+
return this;
102+
}
103+
92104
/**
93105
* Adds a new LIST Field to the Schema as a top-level Field.
94106
*

athena-jdbc/src/main/java/com/amazonaws/athena/connectors/jdbc/manager/JdbcMetadataHandler.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public abstract class JdbcMetadataHandler
7676
private static final Logger LOGGER = LoggerFactory.getLogger(JdbcMetadataHandler.class);
7777
private static final String SQL_SPLITS_STRING = "select min(%s), max(%s) from %s.%s;";
7878
private static final int DEFAULT_NUM_SPLITS = 20;
79-
private final JdbcConnectionFactory jdbcConnectionFactory;
79+
protected final JdbcConnectionFactory jdbcConnectionFactory;
8080
private final DatabaseConnectionConfig databaseConnectionConfig;
8181
private final SplitterFactory splitterFactory = new SplitterFactory();
8282

@@ -275,7 +275,7 @@ private ResultSet getColumns(final String catalogName, final TableName tableHand
275275
return metadata.getColumns(
276276
catalogName,
277277
escapeNamePattern(tableHandle.getSchemaName(), escape),
278-
escapeNamePattern(tableHandle.getTableName(), escape),
278+
escapeNamePattern(tableHandle.getTableName(), null),
279279
null);
280280
}
281281

athena-jdbc/src/main/java/com/amazonaws/athena/connectors/jdbc/manager/JdbcRecordHandler.java

+81-6
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,20 @@
7676
import org.slf4j.Logger;
7777
import org.slf4j.LoggerFactory;
7878

79+
import javax.swing.RowFilter.Entry;
80+
7981
import java.sql.Array;
8082
import java.sql.Connection;
8183
import java.sql.PreparedStatement;
8284
import java.sql.ResultSet;
85+
import java.sql.ResultSetMetaData;
86+
import java.sql.SQLDataException;
8387
import java.sql.SQLException;
88+
import java.sql.Struct;
89+
import java.sql.Timestamp;
8490
import java.util.ArrayList;
8591
import java.util.Arrays;
92+
import java.util.HashMap;
8693
import java.util.List;
8794
import java.util.Map;
8895
import java.util.concurrent.TimeUnit;
@@ -135,14 +142,21 @@ protected JdbcCredentialProvider getCredentialProvider()
135142
return null;
136143
}
137144

145+
/**
 * Supplies the auto-commit setting used by readWithConstraint.
 * When the returned Optional is present, readWithConstraint passes its value to
 * Connection.setAutoCommit before querying and calls Connection.commit after the rows
 * are read; when it is empty, both calls are skipped. This base implementation returns
 * false.
 * readWithConstraint invokes this method more than once per request, so an override
 * should return the same answer on every call.
 * NOTE(review): commit() is issued whenever a value is present, even if that value is
 * true; JDBC documents that commit() may throw while auto-commit is enabled - confirm
 * no subclass returns Optional.of(true).
 *
 * @return An Optional holding the auto-commit flag to apply; Optional.of(false) here.
 */
protected java.util.Optional<Boolean> getAutoCommit()
146+
{
147+
return java.util.Optional.of(false);
148+
}
149+
138150
@Override
139151
public void readWithConstraint(BlockSpiller blockSpiller, ReadRecordsRequest readRecordsRequest, QueryStatusChecker queryStatusChecker)
140152
throws Exception
141153
{
142-
LOGGER.info("{}: Catalog: {}, table {}, splits {}", readRecordsRequest.getQueryId(), readRecordsRequest.getCatalogName(), readRecordsRequest.getTableName(),
154+
LOGGER.info("Read Record Request {}: Catalog: {}, table {}, splits {}", readRecordsRequest.getQueryId(), readRecordsRequest.getCatalogName(), readRecordsRequest.getTableName(),
143155
readRecordsRequest.getSplit().getProperties());
144156
try (Connection connection = this.jdbcConnectionFactory.getConnection(getCredentialProvider())) {
145-
connection.setAutoCommit(false); // For consistency. This is needed to be false to enable streaming for some database types.
157+
if (this.getAutoCommit().isPresent()) {
158+
connection.setAutoCommit(this.getAutoCommit().get()); // For consistency. This is needed to be false to enable streaming for some database types.
159+
}
146160
try (PreparedStatement preparedStatement = buildSplitSql(connection, readRecordsRequest.getCatalogName(), readRecordsRequest.getTableName(),
147161
readRecordsRequest.getSchema(), readRecordsRequest.getConstraints(), readRecordsRequest.getSplit());
148162
ResultSet resultSet = preparedStatement.executeQuery()) {
@@ -153,6 +167,9 @@ public void readWithConstraint(BlockSpiller blockSpiller, ReadRecordsRequest rea
153167
if (next.getType() instanceof ArrowType.List) {
154168
rowWriterBuilder.withFieldWriterFactory(next.getName(), makeFactory(next));
155169
}
170+
else if (next.getType() instanceof ArrowType.Struct) {
171+
rowWriterBuilder.withFieldWriterFactory(next.getName(), makeStructFactory(next));
172+
}
156173
else {
157174
rowWriterBuilder.withExtractor(next.getName(), makeExtractor(next, resultSet, partitionValues));
158175
}
@@ -168,8 +185,9 @@ public void readWithConstraint(BlockSpiller blockSpiller, ReadRecordsRequest rea
168185
rowsReturnedFromDatabase++;
169186
}
170187
LOGGER.info("{} rows returned by database.", rowsReturnedFromDatabase);
171-
172-
connection.commit();
188+
if (this.getAutoCommit().isPresent()) {
189+
connection.commit();
190+
}
173191
}
174192
}
175193
}
@@ -193,6 +211,54 @@ protected FieldWriterFactory makeFactory(Field field)
193211
};
194212
}
195213

214+
/**
 * Logs the column names and every remaining row of the given ResultSet at INFO level.
 * Each logged line is the values joined with ", " and carries a trailing ", ".
 * The method advances the cursor with rs.next() until it returns false, so rows logged
 * here have been consumed from the caller's point of view.
 * NOTE(review): looks like a debugging aid; row values are written to the log verbatim -
 * confirm that is acceptable for the data being read.
 *
 * @param rs The ResultSet to dump; it is iterated to its end.
 * @throws SQLException If reading the metadata or any column value fails.
 */
public static void printResultSet(ResultSet rs) throws SQLException
215+
{
216+
LOGGER.info("Printing Record Handler Result Set:");
217+
ResultSetMetaData rsmd = rs.getMetaData();
218+
int columns = rsmd.getColumnCount();
219+
220+
// Print the column names
// JDBC column indexes are 1-based, hence the loop from 1 to columns inclusive.
221+
String columnNames = "";
222+
for (int i = 1; i <= columns; i++) {
223+
columnNames += rsmd.getColumnName(i) + ", ";
224+
}
225+
LOGGER.info(columnNames);
226+
227+
// Print the column values
// One log line per row; the string is rebuilt by concatenation for every column.
228+
while (rs.next()) {
229+
String newColumn = "";
230+
for (int i = 1; i <= columns; i++) {
231+
newColumn += rs.getObject(i) + ", ";
232+
}
233+
LOGGER.info(newColumn);
234+
}
235+
}
236+
237+
/**
238+
* Creates a FieldWriterFactory for a complex nested (STRUCT) field.
* The produced FieldWriter reads the column named after the field from the ResultSet
* passed as context, copies the attributes of a java.sql.Struct value into a map and
* writes that map with BlockUtils.setComplexValue. Nothing is written when the value
* is SQL NULL or is not a Struct; the writer returns true in every case.
239+
* @param field Field's metadata information.
240+
* @return FieldWriterFactory for the Struct type.
241+
*/
242+
protected FieldWriterFactory makeStructFactory(Field field)
243+
{
244+
return (FieldVector vector, Extractor extractor, ConstraintProjector constraint) ->
245+
(FieldWriter) (Object context, int rowNum) ->
246+
{
247+
Object nestedObj = ((ResultSet) context).getObject(field.getName());
248+
// wasNull() reports on the getObject call just above.
if (!((ResultSet) context).wasNull() && nestedObj instanceof Struct) {
249+
Map<String, Object> renestedMap = new HashMap<>();
250+
// NOTE(review): assumes the driver returns each Struct attribute as a Map.Entry
// (name -> value); any other element type fails the cast below - confirm against
// the JDBC driver in use.
for (Object obj : ((Struct) nestedObj).getAttributes()) {
251+
Map.Entry<?, ?> entry = (Map.Entry<?, ?>) obj;
252+
String key = entry.getKey().toString();
253+
Object value = entry.getValue();
254+
renestedMap.put(key, value);
255+
}
256+
BlockUtils.setComplexValue(vector, rowNum, FieldResolver.DEFAULT, renestedMap);
257+
}
258+
return true;
259+
};
260+
}
261+
196262
/**
197263
* Creates an Extractor for the given field. In this example the extractor just creates some random data.
198264
*/
@@ -278,8 +344,17 @@ protected Extractor makeExtractor(Field field, ResultSet resultSet, Map<String,
278344
case DATEMILLI:
279345
return (DateMilliExtractor) (Object context, NullableDateMilliHolder dst) ->
280346
{
281-
if (resultSet.getTimestamp(fieldName) != null) {
282-
dst.value = resultSet.getTimestamp(fieldName).getTime();
347+
//try catch needed for OpenSearch type date which actually is returned as timestamp
348+
try {
349+
if (resultSet.getTimestamp(fieldName) != null) {
350+
dst.value = resultSet.getTimestamp(fieldName).getTime();
351+
}
352+
}
353+
catch (SQLDataException e) {
354+
//OpenSearch returns it as type String
355+
if (resultSet.getString(fieldName) != null) {
356+
dst.value = Timestamp.valueOf(resultSet.getString(fieldName)).getTime();
357+
}
283358
}
284359
dst.isSet = resultSet.wasNull() ? 0 : 1;
285360
};

athena-jdbc/src/test/java/com/amazonaws/athena/connectors/jdbc/TestBase.java

+21
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import java.sql.ResultSet;
2626
import java.sql.SQLException;
27+
import java.sql.Struct;
2728
import java.util.Arrays;
2829
import java.util.concurrent.atomic.AtomicInteger;
2930

@@ -78,6 +79,26 @@ else if (argument instanceof String) {
7879
});
7980

8081

82+
Mockito.when(resultSet.getObject(any())).thenAnswer((Answer<Struct>) invocation -> {
83+
Struct structObj = Mockito.mock(Struct.class);
84+
Object argument = invocation.getArguments()[0];
85+
Mockito.when(structObj.getAttributes()).thenAnswer((Answer<Object[]>) invocation2 -> {
86+
if (argument instanceof Integer) {
87+
int colIndex = (Integer) argument - 1;
88+
return (Object[]) rows[rowNumber.get()][colIndex];
89+
}
90+
else if (argument instanceof String) {
91+
int colIndex = Arrays.asList(columnNames).indexOf(argument);
92+
return (Object[]) rows[rowNumber.get()][colIndex];
93+
}
94+
else {
95+
throw new RuntimeException("Unexpected argument type " + argument.getClass());
96+
}
97+
});
98+
return structObj;
99+
});
100+
101+
81102
Mockito.when(resultSet.getDouble(any())).thenAnswer((Answer<Double>) invocation -> {
82103
Object argument = invocation.getArguments()[0];
83104
if (argument instanceof Integer) {

0 commit comments

Comments
 (0)