Best Practices
Table of Contents
- Overview
- Function design
- Timeout configuration
- Naming conventions
- Performance optimization
- Serialization
- Common mistakes
- Code organization
- FAQ
- See also
Overview
This guide covers best practices for building reliable, maintainable durable functions. You'll learn how to design functions that are easy to test, debug, and maintain in production.
Function design
Keep functions focused
Each durable function should have a single, clear purpose. Focused functions are easier to test, debug, and maintain. They also make it simpler to understand execution flow and identify failures.
Good:
@durable_execution
def process_order(event: dict, context: DurableContext) -> dict:
"""Process a single order through validation, payment, and fulfillment."""
order_id = event["order_id"]
validation = context.step(validate_order(order_id))
payment = context.step(process_payment(order_id, event["amount"]))
fulfillment = context.step(fulfill_order(order_id))
return {"order_id": order_id, "status": "completed"}
Avoid:
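A sketch of the anti-pattern: one handler that multiplexes unrelated workflows (branch contents elided, helper names illustrative):

```python
@durable_execution
def handle_everything(event: dict, context: DurableContext) -> dict:
    # Orders, refunds, and reporting crammed into one function: harder to
    # test, and a failure in any branch obscures the others.
    if event["type"] == "order":
        ...  # validation, payment, fulfillment
    elif event["type"] == "refund":
        ...  # refund processing
    elif event["type"] == "report":
        ...  # report generation
```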
Wrap non-deterministic code in steps
All non-deterministic operations must be wrapped in steps:
import time
import uuid

@durable_step
def get_timestamp(step_context: StepContext) -> int:
return int(time.time())
@durable_step
def generate_id(step_context: StepContext) -> str:
return str(uuid.uuid4())
@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
timestamp = context.step(get_timestamp())
request_id = context.step(generate_id())
return {"timestamp": timestamp, "request_id": request_id}
Why: Non-deterministic code produces different values on replay, breaking state consistency.
Use @durable_step for reusable functions
Decorate functions with @durable_step to get automatic naming, better code organization, and cleaner syntax. This makes your code more maintainable and easier to test.
Good:
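A sketch of the decorated style (load_profile is an illustrative helper):

```python
@durable_step
def fetch_profile(step_context: StepContext, user_id: str) -> dict:
    # Named automatically after the function; easy to unit test in isolation.
    return load_profile(user_id)

@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
    profile = context.step(fetch_profile(event["user_id"]))
    return {"profile": profile}
```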
Avoid:
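The same logic as an inline lambda, following the lambda-step form shown later in this guide (load_profile is an illustrative helper):

```python
@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
    # Inline lambdas need manual names and can't be unit tested on their own.
    profile = context.step(
        lambda step_context: load_profile(event["user_id"]),
        name="fetch_profile",
    )
    return {"profile": profile}
```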
Don't share state between steps
Pass data through return values, not global variables or class attributes. Global state breaks on replay because steps return cached results, but global variables reset to their initial values.
Good:
@durable_step
def fetch_user(step_context: StepContext, user_id: str) -> dict:
return {"user_id": user_id, "name": "Jane Doe"}
@durable_step
def send_email(step_context: StepContext, user: dict) -> bool:
send_to_address(user["name"], user.get("email"))
return True
@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
user = context.step(fetch_user(event["user_id"]))
sent = context.step(send_email(user))
return {"sent": sent}
Avoid:
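The global-variable version fails on replay. The mechanism can be simulated in plain Python, with a dict standing in for the SDK's checkpointed step results:

```python
# Simulating replay: step results come back from the checkpoint cache,
# but module-level globals start over from their initial values.
checkpoint = {}   # stands in for the SDK's persisted step results
counter = 0       # global state mutated inside a step (the anti-pattern)

def run_step(name, fn):
    # First execution runs fn; on replay the cached result is returned
    # and fn's side effects on globals never happen again.
    if name not in checkpoint:
        checkpoint[name] = fn()
    return checkpoint[name]

def increment():
    global counter
    counter += 1
    return counter

first = run_step("increment", increment)     # executes: counter becomes 1
counter = 0                                  # replay: globals reset
replayed = run_step("increment", increment)  # cached: counter stays 0
print(first, replayed, counter)              # → 1 1 0
```

The cached result still says 1, but any code that reads the global sees 0: state has diverged.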
Choose the right execution semantics
Use at-most-once semantics for operations with side effects (payments, emails, database writes) to prevent duplicate execution. Use at-least-once (default) for idempotent operations that are safe to retry.
At-most-once for side effects:
from aws_durable_execution_sdk_python.config import StepConfig, StepSemantics
@durable_step
def charge_credit_card(step_context: StepContext, amount: float) -> dict:
return {"transaction_id": "txn_123", "status": "completed"}
@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
# Prevent duplicate charges on retry
payment = context.step(
charge_credit_card(event["amount"]),
config=StepConfig(step_semantics=StepSemantics.AT_MOST_ONCE_PER_RETRY),
)
return payment
At-least-once for idempotent operations:
@durable_step
def calculate_total(step_context: StepContext, items: list) -> float:
return sum(item["price"] for item in items)
@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> float:
# Safe to run multiple times - same input produces same output
total = context.step(calculate_total(event["items"]))
return total
Handle errors explicitly
Catch and handle exceptions in your step functions. Distinguish between transient failures (network issues, rate limits) that should retry, and permanent failures (invalid input, not found) that shouldn't.
Good:
import requests

@durable_step
def call_external_api(step_context: StepContext, url: str) -> dict:
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
return response.json()
except requests.Timeout:
raise # Let retry handle timeouts
except requests.HTTPError as e:
if e.response.status_code >= 500:
raise # Retry server errors
# Don't retry client errors (400-499)
return {"error": "client_error", "status": e.response.status_code}
Avoid:
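A sketch of the anti-pattern, catching everything indiscriminately:

```python
@durable_step
def call_external_api(step_context: StepContext, url: str) -> dict:
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return response.json()
    except Exception:
        # Swallowing everything: permanent failures (bad URL, 404) look
        # retryable, and transient ones are hidden from the retry strategy.
        return {"error": "failed"}
```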
Timeout configuration
Set realistic timeouts
Choose timeout values based on expected execution time plus buffer for retries and network delays. Too short causes unnecessary failures; too long wastes resources waiting for operations that won't complete.
Good:
from aws_durable_execution_sdk_python.config import CallbackConfig, Duration
@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
# Expected 2 minutes + 1 minute buffer = 3 minutes
callback = context.create_callback(
name="approval",
config=CallbackConfig(timeout=Duration.from_minutes(3)),
)
return {"callback_id": callback.callback_id}
Avoid:
@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
# Too short - will timeout before external system responds
callback = context.create_callback(
name="approval",
config=CallbackConfig(timeout=Duration.from_seconds(5)),
)
return {"callback_id": callback.callback_id}
Use heartbeat timeouts for long operations
Enable heartbeat monitoring for callbacks that take more than a few minutes. Heartbeats detect when external systems stop responding, preventing you from waiting the full timeout period.
For example, with a 24-hour callback timeout and no heartbeat monitoring, you'd wait the full 24 hours even if the external system crashed after 10 minutes. A heartbeat timeout detects the stalled system within minutes instead.
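This guide doesn't show the heartbeat API itself; the sketch below assumes CallbackConfig accepts a heartbeat_timeout parameter and that Duration has a from_hours helper (only from_minutes and from_seconds appear elsewhere in this guide):

```python
from aws_durable_execution_sdk_python.config import CallbackConfig, Duration

@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
    # Assumed parameter: heartbeat_timeout. If the external system stops
    # sending heartbeats for 10 minutes, fail fast instead of waiting 24 hours.
    callback = context.create_callback(
        name="batch_job_done",
        config=CallbackConfig(
            timeout=Duration.from_hours(24),              # assumed helper
            heartbeat_timeout=Duration.from_minutes(10),  # assumed parameter
        ),
    )
    return {"callback_id": callback.callback_id}
```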
Configure retry delays appropriately
from aws_durable_execution_sdk_python.retries import RetryStrategyConfig
# Fast retry for transient network issues
fast_retry = RetryStrategyConfig(
max_attempts=3,
initial_delay_seconds=1,
max_delay_seconds=5,
backoff_rate=2.0,
)
# Slow retry for rate limiting
slow_retry = RetryStrategyConfig(
max_attempts=5,
initial_delay_seconds=10,
max_delay_seconds=60,
backoff_rate=2.0,
)
Naming conventions
Use descriptive operation names
Choose names that explain what the operation does, not how it does it. Good names make logs easier to read and help you identify which operation failed.
Good:
@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
user = context.step(fetch_user(event["user_id"]), name="fetch_user")
validated = context.step(validate_user(user), name="validate_user")
notification = context.step(send_notification(user), name="send_notification")
return {"status": "completed"}
Avoid:
@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
# Generic names don't help with debugging
user = context.step(fetch_user(event["user_id"]), name="step1")
validated = context.step(validate_user(user), name="step2")
notification = context.step(send_notification(user), name="step3")
return {"status": "completed"}
Use consistent naming patterns
# Pattern: verb_noun for operations
context.step(validate_order(order_id), name="validate_order")
context.step(process_payment(amount), name="process_payment")
# Pattern: noun_action for callbacks
context.create_callback(name="payment_callback")
context.create_callback(name="approval_callback")
# Pattern: descriptive_wait for waits
context.wait(Duration.from_seconds(30), name="payment_confirmation_wait")
Name dynamic operations with context
Include context when creating operations in loops:
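Following the f-string naming pattern used in the batching example later in this guide (process_order and the order fields are illustrative):

```python
@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
    results = {}
    for order in event["orders"]:
        # Unique, contextual names: logs show exactly which order failed.
        results[order["id"]] = context.step(
            process_order(order),
            name=f"process_order_{order['id']}",
        )
    return results
```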
Performance optimization
Minimize checkpoint size
Keep operation inputs and results small. Large payloads increase checkpoint overhead, slow down execution, and can hit size limits. Store large data in S3 and pass references instead.
Good:
@durable_step
def process_large_dataset(step_context: StepContext, s3_key: str) -> str:
data = download_from_s3(s3_key)
result = process_data(data)
result_key = upload_to_s3(result)
return result_key # Small checkpoint - just the S3 key
@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
result_key = context.step(process_large_dataset(event["s3_key"]))
return {"result_key": result_key}
Avoid:
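A sketch of the anti-pattern, passing the full dataset through the checkpoint:

```python
@durable_step
def process_large_dataset(step_context: StepContext, data: list) -> list:
    # Both the input and the returned rows are checkpointed in full:
    # megabytes of payload per step, and a risk of hitting size limits.
    return [transform(row) for row in data]
```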
Batch operations when possible
Group related operations to reduce checkpoint overhead. Each step creates a checkpoint, so batching reduces API calls and speeds up execution.
Good:
@durable_step
def process_batch(step_context: StepContext, items: list) -> list:
return [process_item(item) for item in items]
@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> list:
items = event["items"]
results = []
# Process 10 items per step instead of 1
for i in range(0, len(items), 10):
batch = items[i:i+10]
batch_results = context.step(
process_batch(batch),
name=f"process_batch_{i//10}"
)
results.extend(batch_results)
return results
Avoid:
@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> list:
results = []
# Creating a step for each item - too many checkpoints!
for i, item in enumerate(event["items"]):
result = context.step(
lambda _, item=item: process_item(item),
name=f"process_item_{i}"
)
results.append(result)
return results
Use parallel operations for independent work
Execute independent operations concurrently to reduce total execution time. Use context.parallel() to run multiple operations at the same time.
Good:
@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
# Execute all three operations concurrently
results = context.parallel(
fetch_user_data(event["user_id"]),
fetch_order_history(event["user_id"]),
fetch_preferences(event["user_id"]),
)
return {
"user": results[0],
"orders": results[1],
"preferences": results[2],
}
Avoid:
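The same three fetches run sequentially, so total latency is the sum of all three calls:

```python
@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
    # Each step waits for the previous one, even though they're independent
    user = context.step(fetch_user_data(event["user_id"]))
    orders = context.step(fetch_order_history(event["user_id"]))
    prefs = context.step(fetch_preferences(event["user_id"]))
    return {"user": user, "orders": orders, "preferences": prefs}
```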
Avoid unnecessary waits
Only use waits when you need to delay execution:
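A sketch of the distinction (wait names illustrative):

```python
# Good: the external system needs time before it's worth polling again
context.wait(Duration.from_minutes(5), name="poll_interval_wait")

# Avoid: waiting "just in case" after a step that already completed;
# this adds latency and checkpoint overhead for nothing
context.wait(Duration.from_seconds(30), name="just_in_case_wait")
```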
Serialization
Use JSON-serializable types
The SDK uses JSON serialization by default for checkpoints. Stick to JSON-compatible types (dict, list, str, int, float, bool, None) for operation inputs and results.
Good:
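A sketch of the same step returning only JSON-compatible values (literal values illustrative):

```python
@durable_step
def process_order(step_context: StepContext, order: dict) -> dict:
    return {
        "order_id": order["id"],
        "total": 99.99,                            # float: JSON-safe
        "timestamp": "2024-01-15T00:00:00+00:00",  # ISO 8601 string
    }
```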
Avoid:
from datetime import datetime
from decimal import Decimal
@durable_step
def process_order(step_context: StepContext, order: dict) -> dict:
# datetime and Decimal aren't JSON-serializable by default
return {
"order_id": order["id"],
"total": Decimal("99.99"), # Won't serialize!
"timestamp": datetime.now(), # Won't serialize!
}
Convert non-serializable types
Convert complex types to JSON-compatible formats before returning from steps:
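For example, a small converter run at the end of a step body (function and field names are illustrative):

```python
import json
from datetime import datetime, timezone
from decimal import Decimal

def to_json_safe(total: Decimal, created_at: datetime) -> dict:
    # Convert before returning from a step so the checkpoint serializes:
    # Decimal -> str preserves precision; datetime -> ISO 8601 string.
    return {
        "total": str(total),
        "created_at": created_at.isoformat(),
    }

payload = to_json_safe(Decimal("99.99"), datetime(2024, 1, 15, tzinfo=timezone.utc))
json.dumps(payload)  # serializes cleanly
```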
Use custom serialization for complex types
For complex objects, implement custom serialization or use the SDK's SerDes system:
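The SDK's SerDes interface isn't shown in this guide; a plain-Python pattern that works regardless is to round-trip your own types through JSON-safe dicts at step boundaries:

```python
import json
from dataclasses import dataclass, asdict

@dataclass
class Order:
    order_id: str
    total_cents: int  # integer cents avoid float/Decimal serialization issues

def serialize(order: Order) -> str:
    return json.dumps(asdict(order))

def deserialize(raw: str) -> Order:
    return Order(**json.loads(raw))

restored = deserialize(serialize(Order(order_id="ord_1", total_cents=9999)))
```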
Common mistakes
⚠️ Modifying mutable objects between steps
Wrong:
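A sketch of the mistake (save_user is an illustrative helper):

```python
@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
    user = context.step(fetch_user(event["user_id"]))
    # Mutating a step result outside any step: the change is never
    # checkpointed and can diverge from recorded state on replay.
    user["status"] = "active"
    return context.step(save_user(user))
```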
Right:
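A sketch of the fix: do the transformation inside a step and return a new dict (save_user is an illustrative helper):

```python
@durable_step
def activate_user(step_context: StepContext, user: dict) -> dict:
    # Return a new object from a step so the change is checkpointed too
    return {**user, "status": "active"}

@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
    user = context.step(fetch_user(event["user_id"]))
    active = context.step(activate_user(user))
    return context.step(save_user(active))
```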
⚠️ Using context inside its own operations
Wrong:
@durable_step
def process_with_wait(step_context: StepContext, context: DurableContext) -> str:
# DON'T: Can't use context inside its own step operation
context.wait(Duration.from_seconds(1)) # Error: using context inside step!
result = context.step(nested_step(), name="step2") # Error: nested context.step!
return result
@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
# This will fail - context is being used inside its own step
result = context.step(process_with_wait(context), name="step1")
return {"result": result}
Right:
@durable_step
def nested_step(step_context: StepContext) -> str:
return "nested step"
@durable_with_child_context
def process_with_wait(child_ctx: DurableContext) -> str:
# Use child context for nested operations
child_ctx.wait(Duration.from_seconds(1))
result = child_ctx.step(nested_step(), name="step2")
return result
@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
# Use run_in_child_context for nested operations
result = context.run_in_child_context(
process_with_wait(),
name="block1"
)
return {"result": result}
Why: You can't use a context object inside its own operations (like calling context.step() inside another context.step()). Use child contexts to create isolated execution scopes for nested operations.
⚠️ Forgetting to handle callback timeouts
Wrong:
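A sketch of the mistake, assuming (as in the correct example under Right:) that result() returns None on timeout:

```python
@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
    callback = context.create_callback(name="approval")
    result = callback.result()
    # Raises TypeError when the callback timed out and result is None
    return {"approved": result["approved"]}
```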
Right:
@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
callback = context.create_callback(name="approval")
result = callback.result()
if result is None:
return {"status": "timeout", "approved": False}
return {"status": "completed", "approved": result.get("approved", False)}
⚠️ Creating too many small steps
Wrapping every trivial computation in its own step multiplies checkpoint overhead without adding durability. Group related work into fewer, larger steps; the good and bad examples under Batch operations when possible above show both versions.
⚠️ Not using retry for transient failures
Right:
import requests

from aws_durable_execution_sdk_python.config import StepConfig
from aws_durable_execution_sdk_python.retries import (
RetryStrategyConfig,
create_retry_strategy,
)
@durable_step
def call_api(step_context: StepContext, url: str) -> dict:
response = requests.get(url, timeout=10)
response.raise_for_status()
return response.json()
@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
retry_config = RetryStrategyConfig(
max_attempts=3,
retryable_error_types=[requests.Timeout, requests.ConnectionError],
)
result = context.step(
call_api(event["url"]),
config=StepConfig(retry_strategy=create_retry_strategy(retry_config)),
)
return result
Code organization
Separate business logic from orchestration
# business_logic.py
@durable_step
def validate_order(step_context: StepContext, order: dict) -> dict:
if not order.get("items"):
raise ValueError("Order must have items")
return {**order, "validated": True}
# handler.py
@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
order = event["order"]
validated_order = context.step(validate_order(order))
return {"status": "completed", "order_id": validated_order["order_id"]}
Use child contexts for complex workflows
@durable_with_child_context
def validate_and_enrich(ctx: DurableContext, data: dict) -> dict:
validated = ctx.step(validate_data(data))
enriched = ctx.step(enrich_data(validated))
return enriched
@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
enriched = context.run_in_child_context(
validate_and_enrich(event["data"]),
name="validation_phase",
)
return enriched
Group related configuration
# config.py
from aws_durable_execution_sdk_python.config import StepConfig
from aws_durable_execution_sdk_python.retries import (
RetryStrategyConfig,
create_retry_strategy,
)
FAST_RETRY = StepConfig(
retry_strategy=create_retry_strategy(
RetryStrategyConfig(
max_attempts=3,
initial_delay_seconds=1,
max_delay_seconds=5,
backoff_rate=2.0,
)
)
)
# handler.py
from config import FAST_RETRY
@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
data = context.step(fetch_data(event["id"]), config=FAST_RETRY)
return data
FAQ
Q: How many steps should a durable function have?
A: There's a limit of 3,000 operations per execution. Keep in mind that more steps mean more API calls and longer execution time. Balance granularity with performance: group related operations when it makes sense, but don't hesitate to break complex logic into separate steps.
Q: Should I create a step for every function call?
A: No. Only create steps for operations that need checkpointing, retry logic, or isolation.
Q: Can I use async/await in durable functions?
A: Functions decorated with @durable_step must be synchronous. If you need to call async code, use asyncio.run() inside your step to execute it synchronously.
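A minimal sketch of that bridge, in plain Python (no SDK needed):

```python
import asyncio

async def fetch_greeting(name: str) -> str:
    await asyncio.sleep(0)  # stand-in for real async I/O
    return f"hello, {name}"

def get_greeting(name: str) -> str:
    # Inside a synchronous @durable_step body, run the coroutine to completion
    return asyncio.run(fetch_greeting(name))

result = get_greeting("jane")  # → "hello, jane"
```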
Q: How do I handle secrets and credentials?
A: Use AWS Secrets Manager or Parameter Store. Fetch secrets in a step at the beginning of your workflow.
Q: What's the maximum execution time for a durable function?
A: Durable functions can run for days or weeks using waits and callbacks. Each individual Lambda invocation is still subject to the 15-minute Lambda timeout.
Q: How do I test durable functions locally?
A: Use the testing SDK (aws-durable-execution-sdk-python-testing) to run functions locally without AWS credentials. See Testing patterns for examples.
Q: How do I monitor durable functions in production?
A: Use CloudWatch Logs for execution logs, CloudWatch Metrics for performance metrics, and X-Ray for distributed tracing.
See also
- Getting started - Build your first durable function
- Steps - Step operations
- Error handling - Handle failures
- Testing patterns - How to test your functions