DurableHandler.java

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.ParameterizedType;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.lambda.durable.model.DurableExecutionInput;
import software.amazon.lambda.durable.serde.AwsSdkV2Module;

/**
 * Base class for Lambda stream handlers that run as durable executions.
 *
 * <p>The stream-level {@code handleRequest} reads the invocation payload, deserializes it into a
 * {@link DurableExecutionInput}, hands it to {@code DurableExecutor.execute} together with the subclass's
 * {@code handleRequest(I, DurableContext)} implementation, and writes the serialized result to the output stream.
 *
 * @param <I> user input type; must be bound to a concrete class by the direct subclass
 * @param <O> result type returned by the user handler
 */
public abstract class DurableHandler<I, O> implements RequestStreamHandler {

    private static final Logger logger = LoggerFactory.getLogger(DurableHandler.class);

    /** Runtime class of the {@code I} type argument, resolved reflectively in the constructor. */
    private final Class<I> inputType;

    /** Configuration produced by {@link #createConfiguration()} and validated at construction time. */
    private final DurableConfig config;

    /** Internal ObjectMapper used for the backend envelope, not for customer data. */
    private final ObjectMapper objectMapper = createObjectMapper();

    /**
     * Resolves the input type from the subclass's generic superclass declaration, then creates and validates the
     * configuration.
     *
     * @throws IllegalArgumentException if the direct superclass declaration is not parameterized, or if its first
     *     type argument is not a concrete class (for example a parameterized type or a type variable)
     * @throws IllegalStateException if the configuration is missing a required component
     */
    @SuppressWarnings("unchecked")
    protected DurableHandler() {
        // Extract input type from generic superclass
        var superClass = getClass().getGenericSuperclass();
        if (!(superClass instanceof ParameterizedType paramType)) {
            throw new IllegalArgumentException("Cannot determine input type parameter");
        }
        var inputTypeArgument = paramType.getActualTypeArguments()[0];
        // Only a plain Class can be stored in Class<I>; fail with a descriptive message instead of an
        // opaque ClassCastException when the argument is e.g. Map<String, Object> or a type variable.
        if (!(inputTypeArgument instanceof Class<?> inputClass)) {
            throw new IllegalArgumentException(
                    "Input type parameter must be a concrete class, but was: " + inputTypeArgument.getTypeName());
        }
        this.inputType = (Class<I>) inputClass;
        this.config = createConfiguration();
        validateConfiguration();
    }

    /**
     * Gets the configuration used by this handler. This allows test frameworks and other tools to access the handler's
     * configuration for testing purposes.
     *
     * <p>DurableConfig is immutable.
     *
     * @return The DurableConfig instance used by this handler
     */
    public DurableConfig getConfiguration() {
        return config;
    }

    /**
     * Template method for creating configuration. Override this method to provide custom DurableExecutionClient,
     * SerDes, or other configuration.
     *
     * <p>This method is invoked from the {@code DurableHandler} constructor, i.e. before the subclass's own instance
     * field initializers and constructor body have run. Overrides must therefore not depend on subclass instance
     * state.
     *
     * <p>The {@link software.amazon.lambda.durable.client.LambdaDurableFunctionsClient} is a wrapper that customers
     * should use to inject their own configured {@link software.amazon.awssdk.services.lambda.LambdaClient}. This
     * allows full control over AWS SDK configuration including credentials, region, HTTP client, and retry policies.
     *
     * <p>Basic example with custom region and credentials:
     *
     * <pre>{@code
     * @Override
     * protected DurableConfig createConfiguration() {
     *     // Create custom Lambda client with specific configuration
     *     var lambdaClient = LambdaClient.builder()
     *         .region(Region.US_WEST_2)
     *         .credentialsProvider(ProfileCredentialsProvider.create("my-profile"))
     *         .build();
     *
     *     // Wrap the Lambda client with LambdaDurableFunctionsClient
     *     var durableClient = new LambdaDurableFunctionsClient(lambdaClient);
     *
     *     return DurableConfig.builder()
     *         .withDurableExecutionClient(durableClient)
     *         .build();
     * }
     * }</pre>
     *
     * <p>Advanced example with AWS CRT HTTP Client for high-performance scenarios:
     *
     * <pre>{@code
     * @Override
     * protected DurableConfig createConfiguration() {
     *     // Configure the synchronous AWS CRT HTTP client (LambdaClient is a synchronous client)
     *     var crtHttpClient = AwsCrtHttpClient.builder()
     *         .maxConcurrency(50)
     *         .connectionTimeout(Duration.ofSeconds(30))
     *         .connectionMaxIdleTime(Duration.ofSeconds(60))
     *         .build();
     *
     *     // Create Lambda client with CRT HTTP client
     *     var lambdaClient = LambdaClient.builder()
     *         .region(Region.US_EAST_1)
     *         .credentialsProvider(EnvironmentVariableCredentialsProvider.create())
     *         .httpClient(crtHttpClient)
     *         .overrideConfiguration(ClientOverrideConfiguration.builder()
     *             .retryPolicy(RetryPolicy.builder()
     *                 .numRetries(5)
     *                 .build())
     *             .build())
     *         .build();
     *
     *     // Wrap with LambdaDurableFunctionsClient
     *     var durableClient = new LambdaDurableFunctionsClient(lambdaClient);
     *
     *     return DurableConfig.builder()
     *         .withDurableExecutionClient(durableClient)
     *         .withSerDes(customSerDes)  // Optional: custom SerDes for user data
     *         .withExecutorService(customExecutor)  // Optional: custom thread pool
     *         .build();
     * }
     * }</pre>
     *
     * @return DurableConfig with desired configuration
     */
    protected DurableConfig createConfiguration() {
        return DurableConfig.defaultConfig();
    }

