Skip to content

Commit f66cc58

Browse files
committed
Refactoring, addressing Michael's comments
1 parent 7053f9e commit f66cc58

File tree

8 files changed

+29
-544
lines changed

8 files changed

+29
-544
lines changed

athena-jdbc/src/main/java/com/amazonaws/athena/connectors/jdbc/manager/JdbcRecordHandler.java

+2-35
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@
8282
import java.sql.Connection;
8383
import java.sql.PreparedStatement;
8484
import java.sql.ResultSet;
85-
import java.sql.ResultSetMetaData;
8685
import java.sql.SQLDataException;
8786
import java.sql.SQLException;
8887
import java.sql.Struct;
@@ -142,21 +141,14 @@ protected JdbcCredentialProvider getCredentialProvider()
142141
return null;
143142
}
144143

145-
protected java.util.Optional<Boolean> getAutoCommit()
146-
{
147-
return java.util.Optional.of(false);
148-
}
149-
150144
@Override
151145
public void readWithConstraint(BlockSpiller blockSpiller, ReadRecordsRequest readRecordsRequest, QueryStatusChecker queryStatusChecker)
152146
throws Exception
153147
{
154148
LOGGER.info("Read Record Request {}: Catalog: {}, table {}, splits {}", readRecordsRequest.getQueryId(), readRecordsRequest.getCatalogName(), readRecordsRequest.getTableName(),
155149
readRecordsRequest.getSplit().getProperties());
156150
try (Connection connection = this.jdbcConnectionFactory.getConnection(getCredentialProvider())) {
157-
if (this.getAutoCommit().isPresent()) {
158-
connection.setAutoCommit(this.getAutoCommit().get()); // For consistency. This is needed to be false to enable streaming for some database types.
159-
}
151+
connection.setAutoCommit(false); // For consistency. This is needed to be false to enable streaming for some database types.
160152
try (PreparedStatement preparedStatement = buildSplitSql(connection, readRecordsRequest.getCatalogName(), readRecordsRequest.getTableName(),
161153
readRecordsRequest.getSchema(), readRecordsRequest.getConstraints(), readRecordsRequest.getSplit());
162154
ResultSet resultSet = preparedStatement.executeQuery()) {
@@ -185,9 +177,7 @@ else if (next.getType() instanceof ArrowType.Struct) {
185177
rowsReturnedFromDatabase++;
186178
}
187179
LOGGER.info("{} rows returned by database.", rowsReturnedFromDatabase);
188-
if (this.getAutoCommit().isPresent()) {
189-
connection.commit();
190-
}
180+
connection.commit();
191181
}
192182
}
193183
}
@@ -211,29 +201,6 @@ protected FieldWriterFactory makeFactory(Field field)
211201
};
212202
}
213203

