Skip to content

Commit 167baf7

Browse files
authored
Fix: Do not normalize table names when deserializing from protobuf (#18187)
## Which issue does this PR close? Closes #18122 ## Rationale for this change Existing behavior is to use the `relation` field of `ColumnRelation` message to construct a `TableReference` ([mod.rs#L146][fromcol_proto], [mod.rs#L171][tryfrom_proto]). However, the `relation` field is a string and `From<String> for TableReference` always calls parse_identifiers_normalized with `ignore_case: False`, which always normalizes the identifier to lower case ([TableReference::parse_str][parse_str]). For a description of the bug at a bit of a higher level, see #18122. ## What changes are included in this PR? This PR introduces the following: 1. An implementation `From<protobuf::ColumnRelation>` and `From<&protobuf::ColumnRelation>` for `TableReference`. 2. Updated logic in `TryFrom<&protobuf::DFSchema>` for `DFSchema` and in `From<protobuf::Column>` for `Column` that correctly leads to the new `From` impls for `TableReference` to be invoked. 3. A new method, `TableReference::parse_str_normalized`, that parses an identifier without normalizing it, with some logic from `TableReference::parse_str` being refactored to accommodate code reuse. ## Are these changes tested? Commit a355196 adds a new test case, `roundtrip_mixed_case_table_reference`, that tests the desired behavior. The existing behavior (without the fix in 0616df2 and with the extra line `println!("{}", server_logical_plan.display_indent_schema());`): ``` cargo test "roundtrip_mixed_case_table_reference" --test proto_integration -- --nocapture Compiling datafusion-proto v48.0.1 (/Users/aldrinm/code/bauplanlabs/datafusion/octalene-datafusion/datafusion/proto) Finished `test` profile [unoptimized + debuginfo] target(s) in 1.56s Running tests/proto_integration.rs (target/debug/deps/proto_integration-775454d70979734b) running 1 test thread 'cases::roundtrip_logical_plan::roundtrip_mixed_case_table_reference' panicked at datafusion/proto/tests/cases/roundtrip_logical_plan.rs:2690:5: assertion `left == right` failed left: "Filter: TestData.a = Int64(1) [a:Int64;N]\n TableScan: TestData projection=[a], partial_filters=[TestData.a = Int64(1)] [a:Int64;N]" right: "Filter: testdata.a = Int64(1) [a:Int64;N]\n TableScan: TestData projection=[a], partial_filters=[testdata.a = Int64(1)] [a:Int64;N]" note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace test cases::roundtrip_logical_plan::roundtrip_mixed_case_table_reference ... FAILED failures: failures: cases::roundtrip_logical_plan::roundtrip_mixed_case_table_reference test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 112 filtered out; finished in 0.09s ``` With the fix implemented (0616df2): ``` running 1 test Filter: TestData.a = Int64(1) [a:Int64;N] TableScan: TestData projection=[a], partial_filters=[TestData.a = Int64(1)] [a:Int64;N] test cases::roundtrip_logical_plan::roundtrip_mixed_case_table_reference ... ok test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 112 filtered out; finished in 0.06s ``` ## Are there any user-facing changes? None. <!-- Resources --> [fromcol_proto]: https://github.com/apache/datafusion/blob/50.2.0/datafusion/proto-common/src/from_proto/mod.rs#L146 [tryfrom_proto]: https://github.com/apache/datafusion/blob/50.2.0/datafusion/proto-common/src/from_proto/mod.rs#L171 [parse_str]: https://github.com/apache/datafusion/blob/50.2.0/datafusion/common/src/table_reference.rs#L273
1 parent 9f23680 commit 167baf7

File tree

7 files changed

+96
-29
lines changed

7 files changed

+96
-29
lines changed

datafusion-cli/src/object_storage/instrumented.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ pub enum InstrumentedObjectStoreMode {
5858

5959
impl fmt::Display for InstrumentedObjectStoreMode {
6060
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61-
write!(f, "{:?}", self)
61+
write!(f, "{self:?}")
6262
}
6363
}
6464

@@ -426,7 +426,7 @@ pub enum Operation {
426426

427427
impl fmt::Display for Operation {
428428
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
429-
write!(f, "{:?}", self)
429+
write!(f, "{self:?}")
430430
}
431431
}
432432

@@ -556,11 +556,11 @@ impl RequestSummaries {
556556
let size_stats = s.size_stats.as_ref();
557557
let dur_avg = duration_stats.map(|d| {
558558
let avg = d.sum.as_secs_f32() / count;
559-
format!("{:.6}s", avg)
559+
format!("{avg:.6}s")
560560
});
561561
let size_avg = size_stats.map(|s| {
562562
let avg = s.sum as f32 / count;
563-
format!("{} B", avg)
563+
format!("{avg} B")
564564
});
565565
[dur_avg, size_avg]
566566
})

