Parallel¶
Concurrent branches¶
Parallel executes multiple operations concurrently. It manages concurrency, collects results as branches complete, and checkpoints the outcome.
Each branch runs in its own child context and checkpoints its result independently as it completes.
Use parallel to execute independent tasks concurrently. Use map instead to execute the same operation concurrently for each item in a collection.
import {
BatchResult,
DurableContext,
withDurableExecution,
} from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext): Promise<string[]> => {
const result: BatchResult<string> = await context.parallel("check-services", [
async (ctx) => ctx.step("check-inventory", async () => "inventory ok"),
async (ctx) => ctx.step("check-payment", async () => "payment ok"),
async (ctx) => ctx.step("check-shipping", async () => "shipping ok"),
]);
return result.getResults();
},
);
from aws_durable_execution_sdk_python import (
BatchResult,
DurableContext,
durable_execution,
)
def check_inventory(ctx: DurableContext) -> str:
return ctx.step(lambda _: "inventory ok", name="check-inventory")
def check_payment(ctx: DurableContext) -> str:
return ctx.step(lambda _: "payment ok", name="check-payment")
def check_shipping(ctx: DurableContext) -> str:
return ctx.step(lambda _: "shipping ok", name="check-shipping")
@durable_execution
def handler(event: dict, context: DurableContext) -> list[str]:
result: BatchResult[str] = context.parallel(
[check_inventory, check_payment, check_shipping],
name="check-services",
)
    return result.get_results()
