Skip to content

Commit 8e85599

Browse files
committedMar 13, 2021
supports reindex
1 parent f17ca71 commit 8e85599

14 files changed

+380
-32
lines changed
 

‎README.md

+21-3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ eskeeper synchronizes index and alias with configuration files while ensuring id
1616

1717
* index
1818
- [x] create
19+
- [x] status (open or close)
20+
- [x] reindex (only basic parameter)
1921
- [ ] update mapping
2022
- [ ] delete
2123

@@ -38,8 +40,8 @@ es.yaml is indices & aliases config file.
3840

3941
```yaml
4042
index:
41-
- name: test-v1
42-
mapping: testdata/test.json
43+
- name: test-v1 # index name
44+
mapping: testdata/test.json # index setting & mapping (json)
4345

4446
- name: test-v2
4547
mapping: testdata/test.json
@@ -48,6 +50,20 @@ index:
4850
mapping: testdata/test.json
4951
status: close
5052

53+
# reindex test-v1 -> reindex-v1
54+
- name: reindex-v1
55+
mapping: testdata/test.json
56+
reindex:
57+
source: test-v1
58+
slices: 3 # default=1
59+
waitForCompletion: true
60+
61+
# 'on' field supports 2 hooks.
62+
# 'reindex': only when index is created for the first time.
63+
# 'always': always exec reindex.
64+
on: firstCreated
65+
66+
5167
alias:
5268
- name: alias1
5369
index:
@@ -67,14 +83,16 @@ curl localhost:9200/_cat/indices
6783
yellow open test-v1 ... 1 1 0 0 208b 208b
6884
yellow open test-v2 ... 1 1 0 0 208b 208b
6985
yellow close close-v1 xxxxxxxxxxxx 1 1
86+
yellow open reindex-v1 ... 1 1 0 0 208b 208b
7087

7188
curl localhost:9200/_cat/aliases
7289
alias2 test-v2 - - - -
7390
alias1 test-v1 - - - -
7491
alias2 test-v1 - - - -
7592
```
7693

77-
## :triangular_ruler: Settings
94+
95+
## :triangular_ruler: Usage
7896

7997
eskeeper supports flag & environment value.
8098

‎beta.go

-2
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,6 @@ func (c *esclient) equalMappingsProperties(ctx context.Context, index index, map
125125
return false, errors.New("get index response dose not contain index name field")
126126
}
127127

128-
fmt.Println(v)
129-
130128
b, err := json.Marshal(v.Mappings)
131129
if err != nil {
132130
return false, fmt.Errorf("marshal properties json: %w", err)

‎conf.go

+27-3
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,29 @@ var status = map[string]struct{}{
1616
"": struct{}{}, // default
1717
}
1818

19+
var reindexOn = map[string]struct{}{
20+
"always": struct{}{},
21+
"firstCreated": struct{}{},
22+
"": struct{}{}, // default
23+
}
24+
1925
type config struct {
2026
Indices []index `json:"index"`
2127
Aliases []alias `json:"alias"` // supports close only
2228
}
2329

2430
type index struct {
25-
Name string `json:"name"`
26-
Mapping string `json:"mapping"`
27-
Status string `json:"status"`
31+
Name string `json:"name"`
32+
Mapping string `json:"mapping"`
33+
Status string `json:"status"`
34+
Reindex reindex `json:"reindex"`
35+
}
36+
37+
type reindex struct {
38+
Source string `json:"source"`
39+
Slices int `json:"slices"`
40+
WaitForCompletion bool `json:"waitForCompletion"`
41+
On string `json:"on"`
2842
}
2943

3044
type alias struct {
@@ -67,6 +81,16 @@ func validateIndex(index index) error {
6781
return fmt.Errorf("unsupported status %v", index.Status)
6882
}
6983

84+
if index.Reindex.Source != "" {
85+
if index.Status == "close" {
86+
return errors.New("unsupported close status and reindex cannot be used together")
87+
}
88+
_, ok := reindexOn[index.Reindex.On]
89+
if !ok {
90+
return fmt.Errorf("unsupported reindex hook %v. [always or firstCreated]", index.Reindex.On)
91+
}
92+
}
93+
7094
return nil
7195
}
7296

‎conf_test.go

+28
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,34 @@ func TestYaml2Conf(t *testing.T) {
4444
},
4545
},
4646
},
47+
{
48+
name: "reindex",
49+
yaml: "testdata/es.reindex.yaml",
50+
want: config{
51+
Indices: []index{
52+
{
53+
Name: "test-v1",
54+
Mapping: "testdata/test.json",
55+
},
56+
{
57+
Name: "test-v2",
58+
Mapping: "testdata/test.json",
59+
Reindex: reindex{
60+
Source: "test-v1",
61+
Slices: 20,
62+
WaitForCompletion: true,
63+
On: "firstCreated",
64+
},
65+
},
66+
},
67+
Aliases: []alias{
68+
{
69+
Name: "alias1",
70+
Indices: []string{"test-v2"},
71+
},
72+
},
73+
},
74+
},
4775
}
4876

