Skip to content

Commit

Permalink
add visibility queries
Browse files Browse the repository at this point in the history
  • Loading branch information
duoertai committed Mar 11, 2024
1 parent f93c258 commit 7907906
Show file tree
Hide file tree
Showing 2 changed files with 281 additions and 0 deletions.
219 changes: 219 additions & 0 deletions extensions/postgres/non_transactional.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"fmt"
"github.com/xcherryio/apis/goapi/xcapi"
"github.com/xcherryio/xcherry/common/uuid"
"github.com/xcherryio/xcherry/persistence/data_models"
"strings"
"time"

"github.com/jmoiron/sqlx"
"github.com/xcherryio/xcherry/extensions"
Expand Down Expand Up @@ -229,3 +231,220 @@ func (d dbSession) UpdateProcessExecutionStatusForVisibility(
_, err := d.db.NamedExecContext(ctx, updateProcessExecutionStatusQuery, row)
return err
}

const selectProcessExecutionsQuery = `SELECT *
FROM executions_visibility
WHERE namespace = $1
AND start_time >= $2 AND start_time <= $3
AND (process_execution_id > $4 OR start_time < $5)
ORDER BY start_time DESC, process_execution_id
LIMIT $6
`

func (d dbSession) SelectProcessExecutions(
ctx context.Context,
namespace string,
startTimeMinInclusive, startTimeMaxInclusive int64,
lastProcessExecutionId uuid.UUID,
lastStartTime int64,
pageSize int32,
) ([]extensions.ExecutionVisibilityRow, error) {
var rows []extensions.ExecutionVisibilityRow
lastProcessExecutionIdString := lastProcessExecutionId.String()
err := d.db.SelectContext(ctx, &rows, selectProcessExecutionsQuery,
namespace,
time.Unix(startTimeMinInclusive, 0),
time.Unix(startTimeMaxInclusive, 0),
lastProcessExecutionIdString,
time.Unix(lastStartTime, 0),
pageSize)
if err != nil {
return nil, err
}

Check warning on line 263 in extensions/postgres/non_transactional.go

View check run for this annotation

Codecov / codecov/patch

extensions/postgres/non_transactional.go#L251-L263

Added lines #L251 - L263 were not covered by tests

return rows, nil

Check warning on line 265 in extensions/postgres/non_transactional.go

View check run for this annotation

Codecov / codecov/patch

extensions/postgres/non_transactional.go#L265

Added line #L265 was not covered by tests
}

const selectProcessExecutionsByStatusQuery = `SELECT *
FROM executions_visibility
WHERE namespace = $1
AND status = $2
AND start_time >= $3 AND start_time <= $4
AND (process_execution_id > $5 OR start_time < $6)
ORDER BY start_time DESC, process_execution_id
LIMIT $7
`

func (d dbSession) SelectProcessExecutionsByStatus(
ctx context.Context,
namespace string,
status data_models.ProcessExecutionStatus,
startTimeMinInclusive, startTimeMaxInclusive int64,
lastProcessExecutionId uuid.UUID,
lastStartTime int64,
pageSize int32,
) ([]extensions.ExecutionVisibilityRow, error) {
var rows []extensions.ExecutionVisibilityRow
lastProcessExecutionIdString := lastProcessExecutionId.String()
err := d.db.SelectContext(ctx, &rows, selectProcessExecutionsByStatusQuery,
namespace,
status,
time.Unix(startTimeMinInclusive, 0),
time.Unix(startTimeMaxInclusive, 0),
lastProcessExecutionIdString,
time.Unix(lastStartTime, 0),
pageSize)
if err != nil {
return nil, err
}

Check warning on line 299 in extensions/postgres/non_transactional.go

View check run for this annotation

Codecov / codecov/patch

extensions/postgres/non_transactional.go#L286-L299

Added lines #L286 - L299 were not covered by tests

return rows, nil

Check warning on line 301 in extensions/postgres/non_transactional.go

View check run for this annotation

Codecov / codecov/patch

extensions/postgres/non_transactional.go#L301

Added line #L301 was not covered by tests
}

const selectProcessExecutionsByTypeQuery = `SELECT *
FROM executions_visibility
WHERE namespace = $1
AND process_type_name = $2
AND start_time >= $3 AND start_time <= $4
AND (process_execution_id > $5 OR start_time < $6)
ORDER BY start_time DESC, process_execution_id
LIMIT $7
`

