From 7907906847e8f7879c2fd132de292726a884dca1 Mon Sep 17 00:00:00 2001 From: duoertai Date: Mon, 11 Mar 2024 08:54:11 -0700 Subject: [PATCH] add visibility queries --- extensions/postgres/non_transactional.go | 219 +++++++++++++++++++++++ extensions/sql_db_interfaces.go | 62 +++++++ 2 files changed, 281 insertions(+) diff --git a/extensions/postgres/non_transactional.go b/extensions/postgres/non_transactional.go index 7ec4eba..ebcaa90 100644 --- a/extensions/postgres/non_transactional.go +++ b/extensions/postgres/non_transactional.go @@ -8,7 +8,9 @@ import ( "fmt" "github.com/xcherryio/apis/goapi/xcapi" "github.com/xcherryio/xcherry/common/uuid" + "github.com/xcherryio/xcherry/persistence/data_models" "strings" + "time" "github.com/jmoiron/sqlx" "github.com/xcherryio/xcherry/extensions" @@ -229,3 +231,220 @@ func (d dbSession) UpdateProcessExecutionStatusForVisibility( _, err := d.db.NamedExecContext(ctx, updateProcessExecutionStatusQuery, row) return err } + +const selectProcessExecutionsQuery = `SELECT * +FROM executions_visibility +WHERE namespace = $1 +AND start_time >= $2 AND start_time <= $3 +AND (process_execution_id > $4 OR start_time < $5) +ORDER BY start_time DESC, process_execution_id +LIMIT $6 +` + +func (d dbSession) SelectProcessExecutions( + ctx context.Context, + namespace string, + startTimeMinInclusive, startTimeMaxInclusive int64, + lastProcessExecutionId uuid.UUID, + lastStartTime int64, + pageSize int32, +) ([]extensions.ExecutionVisibilityRow, error) { + var rows []extensions.ExecutionVisibilityRow + lastProcessExecutionIdString := lastProcessExecutionId.String() + err := d.db.SelectContext(ctx, &rows, selectProcessExecutionsQuery, + namespace, + time.Unix(startTimeMinInclusive, 0), + time.Unix(startTimeMaxInclusive, 0), + lastProcessExecutionIdString, + time.Unix(lastStartTime, 0), + pageSize) + if err != nil { + return nil, err + } + + return rows, nil +} + +const selectProcessExecutionsByStatusQuery = `SELECT * +FROM executions_visibility +WHERE namespace = $1 +AND status = $2 +AND start_time >= $3 AND start_time <= $4 +AND (process_execution_id > $5 OR start_time < $6) +ORDER BY start_time DESC, process_execution_id +LIMIT $7 +` + +func (d dbSession) SelectProcessExecutionsByStatus( + ctx context.Context, + namespace string, + status data_models.ProcessExecutionStatus, + startTimeMinInclusive, startTimeMaxInclusive int64, + lastProcessExecutionId uuid.UUID, + lastStartTime int64, + pageSize int32, +) ([]extensions.ExecutionVisibilityRow, error) { + var rows []extensions.ExecutionVisibilityRow + lastProcessExecutionIdString := lastProcessExecutionId.String() + err := d.db.SelectContext(ctx, &rows, selectProcessExecutionsByStatusQuery, + namespace, + status, + time.Unix(startTimeMinInclusive, 0), + time.Unix(startTimeMaxInclusive, 0), + lastProcessExecutionIdString, + time.Unix(lastStartTime, 0), + pageSize) + if err != nil { + return nil, err + } + + return rows, nil +} + +const selectProcessExecutionsByTypeQuery = `SELECT * +FROM executions_visibility +WHERE namespace = $1 +AND process_type_name = $2 +AND start_time >= $3 AND start_time <= $4 +AND (process_execution_id > $5 OR start_time < $6) +ORDER BY start_time DESC, process_execution_id +LIMIT $7 +` + +func (d dbSession) SelectProcessExecutionsByTypeQuery( + ctx context.Context, + namespace string, + processTypeName string, + startTimeMinInclusive, startTimeMaxInclusive int64, + lastProcessExecutionId uuid.UUID, + lastStartTime int64, + pageSize int32, +) ([]extensions.ExecutionVisibilityRow, error) { + var rows []extensions.ExecutionVisibilityRow + lastProcessExecutionIdString := lastProcessExecutionId.String() + err := d.db.SelectContext(ctx, &rows, selectProcessExecutionsByTypeQuery, + namespace, + processTypeName, + time.Unix(startTimeMinInclusive, 0), + time.Unix(startTimeMaxInclusive, 0), + lastProcessExecutionIdString, + time.Unix(lastStartTime, 0), + pageSize) + if err != nil { + return nil, err + } + + return rows, nil +} + +const selectProcessExecutionsByIdQuery = `SELECT * +FROM executions_visibility +WHERE namespace = $1 +AND process_id = $2 +AND start_time >= $3 AND start_time <= $4 +AND (process_execution_id > $5 OR start_time < $6) +ORDER BY start_time DESC, process_execution_id +LIMIT $7` + +func (d dbSession) SelectProcessExecutionsById( + ctx context.Context, + namespace string, + processId string, + startTimeMinInclusive, startTimeMaxInclusive int64, + lastProcessExecutionId uuid.UUID, + lastStartTime int64, + pageSize int32, +) ([]extensions.ExecutionVisibilityRow, error) { + var rows []extensions.ExecutionVisibilityRow + lastProcessExecutionIdString := lastProcessExecutionId.String() + err := d.db.SelectContext(ctx, &rows, selectProcessExecutionsByIdQuery, + namespace, + processId, + time.Unix(startTimeMinInclusive, 0), + time.Unix(startTimeMaxInclusive, 0), + lastProcessExecutionIdString, + time.Unix(lastStartTime, 0), + pageSize) + if err != nil { + return nil, err + } + + return rows, nil +} + +const selectProcessExecutionsByStatusAndType = `SELECT * +FROM executions_visibility +WHERE namespace = $1 +AND status = $2 +AND process_type_name = $3 +AND start_time >= $4 AND start_time <= $5 +AND (process_execution_id > $6 OR start_time < $7) +ORDER BY start_time DESC, process_execution_id +LIMIT $8 +` + +func (d dbSession) SelectProcessExecutionsByStatusAndType( + ctx context.Context, + namespace string, + status data_models.ProcessExecutionStatus, + processTypeName string, + startTimeMinInclusive, startTimeMaxInclusive int64, + lastProcessExecutionId uuid.UUID, + lastStartTime int64, + pageSize int32, +) ([]extensions.ExecutionVisibilityRow, error) { + var rows []extensions.ExecutionVisibilityRow + lastProcessExecutionIdString := lastProcessExecutionId.String() + err := d.db.SelectContext(ctx, &rows, selectProcessExecutionsByStatusAndType, + namespace, + status, + processTypeName, + time.Unix(startTimeMinInclusive, 0), + time.Unix(startTimeMaxInclusive, 0), + lastProcessExecutionIdString, + time.Unix(lastStartTime, 0), + pageSize) + if err != nil { + return nil, err + } + + return rows, nil +} + +const selectProcessExecutionsByStatusAndIdQuery = `SELECT * +FROM executions_visibility +WHERE namespace = $1 +AND status = $2 +AND process_type_name = $3 +AND start_time >= $4 AND start_time <= $5 +AND (process_execution_id > $6 OR start_time < $7) +ORDER BY start_time DESC, process_execution_id +LIMIT $8` + +func (d dbSession) SelectProcessExecutionsByStatusAndId( + ctx context.Context, + namespace string, + status data_models.ProcessExecutionStatus, + processId string, + startTimeMinInclusive, startTimeMaxInclusive int64, + lastProcessExecutionId uuid.UUID, + lastStartTime int64, + pageSize int32, +) ([]extensions.ExecutionVisibilityRow, error) { + var rows []extensions.ExecutionVisibilityRow + lastProcessExecutionIdString := lastProcessExecutionId.String() + err := d.db.SelectContext(ctx, &rows, selectProcessExecutionsByStatusAndIdQuery, + namespace, + status, + processId, + time.Unix(startTimeMinInclusive, 0), + time.Unix(startTimeMaxInclusive, 0), + lastProcessExecutionIdString, + time.Unix(lastStartTime, 0), + pageSize) + if err != nil { + return nil, err + } + + return rows, nil +} diff --git a/extensions/sql_db_interfaces.go b/extensions/sql_db_interfaces.go index 1848797..5ca473b 100644 --- a/extensions/sql_db_interfaces.go +++ b/extensions/sql_db_interfaces.go @@ -7,6 +7,7 @@ import ( "context" "database/sql" "github.com/xcherryio/apis/goapi/xcapi" + "github.com/xcherryio/xcherry/persistence/data_models" "github.com/xcherryio/xcherry/common/uuid" "github.com/xcherryio/xcherry/config" @@ -114,6 +115,67 @@ type nonTransactionalCRUD interface { UpdateProcessExecutionStatusForVisibility( ctx context.Context, row ExecutionVisibilityRow, ) error + + SelectProcessExecutions( + ctx context.Context, + namespace string, + startTimeMinInclusive, startTimeMaxInclusive int64, + lastProcessExecutionId uuid.UUID, + lastStartTime int64, + pageSize int32, + ) ([]ExecutionVisibilityRow, error) + + SelectProcessExecutionsByStatus( + ctx context.Context, + namespace string, + status data_models.ProcessExecutionStatus, + startTimeMinInclusive, startTimeMaxInclusive int64, + lastProcessExecutionId uuid.UUID, + lastStartTime int64, + pageSize int32, + ) ([]ExecutionVisibilityRow, error) + + SelectProcessExecutionsByTypeQuery( + ctx context.Context, + namespace string, + processTypeName string, + startTimeMinInclusive, startTimeMaxInclusive int64, + lastProcessExecutionId uuid.UUID, + lastStartTime int64, + pageSize int32, + ) ([]ExecutionVisibilityRow, error) + + SelectProcessExecutionsById( + ctx context.Context, + namespace string, + processId string, + startTimeMinInclusive, startTimeMaxInclusive int64, + lastProcessExecutionId uuid.UUID, + lastStartTime int64, + pageSize int32, + ) ([]ExecutionVisibilityRow, error) + + SelectProcessExecutionsByStatusAndType( + ctx context.Context, + namespace string, + status data_models.ProcessExecutionStatus, + processTypeName string, + startTimeMinInclusive, startTimeMaxInclusive int64, + lastProcessExecutionId uuid.UUID, + lastStartTime int64, + pageSize int32, + ) ([]ExecutionVisibilityRow, error) + + SelectProcessExecutionsByStatusAndId( + ctx context.Context, + namespace string, + status data_models.ProcessExecutionStatus, + processId string, + startTimeMinInclusive, startTimeMaxInclusive int64, + lastProcessExecutionId uuid.UUID, + lastStartTime int64, + pageSize int32, + ) ([]ExecutionVisibilityRow, error) } type ErrorChecker interface {