4977
for _, tt := range tests {

‎docker-compose.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
version: '3'
22
services:
33
elasticsearch:
4-
image: docker.elastic.co/elasticsearch/elasticsearch:7.9.2
4+
image: docker.elastic.co/elasticsearch/elasticsearch:7.11.1
55
ports:
66
- "9200:9200"
77
- "9300:9300"

‎go.mod

+23-16
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,43 @@
11
module github.com/po3rin/eskeeper
22

3-
go 1.15
3+
go 1.16
44

55
require (
66
github.com/Cside/jsondiff v0.0.0-20180209072652-0e50d980b458
7+
github.com/Masterminds/semver/v3 v3.1.1 // indirect
78
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
89
github.com/cenkalti/backoff/v3 v3.2.2 // indirect
910
github.com/containerd/continuity v0.0.0-20200928162600-f2cc35102c2a // indirect
1011
github.com/creack/pty v1.1.9 // indirect
1112
github.com/elastic/go-elasticsearch v0.0.0
12-
github.com/elastic/go-elasticsearch/v7 v7.9.0
13+
github.com/elastic/go-elasticsearch/v7 v7.11.0
1314
github.com/evanphx/json-patch v4.9.0+incompatible // indirect
14-
github.com/fatih/color v1.9.0 // indirect
15-
github.com/goccy/go-yaml v1.8.2
16-
github.com/gofrs/uuid v3.3.0+incompatible
17-
github.com/google/go-cmp v0.4.0 // indirect
18-
github.com/itchyny/gojq v0.11.1
19-
github.com/itchyny/timefmt-go v0.1.1 // indirect
15+
github.com/fatih/color v1.10.0 // indirect
16+
github.com/goccy/go-yaml v1.8.9
17+
github.com/gofrs/uuid v4.0.0+incompatible
18+
github.com/hokaccha/go-prettyjson v0.0.0-20190818114111-108c894c2c0e // indirect
19+
github.com/itchyny/astgen-go v0.0.0-20200815150004-12a293722290 // indirect
20+
github.com/itchyny/gojq v0.12.2
2021
github.com/kataras/pio v0.0.10
2122
github.com/mattn/go-colorable v0.1.8 // indirect
23+
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect
24+
github.com/oligot/go-mod-upgrade v0.4.1 // indirect
2225
github.com/ory/dockertest v3.3.5+incompatible
2326
github.com/ory/dockertest/v3 v3.6.0 // indirect
2427
github.com/pkg/errors v0.9.1
25-
github.com/po3rin/bmfzf v0.0.1
28+
github.com/po3rin/bmfzf v0.0.2
2629
github.com/sergi/go-diff v1.1.0 // indirect
2730
github.com/sirupsen/logrus v1.7.0 // indirect
28-
github.com/spf13/cobra v1.0.0
29-
github.com/spf13/pflag v1.0.3
30-
github.com/spf13/viper v1.7.0
31-
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
32-
golang.org/x/sys v0.0.0-20200826173525-f9321e4c35a6 // indirect
33-
golang.org/x/text v0.3.3 // indirect
31+
github.com/spf13/cobra v1.1.3
32+
github.com/spf13/pflag v1.0.5
33+
github.com/spf13/viper v1.7.1
34+
github.com/stretchr/testify v1.6.1 // indirect
35+
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83
36+
golang.org/x/sys v0.0.0-20210313110737-8e9fff1a3a18 // indirect
37+
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d // indirect
38+
golang.org/x/text v0.3.5 // indirect
3439
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
35-
gopkg.in/yaml.v2 v2.2.7
40+
gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
41+
gopkg.in/go-playground/validator.v9 v9.30.0 // indirect
42+
gopkg.in/yaml.v2 v2.4.0
3643
)

‎go.sum

+82
Large diffs are not rendered by default.

‎index.go

+37-2
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,46 @@ func (c *esclient) syncIndex(ctx context.Context, index index) error {
5252
}
5353
return fmt.Errorf("failed to create index [index=%v, statusCode=%v, res=%v]", index.Name, res.StatusCode, string(body))
5454
}
55+
56+
// reindex --------
57+
58+
if index.Reindex.Source == "" {
59+
return nil
60+
}
61+
62+
ok, err := c.existIndex(ctx, index.Reindex.Source)
63+
if err != nil {
64+
return fmt.Errorf("check index exists for reindex process: %w", err)
65+
}
66+
if !ok {
67+
return fmt.Errorf("reindex (%s -> %s) conf is invalid. Make sure %s index exists", index.Reindex.Source, index.Reindex.Source, index.Name)
68+
}
69+
70+
err = c.reindex(ctx, index.Name, index.Reindex)
71+
if err != nil {
72+
return fmt.Errorf("reindex (%s -> %s)", index.Reindex.Source, index.Name)
73+
}
5574
return nil
5675
}
5776

