Skip to content

Commit d5d8552

Browse files
authored
dbos migration (#106)
1 parent c5e221c commit d5d8552

File tree

8 files changed

+247
-138
lines changed

8 files changed

+247
-138
lines changed

cmd/dbos/cli_integration_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"bytes"
55
"context"
6+
"database/sql"
67
"encoding/json"
78
"fmt"
89
"io"
@@ -16,6 +17,7 @@ import (
1617
"time"
1718

1819
"github.com/dbos-inc/dbos-transact-golang/dbos"
20+
_ "github.com/jackc/pgx/v5/stdlib"
1921
"github.com/stretchr/testify/assert"
2022
"github.com/stretchr/testify/require"
2123
)
@@ -68,6 +70,18 @@ func TestCLIWorkflow(t *testing.T) {
6870
require.NoError(t, err, "Reset database command failed: %s", string(output))
6971

7072
assert.Contains(t, string(output), "System database has been reset successfully", "Output should confirm database reset")
73+
assert.Contains(t, string(output), "database\":\"dbos", "Output should confirm database reset")
74+
75+
// log in the database and ensure the dbos schema does not exist anymore
76+
db, err := sql.Open("pgx", getDatabaseURL())
77+
require.NoError(t, err)
78+
defer db.Close()
79+
80+
var exists bool
81+
err = db.QueryRow("SELECT EXISTS(SELECT 1 FROM information_schema.schemata WHERE schema_name = 'dbos')").Scan(&exists)
82+
require.NoError(t, err)
83+
84+
assert.False(t, exists, "DBOS schema should not exist")
7185
})
7286

7387
t.Run("ProjectInitialization", func(t *testing.T) {
@@ -145,6 +159,11 @@ func testProjectInitialization(t *testing.T, cliPath string) {
145159
modCmd := exec.Command("go", "mod", "tidy")
146160
modOutput, err := modCmd.CombinedOutput()
147161
require.NoError(t, err, "go mod tidy failed: %s", string(modOutput))
162+
163+
// TEMPORARY: go get github.com/dbos-inc/dbos-transact-golang/cmd/dbos@dbos-migration
164+
tmpCmd := exec.Command("go", "get", "github.com/dbos-inc/dbos-transact-golang/cmd/dbos@dbos-migration")
165+
tmpOutput, err := tmpCmd.CombinedOutput()
166+
require.NoError(t, err, "Failed to get dbos-migration: %s", string(tmpOutput))
148167
}
149168

150169
// testApplicationLifecycle starts the application and triggers workflows

dbos/dbos_test.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package dbos
22

33
import (
4+
"context"
45
"testing"
56
"time"
67

@@ -166,4 +167,105 @@ func TestConfig(t *testing.T) {
166167
assert.Equal(t, "env-only-executor", ctx.GetExecutorID())
167168
})
168169
})
170+
171+
t.Run("SystemDBMigration", func(t *testing.T) {
172+
t.Setenv("DBOS__APPVERSION", "v1.0.0")
173+
t.Setenv("DBOS__APPID", "test-migration")
174+
t.Setenv("DBOS__VMID", "test-executor-id")
175+
176+
ctx, err := NewDBOSContext(Config{
177+
DatabaseURL: databaseURL,
178+
AppName: "test-migration",
179+
})
180+
require.NoError(t, err)
181+
defer func() {
182+
if ctx != nil {
183+
ctx.Shutdown(1 * time.Minute)
184+
}
185+
}()
186+
187+
require.NotNil(t, ctx)
188+
189+
// Get the internal systemDB instance to check tables directly
190+
dbosCtx, ok := ctx.(*dbosContext)
191+
require.True(t, ok, "expected dbosContext")
192+
require.NotNil(t, dbosCtx.systemDB)
193+
194+
sysDB, ok := dbosCtx.systemDB.(*sysDB)
195+
require.True(t, ok, "expected sysDB")
196+
197+
// Verify all expected tables exist and have correct structure
198+
dbCtx := context.Background()
199+
200+
// Test workflow_status table
201+
var exists bool
202+
err = sysDB.pool.QueryRow(dbCtx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'dbos' AND table_name = 'workflow_status')").Scan(&exists)
203+
require.NoError(t, err)
204+
assert.True(t, exists, "workflow_status table should exist")
205+
206+
// Test operation_outputs table
207+
err = sysDB.pool.QueryRow(dbCtx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'dbos' AND table_name = 'operation_outputs')").Scan(&exists)
208+
require.NoError(t, err)
209+
assert.True(t, exists, "operation_outputs table should exist")
210+
211+
// Test workflow_events table
212+
err = sysDB.pool.QueryRow(dbCtx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'dbos' AND table_name = 'workflow_events')").Scan(&exists)
213+
require.NoError(t, err)
214+
assert.True(t, exists, "workflow_events table should exist")
215+
216+
// Test notifications table
217+
err = sysDB.pool.QueryRow(dbCtx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'dbos' AND table_name = 'notifications')").Scan(&exists)
218+
require.NoError(t, err)
219+
assert.True(t, exists, "notifications table should exist")
220+
221+
// Test that all tables can be queried (empty results expected)
222+
rows, err := sysDB.pool.Query(dbCtx, "SELECT workflow_uuid FROM dbos.workflow_status LIMIT 1")
223+
require.NoError(t, err)
224+
rows.Close()
225+
226+
rows, err = sysDB.pool.Query(dbCtx, "SELECT workflow_uuid FROM dbos.operation_outputs LIMIT 1")
227+
require.NoError(t, err)
228+
rows.Close()
229+
230+
rows, err = sysDB.pool.Query(dbCtx, "SELECT workflow_uuid FROM dbos.workflow_events LIMIT 1")
231+
require.NoError(t, err)
232+
rows.Close()
233+
234+
rows, err = sysDB.pool.Query(dbCtx, "SELECT destination_uuid FROM dbos.notifications LIMIT 1")
235+
require.NoError(t, err)
236+
rows.Close()
237+
238+
// Check that the dbos_migrations table exists and has one row with the correct version
239+
err = sysDB.pool.QueryRow(dbCtx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'dbos' AND table_name = 'dbos_migrations')").Scan(&exists)
240+
require.NoError(t, err)
241+
assert.True(t, exists, "dbos_migrations table should exist")
242+
243+
// Verify migration version is 1 (after initial migration)
244+
var version int64
245+
var count int
246+
err = sysDB.pool.QueryRow(dbCtx, "SELECT COUNT(*) FROM dbos.dbos_migrations").Scan(&count)
247+
require.NoError(t, err)
248+
assert.Equal(t, 1, count, "dbos_migrations table should have exactly one row")
249+
250+
err = sysDB.pool.QueryRow(dbCtx, "SELECT version FROM dbos.dbos_migrations").Scan(&version)
251+
require.NoError(t, err)
252+
assert.Equal(t, int64(1), version, "migration version should be 1 (after initial migration)")
253+
254+
// Test manual shutdown and recreate
255+
ctx.Shutdown(1 * time.Minute)
256+
257+
// Recreate context - should have no error since DB is already migrated
258+
ctx2, err := NewDBOSContext(Config{
259+
DatabaseURL: databaseURL,
260+
AppName: "test-migration-recreate",
261+
})
262+
require.NoError(t, err)
263+
defer func() {
264+
if ctx2 != nil {
265+
ctx2.Shutdown(1 * time.Minute)
266+
}
267+
}()
268+
269+
require.NotNil(t, ctx2)
270+
})
169271
}

