@@ -107,10 +107,11 @@ public class OpensearchMetadataHandler
107
107
private static final Logger LOGGER = LoggerFactory .getLogger (OpensearchMetadataHandler .class );
108
108
private static final int MAX_SPLITS_PER_REQUEST = 1000_000 ;
109
109
private final String schemaName ;
110
- private static final String DATA_STREAM_REGEX = "(\\ .ds-)(.+)(-\\ d\\ d\\ d\\ d\\ d\\ d)" ;
110
+ // Data stream backing indices use the naming convention .ds-<data-stream>-<generation>, where generation is a six-digit, zero-padded integer.
111
+ private static final String DATA_STREAM_REGEX = "(\\ .ds-)(.+)(-\\ d{6})" ;
111
112
112
113
private OpensearchGlueHandler glueHandler ;
113
- private final boolean awsGlue ;
114
+ private final boolean isGlueDisabled ;
114
115
115
116
/**
116
117
* Instantiates handler to be used by Lambda function directly.
@@ -136,7 +137,7 @@ public OpensearchMetadataHandler(DatabaseConnectionConfig databaseConnectionConf
136
137
this .schemaName = configOptions .getOrDefault ("schema_name" , "default" );
137
138
138
139
this .glueHandler = new OpensearchGlueHandler (configOptions );
139
- this .awsGlue = glueHandler .isDisabled ();
140
+ this .isGlueDisabled = glueHandler .isDisabled ();
140
141
}
141
142
142
143
@ VisibleForTesting
@@ -151,7 +152,7 @@ protected OpensearchMetadataHandler(
151
152
this .schemaName = configOptions .getOrDefault ("schema_name" , "default" );
152
153
153
154
this .glueHandler = new OpensearchGlueHandler (configOptions );
154
- this .awsGlue = glueHandler .isDisabled ();
155
+ this .isGlueDisabled = glueHandler .isDisabled ();
155
156
}
156
157
157
158
@ Override
@@ -259,7 +260,7 @@ public GetTableResponse doGetTable(final BlockAllocator blockAllocator, final Ge
259
260
260
261
// Look at GLUE catalog first.
261
262
try {
262
- if (!this .awsGlue ) {
263
+ if (!this .isGlueDisabled ) {
263
264
schema = glueHandler .doGetTable (blockAllocator , getTableRequest ).getSchema ();
264
265
LOGGER .info ("glueDoGetTable: Retrieved schema for table[{}] from AWS Glue." , getTableRequest .getTableName ());
265
266
}
@@ -369,22 +370,16 @@ private List<TableName> listTables(final Connection jdbcConnection, final String
369
370
throws SQLException
370
371
{
371
372
try (ResultSet resultSet = getTables (jdbcConnection , databaseName )) {
372
- ImmutableList . Builder <TableName > list = ImmutableList . builder ();
373
+ HashSet <TableName > list = new HashSet < TableName > ();
373
374
while (resultSet .next ()) {
374
375
// checks if table name is internal table ex: .kibana
375
376
String tableName = resultSet .getString ("TABLE_NAME" );
376
- if (tableName .startsWith ("." )) {
377
- if (!tableName .matches (DATA_STREAM_REGEX )) {
378
- continue ;
379
- }
377
+ if (tableName .startsWith ("." ) && !tableName .matches (DATA_STREAM_REGEX )) {
378
+ continue ;
380
379
}
381
- // update schema from null because Opensearch doesn't have schema but change later
382
- // resultSet.updateString("TABLE_SCHEM", databaseName);
383
-
384
- // Temporary in schemaName
385
380
list .add (getSchemaTableName (resultSet , databaseName ));
386
381
}
387
- return list .build ();
382
+ return new ImmutableList . builder (). addAll ( list ) .build ();
388
383
}
389
384
}
390
385
@@ -393,6 +388,9 @@ private TableName getSchemaTableName(final ResultSet resultSet, String schemaNam
393
388
{
394
389
String tableName = resultSet .getString ("TABLE_NAME" );
395
390
String tableSchema = resultSet .getString ("TABLE_SCHEM" );
391
+ // Data streams follow the following naming convention: (.ds-)(<data-stream>)(-<generation>) where generation is a six-digit, zero-padded integer
392
+ // and each pair of parentheses denotes a matching (capturing) group.
393
+ // Therefore I use the second matching group as table name because it represents the single resource name that stores time series data across multiple indices
396
394
Pattern pattern = Pattern .compile (DATA_STREAM_REGEX );
397
395
Matcher matcher = pattern .matcher (tableName );
398
396
if (matcher .find ()) {
0 commit comments