EventProcessor.java

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.testing.local;

import static software.amazon.awssdk.services.lambda.model.EventType.*;

import java.time.Instant;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import software.amazon.awssdk.services.lambda.model.*;

/** Generates Event objects from OperationUpdate for local testing. */
class EventProcessor {
    private final AtomicInteger eventId = new AtomicInteger(1);
    private final List<Event> allEvents = new CopyOnWriteArrayList<>();

    void processUpdate(OperationUpdate update, Operation operation) {
        var builder = Event.builder()
                .eventId(eventId.getAndIncrement())
                .eventTimestamp(Instant.now())
                .id(update.id())
                .name(update.name())
                .parentId(operation.parentId());

        Event event =
                switch (update.type()) {
                    case STEP -> buildStepEvent(builder, update, operation);
                    case WAIT -> buildWaitEvent(builder, update, operation);
                    case CHAINED_INVOKE -> buildInvokeEvent(builder, update, operation);
                    case EXECUTION -> buildExecutionEvent(builder, update);
                    case CALLBACK -> buildCallbackEvent(builder, update);
                    case CONTEXT -> buildContextEvent(builder, update);
                    default -> throw new IllegalArgumentException("Unsupported operation type: " + update.type());
                };

        allEvents.add(event);
    }

    // process new status of an operation without an OperationUpdate
    void processUpdate(Operation updatedOperation) {
        var builder = Event.builder()
                .eventId(eventId.getAndIncrement())
                .eventTimestamp(Instant.now())
                .id(updatedOperation.id())
                .name(updatedOperation.name());
        // support the statuses that don't have a corresponding OperationAction
        switch (updatedOperation.status()) {
            case STARTED -> {
                // used by resetCheckpointToStarted
                return;
            }
            case READY -> {
                if (updatedOperation.type() == OperationType.STEP) {
                    // no event type for this case
                    return;
                } else {
                    throw new IllegalArgumentException("Unsupported operation type: " + updatedOperation.type());
                }
            }
            case TIMED_OUT -> {
                switch (updatedOperation.type()) {
                    case EXECUTION -> builder.eventType(EXECUTION_TIMED_OUT);
                    case CHAINED_INVOKE -> builder.eventType(CHAINED_INVOKE_TIMED_OUT);
                    case CALLBACK -> builder.eventType(CALLBACK_TIMED_OUT);
                    default ->
                        throw new IllegalArgumentException("Unsupported operation type: " + updatedOperation.type());
                }
            }
            case STOPPED -> {
                switch (updatedOperation.type()) {
                    case EXECUTION -> builder.eventType(EXECUTION_STOPPED);
                    case CHAINED_INVOKE -> builder.eventType(CHAINED_INVOKE_STOPPED);
                    default ->
                        throw new IllegalArgumentException("Unsupported operation type: " + updatedOperation.type());
                }
            }
            default -> throw new IllegalArgumentException("Unsupported operation status: " + updatedOperation.status());
        }
        allEvents.add(builder.build());
    }

    List<Event> getAllEvents() {
        return List.copyOf(allEvents);
    }

    public List<Event> getEventsForOperation(String operationId) {
        return allEvents.stream().filter(e -> e.id().equals(operationId)).toList();
    }

    private Event buildStepEvent(Event.Builder builder, OperationUpdate update, Operation operation) {
        return switch (update.action()) {
            case START ->
                builder.eventType(STEP_STARTED)
                        .stepStartedDetails(StepStartedDetails.builder().build())
                        .build();
            case SUCCEED ->
                builder.eventType(STEP_SUCCEEDED)
                        .stepSucceededDetails(StepSucceededDetails.builder()
                                .result(EventResult.builder()
                                        .payload(update.payload())
                                        .build())
                                .retryDetails(buildRetryDetails(operation))
                                .build())
                        .build();
            case FAIL ->
                builder.eventType(STEP_FAILED)
                        .stepFailedDetails(StepFailedDetails.builder()
                                .error(EventError.builder()
                                        .payload(update.error())
                                        .build())
                                .retryDetails(buildRetryDetails(operation))
                                .build())
                        .build();
            case RETRY ->
                builder.eventType(STEP_STARTED)
                        .stepStartedDetails(StepStartedDetails.builder().build())
                        .build();
            default -> throw new IllegalArgumentException("Unsupported step action: " + update.action());
        };
    }

