Skip to content

Parallel

Concurrent branches

Parallel executes multiple operations concurrently. It manages concurrency, collects results as branches complete, and checkpoints the outcome.

Each branch runs in its own child context and checkpoints its result independently as it completes.

Use parallel to execute independent tasks concurrently. Use map instead to execute the same operation concurrently for each item in a collection.

import {
  BatchResult,
  DurableContext,
  withDurableExecution,
} from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: any, context: DurableContext): Promise<string[]> => {
    // Run three independent service checks concurrently; each branch
    // executes in its own child context and checkpoints its own result.
    const result: BatchResult<string> = await context.parallel("check-services", [
      async (ctx) => ctx.step("check-inventory", async () => "inventory ok"),
      async (ctx) => ctx.step("check-payment", async () => "payment ok"),
      async (ctx) => ctx.step("check-shipping", async () => "shipping ok"),
    ]);

    // Results of the succeeded branches, in input order.
    return result.getResults();
  },
);
from aws_durable_execution_sdk_python import (
    BatchResult,
    DurableContext,
    durable_execution,
)


def check_inventory(ctx: DurableContext) -> str:
    # Branch function: receives this branch's child DurableContext.
    return ctx.step(lambda _: "inventory ok", name="check-inventory")


def check_payment(ctx: DurableContext) -> str:
    return ctx.step(lambda _: "payment ok", name="check-payment")


def check_shipping(ctx: DurableContext) -> str:
    return ctx.step(lambda _: "shipping ok", name="check-shipping")


@durable_execution
def handler(event: dict, context: DurableContext) -> list[str]:
    """Run three service checks concurrently and return their results.

    Each branch runs in its own child context and checkpoints independently.
    """
    result: BatchResult[str] = context.parallel(
        [check_inventory, check_payment, check_shipping],
        name="check-services",
    )
    # get_results() returns the succeeded branches' values in input order,
    # matching the declared list[str] return type (to_dict() returns a dict).
    return result.get_results()
import java.util.List;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.model.ParallelResult;

public class SimpleParallel extends DurableHandler<Void, ParallelResult> {
    @Override
    public ParallelResult handleRequest(Void input, DurableContext context) {
        // try-with-resources guarantees get() is invoked via close(),
        // even if an exception is thrown before the explicit get() call.
        try (var parallel = context.parallel("check-services")) {
            parallel.branch("check-inventory", String.class,
                    ctx -> ctx.step("inventory", String.class, s -> "inventory ok"));
            parallel.branch("check-payment", String.class,
                    ctx -> ctx.step("payment", String.class, s -> "payment ok"));
            parallel.branch("check-shipping", String.class,
                    ctx -> ctx.step("shipping", String.class, s -> "shipping ok"));
            return parallel.get(); // blocks until all branches complete
        }
    }
}

Method signature

context.parallel

// Named overload
parallel<TOutput>(
  name: string | undefined,
  branches: (ParallelFunc<TOutput> | NamedParallelBranch<TOutput>)[],
  config?: ParallelConfig<TOutput>,
): DurablePromise<BatchResult<TOutput>>

// Unnamed overload
parallel<TOutput>(
  branches: (ParallelFunc<TOutput> | NamedParallelBranch<TOutput>)[],
  config?: ParallelConfig<TOutput>,
): DurablePromise<BatchResult<TOutput>>

Parameters:

  • name (optional) A name for the parallel operation. Pass undefined to omit.
  • branches An array of ParallelFunc or NamedParallelBranch objects.
  • config (optional) A ParallelConfig<TOutput> object.

Returns: DurablePromise<BatchResult<TOutput>>. Use await to get the result.

Throws: Branch exceptions are captured in the BatchResult. Call throwIfError() to re-throw the first failure.

ParallelFunc / NamedParallelBranch

Each element in branches is either a plain function or a named branch object:

type ParallelFunc<TResult> = (context: DurableContext) => Promise<TResult>

interface NamedParallelBranch<TResult> {
  name?: string;
  func: ParallelFunc<TResult>;
}
  • name (optional) A name for this branch.
  • func An async function receiving a DurableContext and returning Promise<TResult>.

Use NamedParallelBranch to give an inline lambda a name without defining a named function.

def parallel(
    self,
    functions: Sequence[Callable[[DurableContext], T]],
    name: str | None = None,
    config: ParallelConfig | None = None,
) -> BatchResult[T]: ...

Parameters:

  • functions A sequence of callables, each receiving a DurableContext and returning T.
  • name (optional) A name for the parallel operation.
  • config (optional) A ParallelConfig object.

Returns: BatchResult[T].

