ParallelFailureToleranceExample.java
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.examples.parallel;
import java.util.ArrayList;
import java.util.List;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableFuture;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.config.CompletionConfig;
import software.amazon.lambda.durable.config.ParallelConfig;
import software.amazon.lambda.durable.config.StepConfig;
import software.amazon.lambda.durable.model.ParallelResult;
import software.amazon.lambda.durable.retry.RetryStrategies;
/**
* Example demonstrating parallel execution with failure tolerance.
*
* <p>When {@code toleratedFailureCount} is set, the parallel operation completes successfully even if some branches
* fail — as long as the number of failures does not exceed the threshold. Failed branches produce {@code null} results
* that callers can filter out.
*
* <p>Use this pattern when partial success is acceptable, for example: sending notifications to multiple channels where
* some channels may be unavailable.
*/
public class ParallelFailureToleranceExample
extends DurableHandler<ParallelFailureToleranceExample.Input, ParallelFailureToleranceExample.Output> {
public record Input(List<String> services, Integer toleratedFailures, Integer minSuccessful) {}
public record Output(int succeeded, int failed) {}
@Override
public Output handleRequest(Input input, DurableContext context) {
var logger = context.getLogger();
logger.info("Starting parallel execution with toleratedFailureCount={}", input.toleratedFailures());
var config = ParallelConfig.builder()
.completionConfig(new CompletionConfig(input.minSuccessful, input.toleratedFailures, null))
.build();
var futures = new ArrayList<DurableFuture<String>>(input.services().size());
var parallel = context.parallel("call-services", config);
try (parallel) {
for (var service : input.services()) {
var future = parallel.branch("call-" + service, String.class, branchCtx -> {
return branchCtx.step(
"invoke-" + service,
String.class,
stepCtx -> {
if (service.startsWith("bad-")) {
throw new RuntimeException("Service unavailable: " + service);
}
return "ok:" + service;
},
StepConfig.builder()
.retryStrategy(RetryStrategies.Presets.NO_RETRY)
.build());
});
futures.add(future);
}
}
ParallelResult parallelResult = parallel.get();
logger.info(
"Parallel complete: succeeded={}, failed={}, status={}",
parallelResult.succeeded(),
parallelResult.failed(),
parallelResult.completionStatus().isSucceeded() ? "succeeded" : "failed");
var succeeded = parallelResult.succeeded();
var failed = parallelResult.failed();
logger.info("Completed: {} succeeded, {} failed", succeeded, failed);
return new Output(succeeded, failed);
}
}