Skip to content

Commit 4c917b9

Browse files
committed
fix: Handle zombie sessions
1 parent e583d32 commit 4c917b9

File tree

4 files changed

+98
-85
lines changed

4 files changed

+98
-85
lines changed

server/handlers.go

Lines changed: 6 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package server
22

33
import (
4-
"strconv"
5-
64
"github.com/creativecreature/pulse"
75
)
86

@@ -16,10 +14,9 @@ func (s *Server) FocusGained(event pulse.Event, reply *string) {
1614
"os", event.OS,
1715
)
1816

19-
// Lock the mutex to prevent race conditions with the heartbeat check.
17+
s.checkHeartbeat()
2018
s.mutex.Lock()
2119
defer s.mutex.Unlock()
22-
2320
s.lastHeartbeat = s.clock.GetTime()
2421

2522
// The FocusGained event will be triggered when I switch back to an active
@@ -82,9 +79,10 @@ func (s *Server) OpenFile(event pulse.Event, reply *string) {
8279
return
8380
}
8481

85-
// Lock the mutex to prevent race conditions with the heartbeat check.
82+
s.checkHeartbeat()
8683
s.mutex.Lock()
8784
defer s.mutex.Unlock()
85+
s.lastHeartbeat = s.clock.GetTime()
8886

8987
// The editor could have been inactive, while focused, for 10 minutes.
9088
// That would end the session, and we could get a OpenFile event without
@@ -96,7 +94,6 @@ func (s *Server) OpenFile(event pulse.Event, reply *string) {
9694
}
9795

