Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(TCOMP-2710): improve workflow and exception throwing/handling #902

Merged
merged 9 commits into from
Aug 19, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.talend.sdk.component.api.exception;

import javax.json.bind.annotation.JsonbCreator;
import javax.json.bind.annotation.JsonbPropertyOrder;

import lombok.Data;

Expand All @@ -28,13 +29,34 @@
* See me TCOMP-2342 for more details.
*/
@Data
@JsonbPropertyOrder({ "localizedMessage", "message", "stackTrace", "suppressed", "possibleHandleErrorWith" })
public class DiscoverSchemaException extends RuntimeException {

public enum HandleErrorWith {
/**
* default case
*/
EXCEPTION,
/**
* unhandled.
*/
SILENT,
/**
* unhandled.
*/
RETRY,
EXECUTE_MOCK_JOB;
/**
* Potentially execute a mock job in studio.
* Will ask user's validation before executing.
* When specifying this option, developer should be sure that no side effect can be generated by connector.
*/
EXECUTE_MOCK_JOB,
/**
* Will execute a lifecycle through framework using only configuration.
* This won't query for any user input.
* When specifying this option, developer should be sure that no side effect can be generated by connector.
*/
EXECUTE_LIFECYCLE;
}

private HandleErrorWith possibleHandleErrorWith = HandleErrorWith.EXCEPTION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import static java.util.Optional.ofNullable;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.talend.sdk.component.api.exception.DiscoverSchemaException.HandleErrorWith.EXCEPTION;
import static org.talend.sdk.component.api.exception.DiscoverSchemaException.HandleErrorWith.EXECUTE_LIFECYCLE;
import static org.talend.sdk.component.api.record.SchemaProperty.IS_KEY;
import static org.talend.sdk.component.api.record.SchemaProperty.PATTERN;
import static org.talend.sdk.component.api.record.SchemaProperty.SCALE;
Expand Down Expand Up @@ -159,17 +161,21 @@ private void initClass2JavaTypeMap() {
}
}