import java.util.List;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.model.ParallelResult;
public class SimpleParallel extends DurableHandler<Void, ParallelResult> {
@Override
public ParallelResult handleRequest(Void input, DurableContext context) {
try (var parallel = context.parallel("check-services")) {
parallel.branch("check-inventory", String.class,
ctx -> ctx.step("inventory", String.class, s -> "inventory ok"));
parallel.branch("check-payment", String.class,
ctx -> ctx.step("payment", String.class, s -> "payment ok"));
parallel.branch("check-shipping", String.class,
ctx -> ctx.step("shipping", String.class, s -> "shipping ok"));
return parallel.get();
}
}
}
Method signature¶
context.parallel¶
// Named overload
parallel<TOutput>(
name: string | undefined,
branches: (ParallelFunc<TOutput> | NamedParallelBranch<TOutput>)[],
config?: ParallelConfig<TOutput>,
): DurablePromise<BatchResult<TOutput>>
// Unnamed overload
parallel<TOutput>(
branches: (ParallelFunc<TOutput> | NamedParallelBranch<TOutput>)[],
config?: ParallelConfig<TOutput>,
): DurablePromise<BatchResult<TOutput>>
Parameters:
- name (optional) — A name for the parallel operation. Pass undefined to omit.
- branches — An array of ParallelFunc or NamedParallelBranch objects.
- config (optional) — A ParallelConfig&lt;TOutput&gt; object.
Returns: DurablePromise<BatchResult<TOutput>>. Use await to get the result.
Throws: Branch exceptions are captured in the BatchResult. Call throwIfError()
to re-throw the first failure.
ParallelFunc / NamedParallelBranch
Each element in branches is either a plain function or a named branch object:
type ParallelFunc<TResult> = (context: DurableContext) => Promise<TResult>
interface NamedParallelBranch<TResult> {
name?: string;
func: ParallelFunc<TResult>;
}
- name (optional) — A name for this branch.
- func — An async function receiving a DurableContext and returning Promise&lt;TResult&gt;.
Use NamedParallelBranch to give an inline lambda a name without defining a named
function.
def parallel(
self,
functions: Sequence[Callable[[DurableContext], T]],
name: str | None = None,
config: ParallelConfig | None = None,
) -> BatchResult[T]: ...
Parameters:
- functions — A sequence of callables, each receiving a DurableContext and returning T.
- name (optional) — A name for the parallel operation.
- config (optional) — A ParallelConfig object.
Returns: BatchResult[T].
Raises: Branch exceptions are captured in the BatchResult. Call throw_if_error()
to re-raise the first failure.
Each element in functions is a plain callable (ctx: DurableContext) -> T. Python has
no named-branch wrapper type.
ParallelDurableFuture parallel(String name)
ParallelDurableFuture parallel(String name, ParallelConfig config)
Parameters:
- name (required) — A name for the parallel operation.
- config (optional) — A ParallelConfig object.
Returns: ParallelResult from get().
Throws: Branch exceptions are captured in ParallelResult. Inspect succeeded and
failed counts to detect failures.
ParallelDurableFuture
parallel() returns a ParallelDurableFuture. Call branch() to register and
immediately start each branch, then call get() to block until all complete.
interface ParallelDurableFuture extends AutoCloseable, DurableFuture<ParallelResult> {
<T> DurableFuture<T> branch(String name, Class<T> resultType,
Function<DurableContext, T> func);
<T> DurableFuture<T> branch(String name, TypeToken<T> resultType,
Function<DurableContext, T> func);
<T> DurableFuture<T> branch(String name, Class<T> resultType,
Function<DurableContext, T> func,
ParallelBranchConfig config);
ParallelResult get(); // blocks until all branches complete
void close(); // calls get() if not already called
}
Parameters for branch():
- name (required) — A name for this branch.
- resultType — Class&lt;T&gt; or TypeToken&lt;T&gt; for deserialization.
- func — Function&lt;DurableContext, T&gt; to execute in the branch's child context.
- config (optional) — ParallelBranchConfig for per-branch serialization.
Returns from branch(): DurableFuture<T>. DurableFuture<T> is the common return
type for all async Java SDK operations. Call .get() on it after parallel.get()
returns to retrieve that branch's individual result.
Tip
Use try-with-resources to guarantee get() is called even if you throw an exception
before reaching it explicitly.
ParallelBranchConfig
ParallelBranchConfig sets a custom SerDes for a single branch, overriding the
handler-level default set on DurableConfig.
ParallelConfig¶
interface ParallelConfig<TResult> {
maxConcurrency?: number;
completionConfig?: CompletionConfig;
serdes?: Serdes<BatchResult<TResult>>;
itemSerdes?: Serdes<TResult>;
nesting?: NestingType;
}
Parameters:
- maxConcurrency (optional) — Maximum branches running at once. Default: unlimited.
- completionConfig (optional) — When to stop. Default: wait for all branches.
- serdes (optional) — Custom Serdes for the BatchResult.
- itemSerdes (optional) — Custom Serdes for individual branch results.
- nesting (optional) — NestingType.NESTED (default) or NestingType.FLAT. FLAT reduces operation overhead by ~30% at the cost of lower observability.
@dataclass(frozen=True)
class ParallelConfig:
max_concurrency: int | None = None
completion_config: CompletionConfig = CompletionConfig.all_successful()
serdes: SerDes | None = None
item_serdes: SerDes | None = None
summary_generator: SummaryGenerator | None = None
Parameters:
- max_concurrency (optional) — Maximum branches running at once. Default: unlimited.
- completion_config (optional) — When to stop. Default: CompletionConfig.all_successful().
- serdes (optional) — Custom SerDes for the BatchResult.
- item_serdes (optional) — Custom SerDes for individual branch results.
- summary_generator (optional) — A callable invoked when the serialized BatchResult exceeds 256KB. See Checkpointing.
CompletionConfig¶
See Completion strategies for how CompletionConfig affects
execution and the completion status of the result.
Result types¶
interface BatchResult<TResult> {
all: BatchItem<TResult>[];
status: BatchItemStatus.SUCCEEDED | BatchItemStatus.FAILED;
completionReason: "ALL_COMPLETED" | "MIN_SUCCESSFUL_REACHED" | "FAILURE_TOLERANCE_EXCEEDED";
hasFailure: boolean;
successCount: number;
failureCount: number;
startedCount: number;
totalCount: number;
getResults(): TResult[];
getErrors(): ChildContextError[];
succeeded(): BatchItem<TResult>[];
failed(): BatchItem<TResult>[];
started(): BatchItem<TResult>[];
throwIfError(): void;
}
- all — all BatchItem entries, one per branch, in input order. Iterate with item.index for branch-indexed access when some branches fail.
- getResults() — results of succeeded branches, preserving input order
- getErrors() — ChildContextError[] for failed branches
- succeeded() / failed() / started() — BatchItem[] filtered by status
- successCount / failureCount / startedCount / totalCount — branch counts
- status — SUCCEEDED if no failures, FAILED otherwise
- completionReason — why the operation completed. See Completion strategies.
- hasFailure — true if any branch failed
- throwIfError() — throws the first branch error, if any
interface BatchItem<TResult> {
index: number;
status: BatchItemStatus;
result?: TResult;
error?: ChildContextError;
}
enum BatchItemStatus {
SUCCEEDED = "SUCCEEDED",
FAILED = "FAILED",
STARTED = "STARTED",
}
- index — position of this branch in the input array
- status — SUCCEEDED, FAILED, or STARTED (not yet complete)
- result — the branch return value, present when status is SUCCEEDED
- error — the captured error, present when status is FAILED
@dataclass(frozen=True)
class BatchResult(Generic[R]):
all: list[BatchItem[R]]
completion_reason: CompletionReason
def get_results(self) -> list[R]: ...
def get_errors(self) -> list[ErrorObject]: ...
def succeeded(self) -> list[BatchItem[R]]: ...
def failed(self) -> list[BatchItem[R]]: ...
def started(self) -> list[BatchItem[R]]: ...
def throw_if_error(self) -> None: ...
def to_dict(self) -> dict: ...
@property
def status(self) -> BatchItemStatus: ...
@property
def has_failure(self) -> bool: ...
@property
def success_count(self) -> int: ...
@property
def failure_count(self) -> int: ...
@property
def started_count(self) -> int: ...
@property
def total_count(self) -> int: ...
class CompletionReason(Enum):
ALL_COMPLETED = "ALL_COMPLETED"
MIN_SUCCESSFUL_REACHED = "MIN_SUCCESSFUL_REACHED"
FAILURE_TOLERANCE_EXCEEDED = "FAILURE_TOLERANCE_EXCEEDED"
- all — all BatchItem entries, one per branch, in input order. Iterate with item.index for branch-indexed access when some branches fail.
- get_results() — results of succeeded branches, preserving input order
- get_errors() — list[ErrorObject] for failed branches
- succeeded() / failed() / started() — BatchItem lists filtered by status
- success_count / failure_count / started_count / total_count — branch counts
- status — BatchItemStatus.SUCCEEDED if no failures, FAILED otherwise
- completion_reason — why the operation completed. See Completion strategies.
- has_failure — True if any branch failed
- throw_if_error() — raises the first branch error as a CallableRuntimeError
- to_dict() — serializes to a plain dict. Serializability depends on R.
@dataclass(frozen=True)
class BatchItem(Generic[R]):
index: int
status: BatchItemStatus
result: R | None = None
error: ErrorObject | None = None
def to_dict(self) -> dict: ...
class BatchItemStatus(Enum):
SUCCEEDED = "SUCCEEDED"
FAILED = "FAILED"
STARTED = "STARTED"
- index — position of this branch in the input sequence
- status — SUCCEEDED, FAILED, or STARTED (not yet complete)
- result — the branch return value, present when status is SUCCEEDED
- error — ErrorObject with the captured error, present when status is FAILED
- to_dict() — serializes to a plain dict. Serializability of result depends on R.
record ParallelResult(
int size,
int succeeded,
int failed,
ConcurrencyCompletionStatus completionStatus
) {}
enum ConcurrencyCompletionStatus {
ALL_COMPLETED,
MIN_SUCCESSFUL_REACHED,
FAILURE_TOLERANCE_EXCEEDED
}
- size — total number of registered branches
- succeeded — number of branches that succeeded
- failed — number of branches that failed
- completionStatus — why the operation completed. See Completion strategies.
ConcurrencyCompletionStatus.isSucceeded() returns true for both ALL_COMPLETED and
MIN_SUCCESSFUL_REACHED. To check if any branch failed, use result.failed() > 0
(where result is a ParallelResult).
ParallelResult contains only aggregate counts. To get individual branch results, hold
the DurableFuture<T> returned by each branch() call and call .get() on it after
parallel.get() returns. Results are available in the order branches were registered.
Branch functions¶
Each branch receives a DurableContext and can use any durable operation such as steps,
waits, child contexts, or nested parallel operations. Branches run in
child contexts, so they do not share state with each other or with
the parent context.
A branch is a ParallelFunc (plain async function) or a NamedParallelBranch (object
with name and func). Use NamedParallelBranch to give an inline lambda a name
without defining a named function.
import {
BatchResult,
DurableContext,
withDurableExecution,
} from "@aws/durable-execution-sdk-js";
// Plain function branch
async function taskA(ctx: DurableContext): Promise<string> {
return ctx.step("run-a", async () => "a done");
}
export const handler = withDurableExecution(
async (event: any, context: DurableContext): Promise<string[]> => {
const result: BatchResult<string> = await context.parallel("process", [
// ParallelFunc: plain async function
taskA,
// NamedParallelBranch: object with name and func
{ name: "task-b", func: async (ctx) => ctx.step("run-b", async () => "b done") },
]);
return result.getResults();
},
);
Branch functions are synchronous callables that receive a DurableContext and return
T.
from aws_durable_execution_sdk_python import (
BatchResult,
DurableContext,
durable_execution,
)
def task_a(ctx: DurableContext) -> str:
return ctx.step(lambda _: "a done", name="run-a")
def task_b(ctx: DurableContext) -> str:
return ctx.step(lambda _: "b done", name="run-b")
@durable_execution
def handler(event: dict, context: DurableContext) -> list[str]:
result: BatchResult[str] = context.parallel(
[task_a, task_b],
name="process",
)
    return result.get_results()