datafusion-cli/src/print_options.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ impl PrintOptions {
206206

207207
writeln!(writer, "Summaries:")?;
208208
let summaries = RequestSummaries::new(&requests);
209-
writeln!(writer, "{}", summaries)?;
209+
writeln!(writer, "{summaries}")?;
210210
}
211211
}
212212
}

datafusion/common/src/table_reference.rs

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -269,24 +269,41 @@ impl TableReference {
269269
}
270270

271271
/// Forms a [`TableReference`] by parsing `s` as a multipart SQL
272-
/// identifier. See docs on [`TableReference`] for more details.
272+
/// identifier, normalizing `s` to lowercase.
273+
/// See docs on [`TableReference`] for more details.
273274
pub fn parse_str(s: &str) -> Self {
274-
let mut parts = parse_identifiers_normalized(s, false);
275+
Self::parse_str_normalized(s, false)
276+
}
277+
278+
/// Forms a [`TableReference`] by parsing `s` as a multipart SQL
279+
/// identifier, normalizing `s` to lowercase if `ignore_case` is `false`.
280+
/// See docs on [`TableReference`] for more details.
281+
pub fn parse_str_normalized(s: &str, ignore_case: bool) -> Self {
282+
let table_parts = parse_identifiers_normalized(s, ignore_case);
275283

284+
Self::from_vec(table_parts).unwrap_or_else(|| Self::Bare { table: s.into() })
285+
}
286+
287+
/// Consume a vector of identifier parts to compose a [`TableReference`]. The input vector
288+
/// should contain 1 <= N <= 3 elements in the following sequence:
289+
/// ```no_rust
290+
/// [<catalog>, <schema>, table]
291+
/// ```
292+
fn from_vec(mut parts: Vec<String>) -> Option<Self> {
276293
match parts.len() {
277-
1 => Self::Bare {
278-
table: parts.remove(0).into(),
279-
},
280-
2 => Self::Partial {
281-
schema: parts.remove(0).into(),
282-
table: parts.remove(0).into(),
283-
},
284-
3 => Self::Full {
285-
catalog: parts.remove(0).into(),
286-
schema: parts.remove(0).into(),
287-
table: parts.remove(0).into(),
288-
},
289-
_ => Self::Bare { table: s.into() },
294+
1 => Some(Self::Bare {
295+
table: parts.pop()?.into(),
296+
}),
297+
2 => Some(Self::Partial {
298+
table: parts.pop()?.into(),
299+
schema: parts.pop()?.into(),
300+
}),
301+
3 => Some(Self::Full {
302+
table: parts.pop()?.into(),
303+
schema: parts.pop()?.into(),
304+
catalog: parts.pop()?.into(),
305+
}),
306+
_ => None,
290307
}
291308
}
292309

datafusion/common/src/utils/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,9 @@ pub(crate) fn parse_identifiers(s: &str) -> Result<Vec<Ident>> {
285285
Ok(idents)
286286
}
287287

288+
/// Parse a string into a vector of identifiers.
289+
///
290+
/// Note: If ignore_case is false, the string will be normalized to lowercase.
288291
#[cfg(feature = "sql")]
289292
pub(crate) fn parse_identifiers_normalized(s: &str, ignore_case: bool) -> Vec<String> {
290293
parse_identifiers(s)

datafusion/proto-common/src/from_proto/mod.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,17 @@ where
138138
}
139139
}
140140

