diff --git a/analyses.go b/analyses.go
index 42a3d25..049bbad 100644
--- a/analyses.go
+++ b/analyses.go
@@ -152,6 +152,94 @@ func JobsToKill(ctx context.Context, dedb *sql.DB) ([]Job, error) {
 	return jobs, nil
 }
 
+// periodicWarningsQuery selects jobs whose status is $1 and whose
+// last_periodic_warning is either unset or older than the job's
+// periodic_warning_period (4 hours when that column is null).
+const periodicWarningsQuery = `
+SELECT jobs.id,
+       jobs.app_id,
+       jobs.user_id,
+       jobs.status,
+       jobs.job_description,
+       jobs.job_name,
+       jobs.result_folder_path,
+       jobs.planned_end_date,
+       jobs.start_date,
+       job_types.system_id,
+       users.username
+  FROM jobs
+  JOIN job_types on jobs.job_type_id = job_types.id
+  JOIN users on jobs.user_id = users.id
+  LEFT join notif_statuses ON jobs.id = notif_statuses.analysis_id
+ WHERE jobs.status = $1
+   AND (notif_statuses.last_periodic_warning is null
+    OR notif_statuses.last_periodic_warning < now() - coalesce(notif_statuses.periodic_warning_period, '4 hours'::interval))
+`
+
+// JobPeriodicWarnings returns the jobs with status "Running" that have never
+// received a periodic warning or whose last one is older than their warning
+// period (see periodicWarningsQuery).
+//
+// External IDs are looked up only after the result set has been read to the
+// end, so the per-job queries never need a second pooled connection while the
+// cursor still holds the first one.
+func JobPeriodicWarnings(ctx context.Context, dedb *sql.DB) ([]Job, error) {
+	rows, err := dedb.QueryContext(ctx, periodicWarningsQuery, "Running")
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	jobs := []Job{}
+
+	for rows.Next() {
+		var (
+			job            Job
+			startDate      pq.NullTime
+			plannedEndDate pq.NullTime
+		)
+
+		if err = rows.Scan(
+			&job.ID,
+			&job.AppID,
+			&job.UserID,
+			&job.Status,
+			&job.Description,
+			&job.Name,
+			&job.ResultFolder,
+			&plannedEndDate,
+			&startDate,
+			&job.Type,
+			&job.User,
+		); err != nil {
+			return nil, err
+		}
+
+		// NULL dates leave the corresponding Job field at its zero value.
+		if plannedEndDate.Valid {
+			job.PlannedEndDate = plannedEndDate.Time.Format(TimestampFromDBFormat)
+		}
+		if startDate.Valid {
+			job.StartDate = startDate.Time.Format(TimestampFromDBFormat)
+		}
+
+		jobs = append(jobs, job)
+	}
+
+	if err = rows.Err(); err != nil {
+		return nil, err
+	}
+
+	// rows.Next returning false has already closed the cursor.
+	for i := range jobs {
+		if jobs[i].ExternalID, err = getExternalID(ctx, dedb, jobs[i].ID); err != nil {
+			return nil, err
+		}
+	}
+
+	return jobs, nil
+}
+
 const