Raises: Branch exceptions are captured in the BatchResult. Call throw_if_error() to re-raise the first failure.

Each element in functions is a plain callable (ctx: DurableContext) -> T. Python has no named-branch wrapper type.

ParallelDurableFuture parallel(String name)
ParallelDurableFuture parallel(String name, ParallelConfig config)

Parameters:

  • name (required) A name for the parallel operation.
  • config (optional) A ParallelConfig object.

Returns: ParallelResult from get().

Throws: Branch exceptions are captured in ParallelResult. Inspect succeeded and failed counts to detect failures.

ParallelDurableFuture

parallel() returns a ParallelDurableFuture. Call branch() to register and immediately start each branch, then call get() to block until all complete.

interface ParallelDurableFuture extends AutoCloseable, DurableFuture<ParallelResult> {
    <T> DurableFuture<T> branch(String name, Class<T> resultType,
                                Function<DurableContext, T> func);
    <T> DurableFuture<T> branch(String name, TypeToken<T> resultType,
                                Function<DurableContext, T> func);
    <T> DurableFuture<T> branch(String name, Class<T> resultType,
                                Function<DurableContext, T> func,
                                ParallelBranchConfig config);
    ParallelResult get();   // blocks until all branches complete
    void close();           // calls get() if not already called
}

Parameters for branch():

  • name (required) A name for this branch.
  • resultType Class<T> or TypeToken<T> for deserialization.
  • func Function<DurableContext, T> to execute in the branch's child context.
  • config (optional) ParallelBranchConfig for per-branch serialization.

Returns from branch(): DurableFuture<T>. DurableFuture<T> is the common return type for all async Java SDK operations. Call .get() on it after parallel.get() returns to retrieve that branch's individual result.

Tip

Use try-with-resources to guarantee get() is called even if you throw an exception before reaching it explicitly.

ParallelBranchConfig

ParallelBranchConfig sets a custom SerDes for a single branch, overriding the handler-level default set on DurableConfig.

ParallelBranchConfig.builder()
    .serDes(SerDes)  // optional
    .build()

ParallelConfig

interface ParallelConfig<TResult> {
  maxConcurrency?: number;
  completionConfig?: CompletionConfig;
  serdes?: Serdes<BatchResult<TResult>>;
  itemSerdes?: Serdes<TResult>;
  nesting?: NestingType;
}

Parameters:

  • maxConcurrency (optional) Maximum branches running at once. Default: unlimited.
  • completionConfig (optional) When to stop. Default: wait for all branches.
  • serdes (optional) Custom Serdes for the BatchResult.
  • itemSerdes (optional) Custom Serdes for individual branch results.
  • nesting (optional) NestingType.NESTED (default) or NestingType.FLAT. FLAT reduces operation overhead by ~30% at the cost of lower observability.
@dataclass(frozen=True)
class ParallelConfig:
    max_concurrency: int | None = None
    completion_config: CompletionConfig = CompletionConfig.all_successful()
    serdes: SerDes | None = None
    item_serdes: SerDes | None = None
    summary_generator: SummaryGenerator | None = None

Parameters:

  • max_concurrency (optional) Maximum branches running at once. Default: unlimited.
  • completion_config (optional) When to stop. Default: CompletionConfig.all_successful().
  • serdes (optional) Custom SerDes for the BatchResult.
  • item_serdes (optional) Custom SerDes for individual branch results.
  • summary_generator (optional) A callable invoked when the serialized BatchResult exceeds 256KB. See Checkpointing.
ParallelConfig.builder()
    .maxConcurrency(Integer)      // optional
    .completionConfig(CompletionConfig)  // optional
    .build()

Parameters:

  • maxConcurrency (optional) Maximum branches running at once. Default: unlimited.
  • completionConfig (optional) When to stop. Default: CompletionConfig.allCompleted().

CompletionConfig

See Completion strategies for how CompletionConfig affects execution and the completion status of the result.

interface CompletionConfig {
  minSuccessful?: number;
  toleratedFailureCount?: number;
  toleratedFailurePercentage?: number;
}
@dataclass(frozen=True)
class CompletionConfig:
    min_successful: int | None = None
    tolerated_failure_count: int | None = None
    tolerated_failure_percentage: int | float | None = None
CompletionConfig.allCompleted()
CompletionConfig.allSuccessful()
CompletionConfig.firstSuccessful()
CompletionConfig.minSuccessful(int count)
CompletionConfig.toleratedFailureCount(int count)

Result types