private DiscoverSchemaException handleException(final Exception e) throws Exception {
private DiscoverSchemaException transformException(final Exception e) {
DiscoverSchemaException discoverSchemaException;
log.error(ERROR_THROUGH_ACTION, e);
if (e instanceof DiscoverSchemaException) {
discoverSchemaException = DiscoverSchemaException.class.cast(e);
} else if (e instanceof ComponentException) {
discoverSchemaException = new DiscoverSchemaException((ComponentException) e);
} else {
discoverSchemaException = new DiscoverSchemaException(e.getMessage(), e.getStackTrace(),
DiscoverSchemaException.HandleErrorWith.EXCEPTION);
discoverSchemaException = new DiscoverSchemaException(e.getMessage(), e.getStackTrace(), EXCEPTION);
}
return discoverSchemaException;
}

private DiscoverSchemaException handleException(final Exception e) throws Exception {
log.error(ERROR_THROUGH_ACTION, e);
final DiscoverSchemaException discoverSchemaException = transformException(e);
try (final Jsonb jsonb = JsonbBuilder.create()) {
jsonb.toJson(discoverSchemaException, out);
}
Expand All @@ -188,22 +194,24 @@ public void guessInputComponentSchema(final Schema schema) throws Exception {
} catch (Exception e) {
throw handleException(e);
}
throw new Exception(ERROR_NO_AVAILABLE_SCHEMA_FOUND);
throw handleException(new Exception(ERROR_NO_AVAILABLE_SCHEMA_FOUND));
}

public void guessComponentSchema(final Schema incomingSchema, final String outgoingBranch,
final Boolean isStartOfJob) throws Exception {
final boolean isStartOfJob) throws Exception {
try {
executeDiscoverSchemaExtendedAction(incomingSchema, outgoingBranch);
return;
} catch (Exception e) {
// Case when a processor is the start of a studio job
if (isStartOfJob) {
guessOutputComponentSchemaThroughResult();
return;
final DiscoverSchemaException dse = transformException(e);
// When a processor is the start of a studio job and dev explicitly set the handleError to Lifecycle exec
if (isStartOfJob && EXECUTE_LIFECYCLE == dse.getPossibleHandleErrorWith()) {
try {
guessOutputComponentSchemaThroughResult();
} catch (Exception er) {
throw handleException(e);
}
} else {
log.error(ERROR_INSTANCE_SCHEMA, e);
throw e;
throw handleException(e);
}
}
}
Expand All @@ -213,33 +221,34 @@ public void guessComponentSchema(final Schema incomingSchema, final String outgo
}

private void executeDiscoverSchemaExtendedAction(final Schema schema, final String branch) throws Exception {
try {
final Collection<ServiceMeta> services = getPluginServices();
ServiceMeta.ActionMeta actionRef = services
final Collection<ServiceMeta> services = getPluginServices();
ServiceMeta.ActionMeta actionRef = services
.stream()
.flatMap(s -> s.getActions().stream())
.filter(a -> a.getFamily().equals(family) &&
a.getType().equals(SCHEMA_EXTENDED_TYPE) &&
componentName.equals(a.getAction()))
.findFirst()
.orElse(null);
// did not find action named like componentName, trying to find one matching action...
if (actionRef == null) {
actionRef = services
.stream()
.flatMap(s -> s.getActions().stream())
.filter(a -> a.getFamily().equals(family) &&
a.getType().equals(SCHEMA_EXTENDED_TYPE) &&
componentName.equals(a.getAction()))
.filter(a -> a.getFamily().equals(family) && a.getType().equals(SCHEMA_EXTENDED_TYPE))
.findFirst()
.orElse(null);
// did not find action named like componentName, trying to find one matching action...
if (actionRef == null) {
actionRef = services
.stream()
.flatMap(s -> s.getActions().stream())
.filter(a -> a.getFamily().equals(family) && a.getType().equals(SCHEMA_EXTENDED_TYPE))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException(
"No action " + family + "#" + SCHEMA_EXTENDED_TYPE));
}
final Object schemaResult =
actionRef.getInvoker().apply(buildActionConfig(actionRef, configuration, schema, branch));
if (schemaResult instanceof Schema && fromSchema(Schema.class.cast(schemaResult))) {
return;
.orElseThrow(() -> new IllegalArgumentException(
"No action " + family + "#" + SCHEMA_EXTENDED_TYPE));
}
final Object schemaResult =
actionRef.getInvoker().apply(buildActionConfig(actionRef, configuration, schema, branch));
if (schemaResult instanceof Schema) {
final Schema result = (Schema) schemaResult;
if (result.getEntries().isEmpty()) {
throw new DiscoverSchemaException(ERROR_NO_AVAILABLE_SCHEMA_FOUND, EXCEPTION);
} else {
fromSchema(Schema.class.cast(schemaResult));
}
} catch (Exception e) {
throw handleException(e);
}
}

Expand Down Expand Up @@ -449,12 +458,11 @@ public boolean guessSchemaThroughAction(final Schema schema) {
}

private Collection<ServiceMeta> getPluginServices() {
final Collection<ServiceMeta> services = componentManager
return componentManager
.findPlugin(plugin)
.orElseThrow(() -> new IllegalArgumentException(NO_COMPONENT + plugin))
.get(ContainerComponentRegistry.class)
.getServices();
return services;
}

private boolean fromSchema(final Schema schema) {
Expand Down Expand Up @@ -601,16 +609,14 @@ private void guessOutputComponentSchemaThroughResult() throws Exception {
fromOutputEmitterPojo(processorComponent, "FLOW");
return;
}
if (row != null && guessSchemaThroughResult(row)) {
return;
if (row != null) {
guessSchemaThroughResult(row);
}
} finally {
if (processor != null) {
try {
processor.stop();
} catch (RuntimeException re) {
// nop
}
try {
processor.stop();
} catch (RuntimeException re) {
// nop
}
}
}
Expand Down
Loading