jobWarningsQuery = ` select jobs.id, jobs.app_id, diff --git a/go.mod b/go.mod index 21e7125..fbf0cc7 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/cyverse-de/messaging/v9 v9.1.5 github.com/lib/pq v1.10.4 github.com/pkg/errors v0.9.1 + github.com/sanyokbig/pqinterval v1.1.2 github.com/sirupsen/logrus v1.4.2 github.com/spf13/viper v1.4.0 github.com/streadway/amqp v1.0.1-0.20200716223359-e6b33f460591 diff --git a/go.sum b/go.sum index 994bb32..c0bfa2b 100644 --- a/go.sum +++ b/go.sum @@ -102,6 +102,8 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= +github.com/sanyokbig/pqinterval v1.1.2 h1:RzHMPdRMNvSZSDE+Qr20fFWSfBkKPFrLdFhzqmF0VnY= +github.com/sanyokbig/pqinterval v1.1.2/go.mod h1:jJvMjZaZFVqNTNVCd90zcFOkmbJgjxlWWkpu9/VeUFs= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= @@ -126,6 +128,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1 
h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
diff --git a/main.go b/main.go
index 745aa78..5929fd3 100644
--- a/main.go
+++ b/main.go
@@ -164,6 +164,44 @@ func SendWarningNotification(ctx context.Context, j *Job) error {
 	return sendNotif(ctx, j, j.Status, subject, msg)
 }
 
+// SendPeriodicNotification sends a reminder notification for job j stating
+// when the job started and how long it has been running.
+//
+// It returns an error if j.StartDate cannot be parsed with
+// TimestampFromDBFormat, or whatever error sendNotif returns.
+func SendPeriodicNotification(ctx context.Context, j *Job) error {
+	starttime, err := time.Parse(TimestampFromDBFormat, j.StartDate)
+	if err != nil {
+		return errors.Wrapf(err, "failed to parse start date %s", j.StartDate)
+	}
+
+	// Round to whole seconds so the message does not carry a
+	// nanosecond-precision duration.
+	dur := time.Since(starttime).Round(time.Second)
+
+	subject := fmt.Sprintf(PeriodicSubjectFormat, j.Name, starttime, dur)
+	msg := fmt.Sprintf(PeriodicMessageFormat, j.Name, j.ID, starttime, dur)
+
+	return sendNotif(ctx, j, j.Status, subject, msg)
+}
+
+// ensureNotifRecord makes sure a notif_statuses record exists for job,
+// inserting one through vicedb.AddNotifRecord when
+// vicedb.AnalysisRecordExists reports that none is present.
+func ensureNotifRecord(ctx context.Context, vicedb *VICEDatabaser, job Job) error {
+	if vicedb.AnalysisRecordExists(ctx, job.ID) {
+		return nil
+	}
+
+	notifID, err := vicedb.AddNotifRecord(ctx, &job)
+	if err != nil {
+		return err
+	}
+	log.Debugf("notif_statuses ID inserted: %s", notifID)
+
+	return nil
+}
+
 const maxAttempts = 3
 
 func sendWarning(ctx context.Context, db *sql.DB, vicedb *VICEDatabaser, warningInterval int64, warningKey string) {
@@ -180,13 +218,9 @@ func sendWarning(ctx context.Context, db *sql.DB, vicedb *VICEDatabaser, warning
 				updateFailureCount func(context.Context, *Job, int) error
 			)
 
-			analysisRecordExists := vicedb.AnalysisRecordExists(ctx, j.ID)
-
-			if !analysisRecordExists {
-				if _, err = vicedb.AddNotifRecord(ctx, &j); err != nil {
-					log.Error(err)
-					continue
-				}
+			if err = ensureNotifRecord(ctx, vicedb, j); err != nil {
+				log.Error(err)
+				continue
 			}
 
 			notifStatuses, err = vicedb.NotifStatuses(ctx, &j)
@@ -239,6 +273,73 @@ func sendWarning(ctx context.Context, db *sql.DB, vicedb *VICEDatabaser, warning
 	}
 }
 
+// sendPeriodic sends a reminder notification for each running job whose
+// periodic warning is due, then records when that reminder was sent.
+//
+// A reminder is due once the job's warning period has elapsed since the later
+// of its start date and its last periodic warning. The period is read from the
+// job's notif_statuses record and defaults to four hours when it is unset.
+// Failures are logged and the affected job is skipped; nothing is returned.
+func sendPeriodic(ctx context.Context, db *sql.DB, vicedb *VICEDatabaser) {
+	// fetch jobs which periodic updates might apply to
+	jobs, err := JobPeriodicWarnings(ctx, db)
+	if err != nil {
+		log.Error(err)
+		return
+	}
+
+	for _, j := range jobs {
+		// make sure the job has a notif_statuses record before reading it
+		if err := ensureNotifRecord(ctx, vicedb, j); err != nil {
+			log.Error(err)
+			continue
+		}
+
+		notifStatuses, err := vicedb.NotifStatuses(ctx, &j)
+		if err != nil {
+			log.Error(err)
+			continue
+		}
+
+		// PeriodicWarningPeriod is already a time.Duration; multiplying it by
+		// time.Second again would overflow int64 for any realistic period.
+		periodDuration := 4 * time.Hour
+		if notifStatuses.PeriodicWarningPeriod > 0 {
+			periodDuration = notifStatuses.PeriodicWarningPeriod
+		}
+
+		sd, err := time.Parse(TimestampFromDBFormat, j.StartDate)
+		if err != nil {
+			log.Error(errors.Wrapf(err, "Error parsing start date %s", j.StartDate))
+			continue
+		}
+
+		// measure from the more recent of (last warning, job start date)
+		comparisonTimestamp := sd
+		if notifStatuses.LastPeriodicWarning.After(sd) {
+			comparisonTimestamp = notifStatuses.LastPeriodicWarning
+		}
+
+		log.Infof("Comparing last-warning timestamp %s with period %s", comparisonTimestamp, periodDuration)
+
+		now := time.Now()
+		if !comparisonTimestamp.Add(periodDuration).Before(now) {
+			// the period has not elapsed yet
+			continue
+		}
+
+		if err = SendPeriodicNotification(ctx, &j); err != nil {
+			log.Error(errors.Wrap(err, "Error sending periodic notification"))
+			continue
+		}
+
+		// record the send time so the next reminder is measured from it
+		if err = vicedb.UpdateLastPeriodicWarning(ctx, &j, now); err != nil {
+			log.Error(errors.Wrap(err, "Error updating periodic notification timestamp"))
+		}
+	}
+}
+
 func main() {
 	log.SetReportCaller(true)
 
@@ -364,6 +465,9 @@ func main() {
 		// 1 day warning
 		sendWarning(ctx, db, vicedb, 1440, oneDayWarningKey)
 
+		// periodic warnings
+		sendPeriodic(ctx, db, vicedb)
+
 		jl, err =
JobsToKill(ctx, db)
 		if err != nil {
 			log.Error(errors.Wrap(err, "error getting list of jobs to kill"))
diff --git a/notifications.go b/notifications.go
index bdf9910..4c895d8 100644
--- a/notifications.go
+++ b/notifications.go
@@ -37,6 +37,20 @@ Please finish any work that is in progress. Output files will be transferred to
 // to users when their job is going to terminate in the near future.
 const WarningSubjectFormat = "Analysis %s will terminate on %s (%s)."
 
+// PeriodicMessageFormat is the parameterized body of the reminder sent to
+// users at regular intervals while their job is still running.
+// Parameters, in order: analysis name, analysis ID, start date, running duration.
+const PeriodicMessageFormat = `Analysis "%s" (%s) is still running.
+
+This is a regularly scheduled reminder message to ensure you don't use up your quota.
+
+This job has been running since %s (%s).`
+
+// PeriodicSubjectFormat is the parameterized subject of the reminder sent to
+// users at regular intervals while their job is still running.
+// Parameters, in order: analysis name, start date, running duration.
+const PeriodicSubjectFormat = `Analysis %s is running since %s (%s)`
+
 // Notification is a message intended as a notification to some upstream service
 // or the DE UI.
type Notification struct {
diff --git a/vicedb.go b/vicedb.go
index 1f1f746..78404ec 100644
--- a/vicedb.go
+++ b/vicedb.go
@@ -3,7 +3,9 @@ package main
 import (
 	"context"
 	"database/sql"
+	"time"
 
+	pqinterval "github.com/sanyokbig/pqinterval"
 	log "github.com/sirupsen/logrus"
 )
 
@@ -22,6 +24,8 @@ type NotifStatuses struct {
 	DayWarningFailureCount  int
 	KillWarningSent         bool
 	KillWarningFailureCount int
+	LastPeriodicWarning     time.Time     // last periodic warning; notifStatusQuery maps NULL to 1970-01-01 00:00:00
+	PeriodicWarningPeriod   time.Duration // interval between periodic warnings; notifStatusQuery maps NULL to 0
 }
 
 const notifStatusQuery = `
