Skip to content

Commit

Permalink
Stored procedures schema and introspection (Incremental PR - I) (#142)
Browse files Browse the repository at this point in the history
<!-- The PR description should answer 2 (maybe 3) important questions:
-->

### What

<!-- What is this PR trying to accomplish (and why, if it's not
obvious)? -->

This PR adds introspection and schema support for stored procedures in
MSSQL. This PR is the first incremental PR towards adding stored
procedures support.

### How

<!-- How is it trying to accomplish it (what are the implementation
steps)? -->

### Introspection

A database query is made to get all the metadata about stored procedures
present in the database. As it is a stored procedure, there is no return
type tied to it, so we leave the `returns` part empty and we expect the
user to fill in the appropriate schema.

### Schema

We do schema generation for a stored procedure, only if it has a
`returns` associated with it. Otherwise, we won't be able to generate a
graphql schema for the stored procedure.
  • Loading branch information
codingkarthik committed Jul 2, 2024
1 parent 7f7efa7 commit 5602bb4
Show file tree
Hide file tree
Showing 13 changed files with 539 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions crates/configuration/src/introspection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,23 @@
//! Configuration and state for our connector.
use serde::Deserialize;

#[derive(Deserialize, Debug)]
pub struct IntrospectStoredProcedureArgument {
pub name: String,
pub r#type: String,
pub is_nullable: bool,
pub max_length: u8,
pub is_output: bool,
}

#[derive(Deserialize, Debug)]
pub struct IntrospectStoredProcedure {
pub schema: String,
pub name: String,
#[serde(default)]
pub arguments: Vec<IntrospectStoredProcedureArgument>,
}

#[derive(Deserialize, Debug)]
pub struct IntrospectionTable {
pub name: String,
Expand Down
78 changes: 74 additions & 4 deletions crates/configuration/src/version1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ use crate::secret::Secret;
use crate::{uri, ConnectionUri};

use query_engine_metadata::metadata;
use query_engine_metadata::metadata::stored_procedures::{
StoredProcedureArgumentInfo, StoredProcedureInfo, StoredProcedures,
};
use query_engine_metadata::metadata::{database, Nullable};

use query_engine_metrics::metrics;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
Expand All @@ -17,8 +21,13 @@ use std::collections::BTreeSet;
use thiserror::Error;
use tiberius::Query;

// TODO(KC): Move the `table_configuration.sql` to the `static` folder present
// in the root of this repo.
const TABLE_CONFIGURATION_QUERY: &str = include_str!("table_configuration.sql");

const STORED_PROCS_CONFIGURATION_QUERY: &str =
include_str!("../../../static/introspect_stored_procedures.sql");

const TYPES_QUERY: &str = "SELECT name FROM sys.types FOR JSON PATH";

const CURRENT_VERSION: u32 = 1;
Expand Down Expand Up @@ -192,9 +201,17 @@ pub async fn configure(

let type_names: Vec<TypeItem> = serde_json::from_str(types_row.get(0).unwrap()).unwrap();

metadata.comparison_operators = get_comparison_operators(&type_names).await;
metadata.comparison_operators = get_comparison_operators(&type_names);

metadata.aggregate_functions = get_aggregate_functions(&type_names);

let stored_procedures_row =
select_first_row(&mssql_pool, STORED_PROCS_CONFIGURATION_QUERY).await;

let stored_procedures: Vec<introspection::IntrospectStoredProcedure> =
serde_json::from_str(stored_procedures_row.get(0).unwrap()).unwrap();

metadata.aggregate_functions = get_aggregate_functions(&type_names).await;
metadata.stored_procedures = get_stored_procedures(stored_procedures);

Ok(RawConfiguration {
version: 1,
Expand All @@ -208,9 +225,45 @@ struct TypeItem {
name: database::ScalarType,
}

fn get_stored_procedures(
introspected_stored_procedures: Vec<introspection::IntrospectStoredProcedure>,
) -> query_engine_metadata::metadata::stored_procedures::StoredProcedures {
let mut metadata_stored_procs = BTreeMap::new();
for stored_procedure in introspected_stored_procedures.into_iter() {
let metadata_stored_procedure = StoredProcedureInfo {
name: stored_procedure.name.clone(),
schema: stored_procedure.schema,
arguments: stored_procedure
.arguments
.into_iter()
.map(|sp| -> (String, StoredProcedureArgumentInfo) {
(
sp.name.clone(),
StoredProcedureArgumentInfo {
name: sp.name,
r#type: query_engine_metadata::metadata::ScalarType(sp.r#type),
nullable: if sp.is_nullable {
Nullable::Nullable
} else {
Nullable::NonNullable
},
is_output: sp.is_output,
description: None,
},
)
})
.collect(),
returns: None,
description: None,
};
metadata_stored_procs.insert(stored_procedure.name, metadata_stored_procedure);
}
StoredProcedures(metadata_stored_procs)
}

// we lookup all types in sys.types, then use our hardcoded ideas about each one to attach
// aggregate functions
async fn get_aggregate_functions(type_names: &Vec<TypeItem>) -> database::AggregateFunctions {
fn get_aggregate_functions(type_names: &Vec<TypeItem>) -> database::AggregateFunctions {
let mut aggregate_functions = BTreeMap::new();

for type_name in type_names {
Expand Down Expand Up @@ -337,7 +390,7 @@ fn get_aggregate_functions_for_type(

// we lookup all types in sys.types, then use our hardcoded ideas about each one to attach
// comparison operators
async fn get_comparison_operators(type_names: &Vec<TypeItem>) -> database::ComparisonOperators {
fn get_comparison_operators(type_names: &Vec<TypeItem>) -> database::ComparisonOperators {
let mut comparison_operators = BTreeMap::new();

for type_name in type_names {
Expand Down Expand Up @@ -579,6 +632,21 @@ pub fn occurring_scalar_types(metadata: &metadata::Metadata) -> BTreeSet<metadat
.values()
.flat_map(|v| v.arguments.values().map(|c| c.r#type.clone()));

// TODO(KC): include types of the native mutations

let stored_procedures_argument_types = metadata
.stored_procedures
.0
.values()
.flat_map(|v| v.arguments.values().map(|c| c.r#type.clone()));

let stored_procedures_column_types = metadata
.stored_procedures
.0
.values()
.filter_map(|v| v.returns.as_ref())
.flat_map(|v| v.values().map(|c| c.r#type.clone()));

let aggregate_types = metadata
.aggregate_functions
.0
Expand All @@ -594,6 +662,8 @@ pub fn occurring_scalar_types(metadata: &metadata::Metadata) -> BTreeSet<metadat
tables_column_types
.chain(native_queries_column_types)
.chain(native_queries_arguments_types)
.chain(stored_procedures_argument_types)
.chain(stored_procedures_column_types)
.chain(aggregate_types)
.chain(comparison_operator_types)
.collect::<BTreeSet<metadata::ScalarType>>()
Expand Down
1 change: 1 addition & 0 deletions crates/ndc-sqlserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ serde_json = { version = "1.0.116", features = ["raw_value"] }
tokio = { version = "1.37.0", features = ["full"] }
tracing = "0.1.40"
prometheus = "0.13.3"
thiserror = "1.0.59"

[dev-dependencies]
ndc-test = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/ndc-sqlserver/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ impl connector::Connector for SQLServer {
async fn get_schema(
configuration: &Self::Configuration,
) -> Result<JsonResponse<models::SchemaResponse>, connector::SchemaError> {
schema::get_schema(configuration).await.map(Into::into)
schema::get_schema(configuration).map(Into::into)
}

/// Explain a query by creating an execution plan
Expand Down
Loading

0 comments on commit 5602bb4

Please sign in to comment.