Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rename waitToComplete to GracefulCompleteRequested #118

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions extensions/data_models_row.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type (
StateExecutionSequenceMaps types.JSONText
StateExecutionLocalQueues types.JSONText

WaitToComplete bool
GracefulCompleteRequested bool
}

ProcessExecutionRow struct {
Expand All @@ -63,11 +63,11 @@ type (

Namespace string

ProcessId string
StartTime time.Time
TimeoutSeconds int32
Info types.JSONText
WaitToComplete bool
ProcessId string
StartTime time.Time
TimeoutSeconds int32
Info types.JSONText
GracefulCompleteRequested bool
}

AsyncStateExecutionSelectFilter struct {
Expand Down
2 changes: 1 addition & 1 deletion extensions/postgres/schema/xcherry_sys_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ CREATE TABLE xcherry_sys_process_executions(
history_event_id_sequence INTEGER,
state_execution_sequence_maps jsonb NOT NULL , -- some maps from stateId and sequence number
state_execution_local_queues jsonb, -- some maps to quickly consume received local queue messages
wait_to_complete BOOLEAN NOT NULL DEFAULT false, -- if set to true, the process will be gracefully completed when there is no running state
graceful_complete_requested BOOLEAN NOT NULL DEFAULT false, -- if set to true, the process will be gracefully completed when there is no running state
info jsonb , -- workerURL, processType, etc
PRIMARY KEY (id)
);
Expand Down
6 changes: 3 additions & 3 deletions extensions/postgres/transactional.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ status = :status,
history_event_id_sequence = :history_event_id_sequence,
state_execution_sequence_maps = :state_execution_sequence_maps,
state_execution_local_queues = :state_execution_local_queues,
wait_to_complete = :wait_to_complete
graceful_complete_requested = :graceful_complete_requested
WHERE id=:process_execution_id_string
`

Expand Down Expand Up @@ -189,7 +189,7 @@ func (d dbTx) InsertImmediateTask(ctx context.Context, row extensions.ImmediateT
}

const selectProcessExecutionForUpdateQuery = `SELECT
id as process_execution_id, status, history_event_id_sequence, state_execution_sequence_maps, state_execution_local_queues, wait_to_complete
id as process_execution_id, status, history_event_id_sequence, state_execution_sequence_maps, state_execution_local_queues, graceful_complete_requested
FROM xcherry_sys_process_executions WHERE id=$1 FOR UPDATE`

func (d dbTx) SelectProcessExecutionForUpdate(
Expand All @@ -201,7 +201,7 @@ func (d dbTx) SelectProcessExecutionForUpdate(
}

const selectProcessExecutionQuery = `SELECT
id as process_execution_id, status, history_event_id_sequence, state_execution_sequence_maps, state_execution_local_queues, wait_to_complete,
id as process_execution_id, status, history_event_id_sequence, state_execution_sequence_maps, state_execution_local_queues, graceful_complete_requested,
namespace, process_id, start_time, timeout_seconds, info
FROM xcherry_sys_process_executions WHERE id=$1 `

Expand Down
18 changes: 9 additions & 9 deletions persistence/process/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type (

// for ProcessExecutionRowForUpdate
ProcessExecutionRowStateExecutionSequenceMaps *data_models.StateExecutionSequenceMapsJson
ProcessExecutionRowWaitToComplete bool
ProcessExecutionRowGracefulCompleteRequested bool
ProcessExecutionRowStatus data_models.ProcessExecutionStatus

TaskShardId int32
Expand All @@ -89,7 +89,7 @@ type (

// for ProcessExecutionRowForUpdate to update
ProcessExecutionRowNewStateExecutionSequenceMaps *data_models.StateExecutionSequenceMapsJson
ProcessExecutionRowNewWaitToComplete bool
ProcessExecutionRowNewGracefulCompleteRequested bool
ProcessExecutionRowNewStatus data_models.ProcessExecutionStatus
}
)
Expand All @@ -102,7 +102,7 @@ func (p sqlProcessStoreImpl) handleStateDecision(

// these fields will be updated and returned back in response for ProcessExecutionRowForUpdate
sequenceMaps := request.ProcessExecutionRowStateExecutionSequenceMaps
procExecWaitToComplete := request.ProcessExecutionRowWaitToComplete
procExecGracefulCompleteRequested := request.ProcessExecutionRowGracefulCompleteRequested
procExecStatus := request.ProcessExecutionRowStatus

if len(request.StateDecision.GetNextStates()) > 0 {
Expand Down Expand Up @@ -146,16 +146,16 @@ func (p sqlProcessStoreImpl) handleStateDecision(
// then gracefully complete the process regardless of the thread close type set in this state.
// Otherwise, handle the thread close type set in this state.

toGracefullyComplete := procExecWaitToComplete && len(sequenceMaps.PendingExecutionMap) == 0
shouldGracefulComplete := procExecGracefulCompleteRequested && len(sequenceMaps.PendingExecutionMap) == 0

toAbortRunningAsyncStates := false

threadDecision := request.StateDecision.GetThreadCloseDecision()
if !toGracefullyComplete && request.StateDecision.HasThreadCloseDecision() {
if !shouldGracefulComplete && request.StateDecision.HasThreadCloseDecision() {
switch threadDecision.GetCloseType() {
case xcapi.GRACEFUL_COMPLETE_PROCESS:
procExecWaitToComplete = true
toGracefullyComplete = len(sequenceMaps.PendingExecutionMap) == 0
procExecGracefulCompleteRequested = true
shouldGracefulComplete = len(sequenceMaps.PendingExecutionMap) == 0
case xcapi.FORCE_COMPLETE_PROCESS:
toAbortRunningAsyncStates = len(sequenceMaps.PendingExecutionMap) > 0

Expand All @@ -171,7 +171,7 @@ func (p sqlProcessStoreImpl) handleStateDecision(
}
}

if toGracefullyComplete {
if shouldGracefulComplete {
procExecStatus = data_models.ProcessExecutionStatusCompleted
}

Expand All @@ -188,7 +188,7 @@ func (p sqlProcessStoreImpl) handleStateDecision(
return &HandleStateDecisionResponse{
HasNewImmediateTask: hasNewImmediateTask,
ProcessExecutionRowNewStateExecutionSequenceMaps: sequenceMaps,
ProcessExecutionRowNewWaitToComplete: procExecWaitToComplete,
ProcessExecutionRowNewGracefulCompleteRequested: procExecGracefulCompleteRequested,
ProcessExecutionRowNewStatus: procExecStatus,
}, nil
}
4 changes: 2 additions & 2 deletions persistence/process/complete_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (p sqlProcessStoreImpl) doCompleteExecuteExecutionTx(
WorkerUrl: request.Prepare.Info.WorkerURL,

ProcessExecutionRowStateExecutionSequenceMaps: &sequenceMaps,
ProcessExecutionRowWaitToComplete: prcRow.WaitToComplete,
ProcessExecutionRowGracefulCompleteRequested: prcRow.GracefulCompleteRequested,
ProcessExecutionRowStatus: prcRow.Status,

TaskShardId: request.TaskShardId,
Expand All @@ -131,7 +131,7 @@ func (p sqlProcessStoreImpl) doCompleteExecuteExecutionTx(
hasNewImmediateTask = true
}

prcRow.WaitToComplete = resp.ProcessExecutionRowNewWaitToComplete
prcRow.GracefulCompleteRequested = resp.ProcessExecutionRowNewGracefulCompleteRequested
prcRow.Status = resp.ProcessExecutionRowNewStatus
prcRow.StateExecutionSequenceMaps, err = resp.ProcessExecutionRowNewStateExecutionSequenceMaps.ToBytes()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion persistence/process/start_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func (p sqlProcessStoreImpl) applyTerminateIfRunningPolicy(
HistoryEventIdSequence: processExecutionRowForUpdate.HistoryEventIdSequence,
StateExecutionSequenceMaps: processExecutionRowForUpdate.StateExecutionSequenceMaps,
StateExecutionLocalQueues: processExecutionRowForUpdate.StateExecutionLocalQueues,
WaitToComplete: processExecutionRowForUpdate.WaitToComplete,
GracefulCompleteRequested: processExecutionRowForUpdate.GracefulCompleteRequested,
})
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions persistence/process/update_process_execution_for_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
WorkerUrl: request.WorkerUrl,

ProcessExecutionRowStateExecutionSequenceMaps: &sequenceMaps,
ProcessExecutionRowWaitToComplete: prcRow.WaitToComplete,
ProcessExecutionRowGracefulCompleteRequested: prcRow.GracefulCompleteRequested,

Check warning on line 86 in persistence/process/update_process_execution_for_rpc.go

View check run for this annotation

Codecov / codecov/patch

persistence/process/update_process_execution_for_rpc.go#L86

Added line #L86 was not covered by tests
ProcessExecutionRowStatus: prcRow.Status,

TaskShardId: request.TaskShardId,
Expand All @@ -95,7 +95,7 @@
hasNewImmediateTask = true
}

prcRow.WaitToComplete = resp.ProcessExecutionRowNewWaitToComplete
prcRow.GracefulCompleteRequested = resp.ProcessExecutionRowNewGracefulCompleteRequested

Check warning on line 98 in persistence/process/update_process_execution_for_rpc.go

View check run for this annotation

Codecov / codecov/patch

persistence/process/update_process_execution_for_rpc.go#L98

Added line #L98 was not covered by tests
prcRow.Status = resp.ProcessExecutionRowNewStatus
prcRow.StateExecutionSequenceMaps, err = resp.ProcessExecutionRowNewStateExecutionSequenceMaps.ToBytes()
if err != nil {
Expand Down
Loading