Skip to content

Commit

Permalink
Refactoring of SparkQueryDispatcher by removing unnecessary class
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vamsi-amazon committed Apr 15, 2024
1 parent 2649200 commit 22ee379
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 27 deletions.
3 changes: 2 additions & 1 deletion spark/src/main/antlr/SqlBaseLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ CLUSTER: 'CLUSTER';
CLUSTERED: 'CLUSTERED';
CODEGEN: 'CODEGEN';
COLLATE: 'COLLATE';
COLLATION: 'COLLATION';
COLLECTION: 'COLLECTION';
COLUMN: 'COLUMN';
COLUMNS: 'COLUMNS';
Expand Down Expand Up @@ -554,7 +555,7 @@ BRACKETED_COMMENT
;

WS
: [ \r\n\t]+ -> channel(HIDDEN)
: [ \t\n\f\r\u000B\u00A0\u1680\u2000\u2001\u2002\u2003\u2004\u2005\u2006\u2007\u2008\u2009\u200A\u2028\u202F\u205F\u3000]+ -> channel(HIDDEN)
;

// Catch-all for anything we can't recognize.
Expand Down
20 changes: 11 additions & 9 deletions spark/src/main/antlr/SqlBaseParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ statement
| ctes? dmlStatementNoWith #dmlStatement
| USE identifierReference #use
| USE namespace identifierReference #useNamespace
| SET CATALOG (identifier | stringLit) #setCatalog
| SET CATALOG (errorCapturingIdentifier | stringLit) #setCatalog
| CREATE namespace (IF NOT EXISTS)? identifierReference
(commentSpec |
locationSpec |
Expand Down Expand Up @@ -210,6 +210,7 @@ statement
| (MSCK)? REPAIR TABLE identifierReference
(option=(ADD|DROP|SYNC) PARTITIONS)? #repairTable
| op=(ADD | LIST) identifier .*? #manageResource
| SET COLLATION collationName=identifier #setCollation
| SET ROLE .*? #failNativeCommand
| SET TIME ZONE interval #setTimeZone
| SET TIME ZONE timezone #setTimeZone
Expand Down Expand Up @@ -392,7 +393,7 @@ describeFuncName
;

describeColName
: nameParts+=identifier (DOT nameParts+=identifier)*
: nameParts+=errorCapturingIdentifier (DOT nameParts+=errorCapturingIdentifier)*
;

ctes
Expand Down Expand Up @@ -429,7 +430,7 @@ property
;

propertyKey
: identifier (DOT identifier)*
: errorCapturingIdentifier (DOT errorCapturingIdentifier)*
| stringLit
;

Expand Down Expand Up @@ -683,18 +684,18 @@ pivotClause
;

pivotColumn
: identifiers+=identifier
| LEFT_PAREN identifiers+=identifier (COMMA identifiers+=identifier)* RIGHT_PAREN
: identifiers+=errorCapturingIdentifier
| LEFT_PAREN identifiers+=errorCapturingIdentifier (COMMA identifiers+=errorCapturingIdentifier)* RIGHT_PAREN
;

pivotValue
: expression (AS? identifier)?
: expression (AS? errorCapturingIdentifier)?
;

unpivotClause
: UNPIVOT nullOperator=unpivotNullClause? LEFT_PAREN
operator=unpivotOperator
RIGHT_PAREN (AS? identifier)?
RIGHT_PAREN (AS? errorCapturingIdentifier)?
;

unpivotNullClause
Expand Down Expand Up @@ -736,7 +737,7 @@ unpivotColumn
;

unpivotAlias
: AS? identifier
: AS? errorCapturingIdentifier
;

lateralView
Expand Down Expand Up @@ -1188,7 +1189,7 @@ complexColTypeList
;

complexColType
: identifier COLON? dataType (NOT NULL)? commentSpec?
: errorCapturingIdentifier COLON? dataType (NOT NULL)? commentSpec?
;

whenClause
Expand Down Expand Up @@ -1662,6 +1663,7 @@ nonReserved
| CLUSTERED
| CODEGEN
| COLLATE
| COLLATION
| COLLECTION
| COLUMN
| COLUMNS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.opensearch.client.Client;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.client.EMRServerlessClient;
Expand Down Expand Up @@ -44,8 +43,6 @@ public class SparkQueryDispatcher {

private DataSourceService dataSourceService;

private DataSourceUserAuthorizationHelperImpl dataSourceUserAuthorizationHelper;

private JobExecutionResponseReader jobExecutionResponseReader;

private FlintIndexMetadataService flintIndexMetadataService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.opensearch.common.inject.Singleton;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl;
import org.opensearch.sql.legacy.metrics.GaugeMetric;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
Expand Down Expand Up @@ -68,7 +67,6 @@ public StateStore stateStore(NodeClient client, ClusterService clusterService) {
public SparkQueryDispatcher sparkQueryDispatcher(
EMRServerlessClientFactory emrServerlessClientFactory,
DataSourceService dataSourceService,
DataSourceUserAuthorizationHelperImpl dataSourceUserAuthorizationHelper,
JobExecutionResponseReader jobExecutionResponseReader,
FlintIndexMetadataServiceImpl flintIndexMetadataReader,
NodeClient client,
Expand All @@ -78,7 +76,6 @@ public SparkQueryDispatcher sparkQueryDispatcher(
return new SparkQueryDispatcher(
emrServerlessClientFactory,
dataSourceService,
dataSourceUserAuthorizationHelper,
jobExecutionResponseReader,
flintIndexMetadataReader,
client,
Expand Down Expand Up @@ -123,12 +120,6 @@ public JobExecutionResponseReader jobExecutionResponseReader(NodeClient client)
return new JobExecutionResponseReader(client);
}

@Provides
public DataSourceUserAuthorizationHelperImpl dataSourceUserAuthorizationHelper(
NodeClient client) {
return new DataSourceUserAuthorizationHelperImpl(client);
}

private void registerStateStoreMetrics(StateStore stateStore) {
GaugeMetric<Long> activeSessionMetric =
new GaugeMetric<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl;
import org.opensearch.sql.datasources.encryptor.EncryptorImpl;
import org.opensearch.sql.datasources.glue.GlueDataSourceFactory;
import org.opensearch.sql.datasources.service.DataSourceMetadataStorage;
Expand Down Expand Up @@ -205,7 +204,6 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService(
new SparkQueryDispatcher(
emrServerlessClientFactory,
this.dataSourceService,
new DataSourceUserAuthorizationHelperImpl(client),
jobExecutionResponseReader,
new FlintIndexMetadataServiceImpl(client),
client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.client.EMRServerlessClient;
Expand Down Expand Up @@ -86,7 +85,6 @@ public class SparkQueryDispatcherTest {
@Mock private EMRServerlessClientFactory emrServerlessClientFactory;
@Mock private DataSourceService dataSourceService;
@Mock private JobExecutionResponseReader jobExecutionResponseReader;
@Mock private DataSourceUserAuthorizationHelperImpl dataSourceUserAuthorizationHelper;
@Mock private FlintIndexMetadataService flintIndexMetadataService;

@Mock(answer = RETURNS_DEEP_STUBS)
Expand Down Expand Up @@ -116,7 +114,6 @@ void setUp() {
new SparkQueryDispatcher(
emrServerlessClientFactory,
dataSourceService,
dataSourceUserAuthorizationHelper,
jobExecutionResponseReader,
flintIndexMetadataService,
openSearchClient,
Expand Down

0 comments on commit 22ee379

Please sign in to comment.