Merge pull request #594 from splitgraph/serde-schema-field-json
Migrate unstable field json repr to a canonical one
gruuya authored Aug 8, 2024
2 parents b89ab15 + c46f11d commit 5a77e9a
Showing 13 changed files with 63 additions and 120 deletions.
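The gist of the change: `arrow_integration_test::field_to_json` emits the Arrow integration-testing JSON layout, which carries no stability guarantee, while `Field`'s own serde `Serialize` impl emits the canonical layout. A minimal sketch of the two representations (the commented JSON strings are taken verbatim from the updated tests in `src/repository/interface.rs`; the `main` wrapper is illustrative):

```rust
use arrow_schema::{DataType, Field};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let field = Field::new("date", DataType::Date64, false);

    // Old, unstable repr from the integration-testing helper:
    // {"children":[],"name":"date","nullable":false,"type":{"name":"date","unit":"MILLISECOND"}}
    let old = arrow_integration_test::field_to_json(&field).to_string();

    // New, canonical repr from Field's serde derive:
    // {"name":"date","data_type":"Date64","nullable":false,"dict_id":0,"dict_is_ordered":false,"metadata":{}}
    let new = serde_json::to_string(&field)?;

    println!("old: {old}\nnew: {new}");
    Ok(())
}
```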
3 changes: 3 additions & 0 deletions src/catalog/mod.rs
@@ -73,6 +73,9 @@ pub enum CatalogError {
     #[error("Internal SQL error: {0:?}")]
     SqlxError(sqlx::Error),
 
+    #[error("Failed parsing JSON: {0}")]
+    SerdeJsonError(#[from] serde_json::Error),
+
     #[error(transparent)]
     TonicStatus(#[from] Status),
 
12 changes: 7 additions & 5 deletions src/catalog/repository.rs
@@ -27,11 +27,12 @@ pub struct RepositoryStore {
 
 impl From<RepositoryError> for CatalogError {
     fn from(err: RepositoryError) -> CatalogError {
-        CatalogError::SqlxError(match err {
-            RepositoryError::UniqueConstraintViolation(e) => e,
-            RepositoryError::FKConstraintViolation(e) => e,
-            RepositoryError::SqlxError(e) => e,
-        })
+        match err {
+            RepositoryError::UniqueConstraintViolation(e) => CatalogError::SqlxError(e),
+            RepositoryError::FKConstraintViolation(e) => CatalogError::SqlxError(e),
+            RepositoryError::SqlxError(e) => CatalogError::SqlxError(e),
+            RepositoryError::SerdeJsonError(e) => CatalogError::SerdeJsonError(e),
+        }
     }
 }
@@ -212,6 +213,7 @@ impl TableStore for RepositoryStore {
                 }
             }
             RepositoryError::SqlxError(e) => CatalogError::SqlxError(e),
+            RepositoryError::SerdeJsonError(e) => CatalogError::SerdeJsonError(e),
         })
     }
 
7 changes: 3 additions & 4 deletions src/frontend/http.rs
@@ -11,7 +11,6 @@ use arrow::record_batch::RecordBatch;
 #[cfg(feature = "frontend-arrow-flight")]
 use arrow_flight::flight_service_client::FlightServiceClient;
 
-use arrow_integration_test::{schema_from_json, schema_to_json};
 use arrow_schema::SchemaRef;
 use bytes::Buf;
@@ -107,7 +106,7 @@ fn plan_to_etag(plan: &LogicalPlan) -> String {
 
 // Construct a content-type header value that also includes schema information.
 fn content_type_with_schema(schema: SchemaRef) -> HeaderValue {
-    let schema_string = schema_to_json(schema.as_ref()).to_string();
+    let schema_string = serde_json::to_string(&schema).unwrap();
     let output = utf8_percent_encode(&schema_string, NON_ALPHANUMERIC);
 
     HeaderValue::from_str(format!("application/json; arrow-schema={output}").as_str())
@@ -415,8 +414,8 @@ pub async fn upload(
             let value_bytes = load_part(part).await?;
 
             schema = Some(Arc::new(
-                schema_from_json(
-                    &serde_json::from_slice::<serde_json::Value>(value_bytes.as_slice())
+                serde_json::from_str(
+                    &String::from_utf8(value_bytes)
                         .map_err(ApiError::UploadSchemaDeserializationError)?,
                 )
                 .map_err(ApiError::UploadSchemaParseError)?,
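For reference, a self-contained round-trip sketch of the header scheme produced by `content_type_with_schema` above and consumed by `schema_from_header` in `src/testutils.rs` (the `encode`/`decode` helpers here are illustrative, not part of the codebase):

```rust
use std::sync::Arc;
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use percent_encoding::{percent_decode_str, utf8_percent_encode, NON_ALPHANUMERIC};

// Serialize the schema with serde and percent-encode it into the header value.
fn encode(schema: SchemaRef) -> String {
    let schema_string = serde_json::to_string(&schema).unwrap();
    let output = utf8_percent_encode(&schema_string, NON_ALPHANUMERIC);
    format!("application/json; arrow-schema={output}")
}

// Invert the encoding: extract the parameter, percent-decode, deserialize.
fn decode(header: &str) -> Schema {
    let encoded = header.split("arrow-schema=").nth(1).expect("schema param present");
    let schema_str = percent_decode_str(encoded).decode_utf8().expect("valid UTF-8");
    serde_json::from_str(&schema_str).expect("valid schema JSON")
}

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Float64, false)]));
    let roundtripped = decode(&encode(schema.clone()));
    assert_eq!(roundtripped, *schema);
}
```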
6 changes: 4 additions & 2 deletions src/frontend/http_utils.rs
@@ -39,6 +39,8 @@
 // ```
 //
 // (maybe we need a recover for every route to minimize the amount of back and forth with Warp?)
+use std::string::FromUtf8Error;
+
 use datafusion::error::DataFusionError;
 
 use warp::hyper::{Body, Response, StatusCode};
@@ -63,8 +65,8 @@ pub enum ApiError {
     UploadMissingFile,
     UploadMissingFilename,
     UploadMissingFilenameExtension(String),
-    UploadSchemaDeserializationError(serde_json::Error),
-    UploadSchemaParseError(arrow::error::ArrowError),
+    UploadSchemaDeserializationError(FromUtf8Error),
+    UploadSchemaParseError(serde_json::Error),
     UploadFileLoadError(Box<dyn std::error::Error + Send + Sync>),
     UploadBodyLoadError(warp::Error),
     UploadHasHeaderParseError,
1 change: 0 additions & 1 deletion src/lib.rs
@@ -12,7 +12,6 @@ pub mod nodes;
 pub mod object_store;
 pub mod provider;
 pub mod repository;
-pub mod schema;
 pub mod system_tables;
 pub mod utils;
 pub mod version;
37 changes: 35 additions & 2 deletions src/repository/default.rs
@@ -59,6 +59,39 @@ macro_rules! implement_repository {
 #[async_trait]
 impl Repository for $repo {
     async fn setup(&self) {
+        // TODO remove the entire block a couple of releases past 0.5.7
+        {
+            // Migrate the old field JSON format to the canonical one
+            #[derive(sqlx::FromRow)]
+            struct ColumnSchema {
+                id: i64,
+                r#type: String,
+            }
+
+            // Fetch all rows from the table
+            let maybe_rows: Result<Vec<ColumnSchema>, sqlx::Error> = sqlx::query_as::<_, ColumnSchema>("SELECT id, type FROM table_column")
+                .fetch_all(&self.executor)
+                .await;
+            if let Ok(rows) = maybe_rows {
+                for row in rows {
+                    // Load the `type` value using `arrow_integration_test::field_from_json`
+                    if let Ok(value) = serde_json::from_str::<serde_json::Value>(row.r#type.as_str()) {
+                        if let Ok(field) = arrow_integration_test::field_from_json(&value) {
+                            // Convert the `Field` to a `serde_json` representation
+                            if let Ok(field_json) = serde_json::to_string(&field) {
+                                // Update the table with the new `type` value
+                                let _ = sqlx::query("UPDATE table_column SET type = $1 WHERE id = $2")
+                                    .bind(field_json)
+                                    .bind(row.id)
+                                    .execute(&self.executor)
+                                    .await;
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
         $repo::MIGRATOR
             .run(&self.executor)
             .await
@@ -219,8 +252,8 @@ impl Repository for $repo {
 
         let fields: Vec<(String, String)> = schema.fields()
             .iter()
-            .map(|f| (f.name().clone(), field_to_json(f).to_string()))
-            .collect();
+            .map(|f| Ok((f.name().clone(), serde_json::to_string(&f)?)))
+            .collect::<Result<_>>()?;
 
         builder.push_values(fields, |mut b, col| {
            b.push_bind(new_version_id)
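A note on the backfill in `setup()` above: it is deliberately best-effort (every failure is swallowed via `if let Ok`/`let _`, so startup never aborts on a bad row), and it should also be idempotent, assuming `field_from_json` rejects the canonical layout because that layout has no `"type"` object. A sketch of the per-row conversion, using a hypothetical `migrate_type_json` helper that is not part of this diff:

```rust
// Returns Some(canonical JSON) for a legacy-format `type` value, None otherwise.
fn migrate_type_json(old_or_new: &str) -> Option<String> {
    let value = serde_json::from_str::<serde_json::Value>(old_or_new).ok()?;
    // Expected to fail on canonical serde JSON (no "type" object), making reruns no-ops.
    let field = arrow_integration_test::field_from_json(&value).ok()?;
    serde_json::to_string(&field).ok()
}

fn main() {
    let old = r#"{"children":[],"name":"date","nullable":false,"type":{"name":"date","unit":"MILLISECOND"}}"#;
    let new = migrate_type_json(old).expect("legacy repr converts");
    // A second pass leaves already-migrated rows untouched.
    assert_eq!(migrate_type_json(&new), None);
}
```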
13 changes: 9 additions & 4 deletions src/repository/interface.rs
@@ -98,13 +98,19 @@ pub struct AllDatabaseFunctionsResult {
 }
 
 /// Wrapper for conversion of database-specific error codes into actual errors
-#[derive(Debug)]
+#[derive(Debug, thiserror::Error)]
 pub enum Error {
+    #[error(transparent)]
     UniqueConstraintViolation(sqlx::Error),
+    #[error(transparent)]
     FKConstraintViolation(sqlx::Error),
 
     // All other errors
+    #[error(transparent)]
     SqlxError(sqlx::Error),
+
+    #[error("Failed parsing JSON: {0}")]
+    SerdeJsonError(#[from] serde_json::Error),
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -317,7 +323,7 @@ pub mod tests {
             table_uuid: Some(Uuid::default()),
             table_version_id: Some(version),
             column_name: Some("date".to_string()),
-            column_type: Some("{\"children\":[],\"name\":\"date\",\"nullable\":false,\"type\":{\"name\":\"date\",\"unit\":\"MILLISECOND\"}}".to_string()),
+            column_type: Some("{\"name\":\"date\",\"data_type\":\"Date64\",\"nullable\":false,\"dict_id\":0,\"dict_is_ordered\":false,\"metadata\":{}}".to_string()),
         },
         AllDatabaseColumnsResult {
             database_name,
@@ -327,8 +333,7 @@
             table_uuid: Some(Uuid::default()),
             table_version_id: Some(version),
             column_name: Some("value".to_string()),
-            column_type: Some("{\"children\":[],\"name\":\"value\",\"nullable\":false,\"type\":{\"name\":\"floatingpoint\",\"precision\":\"DOUBLE\"}}"
-                .to_string()),
+            column_type: Some("{\"name\":\"value\",\"data_type\":\"Float64\",\"nullable\":false,\"dict_id\":0,\"dict_is_ordered\":false,\"metadata\":{}}".to_string()),
         },
     ]
 }
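The switch to `thiserror` here is what makes the `?` in `default.rs` above compile: `#[from]` generates a `From<serde_json::Error> for Error` impl, so serde failures lift into `Error::SerdeJsonError` automatically. A minimal sketch, trimmed to the one relevant variant (the `parse_field` helper is illustrative):

```rust
#[derive(Debug, thiserror::Error)]
enum Error {
    #[error("Failed parsing JSON: {0}")]
    SerdeJsonError(#[from] serde_json::Error),
}

// `?` converts serde_json::Error into Error via the generated From impl.
fn parse_field(json: &str) -> Result<arrow_schema::Field, Error> {
    Ok(serde_json::from_str(json)?)
}

fn main() {
    assert!(parse_field("not json").is_err());
    assert!(parse_field(
        r#"{"name":"value","data_type":"Float64","nullable":false,"dict_id":0,"dict_is_ordered":false,"metadata":{}}"#
    ).is_ok());
}
```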
1 change: 0 additions & 1 deletion src/repository/postgres.rs
@@ -1,6 +1,5 @@
 use std::{fmt::Debug, time::Duration};
 
-use arrow_integration_test::field_to_json;
 use arrow_schema::Schema;
 use async_trait::async_trait;
 use futures::TryStreamExt;
1 change: 0 additions & 1 deletion src/repository/sqlite.rs
@@ -1,6 +1,5 @@
 use std::{fmt::Debug, str::FromStr};
 
-use arrow_integration_test::field_to_json;
 use arrow_schema::Schema;
 use async_trait::async_trait;
 use futures::TryStreamExt;
93 changes: 0 additions & 93 deletions src/schema.rs

This file was deleted.

6 changes: 1 addition & 5 deletions src/testutils.rs
@@ -6,7 +6,6 @@ use arrow::array::Int32Array;
 use arrow::datatypes::{DataType, Field, Schema};
 
 use arrow::record_batch::RecordBatch;
-use arrow_integration_test::schema_from_json;
 use datafusion::parquet::arrow::ArrowWriter;
 use futures::TryStreamExt;
 use itertools::Itertools;
@@ -155,10 +154,7 @@ pub fn schema_from_header(headers: &HeaderMap<HeaderValue>) -> Schema {
         .decode_utf8()
         .expect("escaped schema decodable")
         .to_string();
-    let schema_json = serde_json::from_str::<serde_json::Value>(schema_str.as_str())
-        .expect("decoded schema is valid JSON");
-
-    schema_from_json(&schema_json).expect("arrow schema reconstructable from JSON")
+    serde_json::from_str(schema_str.as_str()).expect("decoded schema is valid JSON")
 }
 
 pub fn assert_header_is_float(header: &HeaderValue) -> bool {
1 change: 0 additions & 1 deletion tests/http/mod.rs
@@ -28,7 +28,6 @@ use arrow::array::{
 use arrow::csv::WriterBuilder;
 use arrow::datatypes::{DataType, Field, Schema};
 use arrow::record_batch::RecordBatch;
-use arrow_integration_test::schema_to_json;
 use arrow_schema::TimeUnit;
 use datafusion::assert_batches_eq;
 use datafusion::parquet::arrow::ArrowWriter;
2 changes: 1 addition & 1 deletion tests/http/upload.rs
@@ -28,7 +28,7 @@ async fn test_upload_base(
 
     // For CSV uploads we can supply the schema as another part of the multipart request, to
     // remove the ambiguity resulting from automatic schema inference
-    let schema_json = schema_to_json(schema.as_ref()).to_string();
+    let schema_json = serde_json::to_string(&schema).unwrap();
 
     let range = 0..2000;
     let mut input_batch = RecordBatch::try_new(
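Client-side, the schema part can now be produced with plain serde as well. A hedged sketch of such an upload (assumes `reqwest` with the `multipart` feature plus `tokio`; the part names `schema`/`data` and the endpoint URL are illustrative guesses, not taken from this diff):

```rust
use arrow_schema::{DataType, Field, Schema};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Schema::new(vec![Field::new("value", DataType::Float64, false)]);
    // Canonical serde JSON, matching what the server now expects.
    let schema_json = serde_json::to_string(&schema)?;

    let form = reqwest::multipart::Form::new()
        .part("schema", reqwest::multipart::Part::text(schema_json))
        .part(
            "data",
            reqwest::multipart::Part::bytes(b"value\n1.0\n".to_vec()).file_name("data.csv"),
        );

    reqwest::Client::new()
        .post("http://localhost:8080/upload/default/some_table") // hypothetical endpoint
        .multipart(form)
        .send()
        .await?
        .error_for_status()?;
    Ok(())
}
```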
