Skip to content

Commit

Permalink
feat: support more Postgres basic data types (#215)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanyang01 authored Nov 26, 2024
1 parent 148b867 commit 230518a
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 19 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/postgres-replication.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,4 @@ jobs:
run: go build -v

- name: Test Postgres Logical Replication
run: go test -v --timeout 30s ./pgserver/logrepl
run: go test -v --timeout 60s ./pgserver/logrepl
2 changes: 2 additions & 0 deletions catalog/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ func NewDBProvider(dataDir, dbFile string) (*DatabaseProvider, error) {
bootQueries := []string{
"INSTALL arrow",
"LOAD arrow",
"INSTALL icu",
"LOAD icu",
"INSTALL postgres_scanner",
"LOAD postgres_scanner",
}
Expand Down
14 changes: 13 additions & 1 deletion delta/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (
"github.com/apache/arrow-go/v18/arrow/ipc"
"github.com/apecloud/myduckserver/binlog"
"github.com/apecloud/myduckserver/catalog"
"github.com/apecloud/myduckserver/pgtypes"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/types"
"github.com/jackc/pgx/v5/pgtype"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -208,7 +210,7 @@ func (c *DeltaController) updateTable(
builder.WriteString(", r[")
builder.WriteString(strconv.Itoa(i + 2))
builder.WriteString("]")
if types.IsTimestampType(col.Type) {
if isTimestampType(col.Type) {
builder.WriteString("::TIMESTAMP")
}
builder.WriteString(" AS ")
Expand Down Expand Up @@ -340,3 +342,13 @@ func (c *DeltaController) updateTable(

return nil
}

func isTimestampType(t sql.Type) bool {
if types.IsTimestampType(t) {
return true
}
if pgt, ok := t.(pgtypes.PostgresType); ok {
return pgt.PG.OID == pgtype.TimestampOID
}
return false
}
36 changes: 36 additions & 0 deletions pgserver/logrepl/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,17 @@ func decodeToArrow(typeMap *pgtype.Map, columnType *pglogrepl.RelationMessageCol
return 4, nil
}

case pgtype.TimeOID, pgtype.TimetzOID:
if b, ok := builder.(*array.Time64Builder); ok {
var v pgtype.Time
var codec pgtype.TimeCodec
if err := codec.PlanScan(typeMap, oid, format, &v).Scan(data, &v); err != nil {
return 0, err
}
b.Append(arrow.Time64(v.Microseconds * 1000))
return 8, nil
}

case pgtype.NumericOID:
// Fast path for text format & string destination
if format == pgtype.TextFormatCode {
Expand Down Expand Up @@ -232,7 +243,32 @@ func decodeToArrow(typeMap *pgtype.Map, columnType *pglogrepl.RelationMessageCol
b.Append(buf[:])
return 36, nil
}

case pgtype.JSONOID:
if b, ok := builder.(*array.BinaryBuilder); ok {
var buf [32]byte // Stack-allocated buffer for small JSON
v := pgtype.PreallocBytes(buf[:])
var codec pgtype.JSONCodec
if err := codec.PlanScan(typeMap, oid, format, &v).Scan(data, &v); err != nil {
return 0, err
}
b.Append(v)
return len(v), nil
}

case pgtype.JSONBOID:
if b, ok := builder.(*array.BinaryBuilder); ok {
var buf [32]byte // Stack-allocated buffer for small JSON
v := pgtype.PreallocBytes(buf[:])
var codec pgtype.JSONBCodec
if err := codec.PlanScan(typeMap, oid, format, &v).Scan(data, &v); err != nil {
return 0, err
}
b.Append(v)
return len(v), nil
}
}

// TODO(fan): add support for other types

// Fallback
Expand Down
96 changes: 89 additions & 7 deletions pgserver/logrepl/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,20 +482,102 @@ var replicationTests = []ReplicationTest{
createReplicationSlot,
startReplication,
"/* replica */ drop table if exists public.test",
"/* replica */ create table public.test (id INT primary key, name varchar(100), age INT, is_cool BOOLEAN, height FLOAT, birth_date DATE, birth_timestamp TIMESTAMP, income DECIMAL(10,2))",
"/* replica */ create table public.test (id INT primary key, " +
"name varchar(100), " +
"age SMALLINT, " +
"is_cool BOOLEAN, " +
"height FLOAT, " +
"birth_date DATE, " +
"birth_time TIME, " +
// TIMETZ is discouraged:
// https://www.postgresql.org/docs/current/datatype-datetime.html
// https://github.com/jackc/pgx/issues/1940
// "birth_timetz TIME WITH TIME ZONE, " +
"birth_timestamp TIMESTAMP, " +
"birth_timestamptz TIMESTAMP WITH TIME ZONE, " +
"income NUMERIC(10,2), " +
"binary_data BYTEA, " +
"description TEXT, " +
"code CHAR(3), " +
// "tags TEXT[], " +
// "scores INTEGER[], " +
// "real_nums REAL[], " +
"small_num SMALLINT, " +
"big_num BIGINT, " +
"json_data JSON)",
"drop table if exists public.test",
"create table public.test (id INT primary key, name varchar(100), age INT, is_cool BOOLEAN, height FLOAT, birth_date DATE, birth_timestamp TIMESTAMP, income DECIMAL(10,2))",
"INSERT INTO public.test VALUES (1, 'one', 1, true, 1.1, '2021-01-01', '2021-01-01 12:00:00', 12345678.9)",
"INSERT INTO public.test VALUES (2, 'two', 2, false, 2.2, '2021-02-02', '2021-02-02 13:00:00', 98765432.1)",
"create table public.test (id INT primary key, " +
"name varchar(100), " +
"age SMALLINT, " +
"is_cool BOOLEAN, " +
"height FLOAT, " +
"birth_date DATE, " +
"birth_time TIME, " +
// "birth_timetz TIME WITH TIME ZONE, " +
"birth_timestamp TIMESTAMP, " +
"birth_timestamptz TIMESTAMP WITH TIME ZONE, " +
"income DECIMAL(10,2), " +
"binary_data BYTEA, " +
"description TEXT, " +
"code CHAR(3), " +
// "tags TEXT[], " +
// "scores INTEGER[], " +
// "real_nums REAL[], " +
"small_num SMALLINT, " +
"big_num BIGINT, " +
"json_data JSONB)",
"INSERT INTO public.test VALUES (1, " +
"'one', 1, true, 1.1, " +
"'2021-01-01', '12:00:00', '2021-01-01 12:00:00', '2021-01-01 20:00:00+8', " +
"12345678.9, " +
`'\x0123456789ABCDEF', 'long text description', 'ABC', ` +
// "ARRAY['tag1', 'tag2'], ARRAY[1, 2, 3], ARRAY[1.1, 2.2, 3.3]::real[], " +
`123, 9223372036854775807, '{"key": "value"}')`,
"INSERT INTO public.test VALUES (2, " +
"'two', 2, false, 2.2, " +
"'2021-02-02', '13:00:00.123456', '2021-02-02 13:00:00.123456', '2021-02-02 05:00:00.123456-8', " +
"98765432.1, " +
`'\xDEADBEEF', 'another description', 'XYZ', ` +
// "ARRAY['tag3', 'tag4'], ARRAY[4, 5, 6], ARRAY[4.4, 5.5, 6.6]::real[], " +
`-123, -9223372036854775808, '{"array": [1, 2, 3]}')`,
"UPDATE public.test SET name = 'three' WHERE id = 2",
"DELETE FROM public.test WHERE id = 1",
`UPDATE public.test SET json_data = jsonb_set(json_data, '{key}', '"new_value"') WHERE id = 1`,
waitForCatchup,
},
Assertions: []ScriptTestAssertion{
{
Query: "/* replica */ SELECT * FROM public.test order by id",
Query: "/* replica */ SELECT * EXCLUDE (birth_timestamptz), birth_timestamptz AT TIME ZONE 'UTC' FROM public.test order by id",
Expected: []sql.Row{
{int32(2), "three", int32(2), false, float32(2.2), "2021-02-02", "2021-02-02 13:00:00", pgtest.Numeric("98765432.1")},
{int32(1), "one", int16(1), true, float32(1.1),
"2021-01-01", "12:00:00.000000",
// "12:00:00+00",
"2021-01-01 12:00:00",
pgtest.Numeric("12345678.9"),
[]byte{0x01, 0x23, 0x45, 0x67, 0x89, 0xAB, 0xCD, 0xEF},
"long text description", "ABC",
// []string{"tag1", "tag2"},
// []int32{1, 2, 3},
// []float32{1.1, 2.2, 3.3},
int16(123),
int64(9223372036854775807),
`{"key": "new_value"}`,
"2021-01-01 12:00:00",
},
{int32(2), "three", int16(2), false, float32(2.2),
"2021-02-02", "13:00:00.123456",
// "13:00:00.123456+00",
"2021-02-02 13:00:00.123456",
pgtest.Numeric("98765432.1"),
[]byte{0xDE, 0xAD, 0xBE, 0xEF},
"another description", "XYZ",
// []string{"tag3", "tag4"},
// []int32{4, 5, 6},
// []float32{4.4, 5.5, 6.6},
int16(-123),
int64(-9223372036854775808),
`{"array": [1, 2, 3]}`,
"2021-02-02 13:00:00.123456",
},
},
},
},
Expand Down
28 changes: 18 additions & 10 deletions pgtypes/pgtypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ import (
"github.com/dolthub/go-mysql-server/sql"
)

var DefaultTypeMap = pgtype.NewMap()
var DefaultTypeMap *pgtype.Map

func init() {
DefaultTypeMap = pgtype.NewMap()
}

var DuckdbTypeStrToPostgresTypeStr = map[string]string{
"INVALID": "unknown",
Expand Down Expand Up @@ -46,8 +50,8 @@ var DuckdbTypeStrToPostgresTypeStr = map[string]string{
"ENUM": "text",
"UUID": "uuid",
"BIT": "bit",
"TIME_TZ": "timetz",
"TIMESTAMP_TZ": "timestamptz",
"TIMETZ": "timetz",
"TIMESTAMPTZ": "timestamptz",
"ANY": "text", // Generic ANY type approximated to text
"VARINT": "numeric", // Variable integer, mapped to numeric
}
Expand Down Expand Up @@ -166,7 +170,7 @@ func PostgresTypeToArrowType(p PostgresType) arrow.DataType {
return arrow.FixedWidthTypes.Boolean
case pgtype.ByteaOID:
return arrow.BinaryTypes.Binary
case pgtype.NameOID, pgtype.TextOID, pgtype.VarcharOID, pgtype.BPCharOID, pgtype.JSONOID, pgtype.XMLOID:
case pgtype.NameOID, pgtype.TextOID, pgtype.VarcharOID, pgtype.BPCharOID, pgtype.JSONOID, pgtype.JSONBOID, pgtype.XMLOID:
return arrow.BinaryTypes.String
case pgtype.Int8OID:
return arrow.PrimitiveTypes.Int64
Expand All @@ -182,15 +186,14 @@ func PostgresTypeToArrowType(p PostgresType) arrow.DataType {
return arrow.PrimitiveTypes.Float32
case pgtype.Float8OID:
return arrow.PrimitiveTypes.Float64
case pgtype.PointOID:
return arrow.StructOf(arrow.Field{Name: "x", Type: arrow.PrimitiveTypes.Float64},
arrow.Field{Name: "y", Type: arrow.PrimitiveTypes.Float64})
case pgtype.DateOID:
return arrow.FixedWidthTypes.Date32
case pgtype.TimeOID:
case pgtype.TimeOID, pgtype.TimetzOID:
return arrow.FixedWidthTypes.Time64ns
case pgtype.TimestampOID, pgtype.TimestamptzOID:
return arrow.FixedWidthTypes.Timestamp_s
case pgtype.TimestampOID:
return &arrow.TimestampType{Unit: arrow.Microsecond, TimeZone: ""}
case pgtype.TimestamptzOID:
return arrow.FixedWidthTypes.Timestamp_us
case pgtype.NumericOID:
if p.Precision > 0 && p.Scale >= 0 {
if p.Precision <= 38 {
Expand All @@ -211,6 +214,11 @@ func PostgresTypeToArrowType(p PostgresType) arrow.DataType {
// so we use a string type for UUIDs.
// return &arrow.FixedSizeBinaryType{ByteWidth: 16}
return arrow.BinaryTypes.String
case pgtype.PointOID:
return arrow.StructOf(
arrow.Field{Name: "x", Type: arrow.PrimitiveTypes.Float64},
arrow.Field{Name: "y", Type: arrow.PrimitiveTypes.Float64},
)
default:
return arrow.BinaryTypes.Binary // fall back for unknown types
}
Expand Down

0 comments on commit 230518a

Please sign in to comment.