dbos/migrations/000001_initial_dbos_schema.down.sql

Lines changed: 0 additions & 18 deletions
This file was deleted.
Lines changed: 39 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,14 @@
1-
-- 001_initial_dbos_schema.up.sql
2-
3-
-- Create the dbos schema
4-
CREATE SCHEMA IF NOT EXISTS dbos;
5-
61
-- Enable uuid extension for generating UUIDs
72
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
83

9-
-- Create workflow_status table
10-
CREATE TABLE IF NOT EXISTS dbos.workflow_status (
4+
CREATE TABLE dbos.workflow_status (
115
workflow_uuid TEXT PRIMARY KEY,
126
status TEXT,
137
name TEXT,
148
authenticated_user TEXT,
159
assumed_role TEXT,
1610
authenticated_roles TEXT,
11+
request TEXT,
1712
output TEXT,
1813
error TEXT,
1914
executor_id TEXT,
@@ -27,34 +22,21 @@ CREATE TABLE IF NOT EXISTS dbos.workflow_status (
2722
queue_name TEXT,
2823
workflow_timeout_ms BIGINT,
2924
workflow_deadline_epoch_ms BIGINT,
25+
inputs TEXT,
3026
started_at_epoch_ms BIGINT,
3127
deduplication_id TEXT,
32-
inputs TEXT,
3328
priority INTEGER NOT NULL DEFAULT 0
3429
);
3530

36-
-- Create indexes for workflow_status
37-
CREATE INDEX IF NOT EXISTS workflow_status_created_at_index ON dbos.workflow_status (created_at);
38-
CREATE INDEX IF NOT EXISTS workflow_status_executor_id_index ON dbos.workflow_status (executor_id);
39-
CREATE INDEX IF NOT EXISTS workflow_status_status_index ON dbos.workflow_status (status);
31+
CREATE INDEX workflow_status_created_at_index ON dbos.workflow_status (created_at);
32+
CREATE INDEX workflow_status_executor_id_index ON dbos.workflow_status (executor_id);
33+
CREATE INDEX workflow_status_status_index ON dbos.workflow_status (status);
4034

41-
-- Create unique constraint for queue_name and deduplication_id
42-
DO $$
43-
BEGIN
44-
IF NOT EXISTS (
45-
SELECT 1 FROM information_schema.table_constraints
46-
WHERE constraint_name = 'uq_workflow_status_queue_name_dedup_id'
47-
AND table_name = 'workflow_status'
48-
AND table_schema = 'dbos'
49-
) THEN
50-
ALTER TABLE dbos.workflow_status
51-
ADD CONSTRAINT uq_workflow_status_queue_name_dedup_id
52-
UNIQUE (queue_name, deduplication_id);
53-
END IF;
54-
END $$;
35+
ALTER TABLE dbos.workflow_status
36+
ADD CONSTRAINT uq_workflow_status_queue_name_dedup_id
37+
UNIQUE (queue_name, deduplication_id);
5538

56-
-- Create operation_outputs table
57-
CREATE TABLE IF NOT EXISTS dbos.operation_outputs (
39+
CREATE TABLE dbos.operation_outputs (
5840
workflow_uuid TEXT NOT NULL,
5941
function_id INTEGER NOT NULL,
6042
function_name TEXT NOT NULL DEFAULT '',
@@ -66,7 +48,7 @@ CREATE TABLE IF NOT EXISTS dbos.operation_outputs (
6648
ON UPDATE CASCADE ON DELETE CASCADE
6749
);
6850

69-
CREATE TABLE IF NOT EXISTS dbos.notifications (
51+
CREATE TABLE dbos.notifications (
7052
destination_uuid TEXT NOT NULL,
7153
topic TEXT,
7254
message TEXT NOT NULL,
@@ -75,8 +57,7 @@ CREATE TABLE IF NOT EXISTS dbos.notifications (
7557
FOREIGN KEY (destination_uuid) REFERENCES dbos.workflow_status(workflow_uuid)
7658
ON UPDATE CASCADE ON DELETE CASCADE
7759
);
78-
-- Create index for notifications
79-
CREATE INDEX IF NOT EXISTS idx_workflow_topic ON dbos.notifications (destination_uuid, topic);
60+
CREATE INDEX idx_workflow_topic ON dbos.notifications (destination_uuid, topic);
8061

8162
-- Create notification function
8263
CREATE OR REPLACE FUNCTION dbos.notifications_function() RETURNS TRIGGER AS $$
@@ -89,22 +70,11 @@ END;
8970
$$ LANGUAGE plpgsql;
9071

9172
-- Create notification trigger
92-
DO $$
93-
BEGIN
94-
IF NOT EXISTS (
95-
SELECT 1 FROM information_schema.triggers
96-
WHERE trigger_name = 'dbos_notifications_trigger'
97-
AND event_object_table = 'notifications'
98-
AND event_object_schema = 'dbos'
99-
) THEN
100-
CREATE TRIGGER dbos_notifications_trigger
101-
AFTER INSERT ON dbos.notifications
102-
FOR EACH ROW EXECUTE FUNCTION dbos.notifications_function();
103-
END IF;
104-
END $$;
73+
CREATE TRIGGER dbos_notifications_trigger
74+
AFTER INSERT ON dbos.notifications
75+
FOR EACH ROW EXECUTE FUNCTION dbos.notifications_function();
10576

106-
-- Create workflow_events table
107-
CREATE TABLE IF NOT EXISTS dbos.workflow_events (
77+
CREATE TABLE dbos.workflow_events (
10878
workflow_uuid TEXT NOT NULL,
10979
key TEXT NOT NULL,
11080
value TEXT NOT NULL,
@@ -124,16 +94,26 @@ END;
12494
$$ LANGUAGE plpgsql;
12595

12696
-- Create events trigger
127-
DO $$
128-
BEGIN
129-
IF NOT EXISTS (
130-
SELECT 1 FROM information_schema.triggers
131-
WHERE trigger_name = 'dbos_workflow_events_trigger'
132-
AND event_object_table = 'workflow_events'
133-
AND event_object_schema = 'dbos'
134-
) THEN
135-
CREATE TRIGGER dbos_workflow_events_trigger
136-
AFTER INSERT ON dbos.workflow_events
137-
FOR EACH ROW EXECUTE FUNCTION dbos.workflow_events_function();
138-
END IF;
139-
END $$;
97+
CREATE TRIGGER dbos_workflow_events_trigger
98+
AFTER INSERT ON dbos.workflow_events
99+
FOR EACH ROW EXECUTE FUNCTION dbos.workflow_events_function();
100+
101+
CREATE TABLE dbos.streams (
102+
workflow_uuid TEXT NOT NULL,
103+
key TEXT NOT NULL,
104+
value TEXT NOT NULL,
105+
"offset" INTEGER NOT NULL,
106+
PRIMARY KEY (workflow_uuid, key, "offset"),
107+
FOREIGN KEY (workflow_uuid) REFERENCES dbos.workflow_status(workflow_uuid)
108+
ON UPDATE CASCADE ON DELETE CASCADE
109+
);
110+
111+
CREATE TABLE dbos.event_dispatch_kv (
112+
service_name TEXT NOT NULL,
113+
workflow_fn_name TEXT NOT NULL,
114+
key TEXT NOT NULL,
115+
value TEXT,
116+
update_seq NUMERIC(38,0),
117+
update_time NUMERIC(38,15),
118+
PRIMARY KEY (service_name, workflow_fn_name, key)
119+
);

0 commit comments

Comments
 (0)