141+
impl From<protobuf::ColumnRelation> for TableReference {
142+
fn from(rel: protobuf::ColumnRelation) -> Self {
143+
Self::parse_str_normalized(rel.relation.as_str(), true)
144+
}
145+
}
146+
141147
impl From<protobuf::Column> for Column {
142148
fn from(c: protobuf::Column) -> Self {
143149
let protobuf::Column { relation, name } = c;
144150

145-
Self::new(relation.map(|r| r.relation), name)
151+
Self::new(relation, name)
146152
}
147153
}
148154

@@ -164,10 +170,7 @@ impl TryFrom<&protobuf::DfSchema> for DFSchema {
164170
.map(|df_field| {
165171
let field: Field = df_field.field.as_ref().required("field")?;
166172
Ok((
167-
df_field
168-
.qualifier
169-
.as_ref()
170-
.map(|q| q.relation.clone().into()),
173+
df_field.qualifier.as_ref().map(|q| q.clone().into()),
171174
Arc::new(field),
172175
))
173176
})

datafusion/proto/src/logical_plan/from_proto.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -315,8 +315,7 @@ pub fn parse_expr(
315315
let null_treatment = protobuf::NullTreatment::try_from(null_treatment)
316316
.map_err(|_| {
317317
proto_error(format!(
318-
"Received a WindowExprNode message with unknown NullTreatment {}",
319-
null_treatment
318+
"Received a WindowExprNode message with unknown NullTreatment {null_treatment}",
320319
))
321320
})?;
322321
Some(NullTreatment::from(null_treatment))
@@ -596,8 +595,7 @@ pub fn parse_expr(
596595
let null_treatment = protobuf::NullTreatment::try_from(null_treatment)
597596
.map_err(|_| {
598597
proto_error(format!(
599-
"Received an AggregateUdfExprNode message with unknown NullTreatment {}",
600-
null_treatment
598+
"Received an AggregateUdfExprNode message with unknown NullTreatment {null_treatment}",
601599
))
602600
})?;
603601
Some(NullTreatment::from(null_treatment))

datafusion/proto/tests/cases/roundtrip_logical_plan.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2804,3 +2804,49 @@ async fn roundtrip_arrow_scan() -> Result<()> {
28042804
assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}"));
28052805
Ok(())
28062806
}
2807+
2808+
#[tokio::test]
2809+
async fn roundtrip_mixed_case_table_reference() -> Result<()> {
2810+
// Prepare "client" database
2811+
let client_ctx = SessionContext::new_with_config(
2812+
SessionConfig::new()
2813+
.set_bool("datafusion.sql_parser.enable_ident_normalization", false),
2814+
);
2815+
client_ctx
2816+
.register_csv(
2817+
"\"TestData\"",
2818+
"tests/testdata/test.csv",
2819+
CsvReadOptions::default(),
2820+
)
2821+
.await?;
2822+
2823+
// Prepare "server" database
2824+
let server_ctx = SessionContext::new_with_config(
2825+
SessionConfig::new()
2826+
.set_bool("datafusion.sql_parser.enable_ident_normalization", false),
2827+
);
2828+
server_ctx
2829+
.register_csv(
2830+
"\"TestData\"",
2831+
"tests/testdata/test.csv",
2832+
CsvReadOptions::default(),
2833+
)
2834+
.await?;
2835+
2836+
// Create a logical plan, serialize it (client), then deserialize it (server)
2837+
let dataframe = client_ctx
2838+
.sql("SELECT a FROM TestData WHERE TestData.a = 1")
2839+
.await?;
2840+
2841+
let client_logical_plan = dataframe.into_optimized_plan()?;
2842+
let plan_bytes = logical_plan_to_bytes(&client_logical_plan)?;
2843+
let server_logical_plan =
2844+
logical_plan_from_bytes(&plan_bytes, &server_ctx.task_ctx())?;
2845+
2846+
assert_eq!(
2847+
format!("{}", client_logical_plan.display_indent_schema()),
2848+
format!("{}", server_logical_plan.display_indent_schema())
2849+
);
2850+
2851+
Ok(())
2852+
}

0 commit comments

Comments
 (0)