Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Periodic VICE notifications #10

Merged
merged 6 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions analyses.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,93 @@ func JobsToKill(ctx context.Context, dedb *sql.DB) ([]Job, error) {
return jobs, nil
}

// periodicWarningsQuery selects jobs (joined with their job type and user)
// whose status equals $1 and that are candidates for a periodic reminder.
// notif_statuses is LEFT JOINed, so a job qualifies when it has no
// last_periodic_warning value (including when no notif_statuses row matches),
// or when last_periodic_warning is older than now() minus the row's
// periodic_warning_period, which falls back to 4 hours when null.
const periodicWarningsQuery = `
SELECT jobs.id,
jobs.app_id,
jobs.user_id,
jobs.status,
jobs.job_description,
jobs.job_name,
jobs.result_folder_path,
jobs.planned_end_date,
jobs.start_date,
job_types.system_id,
users.username
FROM jobs
JOIN job_types on jobs.job_type_id = job_types.id
JOIN users on jobs.user_id = users.id
LEFT join notif_statuses ON jobs.id = notif_statuses.analysis_id
WHERE jobs.status = $1
AND (notif_statuses.last_periodic_warning is null
OR notif_statuses.last_periodic_warning < now() - coalesce(notif_statuses.periodic_warning_period, '4 hours'::interval))
`

// JobPeriodicWarnings runs periodicWarningsQuery for jobs in the "Running"
// status and returns one Job per row. Start and planned end dates are
// formatted with TimestampFromDBFormat when they are non-null, and each job's
// ExternalID is filled in through getExternalID. Any query, scan, lookup, or
// iteration error aborts the call and is returned with a nil slice. When no
// rows match, an empty (non-nil) slice is returned.
func JobPeriodicWarnings(ctx context.Context, dedb *sql.DB) ([]Job, error) {
	rows, err := dedb.QueryContext(ctx, periodicWarningsQuery, "Running")
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	found := []Job{}

	for rows.Next() {
		var (
			current    Job
			started    pq.NullTime
			plannedEnd pq.NullTime
		)

		// Column order here must match the SELECT list of periodicWarningsQuery.
		scanErr := rows.Scan(
			&current.ID,
			&current.AppID,
			&current.UserID,
			&current.Status,
			&current.Description,
			&current.Name,
			&current.ResultFolder,
			&plannedEnd,
			&started,
			&current.Type,
			&current.User,
		)
		if scanErr != nil {
			return nil, scanErr
		}

		// Null dates leave the corresponding string fields at their zero value.
		if plannedEnd.Valid {
			current.PlannedEndDate = plannedEnd.Time.Format(TimestampFromDBFormat)
		}
		if started.Valid {
			current.StartDate = started.Time.Format(TimestampFromDBFormat)
		}

		externalID, lookupErr := getExternalID(ctx, dedb, current.ID)
		if lookupErr != nil {
			return nil, lookupErr
		}
		current.ExternalID = externalID

		found = append(found, current)
	}

	// Surface any error that ended the iteration early.
	if iterErr := rows.Err(); iterErr != nil {
		return nil, iterErr
	}

	return found, nil
}