5877
// index already exists.
5978

79+
// reindex -------
80+
if index.Reindex.Source != "" && index.Reindex.On == "always" {
81+
ok, err = c.existIndex(ctx, index.Reindex.Source)
82+
if err != nil {
83+
return fmt.Errorf("check index exists for reindex process: %w", err)
84+
}
85+
if !ok {
86+
return fmt.Errorf("reindex (%s -> %s) conf is invalid. Make sure %s index exists", index.Reindex.Source, index.Reindex.Source, index.Name)
87+
}
88+
err = c.reindex(ctx, index.Name, index.Reindex)
89+
if err != nil {
90+
return fmt.Errorf("reindex (%s -> %s)", index.Reindex.Source, index.Name)
91+
}
92+
return nil
93+
}
94+
6095
// Since downtime may occur when switching aliases, only open is processed before switching aliases.
6196
// TODO: refactoring.
6297
if index.Status == "close" {
@@ -143,9 +178,9 @@ func (c *esclient) openIndex(ctx context.Context, index index) error {
143178
if res.StatusCode != 200 {
144179
body, err := ioutil.ReadAll(res.Body)
145180
if err != nil {
146-
return fmt.Errorf("failed to open index [index= %v, statusCode=%v]", index, res.StatusCode)
181+
return fmt.Errorf("failed to open index [index= %v, statusCode=%v]: %w", index, res.StatusCode, err)
147182
}
148-
return fmt.Errorf("failed to open index [index= %v, statusCode=%v, res=%v]", index, res.StatusCode, string(body))
183+
return fmt.Errorf("failed to open index [index= %v, statusCode=%v, res=%v]: %w", index, res.StatusCode, string(body), err)
149184
}
150185
return nil
151186
}

‎index_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func TestSyncIndices(t *testing.T) {
9191
},
9292
setup: func(tb testing.TB) {
9393
createTmpIndexHelper(tb, "open-v1")
94-
closeIndex(tb, "open-v1")
94+
closeIndexHelper(tb, "open-v1")
9595
},
9696
},
9797
{

‎main_test.go

+32-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func TestMain(m *testing.M) {
2222

2323
resource, err := pool.Run(
2424
"docker.elastic.co/elasticsearch/elasticsearch",
25-
"7.9.2",
25+
"7.11.1",
2626
[]string{
2727
"ES_JAVA_OPTS=-Xms512m -Xmx512m",
2828
"discovery.type=single-node",
@@ -132,7 +132,7 @@ func createTmpAliasHelper(tb testing.TB, name string, index string) {
132132
}
133133
}
134134

135-
func closeIndex(tb testing.TB, index string) {
135+
func closeIndexHelper(tb testing.TB, index string) {
136136
conf := elasticsearch.Config{
137137
Addresses: []string{url},
138138
}
@@ -153,3 +153,33 @@ func closeIndex(tb testing.TB, index string) {
153153
tb.Fatalf("failed to close index [index= %v, statusCode=%v, res=%v]", index, res.StatusCode, string(body))
154154
}
155155
}
156+
157+
func postDocHelper(tb testing.TB, index string) {
158+
tb.Helper()
159+
conf := elasticsearch.Config{
160+
Addresses: []string{url},
161+
}
162+
es, err := elasticsearch.NewClient(conf)
163+
if err != nil {
164+
tb.Fatal(err)
165+
}
166+
167+
body := strings.NewReader(`{"title":"this is title","body":"this is body"}`)
168+
169+
res, err := es.Index(
170+
index,
171+
body,
172+
es.Index.WithRefresh("true"),
173+
)
174+
if err != nil {
175+
tb.Fatal(err)
176+
}
177+
178+
if res.StatusCode != 201 {
179+
body, err := ioutil.ReadAll(res.Body)
180+
if err != nil {
181+
tb.Fatal(err)
182+
}
183+
tb.Fatalf("failed to post document [index=%v, statusCode=%v, res=%v]", index, res.StatusCode, string(body))
184+
}
185+
}

‎reindex.go

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package eskeeper
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io/ioutil"
7+
"strings"
8+
)
9+
10+
func (c *esclient) reindex(ctx context.Context, dest string, reindex reindex) error {
11+
ri := c.client.Reindex
12+
body := strings.NewReader(
13+
fmt.Sprintf(`
14+
{
15+
"source": {
16+
"index": "%s"
17+
},
18+
"dest": {
19+
"index": "%s"
20+
}
21+
}`,
22+
reindex.Source, dest,
23+
),
24+
)
25+
26+
slices := reindex.Slices
27+
if slices == 0 {
28+
slices = 1
29+
}
30+
31+
res, err := ri(
32+
body,
33+
ri.WithContext(ctx),
34+
ri.WithSlices(slices),
35+
ri.WithWaitForCompletion(reindex.WaitForCompletion),
36+
)
37+
if err != nil {
38+
return fmt.Errorf("reindex: %w", err)
39+
}
40+
if res.StatusCode != 200 {
41+
body, err := ioutil.ReadAll(res.Body)
42+
if err != nil {
43+
return err
44+
}
45+
return fmt.Errorf("failed to reindex [index=%v, statusCode=%v, res=%v]", reindex.Source, res.StatusCode, string(body))
46+
}
47+
return nil
48+
}

‎reindex_test.go

+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package eskeeper
2+
3+
import (
4+
"context"
5+
"testing"
6+
)
7+
8+
func TestReindex(t *testing.T) {
9+
tests := []struct {
10+
name string
11+
dest string
12+
reindex reindex
13+
setup func(tb testing.TB)
14+
cleanup func(tb testing.TB)
15+
}{
16+
{
17+
name: "reindex",
18+
dest: "reindex-dest",
19+
reindex: reindex{
20+
Source: "reindex-src",
21+
Slices: 3,
22+
WaitForCompletion: true,
23+
On: "firstCreated",
24+
},
25+
setup: func(tb testing.TB) {
26+
createTmpIndexHelper(tb, "reindex-src")
27+
createTmpIndexHelper(tb, "reindex-dest")
28+
postDocHelper(tb, "reindex-src")
29+
},
30+
cleanup: func(tb testing.TB) {
31+
},
32+
},
33+
}
34+
35+
es, err := newEsClient([]string{url}, "", "")
36+
if err != nil {
37+
t.Fatal(err)
38+
}
39+
40+
for _, tt := range tests {
41+
t.Run(tt.name, func(t *testing.T) {
42+
ctx := context.Background()
43+
if tt.setup != nil {
44+
tt.setup(t)
45+
}
46+
err := es.reindex(ctx, tt.dest, tt.reindex)
47+
if err != nil {
48+
t.Error(err)
49+
}
50+
if tt.cleanup != nil {
51+
tt.cleanup(t)
52+
}
53+
})
54+
}
55+
}

‎testdata/es.reindex.yaml

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
index:
2+
- name: test-v1
3+
mapping: testdata/test.json
4+
5+
# reindex test-v1 -> reindex-v1
6+
- name: reindex-v1
7+
mapping: testdata/test.json
8+
reindex:
9+
source: test-v1
10+
slices: 3 # default=1
11+
waitForCompletion: true
12+
# 'on' field supports 2 hooks.
13+
# 'reindex': only when index is created for the first time.
14+
# 'always': always exec reindex.
15+
on: firstCreated
16+
17+
18+
alias:
19+
- name: alias1
20+
index:
21+
- test-v2
22+

‎testdata/es.yaml

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
index:
2-
- name: test-v1
3-
mapping: testdata/test.json
2+
- name: test-v1 # index name
3+
mapping: testdata/test.json # index setting & mapping (json)
44

55
- name: test-v2
66
mapping: testdata/test.json
@@ -14,6 +14,7 @@ alias:
1414
index:
1515
- test-v1
1616

17+
# multi indicies
1718
- name: alias2
1819
index:
1920
- test-v1

0 commit comments

Comments
 (0)
Please sign in to comment.