Each branch is registered via ParallelDurableFuture.branch(). The branch function is a
synchronous Function<DurableContext, T>.
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.model.ParallelResult;
public class NamedBranches extends DurableHandler<Void, ParallelResult> {
@Override
public ParallelResult handleRequest(Void input, DurableContext context) {
try (var parallel = context.parallel("process")) {
parallel.branch("task-a", String.class,
ctx -> ctx.step("run-a", String.class, s -> "a done"));
parallel.branch("task-b", String.class,
ctx -> ctx.step("run-b", String.class, s -> "b done"));
return parallel.get();
}
}
}
Pass arguments to branches¶
Capture arguments in a closure:
import {
BatchResult,
DurableContext,
withDurableExecution,
} from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext): Promise<string[]> => {
const items = ["a", "b", "c"];
const result: BatchResult<string> = await context.parallel(
"process-items",
items.map((item) => async (ctx: DurableContext) =>
ctx.step(`process-${item}`, async () => `processed ${item}`),
),
);
return result.getResults();
},
);
Use a factory function to bind arguments. Avoid using loop variables directly in lambdas, as Python closures capture by reference.
from aws_durable_execution_sdk_python import (
BatchResult,
DurableContext,
durable_execution,
)
def make_branch(item: str):
def branch(ctx: DurableContext) -> str:
return ctx.step(lambda _: f"processed {item}", name=f"process-{item}")
return branch
@durable_execution
def handler(event: dict, context: DurableContext) -> list[str]:
items = ["a", "b", "c"]
result: BatchResult[str] = context.parallel(
[make_branch(item) for item in items],
name="process-items",
)
    return result.get_results()
