HistoryEventProcessor.java
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.testing;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import software.amazon.awssdk.services.lambda.model.CallbackDetails;
import software.amazon.awssdk.services.lambda.model.ChainedInvokeDetails;
import software.amazon.awssdk.services.lambda.model.ContextDetails;
import software.amazon.awssdk.services.lambda.model.ErrorObject;
import software.amazon.awssdk.services.lambda.model.Event;
import software.amazon.awssdk.services.lambda.model.Operation;
import software.amazon.awssdk.services.lambda.model.OperationStatus;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.awssdk.services.lambda.model.StepDetails;
import software.amazon.awssdk.services.lambda.model.WaitDetails;
import software.amazon.lambda.durable.TypeToken;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.serde.JacksonSerDes;
/**
* Processes execution history events from the GetDurableExecutionHistory API into {@link TestResult} objects. Used by
* {@link CloudDurableTestRunner} and {@link AsyncExecution} to convert cloud execution history into testable results.
*/
public class HistoryEventProcessor {
private final JacksonSerDes serDes = new JacksonSerDes();
/**
* Processes a list of execution history events into a structured {@link TestResult}.
*
* @param events the raw history events from the GetDurableExecutionHistory API
* @param outputType the expected output type for deserialization
* @param <O> the handler output type
* @return a TestResult containing the execution status, output, and operation details
*/
public <O> TestResult<O> processEvents(List<Event> events, TypeToken<O> outputType) {
var operations = new HashMap<String, Operation>();
var operationEvents = new HashMap<String, List<Event>>();
var status = ExecutionStatus.PENDING;
String result = null;
ErrorObject error = null;
for (var event : events) {
var eventType = event.eventType();
var operationId = event.id();
// Group events by operation
if (operationId != null) {
operationEvents
.computeIfAbsent(operationId, k -> new ArrayList<>())
.add(event);
}
switch (eventType) {
case EXECUTION_STARTED, INVOCATION_COMPLETED -> {
// Execution started - no action needed, just track the event
}
case EXECUTION_SUCCEEDED -> {
status = ExecutionStatus.SUCCEEDED;
var details = event.executionSucceededDetails();
if (details != null
&& details.result() != null
&& details.result().payload() != null) {
result = details.result().payload();
}
}
case EXECUTION_FAILED -> {
status = ExecutionStatus.FAILED;
var details = event.executionFailedDetails();
if (details != null
&& details.error() != null
&& details.error().payload() != null) {
error = details.error().payload();
}
}
case EXECUTION_TIMED_OUT -> {
status = ExecutionStatus.FAILED;
var details = event.executionTimedOutDetails();
if (details != null
&& details.error() != null
&& details.error().payload() != null) {
error = details.error().payload();
}
}
case EXECUTION_STOPPED -> {
status = ExecutionStatus.FAILED;
var details = event.executionStoppedDetails();
if (details != null
&& details.error() != null
&& details.error().payload() != null) {
error = details.error().payload();
}
}
case STEP_STARTED -> {
if (operationId != null) {
operations.putIfAbsent(
operationId,
createStepOperation(operationId, event.name(), null, OperationStatus.STARTED, 1));
}
}
case STEP_SUCCEEDED -> {
if (operationId != null) {
var details = event.stepSucceededDetails();
var stepResult = details != null && details.result() != null
? details.result().payload()
: null;
var attempt = details != null && details.retryDetails() != null
? details.retryDetails().currentAttempt()
: 1;
operations.put(
operationId,
createStepOperation(
operationId, event.name(), stepResult, OperationStatus.SUCCEEDED, attempt));
}
}
case STEP_FAILED -> {
if (operationId != null) {
var details = event.stepFailedDetails();
var attempt = details != null && details.retryDetails() != null
? details.retryDetails().currentAttempt()
: 1;
operations.put(
operationId,
createStepOperation(operationId, event.name(), null, OperationStatus.FAILED, attempt));
}
}
case WAIT_STARTED -> {
if (operationId != null) {
operations.putIfAbsent(
operationId,
createWaitOperation(operationId, event.name(), OperationStatus.STARTED, event));
}
}
case WAIT_SUCCEEDED -> {
if (operationId != null) {
operations.put(
operationId,
createWaitOperation(operationId, event.name(), OperationStatus.SUCCEEDED, event));
}
}
case WAIT_CANCELLED -> {
if (operationId != null) {
operations.put(
operationId,
createWaitOperation(operationId, event.name(), OperationStatus.CANCELLED, event));
}
}
case CALLBACK_STARTED -> {
if (operationId != null) {
operations.putIfAbsent(
operationId,
createCallbackOperation(operationId, event.name(), OperationStatus.STARTED, event));
}
}
case CALLBACK_SUCCEEDED -> {
if (operationId != null) {
operations.put(
operationId,
createCallbackOperation(operationId, event.name(), OperationStatus.SUCCEEDED, event));
}
}
case CALLBACK_FAILED -> {
if (operationId != null) {
operations.put(
operationId,
createCallbackOperation(operationId, event.name(), OperationStatus.FAILED, event));
}
}
case CALLBACK_TIMED_OUT -> {
if (operationId != null) {
operations.put(
operationId,
createCallbackOperation(operationId, event.name(), OperationStatus.TIMED_OUT, event));
}
}
case UNKNOWN_TO_SDK_VERSION -> {
// Unknown event type - log and ignore gracefully
}
case CONTEXT_STARTED -> {
if (operationId != null) {
operations.putIfAbsent(
operationId,
createContextOperation(operationId, event.name(), OperationStatus.STARTED, event));
}
}
case CONTEXT_SUCCEEDED -> {
if (operationId != null) {
operations.put(
operationId,
createContextOperation(operationId, event.name(), OperationStatus.SUCCEEDED, event));
}
}
case CONTEXT_FAILED -> {
if (operationId != null) {
operations.put(
operationId,
createContextOperation(operationId, event.name(), OperationStatus.FAILED, event));
}
}
case CHAINED_INVOKE_STARTED,
CHAINED_INVOKE_SUCCEEDED,
CHAINED_INVOKE_FAILED,
CHAINED_INVOKE_TIMED_OUT,
CHAINED_INVOKE_STOPPED -> {
if (operationId != null) {
operations.putIfAbsent(operationId, createInvokeOperation(operationId, event));
}
}
default -> throw new UnsupportedOperationException("Unknown operation: " + eventType);
}
}
// Build TestOperations with events
var testOperations = new ArrayList<TestOperation>();
for (var entry : operations.entrySet()) {
var opEvents = operationEvents.getOrDefault(entry.getKey(), List.of());
testOperations.add(new TestOperation(entry.getValue(), opEvents, serDes));
}
return new TestResult<>(status, result, error, testOperations, events, serDes);
}
private Operation createStepOperation(
String id, String name, String stepResult, OperationStatus status, Integer attempt) {
var stepDetails = StepDetails.builder()
.result(stepResult)
.attempt(attempt != null ? attempt : 1)
.build();
return Operation.builder()
.id(id)
.name(name)
.status(status)
.type(OperationType.STEP)
.stepDetails(stepDetails)
.build();
}
private Operation createWaitOperation(String id, String name, OperationStatus status, Event event) {
var builder = WaitDetails.builder();
if (event.waitStartedDetails() != null) {
builder.scheduledEndTimestamp(event.waitStartedDetails().scheduledEndTimestamp());
}
return Operation.builder()
.id(id)
.name(name)
.status(status)
.type(OperationType.WAIT)
.waitDetails(builder.build())
.build();
}
private Operation createCallbackOperation(String id, String name, OperationStatus status, Event event) {
var builder = CallbackDetails.builder();
// Extract callback ID and details from event
if (event.callbackStartedDetails() != null) {
var details = event.callbackStartedDetails();
if (details.callbackId() != null) {
builder.callbackId(details.callbackId());
}
} else if (event.callbackSucceededDetails() != null) {
var details = event.callbackSucceededDetails();
// CallbackSucceededDetails doesn't have callbackId, need to get it from started event
if (details.result() != null && details.result().payload() != null) {
builder.result(details.result().payload());
}
} else if (event.callbackFailedDetails() != null) {
var details = event.callbackFailedDetails();
// CallbackFailedDetails doesn't have callbackId, need to get it from started event
if (details.error() != null && details.error().payload() != null) {
builder.error(ErrorObject.builder()
.errorType(details.error().payload().errorType())
.errorMessage(details.error().payload().errorMessage())
.build());
}
}
return Operation.builder()
.id(id)
.name(name)
.status(status)
.type(OperationType.CALLBACK)
.callbackDetails(builder.build())
.build();
}
private Operation createInvokeOperation(String id, Event event) {
var builder = ChainedInvokeDetails.builder();
OperationStatus status =
switch (event.eventType()) {
case CHAINED_INVOKE_STARTED -> OperationStatus.STARTED;
case CHAINED_INVOKE_SUCCEEDED -> {
var details = event.callbackSucceededDetails();
if (details != null
&& details.result() != null
&& details.result().payload() != null) {
builder.result(details.result().payload());
}
yield OperationStatus.SUCCEEDED;
}
case CHAINED_INVOKE_FAILED -> {
var details = event.callbackFailedDetails();
if (details != null
&& details.error() != null
&& details.error().payload() != null) {
builder.error(details.error().payload());
}
yield OperationStatus.FAILED;
}
case CHAINED_INVOKE_STOPPED -> {
var details = event.chainedInvokeStoppedDetails();
if (details != null
&& details.error() != null
&& details.error().payload() != null) {
builder.error(details.error().payload());
}
yield OperationStatus.STOPPED;
}
case CHAINED_INVOKE_TIMED_OUT -> {
var details = event.chainedInvokeTimedOutDetails();
if (details != null
&& details.error() != null
&& details.error().payload() != null) {
builder.error(details.error().payload());
}
yield OperationStatus.TIMED_OUT;
}
default ->
throw new UnsupportedOperationException(
"Unknown chained invocation operation: " + event.eventType());
};
return Operation.builder()
.id(id)
.name(event.name())
.status(status)
.type(OperationType.CHAINED_INVOKE)
.chainedInvokeDetails(builder.build())
.build();
}
private Operation createContextOperation(String id, String name, OperationStatus status, Event event) {
var builder = ContextDetails.builder();
if (event.contextSucceededDetails() != null) {
var details = event.contextSucceededDetails();
if (details.result() != null && details.result().payload() != null) {
builder.result(details.result().payload());
}
} else if (event.contextFailedDetails() != null) {
var details = event.contextFailedDetails();
if (details.error() != null && details.error().payload() != null) {
builder.error(details.error().payload());
}
}
return Operation.builder()
.id(id)
.name(name)
.status(status)
.type(OperationType.CONTEXT)
.subType(event.subType())
.contextDetails(builder.build())
.build();
}
}