Skip to content

Commit

Permalink
Merge pull request #775 from forta-network/caner/forta-1046-simplify-…
Browse files Browse the repository at this point in the history
…auto-update-logic-and-increase-visibility

Add more auto-update telemetry data and simplify release scheduling
  • Loading branch information
canercidam authored Jun 21, 2023
2 parents c3eba88 + 993eb41 commit cb60303
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 59 deletions.
9 changes: 7 additions & 2 deletions services/supervisor/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type SupervisorService struct {
lastCustomTelemetryRequestError health.ErrorTracker
lastAgentLogsRequest health.TimeTracker
lastAgentLogsRequestError health.ErrorTracker
autoUpdatesDisabled health.MessageTracker

healthClient health.HealthClient

Expand Down Expand Up @@ -738,6 +739,7 @@ func (sup *SupervisorService) Health() health.Reports {
sup.lastCustomTelemetryRequestError.GetReport("event.custom-telemetry-sync.error"),
sup.lastAgentLogsRequest.GetReport("event.agent-logs-sync.time"),
sup.lastAgentLogsRequestError.GetReport("event.agent-logs-sync.error"),
sup.autoUpdatesDisabled.GetReport("auto-updates.disabled"),
}
}

Expand Down Expand Up @@ -767,7 +769,7 @@ func NewSupervisorService(ctx context.Context, cfg SupervisorServiceConfig) (*Su
return nil, fmt.Errorf("failed to create the release client: %v", err)
}

return &SupervisorService{
sup := &SupervisorService{
ctx: ctx,
client: dockerClient,
globalClient: globalClient,
Expand All @@ -777,5 +779,8 @@ func NewSupervisorService(ctx context.Context, cfg SupervisorServiceConfig) (*Su
healthClient: health.NewClient(),
sendAgentLogs: agentlogs.NewClient(cfg.Config.AgentLogsConfig.URL).SendLogs,
inspectionCh: make(chan *protocol.InspectionResults),
}, nil
}
sup.autoUpdatesDisabled.Set(strconv.FormatBool(cfg.Config.AutoUpdate.Disable))

return sup, nil
}
69 changes: 21 additions & 48 deletions services/updater/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type UpdaterService struct {

lastChecked health.TimeTracker
lastErr health.ErrorTracker
finalCheck health.TimeTracker
finalCheckErr health.ErrorTracker
latestVersion health.MessageTracker
latestIsPrerelease health.MessageTracker
}
Expand Down Expand Up @@ -117,7 +119,6 @@ func (updater *UpdaterService) Start() error {
log.WithError(err).Panic("too many update errors - exiting")
}
updater.lastErr.Set(err)
updater.lastChecked.Set()
if err != nil {
log.WithError(err).Error("error getting release")
}
Expand All @@ -133,6 +134,7 @@ func (updater *UpdaterService) updateLatestReleaseWithDelay(delay time.Duration)
log.Info("updating latest release")

// note: if reference is blank, this returns an error
updater.lastChecked.Set()
latest, err := updater.srs.GetRelease(updater.ctx)
if err != nil {
return err
Expand All @@ -151,20 +153,25 @@ func (updater *UpdaterService) updateLatestReleaseWithDelay(delay time.Duration)
}

// so that all scanners don't update simultaneously, this waits a period of time
log.WithFields(log.Fields{"release": latest.Reference, "delay": delay}).Info("delaying update")
if delay > 0 {
log.WithFields(log.Fields{
"release": latest.Reference, "delay": delay,
}).Info("delaying update")

// if a newer release is found while waiting, this returns and tries again
// (this resets the delay clock)
if foundNew := updater.waitForDelayOrNewerRelease(latest.Reference, delay); foundNew {
log.Info("detected newer release while delaying current update - aborting")
return nil
}
<-time.After(delay)
}
log.Info("successfully waited before version update")

log.Info("successfully waited before version update")
for {
updater.finalCheck.Set()
latest, err = updater.srs.GetRelease(updater.ctx)
if err == nil {
break
}
updater.finalCheckErr.Set(err)
log.WithError(err).Error("failed to get the latest release just after the delay is over - retrying")
time.Sleep(time.Second * 5)
}
updater.finalCheckErr.Set(nil)
log.WithFields(log.Fields{"release": latest.Reference, "delay": delay}).
Info("successfully got the latest release once more after the delay")

updater.latestVersion.Set(latest.ReleaseManifest.Release.Version)
updater.latestIsPrerelease.Set(strconv.FormatBool(latest.IsPrerelease))
Expand All @@ -180,42 +187,6 @@ func (updater *UpdaterService) updateLatestReleaseWithDelay(delay time.Duration)
return nil
}

