Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.examples;

import java.util.List;
import software.amazon.lambda.durable.ConcurrencyConfig;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.ParallelBranchConfig;
import software.amazon.lambda.durable.TypeToken;

/**
* Simple example demonstrating basic step execution with the Durable Execution SDK.
*
* <p>This handler processes a greeting request through three sequential steps:
*
* <ol>
* <li>Create greeting message
* <li>Transform to uppercase
* <li>Add punctuation
* </ol>
*/
public class MapExample extends DurableHandler<GreetingRequest, String> {

@Override
public String handleRequest(GreetingRequest input, DurableContext context) {
var squared = context.mapAsync(
"map example",
List.of(1, 2, 3),
(ctx, item, index) -> item * item,
TypeToken.get(Integer.class),
new ConcurrencyConfig(10, 2, 1));

var parallel = context.parallelAsync("parallel example", new ConcurrencyConfig(10, 2, 1));
var b1 = parallel.branch("branch1", TypeToken.get(String.class), ctx -> "hello", new ParallelBranchConfig());
var b2 = parallel.branch("branch2", TypeToken.get(String.class), ctx -> "world", new ParallelBranchConfig());

var result = parallel.get();
return b1.get() + " " + b2.get();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable;

public class BatchResult<T> extends ParallelResult {
// results/errors as well as the statistics
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable;

public record ConcurrencyConfig(int maxConcurrency, int minSuccessful, int toleratedFailureCount) {}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.util.HexFormat;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
Expand All @@ -21,8 +22,11 @@
import software.amazon.lambda.durable.operation.CallbackOperation;
import software.amazon.lambda.durable.operation.ChildContextOperation;
import software.amazon.lambda.durable.operation.InvokeOperation;
import software.amazon.lambda.durable.operation.MapOperation;
import software.amazon.lambda.durable.operation.ParallelOperation;
import software.amazon.lambda.durable.operation.StepOperation;
import software.amazon.lambda.durable.operation.WaitOperation;
import software.amazon.lambda.durable.serde.JacksonSerDes;
import software.amazon.lambda.durable.validation.ParameterValidator;

public class DurableContext extends BaseContext {
Expand Down Expand Up @@ -335,7 +339,7 @@ private <T> DurableFuture<T> runInChildContextAsync(
var operationId = nextOperationId();

var operation = new ChildContextOperation<>(
operationId, name, func, subType, typeToken, getDurableConfig().getSerDes(), this);
operationId, name, func, subType, typeToken, getDurableConfig().getSerDes(), this, null);

operation.execute();
return operation;
Expand Down Expand Up @@ -438,6 +442,28 @@ public <T> DurableFuture<T> waitForCallbackAsync(
OperationSubType.WAIT_FOR_CALLBACK);
}

// parallel operations
public DurableParallelFuture parallelAsync(String name, ConcurrencyConfig config) {
var operationId = nextOperationId();
var operation = new ParallelOperation(operationId, name, config, this);
operation.execute();
return operation;
}

// map operations
public <T, I> DurableFuture<BatchResult<T>> mapAsync(
String name,
List<I> collection,
MapFunction<I, T> func,
TypeToken<T> resultTypeToken,
ConcurrencyConfig config) {
var operationId = nextOperationId();
var operation = new MapOperation<>(
operationId, name, collection, func, resultTypeToken, new JacksonSerDes(), config, this);
operation.execute();
return operation;
}

// =============== accessors ================
/**
* Returns a logger with execution context information for replay-aware logging.
Expand Down Expand Up @@ -474,7 +500,7 @@ public void close() {
* prefixed with the parent hashed contextId (e.g. "<hash>-1", "<hash>-2" inside parent context <hash>). This
* matches the Python SDK's stepPrefix convention and prevents ID collisions in checkpoint batches.
*/
private String nextOperationId() {
public String nextOperationId() {
var counter = String.valueOf(operationCounter.incrementAndGet());
var rawId = getContextId() != null ? getContextId() + "-" + counter : counter;
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable;

import java.util.function.Function;

public interface DurableParallelFuture extends DurableFuture<ParallelResult> {
<T> DurableFuture<T> branch(
String name, TypeToken<T> resultType, Function<DurableContext, T> func, ParallelBranchConfig config);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable;

@FunctionalInterface
public interface MapFunction<I, O> {
O apply(DurableContext context, I item, int index);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable;

public class ParallelBranchConfig {
// SerDes and etc
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable;

/** Statistics of a parallel operation (succeeded, failed, etc.) */
public class ParallelResult {}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
public enum OperationSubType {
RUN_IN_CHILD_CONTEXT("RunInChildContext"),
MAP("Map"),
MAP_ITERATION("MapInteration"),
PARALLEL("Parallel"),
PARALLEL_BRANCH("ParallelBranch"),
WAIT_FOR_CALLBACK("WaitForCallback");

private final String value;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.operation;

import java.util.ArrayList;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import software.amazon.awssdk.services.lambda.model.OperationAction;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.awssdk.services.lambda.model.OperationUpdate;
import software.amazon.lambda.durable.ConcurrencyConfig;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.TypeToken;
import software.amazon.lambda.durable.model.OperationSubType;
import software.amazon.lambda.durable.serde.NoopSerDes;
import software.amazon.lambda.durable.serde.SerDes;

public abstract class BaseConcurrentOperation<R> extends BaseDurableOperation<R> {

private final ArrayList<ChildContextOperation<?>> branches;
private final Queue<ChildContextOperation<?>> queue;
private final DurableContext rootContext;
private final AtomicInteger succeeded;
private final AtomicInteger failed;
private final OperationSubType subType;
private final ConcurrencyConfig config;
private final AtomicInteger activeBranches;

public BaseConcurrentOperation(
String operationId,
String name,
OperationSubType subType,
ConcurrencyConfig config,
DurableContext durableContext) {
super(operationId, name, OperationType.CONTEXT, new TypeToken<>() {}, new NoopSerDes(), durableContext);
this.branches = new ArrayList<>();
this.queue = new ConcurrentLinkedQueue<>();
this.rootContext = durableContext.createChildContext(operationId, name);
this.config = config;
this.succeeded = new AtomicInteger(0);
this.failed = new AtomicInteger(0);
this.subType = subType;
this.activeBranches = new AtomicInteger(0);
}

protected <T> ChildContextOperation<T> branchInternal(
String name, TypeToken<T> resultType, SerDes resultSerDes, Function<DurableContext, T> func) {
var operationId = this.rootContext.nextOperationId();
ChildContextOperation<T> operation;

synchronized (this.branches) {
operation = new ChildContextOperation<>(
operationId,
name,
func,
OperationSubType.PARALLEL_BRANCH,
resultType,
resultSerDes,
rootContext,
this);
branches.add(operation);
queue.add(operation);
}

executeNewBranchIfConcurrencyAllows();

return operation;
}

private void executeNewBranchIfConcurrencyAllows() {
synchronized (this) {
// use one extra thread from user's thread pool to wait for the semaphore
if (activeBranches.get() < config.maxConcurrency()) {
if (!queue.isEmpty()) {
activeBranches.incrementAndGet();

var op = queue.poll();
op.execute();
}
}
}
}

@Override
public <T> void onChildContextComplete(ChildContextOperation<T> parallelBranchOperation) {
if (isOperationCompleted()) {
return;
}

activeBranches.decrementAndGet();

// handle branch results
try {
parallelBranchOperation.get();
succeeded.incrementAndGet();
} catch (Exception e) {
failed.incrementAndGet();
}

if (isDone()) {
sendOperationUpdateAsync(OperationUpdate.builder()
.action(OperationAction.SUCCEED)
.subType(OperationSubType.PARALLEL.getValue())
.payload(""));

rootContext.close();
} else {
// we must make sure the thread for the new branch is registered before the child thread is deregistered
executeNewBranchIfConcurrencyAllows();
}
}

private boolean isDone() {
return succeeded.get() >= config.minSuccessful() || failed.get() > config.toleratedFailureCount();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public abstract class BaseDurableOperation<T> implements DurableFuture<T> {
private final String name;
private final OperationType operationType;
private final ExecutionManager executionManager;
private final TypeToken<T> resultTypeToken;
private final SerDes resultSerDes;
protected final TypeToken<T> resultTypeToken;
protected final SerDes resultSerDes;
protected final CompletableFuture<Void> completionFuture;
private final DurableContext durableContext;

Expand Down Expand Up @@ -338,4 +338,9 @@ protected void validateReplay(Operation checkpointed) {
operationId, checkpointed.name(), getName())));
}
}

protected <U> void onChildContextComplete(ChildContextOperation<U> tChildContextOperation) {
// do nothing

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class ChildContextOperation<T> extends BaseDurableOperation<T> {

private final Function<DurableContext, T> function;
private final ExecutorService userExecutor;
private final BaseDurableOperation<?> parentOperation;
private boolean replayChildContext;
private T reconstructedResult;
private final OperationSubType subType;
Expand All @@ -53,11 +54,13 @@ public ChildContextOperation(
OperationSubType subType,
TypeToken<T> resultTypeToken,
SerDes resultSerDes,
DurableContext durableContext) {
DurableContext durableContext,
BaseDurableOperation<?> parentOperation) {
super(operationId, name, OperationType.CONTEXT, resultTypeToken, resultSerDes, durableContext);
this.function = function;
this.userExecutor = getContext().getDurableConfig().getExecutorService();
this.subType = subType;
this.parentOperation = parentOperation;
}

/** Starts the operation. */
Expand Down Expand Up @@ -118,6 +121,10 @@ private void executeChildContext() {
handleChildContextSuccess(result);
} catch (Throwable e) {
handleChildContextFailure(e);
} finally {
if (parentOperation != null) {
parentOperation.onChildContextComplete(this);
}
}
}
};
Expand All @@ -138,6 +145,9 @@ private void handleChildContextSuccess(T result) {
}

private void checkpointSuccess(T result) {
if (parentOperation != null && parentOperation.isOperationCompleted()) {
return; // Already completed by parent operation
}
var serialized = serializeResult(result);
var serializedBytes = serialized.getBytes(StandardCharsets.UTF_8);

Expand Down Expand Up @@ -169,6 +179,10 @@ private void handleChildContextFailure(Throwable exception) {
terminateExecution((UnrecoverableDurableExecutionException) exception);
}

if (parentOperation != null && parentOperation.isOperationCompleted()) {
return; // Already completed by parent operation
}

final ErrorObject errorObject;
if (exception instanceof DurableOperationException opEx) {
errorObject = opEx.getErrorObject();
Expand Down Expand Up @@ -210,6 +224,8 @@ public T get() {
case MAP -> throw new ChildContextFailedException(op);
case PARALLEL -> throw new ChildContextFailedException(op);
case RUN_IN_CHILD_CONTEXT -> throw new ChildContextFailedException(op);
case PARALLEL_BRANCH -> throw new ChildContextFailedException(op);
case MAP_ITERATION -> throw new ChildContextFailedException(op);
};
}
}
Expand Down
Loading
Loading