Skip to content

Commit 79ca78b

Browse files
authored
feat(elasticsearch): make service as an index (#136)
1 parent 0707e4c commit 79ca78b

18 files changed

+200
-764
lines changed

cli/migrate.go

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,13 @@ import (
55
"fmt"
66
"os/signal"
77
"syscall"
8-
"time"
98

109
"github.com/MakeNowJust/heredoc"
11-
"github.com/odpf/compass/core/asset"
1210
"github.com/odpf/compass/internal/store/postgres"
1311
"github.com/odpf/salt/log"
1412
"github.com/spf13/cobra"
1513
)
1614

17-
const (
18-
esMigrationTimeout = 5 * time.Second
19-
)
20-
2115
func cmdMigrate() *cobra.Command {
2216
return &cobra.Command{
2317
Use: "migrate",
@@ -55,11 +49,6 @@ func runMigrations(ctx context.Context, config Config) error {
5549
}
5650
logger.Info("Migration Postgres done.")
5751

58-
logger.Info("Migrating ES...")
59-
if err := migrateElasticsearch(logger, config); err != nil {
60-
return err
61-
}
62-
logger.Info("Migration ES done.")
6352
return nil
6453
}
6554

@@ -79,23 +68,3 @@ func migratePostgres(logger log.Logger, config Config) (err error) {
7968

8069
return nil
8170
}
82-
83-
func migrateElasticsearch(logger log.Logger, config Config) error {
84-
logger.Info("Initiating ES client...")
85-
esClient, err := initElasticsearch(logger, config.Elasticsearch)
86-
if err != nil {
87-
return err
88-
}
89-
90-
for _, supportedType := range asset.AllSupportedTypes {
91-
logger.Info("Migrating type\n", "type", supportedType)
92-
ctx, cancel := context.WithTimeout(context.Background(), esMigrationTimeout)
93-
defer cancel()
94-
err := esClient.Migrate(ctx, supportedType)
95-
if err != nil {
96-
return fmt.Errorf("error creating/replacing type %q: %w", supportedType, err)
97-
}
98-
logger.Info("created/updated type\n", "type", supportedType)
99-
}
100-
return nil
101-
}

internal/store/elasticsearch/discovery_repository.go

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,15 @@ import (
66
"encoding/json"
77
"fmt"
88
"io"
9-
"strconv"
109
"strings"
1110

1211
"github.com/odpf/compass/core/asset"
1312
)
1413

15-
var indexTypeMap = map[asset.Type]string{
16-
asset.TypeTable: "table",
17-
asset.TypeTopic: "topic",
18-
asset.TypeJob: "job",
19-
asset.TypeDashboard: "dashboard",
20-
}
21-
2214
// DiscoveryRepository implements discovery.Repository
2315
// with elasticsearch as the backing store.
2416
type DiscoveryRepository struct {
25-
cli *Client
26-
typeWhiteList []string
27-
typeWhiteListSet map[string]bool
17+
cli *Client
2818
}
2919

3020
func NewDiscoveryRepository(cli *Client) *DiscoveryRepository {
@@ -41,6 +31,17 @@ func (repo *DiscoveryRepository) Upsert(ctx context.Context, ast asset.Asset) er
4131
return asset.ErrUnknownType
4232
}
4333

34+
idxExists, err := repo.cli.indexExists(ctx, ast.Service)
35+
if err != nil {
36+
return elasticSearchError(err)
37+
}
38+
39+
if !idxExists {
40+
if err := repo.cli.CreateIdx(ctx, ast.Service); err != nil {
41+
return err
42+
}
43+
}
44+
4445
body, err := repo.createUpsertBody(ast)
4546
if err != nil {
4647
return fmt.Errorf("error serialising payload: %w", err)
@@ -82,9 +83,9 @@ func (repo *DiscoveryRepository) Delete(ctx context.Context, assetID string) err
8283
}
8384

8485
func (repo *DiscoveryRepository) GetTypes(ctx context.Context) (map[asset.Type]int, error) {
85-
resp, err := repo.cli.client.Cat.Indices(
86-
repo.cli.client.Cat.Indices.WithFormat("json"),
87-
repo.cli.client.Cat.Indices.WithContext(ctx),
86+
resp, err := repo.cli.client.Search(
87+
repo.cli.client.Search.WithContext(ctx),
88+
repo.cli.client.Search.WithBody(strings.NewReader(`{"size": 0,"aggs":{"aggregation_name":{"terms":{"field":"type.keyword"}}}}`)),
8889
)
8990
if err != nil {
9091
return nil, fmt.Errorf("error from es client %w", err)
@@ -93,23 +94,20 @@ func (repo *DiscoveryRepository) GetTypes(ctx context.Context) (map[asset.Type]i
9394
if resp.IsError() {
9495
return nil, fmt.Errorf("error from es server: %s", errorReasonFromResponse(resp))
9596
}
96-
var indices []esIndex
97-
err = json.NewDecoder(resp.Body).Decode(&indices)
97+
98+
var response searchResponse
99+
err = json.NewDecoder(resp.Body).Decode(&response)
98100
if err != nil {
99-
return nil, fmt.Errorf("error decoding es response %w", err)
101+
return nil, fmt.Errorf("error decoding aggregate search response %w", err)
100102
}
101103

102104
results := map[asset.Type]int{}
103-
for _, index := range indices {
104-
count, err := strconv.Atoi(index.DocsCount)
105-
if err != nil {
106-
return results, fmt.Errorf("error converting docs count to a number: %w", err)
107-
}
108-
typName := asset.Type(index.Index)
105+
for _, bucket := range response.Aggregations.AggregationName.Buckets {
106+
typName := asset.Type(bucket.Key)
109107
if !typName.IsValid() {
110108
continue
111109
}
112-
results[typName] = count
110+
results[typName] = bucket.DocCount
113111
}
114112

115113
return results, nil
@@ -132,7 +130,7 @@ func (repo *DiscoveryRepository) createUpsertBody(ast asset.Asset) (io.Reader, e
132130
func (repo *DiscoveryRepository) writeInsertAction(w io.Writer, ast asset.Asset) error {
133131
action := map[string]interface{}{
134132
"index": map[string]interface{}{
135-
"_index": indexTypeMap[ast.Type],
133+
"_index": ast.Service,
136134
"_id": ast.ID,
137135
},
138136
}

internal/store/elasticsearch/discovery_repository_test.go

Lines changed: 38 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ import (
1414
)
1515

1616
func TestDiscoveryRepositoryUpsert(t *testing.T) {
17-
ctx := context.Background()
17+
var (
18+
ctx = context.Background()
19+
bigqueryService = "bigquery-test"
20+
)
1821

1922
t.Run("should return error if id empty", func(t *testing.T) {
2023
cli, err := esTestServer.NewClient()
@@ -28,8 +31,9 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) {
2831

2932
repo := store.NewDiscoveryRepository(esClient)
3033
err = repo.Upsert(ctx, asset.Asset{
31-
ID: "",
32-
Type: asset.TypeTable,
34+
ID: "",
35+
Type: asset.TypeTable,
36+
Service: bigqueryService,
3337
})
3438
assert.ErrorIs(t, err, asset.ErrEmptyID)
3539
})
@@ -46,18 +50,19 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) {
4650

4751
repo := store.NewDiscoveryRepository(esClient)
4852
err = repo.Upsert(ctx, asset.Asset{
49-
ID: "sample-id",
50-
Type: asset.Type("unknown-type"),
53+
ID: "sample-id",
54+
Type: asset.Type("unknown-type"),
55+
Service: bigqueryService,
5156
})
5257
assert.ErrorIs(t, err, asset.ErrUnknownType)
5358
})
5459

55-
t.Run("should insert asset to the correct index by its type", func(t *testing.T) {
60+
t.Run("should insert asset to the correct index by its service", func(t *testing.T) {
5661
ast := asset.Asset{
5762
ID: "sample-id",
5863
URN: "sample-urn",
5964
Type: asset.TypeTable,
60-
Service: "bigquery",
65+
Service: bigqueryService,
6166
Name: "sample-name",
6267
Description: "sample-description",
6368
Data: map[string]interface{}{
@@ -85,7 +90,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) {
8590
err = repo.Upsert(ctx, ast)
8691
assert.NoError(t, err)
8792

88-
res, err := cli.API.Get("table", ast.ID)
93+
res, err := cli.API.Get(bigqueryService, ast.ID)
8994
require.NoError(t, err)
9095
require.False(t, res.IsError())
9196

@@ -112,7 +117,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) {
112117
ID: "existing-id",
113118
URN: "existing-urn",
114119
Type: asset.TypeTable,
115-
Service: "bigquery",
120+
Service: bigqueryService,
116121
Name: "existing-name",
117122
Description: "existing-description",
118123
}
@@ -137,7 +142,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) {
137142
err = repo.Upsert(ctx, newAsset)
138143
assert.NoError(t, err)
139144

140-
res, err := cli.API.Get("table", existingAsset.ID)
145+
res, err := cli.API.Get(bigqueryService, existingAsset.ID)
141146
require.NoError(t, err)
142147
require.False(t, res.IsError())
143148

@@ -155,7 +160,10 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) {
155160
}
156161

157162
func TestDiscoveryRepositoryDelete(t *testing.T) {
158-
ctx := context.Background()
163+
var (
164+
ctx = context.Background()
165+
bigqueryService = "bigquery-test"
166+
)
159167

160168
t.Run("should return error if id empty", func(t *testing.T) {
161169
cli, err := esTestServer.NewClient()
@@ -174,9 +182,10 @@ func TestDiscoveryRepositoryDelete(t *testing.T) {
174182

175183
t.Run("should not return error on success", func(t *testing.T) {
176184
ast := asset.Asset{
177-
ID: "delete-id",
178-
Type: asset.TypeTable,
179-
URN: "some-urn",
185+
ID: "delete-id",
186+
Type: asset.TypeTable,
187+
Service: bigqueryService,
188+
URN: "some-urn",
180189
}
181190

182191
cli, err := esTestServer.NewClient()
@@ -199,7 +208,10 @@ func TestDiscoveryRepositoryDelete(t *testing.T) {
199208
}
200209

201210
func TestDiscoveryRepositoryGetTypes(t *testing.T) {
202-
ctx := context.Background()
211+
var (
212+
ctx = context.Background()
213+
bigqueryService = "bigquery-test"
214+
)
203215

204216
t.Run("should return empty map if no type is available", func(t *testing.T) {
205217
cli, err := esTestServer.NewClient()
@@ -218,8 +230,7 @@ func TestDiscoveryRepositoryGetTypes(t *testing.T) {
218230
assert.Equal(t, map[asset.Type]int{}, counts)
219231
})
220232

221-
t.Run("should return map with 0 count if type has not been populated yet", func(t *testing.T) {
222-
typ := asset.TypeTable
233+
t.Run("should return empty map if type has not been populated yet", func(t *testing.T) {
223234
cli, err := esTestServer.NewClient()
224235
require.NoError(t, err)
225236
esClient, err := store.NewClient(
@@ -229,25 +240,27 @@ func TestDiscoveryRepositoryGetTypes(t *testing.T) {
229240
)
230241
require.NoError(t, err)
231242

232-
err = esClient.Migrate(ctx, typ)
243+
err = esClient.CreateIdx(ctx, bigqueryService)
233244
require.NoError(t, err)
234245

235246
repo := store.NewDiscoveryRepository(esClient)
236247
counts, err := repo.GetTypes(ctx)
237248
require.NoError(t, err)
238249

239-
expected := map[asset.Type]int{
240-
asset.TypeTable: 0,
241-
}
250+
expected := map[asset.Type]int{}
242251
assert.Equal(t, expected, counts)
243252
})
244253

245254
t.Run("should return maps of asset count with valid type as its key", func(t *testing.T) {
246-
typ := asset.TypeDashboard
255+
var (
256+
typ = asset.TypeDashboard
257+
tableauService = "tableau-test"
258+
)
259+
247260
assets := []asset.Asset{
248-
{ID: "id-asset-1", URN: "asset-1", Name: "asset-1", Type: typ},
249-
{ID: "id-asset-2", URN: "asset-2", Name: "asset-2", Type: typ},
250-
{ID: "id-asset-3", URN: "asset-3", Name: "asset-3", Type: typ},
261+
{ID: "id-asset-1", URN: "asset-1", Name: "asset-1", Type: typ, Service: tableauService},
262+
{ID: "id-asset-2", URN: "asset-2", Name: "asset-2", Type: typ, Service: tableauService},
263+
{ID: "id-asset-3", URN: "asset-3", Name: "asset-3", Type: typ, Service: tableauService},
251264
}
252265

253266
cli, err := esTestServer.NewClient()
@@ -259,13 +272,6 @@ func TestDiscoveryRepositoryGetTypes(t *testing.T) {
259272
)
260273
require.NoError(t, err)
261274

262-
err = esClient.Migrate(ctx, asset.TypeDashboard)
263-
require.NoError(t, err)
264-
265-
invalidType := "invalid-type"
266-
err = esClient.Migrate(ctx, asset.Type(invalidType))
267-
require.NoError(t, err)
268-
269275
repo := store.NewDiscoveryRepository(esClient)
270276
_, err = repo.GetTypes(ctx)
271277
require.NoError(t, err)

0 commit comments

Comments
 (0)