Skip to content

Commit

Permalink
use epoch for wf started column in MySQL backend (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
jessepeterson authored Oct 31, 2024
1 parent 98a2a82 commit 070b966
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 34 deletions.
2 changes: 1 addition & 1 deletion engine/storage/mysql/query.sql
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ WHERE

-- name: GetWorkflowLastStarted :one
SELECT
last_created_at
last_created_unix
FROM
wf_status
WHERE
Expand Down
3 changes: 3 additions & 0 deletions engine/storage/mysql/schema.00002.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE wf_status
DROP COLUMN last_created_at,
ADD COLUMN last_created_unix BIGINT NOT NULL DEFAULT (UNIX_TIMESTAMP());
6 changes: 5 additions & 1 deletion engine/storage/mysql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ CREATE TABLE wf_status (
enrollment_id VARCHAR(255) NOT NULL,
workflow_name VARCHAR(255) NOT NULL,

last_created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- this was intended to be "DEFAULT (UNIX_TIMESTAMP() * 1000)"
-- which would complement the Golang `time.Time{}.UnixMilli()`.
-- however sqlc seems to not support that syntax, so we'll settle
-- for less precision.
last_created_unix BIGINT NOT NULL DEFAULT (UNIX_TIMESTAMP()),

created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
Expand Down
10 changes: 5 additions & 5 deletions engine/storage/mysql/sqlc/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions engine/storage/mysql/sqlc/query.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 8 additions & 15 deletions engine/storage/mysql/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,17 +245,11 @@ func (s *MySQLStorage) CancelSteps(ctx context.Context, id, workflowName string)

// RetrieveWorkflowStarted returns the last time a workflow was started for id.
func (s *MySQLStorage) RetrieveWorkflowStarted(ctx context.Context, id, workflowName string) (time.Time, error) {
ret, err := s.q.GetWorkflowLastStarted(ctx, sqlc.GetWorkflowLastStartedParams{EnrollmentID: id, WorkflowName: workflowName})
epoch, err := s.q.GetWorkflowLastStarted(ctx, sqlc.GetWorkflowLastStartedParams{EnrollmentID: id, WorkflowName: workflowName})
if errors.Is(err, sql.ErrNoRows) {
return time.Time{}, nil
} else if err != nil {
return time.Time{}, err
}
parsedTime, err := time.Parse(mySQLTimestampFormat, ret)
if err != nil {
return time.Time{}, fmt.Errorf("parsing time: %w", err)
}
return parsedTime, err
return time.Unix(epoch, 0), err
}

// RecordWorkflowStarted stores the started time for workflowName for ids.
Expand All @@ -265,26 +259,25 @@ func (s *MySQLStorage) RecordWorkflowStarted(ctx context.Context, ids []string,
}
const numFields = 3
const subst = ", (?, ?, ?)"
fmt.Println(len(ids), len(ids)-1)
parms := make([]interface{}, len(ids)*numFields)
startedFormat := started.Format(mySQLTimestampFormat)
startedUnix := started.Unix()
for i, id := range ids {
// these must match the SQL query, below
parms[i*numFields] = id
parms[i*numFields+1] = workflowName
parms[i*numFields+2] = startedFormat
parms[i*numFields+2] = startedUnix
}
val := subst[2:] + strings.Repeat(subst, len(ids)-1)
values := strings.Repeat(subst, len(ids))[2:]
_, err := s.db.ExecContext(
ctx,
`
INSERT INTO wf_status
(enrollment_id, workflow_name, last_created_at)
(enrollment_id, workflow_name, last_created_unix)
VALUES
`+val+` AS new
`+values+` AS new
ON DUPLICATE KEY
UPDATE
last_created_at = new.last_created_at;`,
last_created_unix = new.last_created_unix;`,
parms...,
)
return err
Expand Down
11 changes: 4 additions & 7 deletions engine/storage/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,7 @@ func mainTest(t *testing.T, s storage.AllStorage) {
// t.Fatalf("invalid test data: step enqueueing with config: %v", err)
// }

// some backends may truncate the time and drop TZ
// so let's truncate ourselves and eliminate the TZ.
// since this value is used to compare the retrived value
// we'll stick with that.
storedAt := time.Now().UTC().Truncate(time.Second)
storedAt := time.Now()

err := s.StoreStep(ctx, tStep.step, storedAt)
if tStep.shouldError && err == nil {
Expand All @@ -301,8 +297,9 @@ func mainTest(t *testing.T, s storage.AllStorage) {
}
if ts.IsZero() {
t.Errorf("RetrieveWorkflowStarted: nil timestamp for id=%s, step=%s err=%v", id, tStep.step.WorkflowName, err)
} else if ts != storedAt {
t.Errorf("RetrieveWorkflowStarted: timestamp mismatch for id=%s, step=%s expected=%v got=%v", id, tStep.step.WorkflowName, storedAt, ts)
} else if t1, t2 := ts.Truncate(time.Second), storedAt.Truncate(time.Second); t1.Compare(t2) != 0 {
// truncate comparison in case backends don't persist precision less than 1s (e.g. SQL textual dates)
t.Errorf("RetrieveWorkflowStarted: timestamp mismatch for id=%s, step=%s expected=%v got=%v compare=%v", id, tStep.step.WorkflowName, t2, t1, t1.Compare(t2))
}
}
}
Expand Down

0 comments on commit 070b966

Please sign in to comment.