func (d dbSession) SelectProcessExecutionsByTypeQuery(
ctx context.Context,
namespace string,
processTypeName string,
startTimeMinInclusive, startTimeMaxInclusive int64,
lastProcessExecutionId uuid.UUID,
lastStartTime int64,
pageSize int32,
) ([]extensions.ExecutionVisibilityRow, error) {
var rows []extensions.ExecutionVisibilityRow
lastProcessExecutionIdString := lastProcessExecutionId.String()
err := d.db.SelectContext(ctx, &rows, selectProcessExecutionsByTypeQuery,
namespace,
processTypeName,
time.Unix(startTimeMinInclusive, 0),
time.Unix(startTimeMaxInclusive, 0),
lastProcessExecutionIdString,
time.Unix(lastStartTime, 0),
pageSize)
if err != nil {
return nil, err
}

Check warning on line 335 in extensions/postgres/non_transactional.go

View check run for this annotation

Codecov / codecov/patch

extensions/postgres/non_transactional.go#L322-L335

Added lines #L322 - L335 were not covered by tests

return rows, nil

Check warning on line 337 in extensions/postgres/non_transactional.go

View check run for this annotation

Codecov / codecov/patch

extensions/postgres/non_transactional.go#L337

Added line #L337 was not covered by tests
}

const selectProcessExecutionsByIdQuery = `SELECT *
FROM executions_visibility
WHERE namespace = $1
AND process_id = $2
AND start_time >= $3 AND start_time <= $4
AND (process_execution_id > $5 OR start_time < $6)
ORDER BY start_time DESC, process_execution_id
LIMIT $7`

func (d dbSession) SelectProcessExecutionsById(
ctx context.Context,
namespace string,
processId string,
startTimeMinInclusive, startTimeMaxInclusive int64,
lastProcessExecutionId uuid.UUID,
lastStartTime int64,
pageSize int32,
) ([]extensions.ExecutionVisibilityRow, error) {
var rows []extensions.ExecutionVisibilityRow
lastProcessExecutionIdString := lastProcessExecutionId.String()
err := d.db.SelectContext(ctx, &rows, selectProcessExecutionsByIdQuery,
namespace,
processId,
time.Unix(startTimeMinInclusive, 0),
time.Unix(startTimeMaxInclusive, 0),
lastProcessExecutionIdString,
time.Unix(lastStartTime, 0),
pageSize)
if err != nil {
return nil, err
}

Check warning on line 370 in extensions/postgres/non_transactional.go

View check run for this annotation

Codecov / codecov/patch

extensions/postgres/non_transactional.go#L357-L370

Added lines #L357 - L370 were not covered by tests

return rows, nil

Check warning on line 372 in extensions/postgres/non_transactional.go

View check run for this annotation

Codecov / codecov/patch

extensions/postgres/non_transactional.go#L372

Added line #L372 was not covered by tests
}

const selectProcessExecutionsByStatusAndType = `SELECT *
FROM executions_visibility
WHERE namespace = $1
AND status = $2
AND process_type_name = $3
AND start_time >= $4 AND start_time <= $5
AND (process_execution_id > $6 OR start_time < $7)
ORDER BY start_time DESC, process_execution_id
LIMIT $8
`

func (d dbSession) SelectProcessExecutionsByStatusAndType(
ctx context.Context,
namespace string,
status data_models.ProcessExecutionStatus,
processTypeName string,
startTimeMinInclusive, startTimeMaxInclusive int64,
lastProcessExecutionId uuid.UUID,
lastStartTime int64,
pageSize int32,
) ([]extensions.ExecutionVisibilityRow, error) {
var rows []extensions.ExecutionVisibilityRow
lastProcessExecutionIdString := lastProcessExecutionId.String()
err := d.db.SelectContext(ctx, &rows, selectProcessExecutionsByStatusAndType,
namespace,
status,
processTypeName,
time.Unix(startTimeMinInclusive, 0),
time.Unix(startTimeMaxInclusive, 0),
lastProcessExecutionIdString,
time.Unix(lastStartTime, 0),
pageSize)
if err != nil {
return nil, err
}

Check warning on line 409 in extensions/postgres/non_transactional.go

View check run for this annotation

Codecov / codecov/patch

extensions/postgres/non_transactional.go#L395-L409

Added lines #L395 - L409 were not covered by tests

return rows, nil

Check warning on line 411 in extensions/postgres/non_transactional.go

View check run for this annotation

Codecov / codecov/patch

extensions/postgres/non_transactional.go#L411

Added line #L411 was not covered by tests
}

