ManyAsyncStepsExample.java

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.examples.step;

import java.time.Duration;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import software.amazon.lambda.durable.DurableConfig;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableFuture;
import software.amazon.lambda.durable.DurableHandler;

/**
 * Performance test example demonstrating concurrent async steps.
 *
 * <p>This example tests the SDK's ability to handle many concurrent operations:
 *
 * <ul>
 *   <li>Creates async steps in a loop
 *   <li>Each step performs a simple computation
 *   <li>All results are collected using {@link DurableFuture#allOf}
 * </ul>
 */
public class ManyAsyncStepsExample extends DurableHandler<ManyAsyncStepsExample.Input, ManyAsyncStepsExample.Output> {

    public record Input(int multiplier, int steps) {}

    public record Output(long result, long executionTimeMs, long replayTimeMs) {}

    @Override
    public Output handleRequest(Input input, DurableContext context) {
        var startTime = System.nanoTime();
        var multiplier = input.multiplier();
        var steps = input.steps();
        var logger = context.getLogger();

        logger.info("Starting {} async steps with multiplier {}", steps, multiplier);

        // Create async steps
        var futures = new ArrayList<DurableFuture<Integer>>(steps);
        for (var i = 0; i < steps; i++) {
            var index = i;
            var future = context.stepAsync("compute-" + i, Integer.class, stepCtx -> index * multiplier);
            futures.add(future);
        }

        logger.info("All {} async steps created, collecting results", steps);

        // Collect all results using allOf
        var results = DurableFuture.allOf(futures);
        var totalSum = results.stream().mapToInt(Integer::intValue).sum();

        // checkpoint the executionTime so that we can have the same value when replay
        var executionTimeMs = context.step(
                "execution-time", Long.class, stepCtx -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
        logger.info("Completed {} steps, total sum: {}, execution time: {}ms", steps, totalSum, executionTimeMs);

        // Wait 2 seconds to test replay
        context.wait("post-compute-wait", Duration.ofSeconds(2));

        var replayTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);

        return new Output(totalSum, executionTimeMs, replayTimeMs);
    }

    @Override
    protected DurableConfig createConfiguration() {
        // Add a small checkpoint delay to help batch the checkpoint requests and reduce the overall latencies
        // when the function has many concurrent operations
        return DurableConfig.builder()
                .withCheckpointDelay(Duration.ofMillis(10))
                .build();
    }
}