Skip to content

Commit d171b6c

Browse files
authored
ARROW-17276: [Go][Integration] Implement IPC handling for union type (apache#13806)
With this, the Go implementation finally fully supports IPC handling for all of the Arrow data types! Authored-by: Matt Topol <[email protected]> Signed-off-by: Matt Topol <[email protected]>
1 parent b1d36c0 commit d171b6c

File tree

17 files changed

+808
-54
lines changed

17 files changed

+808
-54
lines changed

.gitattributes

+1
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ r/man/*.Rd linguist-generated=true
66
cpp/src/generated/*.h linguist-generated=true
77
r/NEWS.md merge=union
88
go/**/*.s linguist-generated=true
9+
go/arrow/unionmode_string.go linguist-generated=true

dev/archery/archery/integration/datagen.py

-1
Original file line numberDiff line numberDiff line change
@@ -1636,7 +1636,6 @@ def _temp_path():
16361636

16371637
generate_unions_case()
16381638
.skip_category('C#')
1639-
.skip_category('Go')
16401639
.skip_category('JS'),
16411640

16421641
generate_custom_metadata_case()

dev/release/rat_exclude_files.txt

+1
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ go/arrow/flight/internal/flight/Flight_grpc.pb.go
140140
go/arrow/internal/cpu/*
141141
go/arrow/type_string.go
142142
go/arrow/cdata/test/go.sum
143+
go/arrow/unionmode_string.go
143144
go/arrow/compute/datumkind_string.go
144145
go/arrow/compute/valueshape_string.go
145146
go/*.tmpldata

docs/source/status.rst

+2-2
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,9 @@ Data Types
8383
+-------------------+-------+-------+-------+------------+-------+-------+-------+
8484
| Map ||||| | ||
8585
+-------------------+-------+-------+-------+------------+-------+-------+-------+
86-
| Dense Union ||| | | | ||
86+
| Dense Union ||| | | | ||
8787
+-------------------+-------+-------+-------+------------+-------+-------+-------+
88-
| Sparse Union ||| | | | ||
88+
| Sparse Union ||| | | | ||
8989
+-------------------+-------+-------+-------+------------+-------+-------+-------+
9090

9191
+-------------------+-------+-------+-------+------------+-------+-------+-------+

go/arrow/array/union.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -314,15 +314,16 @@ func (a *SparseUnion) setData(data *Data) {
314314
}
315315

316316
func (a *SparseUnion) getOneForMarshal(i int) interface{} {
317+
typeID := a.RawTypeCodes()[i]
318+
317319
childID := a.ChildID(i)
318-
field := a.unionType.Fields()[childID]
319320
data := a.Field(childID)
320321

321322
if data.IsNull(i) {
322323
return nil
323324
}
324325

325-
return map[string]interface{}{field.Name: data.(arraymarshal).getOneForMarshal(i)}
326+
return []interface{}{typeID, data.(arraymarshal).getOneForMarshal(i)}
326327
}
327328

328329
func (a *SparseUnion) MarshalJSON() ([]byte, error) {
@@ -570,16 +571,17 @@ func (a *DenseUnion) setData(data *Data) {
570571
}
571572

572573
func (a *DenseUnion) getOneForMarshal(i int) interface{} {
574+
typeID := a.RawTypeCodes()[i]
575+
573576
childID := a.ChildID(i)
574-
field := a.unionType.Fields()[childID]
575577
data := a.Field(childID)
576578

577579
offsets := a.RawValueOffsets()
578580
if data.IsNull(int(offsets[i])) {
579581
return nil
580582
}
581583

582-
return map[string]interface{}{field.Name: data.(arraymarshal).getOneForMarshal(int(offsets[i]))}
584+
return []interface{}{typeID, data.(arraymarshal).getOneForMarshal(int(offsets[i]))}
583585
}
584586

585587
func (a *DenseUnion) MarshalJSON() ([]byte, error) {

go/arrow/bitutil/bitmaps_test.go

-1
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,6 @@ func (s *BitmapOpSuite) testAligned(op bitmapOp, leftBits, rightBits []int, resu
378378
out *memory.Buffer
379379
length int64
380380
)
381-
382381
for _, lOffset := range []int64{0, 1, 3, 5, 7, 8, 13, 21, 38, 75, 120, 65536} {
383382
s.Run(fmt.Sprintf("left offset %d", lOffset), func() {
384383
left = bitmapFromSlice(leftBits, int(lOffset))

go/arrow/datatype_nested.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -399,8 +399,8 @@ const (
399399
MaxUnionTypeCode UnionTypeCode = 127
400400
InvalidUnionChildID int = -1
401401

402-
SparseMode UnionMode = iota
403-
DenseMode
402+
SparseMode UnionMode = iota // SPARSE
403+
DenseMode // DENSE
404404
)
405405

406406
// UnionType is an interface to encompass both Dense and Sparse Union types.

go/arrow/doc.go

+1
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,4 @@ package arrow
3838

3939
// stringer
4040
//go:generate stringer -type=Type
41+
//go:generate stringer -type=UnionMode -linecomment

go/arrow/internal/arrdata/arrdata.go

+63-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func init() {
4949
Records["decimal128"] = makeDecimal128sRecords()
5050
Records["maps"] = makeMapsRecords()
5151
Records["extension"] = makeExtensionRecords()
52-
// Records["union"] = makeUnionRecords()
52+
Records["union"] = makeUnionRecords()
5353

5454
for k := range Records {
5555
RecordNames = append(RecordNames, k)
@@ -935,6 +935,68 @@ func makeExtensionRecords() []arrow.Record {
935935
return recs
936936
}
937937

938+
func makeUnionRecords() []arrow.Record {
939+
mem := memory.NewGoAllocator()
940+
941+
unionFields := []arrow.Field{
942+
{Name: "u0", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
943+
{Name: "u1", Type: arrow.PrimitiveTypes.Uint8, Nullable: true},
944+
}
945+
946+
typeCodes := []arrow.UnionTypeCode{5, 10}
947+
sparseType := arrow.SparseUnionOf(unionFields, typeCodes)
948+
denseType := arrow.DenseUnionOf(unionFields, typeCodes)
949+
950+
schema := arrow.NewSchema([]arrow.Field{
951+
{Name: "sparse", Type: sparseType, Nullable: true},
952+
{Name: "dense", Type: denseType, Nullable: true},
953+
}, nil)
954+
955+
sparseChildren := make([]arrow.Array, 4)
956+
denseChildren := make([]arrow.Array, 4)
957+
958+
const length = 7
959+
960+
typeIDsBuffer := memory.NewBufferBytes(arrow.Uint8Traits.CastToBytes([]uint8{5, 10, 5, 5, 10, 10, 5}))
961+
sparseChildren[0] = arrayOf(mem, []int32{0, 1, 2, 3, 4, 5, 6},
962+
[]bool{true, true, true, false, true, true, true})
963+
defer sparseChildren[0].Release()
964+
sparseChildren[1] = arrayOf(mem, []uint8{10, 11, 12, 13, 14, 15, 16},
965+
nil)
966+
defer sparseChildren[1].Release()
967+
sparseChildren[2] = arrayOf(mem, []int32{0, -1, -2, -3, -4, -5, -6},
968+
[]bool{true, true, true, true, true, true, false})
969+
defer sparseChildren[2].Release()
970+
sparseChildren[3] = arrayOf(mem, []uint8{100, 101, 102, 103, 104, 105, 106},
971+
nil)
972+
defer sparseChildren[3].Release()
973+
974+
denseChildren[0] = arrayOf(mem, []int32{0, 2, 3, 7}, []bool{true, false, true, true})
975+
defer denseChildren[0].Release()
976+
denseChildren[1] = arrayOf(mem, []uint8{11, 14, 15}, nil)
977+
defer denseChildren[1].Release()
978+
denseChildren[2] = arrayOf(mem, []int32{0, -2, -3, -7}, []bool{false, true, true, false})
979+
defer denseChildren[2].Release()
980+
denseChildren[3] = arrayOf(mem, []uint8{101, 104, 105}, nil)
981+
defer denseChildren[3].Release()
982+
983+
offsetsBuffer := memory.NewBufferBytes(arrow.Int32Traits.CastToBytes([]int32{0, 0, 1, 2, 1, 2, 3}))
984+
sparse1 := array.NewSparseUnion(sparseType, length, sparseChildren[:2], typeIDsBuffer, 0)
985+
dense1 := array.NewDenseUnion(denseType, length, denseChildren[:2], typeIDsBuffer, offsetsBuffer, 0)
986+
987+
sparse2 := array.NewSparseUnion(sparseType, length, sparseChildren[2:], typeIDsBuffer, 0)
988+
dense2 := array.NewDenseUnion(denseType, length, denseChildren[2:], typeIDsBuffer, offsetsBuffer, 0)
989+
990+
defer sparse1.Release()
991+
defer dense1.Release()
992+
defer sparse2.Release()
993+
defer dense2.Release()
994+
995+
return []arrow.Record{
996+
array.NewRecord(schema, []arrow.Array{sparse1, dense1}, -1),
997+
array.NewRecord(schema, []arrow.Array{sparse2, dense2}, -1)}
998+
}
999+
9381000
func extArray(mem memory.Allocator, dt arrow.ExtensionType, a interface{}, valids []bool) arrow.Array {
9391001
var storage arrow.Array
9401002
switch st := dt.StorageType().(type) {

go/arrow/internal/arrjson/arrjson.go

+73-6
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,8 @@ func typeToJSON(arrowType arrow.DataType) (json.RawMessage, error) {
220220
typ = decimalJSON{"decimal", int(dt.Scale), int(dt.Precision), 128}
221221
case *arrow.Decimal256Type:
222222
typ = decimalJSON{"decimal", int(dt.Scale), int(dt.Precision), 256}
223+
case arrow.UnionType:
224+
typ = unionJSON{"union", dt.Mode().String(), dt.TypeCodes()}
223225
default:
224226
return nil, fmt.Errorf("unknown arrow.DataType %v", arrowType)
225227
}
@@ -462,6 +464,17 @@ func typeFromJSON(typ json.RawMessage, children []FieldWrapper) (arrowType arrow
462464
case 128, 0: // default to 128 bits when missing
463465
arrowType = &arrow.Decimal128Type{Precision: int32(t.Precision), Scale: int32(t.Scale)}
464466
}
467+
case "union":
468+
t := unionJSON{}
469+
if err = json.Unmarshal(typ, &t); err != nil {
470+
return
471+
}
472+
switch t.Mode {
473+
case "SPARSE":
474+
arrowType = arrow.SparseUnionOf(fieldsFromJSON(children), t.TypeIDs)
475+
case "DENSE":
476+
arrowType = arrow.DenseUnionOf(fieldsFromJSON(children), t.TypeIDs)
477+
}
465478
}
466479

467480
if arrowType == nil {
@@ -598,6 +611,12 @@ type mapJSON struct {
598611
KeysSorted bool `json:"keysSorted,omitempty"`
599612
}
600613

614+
type unionJSON struct {
615+
Name string `json:"name"`
616+
Mode string `json:"mode"`
617+
TypeIDs []arrow.UnionTypeCode `json:"typeIds"`
618+
}
619+
601620
func schemaToJSON(schema *arrow.Schema, mapper *dictutils.Mapper) Schema {
602621
return Schema{
603622
Fields: fieldsToJSON(schema.Fields(), dictutils.NewFieldPos(), mapper),
@@ -742,12 +761,13 @@ func recordToJSON(rec arrow.Record) Record {
742761
}
743762

744763
type Array struct {
745-
Name string `json:"name"`
746-
Count int `json:"count"`
747-
Valids []int `json:"VALIDITY,omitempty"`
748-
Data []interface{} `json:"DATA,omitempty"`
749-
Offset interface{} `json:"-"`
750-
Children []Array `json:"children,omitempty"`
764+
Name string `json:"name"`
765+
Count int `json:"count"`
766+
Valids []int `json:"VALIDITY,omitempty"`
767+
Data []interface{} `json:"DATA,omitempty"`
768+
TypeID []arrow.UnionTypeCode `json:"TYPE_ID,omitempty"`
769+
Offset interface{} `json:"OFFSET,omitempty"`
770+
Children []Array `json:"children,omitempty"`
751771
}
752772

753773
func (a *Array) MarshalJSON() ([]byte, error) {
@@ -782,6 +802,10 @@ func (a *Array) UnmarshalJSON(b []byte) (err error) {
782802
return
783803
}
784804

805+
if len(rawOffsets) == 0 {
806+
return
807+
}
808+
785809
switch rawOffsets[0].(type) {
786810
case string:
787811
out := make([]int64, len(rawOffsets))
@@ -1152,6 +1176,31 @@ func arrayFromJSON(mem memory.Allocator, dt arrow.DataType, arr Array) arrow.Arr
11521176
defer indices.Release()
11531177
return array.NewData(dt, indices.Len(), indices.Buffers(), indices.Children(), indices.NullN(), indices.Offset())
11541178

1179+
case arrow.UnionType:
1180+
fields := make([]arrow.ArrayData, len(dt.Fields()))
1181+
for i, f := range dt.Fields() {
1182+
child := arrayFromJSON(mem, f.Type, arr.Children[i])
1183+
defer child.Release()
1184+
fields[i] = child
1185+
}
1186+
1187+
typeIdBuf := memory.NewBufferBytes(arrow.Int8Traits.CastToBytes(arr.TypeID))
1188+
defer typeIdBuf.Release()
1189+
buffers := []*memory.Buffer{nil, typeIdBuf}
1190+
if dt.Mode() == arrow.DenseMode {
1191+
var offsets []byte
1192+
if arr.Offset == nil {
1193+
offsets = []byte{}
1194+
} else {
1195+
offsets = arrow.Int32Traits.CastToBytes(arr.Offset.([]int32))
1196+
}
1197+
offsetBuf := memory.NewBufferBytes(offsets)
1198+
defer offsetBuf.Release()
1199+
buffers = append(buffers, offsetBuf)
1200+
}
1201+
1202+
return array.NewData(dt, arr.Count, buffers, fields, 0, 0)
1203+
11551204
default:
11561205
panic(fmt.Errorf("unknown data type %v %T", dt, dt))
11571206
}
@@ -1478,6 +1527,24 @@ func arrayToJSON(field arrow.Field, arr arrow.Array) Array {
14781527
case *array.Dictionary:
14791528
return arrayToJSON(field, arr.Indices())
14801529

1530+
case array.Union:
1531+
dt := arr.DataType().(arrow.UnionType)
1532+
o := Array{
1533+
Name: field.Name,
1534+
Count: arr.Len(),
1535+
Valids: validsToJSON(arr),
1536+
TypeID: arr.RawTypeCodes(),
1537+
Children: make([]Array, len(dt.Fields())),
1538+
}
1539+
if dt.Mode() == arrow.DenseMode {
1540+
o.Offset = arr.(*array.DenseUnion).RawValueOffsets()
1541+
}
1542+
fields := dt.Fields()
1543+
for i := range o.Children {
1544+
o.Children[i] = arrayToJSON(fields[i], arr.Field(i))
1545+
}
1546+
return o
1547+
14811548
default:
14821549
panic(fmt.Errorf("unknown array type %T", arr))
14831550
}

0 commit comments

Comments
 (0)