HistoryPoller.java
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.testing;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import software.amazon.awssdk.services.lambda.LambdaClient;
import software.amazon.awssdk.services.lambda.model.Event;
import software.amazon.awssdk.services.lambda.model.EventType;
import software.amazon.awssdk.services.lambda.model.GetDurableExecutionHistoryRequest;
/**
* Polls the GetDurableExecutionHistory API until execution completes or a timeout is reached. Used by
* {@link CloudDurableTestRunner} for synchronous test execution.
*/
public class HistoryPoller {
private final LambdaClient lambdaClient;
/** Creates a poller backed by the given Lambda client. */
public HistoryPoller(LambdaClient lambdaClient) {
this.lambdaClient = lambdaClient;
}
/**
* Polls execution history until a terminal event is found or the timeout is exceeded.
*
* @param executionArn the durable execution ARN to poll
* @param pollInterval the interval between poll requests
* @param timeout the maximum time to wait for completion
* @return all history events collected during polling
* @throws RuntimeException if the timeout is exceeded or polling is interrupted
*/
public List<Event> pollUntilComplete(String executionArn, Duration pollInterval, Duration timeout) {
var allEvents = new ArrayList<Event>();
var startTime = Instant.now();
String marker = null;
while (Duration.between(startTime, Instant.now()).compareTo(timeout) < 0) {
var request = GetDurableExecutionHistoryRequest.builder()
.durableExecutionArn(executionArn)
.includeExecutionData(true)
.marker(marker)
.build();
var response = lambdaClient.getDurableExecutionHistory(request);
var events = response.events();
allEvents.addAll(events);
if (isExecutionComplete(events)) {
return allEvents;
}
marker = response.nextMarker();
if (marker == null && events.isEmpty()) {
// No more events and no new events - wait and try again
}
try {
Thread.sleep(pollInterval.toMillis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Polling interrupted", e);
}
}
throw new RuntimeException("Execution timeout exceeded");
}
private boolean isExecutionComplete(List<Event> events) {
return events.stream().anyMatch(event -> {
var eventType = event.eventType();
return EventType.EXECUTION_SUCCEEDED.equals(eventType) || EventType.EXECUTION_FAILED.equals(eventType);
});
}
}