Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(state): avoid need for state lock in LatestWarningTime #493

Merged
merged 2 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 6 additions & 41 deletions internals/daemon/api_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package daemon

import (
"encoding/json"
"net/http"

"github.com/canonical/x-go/strutil"
Expand All @@ -35,15 +34,15 @@ func v1Health(c *Command, r *http.Request, _ *UserState) Response {
switch level {
case plan.UnsetLevel, plan.AliveLevel, plan.ReadyLevel:
default:
return healthError(http.StatusBadRequest, `level must be "alive" or "ready"`)
return BadRequest(`level must be "alive" or "ready"`)
}

names := strutil.MultiCommaSeparatedList(query["names"])

checks, err := getChecks(c.d.overlord)
if err != nil {
logger.Noticef("Cannot fetch checks: %v", err.Error())
return healthError(http.StatusInternalServerError, "internal server error")
return InternalError("internal server error")
}

healthy := true
Expand All @@ -58,43 +57,9 @@ func v1Health(c *Command, r *http.Request, _ *UserState) Response {
}
}

return SyncResponse(&healthResp{
Type: ResponseTypeSync,
Status: status,
StatusText: http.StatusText(status),
Result: healthInfo{Healthy: healthy},
return SyncResponse(&resp{
Type: ResponseTypeSync,
Status: status,
Result: healthInfo{Healthy: healthy},
})
}

// Like the resp struct, but without the warning/maintenance fields, so that
// the health endpoint doesn't have to acquire the state lock (resulting in a
// slow response on heavily-loaded systems).
type healthResp struct {
Type ResponseType `json:"type"`
Status int `json:"status-code"`
StatusText string `json:"status,omitempty"`
Result interface{} `json:"result,omitempty"`
}

func (r *healthResp) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
status := r.Status
bs, err := json.Marshal(r)
if err != nil {
logger.Noticef("Cannot marshal %#v to JSON: %v", *r, err)
bs = nil
status = http.StatusInternalServerError
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
w.Write(bs)
}

func healthError(status int, message string) *healthResp {
return &healthResp{
Type: ResponseTypeError,
Status: status,
StatusText: http.StatusText(status),
Result: &errorResult{Message: message},
}
}
5 changes: 1 addition & 4 deletions internals/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,7 @@ func (c *Command) ServeHTTP(w http.ResponseWriter, r *http.Request) {
rsp.transmitMaintenance(errorKindDaemonRestart, "daemon is stopping to wait for socket activation")
}
if rsp.Type != ResponseTypeError {
st := c.d.state
st.Lock()
latest := st.LatestWarningTime()
st.Unlock()
latest := c.d.state.LatestWarningTime()
rsp.addWarningsToMeta(latest)
}
}
Expand Down
28 changes: 18 additions & 10 deletions internals/overlord/state/notices.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,13 @@ func (s *State) AddNotice(userID *uint32, noticeType NoticeType, key string, opt
notice.lastData = options.Data
notice.repeatAfter = options.RepeatAfter

// Update the latest warning time cache if needed. There's no need to
// actually update atomically here, because the state lock is held.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I was about to comment on that. Looks like you've already considered that. If we ever update this without the state mutex, then we'll need to reconsider this.

if notice.noticeType == WarningNotice && notice.lastRepeated.After(s.LatestWarningTime()) {
latestWarningTime := notice.lastRepeated
s.latestWarningTime.Store(&latestWarningTime)
}

if newOrRepeated {
s.noticeCond.Broadcast()
}
Expand Down Expand Up @@ -371,18 +378,14 @@ func (s *State) Notices(filter *NoticeFilter) []*Notice {

// LatestWarningTime returns the most recent time a warning notice was
// repeated, or the zero value if there are no warnings.
//
// The state lock does not need to be held when calling this method.
func (s *State) LatestWarningTime() time.Time {
s.reading()

// TODO(benhoyt): optimise with an in-memory atomic cache when warnings are added (or pruned),
// to avoid having to acquire the state lock on every request
var latest time.Time
for _, notice := range s.notices {
if notice.noticeType == WarningNotice && notice.lastRepeated.After(latest) {
latest = notice.lastRepeated
}
t := s.latestWarningTime.Load()
if t == nil {
return time.Time{}
}
return latest
return *t
}

// Notice returns a single notice by ID, or nil if not found.
Expand Down Expand Up @@ -415,14 +418,19 @@ func (s *State) flattenNotices(filter *NoticeFilter) []*Notice {
func (s *State) unflattenNotices(flat []*Notice) {
now := time.Now()
s.notices = make(map[noticeKey]*Notice)
latestWarningTime := time.Time{}
for _, n := range flat {
if n.expired(now) {
continue
}
userID, hasUserID := n.UserID()
uniqueKey := noticeKey{hasUserID, userID, n.noticeType, n.key}
s.notices[uniqueKey] = n
if n.noticeType == WarningNotice && n.lastRepeated.After(latestWarningTime) {
latestWarningTime = n.lastRepeated
}
}
s.latestWarningTime.Store(&latestWarningTime)
}

// WaitNotices waits for notices that match the filter to exist or occur,
Expand Down
10 changes: 9 additions & 1 deletion internals/overlord/state/notices_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,20 +436,28 @@ func (s *noticesSuite) TestDeleteExpired(c *C) {
st.Lock()
defer st.Unlock()

c.Assert(st.NumNotices(), Equals, 0)
c.Assert(st.LatestWarningTime().IsZero(), Equals, true)

old := time.Now().Add(-8 * 24 * time.Hour)
addNotice(c, st, nil, state.CustomNotice, "foo.com/w", &state.AddNoticeOptions{
Time: old,
})
addNotice(c, st, nil, state.CustomNotice, "foo.com/x", &state.AddNoticeOptions{
Time: old,
})
addNotice(c, st, nil, state.WarningNotice, "warning!", &state.AddNoticeOptions{
Time: old,
})
addNotice(c, st, nil, state.CustomNotice, "foo.com/y", nil)
time.Sleep(time.Microsecond)
addNotice(c, st, nil, state.CustomNotice, "foo.com/z", nil)

c.Assert(st.NumNotices(), Equals, 4)
c.Assert(st.NumNotices(), Equals, 5)
c.Assert(st.LatestWarningTime().Equal(old), Equals, true)
st.Prune(time.Now(), 0, 0, 0)
c.Assert(st.NumNotices(), Equals, 2)
c.Assert(st.LatestWarningTime().IsZero(), Equals, true)

notices := st.Notices(nil)
c.Assert(notices, HasLen, 2)
Expand Down
8 changes: 7 additions & 1 deletion internals/overlord/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ type State struct {
notices map[noticeKey]*Notice
identities map[string]*Identity

noticeCond *sync.Cond
noticeCond *sync.Cond
latestWarningTime atomic.Pointer[time.Time]

modified bool

Expand Down Expand Up @@ -504,11 +505,16 @@ func (s *State) Prune(startOfOperation time.Time, pruneWait, abortWait time.Dura
readyChangesCount++
}

// Prune expired notices, and update the latest warning time cache.
var latestWarningTime time.Time
for k, n := range s.notices {
if n.expired(now) {
delete(s.notices, k)
} else if n.noticeType == WarningNotice && n.lastRepeated.After(latestWarningTime) {
latestWarningTime = n.lastRepeated
}
}
s.latestWarningTime.Store(&latestWarningTime)

NextChange:
for _, chg := range changes {
Expand Down
1 change: 0 additions & 1 deletion internals/overlord/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,6 @@ func (ss *stateSuite) TestMethodEntrance(c *C) {
func() { st.MarshalJSON() },
func() { st.Prune(time.Now(), time.Hour, time.Hour, 100) },
func() { st.TaskCount() },
func() { st.LatestWarningTime() },
}

for i, f := range reads {
Expand Down
Loading