From 19750cb1c2b9f48ed76729700f5a833e29948c1b Mon Sep 17 00:00:00 2001 From: Ian McEwen Date: Mon, 22 Jul 2024 11:53:35 -0700 Subject: [PATCH 1/6] Periodic VICE notifications --- analyses.go | 89 ++++++++++++++++++++++++++++++++++++++++ main.go | 104 +++++++++++++++++++++++++++++++++++++++++++---- notifications.go | 14 +++++++ vicedb.go | 28 ++++++++++++- 4 files changed, 226 insertions(+), 9 deletions(-) diff --git a/analyses.go b/analyses.go index 42a3d25..dacd4f7 100644 --- a/analyses.go +++ b/analyses.go @@ -152,6 +152,95 @@ func JobsToKill(ctx context.Context, dedb *sql.DB) ([]Job, error) { return jobs, nil } +const periodicWarningsQuery = ` +SELECT jobs.id, + jobs.app_id, + jobs.user_id, + jobs.status, + jobs.job_description, + jobs.job_name, + jobs.result_folder_path, + jobs.planned_end_date, + jobs.start_date, + job_types.system_id, + users.username + FROM jobs + JOIN job_types on jobs.job_type_id = job_types.id + JOIN users on jobs.user_id = users.id + LEFT join notif_statuses ON jobs.id = notif_statuses.analysis_id + WHERE jobs.status = $1 + AND (notif_statuses.last_periodic_warning is null + OR notif_statuses.last_periodic_warning < $2 - (coalesce(notif_statuses.periodic_warning_period, 14400)::text || ' seconds'::text)::interval) +` + +// JobPeriodicWarnings returns a list of running jobs that may need periodic notifications to be sent +func JobPeriodicWarnings(ctx context.Context, dedb *sql.DB) ([]Job, error) { + var ( + err error + rows *sql.Rows + ) + + now := time.Now() + if rows, err = dedb.QueryContext( + ctx, + periodicWarningsQuery, + "Running", + now, + ); err != nil { + return nil, err + } + defer rows.Close() + + jobs := []Job{} + + for rows.Next() { + var ( + job Job + startDate pq.NullTime + plannedEndDate pq.NullTime + ) + + job = Job{} + + if err = rows.Scan( + &job.ID, + &job.AppID, + &job.UserID, + &job.Status, + &job.Description, + &job.Name, + &job.ResultFolder, + &plannedEndDate, + &startDate, + &job.Type, + &job.User, + ); err != nil { + return nil, err + } + + if plannedEndDate.Valid { + job.PlannedEndDate = plannedEndDate.Time.Format(TimestampFromDBFormat) + } + + if startDate.Valid { + job.StartDate = startDate.Time.Format(TimestampFromDBFormat) + } + + job.ExternalID, err = getExternalID(ctx, dedb, job.ID) + if err != nil { + return nil, err + } + + jobs = append(jobs, job) + } + + if err = rows.Err(); err != nil { + return nil, err + } + + return jobs, nil +} + const jobWarningsQuery = ` select jobs.id, jobs.app_id, diff --git a/main.go b/main.go index 745aa78..7a13aa0 100644 --- a/main.go +++ b/main.go @@ -164,6 +164,40 @@ func SendWarningNotification(ctx context.Context, j *Job) error { return sendNotif(ctx, j, j.Status, subject, msg) } +func SendPeriodicNotification(ctx context.Context, j *Job) error { + starttime, err := time.Parse(TimestampFromDBFormat, j.StartDate) + if err != nil { + return errors.Wrapf(err, "failed to parse start date %s", j.StartDate) + } + dur := time.Since(starttime) + + subject := fmt.Sprintf(PeriodicSubjectFormat, j.Name, starttime, dur) + + msg := fmt.Sprintf( + PeriodicMessageFormat, + j.Name, + j.ID, + starttime, + dur, + ) + + return sendNotif(ctx, j, j.Status, subject, msg) +} + +func ensureNotifRecord(ctx context.Context, vicedb *VICEDatabaser, job Job) error { + analysisRecordExists := vicedb.AnalysisRecordExists(ctx, job.ID) + + if !analysisRecordExists { + notifId, err := vicedb.AddNotifRecord(ctx, &job) + if err != nil { + return err + } + log.Debugf("notif_statuses ID inserted: %s", notifId) + } + + return nil +} + const maxAttempts = 3 func sendWarning(ctx context.Context, db *sql.DB, vicedb *VICEDatabaser, warningInterval int64, warningKey string) { @@ -180,13 +214,9 @@ func sendWarning(ctx context.Context, db *sql.DB, vicedb *VICEDatabaser, warning updateFailureCount func(context.Context, *Job, int) error ) - analysisRecordExists := vicedb.AnalysisRecordExists(ctx, j.ID) - - if !analysisRecordExists { - if _, err = vicedb.AddNotifRecord(ctx, &j); err != nil { - log.Error(err) - continue - } + if err = ensureNotifRecord(ctx, vicedb, j); err != nil { + log.Error(err) + continue } notifStatuses, err = vicedb.NotifStatuses(ctx, &j) @@ -239,6 +269,63 @@ func sendWarning(ctx context.Context, db *sql.DB, vicedb *VICEDatabaser, warning } } +func sendPeriodic(ctx context.Context, db *sql.DB, vicedb *VICEDatabaser) { + // fetch jobs which periodic updates might apply to + jobs, err := JobPeriodicWarnings(ctx, db) + + // loop over them and check if they have notif_statuses info + if err != nil { + log.Error(err) + } else { + for _, j := range jobs { + var ( + notifStatuses *NotifStatuses + now time.Time + comparisonTimestamp time.Time + periodDuration time.Duration + ) + + // fetch preferences and update in the DB if needed + if err = ensureNotifRecord(ctx, vicedb, j); err != nil { + log.Error(err) + continue + } + + notifStatuses, err = vicedb.NotifStatuses(ctx, &j) + if err != nil { + log.Error(err) + continue + } + + periodDuration = 14400 * time.Second + if notifStatuses.PeriodicWarningPeriod > 0 { + periodDuration = notifStatuses.PeriodicWarningPeriod * time.Second + } + + comparisonTimestamp = j.StartDate + if notifStatuses.LastPeriodicWarning.After(j.StartDate) { + comparisonTimestamp = notifStatuses.lastPeriodicWarning + } + + log.Infof("Comparing last-warning timestamp %s with period %s s", comparisonTimestamp, periodDuration) + + now = time.Now() + + // timeframe is met if: more recent of (last warning, job start date) + periodic warning period is before now + if comparisonTimestamp.Add(periodDuration).Before(now) { + // if so, + err = SendPeriodicNotification(ctx, &j) + if err != nil { + log.Error(errors.Wrap(err, "Error sending periodic notification")) + continue + } + // update timestamp: + vicedb.UpdateLastPeriodicWarning(ctx, &j, now) + } + } + } +} + func main() { log.SetReportCaller(true) @@ -364,6 +451,9 @@ func main() { // 1 day warning sendWarning(ctx, db, vicedb, 1440, oneDayWarningKey) + // periodic warnings + sendPeriodic(ctx, db, vicedb) + jl, err = JobsToKill(ctx, db) if err != nil { log.Error(errors.Wrap(err, "error getting list of jobs to kill")) diff --git a/notifications.go b/notifications.go index bdf9910..4c895d8 100644 --- a/notifications.go +++ b/notifications.go @@ -37,6 +37,20 @@ Please finish any work that is in progress. Output files will be transferred to // to users when their job is going to terminate in the near future. const WarningSubjectFormat = "Analysis %s will terminate on %s (%s)." +// PeriodicMessageFormat is the parameterized message that gets sent to users +// when it's time to send a regular reminder the job is still running +// parameters: analysis name & ID, start date, duration +const PeriodicMessageFormat = `Analysis "%s" (%s) is still running. + +This is a regularly scheduled reminder message to ensure you don't use up your quota. + +This job has been running since %s (%s).` + +// PeriodicSubjectFormat is the parameterized subject for the email that is sent +// to users as a regular reminder of a running job +// parameters: analysis name, start date, duration +const PeriodicSubjectFormat = `Analysis %s is running since %s (%s)` + // Notification is a message intended as a notification to some upstream service // or the DE UI. type Notification struct { diff --git a/vicedb.go b/vicedb.go index 1f1f746..c9e027f 100644 --- a/vicedb.go +++ b/vicedb.go @@ -3,6 +3,7 @@ package main import ( "context" "database/sql" + "time" log "github.com/sirupsen/logrus" ) @@ -22,6 +23,8 @@ type NotifStatuses struct { DayWarningFailureCount int KillWarningSent bool KillWarningFailureCount int + LastPeriodicWarning time.Time + PeriodicWarningPeriod int } const notifStatusQuery = ` @@ -32,7 +35,9 @@ const notifStatusQuery = ` day_warning_sent, day_warning_failure_count, kill_warning_sent, - kill_warning_failure_count + kill_warning_failure_count, + coalesce(last_periodic_warning, '1970-01-01 00:00:00') as last_periodic_warning, + coalesce(periodic_warning_period, 0) as periodic_warning_period from notif_statuses where analysis_id = $1 ` @@ -59,6 +64,8 @@ func (v *VICEDatabaser) NotifStatuses(ctx context.Context, job *Job) (*NotifStat ¬ifStatuses.DayWarningFailureCount, ¬ifStatuses.KillWarningSent, ¬ifStatuses.KillWarningFailureCount, + ¬ifStatuses.LastPeriodicWarning, + ¬ifStatuses.PeriodicWarningPeriod, ); err != nil { return nil, err } @@ -93,7 +100,7 @@ func (v *VICEDatabaser) AnalysisRecordExists(ctx context.Context, analysisID str } const addNotifRecordQuery = ` -insert into notif_statuses (analysis_id, external_id) values ($1, $2) returning id +insert into notif_statuses (analysis_id, external_id, periodic_warning_period) values ($1, $2, $3) returning id ` // AddNotifRecord adds a new record to the notif_statuses table for the provided analysis. @@ -109,6 +116,7 @@ func (v *VICEDatabaser) AddNotifRecord(ctx context.Context, job *Job) (string, e addNotifRecordQuery, job.ID, job.ExternalID, + 14400, // 4 hours ).Scan(¬ifID); err != nil { return "", err } @@ -290,3 +298,19 @@ func (v *VICEDatabaser) SetKillWarningFailureCount(ctx context.Context, job *Job ) return err } + +const updateLastPeriodicWarningQuery = ` +update notif_statuses set last_periodic_warning = $1 where analysis_id = $2 +` + +// UpdateLastPeriodicWarning updates the timestamp for a job's last periodic warning +func (v *VICEDatabaser) UpdateLastPeriodicWarning(ctx context.Context, job *Job, ts time.Time) error { + var err error + _, err = v.db.ExecContext( + ctx, + updateLastPeriodicWarningQuery, + ts, + job.ID, + ) + return err +} From 92138581ebc2fd896f4466741ac4b804ecc3ea37 Mon Sep 17 00:00:00 2001 From: Ian McEwen Date: Thu, 8 Aug 2024 14:58:03 -0700 Subject: [PATCH 2/6] fix some lint mistakes --- main.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/main.go b/main.go index 7a13aa0..f774bea 100644 --- a/main.go +++ b/main.go @@ -299,12 +299,13 @@ func sendPeriodic(ctx context.Context, db *sql.DB, vicedb *VICEDatabaser) { periodDuration = 14400 * time.Second if notifStatuses.PeriodicWarningPeriod > 0 { - periodDuration = notifStatuses.PeriodicWarningPeriod * time.Second + periodDuration = time.Duration(notifStatuses.PeriodicWarningPeriod) * time.Second } - comparisonTimestamp = j.StartDate - if notifStatuses.LastPeriodicWarning.After(j.StartDate) { - comparisonTimestamp = notifStatuses.lastPeriodicWarning + sd, err := time.Parse(TimestampFromDBFormat, j.StartDate) + comparisonTimestamp = sd + if notifStatuses.LastPeriodicWarning.After(sd) { + comparisonTimestamp = notifStatuses.LastPeriodicWarning } log.Infof("Comparing last-warning timestamp %s with period %s s", comparisonTimestamp, periodDuration) From 1f16da64ff75b6dffcf96b2c13a536d47c81e75e Mon Sep 17 00:00:00 2001 From: Ian McEwen Date: Thu, 8 Aug 2024 15:01:13 -0700 Subject: [PATCH 3/6] more linter complaints --- main.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index f774bea..df8b73c 100644 --- a/main.go +++ b/main.go @@ -303,6 +303,10 @@ func sendPeriodic(ctx context.Context, db *sql.DB, vicedb *VICEDatabaser) { } sd, err := time.Parse(TimestampFromDBFormat, j.StartDate) + if err != nil { + log.Errorf(errors.Wrap(err, "Error parsing start date %s", j.StartDate)) + continue + } comparisonTimestamp = sd if notifStatuses.LastPeriodicWarning.After(sd) { comparisonTimestamp = notifStatuses.LastPeriodicWarning @@ -321,7 +325,11 @@ func sendPeriodic(ctx context.Context, db *sql.DB, vicedb *VICEDatabaser) { continue } // update timestamp: - vicedb.UpdateLastPeriodicWarning(ctx, &j, now) + err = vicedb.UpdateLastPeriodicWarning(ctx, &j, now) + if err != nil { + log.Error(errors.Wrap(err, "Error updating periodic notification timestamp")) + continue + } } } } From 44a1044691bb95e14dff20a8b8c5062280312e2e Mon Sep 17 00:00:00 2001 From: Ian McEwen Date: Thu, 8 Aug 2024 15:02:32 -0700 Subject: [PATCH 4/6] meh, changed the wrong function name --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index df8b73c..5929fd3 100644 --- a/main.go +++ b/main.go @@ -304,7 +304,7 @@ func sendPeriodic(ctx context.Context, db *sql.DB, vicedb *VICEDatabaser) { sd, err := time.Parse(TimestampFromDBFormat, j.StartDate) if err != nil { - log.Errorf(errors.Wrap(err, "Error parsing start date %s", j.StartDate)) + log.Error(errors.Wrapf(err, "Error parsing start date %s", j.StartDate)) continue } comparisonTimestamp = sd From 17ad691a7c401ba0098c02d2d9a26648e0166f07 Mon Sep 17 00:00:00 2001 From: Ian McEwen Date: Tue, 13 Aug 2024 10:30:04 -0700 Subject: [PATCH 5/6] Remove time.Now call in JobPeriodicWarnings, use postgresql now() --- analyses.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/analyses.go b/analyses.go index dacd4f7..c9d44a6 100644 --- a/analyses.go +++ b/analyses.go @@ -170,7 +170,7 @@ SELECT jobs.id, LEFT join notif_statuses ON jobs.id = notif_statuses.analysis_id WHERE jobs.status = $1 AND (notif_statuses.last_periodic_warning is null - OR notif_statuses.last_periodic_warning < $2 - (coalesce(notif_statuses.periodic_warning_period, 14400)::text || ' seconds'::text)::interval) + OR notif_statuses.last_periodic_warning < now() - (coalesce(notif_statuses.periodic_warning_period, 14400)::text || ' seconds'::text)::interval) ` // JobPeriodicWarnings returns a list of running jobs that may need periodic notifications to be sent @@ -180,12 +180,10 @@ func JobPeriodicWarnings(ctx context.Context, dedb *sql.DB) ([]Job, error) { rows *sql.Rows ) - now := time.Now() if rows, err = dedb.QueryContext( ctx, periodicWarningsQuery, "Running", - now, ); err != nil { return nil, err } From 168d68e85de8ff57e680f64a45808e0ee69ee365 Mon Sep 17 00:00:00 2001 From: Ian McEwen Date: Wed, 4 Sep 2024 10:02:41 -0700 Subject: [PATCH 6/6] change to using an interval type, hopefully --- analyses.go | 2 +- go.mod | 1 + go.sum | 3 +++ vicedb.go | 11 ++++++----- 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/analyses.go b/analyses.go index c9d44a6..049bbad 100644 --- a/analyses.go +++ b/analyses.go @@ -170,7 +170,7 @@ SELECT jobs.id, LEFT join notif_statuses ON jobs.id = notif_statuses.analysis_id WHERE jobs.status = $1 AND (notif_statuses.last_periodic_warning is null - OR notif_statuses.last_periodic_warning < now() - (coalesce(notif_statuses.periodic_warning_period, 14400)::text || ' seconds'::text)::interval) + OR notif_statuses.last_periodic_warning < now() - coalesce(notif_statuses.periodic_warning_period, '4 hours'::interval)) ` // JobPeriodicWarnings returns a list of running jobs that may need periodic notifications to be sent diff --git a/go.mod b/go.mod index 21e7125..fbf0cc7 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/cyverse-de/messaging/v9 v9.1.5 github.com/lib/pq v1.10.4 github.com/pkg/errors v0.9.1 + github.com/sanyokbig/pqinterval v1.1.2 github.com/sirupsen/logrus v1.4.2 github.com/spf13/viper v1.4.0 github.com/streadway/amqp v1.0.1-0.20200716223359-e6b33f460591 diff --git a/go.sum b/go.sum index 994bb32..c0bfa2b 100644 --- a/go.sum +++ b/go.sum @@ -102,6 +102,8 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= +github.com/sanyokbig/pqinterval v1.1.2 h1:RzHMPdRMNvSZSDE+Qr20fFWSfBkKPFrLdFhzqmF0VnY= +github.com/sanyokbig/pqinterval v1.1.2/go.mod h1:jJvMjZaZFVqNTNVCd90zcFOkmbJgjxlWWkpu9/VeUFs= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= @@ -126,6 +128,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= diff --git a/vicedb.go b/vicedb.go index c9e027f..78404ec 100644 --- a/vicedb.go +++ b/vicedb.go @@ -5,6 +5,7 @@ import ( "database/sql" "time" + pqinterval "github.com/sanyokbig/pqinterval" log "github.com/sirupsen/logrus" ) @@ -24,7 +25,7 @@ type NotifStatuses struct { KillWarningSent bool KillWarningFailureCount int LastPeriodicWarning time.Time - PeriodicWarningPeriod int + PeriodicWarningPeriod time.Duration } const notifStatusQuery = ` @@ -37,7 +38,7 @@ const notifStatusQuery = ` kill_warning_sent, kill_warning_failure_count, coalesce(last_periodic_warning, '1970-01-01 00:00:00') as last_periodic_warning, - coalesce(periodic_warning_period, 0) as periodic_warning_period + coalesce(periodic_warning_period, '0 seconds'::interval) as periodic_warning_period from notif_statuses where analysis_id = $1 ` @@ -65,7 +66,7 @@ func (v *VICEDatabaser) NotifStatuses(ctx context.Context, job *Job) (*NotifStat ¬ifStatuses.KillWarningSent, ¬ifStatuses.KillWarningFailureCount, ¬ifStatuses.LastPeriodicWarning, - ¬ifStatuses.PeriodicWarningPeriod, + (*pqinterval.Duration)(¬ifStatuses.PeriodicWarningPeriod), ); err != nil { return nil, err } @@ -100,7 +101,7 @@ func (v *VICEDatabaser) AnalysisRecordExists(ctx context.Context, analysisID str } const addNotifRecordQuery = ` -insert into notif_statuses (analysis_id, external_id, periodic_warning_period) values ($1, $2, $3) returning id +insert into notif_statuses (analysis_id, external_id, periodic_warning_period) values ($1, $2, cast($3 as interval)) returning id ` // AddNotifRecord adds a new record to the notif_statuses table for the provided analysis. @@ -116,7 +117,7 @@ func (v *VICEDatabaser) AddNotifRecord(ctx context.Context, job *Job) (string, e addNotifRecordQuery, job.ID, job.ExternalID, - 14400, // 4 hours + "4 hours", // hardcoded for now ).Scan(¬ifID); err != nil { return "", err }