@@ -32,7 +36,9 @@ const notifStatusQuery = `
        day_warning_sent,
        day_warning_failure_count,
        kill_warning_sent,
-       kill_warning_failure_count
+       kill_warning_failure_count,
+       coalesce(last_periodic_warning, '1970-01-01 00:00:00') as last_periodic_warning,
+       coalesce(periodic_warning_period, '0 seconds'::interval) as periodic_warning_period
   from notif_statuses
  where analysis_id = $1
 `
@@ -59,6 +65,8 @@ func (v *VICEDatabaser) NotifStatuses(ctx context.Context, job *Job) (*NotifStat
 		&notifStatuses.DayWarningFailureCount,
 		&notifStatuses.KillWarningSent,
 		&notifStatuses.KillWarningFailureCount,
+		&notifStatuses.LastPeriodicWarning,
+		(*pqinterval.Duration)(&notifStatuses.PeriodicWarningPeriod),
 	); err != nil {
 		return nil, err
 	}
@@ -93,7 +101,7 @@ func (v *VICEDatabaser) AnalysisRecordExists(ctx context.Context, analysisID str
 }
 
 const addNotifRecordQuery = `
-insert into notif_statuses (analysis_id, external_id) values ($1, $2) returning id
+insert into notif_statuses (analysis_id, external_id, periodic_warning_period) values ($1, $2, cast($3 as interval)) returning id
 `
 
 // AddNotifRecord adds a new record to the notif_statuses table for the provided analysis.
@@ -109,6 +117,7 @@ func (v *VICEDatabaser) AddNotifRecord(ctx context.Context, job *Job) (string, e
 		addNotifRecordQuery,
 		job.ID,
 		job.ExternalID,
+		"4 hours", // initial periodic_warning_period; hardcoded for now
 	).Scan(&notifID); err != nil {
 		return "", err
 	}
@@ -290,3 +299,17 @@ func (v *VICEDatabaser) SetKillWarningFailureCount(ctx context.Context, job *Job
 	)
 	return err
 }
+
+// updateLastPeriodicWarningQuery sets last_periodic_warning to $1 on the
+// notif_statuses rows whose analysis_id is $2.
+const updateLastPeriodicWarningQuery = `
+update notif_statuses set last_periodic_warning = $1 where analysis_id = $2
+`
+
+// UpdateLastPeriodicWarning records ts as the time of the most recent periodic
+// warning for job. The statement result is discarded, so an update that
+// matches no notif_statuses row is not reported as an error.
+func (v *VICEDatabaser) UpdateLastPeriodicWarning(ctx context.Context, job *Job, ts time.Time) error {
+	_, err := v.db.ExecContext(ctx, updateLastPeriodicWarningQuery, ts, job.ID)
+	return err
+}