OperationProcessor.java
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.testing.local;
import java.time.Instant;
import java.util.UUID;
import software.amazon.awssdk.services.lambda.model.CallbackDetails;
import software.amazon.awssdk.services.lambda.model.ChainedInvokeDetails;
import software.amazon.awssdk.services.lambda.model.ContextDetails;
import software.amazon.awssdk.services.lambda.model.Operation;
import software.amazon.awssdk.services.lambda.model.OperationAction;
import software.amazon.awssdk.services.lambda.model.OperationStatus;
import software.amazon.awssdk.services.lambda.model.OperationUpdate;
import software.amazon.awssdk.services.lambda.model.StepDetails;
import software.amazon.awssdk.services.lambda.model.WaitDetails;
public class OperationProcessor {
/** Applies the update to the existing operation, returning a new operation. */
static Operation applyUpdate(OperationUpdate update, Operation existingOp) {
var builder = Operation.builder()
.id(update.id())
.name(update.name())
.type(update.type())
.subType(update.subType())
.parentId(update.parentId())
.status(deriveStatus(update.action()));
switch (update.type()) {
case WAIT -> builder.waitDetails(buildWaitDetails(update));
case STEP -> builder.stepDetails(buildStepDetails(update, existingOp));
case CALLBACK -> builder.callbackDetails(buildCallbackDetails(update, existingOp));
case EXECUTION -> {} // No details needed for EXECUTION operations
case CHAINED_INVOKE -> builder.chainedInvokeDetails(buildChainedInvokeDetails(update));
case CONTEXT -> builder.contextDetails(buildContextDetails(update));
case UNKNOWN_TO_SDK_VERSION ->
throw new UnsupportedOperationException("UNKNOWN_TO_SDK_VERSION not supported");
}
return builder.build();
}
/** Applies the result of an operation to the existing operation, returning a new operation. */
public static Operation applyResult(Operation op, OperationResult result) {
var builder = Operation.builder()
.id(op.id())
.name(op.name())
.type(op.type())
.subType(op.subType())
.parentId(op.parentId())
.status(result.operationStatus());
switch (op.type()) {
case WAIT -> builder.waitDetails(buildWaitDetails(result, op));
case STEP -> builder.stepDetails(buildStepDetails(result, op));
case CALLBACK -> builder.callbackDetails(buildCallbackDetails(result, op));
case EXECUTION -> {} // No details needed for EXECUTION operations
case CHAINED_INVOKE -> builder.chainedInvokeDetails(buildChainedInvokeDetails(result, op));
case CONTEXT -> builder.contextDetails(buildContextDetails(result, op));
case UNKNOWN_TO_SDK_VERSION ->
throw new UnsupportedOperationException("UNKNOWN_TO_SDK_VERSION not supported");
}
return builder.build();
}
private static ContextDetails buildContextDetails(OperationResult result, Operation op) {
throw new IllegalArgumentException("Context operation type is not supported");
}
private static ChainedInvokeDetails buildChainedInvokeDetails(OperationResult result, Operation op) {
if (result.operationStatus() == OperationStatus.STOPPED
|| result.operationStatus() == OperationStatus.TIMED_OUT) {
return op.chainedInvokeDetails().toBuilder().error(result.error()).build();
}
throw new IllegalArgumentException("Operation status not supported: " + result.operationStatus());
}
private static ChainedInvokeDetails buildChainedInvokeDetails(OperationUpdate update) {
if (update.action() == OperationAction.START) {
return ChainedInvokeDetails.builder().build();
} else {
return ChainedInvokeDetails.builder()
.result(update.payload())
.error(update.error())
.build();
}
}
private static ContextDetails buildContextDetails(OperationUpdate update) {
var detailsBuilder = ContextDetails.builder().result(update.payload()).error(update.error());
if (update.contextOptions() != null
&& Boolean.TRUE.equals(update.contextOptions().replayChildren())) {
detailsBuilder.replayChildren(true);
}
return detailsBuilder.build();
}
private static WaitDetails buildWaitDetails(OperationResult result, Operation op) {
if (result.operationStatus() != OperationStatus.SUCCEEDED) {
throw new IllegalArgumentException("Operation status is not SUCCEEDED");
}
return op.waitDetails().toBuilder().build();
}
private static WaitDetails buildWaitDetails(OperationUpdate update) {
if (update.waitOptions() == null) return null;
var scheduledEnd = Instant.now().plusSeconds(update.waitOptions().waitSeconds());
return WaitDetails.builder().scheduledEndTimestamp(scheduledEnd).build();
}
private static StepDetails buildStepDetails(OperationUpdate update, Operation existingOp) {
var existing = existingOp != null ? existingOp.stepDetails() : null;
var detailsBuilder = existing != null ? existing.toBuilder() : StepDetails.builder();
var attempt = existing != null && existing.attempt() != null ? existing.attempt() + 1 : 1;
if (update.action() == OperationAction.FAIL) {
detailsBuilder.attempt(attempt).error(update.error());
}
if (update.action() == OperationAction.RETRY) {
detailsBuilder
.attempt(attempt)
.error(update.error())
.nextAttemptTimestamp(
Instant.now().plusSeconds(update.stepOptions().nextAttemptDelaySeconds()));
}
if (update.payload() != null) {
detailsBuilder.result(update.payload());
}
return detailsBuilder.build();
}
private static StepDetails buildStepDetails(OperationResult result, Operation op) {
if (result.operationStatus() == OperationStatus.READY) {
return op.stepDetails().toBuilder().build();
}
throw new IllegalArgumentException("Operation status is not READY");
}
private static CallbackDetails buildCallbackDetails(OperationResult result, Operation op) {
if (result.operationStatus() == OperationStatus.TIMED_OUT) {
return op.callbackDetails().toBuilder().error(result.error()).build();
}
return null;
}
private static CallbackDetails buildCallbackDetails(OperationUpdate update, Operation existingOp) {
var existing = existingOp != null ? existingOp.callbackDetails() : null;
// Preserve existing callbackId, or generate new one on START
var callbackId =
existing != null ? existing.callbackId() : UUID.randomUUID().toString();
return CallbackDetails.builder()
.callbackId(callbackId)
.result(existing != null ? update.payload() : null)
.error(existing != null ? update.error() : null)
.build();
}
private static OperationStatus deriveStatus(OperationAction action) {
return switch (action) {
case START -> OperationStatus.STARTED;
case SUCCEED -> OperationStatus.SUCCEEDED;
case FAIL -> OperationStatus.FAILED;
case RETRY -> OperationStatus.PENDING;
case CANCEL -> OperationStatus.CANCELLED;
case UNKNOWN_TO_SDK_VERSION -> OperationStatus.UNKNOWN_TO_SDK_VERSION; // Todo: Check this
};
}
}