interface BatchResult<TResult> {
  all: BatchItem<TResult>[];
  status: BatchItemStatus.SUCCEEDED | BatchItemStatus.FAILED;
  completionReason: "ALL_COMPLETED" | "MIN_SUCCESSFUL_REACHED" | "FAILURE_TOLERANCE_EXCEEDED";
  hasFailure: boolean;
  successCount: number;
  failureCount: number;
  startedCount: number;
  totalCount: number;
  getResults(): TResult[];
  getErrors(): ChildContextError[];
  succeeded(): BatchItem<TResult>[];
  failed(): BatchItem<TResult>[];
  started(): BatchItem<TResult>[];
  throwIfError(): void;
}
  • all all BatchItem entries, one per branch, in input order. Iterate with item.index for branch-indexed access when some branches fail.
  • getResults() results of succeeded branches, preserving input order
  • getErrors() ChildContextError[] for failed branches
  • succeeded() / failed() / started() BatchItem[] filtered by status
  • successCount / failureCount / startedCount / totalCount branch counts
  • status SUCCEEDED if no failures, FAILED otherwise
  • completionReason why the operation completed. See Completion strategies.
  • hasFailure true if any branch failed
  • throwIfError() throws the first branch error, if any
type CompletionReason =
  | "ALL_COMPLETED"
  | "MIN_SUCCESSFUL_REACHED"
  | "FAILURE_TOLERANCE_EXCEEDED"
interface BatchItem<TResult> {
  index: number;
  status: BatchItemStatus;
  result?: TResult;
  error?: ChildContextError;
}

enum BatchItemStatus {
  SUCCEEDED = "SUCCEEDED",
  FAILED    = "FAILED",
  STARTED   = "STARTED",
}
  • index position of this branch in the input array
  • status SUCCEEDED, FAILED, or STARTED (not yet complete)
  • result the branch return value, present when status is SUCCEEDED
  • error the captured error, present when status is FAILED
@dataclass(frozen=True)
class BatchResult(Generic[R]):
    all: list[BatchItem[R]]
    completion_reason: CompletionReason

    def get_results(self) -> list[R]: ...
    def get_errors(self) -> list[ErrorObject]: ...
    def succeeded(self) -> list[BatchItem[R]]: ...
    def failed(self) -> list[BatchItem[R]]: ...
    def started(self) -> list[BatchItem[R]]: ...
    def throw_if_error(self) -> None: ...
    def to_dict(self) -> dict: ...

    @property
    def status(self) -> BatchItemStatus: ...
    @property
    def has_failure(self) -> bool: ...
    @property
    def success_count(self) -> int: ...
    @property
    def failure_count(self) -> int: ...
    @property
    def started_count(self) -> int: ...
    @property
    def total_count(self) -> int: ...

class CompletionReason(Enum):
    ALL_COMPLETED             = "ALL_COMPLETED"
    MIN_SUCCESSFUL_REACHED    = "MIN_SUCCESSFUL_REACHED"
    FAILURE_TOLERANCE_EXCEEDED = "FAILURE_TOLERANCE_EXCEEDED"
  • all all BatchItem entries, one per branch, in input order. Iterate with item.index for branch-indexed access when some branches fail.
  • get_results() results of succeeded branches, preserving input order
  • get_errors() list[ErrorObject] for failed branches
  • succeeded() / failed() / started() BatchItem lists filtered by status
  • success_count / failure_count / started_count / total_count branch counts
  • status BatchItemStatus.SUCCEEDED if no failures, FAILED otherwise
  • completion_reason why the operation completed. See Completion strategies.
  • has_failure True if any branch failed
  • throw_if_error() raises the first branch error as a CallableRuntimeError
  • to_dict() serializes to a plain dict. Serializability depends on R.
@dataclass(frozen=True)
class BatchItem(Generic[R]):
    index: int
    status: BatchItemStatus
    result: R | None = None
    error: ErrorObject | None = None

    def to_dict(self) -> dict: ...

class BatchItemStatus(Enum):
    SUCCEEDED = "SUCCEEDED"
    FAILED    = "FAILED"
    STARTED   = "STARTED"
  • index position of this branch in the input sequence
  • status SUCCEEDED, FAILED, or STARTED (not yet complete)
  • result the branch return value, present when status is SUCCEEDED
  • error ErrorObject with the captured error, present when status is FAILED
  • to_dict() serializes to a plain dict. Serializability of result depends on R.
record ParallelResult(
    int size,
    int succeeded,
    int failed,
    ConcurrencyCompletionStatus completionStatus
) {}

enum ConcurrencyCompletionStatus {
    ALL_COMPLETED,
    MIN_SUCCESSFUL_REACHED,
    FAILURE_TOLERANCE_EXCEEDED
}
  • size total number of registered branches
  • succeeded number of branches that succeeded
  • failed number of branches that failed
  • completionStatus why the operation completed. See Completion strategies.

