Skip to content

Commit 42ac46f

Browse files
committed
Refactoring, addressing Michaels comments
1 parent ed2d591 commit 42ac46f

File tree

10 files changed

+111
-528
lines changed

10 files changed

+111
-528
lines changed

athena-jdbc/src/main/java/com/amazonaws/athena/connectors/jdbc/manager/JdbcRecordHandler.java

+1-24
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@
8282
import java.sql.Connection;
8383
import java.sql.PreparedStatement;
8484
import java.sql.ResultSet;
85-
import java.sql.ResultSetMetaData;
8685
import java.sql.SQLDataException;
8786
import java.sql.SQLException;
8887
import java.sql.Struct;
@@ -92,6 +91,7 @@
9291
import java.util.HashMap;
9392
import java.util.List;
9493
import java.util.Map;
94+
import java.util.Optional;
9595
import java.util.concurrent.TimeUnit;
9696

9797
/**
@@ -211,29 +211,6 @@ protected FieldWriterFactory makeFactory(Field field)
211211
};
212212
}
213213

214-
public static void printResultSet(ResultSet rs) throws SQLException
215-
{
216-
LOGGER.info("Printing Record Handler Result Set:");
217-
ResultSetMetaData rsmd = rs.getMetaData();
218-
int columns = rsmd.getColumnCount();
219-
220-
// Print the column names
221-
String columnNames = "";
222-
for (int i = 1; i <= columns; i++) {
223-
columnNames += rsmd.getColumnName(i) + ", ";
224-
}
225-
LOGGER.info(columnNames);
226-
227-
// Print the column values
228-
while (rs.next()) {
229-
String newColumn = "";
230-
for (int i = 1; i <= columns; i++) {
231-
newColumn += rs.getObject(i) + ", ";
232-
}
233-
LOGGER.info(newColumn);
234-
}
235-
}
236-
237214
/**
238215
* Create a field extractor for complex Nested type.
239216
* @param field Field's metadata information.

athena-opensearch/athena-opensearch.yaml

+4
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ Parameters:
1717
Description: 'This is the name of the lambda function that will be created. This name must satisfy the pattern ^[a-z0-9-_]{1,64}$'
1818
Type: String
1919
AllowedPattern: ^[a-z0-9-_]{1,64}$
20+
GlueDatabaseName:
21+
Description: 'Name of Glue Database that contains schemas of indices in OpenSearch Domain, also used as Database name.'
22+
Type: String
2023
DefaultConnectionString:
2124
Description: 'The default connection string is used when catalog is "lambda:${LambdaFunctionName}". Catalog specific Connection Strings can be added later. Format: ${DatabaseType}://${NativeJdbcConnectionString}.'
2225
Type: String
@@ -68,6 +71,7 @@ Resources:
6871
disable_spill_encryption: !Ref DisableSpillEncryption
6972
spill_bucket: !Ref SpillBucket
7073
spill_prefix: !Ref SpillPrefix
74+
glue_database: !Ref GlueDatabaseName
7175
default: !Ref DefaultConnectionString
7276
FunctionName: !Ref LambdaFunctionName
7377
Handler: "com.amazonaws.athena.connectors.opensearch.OpensearchMuxCompositeHandler"

athena-opensearch/src/main/java/com/amazonaws/athena/connectors/opensearch/OpensearchConstants.java

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public final class OpensearchConstants
2525
public static final String OPENSEARCH_DRIVER_CLASS = "org.opensearch.jdbc.Driver";
2626
public static final int OPENSEARCH_DEFAULT_PORT = 443;
2727
public static final String OPENSEARCH_QUOTE_CHARACTER = "`";
28+
public static final int OPENSEARCH_FETCH_SIZE = 10000;
2829

2930
private OpensearchConstants() {}
3031
}

athena-opensearch/src/main/java/com/amazonaws/athena/connectors/opensearch/OpensearchGlueHandler.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,6 @@ public void getPartitions(final BlockWriter blockWriter, final GetTableLayoutReq
6868

6969
public boolean isDisabled()
7070
{
71-
if (super.getAwsGlue() == null) {
72-
return true;
73-
}
74-
else {
75-
return false;
76-
}
71+
return super.getAwsGlue() == null;
7772
}
7873
}

athena-opensearch/src/main/java/com/amazonaws/athena/connectors/opensearch/OpensearchMetadataHandler.java

+17-18
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,11 @@ public class OpensearchMetadataHandler
107107
private static final Logger LOGGER = LoggerFactory.getLogger(OpensearchMetadataHandler.class);
108108
private static final int MAX_SPLITS_PER_REQUEST = 1000_000;
109109
private final String schemaName;
110-
private static final String DATA_STREAM_REGEX = "(\\.ds-)(.+)(-\\d\\d\\d\\d\\d\\d)";
110+
// data streams follow the following naming convention: .ds-<data-stream>-<generation> where generation is a six-digit, zero-padded integer
111+
private static final String DATA_STREAM_REGEX = "(\\.ds-)(.+)(-\\d{6})";
111112

112113
private OpensearchGlueHandler glueHandler;
113-
private final boolean awsGlue;
114+
private final boolean isGlueDisabled;
114115

115116
/**
116117
* Instantiates handler to be used by Lambda function directly.
@@ -133,10 +134,10 @@ public OpensearchMetadataHandler(DatabaseConnectionConfig databaseConnectionConf
133134
public OpensearchMetadataHandler(DatabaseConnectionConfig databaseConnectionConfig, JdbcConnectionFactory jdbcConnectionFactory, java.util.Map<String, String> configOptions)
134135
{
135136
super(databaseConnectionConfig, jdbcConnectionFactory, configOptions);
136-
this.schemaName = configOptions.getOrDefault("schema_name", "default");
137+
this.schemaName = configOptions.getOrDefault("glue_database", "default");
137138

138139
this.glueHandler = new OpensearchGlueHandler(configOptions);
139-
this.awsGlue = glueHandler.isDisabled();
140+
this.isGlueDisabled = glueHandler.isDisabled();
140141
}
141142

142143
@VisibleForTesting
@@ -148,10 +149,10 @@ protected OpensearchMetadataHandler(
148149
java.util.Map<String, String> configOptions)
149150
{
150151
super(databaseConnectionConfig, secretsManager, athena, jdbcConnectionFactory, configOptions);
151-
this.schemaName = configOptions.getOrDefault("schema_name", "default");
152+
this.schemaName = configOptions.getOrDefault("glue_database", "default");
152153

153154
this.glueHandler = new OpensearchGlueHandler(configOptions);
154-
this.awsGlue = glueHandler.isDisabled();
155+
this.isGlueDisabled = glueHandler.isDisabled();
155156
}
156157

157158
@Override
@@ -259,7 +260,7 @@ public GetTableResponse doGetTable(final BlockAllocator blockAllocator, final Ge
259260

260261
// Look at GLUE catalog first.
261262
try {
262-
if (!this.awsGlue) {
263+
if (!this.isGlueDisabled) {
263264
schema = glueHandler.doGetTable(blockAllocator, getTableRequest).getSchema();
264265
LOGGER.info("glueDoGetTable: Retrieved schema for table[{}] from AWS Glue.", getTableRequest.getTableName());
265266
}
@@ -311,7 +312,7 @@ else if (columnType instanceof ArrowType.Struct) {
311312
else {
312313
if (columnName.contains(".")) {
313314
FieldBuilder rootField = schemaBuilder.getNestedField(columnName.split("\\.")[0]);
314-
FieldBuilder newField = OpensearchUtils.createNestedStruct(columnName, rootField, columnType);
315+
FieldBuilder newField = OpensearchNestedStructBuilder.createNestedStruct(columnName, rootField, columnType);
315316
schemaBuilder.addNestedField(columnName.split("\\.")[0], newField);
316317
}
317318
else {
@@ -369,22 +370,17 @@ private List<TableName> listTables(final Connection jdbcConnection, final String
369370
throws SQLException
370371
{
371372
try (ResultSet resultSet = getTables(jdbcConnection, databaseName)) {
372-
ImmutableList.Builder<TableName> list = ImmutableList.builder();
373+
HashSet<TableName> list = new HashSet<TableName>();
374+
ImmutableList.Builder<TableName> finalList = ImmutableList.builder();
373375
while (resultSet.next()) {
374376
// checks if table name is internal table ex: .kibana
375377
String tableName = resultSet.getString("TABLE_NAME");
376-
if (tableName.startsWith(".")) {
377-
if (!tableName.matches(DATA_STREAM_REGEX)) {
378-
continue;
379-
}
378+
if (tableName.startsWith(".") && !tableName.matches(DATA_STREAM_REGEX)) {
379+
continue;
380380
}
381-
// update schema from null because Opensearch doesn't have schema but change later
382-
// resultSet.updateString("TABLE_SCHEM", databaseName);
383-
384-
// Temporary in schemaName
385381
list.add(getSchemaTableName(resultSet, databaseName));
386382
}
387-
return list.build();
383+
return finalList.addAll(list).build();
388384
}
389385
}
390386

@@ -393,6 +389,9 @@ private TableName getSchemaTableName(final ResultSet resultSet, String schemaNam
393389
{
394390
String tableName = resultSet.getString("TABLE_NAME");
395391
String tableSchema = resultSet.getString("TABLE_SCHEM");
392+
// Data streams follow the following naming convention: (.ds-)(<data-stream>)(-<generation>) where generation is a six-digit, zero-padded integer
393+
// and each paranthesis signifies matching groups.
394+
// Therefore I use the second matching group as table name because it represents the single resource name that stores time series data across multiple indices
396395
Pattern pattern = Pattern.compile(DATA_STREAM_REGEX);
397396
Matcher matcher = pattern.matcher(tableName);
398397
if (matcher.find()) {

athena-opensearch/src/main/java/com/amazonaws/athena/connectors/opensearch/OpensearchUtils.java athena-opensearch/src/main/java/com/amazonaws/athena/connectors/opensearch/OpensearchNestedStructBuilder.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828
import java.util.Arrays;
2929
import java.util.List;
3030

31-
public class OpensearchUtils
31+
public class OpensearchNestedStructBuilder
3232
{
33-
private OpensearchUtils()
33+
private OpensearchNestedStructBuilder()
3434
{
3535
}
3636

athena-opensearch/src/main/java/com/amazonaws/athena/connectors/opensearch/OpensearchRecordHandler.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,13 @@
4444
import java.sql.Connection;
4545
import java.sql.PreparedStatement;
4646
import java.sql.SQLException;
47+
import java.util.Optional;
4748

4849
import static com.amazonaws.athena.connectors.opensearch.OpensearchConstants.OPENSEARCH_DEFAULT_PORT;
4950
import static com.amazonaws.athena.connectors.opensearch.OpensearchConstants.OPENSEARCH_DRIVER_CLASS;
5051
import static com.amazonaws.athena.connectors.opensearch.OpensearchConstants.OPENSEARCH_NAME;
51-
import static com.amazonaws.athena.connectors.opensearch.OpensearchConstants.OPENSEARCH_QUOTE_CHARACTER;;
52+
import static com.amazonaws.athena.connectors.opensearch.OpensearchConstants.OPENSEARCH_QUOTE_CHARACTER;
53+
import static com.amazonaws.athena.connectors.opensearch.OpensearchConstants.OPENSEARCH_FETCH_SIZE;
5254

5355

5456
/**
@@ -59,8 +61,6 @@ public class OpensearchRecordHandler
5961
{
6062
private static final Logger LOGGER = LoggerFactory.getLogger(OpensearchRecordHandler.class);
6163

62-
private static final String OPENSEARCH_QUOTE_CHARACTER = "`";
63-
6464
private final JdbcSplitQueryBuilder jdbcSplitQueryBuilder;
6565

6666
/**
@@ -98,8 +98,8 @@ public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalog
9898
{
9999
PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, null, tableName.getTableName(), schema, constraints, split);
100100

101-
// Disable fetching all rows.
102-
preparedStatement.setFetchSize(10000);
101+
// by setting fetch size we explicitly force pagination to happen, otherwise it will fall back to legacy engine and only return 200 rows
102+
preparedStatement.setFetchSize(OPENSEARCH_FETCH_SIZE);
103103
return preparedStatement;
104104
}
105105

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*-
2+
* #%L
3+
* athena-opensearch
4+
* %%
5+
* Copyright (C) 2019 Amazon Web Services
6+
* %%
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
* #L%
19+
*/
20+
package com.amazonaws.athena.connectors.opensearch;
21+
22+
import org.junit.Test;
23+
import static org.junit.Assert.assertEquals;
24+
25+
import java.util.ArrayList;
26+
import java.util.Arrays;
27+
28+
import com.amazonaws.athena.connector.lambda.data.FieldBuilder;
29+
import org.apache.arrow.vector.types.pojo.ArrowType;
30+
import org.apache.arrow.vector.types.pojo.Field;
31+
import org.apache.arrow.vector.types.pojo.FieldType;
32+
import org.apache.arrow.vector.types.Types;
33+
34+
public class OpensearchNestedStructBuilderTest {
35+
36+
@Test
37+
public void createNestedStructTest()
38+
{
39+
// Simple 1 layer struct
40+
String attribute1 = "Struct1.attributeString";
41+
String attribute2 = "Struct1.attributeInt";
42+
43+
FieldBuilder simpleStruct = OpensearchNestedStructBuilder.createNestedStruct(attribute1, null, Types.MinorType.VARCHAR.getType());
44+
simpleStruct = OpensearchNestedStructBuilder.createNestedStruct(attribute2, simpleStruct, Types.MinorType.INT.getType());
45+
46+
FieldBuilder expectedSimpleStruct =
47+
FieldBuilder.newBuilder("Struct1", Types.MinorType.STRUCT.getType());
48+
expectedSimpleStruct.addStringField("attributeString");
49+
expectedSimpleStruct.addIntField("attributeInt");
50+
51+
assertEquals(expectedSimpleStruct.build(), simpleStruct.build());
52+
53+
// Nested struct
54+
String nested_attribute1 = "Struct1.attributeDateDay";
55+
String nested_attribute2 = "Struct1.Struct2.attributeString1";
56+
String nested_attribute3 = "Struct1.Struct3.attributeInt1";
57+
String nested_attribute4 = "Struct1.Struct2.attributeString2";
58+
String nested_attribute5 = "Struct1.Struct3.attributeFloat1";
59+
60+
FieldBuilder nestedStruct = OpensearchNestedStructBuilder.createNestedStruct(nested_attribute1, null, Types.MinorType.DATEDAY.getType());
61+
nestedStruct = OpensearchNestedStructBuilder.createNestedStruct(nested_attribute2, nestedStruct, Types.MinorType.VARCHAR.getType());
62+
nestedStruct = OpensearchNestedStructBuilder.createNestedStruct(nested_attribute3, nestedStruct, Types.MinorType.INT.getType());
63+
nestedStruct = OpensearchNestedStructBuilder.createNestedStruct(nested_attribute4, nestedStruct, Types.MinorType.VARCHAR.getType());
64+
nestedStruct = OpensearchNestedStructBuilder.createNestedStruct(nested_attribute5, nestedStruct, Types.MinorType.FLOAT8.getType());
65+
66+
FieldBuilder struct2 =
67+
FieldBuilder.newBuilder("Struct2", Types.MinorType.STRUCT.getType());
68+
struct2.addStringField("attributeString1");
69+
struct2.addStringField("attributeString2");
70+
FieldBuilder struct3 =
71+
FieldBuilder.newBuilder("Struct3", Types.MinorType.STRUCT.getType());
72+
struct3.addIntField("attributeInt1");
73+
struct3.addFloat8Field("attributeFloat1");
74+
Field attr5 = new Field("attributeDateDay", FieldType.nullable(Types.MinorType.DATEDAY.getType()), null);
75+
Field expectedNestedStruct = new Field("Struct1", FieldType.nullable(Types.MinorType.STRUCT.getType()), new ArrayList<>(Arrays.asList(attr5, struct2.build(), struct3.build())));
76+
77+
assertEquals(expectedNestedStruct, nestedStruct.build());
78+
}
79+
}

0 commit comments

Comments
 (0)