Skip to content

Commit 38ea109

Browse files
fix(lineage): lineage building memory spikes (#16)
* spike: optimise memory use * fix(lineage): monitors keep getting replaced by nil values * fix(lineage): a broken test due to new way to fetch records * refactor: add test for RecordRepository.GetAllIterator * refactor: minor refactoring Co-authored-by: Saravjeet 'Aman' Singh <[email protected]>
1 parent a09f925 commit 38ea109

File tree

10 files changed

+175
-27
lines changed

10 files changed

+175
-27
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ COVERFILE="/tmp/columbus.coverprofile"
77
all: build
88

99
build:
10-
go build -ldflags "-X main.Version=${VERSION}" ${NAME}
10+
go build -ldflags "-X cmd.Version=${VERSION}" ${NAME}
1111

1212
clean:
1313
rm -rf columbus dist/

api/routes.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,25 +29,25 @@ func RegisterRoutes(router *mux.Router, config Config) {
2929
config.TypeRepository,
3030
)
3131

32+
lineageHandler := handlers.NewLineageHandler(
33+
config.Logger.WithField("reporter", "lineage-handler"),
34+
config.LineageProvider,
35+
)
36+
3237
router.PathPrefix("/ping").Handler(handlers.NewHeartbeatHandler())
3338
setupTypeRoutes(router, "/v1/types", typeHandler)
3439

3540
router.Path("/v1/search").
3641
Methods(http.MethodGet).
3742
HandlerFunc(searchHandler.Search)
3843

39-
// Temporarily disable lineage routes
40-
// lineageHandler := handlers.NewLineageHandler(
41-
// config.Logger.WithField("reporter", "lineage-handler"),
42-
// config.LineageProvider,
43-
// )
44-
// router.PathPrefix("/v1/lineage/{type}/{id}").
45-
// Methods(http.MethodGet).
46-
// HandlerFunc(lineageHandler.GetLineage)
47-
48-
// router.PathPrefix("/v1/lineage").
49-
// Methods(http.MethodGet).
50-
// HandlerFunc(lineageHandler.ListLineage)
44+
router.PathPrefix("/v1/lineage/{type}/{id}").
45+
Methods(http.MethodGet).
46+
HandlerFunc(lineageHandler.GetLineage)
47+
48+
router.PathPrefix("/v1/lineage").
49+
Methods(http.MethodGet).
50+
HandlerFunc(lineageHandler.ListLineage)
5151
}
5252