ConcurrencyCompletionStatus.isSucceeded() returns true for both ALL_COMPLETED and MIN_SUCCESSFUL_REACHED. To check if any branch failed, use result.failed() > 0 (where result is a ParallelResult).

ParallelResult contains only aggregate counts. To get individual branch results, hold the DurableFuture<T> returned by each branch() call and call .get() on it after parallel.get() returns. Results are available in the order branches were registered.

Branch functions

Each branch receives a DurableContext and can use any durable operation such as steps, waits, child contexts, or nested parallel operations. Branches run in child contexts, so they do not share state with each other or with the parent context.

A branch is a ParallelFunc (plain async function) or a NamedParallelBranch (object with name and func). Use NamedParallelBranch to give an inline lambda a name without defining a named function.

import {
  BatchResult,
  DurableContext,
  withDurableExecution,
} from "@aws/durable-execution-sdk-js";

// Plain function branch
async function taskA(ctx: DurableContext): Promise<string> {
  return ctx.step("run-a", async () => "a done");
}

export const handler = withDurableExecution(
  async (event: any, context: DurableContext): Promise<string[]> => {
    const result: BatchResult<string> = await context.parallel("process", [
      // ParallelFunc: plain async function
      taskA,
      // NamedParallelBranch: object with name and func
      { name: "task-b", func: async (ctx) => ctx.step("run-b", async () => "b done") },
    ]);

    return result.getResults();
  },
);

Branch functions are synchronous callables that receive a DurableContext and return T.

from aws_durable_execution_sdk_python import (
    BatchResult,
    DurableContext,
    durable_execution,
)


def task_a(ctx: DurableContext) -> str:
    # Plain callable branch: (ctx: DurableContext) -> str
    return ctx.step(lambda _: "a done", name="run-a")


def task_b(ctx: DurableContext) -> str:
    return ctx.step(lambda _: "b done", name="run-b")


@durable_execution
def handler(event: dict, context: DurableContext) -> list[str]:
    """Run two named branches concurrently and return their results."""
    result: BatchResult[str] = context.parallel(
        [task_a, task_b],
        name="process",
    )
    # get_results() yields the succeeded branches' values in input order,
    # matching the declared list[str] return type (to_dict() returns a dict).
    return result.get_results()

Each branch is registered via ParallelDurableFuture.branch(). The branch function is a synchronous Function<DurableContext, T>.

import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.model.ParallelResult;

public class NamedBranches extends DurableHandler<Void, ParallelResult> {
    @Override
    public ParallelResult handleRequest(Void input, DurableContext context) {
        try (var parallel = context.parallel("process")) {
            // Each branch() call registers and immediately starts the branch.
            parallel.branch("task-a", String.class,
                    ctx -> ctx.step("run-a", String.class, s -> "a done"));
            parallel.branch("task-b", String.class,
                    ctx -> ctx.step("run-b", String.class, s -> "b done"));
            return parallel.get();
        }
    }
}

Pass arguments to branches

Capture arguments in a closure:

import {
  BatchResult,
  DurableContext,
  withDurableExecution,
} from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: any, context: DurableContext): Promise<string[]> => {
    const items = ["a", "b", "c"];

    // Each arrow function closes over its own `item`, producing one
    // branch per element with the argument bound at creation time.
    const result: BatchResult<string> = await context.parallel(
      "process-items",
      items.map((item) => async (ctx: DurableContext) =>
        ctx.step(`process-${item}`, async () => `processed ${item}`),
      ),
    );

    return result.getResults();
  },
);

Use a factory function to bind arguments. Avoid using loop variables directly in lambdas: Python closures resolve variables late (by name, not by value), so every lambda would see the loop variable's final value.

from aws_durable_execution_sdk_python import (
    BatchResult,
    DurableContext,
    durable_execution,
)


def make_branch(item: str):
    # Factory binds `item` at call time, avoiding the late-binding
    # pitfall of closing over a loop variable directly.
    def branch(ctx: DurableContext) -> str:
        return ctx.step(lambda _: f"processed {item}", name=f"process-{item}")

    return branch


@durable_execution
def handler(event: dict, context: DurableContext) -> list[str]:
    """Process each item in its own branch, binding arguments via a factory."""
    items = ["a", "b", "c"]
    result: BatchResult[str] = context.parallel(
        [make_branch(item) for item in items],
        name="process-items",
    )
    # get_results() returns the succeeded branches' values in input order,
    # matching the declared list[str] return type (to_dict() returns a dict).
    return result.get_results()

Capture arguments in a lambda. Java lambdas require effectively final variables.

