Skip to content

Commit

Permalink
Fail workflow task on unexpected exceptions (#199)
Browse files Browse the repository at this point in the history
* Fail workflow task on unexpected exceptions

* Code cleanup
  • Loading branch information
mfateev authored Aug 25, 2020
1 parent 2f7f9dd commit 27ba155
Show file tree
Hide file tree
Showing 16 changed files with 213 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@

package io.temporal.internal.replay;

public final class NonDeterministicWorkflowError extends Error {
public final class InternalWorkflowTaskException extends RuntimeException {

public NonDeterministicWorkflowError(String message, Throwable cause) {
public InternalWorkflowTaskException(String message, Throwable cause) {
super(message, cause);
}

public InternalWorkflowTaskException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public interface ReplayWorkflow {
/** Handle an external signal event. */
void handleSignal(String signalName, Optional<Payloads> input, long eventId);

boolean eventLoop() throws Throwable;
boolean eventLoop();

/** @return null means no output yet */
Optional<Payloads> getOutput();
Expand Down Expand Up @@ -61,7 +61,5 @@ public interface ReplayWorkflow {
*/
WorkflowExecutionException mapUnexpectedException(Throwable failure);

WorkflowExecutionException mapError(Throwable failure);

WorkflowImplementationOptions getWorkflowImplementationOptions();
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ public void eventLoop() {
}
try {
completed = workflow.eventLoop();
} catch (Error e) {
throw e;
} catch (WorkflowExecutionException e) {
failure = e;
completed = true;
Expand All @@ -82,10 +80,6 @@ public void eventLoop() {
failure = workflow.mapUnexpectedException(e);
}
completed = true;
} catch (Throwable e) {
// can cast as Error is caught above.
failure = workflow.mapUnexpectedException(e);
completed = true;
}
if (completed) {
completeWorkflow();
Expand Down Expand Up @@ -147,15 +141,15 @@ public WorkflowImplementationOptions getWorkflowImplementationOptions() {
return workflow.getWorkflowImplementationOptions();
}

public WorkflowExecutionException mapError(Throwable e) {
return workflow.mapError(e);
}

public void close() {
workflow.close();
}

public void start(HistoryEvent startWorkflowEvent) {
workflow.start(startWorkflowEvent, context);
}

public WorkflowExecutionException mapUnexpectedException(Throwable exception) {
return workflow.mapUnexpectedException(exception);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

package io.temporal.internal.replay;

import static io.temporal.internal.common.CheckedExceptionWrapper.wrap;
import static io.temporal.internal.common.ProtobufTimeUtils.toJavaDuration;
import static io.temporal.worker.WorkflowErrorPolicy.FailWorkflow;

import com.google.common.base.Throwables;
import com.google.protobuf.util.Durations;
Expand Down Expand Up @@ -184,16 +184,23 @@ private void handleWorkflowTaskImpl(PollWorkflowTaskQueueResponseOrBuilder workf
}
}
} catch (Throwable e) {
// Fail workflow if exception is of the specified type
WorkflowImplementationOptions implementationOptions =
this.replayWorkflowExecutor.getWorkflowImplementationOptions();
if (implementationOptions.getWorkflowErrorPolicy() == FailWorkflow) {
// fail workflow
throw replayWorkflowExecutor.mapError(e);
} else {
metricsScope.counter(MetricsType.WORKFLOW_TASK_NO_COMPLETION_COUNTER).inc(1);
// fail workflow task, not a workflow
throw e;
Class<? extends Throwable>[] failTypes =
implementationOptions.getFailWorkflowExceptionTypes();
for (Class<? extends Throwable> failType : failTypes) {
if (failType.isAssignableFrom(e.getClass())) {
// Wrap any failure into InternalWorkflowTaskException to support specifying them
// in the implementation options.
if (!(e instanceof InternalWorkflowTaskException)) {
e = new InternalWorkflowTaskException(e);
}
throw replayWorkflowExecutor.mapUnexpectedException(e);
}
}
metricsScope.counter(MetricsType.WORKFLOW_TASK_NO_COMPLETION_COUNTER).inc(1);
throw wrap(e);
} finally {
if (!timerStopped) {
sw.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,6 @@ public WorkflowTaskHandler.Result handleWorkflowTask(PollWorkflowTaskQueueRespon
private Result failureToResult(PollWorkflowTaskQueueResponse workflowTask, Throwable e)
throws Exception {
String workflowType = workflowTask.getWorkflowType().getName();
// Fail workflow and not a task as WorkflowExecutionException is thrown only if FailWorkflow
// policy was set.
if (e instanceof WorkflowExecutionException) {
RespondWorkflowTaskCompletedRequest response =
RespondWorkflowTaskCompletedRequest.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package io.temporal.internal.statemachines;

import static io.temporal.internal.common.CheckedExceptionWrapper.unwrap;
import static io.temporal.internal.common.WorkflowExecutionUtils.getEventTypeForCommand;
import static io.temporal.internal.common.WorkflowExecutionUtils.isCommandEvent;
import static io.temporal.internal.statemachines.LocalActivityStateMachine.LOCAL_ACTIVITY_MARKER_NAME;
Expand Down Expand Up @@ -50,7 +51,7 @@
import io.temporal.failure.CanceledFailure;
import io.temporal.internal.replay.ExecuteActivityParameters;
import io.temporal.internal.replay.ExecuteLocalActivityParameters;
import io.temporal.internal.replay.NonDeterministicWorkflowError;
import io.temporal.internal.replay.InternalWorkflowTaskException;
import io.temporal.internal.replay.StartChildWorkflowExecutionParameters;
import io.temporal.internal.worker.ActivityTaskHandler;
import io.temporal.workflow.ChildWorkflowCancellationType;
Expand Down Expand Up @@ -176,7 +177,7 @@ public final void handleEvent(HistoryEvent event, boolean hasNextEvent) {
try {
handleEventImpl(event, hasNextEvent);
} catch (RuntimeException e) {
throw new NonDeterministicWorkflowError(
throw new InternalWorkflowTaskException(
"Failure handling event "
+ event.getEventId()
+ " of '"
Expand All @@ -189,7 +190,7 @@ public final void handleEvent(HistoryEvent event, boolean hasNextEvent) {
+ this.workflowTaskStartedEventId
+ ", Currently Processing StartedEventId="
+ this.currentStartedEventId,
e);
unwrap(e));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ static DeterministicRunner newRunner(
*
* @throws Throwable if one of the threads didn't handle an exception.
*/
void runUntilAllBlocked() throws Throwable;
void runUntilAllBlocked();

/** IsDone returns true when all of threads are completed */
boolean isDone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ SyncWorkflowContext getWorkflowContext() {
}

@Override
public void runUntilAllBlocked() throws Throwable {
public void runUntilAllBlocked() {
if (rootWorkflowThread == null) {
// TODO: workflow instance specific thread name
rootWorkflowThread =
Expand Down Expand Up @@ -291,7 +291,7 @@ public void runUntilAllBlocked() throws Throwable {
}
if (unhandledException != null) {
close();
throw unhandledException;
throw WorkflowInternal.wrap(unhandledException);
}
for (WorkflowThread c : threadsToAdd) {
threads.add(c);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

package io.temporal.internal.sync;

import static io.temporal.worker.WorkflowErrorPolicy.FailWorkflow;
import static io.temporal.internal.common.CheckedExceptionWrapper.wrap;
import static io.temporal.internal.sync.WorkflowInternal.unwrap;

import com.google.common.base.Preconditions;
import io.temporal.api.common.v1.Payloads;
Expand Down Expand Up @@ -102,7 +103,9 @@ void addWorkflowImplementationTypes(

<R> void addWorkflowImplementationFactory(Class<R> clazz, Functions.Func<R> factory) {
WorkflowImplementationOptions unitTestingOptions =
WorkflowImplementationOptions.newBuilder().setWorkflowErrorPolicy(FailWorkflow).build();
WorkflowImplementationOptions.newBuilder()
.setFailWorkflowExceptionTypes(Throwable.class)
.build();
addWorkflowImplementationFactory(unitTestingOptions, clazz, factory);
}

Expand Down Expand Up @@ -287,29 +290,47 @@ public Object execute(Object[] arguments) {
} catch (IllegalAccessException e) {
throw new Error(mapToWorkflowExecutionException(e, dataConverter));
} catch (InvocationTargetException e) {
Throwable targetException = e.getTargetException();
if (targetException instanceof Error) {
throw (Error) targetException;
Throwable target = e.getTargetException();
if (target instanceof DestroyWorkflowThreadError) {
throw (DestroyWorkflowThreadError) target;
}
if (log.isErrorEnabled()) {
boolean cancelRequested =
WorkflowInternal.getRootWorkflowContext().getContext().isCancelRequested();
if (!cancelRequested || !FailureConverter.isCanceledCause(targetException)) {
log.error(
"Workflow execution failure "
+ "WorkflowId="
+ info.getWorkflowId()
+ ", RunId="
+ info.getRunId()
+ ", WorkflowType="
+ info.getWorkflowType(),
targetException);
Throwable exception = unwrap(target);

WorkflowImplementationOptions options = implementationOptions.get(info.getWorkflowType());
Class<? extends Throwable>[] failTypes = options.getFailWorkflowExceptionTypes();
if (exception instanceof TemporalFailure) {
logWorkflowExecutionException(info, exception);
throw mapToWorkflowExecutionException(exception, dataConverter);
}
for (Class<? extends Throwable> failType : failTypes) {
if (failType.isAssignableFrom(exception.getClass())) {
// fail workflow
if (log.isErrorEnabled()) {
boolean cancelRequested =
WorkflowInternal.getRootWorkflowContext().getContext().isCancelRequested();
if (!cancelRequested || !FailureConverter.isCanceledCause(exception)) {
logWorkflowExecutionException(info, exception);
}
}
throw mapToWorkflowExecutionException(exception, dataConverter);
}
}
throw mapToWorkflowExecutionException(targetException, dataConverter);
throw wrap(exception);
}
}

private void logWorkflowExecutionException(WorkflowInfo info, Throwable exception) {
log.error(
"Workflow execution failure "
+ "WorkflowId="
+ info.getWorkflowId()
+ ", RunId="
+ info.getRunId()
+ ", WorkflowType="
+ info.getWorkflowType(),
exception);
}

@Override
public void init(WorkflowOutboundCallsInterceptor outboundCalls) {
WorkflowInternal.getRootWorkflowContext().setHeadInterceptor(outboundCalls);
Expand Down Expand Up @@ -365,11 +386,6 @@ static WorkflowExecutionException mapToWorkflowExecutionException(
return new WorkflowExecutionException(failure);
}

static WorkflowExecutionException mapError(Throwable error) {
Failure failure = FailureConverter.exceptionToFailureNoUnwrapping(error);
return new WorkflowExecutionException(failure);
}

@Override
public String toString() {
return "POJOWorkflowImplementationFactory{"
Expand Down
7 changes: 1 addition & 6 deletions src/main/java/io/temporal/internal/sync/SyncWorkflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public void handleSignal(String signalName, Optional<Payloads> input, long event
}

@Override
public boolean eventLoop() throws Throwable {
public boolean eventLoop() {
if (runner == null) {
return false;
}
Expand Down Expand Up @@ -173,9 +173,4 @@ public WorkflowExecutionException mapUnexpectedException(Throwable failure) {
return POJOWorkflowImplementationFactory.mapToWorkflowExecutionException(
failure, dataConverter);
}

@Override
public WorkflowExecutionException mapError(Throwable failure) {
return POJOWorkflowImplementationFactory.mapError(failure);
}
}
45 changes: 0 additions & 45 deletions src/main/java/io/temporal/worker/WorkflowErrorPolicy.java

This file was deleted.

Loading

0 comments on commit 27ba155

Please sign in to comment.