
Map

Apply a function to each item in a collection

Map executes a function for each item in a collection concurrently. It manages concurrency, collects results as items complete, and checkpoints the outcome.

Each item runs in its own child context and checkpoints its result independently as it completes.

Use map to apply the same operation to every item in a collection. Use parallel instead to execute different operations concurrently.

import {
  BatchResult,
  DurableContext,
  withDurableExecution,
} from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: any, context: DurableContext): Promise<number[]> => {
    const result: BatchResult<number> = await context.map(
      "square-numbers",
      [1, 2, 3, 4, 5],
      async (ctx, item, index) =>
        ctx.step(`square-${index}`, async () => item * item),
    );

    return result.getResults();
  },
);
from aws_durable_execution_sdk_python import (
    BatchResult,
    DurableContext,
    durable_execution,
)


def square(ctx: DurableContext, item: int, index: int, items: list[int]) -> int:
    return ctx.step(lambda _: item * item, name=f"square-{index}")


@durable_execution
def handler(event: dict, context: DurableContext) -> list[int]:
    result: BatchResult[int] = context.map(
        [1, 2, 3, 4, 5],
        square,
        name="square-numbers",
    )
    return result.get_results()
import java.util.List;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.model.MapResult;

public class SimpleMap extends DurableHandler<Void, List<Integer>> {
    @Override
    public List<Integer> handleRequest(Void input, DurableContext context) {
        MapResult<Integer> result = context.map(
                "square-numbers",
                List.of(1, 2, 3, 4, 5),
                Integer.class,
                (item, index, ctx) -> ctx.step(
                        "square-" + index, Integer.class, s -> item * item));

        return result.results();
    }
}

Method signature

context.map

// Named overload
map<TInput, TOutput>(
  name: string | undefined,
  items: TInput[],
  mapFunc: MapFunc<TInput, TOutput>,
  config?: MapConfig<TInput, TOutput>,
): DurablePromise<BatchResult<TOutput>>

// Unnamed overload
map<TInput, TOutput>(
  items: TInput[],
  mapFunc: MapFunc<TInput, TOutput>,
  config?: MapConfig<TInput, TOutput>,
): DurablePromise<BatchResult<TOutput>>

Parameters:

  • name (optional) A name for the map operation. Pass undefined to omit.
  • items An array of items to process.
  • mapFunc A MapFunc called for each item. See Map Function.
  • config (optional) A MapConfig<TInput, TOutput> object.

Returns: DurablePromise<BatchResult<TOutput>>. Use await to get the result.

Throws: Item exceptions are captured in the BatchResult. Call throwIfError() to re-throw the first failure.

def map(
    inputs: Sequence[U],
    func: Callable[[DurableContext, U, int, Sequence[U]], T],
    name: str | None = None,
    config: MapConfig | None = None,
) -> BatchResult[T]: ...

Parameters:

  • inputs A sequence of items to process.
  • func A callable called for each item. See Map Function.
  • name (optional) A name for the map operation.
  • config (optional) A MapConfig object.

Returns: BatchResult[T].

Raises: Item exceptions are captured in the BatchResult. Call throw_if_error() to re-raise the first failure.

// sync — blocks until all items complete
<I, O> MapResult<O> map(String name, Collection<I> items, Class<O> resultType,
                        MapFunction<I, O> function)
<I, O> MapResult<O> map(String name, Collection<I> items, Class<O> resultType,
                        MapFunction<I, O> function, MapConfig config)
<I, O> MapResult<O> map(String name, Collection<I> items, TypeToken<O> resultType,
                        MapFunction<I, O> function)
<I, O> MapResult<O> map(String name, Collection<I> items, TypeToken<O> resultType,
                        MapFunction<I, O> function, MapConfig config)

// async — returns immediately
<I, O> DurableFuture<MapResult<O>> mapAsync(String name, Collection<I> items,
                                            Class<O> resultType, MapFunction<I, O> function)
<I, O> DurableFuture<MapResult<O>> mapAsync(String name, Collection<I> items,
                                            Class<O> resultType, MapFunction<I, O> function,
                                            MapConfig config)
<I, O> DurableFuture<MapResult<O>> mapAsync(String name, Collection<I> items,
                                            TypeToken<O> resultType, MapFunction<I, O> function)
<I, O> DurableFuture<MapResult<O>> mapAsync(String name, Collection<I> items,
                                            TypeToken<O> resultType, MapFunction<I, O> function,
                                            MapConfig config)

Parameters:

  • name (required) A name for the map operation.
  • items A Collection<I> of items to process.
  • resultType Class<O> or TypeToken<O> for deserialization.
  • function A MapFunction<I, O> called for each item. See Map Function.
  • config (optional) A MapConfig object.

Returns: MapResult<O> from map(), or DurableFuture<MapResult<O>> from mapAsync().

Throws: Item exceptions are captured in MapResult. Inspect failed() to detect failures. If the SDK cannot reconstruct the original exception, it throws MapIterationFailedException.

Map Function

type MapFunc<TInput, TOutput> = (
  context: DurableContext,
  item: TInput,
  index: number,
  array: TInput[],
) => Promise<TOutput>

Parameters:

  • context The child DurableContext for this item's execution.
  • item The current item being processed.
  • index The zero-based index of the item in the input array.
  • array The full input array.

Returns: Promise<TOutput>.

Callable[[DurableContext, T, int, Sequence[T]], R]

Parameters:

  • ctx The child DurableContext for this item's execution.
  • item The current item being processed.
  • index The zero-based index of the item in the input sequence.
  • items The full input sequence.

Returns: R.

@FunctionalInterface
interface MapFunction<I, O> {
    O apply(I item, int index, DurableContext context);
}

Parameters:

  • item The current item being processed.
  • index The zero-based index of the item in the input collection.
  • context The child DurableContext for this item's execution.

Returns: O.

MapConfig

interface MapConfig<TItem, TResult> {
  maxConcurrency?: number;
  itemNamer?: (item: TItem, index: number) => string;
  completionConfig?: CompletionConfig;
  serdes?: Serdes<BatchResult<TResult>>;
  itemSerdes?: Serdes<TResult>;
  nesting?: NestingType;
}

Parameters:

  • maxConcurrency (optional) Maximum items running at once. Default: unlimited.
  • itemNamer (optional) A function that returns a custom name for each item, used in logs and tests.
  • completionConfig (optional) When to stop. Default: wait for all items.
  • serdes (optional) Custom Serdes for the BatchResult.
  • itemSerdes (optional) Custom Serdes for individual item results.
  • nesting (optional) NestingType.NESTED (default) or NestingType.FLAT. FLAT reduces operation overhead by ~30% at the cost of lower observability.
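The maxConcurrency bound can be pictured with a plain semaphore. This is an illustrative sketch of the behavior only, not the SDK's implementation:

```python
import asyncio

async def bounded_map(items, func, max_concurrency):
    # Conceptual sketch of a maxConcurrency bound: at most
    # max_concurrency calls to func are in flight at once.
    sem = asyncio.Semaphore(max_concurrency)

    async def run_one(index, item):
        async with sem:
            return await func(item, index)

    # gather preserves input order regardless of completion order
    return await asyncio.gather(*(run_one(i, x) for i, x in enumerate(items)))

async def square(item, index):
    return item * item

print(asyncio.run(bounded_map([1, 2, 3, 4, 5], square, max_concurrency=2)))
# [1, 4, 9, 16, 25]
```

Capping concurrency limits throughput, but results still come back in input order.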
@dataclass(frozen=True)
class MapConfig:
    max_concurrency: int | None = None
    completion_config: CompletionConfig = CompletionConfig()
    serdes: SerDes | None = None
    item_serdes: SerDes | None = None
    summary_generator: SummaryGenerator | None = None

Parameters:

  • max_concurrency (optional) Maximum items running at once. Default: unlimited.
  • completion_config (optional) When to stop. Default: CompletionConfig() (lenient, all items run regardless of failures).
  • serdes (optional) Custom SerDes for the BatchResult.
  • item_serdes (optional) Custom SerDes for individual item results.
  • summary_generator (optional) A callable invoked when the serialized BatchResult exceeds 256KB. See Checkpointing.
MapConfig.builder()
    .maxConcurrency(Integer)       // optional
    .completionConfig(CompletionConfig)  // optional
    .serDes(SerDes)                // optional
    .build()

Parameters:

  • maxConcurrency (optional) Maximum items running at once. Default: unlimited.
  • completionConfig (optional) When to stop. Default: CompletionConfig.allCompleted().
  • serDes (optional) Custom SerDes for item results and the overall result.

CompletionConfig

See Completion strategies for how CompletionConfig affects execution and the completion status of the result.

interface CompletionConfig {
  minSuccessful?: number;
  toleratedFailureCount?: number;
  toleratedFailurePercentage?: number;
}
@dataclass(frozen=True)
class CompletionConfig:
    min_successful: int | None = None
    tolerated_failure_count: int | None = None
    tolerated_failure_percentage: int | float | None = None
CompletionConfig.allCompleted()
CompletionConfig.allSuccessful()
CompletionConfig.firstSuccessful()
CompletionConfig.minSuccessful(int count)
CompletionConfig.toleratedFailureCount(int count)
CompletionConfig.toleratedFailurePercentage(double percentage)

Result types

Map returns the same BatchResult<TResult> type as parallel.

interface BatchResult<TResult> {
  all: BatchItem<TResult>[];
  status: BatchItemStatus.SUCCEEDED | BatchItemStatus.FAILED;
  completionReason: "ALL_COMPLETED" | "MIN_SUCCESSFUL_REACHED" | "FAILURE_TOLERANCE_EXCEEDED";
  hasFailure: boolean;
  successCount: number;
  failureCount: number;
  startedCount: number;
  totalCount: number;
  getResults(): TResult[];
  getErrors(): ChildContextError[];
  succeeded(): BatchItem<TResult>[];
  failed(): BatchItem<TResult>[];
  started(): BatchItem<TResult>[];
  throwIfError(): void;
}
  • all all BatchItem entries, one per item, in input order
  • getResults() results of succeeded items, preserving input order
  • getErrors() ChildContextError[] for failed items
  • succeeded() / failed() / started() BatchItem[] filtered by status
  • successCount / failureCount / startedCount / totalCount item counts
  • status SUCCEEDED if no failures, FAILED otherwise
  • completionReason why the operation completed. See Completion strategies.
  • hasFailure true if any item failed
  • throwIfError() throws the first item error, if any
interface BatchItem<TResult> {
  index: number;
  status: BatchItemStatus;
  result?: TResult;
  error?: ChildContextError;
}

enum BatchItemStatus {
  SUCCEEDED = "SUCCEEDED",
  FAILED    = "FAILED",
  STARTED   = "STARTED",
}
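The relationship between all and the accessor methods can be sketched with plain dicts standing in for BatchItem entries (these dicts are illustrative stand-ins, not the SDK types):

```python
# Stand-in BatchItem entries to show how getResults() and hasFailure
# relate to the per-item entries in `all`.
all_items = [
    {"index": 0, "status": "SUCCEEDED", "result": 1},
    {"index": 1, "status": "FAILED", "error": "boom"},
    {"index": 2, "status": "SUCCEEDED", "result": 9},
]

def get_results(entries):
    # Results of succeeded items only, preserving input order.
    return [e["result"] for e in entries if e["status"] == "SUCCEEDED"]

def has_failure(entries):
    return any(e["status"] == "FAILED" for e in entries)

print(get_results(all_items))  # [1, 9]
print(has_failure(all_items))  # True
```

Failed items are skipped in getResults(), so the returned list can be shorter than the input.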

Map returns the same BatchResult[R] type as parallel.

@dataclass(frozen=True)
class BatchResult(Generic[R]):
    all: list[BatchItem[R]]
    completion_reason: CompletionReason

    def get_results(self) -> list[R]: ...
    def get_errors(self) -> list[ErrorObject]: ...
    def succeeded(self) -> list[BatchItem[R]]: ...
    def failed(self) -> list[BatchItem[R]]: ...
    def started(self) -> list[BatchItem[R]]: ...
    def throw_if_error(self) -> None: ...
    def to_dict(self) -> dict: ...

    @property
    def status(self) -> BatchItemStatus: ...
    @property
    def has_failure(self) -> bool: ...
    @property
    def success_count(self) -> int: ...
    @property
    def failure_count(self) -> int: ...
    @property
    def started_count(self) -> int: ...
    @property
    def total_count(self) -> int: ...
  • all all BatchItem entries, one per item, in input order
  • get_results() results of succeeded items, preserving input order
  • get_errors() list[ErrorObject] for failed items
  • succeeded() / failed() / started() BatchItem lists filtered by status
  • success_count / failure_count / started_count / total_count item counts
  • status BatchItemStatus.SUCCEEDED if no failures, FAILED otherwise
  • completion_reason why the operation completed. See Completion strategies.
  • has_failure True if any item failed
  • throw_if_error() raises the first item error as a CallableRuntimeError
  • to_dict() serializes to a plain dict. Serializability depends on R.

Map returns MapResult<O>, which differs from ParallelResult. It holds per-item results with individual status, result, and error fields.

record MapResult<T>(
    List<MapResultItem<T>> items,
    ConcurrencyCompletionStatus completionReason
) {
    MapResultItem<T> getItem(int index)
    T getResult(int index)
    MapError getError(int index)
    boolean allSucceeded()
    int size()
    List<T> results()        // all results, nulls for failed/skipped items
    List<T> succeeded()      // results of succeeded items only
    List<MapError> failed()  // errors of failed items only
}

record MapResultItem<T>(Status status, T result, MapError error) {
    enum Status { SUCCEEDED, FAILED, SKIPPED }
}

record MapError(String errorType, String errorMessage, List<String> stackTrace) {}

enum ConcurrencyCompletionStatus {
    ALL_COMPLETED,
    MIN_SUCCESSFUL_REACHED,
    FAILURE_TOLERANCE_EXCEEDED
}
  • items ordered list of MapResultItem, one per input item
  • getItem(index) the MapResultItem at the given index
  • getResult(index) the result at the given index, or null if failed or skipped
  • getError(index) the MapError at the given index, or null if succeeded or skipped
  • allSucceeded() true if every item has status SUCCEEDED
  • size() total number of items
  • results() all results as a list, with null for failed or skipped items
  • succeeded() results of items with status SUCCEEDED
  • failed() MapError objects for items with status FAILED
  • completionReason why the operation completed. See Completion strategies.

Items that did not start before the operation reached its completion criteria have status SKIPPED (not STARTED as in TypeScript and Python).

The map function

The map function can use any durable operation such as steps, waits, or nested map and parallel operations. Each item runs in its own child context, so items do not share state with each other or with the parent context.

import { DurableContext } from "@aws/durable-execution-sdk-js";

type Order = { id: string; amount: number };
type Receipt = { orderId: string; charged: number };

async function processOrder(
  ctx: DurableContext,
  order: Order,
  index: number,
  orders: Order[],
): Promise<Receipt> {
  const validated = await ctx.step("validate", async () => {
    if (order.amount <= 0) throw new Error("Invalid amount");
    return order;
  });
  const charged = await ctx.step("charge", async () => validated.amount);
  return { orderId: validated.id, charged };
}
from aws_durable_execution_sdk_python import DurableContext


def process_order(
    ctx: DurableContext,
    order: dict,
    index: int,
    orders: list[dict],
) -> dict:
    def validate(_):
        if order["amount"] <= 0:
            raise ValueError("Invalid amount")
        return order

    validated = ctx.step(validate, name="validate")
    charged = ctx.step(lambda _: validated["amount"], name="charge")
    return {"orderId": validated["id"], "charged": charged}
import software.amazon.lambda.durable.DurableContext;

record Order(String id, double amount) {}
record Receipt(String orderId, double charged) {}

// MapFunction<Order, Receipt> implementation
Receipt processOrder(Order order, int index, DurableContext ctx) {
    var validated = ctx.step("validate", Order.class, s -> {
        if (order.amount() <= 0) throw new IllegalArgumentException("Invalid amount");
        return order;
    });
    var charged = ctx.step("charge", Double.class, s -> validated.amount());
    return new Receipt(validated.id(), charged);
}

Naming map operations

Name your map operations to make them easier to identify in logs and tests.

import {
  BatchResult,
  DurableContext,
  withDurableExecution,
} from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: { userIds: string[] }, context: DurableContext): Promise<string[]> => {
    // Named: pass name as first argument, undefined to omit
    const result: BatchResult<string> = await context.map(
      "process-users",
      event.userIds,
      async (ctx, userId, index) =>
        ctx.step(`process-${index}`, async () => `processed-${userId}`),
    );

    return result.getResults();
  },
);

The name is the first argument. Pass undefined to omit it.

Use itemNamer in MapConfig to give each item a custom name:

context.map("process-orders", orders, processOrder, {
  itemNamer: (order, index) => `order-${order.id}`,
});
from aws_durable_execution_sdk_python import (
    BatchResult,
    DurableContext,
    durable_execution,
)


def process_user(
    ctx: DurableContext, user_id: str, index: int, user_ids: list[str]
) -> str:
    return ctx.step(lambda _: f"processed-{user_id}", name=f"process-{index}")


@durable_execution
def handler(event: dict, context: DurableContext) -> list[str]:
    # Pass name as keyword argument; omit or pass None to leave unnamed
    result: BatchResult[str] = context.map(
        event["userIds"],
        process_user,
        name="process-users",
    )
    return result.get_results()

Pass name as a keyword argument. Omit it or pass None to leave it unnamed.

import java.util.List;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.model.MapResult;

public class NamedMap extends DurableHandler<List<String>, List<String>> {
    @Override
    public List<String> handleRequest(List<String> userIds, DurableContext context) {
        // The name is always required in Java
        MapResult<String> result = context.map(
                "process-users",
                userIds,
                String.class,
                (userId, index, ctx) -> ctx.step(
                        "process-" + index, String.class, s -> "processed-" + userId));

        return result.succeeded();
    }
}

The name is always required in Java. The SDK derives each item's name from the operation name: {name}-iteration-{index}.
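The derived item names follow directly from the scheme above; a minimal sketch of that naming rule:

```python
def iteration_name(operation_name: str, index: int) -> str:
    # The {name}-iteration-{index} scheme described above.
    return f"{operation_name}-iteration-{index}"

print(iteration_name("process-users", 0))  # process-users-iteration-0
print(iteration_name("process-users", 4))  # process-users-iteration-4
```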

Configuration

Configure map behavior using MapConfig:

import {
  BatchResult,
  DurableContext,
  withDurableExecution,
} from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: { urls: string[] }, context: DurableContext): Promise<string[]> => {
    const result: BatchResult<string> = await context.map(
      "fetch-urls",
      event.urls,
      async (ctx, url, index) =>
        ctx.step(`fetch-${index}`, async () => {
          const response = await fetch(url);
          return response.text();
        }),
      {
        maxConcurrency: 5,
        completionConfig: { toleratedFailureCount: 2 },
      },
    );

    return result.getResults();
  },
);
import urllib.request

from aws_durable_execution_sdk_python import (
    BatchResult,
    DurableContext,
    durable_execution,
)
from aws_durable_execution_sdk_python.config import CompletionConfig, MapConfig


def fetch_url(
    ctx: DurableContext, url: str, index: int, urls: list[str]
) -> str:
    def do_fetch(_):
        with urllib.request.urlopen(url) as response:
            return response.read().decode()

    return ctx.step(do_fetch, name=f"fetch-{index}")


@durable_execution
def handler(event: dict, context: DurableContext) -> list[str]:
    config = MapConfig(
        max_concurrency=5,
        completion_config=CompletionConfig(tolerated_failure_count=2),
    )
    result: BatchResult[str] = context.map(
        event["urls"],
        fetch_url,
        name="fetch-urls",
        config=config,
    )
    return result.get_results()
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.config.CompletionConfig;
import software.amazon.lambda.durable.config.MapConfig;
import software.amazon.lambda.durable.model.MapResult;

public class MapConfigExample extends DurableHandler<List<String>, List<String>> {
    private static final HttpClient HTTP = HttpClient.newHttpClient();

    @Override
    public List<String> handleRequest(List<String> urls, DurableContext context) {
        var config = MapConfig.builder()
                .maxConcurrency(5)
                .completionConfig(CompletionConfig.toleratedFailureCount(2))
                .build();

        MapResult<String> result = context.map(
                "fetch-urls",
                urls,
                String.class,
                (url, index, ctx) -> ctx.step("fetch-" + index, String.class, s -> {
                    var request = HttpRequest.newBuilder(URI.create(url)).build();
                    return HTTP.send(request, HttpResponse.BodyHandlers.ofString()).body();
                }),
                config);

        return result.succeeded();
    }
}

Completion strategies

CompletionConfig controls when the map operation completes. When the operation reaches the completion criteria, it abandons items that have not completed yet. The abandoned items will keep running in the background but cannot checkpoint their results after the parent completes. The SDK makes a best-effort attempt to cancel ongoing work in abandoned items, but cancellation is not guaranteed.

The BatchResult's completionReason indicates the stop condition. Items that had not started yet do not appear in result.all. Items that had started but not completed appear with status STARTED.

completionConfig              Early exit completionReason   Full completion completionReason
{} or omitted                 FAILURE_TOLERANCE_EXCEEDED    ALL_COMPLETED
toleratedFailureCount=N       FAILURE_TOLERANCE_EXCEEDED    ALL_COMPLETED
toleratedFailurePercentage=N  FAILURE_TOLERANCE_EXCEEDED    ALL_COMPLETED
minSuccessful=N               MIN_SUCCESSFUL_REACHED        ALL_COMPLETED

The BatchResult's completion_reason indicates the stop condition. Items that were never started appear in result.all with status STARTED.

completion_config               Early exit completion_reason  Full completion completion_reason
CompletionConfig() (default)    FAILURE_TOLERANCE_EXCEEDED    ALL_COMPLETED
first_successful()              MIN_SUCCESSFUL_REACHED        ALL_COMPLETED
tolerated_failure_count=N       FAILURE_TOLERANCE_EXCEEDED    ALL_COMPLETED
tolerated_failure_percentage=N  FAILURE_TOLERANCE_EXCEEDED    ALL_COMPLETED
min_successful=N                MIN_SUCCESSFUL_REACHED        ALL_COMPLETED

Warning

CompletionConfig.all_completed() is deprecated. Use the default CompletionConfig() instead.

The MapResult's completionReason indicates the stop condition. Items that did not start before the operation completed have status SKIPPED.

completionConfig               Early exit completionReason   Full completion completionReason
allCompleted() (default)       n/a                           ALL_COMPLETED
allSuccessful()                FAILURE_TOLERANCE_EXCEEDED    ALL_COMPLETED
firstSuccessful()              MIN_SUCCESSFUL_REACHED        ALL_COMPLETED
minSuccessful(N)               MIN_SUCCESSFUL_REACHED        ALL_COMPLETED
toleratedFailureCount(N)       FAILURE_TOLERANCE_EXCEEDED    ALL_COMPLETED
toleratedFailurePercentage(p)  FAILURE_TOLERANCE_EXCEEDED    ALL_COMPLETED

Note

When using a minSuccessful strategy, failures do not trigger early exit. If all items fail before the success threshold is reached, the operation completes with ALL_COMPLETED.
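The early-exit decisions in the tables above can be sketched as a plain function. This is illustrative decision logic under the documented semantics, not the SDK internals; note that when only a minSuccessful threshold is set, failures never trigger an early exit:

```python
def early_exit_reason(success, failure, total, min_successful=None,
                      tolerated_failure_count=None,
                      tolerated_failure_percentage=None):
    # Returns the completionReason that would trigger an early exit,
    # or None to keep processing items.
    if min_successful is not None and success >= min_successful:
        return "MIN_SUCCESSFUL_REACHED"
    if (tolerated_failure_count is not None
            and failure > tolerated_failure_count):
        return "FAILURE_TOLERANCE_EXCEEDED"
    if (tolerated_failure_percentage is not None
            and failure * 100 / total > tolerated_failure_percentage):
        return "FAILURE_TOLERANCE_EXCEEDED"
    return None

print(early_exit_reason(success=3, failure=0, total=10, min_successful=3))
# MIN_SUCCESSFUL_REACHED
print(early_exit_reason(success=1, failure=3, total=10,
                        tolerated_failure_count=2))
# FAILURE_TOLERANCE_EXCEEDED
```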

import {
  BatchResult,
  DurableContext,
  withDurableExecution,
} from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: { items: string[] }, context: DurableContext): Promise<string[]> => {
    const result: BatchResult<string> = await context.map(
      "process-items",
      event.items,
      async (ctx, item, index) =>
        ctx.step(`process-${index}`, async () => item.toUpperCase()),
      {
        completionConfig: { minSuccessful: 3 },
      },
    );

    return result.getResults();
  },
);
from aws_durable_execution_sdk_python import (
    BatchResult,
    DurableContext,
    durable_execution,
)
from aws_durable_execution_sdk_python.config import CompletionConfig, MapConfig


def process_item(
    ctx: DurableContext, item: str, index: int, items: list[str]
) -> str:
    return ctx.step(lambda _: item.upper(), name=f"process-{index}")


@durable_execution
def handler(event: dict, context: DurableContext) -> list[str]:
    config = MapConfig(
        completion_config=CompletionConfig(min_successful=3),
    )
    result: BatchResult[str] = context.map(
        event["items"],
        process_item,
        name="process-items",
        config=config,
    )
    return result.get_results()
import java.util.List;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.config.CompletionConfig;
import software.amazon.lambda.durable.config.MapConfig;
import software.amazon.lambda.durable.model.MapResult;

public class MapCompletionConfig extends DurableHandler<List<String>, List<String>> {
    @Override
    public List<String> handleRequest(List<String> items, DurableContext context) {
        var config = MapConfig.builder()
                .completionConfig(CompletionConfig.minSuccessful(3))
                .build();

        MapResult<String> result = context.map(
                "process-items",
                items,
                String.class,
                (item, index, ctx) -> ctx.step(
                        "process-" + index, String.class, s -> item.toUpperCase()),
                config);

        return result.succeeded();
    }
}

Error handling

When an item throws an error, map captures the error in the result rather than propagating it immediately. Other items continue running.

BatchResult.status is FAILED if any item failed. Call throwIfError() to propagate the first item error as an exception, or inspect getErrors() to handle errors individually.

import {
  BatchResult,
  DurableContext,
  withDurableExecution,
} from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: { items: string[] }, context: DurableContext): Promise<void> => {
    const result: BatchResult<string> = await context.map(
      "process-items",
      event.items,
      async (ctx, item, index) =>
        ctx.step(`process-${index}`, async () => {
          if (item === "bad") throw new Error("bad item");
          return item.toUpperCase();
        }),
    );

    if (result.hasFailure) {
      const errors = result.getErrors();
      console.log(`${result.failureCount} items failed:`, errors);
    }

    const successes = result.getResults();
    console.log(`${result.successCount} items succeeded:`, successes);
  },
);

BatchResult.status is FAILED if any item failed. Call throw_if_error() to propagate the first item error as an exception, or inspect get_errors() to handle errors individually.

from aws_durable_execution_sdk_python import (
    BatchResult,
    DurableContext,
    durable_execution,
)


def process_item(
    ctx: DurableContext, item: str, index: int, items: list[str]
) -> str:
    def do_process(_):
        if item == "bad":
            raise ValueError("bad item")
        return item.upper()

    return ctx.step(do_process, name=f"process-{index}")


@durable_execution
def handler(event: dict, context: DurableContext) -> None:
    result: BatchResult[str] = context.map(
        event["items"],
        process_item,
        name="process-items",
    )

    if result.has_failure:
        errors = result.get_errors()
        print(f"{result.failure_count} items failed:", errors)

    successes = result.get_results()
    print(f"{result.success_count} items succeeded:", successes)

Check result.failed() to detect item failures. Each MapError contains errorType, errorMessage, and stackTrace as plain strings. If the SDK cannot reconstruct the original exception, it throws MapIterationFailedException.

import java.util.List;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.model.MapResult;

public class MapErrorHandling extends DurableHandler<List<String>, Void> {
    @Override
    public Void handleRequest(List<String> items, DurableContext context) {
        MapResult<String> result = context.map(
                "process-items",
                items,
                String.class,
                (item, index, ctx) -> ctx.step("process-" + index, String.class, s -> {
                    if ("bad".equals(item)) throw new IllegalArgumentException("bad item");
                    return item.toUpperCase();
                }));

        var failures = result.failed();
        if (!failures.isEmpty()) {
            System.out.println(failures.size() + " items failed");
            failures.forEach(e -> System.out.println(e.errorType() + ": " + e.errorMessage()));
        }

        var successes = result.succeeded();
        System.out.println(successes.size() + " items succeeded: " + successes);
        return null;
    }
}

Checkpointing

Each item checkpoints its result on completion. Items that have not completed when the map operation reaches its completion criteria remain with status STARTED and will receive no further checkpoint updates.

The parent map operation also checkpoints the serialized BatchResult for observability. On replay, the SDK deserializes the BatchResult directly from that checkpoint.

For results over 256KB, the SDK cannot store the full BatchResult in the checkpoint. Instead, the SDK reconstructs the BatchResult from the checkpointed results of the individual items. In that case, the checkpoint stores a compact JSON summary, which is for observability only.

The default summary generator produces:

{
  "type": "MapResult",
  "totalCount": 5,
  "successCount": 4,
  "failureCount": 1,
  "completionReason": "ALL_COMPLETED",
  "status": "FAILED"
}

The parent map operation also checkpoints the serialized BatchResult for observability. On replay, the SDK deserializes the BatchResult directly from that checkpoint.

For results over 256KB, the SDK cannot store the full BatchResult in the checkpoint, so on replay it reconstructs the BatchResult from the individual item checkpoints instead. In that case, the checkpoint stores the output of summary_generator, which is for observability only.

The default summary generator produces:

{
  "type": "MapResult",
  "totalCount": 5,
  "successCount": 4,
  "failureCount": 1,
  "completionReason": "ALL_COMPLETED",
  "status": "FAILED"
}

When you pass a custom MapConfig without setting summary_generator, the SDK checkpoints an empty string for large payloads.

SummaryGenerator is a callable protocol you can pass by setting summary_generator on MapConfig:

class SummaryGenerator(Protocol[T]):
    def __call__(self, result: T) -> str: ...
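A custom generator is just a callable from the result to a short string. In this sketch the attribute names (total_count, success_count, failure_count) follow the BatchResult properties documented above:

```python
import json

def compact_summary(result) -> str:
    # Example summary_generator: receives the BatchResult and returns the
    # short string that gets checkpointed when the payload exceeds 256KB.
    return json.dumps({
        "total": result.total_count,
        "succeeded": result.success_count,
        "failed": result.failure_count,
    })
```

Pass it via MapConfig(summary_generator=compact_summary).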

For results under 256KB, the SDK checkpoints the serialized MapResult payload. On replay, the SDK deserializes the MapResult directly from that checkpoint without re-executing items.

For results over 256KB, the SDK checkpoints with an empty payload and a replayChildren flag. On replay, the SDK re-executes the items to reconstruct the MapResult from their individual checkpoints.

Nesting map operations

A map function can call context.map() or context.parallel() to create nested operations. Each nested map creates its own set of child contexts.

import {
  BatchResult,
  DurableContext,
  withDurableExecution,
} from "@aws/durable-execution-sdk-js";

type Region = { name: string; items: string[] };

export const handler = withDurableExecution(
  async (event: { regions: Region[] }, context: DurableContext): Promise<string[][]> => {
    const result: BatchResult<string[]> = await context.map(
      "process-regions",
      event.regions,
      async (ctx, region, index) => {
        const inner: BatchResult<string> = await ctx.map(
          `process-${region.name}`,
          region.items,
          async (innerCtx, item, i) =>
            innerCtx.step(`item-${i}`, async () => item.toUpperCase()),
        );
        return inner.getResults();
      },
    );

    return result.getResults();
  },
);
from aws_durable_execution_sdk_python import (
    BatchResult,
    DurableContext,
    durable_execution,
)


def process_item(
    ctx: DurableContext, item: str, index: int, items: list[str]
) -> str:
    return ctx.step(lambda _: item.upper(), name=f"item-{index}")


def process_region(
    ctx: DurableContext, region: dict, index: int, regions: list[dict]
) -> list[str]:
    inner: BatchResult[str] = ctx.map(
        region["items"],
        process_item,
        name=f"process-{region['name']}",
    )
    return inner.get_results()


@durable_execution
def handler(event: dict, context: DurableContext) -> list[list[str]]:
    result: BatchResult[list[str]] = context.map(
        event["regions"],
        process_region,
        name="process-regions",
    )
    return result.get_results()
import java.util.List;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.TypeToken;
import software.amazon.lambda.durable.model.MapResult;

public class NestedMap extends DurableHandler<List<Region>, List<List<String>>> {
    record Region(String name, List<String> items) {}

    @Override
    public List<List<String>> handleRequest(List<Region> regions, DurableContext context) {
        MapResult<List<String>> result = context.map(
                "process-regions",
                regions,
                new TypeToken<List<String>>() {},
                (region, index, ctx) -> {
                    MapResult<String> inner = ctx.map(
                            "process-" + region.name(),
                            region.items(),
                            String.class,
                            (item, i, innerCtx) -> innerCtx.step(
                                    "item-" + i, String.class, s -> item.toUpperCase()));
                    return inner.succeeded();
                });

        return result.succeeded();
    }
}

See also