import java.util.List;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.model.ParallelResult;

public class PassArguments extends DurableHandler<Void, ParallelResult> {
    @Override
    public ParallelResult handleRequest(Void input, DurableContext context) {
        var items = List.of("a", "b", "c");

        try (var parallel = context.parallel("process-items")) {
            // `item` is effectively final per iteration, so each lambda
            // captures its own value.
            for (var item : items) {
                parallel.branch("process-" + item, String.class,
                        ctx -> ctx.step("run-" + item, String.class, s -> "processed " + item));
            }
            return parallel.get();
        }
    }
}

Naming parallel operations

Name your parallel operations to make them easier to identify in logs and tests.

The name is the first argument. Pass undefined to omit it.

Pass name as a keyword argument. Omit it or pass None to leave it unnamed.

The name parameter for parallel() is always required. Each branch() call also takes a name parameter; pass null to leave a branch unnamed.

Configuration

Configure parallel behavior using ParallelConfig:

import {
  BatchResult,
  DurableContext,
  withDurableExecution,
} from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: any, context: DurableContext): Promise<string | undefined> => {
    const result: BatchResult<string> = await context.parallel(
      "fetch-data",
      [
        async (ctx) => ctx.step("primary", async () => "primary result"),
        async (ctx) => ctx.step("secondary", async () => "secondary result"),
        async (ctx) => ctx.step("cache", async () => "cache result"),
      ],
      {
        // Run at most two branches at once; complete once one succeeds.
        maxConcurrency: 2,
        completionConfig: { minSuccessful: 1 },
      },
    );

    // May be undefined if no branch succeeded.
    return result.getResults()[0];
  },
);
from aws_durable_execution_sdk_python import (
    BatchResult,
    DurableContext,
    durable_execution,
)
from aws_durable_execution_sdk_python.config import CompletionConfig, ParallelConfig


def try_primary(ctx: DurableContext) -> str:
    # Branch function: one candidate data source per branch.
    return ctx.step(lambda _: "primary result", name="primary")


def try_secondary(ctx: DurableContext) -> str:
    return ctx.step(lambda _: "secondary result", name="secondary")


def try_cache(ctx: DurableContext) -> str:
    return ctx.step(lambda _: "cache result", name="cache")


@durable_execution
def handler(event: dict, context: DurableContext) -> str | None:
    # Run at most two branches at once; complete as soon as one succeeds.
    config = ParallelConfig(
        max_concurrency=2,
        completion_config=CompletionConfig.first_successful(),
    )
    result: BatchResult[str] = context.parallel(
        [try_primary, try_secondary, try_cache],
        name="fetch-data",
        config=config,
    )
    results = result.get_results()
    # None if no branch succeeded.
    return results[0] if results else None
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.config.CompletionConfig;
import software.amazon.lambda.durable.config.ParallelConfig;
import software.amazon.lambda.durable.model.ParallelResult;

public class ParallelConfigExample extends DurableHandler<Void, ParallelResult> {
    @Override
    public ParallelResult handleRequest(Void input, DurableContext context) {
        // Run at most two branches at once; complete once one succeeds.
        var config = ParallelConfig.builder()
                .maxConcurrency(2)
                .completionConfig(CompletionConfig.firstSuccessful())
                .build();

        try (var parallel = context.parallel("fetch-data", config)) {
            parallel.branch("primary", String.class,
                    ctx -> ctx.step("primary", String.class, s -> "primary result"));
            parallel.branch("secondary", String.class,
                    ctx -> ctx.step("secondary", String.class, s -> "secondary result"));
            parallel.branch("cache", String.class,
                    ctx -> ctx.step("cache", String.class, s -> "cache result"));
            return parallel.get();
        }
    }
}

Completion strategies

CompletionConfig controls when the parallel operation completes. When the operation reaches the completion criteria, it will abandon branches that have not completed yet. The abandoned branches will keep running in the background but cannot checkpoint their results after the parent completes. The SDK makes a best-effort attempt to cancel ongoing work in abandoned branches, but cancellation is not guaranteed.

The BatchResult's completionReason indicates the stop condition with which the parallel operation completed. Branches that had not started yet do not appear in result.all at all. Branches that had started but not completed yet appear with status STARTED.

completionConfig Early exit completionReason Full completion completionReason
{} or omitted FAILURE_TOLERANCE_EXCEEDED ALL_COMPLETED
toleratedFailureCount=N FAILURE_TOLERANCE_EXCEEDED ALL_COMPLETED
toleratedFailurePercentage=N FAILURE_TOLERANCE_EXCEEDED ALL_COMPLETED
minSuccessful=N MIN_SUCCESSFUL_REACHED ALL_COMPLETED