5353
func setupTypeRoutes(router *mux.Router, baseURL string, typeHandler *handlers.TypeHandler) {

cmd/serve.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ func initRouter(
6464
if err != nil {
6565
log.Fatalf("error creating searcher: %v", err)
6666
}
67+
6768
lineageService, err := lineage.NewService(typeRepository, recordRepositoryFactory, lineage.Config{
6869
RefreshInterval: config.LineageRefreshIntervalStr,
6970
MetricsMonitor: statsdMonitor,
@@ -72,6 +73,7 @@ func initRouter(
7273
if err != nil {
7374
log.Fatal(err)
7475
}
76+
rootLogger.Info("lineage build complete")
7577

7678
router := mux.NewRouter()
7779
if nrMonitor != nil {

lib/mock/mocks.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ func (repo *RecordRepository) GetAll(ctx context.Context, filter models.RecordFi
5757
return args.Get(0).([]models.Record), args.Error(1)
5858
}
5959

60+
func (repo *RecordRepository) GetAllIterator(ctx context.Context) (models.RecordIterator, error) {
61+
args := repo.Called(ctx)
62+
return args.Get(0).(models.RecordIterator), args.Error(1)
63+
}
64+
6065
func (repo *RecordRepository) GetByID(ctx context.Context, id string) (models.Record, error) {
6166
args := repo.Called(ctx, id)
6267
return args.Get(0).(models.Record), args.Error(1)
@@ -67,6 +72,25 @@ func (repo *RecordRepository) Delete(ctx context.Context, id string) error {
6772
return args.Error(0)
6873
}
6974

75+
type RecordIterator struct {
76+
mock.Mock
77+
}
78+
79+
func (m *RecordIterator) Scan() bool {
80+
args := m.Called()
81+
return args.Bool(0)
82+
}
83+
84+
func (m *RecordIterator) Next() []models.Record {
85+
args := m.Called()
86+
return args.Get(0).([]models.Record)
87+
}
88+
89+
func (m *RecordIterator) Close() error {
90+
args := m.Called()
91+
return args.Error(0)
92+
}
93+
7094
type RecordSearcher struct {
7195
mock.Mock
7296
}

lineage/builder.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,13 +105,17 @@ func (builder defaultBuilder) populateTypeRecords(ctx context.Context, graph Adj
105105
if err != nil {
106106
return fmt.Errorf("error obtaing record repository: %w", err)
107107
}
108-
records, err := recordRepository.GetAll(ctx, models.RecordFilter{})
108+
109+
recordIter, err := recordRepository.GetAllIterator(ctx)
109110
if err != nil {
110-
return fmt.Errorf("error reading records for type %q: %w", typ.Name, err)
111+
return fmt.Errorf("error getting record iterator: %w", err)
111112
}
112-
for _, record := range records {
113-
if err := builder.addRecord(graph, typ, record, lineageProc); err != nil {
114-
return fmt.Errorf("error adding record to graph: %w", err)
113+
defer recordIter.Close()
114+
for recordIter.Scan() {
115+
for _, record := range recordIter.Next() {
116+
if err := builder.addRecord(graph, typ, record, lineageProc); err != nil {
117+
return fmt.Errorf("error adding record to graph: %w", err)
118+
}
115119
}
116120
}
117121
return nil

lineage/builder_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,13 @@ func initialiseRepos(datasets []dataset) (models.TypeRepository, models.RecordRe
2929
for _, dataset := range datasets {
3030
typ := dataset.Type.Normalise()
3131
tr.On("GetByName", typ.Name).Return(typ, nil)
32+
recordIterator := new(mock.RecordIterator)
33+
recordIterator.On("Scan").Return(true).Once()
34+
recordIterator.On("Scan").Return(false).Once()
35+
recordIterator.On("Next").Return(dataset.Records)
36+
recordIterator.On("Close").Return(nil)
3237
recordRepo := new(mock.RecordRepository)
33-
recordRepo.On("GetAll", ctx, models.RecordFilter{}).Return(dataset.Records, nil)
38+
recordRepo.On("GetAllIterator", ctx).Return(recordIterator, nil)
3439
rrf.On("For", typ).Return(recordRepo, nil)
3540
typList = append(typList, typ)
3641
}

lineage/service.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package lineage
33
import (
44
"context"
55
"fmt"
6+
"reflect"
67
"sync"
78
"sync/atomic"
89
"time"
@@ -53,7 +54,6 @@ func (srv *Service) build() {
5354
graph, err := srv.builder.Build(ctx, srv.typeRepo, srv.recordRepoFactory)
5455
now := srv.timeSource.Now()
5556
srv.metricsMonitor.Duration("lineageBuildTime", int(now.Sub(startTime)/time.Millisecond))
56-
5757
srv.mu.Lock()
5858
defer srv.mu.Unlock()
5959

@@ -104,11 +104,7 @@ func NewService(er models.TypeRepository, rrf models.RecordRepositoryFactory, co
104104
return nil, err
105105
}
106106

107-
// TODO: Find a solution to solve memory issue
108-
109-
// Temporarily disable building lineage on service creation.
110-
// Columbus's memory keeps spiking when app is starting
111-
// srv.build()
107+
srv.build()
112108

113109
return srv, nil
114110
}
@@ -124,11 +120,11 @@ func applyConfig(service *Service, config Config) error {
124120
}
125121
service.refreshInterval = lineageRefreshInterval
126122

127-
if config.MetricsMonitor != nil {
123+
if !isNilMonitor(config.MetricsMonitor) {
128124
service.metricsMonitor = config.MetricsMonitor
129125
}
130126

131-
if config.PerformanceMonitor != nil {
127+
if !isNilMonitor(config.PerformanceMonitor) {
132128
service.performanceMonitor = config.PerformanceMonitor
133129
}
134130

@@ -142,3 +138,8 @@ func applyConfig(service *Service, config Config) error {
142138

143139
return nil
144140
}
141+
142+
func isNilMonitor(monitor interface{}) bool {
143+
v := reflect.ValueOf(monitor)
144+
return !v.IsValid() || reflect.DeepEqual(v.Interface(), reflect.Zero(v.Type()).Interface())
145+
}

models/record.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@ import (
99
// TODO(Aman): add validation for mandatory fields? (landscape for instance)
1010
type Record = map[string]interface{}
1111

12+
type RecordIterator interface {
13+
Scan() bool
14+
Next() []Record
15+
Close() error
16+
}
17+
1218
// RecordFilter is a filter intended to be used as a search
1319
// criteria for operations involving record search
1420
type RecordFilter = map[string][]string
@@ -22,6 +28,9 @@ type RecordRepository interface {
2228
// used for return documents matching the search criteria.
2329
GetAll(context.Context, RecordFilter) ([]Record, error)
2430

31+
// GetAllIterator returns RecordIterator to iterate records by batches
32+
GetAllIterator(context.Context) (RecordIterator, error)
33+
2534
// GetByID returns a record by it's id.
2635
// The field that contains this ID is defined by the
2736
// type to which this record belongs

store/record_repository.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"time"
1212

1313
"github.com/elastic/go-elasticsearch/v7"
14+
"github.com/elastic/go-elasticsearch/v7/esapi"
1415
"github.com/odpf/columbus/models"
1516
"github.com/olivere/elastic/v7"
1617
)
@@ -92,6 +93,41 @@ func (repo *RecordRepository) writeInsertAction(w io.Writer, record models.Recor
9293
return json.NewEncoder(w).Encode(action)
9394
}
9495

96+
func (repo *RecordRepository) GetAllIterator(ctx context.Context) (models.RecordIterator, error) {
97+
body, err := repo.getAllQuery(models.RecordFilter{})
98+
if err != nil {
99+
return nil, fmt.Errorf("error building search query: %w", err)
100+
}
101+
102+
resp, err := repo.cli.Search(
103+
repo.cli.Search.WithIndex(repo.recordType.Name),
104+
repo.cli.Search.WithBody(body),
105+
repo.cli.Search.WithScroll(defaultScrollTimeout),
106+
repo.cli.Search.WithSize(defaultScrollBatchSize),
107+
repo.cli.Search.WithContext(ctx),
108+
)
109+
if err != nil {
110+
return nil, fmt.Errorf("error executing search: %w", err)
111+
}
112+
if resp.IsError() {
113+
return nil, fmt.Errorf("error response from elasticsearch: %s", errorReasonFromResponse(resp))
114+
}
115+
116+
var response searchResponse
117+
err = json.NewDecoder(resp.Body).Decode(&response)
118+
if err != nil {
119+
return nil, fmt.Errorf("error decoding es response: %w", err)
120+
}
121+
var results = repo.toRecordList(response)
122+
it := recordIterator{
123+
resp: resp,
124+
records: results,
125+
scrollID: response.ScrollID,
126+
repo: repo,
127+
}
128+
return &it, nil
129+
}
130+
95131
func (repo *RecordRepository) GetAll(ctx context.Context, filters models.RecordFilter) ([]models.Record, error) {
96132
// XXX(Aman): we should probably think about result ordering, if the client
97133
// is going to slice the data for pagination. Does ES guarantee the result order?
@@ -246,6 +282,35 @@ func (repo *RecordRepository) Delete(ctx context.Context, id string) error {
246282
return nil
247283
}
248284

285+
// recordIterator is the internal implementation of models.RecordIterator by RecordRepository
286+
type recordIterator struct {
287+
resp *esapi.Response
288+
records []models.Record
289+
repo *RecordRepository
290+
scrollID string
291+
}
292+
293+
func (it *recordIterator) Scan() bool {
294+
return len(strings.TrimSpace(it.scrollID)) > 0
295+
}
296+
297+
func (it *recordIterator) Next() (prev []models.Record) {
298+
prev = it.records
299+
var err error
300+
it.records, it.scrollID, err = it.repo.scrollRecords(context.Background(), it.scrollID)
301+
if err != nil {
302+
panic("error scrolling results:" + err.Error())
303+
}
304+
if len(it.records) == 0 {
305+
it.scrollID = ""
306+
}
307+
return
308+
}
309+
310+
func (it *recordIterator) Close() error {
311+
return it.resp.Body.Close()
312+
}
313+
249314
// RecordRepositoryFactory can be used to construct a RecordRepository
250315
// for a certain type
251316
type RecordRepositoryFactory struct {

store/record_repository_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,44 @@ func TestRecordRepository(t *testing.T) {
163163
return
164164
}
165165

166+
t.Run("GetAllIterator", func(t *testing.T) {
167+
type testCase struct {
168+
Description string
169+
Filter models.RecordFilter
170+
ResultsFile string
171+
}
172+
173+
t.Run("should return record iterator to iterate records", func(t *testing.T) {
174+
expectedResults := []models.Record{}
175+
raw, err := ioutil.ReadFile("./testdata/dagger.json")
176+
if err != nil {
177+
t.Fatalf("error reading results file: %v", err)
178+
return
179+
}
180+
err = json.Unmarshal(raw, &expectedResults)
181+
if err != nil {
182+
t.Fatalf("error parsing results file: %v", err)
183+
return
184+
}
185+
186+
var actualResults []models.Record
187+
iterator, err := recordRepo.GetAllIterator(ctx)
188+
if err != nil {
189+
t.Fatalf("error executing GetAllIterator: %v", err)
190+
return
191+
}
192+
for iterator.Scan() {
193+
actualResults = append(actualResults, iterator.Next()...)
194+
}
195+
iterator.Close()
196+
197+
if reflect.DeepEqual(expectedResults, actualResults) == false {
198+
t.Error(incorrectResultsError(expectedResults, actualResults))
199+
return
200+
}
201+
})
202+
})
203+
166204
t.Run("GetAll", func(t *testing.T) {
167205
type testCase struct {
168206
Description string

0 commit comments

Comments
 (0)