WaitOperation.java

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.operation;

import java.time.Duration;
import java.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.lambda.model.Operation;
import software.amazon.awssdk.services.lambda.model.OperationAction;
import software.amazon.awssdk.services.lambda.model.OperationStatus;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.awssdk.services.lambda.model.OperationUpdate;
import software.amazon.awssdk.services.lambda.model.WaitOptions;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.TypeToken;
import software.amazon.lambda.durable.serde.NoopSerDes;
import software.amazon.lambda.durable.serde.SerDes;

public class WaitOperation extends BaseDurableOperation<Void> {

    private static final Logger logger = LoggerFactory.getLogger(WaitOperation.class);
    private static final SerDes NOOP_SER_DES = new NoopSerDes();

    private final Duration duration;

    public WaitOperation(String operationId, String name, Duration duration, DurableContext durableContext) {
        super(operationId, name, OperationType.WAIT, TypeToken.get(Void.class), NOOP_SER_DES, durableContext);
        this.duration = duration;
    }

    /** Starts the operation. */
    @Override
    protected void start() {
        Duration remainingWaitTime = duration;

        // First execution - checkpoint with full duration
        var update = OperationUpdate.builder()
                .action(OperationAction.START)
                .waitOptions(WaitOptions.builder()
                        .waitSeconds((int) duration.toSeconds())
                        .build());

        sendOperationUpdate(update);
        logger.debug("Remaining wait time: {} seconds", remainingWaitTime.getSeconds());
        pollForOperationUpdates(remainingWaitTime);
    }

    /** Replays the operation. */
    @Override
    protected void replay(Operation existing) {
        Duration remainingWaitTime = duration;

        if (existing.status() == OperationStatus.SUCCEEDED) {
            // Wait already completed
            markAlreadyCompleted();
            return;
        }
        // Replay - calculate remaining time from scheduledEndTimestamp
        // TODO: if the checkpoint is slow remaining wait time might be off. Track
        // endTimestamp instead and move calculation in front of polling start.
        if (existing.waitDetails() != null && existing.waitDetails().scheduledEndTimestamp() != null) {
            remainingWaitTime =
                    Duration.between(Instant.now(), existing.waitDetails().scheduledEndTimestamp());
        }
        logger.debug("Remaining wait time: {} seconds", remainingWaitTime.getSeconds());
        pollForOperationUpdates(remainingWaitTime);
    }

    @Override
    public Void get() {
        waitForOperationCompletion();

        return null;
    }
}