ConcurrentWaitForConditionExample.java

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.examples.wait;

import java.util.stream.IntStream;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.config.MapConfig;
import software.amazon.lambda.durable.config.WaitForConditionConfig;
import software.amazon.lambda.durable.model.WaitForConditionResult;

/**
 * Example demonstrating concurrent waitForCondition operations using map.
 *
 * <p>Runs many (totalOperations) waitForCondition operations concurrently (maxConcurrency). Each operation:
 *
 * <ol>
 *   <li>Uses attempt count as state (replay-safe).
 *   <li>Fails and retries until the attempt count reaches the given threshold, and then succeeds
 * </ol>
 */
public class ConcurrentWaitForConditionExample extends DurableHandler<ConcurrentWaitForConditionExample.Input, String> {

    public record Input(int threshold, int totalOperations, int maxConcurrency) {}

    @Override
    public String handleRequest(Input input, DurableContext context) {
        var items = IntStream.range(0, input.totalOperations()).boxed().toList();

        var config = MapConfig.builder().maxConcurrency(input.maxConcurrency()).build();

        var result = context.map(
                "concurrent-wait-for-conditions",
                items,
                String.class,
                (item, index, ctx) -> {
                    var conditionConfig = WaitForConditionConfig.<Integer>builder()
                            .initialState(1)
                            .build();
                    // Poll until the counter reaches the input threshold
                    var count = ctx.waitForCondition(
                            "condition-" + index,
                            Integer.class,
                            (callCount, stepCtx) -> {
                                if (callCount >= input.threshold()) {
                                    return WaitForConditionResult.stopPolling(callCount);
                                }
                                return WaitForConditionResult.continuePolling(callCount + 1);
                            },
                            conditionConfig);
                    return String.valueOf(count);
                },
                config);

        return String.join(" | ", result.results());
    }
}