Capture arguments in a lambda. Java lambdas require effectively final variables.
import java.util.List;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.model.ParallelResult;
public class PassArguments extends DurableHandler<Void, ParallelResult> {
@Override
public ParallelResult handleRequest(Void input, DurableContext context) {
var items = List.of("a", "b", "c");
try (var parallel = context.parallel("process-items")) {
for (var item : items) {
parallel.branch("process-" + item, String.class,
ctx -> ctx.step("run-" + item, String.class, s -> "processed " + item));
}
return parallel.get();
}
}
}
Naming parallel operations¶
Name your parallel operations to make them easier to identify in logs and tests.
The name is the first argument. Pass undefined to omit it.
Pass name as a keyword argument. Omit it or pass None to leave it unnamed.
The name parameter is always part of the method signature, both for parallel() and
for each branch() call. Pass null to leave it unnamed.
Configuration¶
Configure parallel behavior using ParallelConfig:
import {
BatchResult,
DurableContext,
withDurableExecution,
} from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext): Promise<string | undefined> => {
const result: BatchResult<string> = await context.parallel(
"fetch-data",
[
async (ctx) => ctx.step("primary", async () => "primary result"),
async (ctx) => ctx.step("secondary", async () => "secondary result"),
async (ctx) => ctx.step("cache", async () => "cache result"),
],
{
maxConcurrency: 2,
completionConfig: { minSuccessful: 1 },
},
);
return result.getResults()[0];
},
);
from aws_durable_execution_sdk_python import (
BatchResult,
DurableContext,
durable_execution,
)
from aws_durable_execution_sdk_python.config import CompletionConfig, ParallelConfig
def try_primary(ctx: DurableContext) -> str:
return ctx.step(lambda _: "primary result", name="primary")
def try_secondary(ctx: DurableContext) -> str:
return ctx.step(lambda _: "secondary result", name="secondary")
def try_cache(ctx: DurableContext) -> str:
return ctx.step(lambda _: "cache result", name="cache")
@durable_execution
def handler(event: dict, context: DurableContext) -> str | None:
config = ParallelConfig(
max_concurrency=2,
completion_config=CompletionConfig.first_successful(),
)
result: BatchResult[str] = context.parallel(
[try_primary, try_secondary, try_cache],
name="fetch-data",
config=config,
)
results = result.get_results()
return results[0] if results else None
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.config.CompletionConfig;
import software.amazon.lambda.durable.config.ParallelConfig;
import software.amazon.lambda.durable.model.ParallelResult;
public class ParallelConfigExample extends DurableHandler<Void, ParallelResult> {
@Override
public ParallelResult handleRequest(Void input, DurableContext context) {
var config = ParallelConfig.builder()
.maxConcurrency(2)
.completionConfig(CompletionConfig.firstSuccessful())
.build();
try (var parallel = context.parallel("fetch-data", config)) {
parallel.branch("primary", String.class,
ctx -> ctx.step("primary", String.class, s -> "primary result"));
parallel.branch("secondary", String.class,
ctx -> ctx.step("secondary", String.class, s -> "secondary result"));
parallel.branch("cache", String.class,
ctx -> ctx.step("cache", String.class, s -> "cache result"));
return parallel.get();
}
}
}
Completion strategies¶
CompletionConfig controls when the parallel operation completes. When the operation
reaches the completion criteria, it will abandon branches that have not completed yet.
The abandoned branches will keep running in the background but cannot checkpoint their
results after the parent completes. The SDK makes a best-effort attempt to cancel
ongoing work in abandoned branches, but cancellation is not guaranteed.
The BatchResult's completionReason indicates the stop condition with which the
parallel operation completed. Branches that had not started yet do not appear in
result.all at all. Branches that had started but not completed yet appear with status
STARTED.
| completionConfig | Early exit completionReason | Full completion completionReason |
|---|---|---|
| {} or omitted | FAILURE_TOLERANCE_EXCEEDED | ALL_COMPLETED |
| toleratedFailureCount=N | FAILURE_TOLERANCE_EXCEEDED | ALL_COMPLETED |
| toleratedFailurePercentage=N | FAILURE_TOLERANCE_EXCEEDED | ALL_COMPLETED |
| minSuccessful=N | MIN_SUCCESSFUL_REACHED | ALL_COMPLETED |
The BatchResult's completion_reason indicates the stop condition with which the
parallel operation completed. Branches that were never started appear in result.all
with status STARTED.
| completion_config | Early exit completion_reason | Full completion completion_reason |
|---|---|---|
| all_successful() (default) | FAILURE_TOLERANCE_EXCEEDED | ALL_COMPLETED |
| first_successful() | MIN_SUCCESSFUL_REACHED | ALL_COMPLETED |
| tolerated_failure_count=N | FAILURE_TOLERANCE_EXCEEDED | ALL_COMPLETED |
| tolerated_failure_percentage=N | FAILURE_TOLERANCE_EXCEEDED | ALL_COMPLETED |
| min_successful=N | MIN_SUCCESSFUL_REACHED | ALL_COMPLETED |
Warning
CompletionConfig.all_completed() is deprecated. Use
CompletionConfig.all_successful() instead.
The ParallelResult's completionStatus indicates the stop condition with which the
parallel operation completed. All registered branches (including those never started)
are counted in size.
| completionConfig | Early exit completionStatus | Full completion completionStatus |
|---|---|---|
| allCompleted() (default) | n/a | ALL_COMPLETED |
| allSuccessful() | FAILURE_TOLERANCE_EXCEEDED | ALL_COMPLETED |
| firstSuccessful() | MIN_SUCCESSFUL_REACHED | ALL_COMPLETED |
| minSuccessful(N) | MIN_SUCCESSFUL_REACHED | ALL_COMPLETED |
| toleratedFailureCount(N) | FAILURE_TOLERANCE_EXCEEDED | ALL_COMPLETED |
Note
ParallelConfig in Java does not support toleratedFailurePercentage. Use
toleratedFailureCount instead.
Note
When using a minSuccessful strategy, failures do not trigger early exit. If all
branches fail before the success threshold is reached, the operation completes with
ALL_COMPLETED.
import {
BatchResult,
DurableContext,
withDurableExecution,
} from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext): Promise<string | undefined> => {
// Complete as soon as one branch succeeds
const result: BatchResult<string> = await context.parallel(
"race",
[
async (ctx) => ctx.step("source-a", async () => "result from a"),
async (ctx) => ctx.step("source-b", async () => "result from b"),
async (ctx) => ctx.step("source-c", async () => "result from c"),
],
{ completionConfig: { minSuccessful: 1 } },
);
return result.getResults()[0];
},
);
from aws_durable_execution_sdk_python import (
BatchResult,
DurableContext,
durable_execution,
)
from aws_durable_execution_sdk_python.config import CompletionConfig, ParallelConfig
def source_a(ctx: DurableContext) -> str:
return ctx.step(lambda _: "result from a", name="source-a")
def source_b(ctx: DurableContext) -> str:
return ctx.step(lambda _: "result from b", name="source-b")
def source_c(ctx: DurableContext) -> str:
return ctx.step(lambda _: "result from c", name="source-c")
@durable_execution
def handler(event: dict, context: DurableContext) -> str | None:
result: BatchResult[str] = context.parallel(
[source_a, source_b, source_c],
name="race",
config=ParallelConfig(completion_config=CompletionConfig.first_successful()),
)
results = result.get_results()
return results[0] if results else None
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.config.CompletionConfig;
import software.amazon.lambda.durable.config.ParallelConfig;
import software.amazon.lambda.durable.model.ParallelResult;
public class CompletionConfigExample extends DurableHandler<Void, ParallelResult> {
@Override
public ParallelResult handleRequest(Void input, DurableContext context) {
var config = ParallelConfig.builder()
.completionConfig(CompletionConfig.firstSuccessful())
.build();
try (var parallel = context.parallel("race", config)) {
parallel.branch("source-a", String.class,
ctx -> ctx.step("a", String.class, s -> "result from a"));
parallel.branch("source-b", String.class,
ctx -> ctx.step("b", String.class, s -> "result from b"));
parallel.branch("source-c", String.class,
ctx -> ctx.step("c", String.class, s -> "result from c"));
return parallel.get();
}
}
}
Error handling¶
When a branch throws an error, parallel captures the error in the result rather than propagating it immediately. Other branches continue running.
BatchResult.status is FAILED if any branch failed. Call throwIfError() to
propagate the first branch error as an exception, or inspect getErrors() to handle
errors individually.
import {
BatchResult,
DurableContext,
withDurableExecution,
} from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const result: BatchResult<string> = await context.parallel(
"tasks",
[
async (ctx) => ctx.step("task-1", async () => "ok"),
async (ctx) =>
ctx.step("task-2", async () => {
throw new Error("task 2 failed");
}),
async (ctx) => ctx.step("task-3", async () => "ok"),
],
{ completionConfig: { toleratedFailureCount: 1 } },
);
return {
succeeded: result.successCount,
failed: result.failureCount,
results: result.getResults(),
errors: result.getErrors().map((e) => e.message),
};
},
);
BatchResult.status is FAILED if any branch failed. Call throw_if_error() to
propagate the first branch error as an exception, or inspect get_errors() to handle
errors individually.
from aws_durable_execution_sdk_python import (
BatchResult,
DurableContext,
durable_execution,
)
from aws_durable_execution_sdk_python.config import CompletionConfig, ParallelConfig
def task_1(ctx: DurableContext) -> str:
return ctx.step(lambda _: "ok", name="task-1")
def task_2(ctx: DurableContext) -> str:
def fail(_):
raise ValueError("task 2 failed")
return ctx.step(fail, name="task-2")
def task_3(ctx: DurableContext) -> str:
return ctx.step(lambda _: "ok", name="task-3")
@durable_execution
def handler(event: dict, context: DurableContext) -> dict:
result: BatchResult[str] = context.parallel(
[task_1, task_2, task_3],
name="tasks",
config=ParallelConfig(
completion_config=CompletionConfig(tolerated_failure_count=1)
),
)
return {
"succeeded": result.success_count,
"failed": result.failure_count,
"results": result.get_results(),
"errors": [e.message for e in result.get_errors()],
}
Check result.failed() > 0 (where result is a ParallelResult) to detect branch
failures. To propagate a branch error, call .get() on the DurableFuture<T> for that
branch to rethrow the original exception. This will throw
ParallelBranchFailedException if the SDK cannot reconstruct the original.
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.config.CompletionConfig;
import software.amazon.lambda.durable.config.ParallelConfig;
import software.amazon.lambda.durable.model.ParallelResult;
public class ErrorHandling extends DurableHandler<Void, ParallelResult> {
@Override
public ParallelResult handleRequest(Void input, DurableContext context) {
var config = ParallelConfig.builder()
.completionConfig(CompletionConfig.toleratedFailureCount(1))
.build();
try (var parallel = context.parallel("tasks", config)) {
parallel.branch("task-1", String.class,
ctx -> ctx.step("task-1", String.class, s -> "ok"));
parallel.branch("task-2", String.class, ctx -> ctx.step("task-2", String.class, s -> {
throw new RuntimeException("task 2 failed");
}));
parallel.branch("task-3", String.class,
ctx -> ctx.step("task-3", String.class, s -> "ok"));
return parallel.get();
}
}
}
Checkpointing¶
Each branch checkpoints its result on completion. Branches that have not completed yet
when the parallel operation reaches its completion criteria remain with status STARTED
and will receive no further checkpoint updates.
The parent parallel operation also checkpoints the serialized BatchResult for
observability. On replay, the SDK deserializes the BatchResult directly from that
checkpoint.
For results over 256KB, the SDK cannot store the full BatchResult in the checkpoint.
Instead, the SDK reconstructs the BatchResult from the checkpointed results of the
individual branches. In that case, the checkpoint stores a compact JSON summary, which
is for observability only.
The default summary generator produces:
The parent parallel operation also checkpoints the serialized BatchResult for
observability. On replay, the SDK deserializes the BatchResult directly from that
checkpoint.
For results over 256KB, the SDK cannot store the full BatchResult in the checkpoint,
so it re-executes the branches to reconstruct it instead. In that case, the checkpoint
stores the output of summary_generator, which is for observability only.
The default summary generator produces:
{
"type": "ParallelResult",
"totalCount": 3,
"successCount": 2,
"failureCount": 1,
"startedCount": 0,
"completionReason": "ALL_COMPLETED",
"status": "FAILED"
}
When you pass a custom ParallelConfig without setting summary_generator, the SDK
checkpoints an empty string for large payloads.
SummaryGenerator is a callable protocol you can pass by setting summary_generator on
ParallelConfig.
The parent parallel operation checkpoints no result payload. On replay, the SDK always
re-executes the branches to reconstruct the ParallelResult from their individual
checkpoints.
Nesting parallel operations¶
A branch function can call context.parallel() to create nested parallel operations.
Each nested parallel creates its own set of child contexts.
import {
BatchResult,
DurableContext,
withDurableExecution,
} from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext): Promise<string[][]> => {
const outer: BatchResult<string[]> = await context.parallel("outer", [
async (ctx) => {
const inner: BatchResult<string> = await ctx.parallel("inner-a", [
async (c) => c.step("a1", async () => "a1"),
async (c) => c.step("a2", async () => "a2"),
]);
return inner.getResults();
},
async (ctx) => {
const inner: BatchResult<string> = await ctx.parallel("inner-b", [
async (c) => c.step("b1", async () => "b1"),
async (c) => c.step("b2", async () => "b2"),
]);
return inner.getResults();
},
]);
return outer.getResults();
},
);
from aws_durable_execution_sdk_python import (
BatchResult,
DurableContext,
durable_execution,
)
def group_a(ctx: DurableContext) -> list[str]:
inner: BatchResult[str] = ctx.parallel(
[
lambda c: c.step(lambda _: "a1", name="a1"),
lambda c: c.step(lambda _: "a2", name="a2"),
],
name="inner-a",
)
return inner.get_results()
def group_b(ctx: DurableContext) -> list[str]:
inner: BatchResult[str] = ctx.parallel(
[
lambda c: c.step(lambda _: "b1", name="b1"),
lambda c: c.step(lambda _: "b2", name="b2"),
],
name="inner-b",
)
return inner.get_results()
@durable_execution
def handler(event: dict, context: DurableContext) -> list[list[str]]:
outer: BatchResult[list[str]] = context.parallel(
[group_a, group_b],
name="outer",
)
    return outer.get_results()
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.model.ParallelResult;
public class NestedParallel extends DurableHandler<Void, ParallelResult> {
@Override
public ParallelResult handleRequest(Void input, DurableContext context) {
try (var outer = context.parallel("outer")) {
outer.branch("group-a", ParallelResult.class, ctx -> {
try (var inner = ctx.parallel("inner-a")) {
inner.branch("a1", String.class, c -> c.step("a1", String.class, s -> "a1"));
inner.branch("a2", String.class, c -> c.step("a2", String.class, s -> "a2"));
return inner.get();
}
});
outer.branch("group-b", ParallelResult.class, ctx -> {
try (var inner = ctx.parallel("inner-b")) {
inner.branch("b1", String.class, c -> c.step("b1", String.class, s -> "b1"));
inner.branch("b2", String.class, c -> c.step("b2", String.class, s -> "b2"));
return inner.get();
}
});
return outer.get();
}
}
}
See also¶
- Map operations run the same function concurrently on a collection
- Child contexts understand child context isolation
- Steps use steps within parallel branches
- Error handling in durable functions