const selectProcessExecutionsByStatusAndIdQuery = `SELECT *
FROM executions_visibility
WHERE namespace = $1
AND status = $2
AND process_type_name = $3
AND start_time >= $4 AND start_time <= $5
AND (process_execution_id > $6 OR start_time < $7)
ORDER BY start_time DESC, process_execution_id
LIMIT $8`

func (d dbSession) SelectProcessExecutionsByStatusAndId(
ctx context.Context,
namespace string,
status data_models.ProcessExecutionStatus,
processId string,
startTimeMinInclusive, startTimeMaxInclusive int64,
lastProcessExecutionId uuid.UUID,
lastStartTime int64,
pageSize int32,
) ([]extensions.ExecutionVisibilityRow, error) {
var rows []extensions.ExecutionVisibilityRow
lastProcessExecutionIdString := lastProcessExecutionId.String()
err := d.db.SelectContext(ctx, &rows, selectProcessExecutionsByStatusAndIdQuery,
namespace,
status,
processId,
time.Unix(startTimeMinInclusive, 0),
time.Unix(startTimeMaxInclusive, 0),
lastProcessExecutionIdString,
time.Unix(lastStartTime, 0),
pageSize)
if err != nil {
return nil, err
}

Check warning on line 447 in extensions/postgres/non_transactional.go

View check run for this annotation

Codecov / codecov/patch

extensions/postgres/non_transactional.go#L433-L447

Added lines #L433 - L447 were not covered by tests

return rows, nil

Check warning on line 449 in extensions/postgres/non_transactional.go

View check run for this annotation

Codecov / codecov/patch

extensions/postgres/non_transactional.go#L449

Added line #L449 was not covered by tests
}
62 changes: 62 additions & 0 deletions extensions/sql_db_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"database/sql"
"github.com/xcherryio/apis/goapi/xcapi"
"github.com/xcherryio/xcherry/persistence/data_models"

"github.com/xcherryio/xcherry/common/uuid"
"github.com/xcherryio/xcherry/config"
Expand Down Expand Up @@ -114,6 +115,67 @@ type nonTransactionalCRUD interface {
UpdateProcessExecutionStatusForVisibility(
ctx context.Context, row ExecutionVisibilityRow,
) error

SelectProcessExecutions(
ctx context.Context,
namespace string,
startTimeMinInclusive, startTimeMaxInclusive int64,
lastProcessExecutionId uuid.UUID,
lastStartTime int64,
pageSize int32,
) ([]ExecutionVisibilityRow, error)

SelectProcessExecutionsByStatus(
ctx context.Context,
namespace string,
status data_models.ProcessExecutionStatus,
startTimeMinInclusive, startTimeMaxInclusive int64,
lastProcessExecutionId uuid.UUID,
lastStartTime int64,
pageSize int32,
) ([]ExecutionVisibilityRow, error)

SelectProcessExecutionsByTypeQuery(
ctx context.Context,
namespace string,
processTypeName string,
startTimeMinInclusive, startTimeMaxInclusive int64,
lastProcessExecutionId uuid.UUID,
lastStartTime int64,
pageSize int32,
) ([]ExecutionVisibilityRow, error)

SelectProcessExecutionsById(
ctx context.Context,
namespace string,
processId string,
startTimeMinInclusive, startTimeMaxInclusive int64,
lastProcessExecutionId uuid.UUID,
lastStartTime int64,
pageSize int32,
) ([]ExecutionVisibilityRow, error)

SelectProcessExecutionsByStatusAndType(
ctx context.Context,
namespace string,
status data_models.ProcessExecutionStatus,
processTypeName string,
startTimeMinInclusive, startTimeMaxInclusive int64,
lastProcessExecutionId uuid.UUID,
lastStartTime int64,
pageSize int32,
) ([]ExecutionVisibilityRow, error)

SelectProcessExecutionsByStatusAndId(
ctx context.Context,
namespace string,
status data_models.ProcessExecutionStatus,
processId string,
startTimeMinInclusive, startTimeMaxInclusive int64,
lastProcessExecutionId uuid.UUID,
lastStartTime int64,
pageSize int32,
) ([]ExecutionVisibilityRow, error)
}

type ErrorChecker interface {
Expand Down

0 comments on commit 7907906

Please sign in to comment.