const jobWarningsQuery = `
select jobs.id,
jobs.app_id,
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/cyverse-de/messaging/v9 v9.1.5
github.com/lib/pq v1.10.4
github.com/pkg/errors v0.9.1
github.com/sanyokbig/pqinterval v1.1.2
github.com/sirupsen/logrus v1.4.2
github.com/spf13/viper v1.4.0
github.com/streadway/amqp v1.0.1-0.20200716223359-e6b33f460591
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R
github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/sanyokbig/pqinterval v1.1.2 h1:RzHMPdRMNvSZSDE+Qr20fFWSfBkKPFrLdFhzqmF0VnY=
github.com/sanyokbig/pqinterval v1.1.2/go.mod h1:jJvMjZaZFVqNTNVCd90zcFOkmbJgjxlWWkpu9/VeUFs=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
Expand All @@ -126,6 +128,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand Down
113 changes: 106 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,40 @@ func SendWarningNotification(ctx context.Context, j *Job) error {
return sendNotif(ctx, j, j.Status, subject, msg)
}

// SendPeriodicNotification builds the "still running" reminder for a job and
// hands it to sendNotif together with the job's current status.
//
// The job's StartDate is parsed with TimestampFromDBFormat; a parse failure is
// returned wrapped with the offending value. The subject and message are
// rendered from PeriodicSubjectFormat and PeriodicMessageFormat using the job
// name, job ID, parsed start time, and the elapsed running time.
func SendPeriodicNotification(ctx context.Context, j *Job) error {
	starttime, err := time.Parse(TimestampFromDBFormat, j.StartDate)
	if err != nil {
		return errors.Wrapf(err, "failed to parse start date %s", j.StartDate)
	}

	// Round to whole seconds: the raw value carries nanosecond precision and
	// would otherwise render as e.g. "4h0m3.123456789s" in user-facing text.
	dur := time.Since(starttime).Round(time.Second)

	subject := fmt.Sprintf(PeriodicSubjectFormat, j.Name, starttime, dur)

	msg := fmt.Sprintf(
		PeriodicMessageFormat,
		j.Name,
		j.ID,
		starttime,
		dur,
	)

	return sendNotif(ctx, j, j.Status, subject, msg)
}

// ensureNotifRecord makes sure the job has a notif_statuses record. When
// AnalysisRecordExists reports that one is already present nothing is done;
// otherwise a record is added through AddNotifRecord and the new ID is logged
// at debug level. Only an AddNotifRecord failure is returned as an error.
func ensureNotifRecord(ctx context.Context, vicedb *VICEDatabaser, job Job) error {
	if vicedb.AnalysisRecordExists(ctx, job.ID) {
		return nil
	}

	insertedID, err := vicedb.AddNotifRecord(ctx, &job)
	if err != nil {
		return err
	}

	log.Debugf("notif_statuses ID inserted: %s", insertedID)
	return nil
}

// maxAttempts is an attempt ceiling of 3.
// NOTE(review): presumably the maximum number of notification send attempts
// tracked via the failure counts in sendWarning — its usage is outside this
// view; confirm.
const maxAttempts = 3

func sendWarning(ctx context.Context, db *sql.DB, vicedb *VICEDatabaser, warningInterval int64, warningKey string) {
Expand All @@ -180,13 +214,9 @@ func sendWarning(ctx context.Context, db *sql.DB, vicedb *VICEDatabaser, warning
updateFailureCount func(context.Context, *Job, int) error
)

analysisRecordExists := vicedb.AnalysisRecordExists(ctx, j.ID)

if !analysisRecordExists {
if _, err = vicedb.AddNotifRecord(ctx, &j); err != nil {
log.Error(err)
continue
}
if err = ensureNotifRecord(ctx, vicedb, j); err != nil {
log.Error(err)
continue
}

notifStatuses, err = vicedb.NotifStatuses(ctx, &j)
Expand Down Expand Up @@ -239,6 +269,72 @@ func sendWarning(ctx context.Context, db *sql.DB, vicedb *VICEDatabaser, warning
}
}

// sendPeriodic sends "still running" reminders for jobs returned by
// JobPeriodicWarnings.
//
// For each candidate job it ensures a notif_statuses record exists, loads the
// job's NotifStatuses, and determines the reminder period: the record's
// PeriodicWarningPeriod when positive, otherwise a 4-hour default. A reminder
// is due when the more recent of (job start date, last periodic warning) plus
// that period lies before the current time. When due, the notification is sent
// and the last-warning timestamp is updated to the time of the check.
//
// Errors are logged and the affected job is skipped; nothing is returned.
func sendPeriodic(ctx context.Context, db *sql.DB, vicedb *VICEDatabaser) {
	// Fallback reminder period when the record carries no positive period.
	const defaultPeriod = 4 * time.Hour

	// fetch jobs which periodic updates might apply to
	jobs, err := JobPeriodicWarnings(ctx, db)
	if err != nil {
		log.Error(err)
		return
	}

	for i := range jobs {
		// Index into the slice rather than taking the address of a range
		// variable, so the pointer always refers to this iteration's job.
		j := &jobs[i]

		// make sure there is a notif_statuses record to read and update
		if err := ensureNotifRecord(ctx, vicedb, *j); err != nil {
			log.Error(err)
			continue
		}

		notifStatuses, err := vicedb.NotifStatuses(ctx, j)
		if err != nil {
			log.Error(err)
			continue
		}

		// PeriodicWarningPeriod is already a time.Duration (scanned from the
		// interval column), so it must be used as-is. Multiplying it by
		// time.Second, as if it were a count of seconds, overflows int64 for
		// any realistic period.
		periodDuration := defaultPeriod
		if notifStatuses.PeriodicWarningPeriod > 0 {
			periodDuration = notifStatuses.PeriodicWarningPeriod
		}

		sd, err := time.Parse(TimestampFromDBFormat, j.StartDate)
		if err != nil {
			log.Error(errors.Wrapf(err, "Error parsing start date %s", j.StartDate))
			continue
		}

		// Measure from whichever is later: job start or the previous reminder.
		comparisonTimestamp := sd
		if notifStatuses.LastPeriodicWarning.After(sd) {
			comparisonTimestamp = notifStatuses.LastPeriodicWarning
		}

		log.Infof("Comparing last-warning timestamp %s with period %s", comparisonTimestamp, periodDuration)

		now := time.Now()

		// timeframe is met if: more recent of (last warning, job start date)
		// + periodic warning period is before now
		if !comparisonTimestamp.Add(periodDuration).Before(now) {
			continue
		}

		if err := SendPeriodicNotification(ctx, j); err != nil {
			log.Error(errors.Wrap(err, "Error sending periodic notification"))
			continue
		}

		// Record the send only after it succeeded, so a failed send is
		// retried on the next pass.
		if err := vicedb.UpdateLastPeriodicWarning(ctx, j, now); err != nil {
			log.Error(errors.Wrap(err, "Error updating periodic notification timestamp"))
			continue
		}
	}
}

func main() {
log.SetReportCaller(true)

Expand Down Expand Up @@ -364,6 +460,9 @@ func main() {
// 1 day warning
sendWarning(ctx, db, vicedb, 1440, oneDayWarningKey)

// periodic warnings
sendPeriodic(ctx, db, vicedb)

jl, err = JobsToKill(ctx, db)
if err != nil {
log.Error(errors.Wrap(err, "error getting list of jobs to kill"))
Expand Down
14 changes: 14 additions & 0 deletions notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,20 @@ Please finish any work that is in progress. Output files will be transferred to
// to users when their job is going to terminate in the near future.
const WarningSubjectFormat = "Analysis %s will terminate on %s (%s)."

// PeriodicMessageFormat is the parameterized message body sent to users as a
// regular reminder that their job is still running.
// Parameters, in order: analysis name, analysis ID, start date, and how long
// the job has been running.
const PeriodicMessageFormat = `Analysis "%s" (%s) is still running.

