EventProcessor.java

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.testing;

import static software.amazon.awssdk.services.lambda.model.EventType.*;

import java.time.Instant;
import java.util.concurrent.atomic.AtomicInteger;
import software.amazon.awssdk.services.lambda.model.*;

/** Generates Event objects from OperationUpdate for local testing. */
class EventProcessor {
    private final AtomicInteger eventId = new AtomicInteger(1);

    Event processUpdate(OperationUpdate update, Operation operation) {
        var builder = Event.builder()
                .eventId(eventId.getAndIncrement())
                .eventTimestamp(Instant.now())
                .id(update.id())
                .name(update.name());

        return switch (update.type()) {
            case STEP -> buildStepEvent(builder, update, operation);
            case WAIT -> buildWaitEvent(builder, update, operation);
            case CHAINED_INVOKE -> buildInvokeEvent(builder, update, operation);
            case EXECUTION -> buildExecutionEvent(builder, update);
            case CALLBACK -> buildCallbackEvent(builder, update);
            case CONTEXT -> buildContextEvent(builder, update);
            default -> throw new IllegalArgumentException("Unsupported operation type: " + update.type());
        };
    }

    private Event buildStepEvent(Event.Builder builder, OperationUpdate update, Operation operation) {
        return switch (update.action()) {
            case START ->
                builder.eventType(STEP_STARTED)
                        .stepStartedDetails(StepStartedDetails.builder().build())
                        .build();
            case SUCCEED ->
                builder.eventType(STEP_SUCCEEDED)
                        .stepSucceededDetails(StepSucceededDetails.builder()
                                .result(EventResult.builder()
                                        .payload(update.payload())
                                        .build())
                                .retryDetails(buildRetryDetails(operation))
                                .build())
                        .build();
            case FAIL ->
                builder.eventType(STEP_FAILED)
                        .stepFailedDetails(StepFailedDetails.builder()
                                .error(EventError.builder()
                                        .payload(update.error())
                                        .build())
                                .retryDetails(buildRetryDetails(operation))
                                .build())
                        .build();
            case RETRY ->
                builder.eventType(STEP_STARTED)
                        .stepStartedDetails(StepStartedDetails.builder().build())
                        .build();
            default -> throw new IllegalArgumentException("Unsupported step action: " + update.action());
        };
    }

    @SuppressWarnings("unused") // operation param kept for API consistency
    private Event buildWaitEvent(Event.Builder builder, OperationUpdate update, Operation operation) {
        return switch (update.action()) {
            case START -> {
                var waitSeconds =
                        update.waitOptions() != null ? update.waitOptions().waitSeconds() : 0;
                yield builder.eventType(WAIT_STARTED)
                        .waitStartedDetails(WaitStartedDetails.builder()
                                .duration(waitSeconds)
                                .scheduledEndTimestamp(Instant.now().plusSeconds(waitSeconds))
                                .build())
                        .build();
            }
            case SUCCEED ->
                builder.eventType(WAIT_SUCCEEDED)
                        .waitSucceededDetails(WaitSucceededDetails.builder().build())
                        .build();
            case CANCEL ->
                builder.eventType(WAIT_CANCELLED)
                        .waitCancelledDetails(WaitCancelledDetails.builder().build())
                        .build();
            default -> throw new IllegalArgumentException("Unsupported wait action: " + update.action());
        };
    }

    @SuppressWarnings("unused") // operation param kept for API consistency
    private Event buildInvokeEvent(Event.Builder builder, OperationUpdate update, Operation operation) {
        return switch (update.action()) {
            case START ->
                builder.eventType(EventType.CHAINED_INVOKE_STARTED)
                        .chainedInvokeStartedDetails(ChainedInvokeStartedDetails.builder()
                                .functionName(update.chainedInvokeOptions().functionName())
                                .input(EventInput.builder()
                                        .payload(update.payload())
                                        .build())
                                .build())
                        .build();
            case SUCCEED ->
                builder.eventType(EventType.CHAINED_INVOKE_SUCCEEDED)
                        .chainedInvokeSucceededDetails(ChainedInvokeSucceededDetails.builder()
                                .result(EventResult.builder()
                                        .payload(
                                                operation.chainedInvokeDetails().result())
                                        .build())
                                .build())
                        .build();
            case FAIL ->
                builder.eventType(EventType.CHAINED_INVOKE_FAILED)
                        .chainedInvokeFailedDetails(ChainedInvokeFailedDetails.builder()
                                .error(EventError.builder()
                                        .payload(
                                                operation.chainedInvokeDetails().error())
                                        .build())
                                .build())
                        .build();

            default -> throw new IllegalArgumentException("Unsupported invoke action: " + update.action());
        };
    }

    private Event buildExecutionEvent(Event.Builder builder, OperationUpdate update) {
        return switch (update.action()) {
            case START ->
                builder.eventType(EXECUTION_STARTED)
                        .executionStartedDetails(
                                ExecutionStartedDetails.builder().build())
                        .build();
            case SUCCEED ->
                builder.eventType(EXECUTION_SUCCEEDED)
                        .executionSucceededDetails(ExecutionSucceededDetails.builder()
                                .result(EventResult.builder()
                                        .payload(update.payload())
                                        .build())
                                .build())
                        .build();
            case FAIL ->
                builder.eventType(EXECUTION_FAILED)
                        .executionFailedDetails(ExecutionFailedDetails.builder()
                                .error(EventError.builder()
                                        .payload(update.error())
                                        .build())
                                .build())
                        .build();
            default -> throw new IllegalArgumentException("Unsupported execution action: " + update.action());
        };
    }

    private Event buildCallbackEvent(Event.Builder builder, OperationUpdate update) {
        return switch (update.action()) {
            case START -> builder.eventType(CALLBACK_STARTED).build();
            case SUCCEED -> builder.eventType(CALLBACK_SUCCEEDED).build();
            case FAIL -> builder.eventType(CALLBACK_FAILED).build();
            default -> throw new IllegalArgumentException("Unsupported callback action: " + update.action());
        };
    }

    private Event buildContextEvent(Event.Builder builder, OperationUpdate update) {
        return switch (update.action()) {
            case START -> builder.eventType(EventType.CONTEXT_STARTED).build();
            case SUCCEED -> builder.eventType(EventType.CONTEXT_SUCCEEDED).build();
            case FAIL -> builder.eventType(EventType.CONTEXT_FAILED).build();
            default -> throw new IllegalArgumentException("Unsupported context action: " + update.action());
        };
    }

    private RetryDetails buildRetryDetails(Operation operation) {
        if (operation == null || operation.stepDetails() == null) {
            return RetryDetails.builder().currentAttempt(1).build();
        }
        var attempt = operation.stepDetails().attempt();
        return RetryDetails.builder()
                .currentAttempt(attempt != null ? attempt : 1)
                .build();
    }
}