Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stored procedures schema and introspection (Incremental PR - I) #142

Merged
merged 3 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions crates/configuration/src/introspection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,23 @@
//! Configuration and state for our connector.
use serde::Deserialize;

#[derive(Deserialize, Debug)]
pub struct IntrospectStoredProcedureArgument {
pub name: String,
pub r#type: String,
pub is_nullable: bool,
pub max_length: u8,
pub is_output: bool,
}

#[derive(Deserialize, Debug)]
pub struct IntrospectStoredProcedure {
pub schema: String,
pub name: String,
#[serde(default)]
pub arguments: Vec<IntrospectStoredProcedureArgument>,
}

#[derive(Deserialize, Debug)]
pub struct IntrospectionTable {
pub name: String,
Expand Down
78 changes: 74 additions & 4 deletions crates/configuration/src/version1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ use crate::secret::Secret;
use crate::{uri, ConnectionUri};

use query_engine_metadata::metadata;
use query_engine_metadata::metadata::stored_procedures::{
StoredProcedureArgumentInfo, StoredProcedureInfo, StoredProcedures,
};
use query_engine_metadata::metadata::{database, Nullable};

use query_engine_metrics::metrics;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
Expand All @@ -17,8 +21,13 @@ use std::collections::BTreeSet;
use thiserror::Error;
use tiberius::Query;

// TODO(KC): Move the `table_configuration.sql` to the `static` folder present
// in the root of this repo.
const TABLE_CONFIGURATION_QUERY: &str = include_str!("table_configuration.sql");

const STORED_PROCS_CONFIGURATION_QUERY: &str =
include_str!("../../../static/introspect_stored_procedures.sql");

const TYPES_QUERY: &str = "SELECT name FROM sys.types FOR JSON PATH";

const CURRENT_VERSION: u32 = 1;
Expand Down Expand Up @@ -192,9 +201,17 @@ pub async fn configure(

let type_names: Vec<TypeItem> = serde_json::from_str(types_row.get(0).unwrap()).unwrap();

metadata.comparison_operators = get_comparison_operators(&type_names).await;
metadata.comparison_operators = get_comparison_operators(&type_names);

metadata.aggregate_functions = get_aggregate_functions(&type_names);

let stored_procedures_row =
select_first_row(&mssql_pool, STORED_PROCS_CONFIGURATION_QUERY).await;

let stored_procedures: Vec<introspection::IntrospectStoredProcedure> =
serde_json::from_str(stored_procedures_row.get(0).unwrap()).unwrap();

metadata.aggregate_functions = get_aggregate_functions(&type_names).await;
metadata.stored_procedures = get_stored_procedures(stored_procedures);

Ok(RawConfiguration {
version: 1,
Expand All @@ -208,9 +225,45 @@ struct TypeItem {
name: database::ScalarType,
}

fn get_stored_procedures(
introspected_stored_procedures: Vec<introspection::IntrospectStoredProcedure>,
) -> query_engine_metadata::metadata::stored_procedures::StoredProcedures {
let mut metadata_stored_procs = BTreeMap::new();
for stored_procedure in introspected_stored_procedures.into_iter() {
let metadata_stored_procedure = StoredProcedureInfo {
name: stored_procedure.name.clone(),
schema: stored_procedure.schema,
arguments: stored_procedure
.arguments
.into_iter()
.map(|sp| -> (String, StoredProcedureArgumentInfo) {
(
sp.name.clone(),
StoredProcedureArgumentInfo {
name: sp.name,
r#type: query_engine_metadata::metadata::ScalarType(sp.r#type),
nullable: if sp.is_nullable {
Nullable::Nullable
} else {
Nullable::NonNullable
},
is_output: sp.is_output,
description: None,
},
)
})
.collect(),
returns: None,
description: None,
};
metadata_stored_procs.insert(stored_procedure.name, metadata_stored_procedure);
}
StoredProcedures(metadata_stored_procs)
}

// we lookup all types in sys.types, then use our hardcoded ideas about each one to attach
// aggregate functions
async fn get_aggregate_functions(type_names: &Vec<TypeItem>) -> database::AggregateFunctions {
fn get_aggregate_functions(type_names: &Vec<TypeItem>) -> database::AggregateFunctions {
let mut aggregate_functions = BTreeMap::new();

for type_name in type_names {
Expand Down Expand Up @@ -337,7 +390,7 @@ fn get_aggregate_functions_for_type(

// we lookup all types in sys.types, then use our hardcoded ideas about each one to attach
// comparison operators
async fn get_comparison_operators(type_names: &Vec<TypeItem>) -> database::ComparisonOperators {
fn get_comparison_operators(type_names: &Vec<TypeItem>) -> database::ComparisonOperators {
let mut comparison_operators = BTreeMap::new();

for type_name in type_names {
Expand Down Expand Up @@ -579,6 +632,21 @@ pub fn occurring_scalar_types(metadata: &metadata::Metadata) -> BTreeSet<metadat
.values()
.flat_map(|v| v.arguments.values().map(|c| c.r#type.clone()));

// TODO(KC): include types of the native mutations

let stored_procedures_argument_types = metadata
.stored_procedures
.0
.values()
.flat_map(|v| v.arguments.values().map(|c| c.r#type.clone()));

let stored_procedures_column_types = metadata
.stored_procedures
.0
.values()
.filter_map(|v| v.returns.as_ref())
.flat_map(|v| v.values().map(|c| c.r#type.clone()));

let aggregate_types = metadata
.aggregate_functions
.0
Expand All @@ -594,6 +662,8 @@ pub fn occurring_scalar_types(metadata: &metadata::Metadata) -> BTreeSet<metadat
tables_column_types
.chain(native_queries_column_types)
.chain(native_queries_arguments_types)
.chain(stored_procedures_argument_types)
.chain(stored_procedures_column_types)
.chain(aggregate_types)
.chain(comparison_operator_types)
.collect::<BTreeSet<metadata::ScalarType>>()
Expand Down
1 change: 1 addition & 0 deletions crates/ndc-sqlserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ serde_json = { version = "1.0.116", features = ["raw_value"] }
tokio = { version = "1.37.0", features = ["full"] }
tracing = "0.1.40"
prometheus = "0.13.3"
thiserror = "1.0.59"

[dev-dependencies]
ndc-test = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/ndc-sqlserver/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ impl connector::Connector for SQLServer {
async fn get_schema(
configuration: &Self::Configuration,
) -> Result<JsonResponse<models::SchemaResponse>, connector::SchemaError> {
schema::get_schema(configuration).await.map(Into::into)
schema::get_schema(configuration).map(Into::into)
}

/// Explain a query by creating an execution plan
Expand Down
Loading
Loading