214-
public static void printResultSet(ResultSet rs) throws SQLException
215-
{
216-
LOGGER.info("Printing Record Handler Result Set:");
217-
ResultSetMetaData rsmd = rs.getMetaData();
218-
int columns = rsmd.getColumnCount();
219-
220-
// Print the column names
221-
String columnNames = "";
222-
for (int i = 1; i <= columns; i++) {
223-
columnNames += rsmd.getColumnName(i) + ", ";
224-
}
225-
LOGGER.info(columnNames);
226-
227-
// Print the column values
228-
while (rs.next()) {
229-
String newColumn = "";
230-
for (int i = 1; i <= columns; i++) {
231-
newColumn += rs.getObject(i) + ", ";
232-
}
233-
LOGGER.info(newColumn);
234-
}
235-
}
236-
237204
/**
238205
* Create a field extractor for complex Nested type.
239206
* @param field Field's metadata information.

athena-opensearch/athena-opensearch.yaml

+4
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ Parameters:
1717
Description: 'This is the name of the lambda function that will be created. This name must satisfy the pattern ^[a-z0-9-_]{1,64}$'
1818
Type: String
1919
AllowedPattern: ^[a-z0-9-_]{1,64}$
20+
GlueDatabaseName:
21+
Description: 'Name of Glue Database that contains schemas of indices in OpenSearch Domain, also used as Database name.'
22+
Type: String
2023
DefaultConnectionString:
2124
Description: 'The default connection string is used when catalog is "lambda:${LambdaFunctionName}". Catalog specific Connection Strings can be added later. Format: ${DatabaseType}://${NativeJdbcConnectionString}.'
2225
Type: String
@@ -68,6 +71,7 @@ Resources:
6871
disable_spill_encryption: !Ref DisableSpillEncryption
6972
spill_bucket: !Ref SpillBucket
7073
spill_prefix: !Ref SpillPrefix
74+
glue_database: !Ref GlueDatabaseName
7175
default: !Ref DefaultConnectionString
7276
FunctionName: !Ref LambdaFunctionName
7377
Handler: "com.amazonaws.athena.connectors.opensearch.OpensearchMuxCompositeHandler"

athena-opensearch/src/main/java/com/amazonaws/athena/connectors/opensearch/OpensearchConstants.java

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public final class OpensearchConstants
2525
public static final String OPENSEARCH_DRIVER_CLASS = "org.opensearch.jdbc.Driver";
2626
public static final int OPENSEARCH_DEFAULT_PORT = 443;
2727
public static final String OPENSEARCH_QUOTE_CHARACTER = "`";
28+
public static final int OPENSEARCH_FETCH_SIZE = 10000;
2829

2930
private OpensearchConstants() {}
3031
}

athena-opensearch/src/main/java/com/amazonaws/athena/connectors/opensearch/OpensearchGlueHandler.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,6 @@ public void getPartitions(final BlockWriter blockWriter, final GetTableLayoutReq
6868

6969
public boolean isDisabled()
7070
{
71-
if (super.getAwsGlue() == null) {
72-
return true;
73-
}
74-
else {
75-
return false;
76-
}
71+
return super.getAwsGlue() == null;
7772
}
7873
}

athena-opensearch/src/main/java/com/amazonaws/athena/connectors/opensearch/OpensearchMetadataHandler.java

+15-17
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,11 @@ public class OpensearchMetadataHandler
107107
private static final Logger LOGGER = LoggerFactory.getLogger(OpensearchMetadataHandler.class);
108108
private static final int MAX_SPLITS_PER_REQUEST = 1000_000;
109109
private final String schemaName;
110-
private static final String DATA_STREAM_REGEX = "(\\.ds-)(.+)(-\\d\\d\\d\\d\\d\\d)";
110+
// Data streams follow this naming convention: .ds-<data-stream>-<generation>, where generation is a six-digit, zero-padded integer
111+
private static final String DATA_STREAM_REGEX = "(\\.ds-)(.+)(-\\d{6})";
111112

112113
private OpensearchGlueHandler glueHandler;
113-
private final boolean awsGlue;
114+
private final boolean isGlueDisabled;
114115

115116
/**
116117
* Instantiates handler to be used by Lambda function directly.
@@ -133,10 +134,10 @@ public OpensearchMetadataHandler(DatabaseConnectionConfig databaseConnectionConf
133134
public OpensearchMetadataHandler(DatabaseConnectionConfig databaseConnectionConfig, JdbcConnectionFactory jdbcConnectionFactory, java.util.Map<String, String> configOptions)
134135
{
135136
super(databaseConnectionConfig, jdbcConnectionFactory, configOptions);
136-
this.schemaName = configOptions.getOrDefault("schema_name", "default");
137+
this.schemaName = configOptions.getOrDefault("glue_database", "default");
137138

138139
this.glueHandler = new OpensearchGlueHandler(configOptions);
139-
this.awsGlue = glueHandler.isDisabled();
140+
this.isGlueDisabled = glueHandler.isDisabled();
140141
}
141142

142143
@VisibleForTesting
@@ -148,10 +149,10 @@ protected OpensearchMetadataHandler(
148149
java.util.Map<String, String> configOptions)
149150
{
150151
super(databaseConnectionConfig, secretsManager, athena, jdbcConnectionFactory, configOptions);
151-
this.schemaName = configOptions.getOrDefault("schema_name", "default");
152+
this.schemaName = configOptions.getOrDefault("glue_database", "default");
152153

153154
this.glueHandler = new OpensearchGlueHandler(configOptions);
154-
this.awsGlue = glueHandler.isDisabled();
155+
this.isGlueDisabled = glueHandler.isDisabled();
155156
}
156157

157158
@Override
@@ -259,7 +260,7 @@ public GetTableResponse doGetTable(final BlockAllocator blockAllocator, final Ge
259260

260261
// Look at GLUE catalog first.
261262
try {
262-
if (!this.awsGlue) {
263+
if (!this.isGlueDisabled) {
263264
schema = glueHandler.doGetTable(blockAllocator, getTableRequest).getSchema();
264265
LOGGER.info("glueDoGetTable: Retrieved schema for table[{}] from AWS Glue.", getTableRequest.getTableName());
265266
}
@@ -369,22 +370,16 @@ private List<TableName> listTables(final Connection jdbcConnection, final String
369370
throws SQLException
370371
{
371372
try (ResultSet resultSet = getTables(jdbcConnection, databaseName)) {
372-
ImmutableList.Builder<TableName> list = ImmutableList.builder();
373+
HashSet<TableName> list = new HashSet<TableName>();
373374
while (resultSet.next()) {
374375
// checks if table name is internal table ex: .kibana
375376
String tableName = resultSet.getString("TABLE_NAME");
376-
if (tableName.startsWith(".")) {
377-
if (!tableName.matches(DATA_STREAM_REGEX)) {
378-
continue;
379-
}
377+
if (tableName.startsWith(".") && !tableName.matches(DATA_STREAM_REGEX)) {
378+
continue;
380379
}
381-
// update schema from null because Opensearch doesn't have schema but change later
382-
// resultSet.updateString("TABLE_SCHEM", databaseName);
383-
384-
// Temporary in schemaName
385380
list.add(getSchemaTableName(resultSet, databaseName));
386381
}
387-
return list.build();
382+
// Copy the de-duplicated table names into an immutable list for the caller.
// ImmutableList.builder() is a static factory, so it cannot be invoked with `new`.
return ImmutableList.copyOf(list);
388383
}
389384
}
390385

@@ -393,6 +388,9 @@ private TableName getSchemaTableName(final ResultSet resultSet, String schemaNam
393388
{
394389
String tableName = resultSet.getString("TABLE_NAME");
395390
String tableSchema = resultSet.getString("TABLE_SCHEM");
391+
// Data streams follow this naming convention: (.ds-)(<data-stream>)(-<generation>), where generation is a six-digit, zero-padded integer
392+
// and each pair of parentheses marks a capture group.
393+
// The second capture group is therefore used as the table name, because it is the single resource name that stores time series data across multiple indices.
396394
Pattern pattern = Pattern.compile(DATA_STREAM_REGEX);
397395
Matcher matcher = pattern.matcher(tableName);
398396
if (matcher.find()) {

athena-opensearch/src/main/java/com/amazonaws/athena/connectors/opensearch/OpensearchUtils.java athena-opensearch/src/main/java/com/amazonaws/athena/connectors/opensearch/OpensearchNestedStructBuilder.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828
import java.util.Arrays;
2929
import java.util.List;
3030

31-
public class OpensearchUtils
31+
public class OpensearchNestedStructBuilder
3232
{
33-
private OpensearchUtils()
33+
private OpensearchNestedStructBuilder()
3434
{
3535
}
3636

athena-opensearch/src/main/java/com/amazonaws/athena/connectors/opensearch/OpensearchRecordHandler.java

+4-11
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@
4848
import static com.amazonaws.athena.connectors.opensearch.OpensearchConstants.OPENSEARCH_DEFAULT_PORT;
4949
import static com.amazonaws.athena.connectors.opensearch.OpensearchConstants.OPENSEARCH_DRIVER_CLASS;
5050
import static com.amazonaws.athena.connectors.opensearch.OpensearchConstants.OPENSEARCH_NAME;
51-
import static com.amazonaws.athena.connectors.opensearch.OpensearchConstants.OPENSEARCH_QUOTE_CHARACTER;;
51+
import static com.amazonaws.athena.connectors.opensearch.OpensearchConstants.OPENSEARCH_QUOTE_CHARACTER;
52+
import static com.amazonaws.athena.connectors.opensearch.OpensearchConstants.OPENSEARCH_FETCH_SIZE;
5253

5354

5455
/**
@@ -59,8 +60,6 @@ public class OpensearchRecordHandler
5960
{
6061
private static final Logger LOGGER = LoggerFactory.getLogger(OpensearchRecordHandler.class);
6162

62-
private static final String OPENSEARCH_QUOTE_CHARACTER = "`";
63-
6463
private final JdbcSplitQueryBuilder jdbcSplitQueryBuilder;
6564

6665
/**
@@ -98,14 +97,8 @@ public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalog
9897
{
9998
PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, null, tableName.getTableName(), schema, constraints, split);
10099

101-
// Disable fetching all rows.
102-
preparedStatement.setFetchSize(10000);
100+
// Setting the fetch size explicitly forces pagination; otherwise the query falls back to the legacy engine and returns only 200 rows.
101+
preparedStatement.setFetchSize(OPENSEARCH_FETCH_SIZE);
103102
return preparedStatement;
104103
}
105-
106-
@Override
107-
protected java.util.Optional<Boolean> getAutoCommit()
108-
{
109-
return java.util.Optional.empty();
110-
}
111104
}

0 commit comments

Comments
 (0)