-
Notifications
You must be signed in to change notification settings - Fork 0
/
api.go
90 lines (72 loc) · 2.15 KB
/
api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package main
import (
"fmt"
daxc "github.com/aws/aws-dax-go/dax"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dax"
"github.com/aws/aws-sdk-go/service/dynamodb"
"strings"
)
// Api bundles the clients used to talk to DynamoDB, with an optional DAX
// client that QueryPages and ScanPages use in preference to DynamoDB when it
// is set.
type Api struct {
// dynamo is the plain DynamoDB client; apiClient always populates it.
dynamo *dynamodb.DynamoDB
// dax is the DAX client; apiClient leaves it nil when no DAX cluster is given.
dax *daxc.Dax
}
// ListTablesPages forwards input and cb to the DynamoDB client's
// ListTablesPages and returns its error. The DAX client is never used for
// this call, even when one is configured.
func (a *Api) ListTablesPages(input *dynamodb.ListTablesInput, cb func(*dynamodb.ListTablesOutput, bool) bool) error {
return a.dynamo.ListTablesPages(input, cb)
}
// DescribeTable forwards input to the DynamoDB client's DescribeTable and
// returns its result unchanged. The DAX client is never used for this call,
// even when one is configured.
func (a *Api) DescribeTable(input *dynamodb.DescribeTableInput) (*dynamodb.DescribeTableOutput, error) {
return a.dynamo.DescribeTable(input)
}
// QueryPages runs a paged query, handing input and cb to the DAX client when
// one is configured and to the DynamoDB client otherwise. The error from
// whichever client handled the call is returned as-is.
func (a *Api) QueryPages(input *dynamodb.QueryInput, cb func(*dynamodb.QueryOutput, bool) bool) error {
	// No DAX client configured: go straight to DynamoDB.
	if a.dax == nil {
		return a.dynamo.QueryPages(input, cb)
	}
	return a.dax.QueryPages(input, cb)
}
// ScanPages runs a paged scan, handing input and cb to the DAX client when
// one is configured and to the DynamoDB client otherwise. The error from
// whichever client handled the call is returned as-is.
func (a *Api) ScanPages(input *dynamodb.ScanInput, cb func(*dynamodb.ScanOutput, bool) bool) error {
	switch {
	case a.dax != nil:
		// Prefer the DAX client whenever it is available.
		return a.dax.ScanPages(input, cb)
	default:
		return a.dynamo.ScanPages(input, cb)
	}
}
// apiClient builds an Api for the given shared-config profile, optionally
// with a DAX client attached.
//
// daxCluster selects the DAX endpoint:
//   - "" means no DAX; the returned Api only has the DynamoDB client set.
//   - a value without a "." is treated as a DAX cluster name and resolved to
//     its discovery endpoint (address:port) through DescribeClusters.
//   - anything else is used as a host name; ":8111" is appended when it has
//     no ":" in it.
//
// Every failure (session creation, cluster lookup, missing endpoint or
// region, DAX client creation) panics. The function never returns nil.
func apiClient(daxCluster string, profile string) *Api {
	sess, err := session.NewSessionWithOptions(session.Options{
		Profile:                 profile,
		SharedConfigState:       session.SharedConfigEnable,
		AssumeRoleTokenProvider: stscreds.StdinTokenProvider,
	})
	if err != nil {
		panic(err)
	}

	api := &Api{dynamo: dynamodb.New(sess)}
	if len(daxCluster) == 0 {
		return api
	}

	if !strings.Contains(daxCluster, ".") {
		// must be a cluster name rather than domain name
		//
		// Copy the name so the pointer handed to the SDK does not alias
		// daxCluster, which is overwritten with the resolved endpoint below.
		name := daxCluster
		dapi := dax.New(sess)
		// Ask for the requested cluster specifically. An empty ClusterNames
		// list would not filter by name, and Clusters[0] would then be an
		// arbitrary cluster rather than the requested one.
		desc, err := dapi.DescribeClusters(&dax.DescribeClustersInput{
			ClusterNames: []*string{&name},
		})
		if err != nil {
			panic(err)
		}
		if len(desc.Clusters) == 0 {
			panic("no cluster found by that name")
		}
		e := desc.Clusters[0].ClusterDiscoveryEndpoint
		// All three are pointers in the SDK response; fail with a clear
		// message instead of a nil-pointer dereference.
		if e == nil || e.Address == nil || e.Port == nil {
			panic(fmt.Sprintf("cluster %q has no discovery endpoint", name))
		}
		daxCluster = fmt.Sprintf("%s:%d", *e.Address, *e.Port)
	}

	if !strings.Contains(daxCluster, ":") {
		// missing port, assume default
		daxCluster += ":8111"
	}

	// The session's region is a *string; guard the dereference below.
	if sess.Config.Region == nil {
		panic("no AWS region configured for the session")
	}

	cfg := daxc.DefaultConfig()
	cfg.HostPorts = []string{daxCluster}
	cfg.Credentials = sess.Config.Credentials
	cfg.Region = *sess.Config.Region
	api.dax, err = daxc.New(cfg)
	if err != nil {
		panic(err)
	}
	return api
}