From a875992a7736044ae846e1b02641d5ea87af7722 Mon Sep 17 00:00:00 2001 From: haupv Date: Thu, 11 Mar 2021 11:31:36 +0700 Subject: [PATCH] fix not working purge log scheduler --- storage/disk_storage.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/storage/disk_storage.go b/storage/disk_storage.go index fbda16f..672e9e2 100644 --- a/storage/disk_storage.go +++ b/storage/disk_storage.go @@ -44,7 +44,8 @@ const ( //FileMode is default file mode FileMode = 0600 //DirMode is default dir mode - DirMode = 0700 + DirMode = 0700 + purgeLogInterval = time.Second * 30 ) //DiskStorage is the Storage store data in disk @@ -394,14 +395,13 @@ func (s *DiskStorage) StartPurgeLog() { s.purgeCtx, s.purgeCancel = context.WithCancel(context.Background()) s.purgeQuitC = make(chan struct{}) s.purgeStarted.Store(true) - timer := time.NewTimer(time.Second * 30) + timer := time.NewTimer(purgeLogInterval) defer func() { close(s.purgeQuitC) timer.Stop() }() - var deleteSegments []*Segment for { select { case <-s.purgeCtx.Done(): @@ -409,6 +409,7 @@ func (s *DiskStorage) StartPurgeLog() { return case <-timer.C: // check and purge segments + var deleteSegments []*Segment s.Mu.Lock() if s.ReserveSegmentCount < len(s.Segments) { deleteCount := len(s.Segments) - s.ReserveSegmentCount @@ -418,12 +419,15 @@ func (s *DiskStorage) StartPurgeLog() { s.Mu.Unlock() if len(deleteSegments) == 0 { + timer.Reset(purgeLogInterval) continue } err := s.purgeSegments(deleteSegments, true) if err != nil { - log.Log.Fatalf("purgeSegments error, err:%s, deleteSegments:%v", err, deleteSegments) + // fatal could could interrupt state if error is from munmap API + log.Log.Errorf("purgeSegments error, err:%s, deleteSegments:%v", err, deleteSegments) } + timer.Reset(purgeLogInterval) } } }