Skip to content

Commit c8b3bc6

Browse files
authored
feat(query): added MapR Queryer (#21)
1 parent eb33d0d commit c8b3bc6

17 files changed

+1397
-55
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
# Test binary, built with `go test -c`
1212
*.test
13+
*.db
1314

1415
# Output of the go coverage tool, specifically when used with LiteIDE
1516
*.out

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require (
77
github.com/mattn/go-sqlite3 v1.14.22
88
github.com/rs/zerolog v1.32.0
99
github.com/stretchr/testify v1.9.0
10+
github.com/yaitoo/async v1.0.0
1011
)
1112

1213
require (

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ github.com/rs/zerolog v1.32.0 h1:keLypqrlIjaFsbmJOBdB/qvyF8KEtCWHwobLp5l/mQ0=
2020
github.com/rs/zerolog v1.32.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
2121
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
2222
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
23+
github.com/yaitoo/async v1.0.0 h1:1of0INlIdF24Tz0Scg5Vd0Hnef5M+hSfQLdv2wFOHvc=
24+
github.com/yaitoo/async v1.0.0/go.mod h1:IpSO7Ei7AxiqLxFqDjN4rJaVlt8wm4ZxMXyyQaWmM1g=
2325
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
2426
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
2527
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

migrate/migrator.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -405,18 +405,16 @@ func (m *Migrator) migrate(ctx context.Context, db *sqle.DB) error {
405405
switch s.Rotate {
406406
case shardid.MonthlyRotate:
407407
for t := s.RotateBegin; !t.After(s.RotateEnd); t = t.AddDate(0, 1, 0) {
408-
rotations = append(rotations, "_"+t.Format("200601"))
408+
rotations = append(rotations, shardid.FormatMonth(t))
409409
}
410410
case shardid.WeeklyRotate:
411-
var week int
412411
for t := s.RotateBegin; !t.After(s.RotateEnd); t = t.AddDate(0, 0, 7) {
413-
_, week = t.ISOWeek() //1-53 week
414-
rotations = append(rotations, "_"+t.Format("2006")+fmt.Sprintf("%03d", week))
412+
rotations = append(rotations, shardid.FormatWeek(t))
415413
}
416414

417415
case shardid.DailyRotate:
418416
for t := s.RotateBegin; !t.After(s.RotateEnd); t = t.AddDate(0, 0, 1) {
419-
rotations = append(rotations, "_"+t.Format("20060102"))
417+
rotations = append(rotations, shardid.FormatDay(t))
420418
}
421419
}
422420

query.go

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package sqle
2+
3+
import (
4+
"context"
5+
"fmt"
6+
)
7+
8+
type Errors struct {
9+
items []error
10+
}
11+
12+
func (e *Errors) Error() string {
13+
return fmt.Sprint(e.items)
14+
}
15+
16+
type Query[T any] struct {
17+
db *DB
18+
queryer Queryer[T]
19+
tables []string
20+
}
21+
22+
// NewQuery create a Query
23+
func NewQuery[T any](db *DB, options ...QueryOption[T]) Query[T] {
24+
q := Query[T]{
25+
db: db,
26+
}
27+
28+
for _, opt := range options {
29+
if opt != nil {
30+
opt(&q)
31+
}
32+
}
33+
34+
if q.tables == nil {
35+
q.tables = []string{""}
36+
}
37+
38+
if q.queryer == nil {
39+
q.queryer = &MapR[T]{
40+
dbs: q.db.dbs,
41+
}
42+
}
43+
44+
return q
45+
}
46+
47+
func (q *Query[T]) First(ctx context.Context, b *Builder) (T, error) {
48+
return q.queryer.First(ctx, q.tables, b)
49+
}
50+
51+
func (q *Query[T]) Count(ctx context.Context, b *Builder) (int, error) {
52+
return q.queryer.Count(ctx, q.tables, b)
53+
}
54+
55+
func (q *Query[T]) Query(ctx context.Context, b *Builder, less func(i, j T) bool) ([]T, error) {
56+
return q.queryer.Query(ctx, q.tables, b, less)
57+
}
58+
59+
func (q *Query[T]) QueryLimit(ctx context.Context, b *Builder, less func(i, j T) bool, limit int) ([]T, error) {
60+
return q.queryer.QueryLimit(ctx, q.tables, b, less, limit)
61+
}

query_option.go

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package sqle
2+
3+
import (
4+
"time"
5+
6+
"github.com/yaitoo/sqle/shardid"
7+
)
8+
9+
type QueryOption[T any] func(q *Query[T])
10+
11+
func WithMonths[T any](start, end time.Time) QueryOption[T] {
12+
return func(q *Query[T]) {
13+
for t := start; !t.After(end); t = t.AddDate(0, 1, 0) {
14+
q.tables = append(q.tables, shardid.FormatMonth(t))
15+
}
16+
}
17+
}
18+
19+
func WithWeeks[T any](start, end time.Time) QueryOption[T] {
20+
return func(q *Query[T]) {
21+
for t := start; !t.After(end); t = t.AddDate(0, 0, 7) {
22+
q.tables = append(q.tables, shardid.FormatWeek(t))
23+
}
24+
}
25+
}
26+
27+
func WithDays[T any](start, end time.Time) QueryOption[T] {
28+
return func(q *Query[T]) {
29+
for t := start; !t.After(end); t = t.AddDate(0, 0, 1) {
30+
q.tables = append(q.tables, shardid.FormatDay(t))
31+
}
32+
}
33+
}
34+
35+
func WithQueryer[T any](qr Queryer[T]) QueryOption[T] {
36+
return func(q *Query[T]) {
37+
q.queryer = qr
38+
}
39+
}

queryer.go

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package sqle
2+
3+
import "context"
4+
5+
type Queryer[T any] interface {
6+
First(ctx context.Context, tables []string, b *Builder) (T, error)
7+
Count(ctx context.Context, tables []string, b *Builder) (int, error)
8+
Query(ctx context.Context, tables []string, b *Builder, less func(i, j T) bool) ([]T, error)
9+
QueryLimit(ctx context.Context, tables []string, b *Builder, less func(i, j T) bool, limit int) ([]T, error)
10+
}

queryer_mapr.go

+156
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
package sqle
2+
3+
import (
4+
"context"
5+
"sort"
6+
"strconv"
7+
"strings"
8+
9+
"github.com/yaitoo/async"
10+
)
11+
12+
// MapR Map/Reduce Query
13+
type MapR[T any] struct {
14+
dbs []*Context
15+
}
16+
17+
func (q *MapR[T]) First(ctx context.Context, tables []string, b *Builder) (T, error) {
18+
var it T
19+
b.Input("rotate", "<rotate>") //lazy replace on async.Wait
20+
query, args, err := b.Build()
21+
if err != nil {
22+
return it, err
23+
}
24+
25+
w := async.New[T]()
26+
27+
for _, r := range tables {
28+
qr := strings.ReplaceAll(query, "<rotate>", r)
29+
for _, db := range q.dbs {
30+
w.Add(func(db *Context, qr string) func(context.Context) (T, error) {
31+
return func(ctx context.Context) (T, error) {
32+
var t T
33+
err := db.QueryRowContext(ctx, qr, args...).Bind(&t)
34+
if err != nil {
35+
return t, err
36+
}
37+
38+
return t, nil
39+
}
40+
}(db, qr))
41+
}
42+
}
43+
44+
return w.WaitAny(ctx)
45+
}
46+
func (q *MapR[T]) Count(ctx context.Context, tables []string, b *Builder) (int, error) {
47+
b.Input("rotate", "<rotate>") //lazy replace on async.Wait
48+
query, args, err := b.Build()
49+
if err != nil {
50+
return 0, err
51+
}
52+
53+
w := async.New[int]()
54+
55+
for _, r := range tables {
56+
qr := strings.ReplaceAll(query, "<rotate>", r)
57+
for _, db := range q.dbs {
58+
w.Add(func(db *Context, qr string) func(context.Context) (int, error) {
59+
return func(ctx context.Context) (int, error) {
60+
var i int
61+
err := db.QueryRowContext(ctx, qr, args...).Scan(&i)
62+
if err != nil {
63+
return i, err
64+
}
65+
66+
return i, nil
67+
}
68+
}(db, qr))
69+
}
70+
}
71+
72+
items, err := w.Wait(ctx)
73+
74+
if err != nil {
75+
return 0, err
76+
}
77+
78+
var total int
79+
80+
for _, it := range items {
81+
total += it
82+
}
83+
84+
return total, nil
85+
}
86+
func (q *MapR[T]) Query(ctx context.Context, tables []string, b *Builder, less func(i, j T) bool) ([]T, error) {
87+
88+
b.Input("rotate", "<rotate>") //lazy replace on async.Wait
89+
query, args, err := b.Build()
90+
if err != nil {
91+
return nil, err
92+
}
93+
94+
w := async.New[[]T]()
95+
96+
for _, r := range tables {
97+
qr := strings.ReplaceAll(query, "<rotate>", r)
98+
for _, db := range q.dbs {
99+
w.Add(func(db *Context, qr string) func(context.Context) ([]T, error) {
100+
return func(context.Context) ([]T, error) {
101+
var t []T
102+
rows, err := db.QueryContext(ctx, qr, args...)
103+
if err != nil {
104+
return t, err
105+
}
106+
107+
err = rows.Bind(&t)
108+
if err != nil {
109+
return t, err
110+
}
111+
112+
return t, nil
113+
}
114+
}(db, qr))
115+
}
116+
}
117+
118+
items, err := w.Wait(ctx)
119+
120+
if err != nil {
121+
return nil, err
122+
}
123+
124+
var list []T
125+
126+
for _, it := range items {
127+
if it != nil {
128+
list = append(list, it...)
129+
}
130+
}
131+
132+
if less != nil {
133+
sort.Slice(list, func(i, j int) bool {
134+
return less(list[i], list[j])
135+
})
136+
}
137+
138+
return list, nil
139+
}
140+
func (q *MapR[T]) QueryLimit(ctx context.Context, tables []string, b *Builder, less func(i, j T) bool, limit int) ([]T, error) {
141+
142+
if limit > 0 {
143+
b.SQL(" LIMIT " + strconv.Itoa(limit*len(q.dbs)))
144+
}
145+
146+
list, err := q.Query(ctx, tables, b, less)
147+
if err != nil {
148+
return nil, err
149+
}
150+
151+
if limit < len(list) {
152+
return list[0:limit], nil
153+
}
154+
155+
return list, nil
156+
}

0 commit comments

Comments
 (0)