This is a regularly scheduled reminder message to ensure you don't use up your quota.

This job has been running since %s (%s).`

// PeriodicSubjectFormat is the parameterized subject line for the reminder
// that is sent to users at regular intervals while a job is running.
// Parameters, in order: analysis name, start date, and how long the job has
// been running.
const PeriodicSubjectFormat = `Analysis %s is running since %s (%s)`
Comment on lines +43 to +52
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I received a request to have this message sent as an HTML message to make it more user friendly. That doesn't have to be done as part of this phase, but eventually, we'll probably want to use a template in de-mailer to format a user friendly message.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. I think we'll need to modify some of the notification stuff to allow passing in an email template, but that's certainly better. I might merge this to get it into QA for testing soonish (ideally, before I'm out for a few days coming up here), but I'll try to get something minimal into de-mailer soon. If there's a template already being used specifically for this/in place it should be easier to update the content to meet requirements anyway.


// Notification is a message intended as a notification to some upstream service
// or the DE UI.
type Notification struct {
Expand Down
29 changes: 27 additions & 2 deletions vicedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package main
import (
"context"
"database/sql"
"time"

pqinterval "github.com/sanyokbig/pqinterval"
log "github.com/sirupsen/logrus"
)

Expand All @@ -22,6 +24,8 @@ type NotifStatuses struct {
DayWarningFailureCount int
KillWarningSent bool
KillWarningFailureCount int
LastPeriodicWarning time.Time
PeriodicWarningPeriod time.Duration
}

const notifStatusQuery = `
Expand All @@ -32,7 +36,9 @@ const notifStatusQuery = `
day_warning_sent,
day_warning_failure_count,
kill_warning_sent,
kill_warning_failure_count
kill_warning_failure_count,
coalesce(last_periodic_warning, '1970-01-01 00:00:00') as last_periodic_warning,
coalesce(periodic_warning_period, '0 seconds'::interval) as periodic_warning_period
from notif_statuses
where analysis_id = $1
`
Expand All @@ -59,6 +65,8 @@ func (v *VICEDatabaser) NotifStatuses(ctx context.Context, job *Job) (*NotifStat
&notifStatuses.DayWarningFailureCount,
&notifStatuses.KillWarningSent,
&notifStatuses.KillWarningFailureCount,
&notifStatuses.LastPeriodicWarning,
(*pqinterval.Duration)(&notifStatuses.PeriodicWarningPeriod),
); err != nil {
return nil, err
}
Expand Down Expand Up @@ -93,7 +101,7 @@ func (v *VICEDatabaser) AnalysisRecordExists(ctx context.Context, analysisID str
}

const addNotifRecordQuery = `
insert into notif_statuses (analysis_id, external_id) values ($1, $2) returning id
insert into notif_statuses (analysis_id, external_id, periodic_warning_period) values ($1, $2, cast($3 as interval)) returning id
`

// AddNotifRecord adds a new record to the notif_statuses table for the provided analysis.
Expand All @@ -109,6 +117,7 @@ func (v *VICEDatabaser) AddNotifRecord(ctx context.Context, job *Job) (string, e
addNotifRecordQuery,
job.ID,
job.ExternalID,
"4 hours", // hardcoded for now
).Scan(&notifID); err != nil {
return "", err
}
Expand Down Expand Up @@ -290,3 +299,19 @@ func (v *VICEDatabaser) SetKillWarningFailureCount(ctx context.Context, job *Job
)
return err
}

// updateLastPeriodicWarningQuery sets last_periodic_warning to $1 on the
// notif_statuses rows whose analysis_id equals $2.
const updateLastPeriodicWarningQuery = `
update notif_statuses set last_periodic_warning = $1 where analysis_id = $2
`

// UpdateLastPeriodicWarning stores ts as the last periodic warning time for
// the given job by executing updateLastPeriodicWarningQuery with the job's ID.
// It returns whatever error the database call reports, or nil on success.
func (v *VICEDatabaser) UpdateLastPeriodicWarning(ctx context.Context, job *Job, ts time.Time) error {
	if _, err := v.db.ExecContext(ctx, updateLastPeriodicWarningQuery, ts, job.ID); err != nil {
		return err
	}
	return nil
}