The BatchResult's completion_reason indicates the stop condition with which the parallel operation completed. Branches that were never started appear in result.all with status STARTED.

completion_config Early exit completion_reason Full completion completion_reason
all_successful() (default) FAILURE_TOLERANCE_EXCEEDED ALL_COMPLETED
first_successful() MIN_SUCCESSFUL_REACHED ALL_COMPLETED
tolerated_failure_count=N FAILURE_TOLERANCE_EXCEEDED ALL_COMPLETED
tolerated_failure_percentage=N FAILURE_TOLERANCE_EXCEEDED ALL_COMPLETED
min_successful=N MIN_SUCCESSFUL_REACHED ALL_COMPLETED

Warning

CompletionConfig.all_completed() is deprecated. Use CompletionConfig.all_successful() instead.

The ParallelResult's completionStatus indicates the stop condition with which the parallel operation completed. All registered branches (including those never started) are counted in size.

completionConfig Early exit completionStatus Full completion completionStatus
allCompleted() (default) n/a ALL_COMPLETED
allSuccessful() FAILURE_TOLERANCE_EXCEEDED ALL_COMPLETED
firstSuccessful() MIN_SUCCESSFUL_REACHED ALL_COMPLETED
minSuccessful(N) MIN_SUCCESSFUL_REACHED ALL_COMPLETED
toleratedFailureCount(N) FAILURE_TOLERANCE_EXCEEDED ALL_COMPLETED

Note

ParallelConfig in Java does not support toleratedFailurePercentage. Use toleratedFailureCount instead.

Note

When using a minSuccessful strategy, failures do not trigger early exit. If all branches fail before the success threshold is reached, the operation completes with ALL_COMPLETED.

import {
  BatchResult,
  DurableContext,
  withDurableExecution,
} from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: any, context: DurableContext): Promise<string | undefined> => {
    // Complete as soon as one branch succeeds
    const result: BatchResult<string> = await context.parallel(
      "race",
      [
        async (ctx) => ctx.step("source-a", async () => "result from a"),
        async (ctx) => ctx.step("source-b", async () => "result from b"),
        async (ctx) => ctx.step("source-c", async () => "result from c"),
      ],
      { completionConfig: { minSuccessful: 1 } },
    );

    // First (and possibly only) successful result; undefined if none succeeded.
    return result.getResults()[0];
  },
);
from aws_durable_execution_sdk_python import (
    BatchResult,
    DurableContext,
    durable_execution,
)
from aws_durable_execution_sdk_python.config import CompletionConfig, ParallelConfig


def source_a(ctx: DurableContext) -> str:
    # Each source is a candidate branch in the race below.
    return ctx.step(lambda _: "result from a", name="source-a")


def source_b(ctx: DurableContext) -> str:
    return ctx.step(lambda _: "result from b", name="source-b")


def source_c(ctx: DurableContext) -> str:
    return ctx.step(lambda _: "result from c", name="source-c")


@durable_execution
def handler(event: dict, context: DurableContext) -> str | None:
    # first_successful(): complete as soon as one branch succeeds.
    result: BatchResult[str] = context.parallel(
        [source_a, source_b, source_c],
        name="race",
        config=ParallelConfig(completion_config=CompletionConfig.first_successful()),
    )
    results = result.get_results()
    # None if no branch succeeded.
    return results[0] if results else None
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.config.CompletionConfig;
import software.amazon.lambda.durable.config.ParallelConfig;
import software.amazon.lambda.durable.model.ParallelResult;

public class CompletionConfigExample extends DurableHandler<Void, ParallelResult> {
    @Override
    public ParallelResult handleRequest(Void input, DurableContext context) {
        // firstSuccessful(): complete as soon as one branch succeeds.
        var config = ParallelConfig.builder()
                .completionConfig(CompletionConfig.firstSuccessful())
                .build();

        try (var parallel = context.parallel("race", config)) {
            parallel.branch("source-a", String.class,
                    ctx -> ctx.step("a", String.class, s -> "result from a"));
            parallel.branch("source-b", String.class,
                    ctx -> ctx.step("b", String.class, s -> "result from b"));
            parallel.branch("source-c", String.class,
                    ctx -> ctx.step("c", String.class, s -> "result from c"));
            return parallel.get();
        }
    }
}

Error handling

When a branch throws an error, parallel captures the error in the result rather than propagating it immediately. Other branches continue running.

BatchResult.status is FAILED if any branch failed. Call throwIfError() to propagate the first branch error as an exception, or inspect getErrors() to handle errors individually.

