Skip to content

Commit 4bbe355

Browse files
vianhazman and Favian Kharisma Hazman authored
feat: google sheets external table support for bigquery datastore (#45)
Co-authored-by: Favian Kharisma Hazman <[email protected]>
1 parent 506dff7 commit 4bbe355

File tree

11 files changed

+461
-8
lines changed

11 files changed

+461
-8
lines changed

ext/datastore/bigquery/adapter.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,73 @@ func bqFieldModeTo(field BQField) (fieldMode, error) {
8484
}
8585
return fm, nil
8686
}
87+
func bqGoogleSheetsOptionsTo(m map[string]interface{}) *bqapi.GoogleSheetsOptions {
88+
var skipLeadingRows int64
89+
var sheetRange string
90+
91+
if val, ok := m["skip_leading_rows"]; ok {
92+
skipLeadingRows = int64(val.(float64))
93+
}
94+
95+
if val, ok := m["range"]; ok {
96+
sheetRange = val.(string)
97+
}
98+
99+
return &bqapi.GoogleSheetsOptions{
100+
SkipLeadingRows: skipLeadingRows,
101+
Range: sheetRange,
102+
}
103+
}
104+
105+
func bqGoogleSheetsOptionsFrom(opt *bqapi.GoogleSheetsOptions) map[string]interface{} {
106+
resultMap := make(map[string]interface{})
107+
108+
if opt.SkipLeadingRows != 0 {
109+
// Map value of int has to be converted to float because of using interface{}
110+
resultMap["skip_leading_rows"] = float64(opt.SkipLeadingRows)
111+
}
112+
if opt.Range != "" {
113+
resultMap["range"] = opt.Range
114+
}
115+
return resultMap
116+
}
117+
118+
func bqExternalDataConfigTo(es BQExternalSource) (*bqapi.ExternalDataConfig, error) {
119+
var option bqapi.ExternalDataConfigOptions
120+
var sourceType bqapi.DataFormat
121+
switch bqapi.DataFormat(strings.ToUpper(es.SourceType)) {
122+
case bqapi.GoogleSheets:
123+
option = bqGoogleSheetsOptionsTo(es.Config)
124+
sourceType = bqapi.GoogleSheets
125+
default:
126+
return &bqapi.ExternalDataConfig{}, fmt.Errorf("Source format not yet implemented %s", es.SourceType)
127+
}
128+
129+
externalConfig := &bqapi.ExternalDataConfig{
130+
SourceFormat: sourceType,
131+
SourceURIs: es.SourceURIs,
132+
Options: option,
133+
}
134+
return externalConfig, nil
135+
}
136+
137+
func bqExternalDataConfigFrom(c *bqapi.ExternalDataConfig) (*BQExternalSource, error) {
138+
var option map[string]interface{}
139+
140+
switch c.SourceFormat {
141+
case bqapi.GoogleSheets:
142+
option = bqGoogleSheetsOptionsFrom(c.Options.(*bqapi.GoogleSheetsOptions))
143+
default:
144+
return &BQExternalSource{}, fmt.Errorf("Source format not yet implemented %s", c.SourceFormat)
145+
}
146+
147+
externalDataConfig := &BQExternalSource{
148+
SourceType: string(c.SourceFormat),
149+
SourceURIs: c.SourceURIs,
150+
Config: option,
151+
}
152+
return externalDataConfig, nil
153+
}
87154