    /**
     * Verifies that the configuration supplies every component the handler needs.
     *
     * @throws IllegalStateException if the execution client, SerDes, or executor service is {@code null}
     */
    private void validateConfiguration() {
        if (config.getDurableExecutionClient() == null) {
            throw new IllegalStateException("DurableExecutionClient configuration failed");
        }
        if (config.getSerDes() == null) {
            throw new IllegalStateException("SerDes configuration failed");
        }
        if (config.getExecutorService() == null) {
            throw new IllegalStateException("ExecutorService configuration failed");
        }
    }

    /**
     * Lambda entry point: reads the durable execution envelope, runs the user handler through
     * {@code DurableExecutor.execute}, and writes the serialized result.
     *
     * @param inputStream stream carrying the JSON-encoded {@link DurableExecutionInput}
     * @param outputStream stream that receives the JSON-encoded execution output
     * @param context Lambda invocation context, passed through to the executor
     * @throws IOException if the payload cannot be read or parsed, or the output cannot be written
     */
    @Override
    public final void handleRequest(InputStream inputStream, OutputStream outputStream, Context context)
            throws IOException {
        // Decode explicitly as UTF-8; the no-charset String constructor falls back to the platform
        // default on older runtimes, which can corrupt non-ASCII payloads.
        var inputString = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
        logger.debug("Raw input from durable handler: {}", inputString);
        var input = this.objectMapper.readValue(inputString, DurableExecutionInput.class);
        var output = DurableExecutor.execute(input, context, inputType, this::handleRequest, config);
        outputStream.write(objectMapper.writeValueAsBytes(output));
    }

    /**
     * Handle the durable execution.
     *
     * @param input User input
     * @param context Durable context for operations
     * @return Result
     */
    public abstract O handleRequest(I input, DurableContext context);

    /**
     * Creates ObjectMapper for DAR backend communication (internal use only). This is for INTERNAL use only - handles
     * Lambda Durable Functions backend protocol.
     *
     * <p>Customer-facing serialization uses SerDes from DurableConfig.
     *
     * <p>The mapper reads and writes {@link Date} as a floating-point number of seconds since the epoch, reads
     * {@link Instant} either from an integer number of epoch milliseconds or from text in the form
     * {@code yyyy-MM-dd HH:mm:ss.SSSSSSXXX}, ignores unknown properties, and uses UpperCamelCase property names.
     *
     * @return Configured ObjectMapper for durable backend communication
     */
    public static ObjectMapper createObjectMapper() {
        var dateModule = new SimpleModule();
        dateModule.addDeserializer(Date.class, new JsonDeserializer<>() {
            @Override
            public Date deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
                    throws IOException {
                // Timestamp is a double value representing seconds since epoch.
                var timestamp = jsonParser.getDoubleValue();
                // Date expects milliseconds since epoch, so multiply by 1000. Round rather than
                // truncate: floating-point error can leave the product marginally below the intended
                // millisecond, and truncation would then lose one millisecond on a round-trip.
                return new Date(Math.round(timestamp * 1000));
            }
        });
        dateModule.addSerializer(Date.class, new JsonSerializer<>() {
            @Override
            public void serialize(Date date, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
                    throws IOException {
                // Timestamp should be a double value representing seconds since epoch, so
                // convert from milliseconds.
                double timestamp = date.getTime() / 1000.0;
                jsonGenerator.writeNumber(timestamp);
            }
        });

        // Needed for deserialization of timestamps for some SDK v2 objects
        dateModule.addDeserializer(Instant.class, new JsonDeserializer<>() {
            private static final DateTimeFormatter TIMESTAMP_FORMATTER = new DateTimeFormatterBuilder()
                    .appendPattern("yyyy-MM-dd HH:mm:ss.SSSSSSXXX")
                    .toFormatter();

            @Override
            public Instant deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
                    throws IOException {
                // Integer tokens are interpreted as epoch milliseconds.
                if (jsonParser.hasToken(JsonToken.VALUE_NUMBER_INT)) {
                    return Instant.ofEpochMilli(jsonParser.getLongValue());
                }
                // Anything else is read as text and parsed with the fixed timestamp pattern.
                var timestampStr = jsonParser.getValueAsString();
                return Instant.from(TIMESTAMP_FORMATTER.parse(timestampStr));
            }
        });

        return JsonMapper.builder()
                .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
                .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
                // Looks pretty, and probably needed for tests to be deterministic.
                .enable(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY)
                .enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS)
                // Data passed over the wire from the backend is UpperCamelCase
                .propertyNamingStrategy(PropertyNamingStrategies.UPPER_CAMEL_CASE)
                .addModule(new JavaTimeModule())
                .addModule(dateModule)
                .addModule(new AwsSdkV2Module())
                .build();
    }
}