import {
  BatchResult,
  DurableContext,
  withDurableExecution,
} from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    // task-2 throws; the error is captured in the BatchResult rather than
    // propagated, and the other branches keep running.
    const result: BatchResult<string> = await context.parallel(
      "tasks",
      [
        async (ctx) => ctx.step("task-1", async () => "ok"),
        async (ctx) =>
          ctx.step("task-2", async () => {
            throw new Error("task 2 failed");
          }),
        async (ctx) => ctx.step("task-3", async () => "ok"),
      ],
      { completionConfig: { toleratedFailureCount: 1 } },
    );

    // Inspect counts/results/errors instead of calling throwIfError().
    return {
      succeeded: result.successCount,
      failed: result.failureCount,
      results: result.getResults(),
      errors: result.getErrors().map((e) => e.message),
    };
  },
);

BatchResult.status is FAILED if any branch failed. Call throw_if_error() to propagate the first branch error as an exception, or inspect get_errors() to handle errors individually.

from aws_durable_execution_sdk_python import (
    BatchResult,
    DurableContext,
    durable_execution,
)
from aws_durable_execution_sdk_python.config import CompletionConfig, ParallelConfig


def task_1(ctx: DurableContext) -> str:
    return ctx.step(lambda _: "ok", name="task-1")


def task_2(ctx: DurableContext) -> str:
    # Deliberately failing branch: the error is captured in the BatchResult.
    def fail(_):
        raise ValueError("task 2 failed")

    return ctx.step(fail, name="task-2")


def task_3(ctx: DurableContext) -> str:
    return ctx.step(lambda _: "ok", name="task-3")


@durable_execution
def handler(event: dict, context: DurableContext) -> dict:
    # Tolerate one failing branch; the rest continue to completion.
    result: BatchResult[str] = context.parallel(
        [task_1, task_2, task_3],
        name="tasks",
        config=ParallelConfig(
            completion_config=CompletionConfig(tolerated_failure_count=1)
        ),
    )
    # Inspect counts/results/errors instead of calling throw_if_error().
    return {
        "succeeded": result.success_count,
        "failed": result.failure_count,
        "results": result.get_results(),
        "errors": [e.message for e in result.get_errors()],
    }

Check result.failed() > 0 (where result is a ParallelResult) to detect branch failures. To propagate a branch error, call .get() on the DurableFuture<T> for that branch to rethrow the original exception. This will throw ParallelBranchFailedException if the SDK cannot reconstruct the original.

import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.config.CompletionConfig;
import software.amazon.lambda.durable.config.ParallelConfig;
import software.amazon.lambda.durable.model.ParallelResult;

public class ErrorHandling extends DurableHandler<Void, ParallelResult> {
    @Override
    public ParallelResult handleRequest(Void input, DurableContext context) {
        // Tolerate one failing branch; the rest continue to completion.
        var config = ParallelConfig.builder()
                .completionConfig(CompletionConfig.toleratedFailureCount(1))
                .build();

        try (var parallel = context.parallel("tasks", config)) {
            parallel.branch("task-1", String.class,
                    ctx -> ctx.step("task-1", String.class, s -> "ok"));
            // Deliberately failing branch: captured in the ParallelResult counts.
            parallel.branch("task-2", String.class, ctx -> ctx.step("task-2", String.class, s -> {
                throw new RuntimeException("task 2 failed");
            }));
            parallel.branch("task-3", String.class,
                    ctx -> ctx.step("task-3", String.class, s -> "ok"));
            return parallel.get();
        }
    }
}

Checkpointing

Each branch checkpoints its result on completion. Branches that have not completed yet when the parallel operation reaches its completion criteria remain with status STARTED and will receive no further checkpoint updates.

The parent parallel operation also checkpoints the serialized BatchResult for observability. On replay, the SDK deserializes the BatchResult directly from that checkpoint.

For results over 256KB, the SDK cannot store the full BatchResult in the checkpoint. Instead, the SDK reconstructs the BatchResult from the checkpointed results of the individual branches. In that case, the checkpoint stores a compact JSON summary, which is for observability only.

The default summary generator produces:

{
  "type": "ParallelResult",
  "totalCount": 3,
  "successCount": 2,
  "failureCount": 1,
  "startedCount": 0,
  "completionReason": "ALL_COMPLETED",
  "status": "FAILED"
}

The parent parallel operation also checkpoints the serialized BatchResult for observability. On replay, the SDK deserializes the BatchResult directly from that checkpoint.

For results over 256KB, the SDK cannot store the full BatchResult in the checkpoint, so it re-executes the branches to reconstruct it instead. In that case, the checkpoint stores the output of summary_generator, which is for observability only.

