Skip to content

Commit

Permalink
Merge branch 'main' into periodic-notifications-continued
Browse files Browse the repository at this point in the history
  • Loading branch information
ianmcorvidae committed Sep 18, 2024
2 parents ff34ec4 + 42551d5 commit fa6557d
Showing 1 changed file with 23 additions and 16 deletions.
39 changes: 23 additions & 16 deletions analyses.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,93 +648,100 @@ func getTimeLimit(ctx context.Context, dedb *sql.DB, analysisID string) (int64,
func CreateMessageHandler(dedb *sql.DB) func(context.Context, amqp.Delivery) {
return func(ctx context.Context, delivery amqp.Delivery) {
var err error
msgLog := log.WithFields(log.Fields{"context": "message handler"})

if err = delivery.Ack(false); err != nil {
log.Error(err)
msgLog.Error(err)
}

update := &messaging.UpdateMessage{}

if err = json.Unmarshal(delivery.Body, update); err != nil {
log.Error(errors.Wrap(err, "error unmarshaling body of update message"))
msgLog.Error(errors.Wrap(err, "error unmarshaling body of update message"))
return
}

var externalID string
if update.Job.InvocationID == "" {
log.Error("external ID was not provided as the invocation ID in the status update, ignoring update")
msgLog.Error("external ID was not provided as the invocation ID in the status update, ignoring update")
return
}
externalID = update.Job.InvocationID
msgLog = msgLog.WithFields(log.Fields{"externalID": externalID})

analysis, err := lookupByExternalID(ctx, dedb, externalID)
if err != nil {
log.Error(errors.Wrapf(err, "error looking up analysis by external ID '%s'", externalID))
msgLog.Error(errors.Wrapf(err, "error looking up analysis by external ID '%s'", externalID))
return
}
msgLog = msgLog.WithFields(log.Fields{"ID": analysis.ID})

analysisIsInteractive, err := isInteractive(ctx, dedb, analysis.ID)
if err != nil {
log.Error(errors.Wrapf(err, "error looking up interactive status for analysis %s", analysis.ID))
msgLog.Error(errors.Wrapf(err, "error looking up interactive status for analysis %s", analysis.ID))
return
}

if !analysisIsInteractive {
log.Infof("analysis %s is not interactive, so move along", analysis.ID)
msgLog.Infof("analysis %s is not interactive, so move along", analysis.ID)
return
}

if update.State != "Running" {
log.Infof("job status update for %s was %s, moving along", analysis.ID, update.State)
msgLog.Infof("job status update for %s was %s, moving along", analysis.ID, update.State)
return
}

log.Infof("job status update for %s was %s", analysis.ID, update.State)
msgLog.Infof("job status update for %s was %s", analysis.ID, update.State)

// Set the subdomain
if analysis.Subdomain == "" {
// make sure to use analysis.ID, not external ID here.
userID, err := getUserIDForJob(ctx, dedb, analysis.ID)
if err != nil {
log.Error(errors.Wrapf(err, "error getting userID for job %s", analysis.ID))
msgLog.Error(errors.Wrapf(err, "error getting userID for job %s", analysis.ID))
} else {
log.Infof("user id is %s and invocation id is %s", userID, externalID)
msgLog = msgLog.WithFields(log.Fields{"userID": userID})
msgLog.Infof("user id is %s and invocation id is %s", userID, externalID)

// make sure to use externalID, not analysis.ID here
subdomain := generateSubdomain(userID, externalID)

log.Infof("generated subdomain for analysis %s is %s, based on user ID %s and invocation ID %s", analysis.ID, subdomain, userID, externalID)
msgLog.Infof("generated subdomain for analysis %s is %s, based on user ID %s and invocation ID %s", analysis.ID, subdomain, userID, externalID)

if err = setSubdomain(ctx, dedb, analysis.ID, subdomain); err != nil {
log.Error(errors.Wrapf(err, "error setting subdomain for analysis '%s' to '%s'", analysis.ID, subdomain))
msgLog.Error(errors.Wrapf(err, "error setting subdomain for analysis '%s' to '%s'", analysis.ID, subdomain))
}
msgLog = msgLog.WithFields(log.Fields{"subdomain": subdomain})
}
} else {
msgLog = msgLog.WithFields(log.Fields{"subdomain": analysis.Subdomain})
}

// Check to see if the planned_end_date is set for the analysis
if analysis.PlannedEndDate != "" {
log.Infof("planned end date for %s is set to %s, nothing to do", analysis.ID, analysis.PlannedEndDate)
msgLog.Infof("planned end date for %s is set to %s, nothing to do", analysis.ID, analysis.PlannedEndDate)
return // it's already set, so move along.
}

startDate, err := time.Parse(TimestampFromDBFormat, analysis.StartDate)
if err != nil {
log.Error(errors.Wrapf(err, "error parsing start date field %s", analysis.StartDate))
msgLog.Error(errors.Wrapf(err, "error parsing start date field %s", analysis.StartDate))
return
}
sdnano := startDate.UnixNano()

timeLimitSeconds, err := getTimeLimit(ctx, dedb, analysis.ID)
if err != nil {
log.Error(errors.Wrapf(err, "error fetching time limit for analysis %s", analysis.ID))
msgLog.Error(errors.Wrapf(err, "error fetching time limit for analysis %s", analysis.ID))
return
}

// StartDate is in milliseconds, so convert it to nanoseconds, add correct number of seconds,
// then convert back to milliseconds.
endDate := time.Unix(0, sdnano).Add(time.Duration(timeLimitSeconds)*time.Second).UnixNano() / 1000000
if err = setPlannedEndDate(ctx, dedb, analysis.ID, endDate); err != nil {
log.Error(errors.Wrapf(err, "error setting planned end date for analysis '%s' to '%d'", analysis.ID, endDate))
msgLog.Error(errors.Wrapf(err, "error setting planned end date for analysis '%s' to '%d'", analysis.ID, endDate))
}
}
}

0 comments on commit fa6557d

Please sign in to comment.