Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/query/formats/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ match-template = { workspace = true }
micromarshal = { workspace = true }
num = { workspace = true }
num-traits = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }

[dev-dependencies]
Expand Down
44 changes: 37 additions & 7 deletions src/query/formats/src/field_decoder/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ use databend_common_io::parse_bitmap;
use databend_common_io::parse_bytes_to_ewkb;
use jsonb::parse_owned_jsonb_with_buf;
use lexical_core::FromLexical;
use serde::Deserialize;
use serde_json::Deserializer;
use serde_json::value::RawValue;

use crate::FileFormatOptionsExt;
use crate::InputCommonSettings;
Expand Down Expand Up @@ -209,7 +212,7 @@ impl NestedValues {
column: &mut StringColumnBuilder,
reader: &mut Cursor<R>,
) -> Result<()> {
reader.read_quoted_text(&mut column.row_buffer, b'\'')?;
self.read_string_inner(reader, &mut column.row_buffer)?;
column.commit_row();
Ok(())
}
Expand All @@ -220,7 +223,7 @@ impl NestedValues {
reader: &mut Cursor<R>,
) -> Result<()> {
let mut buf = Vec::new();
reader.read_quoted_text(&mut buf, b'\'')?;
self.read_string_inner(reader, &mut buf)?;
let decoded = decode_binary(&buf, self.common_settings.binary_format)?;
column.put_slice(&decoded);
column.commit_row();
Expand All @@ -232,7 +235,10 @@ impl NestedValues {
reader: &mut Cursor<R>,
out_buf: &mut Vec<u8>,
) -> Result<()> {
reader.read_quoted_text(out_buf, b'\'')?;
if reader.read_quoted_text(out_buf, b'"').is_err() {
// Read single quoted text, compatible with previous implementations
reader.read_quoted_text(out_buf, b'\'')?;
}
Ok(())
}

Expand Down Expand Up @@ -320,8 +326,13 @@ impl NestedValues {
column: &mut BinaryColumnBuilder,
reader: &mut Cursor<R>,
) -> Result<()> {
let mut buf = Vec::new();
self.read_string_inner(reader, &mut buf)?;
let buf = if let Ok(val) = self.read_json(reader) {
val.as_bytes().to_vec()
} else {
let mut buf = Vec::new();
reader.read_quoted_text(&mut buf, b'\'')?;
buf
};
match parse_owned_jsonb_with_buf(&buf, &mut column.data) {
Ok(_) => {
column.commit_row();
Expand All @@ -343,7 +354,12 @@ impl NestedValues {
reader: &mut Cursor<R>,
) -> Result<()> {
let mut buf = Vec::new();
self.read_string_inner(reader, &mut buf)?;
if reader.read_quoted_text(&mut buf, b'"').is_err()
&& reader.read_quoted_text(&mut buf, b'\'').is_err()
{
let val = self.read_json(reader)?;
buf = val.as_bytes().to_vec();
}
let geom = parse_bytes_to_ewkb(&buf, None)?;
column.put_slice(geom.as_bytes());
column.commit_row();
Expand All @@ -356,13 +372,27 @@ impl NestedValues {
reader: &mut Cursor<R>,
) -> Result<()> {
let mut buf = Vec::new();
self.read_string_inner(reader, &mut buf)?;
if reader.read_quoted_text(&mut buf, b'"').is_err()
&& reader.read_quoted_text(&mut buf, b'\'').is_err()
{
let val = self.read_json(reader)?;
buf = val.as_bytes().to_vec();
}
let geog = geography_from_ewkt_bytes(&buf)?;
column.put_slice(geog.as_bytes());
column.commit_row();
Ok(())
}

fn read_json<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<String> {
let start = reader.position() as usize;
let data = reader.get_ref().as_ref();
let mut deserializer = Deserializer::from_slice(&data[start..]);
let raw: Box<RawValue> = Box::<RawValue>::deserialize(&mut deserializer)?;
reader.set_position((start + raw.get().len()) as u64);
Ok(raw.to_string())
}

fn read_nullable<R: AsRef<[u8]>>(
&self,
column: &mut NullableColumnBuilder<AnyType>,
Expand Down
6 changes: 4 additions & 2 deletions src/query/formats/src/field_encoder/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ impl FieldEncoderCSV {
binary_format: params.binary_format,
geometry_format: params.geometry_format,
},
quote_char: 0, // not used
escape_char: 0, // not used
quote_char: 0, // not used
},
string_formatter: StringFormatter::Csv {
quote_char: params.quote.as_bytes()[0],
Expand All @@ -116,7 +117,8 @@ impl FieldEncoderCSV {
binary_format: Default::default(),
geometry_format: Default::default(),
},
quote_char: 0, // not used
escape_char: 0, // not used
quote_char: 0, // not used
},
string_formatter: StringFormatter::Tsv {
record_delimiter: params.field_delimiter.as_bytes().to_vec()[0],
Expand Down
4 changes: 2 additions & 2 deletions src/query/formats/src/field_encoder/helpers/escape.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ static ESCAPE: [u8; 256] = [
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // F
];

pub fn write_quoted_string(bytes: &[u8], buf: &mut Vec<u8>, quote: u8) {
pub fn write_quoted_string(bytes: &[u8], buf: &mut Vec<u8>, escape: u8, quote: u8) {
let mut start = 0;

for (i, &byte) in bytes.iter().enumerate() {
if byte == quote {
if start < i {
buf.extend_from_slice(&bytes[start..i]);
}
buf.push(quote);
buf.push(escape);
buf.push(quote);
start = i + 1;
}
Expand Down
1 change: 1 addition & 0 deletions src/query/formats/src/field_encoder/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ impl FieldEncoderJSON {
binary_format: Default::default(),
geometry_format: Default::default(),
},
escape_char: 0,
quote_char: 0,
},
quote_denormals: false,
Expand Down
43 changes: 28 additions & 15 deletions src/query/formats/src/field_encoder/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ use crate::field_encoder::helpers::write_quoted_string;

pub struct FieldEncoderValues {
pub common_settings: OutputCommonSettings,
pub escape_char: u8,
pub quote_char: u8,
}

Expand All @@ -78,7 +79,8 @@ impl FieldEncoderValues {
binary_format: Default::default(),
geometry_format: Default::default(),
},
quote_char: b'\'',
escape_char: b'"',
quote_char: b'"',
}
}

Expand All @@ -99,7 +101,8 @@ impl FieldEncoderValues {
binary_format: Default::default(),
geometry_format,
},
quote_char: b'\'',
escape_char: b'\\',
quote_char: b'"',
}
}

Expand All @@ -124,7 +127,8 @@ impl FieldEncoderValues {
binary_format: Default::default(),
geometry_format,
},
quote_char: b'\'',
escape_char: b'\\',
quote_char: b'"',
}
}

Expand Down Expand Up @@ -163,7 +167,7 @@ impl FieldEncoderValues {
Column::Timestamp(c) => self.write_timestamp(c, row_index, out_buf, in_nested),
Column::TimestampTz(c) => self.write_timestamp_tz(c, row_index, out_buf, in_nested),
Column::Bitmap(b) => self.write_bitmap(b, row_index, out_buf, in_nested),
Column::Variant(c) => self.write_variant(c, row_index, out_buf, in_nested),
Column::Variant(c) => self.write_variant(c, row_index, out_buf),
Column::Geometry(c) => self.write_geometry(c, row_index, out_buf, in_nested),
Column::Geography(c) => self.write_geography(c, row_index, out_buf, in_nested),

Expand All @@ -186,12 +190,13 @@ impl FieldEncoderValues {
// so we do not expect the scalar literal to be used in sql.
// it is better to keep it simple: minimal escape.
// it make result easier to decode csv, tsv and http handler result.
write_quoted_string(in_buf, out_buf, self.quote_char);
write_quoted_string(in_buf, out_buf, self.escape_char, self.quote_char);
out_buf.push(self.quote_char);
} else {
out_buf.extend_from_slice(in_buf);
}
}

fn write_bool(&self, column: &Bitmap, row_index: usize, out_buf: &mut Vec<u8>) {
let v = if column.get_bit(row_index) {
&self.common_settings().true_bytes
Expand Down Expand Up @@ -328,16 +333,10 @@ impl FieldEncoderValues {
self.write_string_inner(bitmap_result, out_buf, in_nested);
}

fn write_variant(
&self,
column: &BinaryColumn,
row_index: usize,
out_buf: &mut Vec<u8>,
in_nested: bool,
) {
fn write_variant(&self, column: &BinaryColumn, row_index: usize, out_buf: &mut Vec<u8>) {
let v = unsafe { column.index_unchecked(row_index) };
let s = RawJsonb::new(v).to_string();
self.write_string_inner(s.as_bytes(), out_buf, in_nested);
out_buf.extend_from_slice(s.as_bytes());
}

fn write_geometry(
Expand All @@ -360,7 +359,14 @@ impl FieldEncoderValues {
})
.unwrap_or_else(|_| v.to_vec());

self.write_string_inner(&s, out_buf, in_nested);
match self.common_settings().geometry_format {
GeometryDataType::GEOJSON => {
out_buf.extend_from_slice(&s);
}
_ => {
self.write_string_inner(&s, out_buf, in_nested);
}
}
}

fn write_geography(
Expand All @@ -383,7 +389,14 @@ impl FieldEncoderValues {
})
.unwrap_or_else(|_| v.0.to_vec());

self.write_string_inner(&s, out_buf, in_nested);
match self.common_settings().geometry_format {
GeometryDataType::GEOJSON => {
out_buf.extend_from_slice(&s);
}
_ => {
self.write_string_inner(&s, out_buf, in_nested);
}
}
}

fn write_array(&self, column: &ArrayColumn<AnyType>, row_index: usize, out_buf: &mut Vec<u8>) {
Expand Down
3 changes: 2 additions & 1 deletion src/tests/sqlsmith/src/sql_gen/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,8 @@ impl<'a, R: Rng + 'a> SqlGenerator<'a, R> {
binary_format: Default::default(),
geometry_format: Default::default(),
},
quote_char: b'\'',
escape_char: b'\\',
quote_char: b'"',
};

for i in 0..row_count {
Expand Down
2 changes: 1 addition & 1 deletion tests/nox/noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os


PYTHON_DRIVER = ["0.28.1", "0.28.2"]
PYTHON_DRIVER = ["0.33.1"]


@nox.session
Expand Down
4 changes: 2 additions & 2 deletions tests/nox/python_client/test_data_type.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import datetime, date, timedelta
from datetime import datetime, date, timedelta, timezone
from decimal import Decimal
from databend_driver import BlockingDatabendClient

Expand Down Expand Up @@ -37,6 +37,6 @@ def test_data_types():

# Tuple
row = conn.query_row("select (10, '20', to_datetime('2024-04-16 12:34:56.789'))")
assert row.values() == ((10, "20", datetime(2024, 4, 16, 12, 34, 56, 789000)),), (
assert row.values() == ((10, "20", datetime(2024, 4, 16, 12, 34, 56, 789000, tzinfo=timezone.utc)),), (
f"Tuple: {row.values()}"
)
20 changes: 10 additions & 10 deletions tests/nox/python_client/test_local.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from databend_driver import BlockingDatabendClient
from datetime import datetime, date
from datetime import datetime, date, timezone

from .utils import DATABEND_DSL

Expand Down Expand Up @@ -31,9 +31,9 @@ def test_insert():
rows = conn.query_iter("SELECT * FROM test")
ret = [row.values() for row in rows]
expected = [
(-1, 1, 1.0, "'", None, date(2011, 3, 6), datetime(2011, 3, 6, 6, 20)),
(-2, 2, 2.0, '"', "", date(2012, 5, 31), datetime(2012, 5, 31, 11, 20)),
(-3, 3, 3.0, "\\", "NULL", date(2016, 4, 4), datetime(2016, 4, 4, 11, 30)),
(-1, 1, 1.0, "'", None, date(2011, 3, 6), datetime(2011, 3, 6, 6, 20, tzinfo=timezone.utc)),
(-2, 2, 2.0, '"', "", date(2012, 5, 31), datetime(2012, 5, 31, 11, 20, tzinfo=timezone.utc)),
(-3, 3, 3.0, '\\', "NULL", date(2016, 4, 4), datetime(2016, 4, 4, 11, 30, tzinfo=timezone.utc))
]
assert ret == expected, f"ret: {ret}"

Expand Down Expand Up @@ -66,9 +66,9 @@ def test_stream_load():
rows = conn.query_iter("SELECT * FROM test")
ret = [row.values() for row in rows]
expected = [
(-1, 1, 1.0, "'", None, date(2011, 3, 6), datetime(2011, 3, 6, 6, 20)),
(-2, 2, 2.0, '"', None, date(2012, 5, 31), datetime(2012, 5, 31, 11, 20)),
(-3, 3, 3.0, "\\", "NULL", date(2016, 4, 4), datetime(2016, 4, 4, 11, 30)),
(-1, 1, 1.0, "'", None, date(2011, 3, 6), datetime(2011, 3, 6, 6, 20, tzinfo=timezone.utc)),
(-2, 2, 2.0, '"', None, date(2012, 5, 31), datetime(2012, 5, 31, 11, 20, tzinfo=timezone.utc)),
(-3, 3, 3.0, "\\", "NULL", date(2016, 4, 4), datetime(2016, 4, 4, 11, 30, tzinfo=timezone.utc)),
]
assert ret == expected, f"ret: {ret}"

Expand Down Expand Up @@ -111,8 +111,8 @@ def run_load_file(load_method):
rows = conn.query_iter("SELECT * FROM test")
ret = [row.values() for row in rows]
expected = [
(-1, 1, 1.0, "'", None, date(2011, 3, 6), datetime(2011, 3, 6, 6, 20)),
(-2, 2, 2.0, '"', None, date(2012, 5, 31), datetime(2012, 5, 31, 11, 20)),
(-3, 3, 3.0, "\\", "NULL", date(2016, 4, 4), datetime(2016, 4, 4, 11, 30)),
(-1, 1, 1.0, "'", None, date(2011, 3, 6), datetime(2011, 3, 6, 6, 20, tzinfo=timezone.utc)),
(-2, 2, 2.0, '"', None, date(2012, 5, 31), datetime(2012, 5, 31, 11, 20, tzinfo=timezone.utc)),
(-3, 3, 3.0, "\\", "NULL", date(2016, 4, 4), datetime(2016, 4, 4, 11, 30, tzinfo=timezone.utc)),
]
assert ret == expected, f"{load_method} ret: {ret}"
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,9 @@ DROP TABLE t_str
query TTT rowsort
SELECT ( null, to_hour(to_timestamp(3501857592331)), number::Date) from numbers(3) group by all
----
(NULL,18,'1970-01-01')
(NULL,18,'1970-01-02')
(NULL,18,'1970-01-03')
(NULL,18,"1970-01-01")
(NULL,18,"1970-01-02")
(NULL,18,"1970-01-03")

query TTT rowsort
SELECT TRY_CAST('1900-12-30 12:00:00' AS TIMESTAMP) AS "TEMP(Test)(4058757556)(0)",
Expand Down
Loading
Loading