9896
// If a new file was opened it means that the session is still active.
99-
s.lastHeartbeat = s.clock.GetTime()
10097
gitFile, gitFileErr := s.fileReader.GitFile(event.Path)
10198
if gitFileErr != nil {
10299
s.log.Debug("Failed to get git file.",
@@ -122,9 +119,10 @@ func (s *Server) SendHeartbeat(event pulse.Event, reply *string) {
122119
"os", event.OS,
123120
)
124121

125-
// Lock the mutex to prevent race conditions with the heartbeat check.
122+
s.checkHeartbeat()
126123
s.mutex.Lock()
127124
defer s.mutex.Unlock()
125+
s.lastHeartbeat = s.clock.GetTime()
128126

129127
// This is to handle the case where the server would have ended the clients
130128
// session due to inactivity. When a session ends it is written to disk and
@@ -147,8 +145,6 @@ func (s *Server) SendHeartbeat(event pulse.Event, reply *string) {
147145
s.setActiveBuffer(gitFile)
148146
}
149147

150-
// Update the time for the last heartbeat.
151-
s.lastHeartbeat = s.clock.GetTime()
152148
*reply = "Successfully sent heartbeat"
153149
}
154150

@@ -160,7 +156,7 @@ func (s *Server) EndSession(event pulse.Event, reply *string) {
160156
"os", event.OS,
161157
)
162158

163-
// Lock the mutex to prevent race conditions with the heartbeat check.
159+
s.checkHeartbeat()
164160
s.mutex.Lock()
165161
defer s.mutex.Unlock()
166162

@@ -188,29 +184,3 @@ func (s *Server) EndSession(event pulse.Event, reply *string) {
188184
s.saveActiveSession()
189185
*reply = "The session was ended successfully."
190186
}
191-
192-
// CheckHeartbeat is used to check if the session has been inactive for more than
193-
// ten minutes. If that is the case, the session will be terminated and saved to disk.
194-
func (s *Server) CheckHeartbeat() {
195-
s.log.Debug("Checking heartbeat.",
196-
"active_editor_id", s.activeEditorID,
197-
"last_heartbeat", s.lastHeartbeat,
198-
"time_now", s.clock.GetTime(),
199-
)
200-
if s.activeEditorID == "" {
201-
return
202-
}
203-
204-
s.mutex.Lock()
205-
defer s.mutex.Unlock()
206-
207-
if s.lastHeartbeat+HeartbeatTTL.Milliseconds() < s.clock.GetTime() {
208-
s.log.Info(
209-
"Ending all active sessions due to inactivity",
210-
"last_heartbeat", strconv.FormatInt(s.lastHeartbeat, 10),
211-
"current_time", strconv.FormatInt(s.clock.GetTime(), 10),
212-
)
213-
s.saveAllSessions()
214-
s.activeEditorID = ""
215-
}
216-
}

server/heartbeat.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package server
2+
3+
import (
4+
"strconv"
5+
"time"
6+
)
7+
8+
const (
9+
HeartbeatTTL = time.Minute * 10
10+
heartbeatInterval = time.Second * 10
11+
)
12+
13+
// CheckHeartbeat is used to check if the session has been inactive for more than
14+
// ten minutes. If that is the case, the session will be terminated and saved to disk.
15+
func (s *Server) checkHeartbeat() {
16+
s.log.Debug("Checking heartbeat.",
17+
"active_editor_id", s.activeEditorID,
18+
"last_heartbeat", s.lastHeartbeat,
19+
"time_now", s.clock.GetTime(),
20+
)
21+
if s.activeEditorID == "" {
22+
return
23+
}
24+
25+
s.mutex.Lock()
26+
defer s.mutex.Unlock()
27+
28+
if s.lastHeartbeat+HeartbeatTTL.Milliseconds() < s.clock.GetTime() {
29+
s.log.Info(
30+
"Ending all active sessions due to inactivity",
31+
"last_heartbeat", strconv.FormatInt(s.lastHeartbeat, 10),
32+
"current_time", strconv.FormatInt(s.clock.GetTime(), 10),
33+
"end_time", strconv.FormatInt(s.lastHeartbeat+int64(HeartbeatTTL), 10),
34+
)
35+
36+
// The machine may have entered sleep mode, potentially stopping the heartbeat
37+
// check from executing at its scheduled interval. To mitigate this, the session
38+
// will be terminated based on the time of the last recorded heartbeat plus the
39+
// TTL. This prevents the creation of inaccurately long sessions.
40+
s.saveAllSessions(s.lastHeartbeat + int64(HeartbeatTTL/time.Millisecond))
41+
s.activeEditorID = ""
42+
}
43+
}
44+
45+
// startHeartbeatChecks runs in a separate goroutine and makes sure
46+
// that no session is allowed to be idle for more than 10 minutes.
47+
func (s *Server) startHeartbeatChecks() {
48+
go func() {
49+
ticker, stopTicker := s.clock.NewTicker(heartbeatInterval)
50+
defer stopTicker()
51+
for {
52+
select {
53+
case <-s.stopHeartbeatChecks:
54+
return
55+
case <-ticker:
56+
s.checkHeartbeat()
57+
}
58+
}
59+
}()
60+
}

server/server.go

Lines changed: 23 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -18,39 +18,36 @@ import (
1818
"github.com/creativecreature/pulse/proxy"
1919
)
2020

21-
const (
22-
HeartbeatTTL = time.Minute * 10
23-
heartbeatInterval = time.Second * 45
24-
)
25-
2621
type Server struct {
27-
name string
28-
activeEditorID string
29-
activeSessions map[string]*pulse.CodingSession
30-
lastHeartbeat int64
31-
clock pulse.Clock
32-
fileReader FileReader
33-
log *log.Logger
34-
mutex sync.Mutex
35-
storage pulse.TemporaryStorage
22+
name string
23+
activeEditorID string
24+
activeSessions map[string]*pulse.CodingSession
25+
lastHeartbeat int64
26+
stopHeartbeatChecks chan struct{}
27+
clock pulse.Clock
28+
fileReader FileReader
29+
log *log.Logger
30+
mutex sync.Mutex
31+
storage pulse.TemporaryStorage
3632
}
3733

3834
// New creates a new server.
3935
func New(serverName string, opts ...Option) (*Server, error) {
40-
a := &Server{
41-
name: serverName,
42-
activeSessions: make(map[string]*pulse.CodingSession),
43-
clock: pulse.NewClock(),
44-
fileReader: filereader.New(),
36+
s := &Server{
37+
name: serverName,
38+
activeSessions: make(map[string]*pulse.CodingSession),
39+
clock: pulse.NewClock(),
40+
stopHeartbeatChecks: make(chan struct{}),
41+
fileReader: filereader.New(),
4542
}
4643
for _, opt := range opts {
47-
err := opt(a)
44+
err := opt(s)
4845
if err != nil {
4946
return &Server{}, err
5047
}
5148
}
52-
53-
return a, nil
49+
s.startHeartbeatChecks()
50+
return s, nil
5451
}
5552

5653
// createSession creates a new session and sets it as the current session.
@@ -81,8 +78,7 @@ func (s *Server) setActiveBuffer(gitFile pulse.GitFile) {
8178
)
8279
}
8380

84-
func (s *Server) saveAllSessions() {
85-
now := s.clock.GetTime()
81+
func (s *Server) saveAllSessions(endedAt int64) {
8682
s.log.Debug("Saving all sessions.")
8783

8884
for _, session := range s.activeSessions {
@@ -95,7 +91,7 @@ func (s *Server) saveAllSessions() {
9591
return
9692
}
9793

98-
finishedSession := session.End(now)
94+
finishedSession := session.End(endedAt)
9995
err := s.storage.Write(finishedSession)
10096
if err != nil {
10197
s.log.Error(err)
@@ -177,20 +173,6 @@ func (s *Server) startServer(port string) (*http.Server, error) {
177173
return httpServer, nil
178174
}
179175

180-
// HeartbeatCheck runs a heartbeat ticker that ensures that
181-
// the current session is not idle for more than ten minutes.
182-
func (s *Server) HeartbeatCheck() func() {
183-
s.log.Info("Starting the heartbeat checks.")
184-
ticker, stop := s.clock.NewTicker(heartbeatInterval)
185-
go func() {
186-
for range ticker {
187-
s.CheckHeartbeat()
188-
}
189-
}()
190-
191-
return stop
192-
}
193-
194176
// Start starts the server on the given port.
195177
func (s *Server) Start(port string) error {
196178
s.log.Info("Starting up...")
@@ -199,9 +181,6 @@ func (s *Server) Start(port string) error {
199181
return err
200182
}
201183

202-
// Start the ECG. It will end inactive sessions.
203-
stopHeartbeat := s.HeartbeatCheck()
204-
205184
// Catch shutdown signals from the OS
206185
quit := make(chan os.Signal, 1)
207186
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
@@ -211,15 +190,15 @@ func (s *Server) Start(port string) error {
211190
s.log.Info("Received shutdown signal", "signal", sig.String())
212191

213192
// Stop the heartbeat checks and shutdown the http server.
214-
stopHeartbeat()
193+
s.stopHeartbeatChecks <- struct{}{}
215194
err = httpServer.Shutdown(context.Background())
216195
if err != nil {
217196
s.log.Error(err)
218197
return err
219198
}
220199

221200
// Save the all sessions before shutting down.
222-
s.saveAllSessions()
201+
s.saveAllSessions(s.clock.GetTime())
223202
s.log.Info("Shutting down.")
224203

225204
return nil

server/server_test.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -398,8 +398,6 @@ func TestNoActivityShouldEndSession(t *testing.T) {
398398
t.Fatal(err)
399399
}
400400

401-
s.HeartbeatCheck()
402-
403401
// Open an initial VIM window.
404402
s.OpenFile(pulse.Event{
405403
EditorID: "123",
@@ -445,11 +443,17 @@ func TestNoActivityShouldEndSession(t *testing.T) {
445443
if storedSessions[0].DurationMs != 100 {
446444
t.Errorf("expected the sessions duration to be 100; got %d", storedSessions[0].DurationMs)
447445
}
446+
if storedSessions[0].Files[0].DurationMs != 100 {
447+
t.Errorf("expected the file duration to be 100; got %d", storedSessions[0].Files[0].DurationMs)
448+
}
448449

449-
// The second session should last for 11 minutes. 10 for the
450-
// heartbeat to expire, and 1 for the heartbeat to trigger.
451-
dur := int64(11 * time.Minute / time.Millisecond)
450+
// The second session should have been terminated by the
451+
// heartbeat check after 10 minutes of inactivity.
452+
dur := int64(10 * time.Minute / time.Millisecond)
452453
if storedSessions[1].DurationMs != dur {
453454
t.Errorf("expected the sessions duration to be %d; got %d", dur, storedSessions[1].DurationMs)
454455
}
456+
if storedSessions[1].Files[0].DurationMs != dur {
457+
t.Errorf("expected the file duration to be %d; got %d", dur, storedSessions[1].Files[0].DurationMs)
458+
}
455459
}

0 commit comments

Comments
 (0)