The default summary generator produces:

{
  "type": "ParallelResult",
  "totalCount": 3,
  "successCount": 2,
  "failureCount": 1,
  "startedCount": 0,
  "completionReason": "ALL_COMPLETED",
  "status": "FAILED"
}

When you pass a custom ParallelConfig without setting summary_generator, the SDK checkpoints an empty string for large payloads.

SummaryGenerator is a callable protocol you can pass by setting summary_generator on ParallelConfig:

class SummaryGenerator(Protocol[T]):
    def __call__(self, result: T) -> str: ...

The parent parallel operation checkpoints no result payload. On replay, the SDK always re-executes the branches to reconstruct the ParallelResult from their individual checkpoints.

Nesting parallel operations

A branch function can call context.parallel() to create nested parallel operations. Each nested parallel creates its own set of child contexts.

import {
  BatchResult,
  DurableContext,
  withDurableExecution,
} from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: any, context: DurableContext): Promise<string[][]> => {
    // Build a branch that runs its own nested parallel group of steps.
    // Each nested parallel created inside a branch gets its own child contexts.
    const groupBranch =
      (groupName: string, stepNames: string[]) =>
      async (branchCtx: DurableContext): Promise<string[]> => {
        const inner: BatchResult<string> = await branchCtx.parallel(
          groupName,
          stepNames.map(
            (stepName) => async (stepCtx: DurableContext) =>
              stepCtx.step(stepName, async () => stepName),
          ),
        );
        return inner.getResults();
      };

    // Outer parallel: two branches, each running a nested parallel group.
    const outer: BatchResult<string[]> = await context.parallel("outer", [
      groupBranch("inner-a", ["a1", "a2"]),
      groupBranch("inner-b", ["b1", "b2"]),
    ]);

    return outer.getResults();
  },
);
from aws_durable_execution_sdk_python import (
    BatchResult,
    DurableContext,
    durable_execution,
)


def group_a(ctx: DurableContext) -> list[str]:
    """Run the two "a" steps concurrently and return their results."""
    branches = [
        lambda c: c.step(lambda _: "a1", name="a1"),
        lambda c: c.step(lambda _: "a2", name="a2"),
    ]
    results: BatchResult[str] = ctx.parallel(branches, name="inner-a")
    return results.get_results()


def group_b(ctx: DurableContext) -> list[str]:
    """Run the two "b" steps concurrently and return their results."""
    branches = [
        lambda c: c.step(lambda _: "b1", name="b1"),
        lambda c: c.step(lambda _: "b2", name="b2"),
    ]
    results: BatchResult[str] = ctx.parallel(branches, name="inner-b")
    return results.get_results()


@durable_execution
def handler(event: dict, context: DurableContext) -> list[list[str]]:
    """Run two nested parallel groups and return their collected step results.

    Each outer branch (group_a, group_b) opens its own nested parallel with
    its own child contexts.
    """
    outer: BatchResult[list[str]] = context.parallel(
        [group_a, group_b],
        name="outer",
    )
    # get_results() yields the list[list[str]] declared in the signature
    # (mirrors getResults() in the TypeScript example); to_dict() would
    # return a serialized mapping instead of the branch results.
    return outer.get_results()
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.model.ParallelResult;

public class NestedParallel extends DurableHandler<Void, ParallelResult> {
    /**
     * Demonstrates nesting: each outer branch opens its own parallel scope,
     * so every nested parallel gets its own set of child contexts.
     */
    @Override
    public ParallelResult handleRequest(Void input, DurableContext context) {
        try (var outerScope = context.parallel("outer")) {
            // Branch "group-a" runs its own nested parallel of two steps.
            outerScope.branch("group-a", ParallelResult.class, branchCtx -> {
                try (var groupA = branchCtx.parallel("inner-a")) {
                    groupA.branch("a1", String.class,
                            stepCtx -> stepCtx.step("a1", String.class, unused -> "a1"));
                    groupA.branch("a2", String.class,
                            stepCtx -> stepCtx.step("a2", String.class, unused -> "a2"));
                    return groupA.get();
                }
            });
            // Branch "group-b" mirrors group-a with its own nested parallel.
            outerScope.branch("group-b", ParallelResult.class, branchCtx -> {
                try (var groupB = branchCtx.parallel("inner-b")) {
                    groupB.branch("b1", String.class,
                            stepCtx -> stepCtx.step("b1", String.class, unused -> "b1"));
                    groupB.branch("b2", String.class,
                            stepCtx -> stepCtx.step("b2", String.class, unused -> "b2"));
                    return groupB.get();
                }
            });
            return outerScope.get();
        }
    }
}

See also