    @SuppressWarnings("unused") // operation param kept for API consistency
    private Event buildWaitEvent(Event.Builder builder, OperationUpdate update, Operation operation) {
        return switch (update.action()) {
            case START -> {
                var waitSeconds =
                        update.waitOptions() != null ? update.waitOptions().waitSeconds() : 0;
                yield builder.eventType(WAIT_STARTED)
                        .waitStartedDetails(WaitStartedDetails.builder()
                                .duration(waitSeconds)
                                .scheduledEndTimestamp(Instant.now().plusSeconds(waitSeconds))
                                .build())
                        .build();
            }
            case SUCCEED ->
                builder.eventType(WAIT_SUCCEEDED)
                        .waitSucceededDetails(WaitSucceededDetails.builder().build())
                        .build();
            case CANCEL ->
                builder.eventType(WAIT_CANCELLED)
                        .waitCancelledDetails(WaitCancelledDetails.builder().build())
                        .build();
            default -> throw new IllegalArgumentException("Unsupported wait action: " + update.action());
        };
    }

    @SuppressWarnings("unused") // operation param kept for API consistency
    private Event buildInvokeEvent(Event.Builder builder, OperationUpdate update, Operation operation) {
        return switch (update.action()) {
            case START ->
                builder.eventType(EventType.CHAINED_INVOKE_STARTED)
                        .chainedInvokeStartedDetails(ChainedInvokeStartedDetails.builder()
                                .functionName(update.chainedInvokeOptions().functionName())
                                .input(EventInput.builder()
                                        .payload(update.payload())
                                        .build())
                                .build())
                        .build();
            case SUCCEED ->
                builder.eventType(EventType.CHAINED_INVOKE_SUCCEEDED)
                        .chainedInvokeSucceededDetails(ChainedInvokeSucceededDetails.builder()
                                .result(EventResult.builder()
                                        .payload(
                                                operation.chainedInvokeDetails().result())
                                        .build())
                                .build())
                        .build();
            case FAIL ->
                builder.eventType(EventType.CHAINED_INVOKE_FAILED)
                        .chainedInvokeFailedDetails(ChainedInvokeFailedDetails.builder()
                                .error(EventError.builder()
                                        .payload(
                                                operation.chainedInvokeDetails().error())
                                        .build())
                                .build())
                        .build();

            default -> throw new IllegalArgumentException("Unsupported invoke action: " + update.action());
        };
    }

    private Event buildExecutionEvent(Event.Builder builder, OperationUpdate update) {
        return switch (update.action()) {
            case START ->
                builder.eventType(EXECUTION_STARTED)
                        .executionStartedDetails(
                                ExecutionStartedDetails.builder().build())
                        .build();
            case SUCCEED ->
                builder.eventType(EXECUTION_SUCCEEDED)
                        .executionSucceededDetails(ExecutionSucceededDetails.builder()
                                .result(EventResult.builder()
                                        .payload(update.payload())
                                        .build())
                                .build())
                        .build();
            case FAIL ->
                builder.eventType(EXECUTION_FAILED)
                        .executionFailedDetails(ExecutionFailedDetails.builder()
                                .error(EventError.builder()
                                        .payload(update.error())
                                        .build())
                                .build())
                        .build();
            default -> throw new IllegalArgumentException("Unsupported execution action: " + update.action());
        };
    }

    private Event buildCallbackEvent(Event.Builder builder, OperationUpdate update) {
        return switch (update.action()) {
            case START -> builder.eventType(CALLBACK_STARTED).build();
            case SUCCEED -> builder.eventType(CALLBACK_SUCCEEDED).build();
            case FAIL -> builder.eventType(CALLBACK_FAILED).build();
            default -> throw new IllegalArgumentException("Unsupported callback action: " + update.action());
        };
    }

    private Event buildContextEvent(Event.Builder builder, OperationUpdate update) {
        return switch (update.action()) {
            case START -> builder.eventType(EventType.CONTEXT_STARTED).build();
            case SUCCEED -> builder.eventType(EventType.CONTEXT_SUCCEEDED).build();
            case FAIL -> builder.eventType(EventType.CONTEXT_FAILED).build();
            default -> throw new IllegalArgumentException("Unsupported context action: " + update.action());
        };
    }

    private RetryDetails buildRetryDetails(Operation operation) {
        if (operation == null || operation.stepDetails() == null) {
            return RetryDetails.builder().currentAttempt(1).build();
        }
        var attempt = operation.stepDetails().attempt();
        return RetryDetails.builder()
                .currentAttempt(attempt != null ? attempt : 1)
                .build();
    }
}