// returns true if a newer release is detected, otherwise waits for delay and returns false
func (updater *UpdaterService) waitForDelayOrNewerRelease(currentRef string, delay time.Duration) bool {
detectedCh := make(chan struct{})

ctx, cancel := context.WithCancel(updater.ctx)
defer cancel()

go updater.waitForNewerRelease(ctx, currentRef, detectedCh)

select {
case <-time.After(delay):
return false
case <-detectedCh:
return true
}
}

// notifies channel is a newer version is detected
func (updater *UpdaterService) waitForNewerRelease(ctx context.Context, currentRef string, detectedCh chan struct{}) {
ticker := time.NewTicker(updater.updateCheckInterval)
for {
select {
case <-ticker.C:
if rel, err := updater.srs.GetRelease(updater.ctx); err != nil {
log.WithError(err).Error("error getting release during delay (ignoring intermittent)")
continue
} else if rel.Reference != currentRef {
detectedCh <- struct{}{}
return
}
case <-ctx.Done():
return
}
}
}

// Name returns the name of the service.
func (updater *UpdaterService) Name() string {
return "updater"
Expand Down Expand Up @@ -245,6 +216,8 @@ func (updater *UpdaterService) Health() health.Reports {
return health.Reports{
updater.lastChecked.GetReport("event.checked.time"),
updater.lastErr.GetReport("event.checked.error"),
updater.finalCheck.GetReport("event.checked.final.time"),
updater.finalCheckErr.GetReport("event.checked.final.error"),
updater.latestVersion.GetReport("latest.version"),
updater.latestIsPrerelease.GetReport("latest.is-prerelease"),
}
Expand Down
19 changes: 10 additions & 9 deletions services/updater/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package updater

import (
"context"
"testing"

"github.com/forta-network/forta-node/store"
mock_store "github.com/forta-network/forta-node/store/mocks"
"testing"

"github.com/stretchr/testify/require"

Expand All @@ -26,13 +27,13 @@ func TestUpdaterService_UpdateLatestRelease(t *testing.T) {

svs.EXPECT().GetRelease(gomock.Any()).Return(&store.ScannerRelease{
Reference: "reference",
}, nil).Times(1)
}, nil).Times(2)

err := updater.updateLatestReleaseWithDelay(0)
r.NoError(err)
}

func TestUpdaterService_UpdateLatestReleaseNotCached(t *testing.T) {
func TestUpdaterService_UpdateLatestRelease_SingleEachTime(t *testing.T) {
r := require.New(t)

svs := mock_store.NewMockScannerReleaseStore(gomock.NewController(t))
Expand All @@ -42,11 +43,11 @@ func TestUpdaterService_UpdateLatestReleaseNotCached(t *testing.T) {

svs.EXPECT().GetRelease(gomock.Any()).Return(&store.ScannerRelease{
Reference: "reference1",
}, nil).Times(1)
}, nil).Times(2)

svs.EXPECT().GetRelease(gomock.Any()).Return(&store.ScannerRelease{
Reference: "reference2",
}, nil).Times(1)
}, nil).Times(2)

r.NoError(updater.updateLatestReleaseWithDelay(0))
r.Equal("reference1", updater.latestReference)
Expand All @@ -55,15 +56,15 @@ func TestUpdaterService_UpdateLatestReleaseNotCached(t *testing.T) {
r.Equal("reference2", updater.latestReference)
}

func TestUpdaterService_UpdateLatestReleaseAbort(t *testing.T) {
func TestUpdaterService_UpdateLatestRelease_TwoInARow(t *testing.T) {
r := require.New(t)

svs := mock_store.NewMockScannerReleaseStore(gomock.NewController(t))
updater := NewUpdaterService(
context.Background(), svs, "8080", testUpdateDelaySeconds, testUpdateCheckIntervalSeconds,
)

initalLatestRef := updater.latestReference
finalRef := "reference2"

svs.EXPECT().GetRelease(gomock.Any()).Return(&store.ScannerRelease{
Reference: "reference1",
Expand All @@ -75,6 +76,6 @@ func TestUpdaterService_UpdateLatestReleaseAbort(t *testing.T) {

r.NoError(updater.updateLatestReleaseWithDelay(updater.updateDelay))

// update should be ineffective and be aborted
r.Equal(initalLatestRef, updater.latestReference)
// should update to the latest one
r.Equal(finalRef, updater.latestReference)
}

0 comments on commit cb60303

Please sign in to comment.