From 57177a2f27e445cb5b8645a05c49f8b494be3945 Mon Sep 17 00:00:00 2001 From: Zohaib Sibte Hassan Date: Thu, 28 Dec 2023 08:57:20 -0800 Subject: [PATCH 1/6] Enable CLI based PPROF --- cfg/config.go | 1 + marmot.go | 20 +++++++++++++++++++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/cfg/config.go b/cfg/config.go index 20d3fce..5e6addf 100644 --- a/cfg/config.go +++ b/cfg/config.go @@ -118,6 +118,7 @@ var SaveSnapshotFlag = flag.Bool("save-snapshot", false, "Only take snapshot and var ClusterAddrFlag = flag.String("cluster-addr", "", "Cluster listening address") var ClusterPeersFlag = flag.String("cluster-peers", "", "Comma separated list of clusters") var LeafServerFlag = flag.String("leaf-servers", "", "Comma separated list of leaf servers") +var ProfServer = flag.String("pprof", "", "PProf listening address") var DataRootDir = os.TempDir() var Config = &Configuration{ diff --git a/marmot.go b/marmot.go index c036ef3..41b8a66 100644 --- a/marmot.go +++ b/marmot.go @@ -4,6 +4,9 @@ import ( "context" "flag" "io" + "net/http" + "net/http/pprof" + _ "net/http/pprof" "os" "time" @@ -22,7 +25,6 @@ import ( func main() { flag.Parse() - err := cfg.Load(*cfg.ConfigPathFlag) if err != nil { panic(err) @@ -44,6 +46,22 @@ func main() { log.Logger = gLog.Level(zerolog.InfoLevel) } + if *cfg.ProfServer != "" { + go func() { + mux := http.NewServeMux() + mux.HandleFunc("/debug/pprof/", pprof.Index) + mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + mux.HandleFunc("/debug/pprof/profile", pprof.Profile) + mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + mux.HandleFunc("/debug/pprof/trace", pprof.Trace) + + err := http.ListenAndServe(*cfg.ProfServer, mux) + if err != nil { + log.Error().Err(err).Msg("unable to bind profiler server") + } + }() + } + log.Debug().Msg("Initializing telemetry") telemetry.InitializeTelemetry() From 6de10892a85a1689ac1445c6bc8acb5e4e6d9e8d Mon Sep 17 00:00:00 2001 From: Zohaib Sibte Hassan Date: Thu, 28 Dec 2023 09:27:55 -0800 Subject: [PATCH 2/6] Adding debug logs for change detection --- db/change_log.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/db/change_log.go b/db/change_log.go index 64e4aab..3f961d3 100644 --- a/db/change_log.go +++ b/db/change_log.go @@ -278,9 +278,11 @@ func (conn *SqliteStreamDB) watchChanges(watcher *fsnotify.Watcher, path string) } if ev.Op != fsnotify.Chmod { + log.Debug().Int("change", int(ev.Op)).Msg("Change detected") conn.publishChangeLog() } case <-changeLogTicker.Channel(): + log.Debug().Dur("timeout", tickerDur).Msg("Change polling timeout") conn.publishChangeLog() } From 11a10d965aca4c7aa262962a83ac36eab7e774d7 Mon Sep 17 00:00:00 2001 From: Zohaib Sibte Hassan Date: Thu, 28 Dec 2023 09:33:59 -0800 Subject: [PATCH 3/6] Adding more debug tracing --- db/change_log.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/db/change_log.go b/db/change_log.go index 3f961d3..a240b25 100644 --- a/db/change_log.go +++ b/db/change_log.go @@ -270,6 +270,7 @@ func (conn *SqliteStreamDB) watchChanges(watcher *fsnotify.Watcher, path string) for { changeLogTicker.Reset() + changesPublished := false err := conn.WithReadTx(func(_tx *sql.Tx) error { select { case ev, ok := <-watcher.Events: @@ -277,13 +278,15 @@ func (conn *SqliteStreamDB) watchChanges(watcher *fsnotify.Watcher, path string) return ErrEndOfWatch } + log.Debug().Int("change", int(ev.Op)).Msg("Change detected") if ev.Op != fsnotify.Chmod { - log.Debug().Int("change", int(ev.Op)).Msg("Change detected") conn.publishChangeLog() + changesPublished = true } case <-changeLogTicker.Channel(): log.Debug().Dur("timeout", tickerDur).Msg("Change polling timeout") conn.publishChangeLog() + changesPublished = true } return nil @@ -307,6 +310,8 @@ func (conn *SqliteStreamDB) watchChanges(watcher *fsnotify.Watcher, path string) if errWal != nil { errWal = watcher.Add(walPath) } + + log.Debug().Bool("changes", changesPublished).Msg("Changes published") } } From cdfa495d1920395ad49b79bc5c8ec828e658ce16 Mon Sep 17 00:00:00 2001 From: Zohaib Sibte Hassan Date: Thu, 28 Dec 2023 10:07:51 -0800 Subject: [PATCH 4/6] Fixing watch change filter --- db/change_log.go | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/db/change_log.go b/db/change_log.go index a240b25..e4f28ba 100644 --- a/db/change_log.go +++ b/db/change_log.go @@ -253,6 +253,24 @@ func (conn *SqliteStreamDB) initTriggers(tableName string) error { return nil } +func (conn *SqliteStreamDB) filterChangesTo(changed chan fsnotify.Event, watcher *fsnotify.Watcher) { + for { + select { + case ev, ok := <-watcher.Events: + if !ok { + close(changed) + return + } + + if ev.Op == fsnotify.Chmod { + continue + } + + changed <- ev + } + } +} + func (conn *SqliteStreamDB) watchChanges(watcher *fsnotify.Watcher, path string) { shmPath := path + "-shm" walPath := path + "-wal" @@ -260,33 +278,30 @@ func (conn *SqliteStreamDB) watchChanges(watcher *fsnotify.Watcher, path string) errDB := watcher.Add(path) errShm := watcher.Add(shmPath) errWal := watcher.Add(walPath) + dbChanged := make(chan fsnotify.Event) tickerDur := time.Duration(cfg.Config.PollingInterval) * time.Millisecond changeLogTicker := utils.NewTimeoutPublisher(tickerDur) // Publish change logs for any residual change logs before starting watcher conn.publishChangeLog() + conn.filterChangesTo(dbChanged, watcher) for { changeLogTicker.Reset() - changesPublished := false err := conn.WithReadTx(func(_tx *sql.Tx) error { select { - case ev, ok := <-watcher.Events: + case ev, ok := <-dbChanged: if !ok { return ErrEndOfWatch } log.Debug().Int("change", int(ev.Op)).Msg("Change detected") - if ev.Op != fsnotify.Chmod { - conn.publishChangeLog() - changesPublished = true - } + conn.publishChangeLog() case <-changeLogTicker.Channel(): log.Debug().Dur("timeout", tickerDur).Msg("Change polling timeout") conn.publishChangeLog() - changesPublished = true } return nil @@ -310,8 +325,6 @@ func (conn *SqliteStreamDB) watchChanges(watcher *fsnotify.Watcher, path string) if errWal != nil { errWal = watcher.Add(walPath) } - - log.Debug().Bool("changes", changesPublished).Msg("Changes published") } } From e2528ad7ca5061e30636cba286bf157a8e983840 Mon Sep 17 00:00:00 2001 From: Zohaib Sibte Hassan Date: Thu, 28 Dec 2023 10:14:34 -0800 Subject: [PATCH 5/6] Fix --- db/change_log.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/db/change_log.go b/db/change_log.go index e4f28ba..526e343 100644 --- a/db/change_log.go +++ b/db/change_log.go @@ -263,6 +263,7 @@ func (conn *SqliteStreamDB) filterChangesTo(changed chan fsnotify.Event, watcher } if ev.Op == fsnotify.Chmod { + time.Sleep(1 * time.Millisecond) continue } @@ -285,7 +286,7 @@ func (conn *SqliteStreamDB) watchChanges(watcher *fsnotify.Watcher, path string) // Publish change logs for any residual change logs before starting watcher conn.publishChangeLog() - conn.filterChangesTo(dbChanged, watcher) + go conn.filterChangesTo(dbChanged, watcher) for { changeLogTicker.Reset() From 2e0e668383fcf2189eb6484535340f6d08e64040 Mon Sep 17 00:00:00 2001 From: Zohaib Sibte Hassan Date: Thu, 28 Dec 2023 21:39:12 -0800 Subject: [PATCH 6/6] Removing sleep --- db/change_log.go | 1 - 1 file changed, 1 deletion(-) diff --git a/db/change_log.go b/db/change_log.go index 526e343..435a649 100644 --- a/db/change_log.go +++ b/db/change_log.go @@ -263,7 +263,6 @@ func (conn *SqliteStreamDB) filterChangesTo(changed chan fsnotify.Event, watcher } if ev.Op == fsnotify.Chmod { - time.Sleep(1 * time.Millisecond) continue }