From 230518a86f67505b1ecdb11163acf8b5e99fa496 Mon Sep 17 00:00:00 2001 From: Fan Yang Date: Tue, 26 Nov 2024 16:38:30 +0800 Subject: [PATCH] feat: support more Postgres basic data types (#215) --- .github/workflows/postgres-replication.yml | 2 +- catalog/provider.go | 2 + delta/controller.go | 14 +++- pgserver/logrepl/decode.go | 36 ++++++++ pgserver/logrepl/replication_test.go | 96 ++++++++++++++++++++-- pgtypes/pgtypes.go | 28 ++++--- 6 files changed, 159 insertions(+), 19 deletions(-) diff --git a/.github/workflows/postgres-replication.yml b/.github/workflows/postgres-replication.yml index f713c14f..bce4c98a 100644 --- a/.github/workflows/postgres-replication.yml +++ b/.github/workflows/postgres-replication.yml @@ -40,4 +40,4 @@ jobs: run: go build -v - name: Test Postgres Logical Replication - run: go test -v --timeout 30s ./pgserver/logrepl + run: go test -v --timeout 60s ./pgserver/logrepl diff --git a/catalog/provider.go b/catalog/provider.go index 618b7be0..0cd35dc2 100644 --- a/catalog/provider.go +++ b/catalog/provider.go @@ -62,6 +62,8 @@ func NewDBProvider(dataDir, dbFile string) (*DatabaseProvider, error) { bootQueries := []string{ "INSTALL arrow", "LOAD arrow", + "INSTALL icu", + "LOAD icu", "INSTALL postgres_scanner", "LOAD postgres_scanner", } diff --git a/delta/controller.go b/delta/controller.go index 66c6d239..687fb70f 100644 --- a/delta/controller.go +++ b/delta/controller.go @@ -13,8 +13,10 @@ import ( "github.com/apache/arrow-go/v18/arrow/ipc" "github.com/apecloud/myduckserver/binlog" "github.com/apecloud/myduckserver/catalog" + "github.com/apecloud/myduckserver/pgtypes" "github.com/dolthub/go-mysql-server/sql" "github.com/dolthub/go-mysql-server/sql/types" + "github.com/jackc/pgx/v5/pgtype" "github.com/sirupsen/logrus" ) @@ -208,7 +210,7 @@ func (c *DeltaController) updateTable( builder.WriteString(", r[") builder.WriteString(strconv.Itoa(i + 2)) builder.WriteString("]") - if types.IsTimestampType(col.Type) { + if isTimestampType(col.Type) { builder.WriteString("::TIMESTAMP") } builder.WriteString(" AS ") @@ -340,3 +342,13 @@ func (c *DeltaController) updateTable( return nil } + +func isTimestampType(t sql.Type) bool { + if types.IsTimestampType(t) { + return true + } + if pgt, ok := t.(pgtypes.PostgresType); ok { + return pgt.PG.OID == pgtype.TimestampOID + } + return false +} diff --git a/pgserver/logrepl/decode.go b/pgserver/logrepl/decode.go index 92e82ea8..e8eba3cb 100644 --- a/pgserver/logrepl/decode.go +++ b/pgserver/logrepl/decode.go @@ -143,6 +143,17 @@ func decodeToArrow(typeMap *pgtype.Map, columnType *pglogrepl.RelationMessageCol return 4, nil } + case pgtype.TimeOID, pgtype.TimetzOID: + if b, ok := builder.(*array.Time64Builder); ok { + var v pgtype.Time + var codec pgtype.TimeCodec + if err := codec.PlanScan(typeMap, oid, format, &v).Scan(data, &v); err != nil { + return 0, err + } + b.Append(arrow.Time64(v.Microseconds * 1000)) + return 8, nil + } + case pgtype.NumericOID: // Fast path for text format & string destination if format == pgtype.TextFormatCode { @@ -232,7 +243,32 @@ func decodeToArrow(typeMap *pgtype.Map, columnType *pglogrepl.RelationMessageCol b.Append(buf[:]) return 36, nil } + + case pgtype.JSONOID: + if b, ok := builder.(*array.BinaryBuilder); ok { + var buf [32]byte // Stack-allocated buffer for small JSON + v := pgtype.PreallocBytes(buf[:]) + var codec pgtype.JSONCodec + if err := codec.PlanScan(typeMap, oid, format, &v).Scan(data, &v); err != nil { + return 0, err + } + b.Append(v) + return len(v), nil + } + + case pgtype.JSONBOID: + if b, ok := builder.(*array.BinaryBuilder); ok { + var buf [32]byte // Stack-allocated buffer for small JSON + v := pgtype.PreallocBytes(buf[:]) + var codec pgtype.JSONBCodec + if err := codec.PlanScan(typeMap, oid, format, &v).Scan(data, &v); err != nil { + return 0, err + } + b.Append(v) + return len(v), nil + } } + // TODO(fan): add support for other types // Fallback diff --git a/pgserver/logrepl/replication_test.go b/pgserver/logrepl/replication_test.go index 13293eaa..d12f0619 100644 --- a/pgserver/logrepl/replication_test.go +++ b/pgserver/logrepl/replication_test.go @@ -482,20 +482,102 @@ var replicationTests = []ReplicationTest{ createReplicationSlot, startReplication, "/* replica */ drop table if exists public.test", - "/* replica */ create table public.test (id INT primary key, name varchar(100), age INT, is_cool BOOLEAN, height FLOAT, birth_date DATE, birth_timestamp TIMESTAMP, income DECIMAL(10,2))", + "/* replica */ create table public.test (id INT primary key, " + + "name varchar(100), " + + "age SMALLINT, " + + "is_cool BOOLEAN, " + + "height FLOAT, " + + "birth_date DATE, " + + "birth_time TIME, " + + // TIMETZ is discouraged: + // https://www.postgresql.org/docs/current/datatype-datetime.html + // https://github.com/jackc/pgx/issues/1940 + // "birth_timetz TIME WITH TIME ZONE, " + + "birth_timestamp TIMESTAMP, " + + "birth_timestamptz TIMESTAMP WITH TIME ZONE, " + + "income NUMERIC(10,2), " + + "binary_data BYTEA, " + + "description TEXT, " + + "code CHAR(3), " + + // "tags TEXT[], " + + // "scores INTEGER[], " + + // "real_nums REAL[], " + + "small_num SMALLINT, " + + "big_num BIGINT, " + + "json_data JSON)", "drop table if exists public.test", - "create table public.test (id INT primary key, name varchar(100), age INT, is_cool BOOLEAN, height FLOAT, birth_date DATE, birth_timestamp TIMESTAMP, income DECIMAL(10,2))", - "INSERT INTO public.test VALUES (1, 'one', 1, true, 1.1, '2021-01-01', '2021-01-01 12:00:00', 12345678.9)", - "INSERT INTO public.test VALUES (2, 'two', 2, false, 2.2, '2021-02-02', '2021-02-02 13:00:00', 98765432.1)", + "create table public.test (id INT primary key, " + + "name varchar(100), " + + "age SMALLINT, " + + "is_cool BOOLEAN, " + + "height FLOAT, " + + "birth_date DATE, " + + "birth_time TIME, " + + // "birth_timetz TIME WITH TIME ZONE, " + + "birth_timestamp TIMESTAMP, " + + "birth_timestamptz TIMESTAMP WITH TIME ZONE, " + + "income DECIMAL(10,2), " + + "binary_data BYTEA, " + + "description TEXT, " + + "code CHAR(3), " + + // "tags TEXT[], " + + // "scores INTEGER[], " + + // "real_nums REAL[], " + + "small_num SMALLINT, " + + "big_num BIGINT, " + + "json_data JSONB)", + "INSERT INTO public.test VALUES (1, " + + "'one', 1, true, 1.1, " + + "'2021-01-01', '12:00:00', '2021-01-01 12:00:00', '2021-01-01 20:00:00+8', " + + "12345678.9, " + + `'\x0123456789ABCDEF', 'long text description', 'ABC', ` + + // "ARRAY['tag1', 'tag2'], ARRAY[1, 2, 3], ARRAY[1.1, 2.2, 3.3]::real[], " + + `123, 9223372036854775807, '{"key": "value"}')`, + "INSERT INTO public.test VALUES (2, " + + "'two', 2, false, 2.2, " + + "'2021-02-02', '13:00:00.123456', '2021-02-02 13:00:00.123456', '2021-02-02 05:00:00.123456-8', " + + "98765432.1, " + + `'\xDEADBEEF', 'another description', 'XYZ', ` + + // "ARRAY['tag3', 'tag4'], ARRAY[4, 5, 6], ARRAY[4.4, 5.5, 6.6]::real[], " + + `-123, -9223372036854775808, '{"array": [1, 2, 3]}')`, "UPDATE public.test SET name = 'three' WHERE id = 2", - "DELETE FROM public.test WHERE id = 1", + `UPDATE public.test SET json_data = jsonb_set(json_data, '{key}', '"new_value"') WHERE id = 1`, waitForCatchup, }, Assertions: []ScriptTestAssertion{ { - Query: "/* replica */ SELECT * FROM public.test order by id", + Query: "/* replica */ SELECT * EXCLUDE (birth_timestamptz), birth_timestamptz AT TIME ZONE 'UTC' FROM public.test order by id", Expected: []sql.Row{ - {int32(2), "three", int32(2), false, float32(2.2), "2021-02-02", "2021-02-02 13:00:00", pgtest.Numeric("98765432.1")}, + {int32(1), "one", int16(1), true, float32(1.1), + "2021-01-01", "12:00:00.000000", + // "12:00:00+00", + "2021-01-01 12:00:00", + pgtest.Numeric("12345678.9"), + []byte{0x01, 0x23, 0x45, 0x67, 0x89, 0xAB, 0xCD, 0xEF}, + "long text description", "ABC", + // []string{"tag1", "tag2"}, + // []int32{1, 2, 3}, + // []float32{1.1, 2.2, 3.3}, + int16(123), + int64(9223372036854775807), + `{"key": "new_value"}`, + "2021-01-01 12:00:00", + }, + {int32(2), "three", int16(2), false, float32(2.2), + "2021-02-02", "13:00:00.123456", + // "13:00:00.123456+00", + "2021-02-02 13:00:00.123456", + pgtest.Numeric("98765432.1"), + []byte{0xDE, 0xAD, 0xBE, 0xEF}, + "another description", "XYZ", + // []string{"tag3", "tag4"}, + // []int32{4, 5, 6}, + // []float32{4.4, 5.5, 6.6}, + int16(-123), + int64(-9223372036854775808), + `{"array": [1, 2, 3]}`, + "2021-02-02 13:00:00.123456", + }, }, }, }, diff --git a/pgtypes/pgtypes.go b/pgtypes/pgtypes.go index 47bb92fa..e44ef636 100644 --- a/pgtypes/pgtypes.go +++ b/pgtypes/pgtypes.go @@ -16,7 +16,11 @@ import ( "github.com/dolthub/go-mysql-server/sql" ) -var DefaultTypeMap = pgtype.NewMap() +var DefaultTypeMap *pgtype.Map + +func init() { + DefaultTypeMap = pgtype.NewMap() +} var DuckdbTypeStrToPostgresTypeStr = map[string]string{ "INVALID": "unknown", @@ -46,8 +50,8 @@ var DuckdbTypeStrToPostgresTypeStr = map[string]string{ "ENUM": "text", "UUID": "uuid", "BIT": "bit", - "TIME_TZ": "timetz", - "TIMESTAMP_TZ": "timestamptz", + "TIMETZ": "timetz", + "TIMESTAMPTZ": "timestamptz", "ANY": "text", // Generic ANY type approximated to text "VARINT": "numeric", // Variable integer, mapped to numeric } @@ -166,7 +170,7 @@ func PostgresTypeToArrowType(p PostgresType) arrow.DataType { return arrow.FixedWidthTypes.Boolean case pgtype.ByteaOID: return arrow.BinaryTypes.Binary - case pgtype.NameOID, pgtype.TextOID, pgtype.VarcharOID, pgtype.BPCharOID, pgtype.JSONOID, pgtype.XMLOID: + case pgtype.NameOID, pgtype.TextOID, pgtype.VarcharOID, pgtype.BPCharOID, pgtype.JSONOID, pgtype.JSONBOID, pgtype.XMLOID: return arrow.BinaryTypes.String case pgtype.Int8OID: return arrow.PrimitiveTypes.Int64 @@ -182,15 +186,14 @@ func PostgresTypeToArrowType(p PostgresType) arrow.DataType { return arrow.PrimitiveTypes.Float32 case pgtype.Float8OID: return arrow.PrimitiveTypes.Float64 - case pgtype.PointOID: - return arrow.StructOf(arrow.Field{Name: "x", Type: arrow.PrimitiveTypes.Float64}, - arrow.Field{Name: "y", Type: arrow.PrimitiveTypes.Float64}) case pgtype.DateOID: return arrow.FixedWidthTypes.Date32 - case pgtype.TimeOID: + case pgtype.TimeOID, pgtype.TimetzOID: return arrow.FixedWidthTypes.Time64ns - case pgtype.TimestampOID, pgtype.TimestamptzOID: - return arrow.FixedWidthTypes.Timestamp_s + case pgtype.TimestampOID: + return &arrow.TimestampType{Unit: arrow.Microsecond, TimeZone: ""} + case pgtype.TimestamptzOID: + return arrow.FixedWidthTypes.Timestamp_us case pgtype.NumericOID: if p.Precision > 0 && p.Scale >= 0 { if p.Precision <= 38 { @@ -211,6 +214,11 @@ func PostgresTypeToArrowType(p PostgresType) arrow.DataType { // so we use a string type for UUIDs. // return &arrow.FixedSizeBinaryType{ByteWidth: 16} return arrow.BinaryTypes.String + case pgtype.PointOID: + return arrow.StructOf( + arrow.Field{Name: "x", Type: arrow.PrimitiveTypes.Float64}, + arrow.Field{Name: "y", Type: arrow.PrimitiveTypes.Float64}, + ) default: return arrow.BinaryTypes.Binary // fall back for unknown types }