88155
func bqFieldModeFrom(fm fieldMode) string {
89156
if fm.repeated {
@@ -159,6 +226,14 @@ func bqCreateTableMetaAdapter(t BQTable) (meta *bqapi.TableMetadata, err error)
159226
meta.RangePartitioning = bqPartitioningRangeTo(*t.Metadata.Partition)
160227
}
161228
}
229+
230+
if t.Metadata.Source != nil {
231+
meta.ExternalDataConfig, err = bqExternalDataConfigTo(*t.Metadata.Source)
232+
if err != nil {
233+
return nil, err
234+
}
235+
}
236+
162237
if t.Metadata.ExpirationTime != "" {
163238
expiryTime, err := time.Parse(time.RFC3339, t.Metadata.ExpirationTime)
164239
if err != nil {
@@ -195,6 +270,7 @@ func bqUpdateTableMetaAdapter(t BQTable) (meta bqapi.TableMetadataToUpdate, err
195270
for key, value := range t.Metadata.Labels {
196271
meta.SetLabel(key, value)
197272
}
273+
198274
if t.Metadata.ExpirationTime != "" {
199275
expiryTime, err := time.Parse(time.RFC3339, t.Metadata.ExpirationTime)
200276
if err != nil {

ext/datastore/bigquery/adapter_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,29 @@ func TestAdapter(t *testing.T) {
6767
},
6868
}
6969

70+
t.Run("should convert from and to BQ ExternalDataConfig successfully", func(t *testing.T) {
71+
externalDataSource := BQExternalSource{
72+
SourceType: string(ExternalTableTypeGoogleSheets),
73+
SourceURIs: []string{"http://googlesheets.com/1234"},
74+
Config: map[string]interface{}{"skip_leading_rows": 1.0, "range": "A!:A1:B1"},
75+
}
76+
expectedBQExternalDataConfig := &bigquery.ExternalDataConfig{
77+
SourceFormat: bigquery.GoogleSheets,
78+
SourceURIs: []string{"http://googlesheets.com/1234"},
79+
Options: &bigquery.GoogleSheetsOptions{
80+
SkipLeadingRows: 1,
81+
Range: "A!:A1:B1",
82+
},
83+
}
84+
bQExternalDataConfigResult, err := bqExternalDataConfigTo(externalDataSource)
85+
assert.Nil(t, err)
86+
assert.Equal(t, expectedBQExternalDataConfig, bQExternalDataConfigResult)
87+
88+
externalSourceResult, err := bqExternalDataConfigFrom(bQExternalDataConfigResult)
89+
assert.Nil(t, err)
90+
assert.Equal(t, &externalDataSource, externalSourceResult)
91+
})
92+
7093
t.Run("should convert from and to BQ TimePartitioning successfully", func(t *testing.T) {
7194
partitionField := "partition-field"
7295
partitionExpiryInHours := int64(720)
@@ -150,6 +173,7 @@ func TestAdapter(t *testing.T) {
150173
assert.Equal(t, schema, schemaResult)
151174
assert.Nil(t, err)
152175
})
176+
153177
t.Run("bqCreateTableMetaAdapter", func(t *testing.T) {
154178
t.Run("should convert to BQ TableMetadata", func(t *testing.T) {
155179
testingProject := "project"

ext/datastore/bigquery/bigquery.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,10 @@ func (b BigQuery) Description() string {
4141

4242
func (b BigQuery) Types() map[models.ResourceType]models.DatastoreTypeController {
4343
return map[models.ResourceType]models.DatastoreTypeController{
44-
models.ResourceTypeTable: &tableSpec{},
45-
models.ResourceTypeView: &standardViewSpec{},
46-
models.ResourceTypeDataset: &datasetSpec{},
44+
models.ResourceTypeTable: &tableSpec{},
45+
models.ResourceTypeView: &standardViewSpec{},
46+
models.ResourceTypeDataset: &datasetSpec{},
47+
models.ResourceTypeExternalTable: &externalTableSpec{},
4748
}
4849
}
4950

@@ -65,6 +66,8 @@ func (b *BigQuery) CreateResource(ctx context.Context, request models.CreateReso
6566
return createStandardView(ctx, request.Resource, client, false)
6667
case models.ResourceTypeDataset:
6768
return createDataset(ctx, request.Resource, client, false)
69+
case models.ResourceTypeExternalTable:
70+
return createExternalTable(ctx, request.Resource, client, false)
6871
}
6972
return fmt.Errorf("unsupported resource type %s", request.Resource.Type)
7073
}
@@ -87,6 +90,8 @@ func (b *BigQuery) UpdateResource(ctx context.Context, request models.UpdateReso
8790
return createStandardView(ctx, request.Resource, client, true)
8891
case models.ResourceTypeDataset:
8992
return createDataset(ctx, request.Resource, client, true)
93+
case models.ResourceTypeExternalTable:
94+
return createExternalTable(ctx, request.Resource, client, true)
9095
}
9196
return fmt.Errorf("unsupported resource type %s", request.Resource.Type)
9297
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package bigquery
2+
3+
import (
4+
"context"
5+
"net/http"
6+
"time"
7+
8+
bqapi "cloud.google.com/go/bigquery"
9+
10+
"google.golang.org/api/googleapi"
11+
12+
"github.com/googleapis/google-cloud-go-testing/bigquery/bqiface"
13+
"github.com/odpf/optimus/models"
14+
"github.com/pkg/errors"
15+
)
16+
17+
func createExternalTable(ctx context.Context, spec models.ResourceSpec, client bqiface.Client, upsert bool) error {
18+
bqResource, ok := spec.Spec.(BQTable)
19+
20+
if !ok {
21+
return errors.New("failed to read table spec for bigquery")
22+
}
23+
24+
// inherit from base
25+
bqResource.Metadata.Labels = spec.Labels
26+
27+
dataset := client.DatasetInProject(bqResource.Project, bqResource.Dataset)
28+
if err := ensureDataset(ctx, dataset, BQDataset{
29+
Project: bqResource.Project,
30+
Dataset: bqResource.Dataset,
31+
Metadata: BQDatasetMetadata{},
32+
}, false); err != nil {
33+
return err
34+
}
35+
table := dataset.Table(bqResource.Table)
36+
return ensureExternalTable(ctx, table, bqResource, upsert)
37+
}
38+
39+
func ensureExternalTable(ctx context.Context, tableHandle bqiface.Table, t BQTable, upsert bool) error {
40+
meta, err := tableHandle.Metadata(ctx)
41+
if err != nil {
42+
if metaErr, ok := err.(*googleapi.Error); !ok || metaErr.Code != http.StatusNotFound {
43+
return err
44+
}
45+
meta, err := bqCreateTableMetaAdapter(t)
46+
if err != nil {
47+
return err
48+
}
49+
50+
if t.Metadata.ExpirationTime != "" {
51+
expiryTime, err := time.Parse(time.RFC3339, t.Metadata.ExpirationTime)
52+
if err != nil {
53+
return errors.Wrapf(err, "unable to parse timestamp %s", t.Metadata.ExpirationTime)
54+
}
55+
meta.ExpirationTime = expiryTime
56+
}
57+
return tableHandle.Create(ctx, meta)
58+
}
59+
if !upsert {
60+
return nil
61+
}
62+
63+
// update if already exists
64+
m := bqapi.TableMetadataToUpdate{
65+
Description: t.Metadata.Description,
66+
}
67+
if t.Metadata.ExpirationTime != "" {
68+
expiryTime, err := time.Parse(time.RFC3339, t.Metadata.ExpirationTime)
69+
if err != nil {
70+
return errors.Wrapf(err, "unable to parse timestamp %s", t.Metadata.ExpirationTime)
71+
}
72+
m.ExpirationTime = expiryTime
73+
}
74+
for k, v := range t.Metadata.Labels {
75+
m.SetLabel(k, v)
76+
}
77+
if _, err := tableHandle.Update(ctx, m, meta.ETag); err != nil {
78+
return err
79+
}
80+
return nil
81+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package bigquery
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
7+
"github.com/odpf/optimus/models"
8+
"google.golang.org/protobuf/types/known/structpb"
9+
)
10+
11+
// ExternalTableType names the kind of external data source backing a table.
type ExternalTableType string

const (
	// ExternalTableTypeGoogleSheets marks a table backed by Google Sheets.
	ExternalTableTypeGoogleSheets ExternalTableType = "GOOGLE_SHEETS"
)

// BQExternalSource describes where the data of an external table comes from.
type BQExternalSource struct {
	// SourceType is the external source kind, e.g. "GOOGLE_SHEETS".
	SourceType string `yaml:"type,omitempty" structs:"type"`

	// SourceURIs lists the URIs of the referenced external data
	// (spreadsheets for Google Sheets sources).
	SourceURIs []string `yaml:"uris,omitempty" structs:"uris,omitempty"`

	// Config carries additional, format-specific options for the source.
	Config map[string]interface{} `yaml:"config,omitempty" structs:"config"`
}
27+
28+
type externalTableSpec struct{}
29+
30+
func (s externalTableSpec) Adapter() models.DatastoreSpecAdapter {
31+
return &tableSpecHandler{}
32+
}
33+
34+
func (s externalTableSpec) Validator() models.DatastoreSpecValidator {
35+
return func(spec models.ResourceSpec) error {
36+
if !tableNameParseRegex.MatchString(spec.Name) {
37+
return fmt.Errorf("for example 'project_name.dataset_name.table_name'")
38+
}
39+
parsedNames := tableNameParseRegex.FindStringSubmatch(spec.Name)
40+
if len(parsedNames) < 3 || len(parsedNames[1]) == 0 || len(parsedNames[2]) == 0 || len(parsedNames[3]) == 0 {
41+
return fmt.Errorf("for example 'project_name.dataset_name.table_name'")
42+
}
43+
return nil
44+
}
45+
}
46+
47+
func (s externalTableSpec) DefaultAssets() map[string]string {
48+
return map[string]string{}
49+
}
50+
51+
func extractTableSourceFromProtoStruct(protoVal *structpb.Value) *BQExternalSource {
52+
sInfo := &BQExternalSource{}
53+
if protoVal.GetStructValue() == nil {
54+
return sInfo
55+
}
56+
if f, ok := protoVal.GetStructValue().Fields["type"]; ok {
57+
sInfo.SourceType = strings.ToUpper(f.GetStringValue())
58+
}
59+
if f, ok := protoVal.GetStructValue().Fields["uris"]; ok {
60+
var sourceURIs []string
61+
if f.GetListValue() != nil {
62+
for _, value := range f.GetListValue().GetValues() {
63+
sourceURIs = append(sourceURIs, value.GetStringValue())
64+
}
65+
}
66+
sInfo.SourceURIs = sourceURIs
67+
}
68+
if f, ok := protoVal.GetStructValue().Fields["config"]; ok {
69+
sInfo.Config = f.GetStructValue().AsMap()
70+
}
71+
return sInfo
72+
}

0 commit comments

Comments
 (0)