// DeserializationFailedParallelExample.java

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.examples.parallel;

import java.util.List;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.ParallelDurableFuture;
import software.amazon.lambda.durable.TypeToken;
import software.amazon.lambda.durable.config.ParallelBranchConfig;
import software.amazon.lambda.durable.config.ParallelConfig;
import software.amazon.lambda.durable.exception.SerDesException;
import software.amazon.lambda.durable.execution.SuspendExecutionException;
import software.amazon.lambda.durable.serde.JacksonSerDes;

/**
 * Example demonstrating how a deserialization failure in a parallel branch surfaces to the handler.
 *
 * <p>The handler opens a parallel operation with {@code context.parallel()} and registers a single branch:
 *
 * <ol>
 *   <li>The branch runs one step, {@code transform}, which always throws a {@link RuntimeException}
 *   <li>The branch is configured with a custom SerDes that throws {@link SerDesException} whenever the value it
 *       deserializes is a {@link RuntimeException}
 *   <li>The handler returns the branch result, or the message of the exception raised while retrieving it
 * </ol>
 *
 * <p>The {@link ParallelDurableFuture} implements {@link AutoCloseable}, so the handler manages it with
 * try-with-resources.
 */
public class DeserializationFailedParallelExample
        extends DurableHandler<DeserializationFailedParallelExample.Input, String> {

    /**
     * Handler input.
     *
     * @param items items supplied by the caller; only their count is logged by this example, and the list may be
     *     {@code null} when the payload omits it
     */
    public record Input(List<String> items) {}

    /**
     * Runs a single always-failing branch with {@code FailedSerDes} and reports the outcome.
     *
     * @param input the handler input; its item count is logged and otherwise unused
     * @param context the durable context used to create the parallel operation
     * @return the branch result, or the message of the exception thrown while retrieving it
     * @throws SuspendExecutionException rethrown unchanged if raised while retrieving the branch result
     */
    @Override
    public String handleRequest(Input input, DurableContext context) {
        var logger = context.getLogger();

        // The items only feed the log line below, so a payload without "items" must not abort the example.
        var items = input.items();
        int itemCount = (items == null) ? 0 : items.size();
        logger.info("Starting parallel processing of {} items", itemCount);

        var config = ParallelConfig.builder().build();

        var parallel = context.parallel("process-items", config);

        try (parallel) {
            var future = parallel.branch(
                    "process",
                    String.class,
                    branchCtx -> {
                        // The only step in this branch fails unconditionally.
                        return branchCtx.step("transform", String.class, stepCtx -> {
                            throw new RuntimeException("Intentional failure for transform");
                        });
                    },
                    ParallelBranchConfig.builder().serDes(new FailedSerDes()).build());

            // The overall parallel result is intentionally discarded; only the branch future is reported.
            // NOTE(review): presumably this call waits for the branches to finish - confirm against the SDK.
            parallel.get();
            try {
                return future.get();
            } catch (SuspendExecutionException e) {
                // Must propagate untouched rather than be flattened into a message by the catch below.
                throw e;
            } catch (Exception e) {
                // Deliberate for this example: surface the failure text as the handler's return value.
                return e.getMessage();
            }
        }
    }

    /**
     * Jackson-based SerDes that rejects deserialized exceptions.
     *
     * <p>NOTE(review): presumably the SDK routes the failed branch's stored error through this SerDes, which is
     * what triggers the {@link SerDesException} - confirm against the SDK.
     */
    private static class FailedSerDes extends JacksonSerDes {

        /**
         * Delegates to {@link JacksonSerDes} and fails if the decoded value is a {@link RuntimeException}.
         *
         * @param json the serialized payload
         * @param typeToken the target type
         * @return the deserialized value when it is not a {@link RuntimeException}
         * @throws SerDesException if the deserialized value is a {@link RuntimeException}; that value is
         *     attached as the cause
         */
        @Override
        public <T> T deserialize(String json, TypeToken<T> typeToken) {
            T result = super.deserialize(json, typeToken);
            if (result instanceof RuntimeException ex) {
                throw new SerDesException("Deserialization failed", ex);
            }
            return result;
        }
    }
}