Skip to content
Recommendation: Use the AgentCore CLI for new projects

The AgentCore CLI (@aws/agentcore-cli) is now the recommended way to create, develop, and deploy AI agents on Amazon Bedrock AgentCore. It offers broader framework support, local development with hot reload, built-in evaluations, gateway management, and more.

Get started: npm i @aws/agentcore-cli

See the Migration Guide for step-by-step instructions to migrate existing projects. The AgentCore CLI docs cover the full commands reference, supported frameworks, and configuration.

Runtime

Runtime management and application context for Bedrock AgentCore.

bedrock_agentcore.runtime

BedrockAgentCore Runtime Package.

This package contains the core runtime components for Bedrock AgentCore applications: - BedrockAgentCoreApp: Main application class - RequestContext: HTTP request context - BedrockAgentCoreContext: Agent identity context

AgentCoreRuntimeClient

Client for generating WebSocket authentication for AgentCore Runtime.

This client provides authentication credentials for WebSocket connections to AgentCore Runtime endpoints, allowing applications to establish bidirectional streaming connections with agent runtimes.

Attributes:

Name Type Description
region str

The AWS region being used.

session Session

The boto3 session for AWS credentials.

Source code in bedrock_agentcore/runtime/agent_core_runtime_client.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
class AgentCoreRuntimeClient:
    """Client for generating WebSocket authentication for AgentCore Runtime.

    This client provides authentication credentials for WebSocket connections
    to AgentCore Runtime endpoints, allowing applications to establish
    bidirectional streaming connections with agent runtimes.

    Attributes:
        region (str): The AWS region being used.
        session (boto3.Session): The boto3 session for AWS credentials.
    """

    def __init__(self, region: str, session: Optional[boto3.Session] = None) -> None:
        """Initialize an AgentCoreRuntime client for the specified AWS region.

        Args:
            region (str): The AWS region to use for the AgentCore Runtime service.
            session (Optional[boto3.Session]): Optional boto3 session. If not provided,
                a new session will be created using default credentials.
        """
        self.region = region
        self.logger = logging.getLogger(__name__)

        if session is None:
            session = boto3.Session()

        self.session = session

    def _parse_runtime_arn(self, runtime_arn: str) -> Dict[str, str]:
        """Parse runtime ARN and extract components.

        Args:
            runtime_arn (str): Full runtime ARN

        Returns:
            Dict[str, str]: Dictionary with region, account_id, runtime_id

        Raises:
            ValueError: If ARN format is invalid
        """
        # Expected format: arn:aws:bedrock-agentcore:{region}:{account}:runtime/{runtime_id}
        parts = runtime_arn.split(":")

        if len(parts) != 6:
            raise ValueError(f"Invalid runtime ARN format: {runtime_arn}")

        if parts[0] != "arn" or parts[1] != "aws" or parts[2] != "bedrock-agentcore":
            raise ValueError(f"Invalid runtime ARN format: {runtime_arn}")

        # Parse the resource part (runtime/{runtime_id})
        resource = parts[5]
        if not resource.startswith("runtime/"):
            raise ValueError(f"Invalid runtime ARN format: {runtime_arn}")

        runtime_id = resource.split("/", 1)[1]

        # Validate that components are not empty
        region = parts[3]
        account_id = parts[4]

        if not region or not account_id or not runtime_id:
            raise ValueError("ARN components cannot be empty")

        return {
            "region": region,
            "account_id": account_id,
            "runtime_id": runtime_id,
        }

    def _build_websocket_url(
        self,
        runtime_arn: str,
        endpoint_name: Optional[str] = None,
        custom_headers: Optional[Dict[str, str]] = None,
    ) -> str:
        """Build WebSocket URL with query parameters.

        Args:
            runtime_arn (str): Full runtime ARN
            endpoint_name (Optional[str]): Optional endpoint name for qualifier param
            custom_headers (Optional[Dict[str, str]]): Optional custom query parameters

        Returns:
            str: WebSocket URL with query parameters
        """
        # Get the data plane endpoint
        host = get_data_plane_endpoint(self.region).replace("https://", "")

        # URL-encode the runtime ARN
        encoded_arn = quote(runtime_arn, safe="")

        # Build base path
        path = f"/runtimes/{encoded_arn}/ws"

        # Build query parameters
        query_params = {}

        if endpoint_name:
            query_params["qualifier"] = endpoint_name

        if custom_headers:
            query_params.update(custom_headers)

        # Construct URL
        if query_params:
            query_string = urlencode(query_params)
            ws_url = f"wss://{host}{path}?{query_string}"
        else:
            ws_url = f"wss://{host}{path}"

        return ws_url

    def generate_ws_connection(
        self,
        runtime_arn: str,
        session_id: Optional[str] = None,
        endpoint_name: Optional[str] = None,
    ) -> Tuple[str, Dict[str, str]]:
        """Generate WebSocket URL and SigV4 signed headers for runtime connection.

        Args:
            runtime_arn (str): Full runtime ARN
                (e.g., 'arn:aws:bedrock-agentcore:us-west-2:123:runtime/my-runtime-abc')
            session_id (Optional[str]): Session ID to use. If None, auto-generates a UUID.
            endpoint_name (Optional[str]): Endpoint name to use as 'qualifier' query parameter.
                If provided, adds ?qualifier={endpoint_name} to the URL.

        Returns:
            Tuple[str, Dict[str, str]]: A tuple containing:
                - WebSocket URL (wss://...) with query parameters
                - Headers dictionary with SigV4 signature

        Raises:
            RuntimeError: If no AWS credentials are found.
            ValueError: If runtime_arn format is invalid.

        Example:
            >>> client = AgentCoreRuntimeClient('us-west-2')
            >>> ws_url, headers = client.generate_ws_connection(
            ...     runtime_arn='arn:aws:bedrock-agentcore:us-west-2:123:runtime/my-runtime',
            ...     endpoint_name='DEFAULT'
            ... )
        """
        self.logger.info("Generating WebSocket connection credentials...")

        # Validate ARN
        self._parse_runtime_arn(runtime_arn)

        # Auto-generate session ID if not provided
        if not session_id:
            session_id = str(uuid.uuid4())
            self.logger.debug("Auto-generated session ID: %s", session_id)

        # Build WebSocket URL
        ws_url = self._build_websocket_url(runtime_arn, endpoint_name)

        # Get AWS credentials
        credentials = self.session.get_credentials()
        if not credentials:
            raise RuntimeError("No AWS credentials found")

        frozen_credentials = credentials.get_frozen_credentials()

        # Convert wss:// to https:// for signing
        https_url = ws_url.replace("wss://", "https://")
        parsed = urlparse(https_url)
        host = parsed.netloc

        # Create the request to sign
        request = AWSRequest(
            method="GET",
            url=https_url,
            headers={
                "host": host,
                "x-amz-date": datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%dT%H%M%SZ"),
            },
        )

        # Sign the request with SigV4
        auth = SigV4Auth(frozen_credentials, "bedrock-agentcore", self.region)
        auth.add_auth(request)

        # Build headers for WebSocket connection
        headers = {
            "Host": host,
            "X-Amz-Date": request.headers["x-amz-date"],
            "Authorization": request.headers["Authorization"],
            "X-Amzn-Bedrock-AgentCore-Runtime-Session-Id": session_id,
            "Upgrade": "websocket",
            "Connection": "Upgrade",
            "Sec-WebSocket-Version": "13",
            "Sec-WebSocket-Key": base64.b64encode(secrets.token_bytes(16)).decode(),
            "User-Agent": "AgentCoreRuntimeClient/1.0",
        }

        # Add session token if present
        if frozen_credentials.token:
            headers["X-Amz-Security-Token"] = frozen_credentials.token

        self.logger.info("✓ WebSocket connection credentials generated (Session: %s)", session_id)
        return ws_url, headers

    def generate_presigned_url(
        self,
        runtime_arn: str,
        session_id: Optional[str] = None,
        endpoint_name: Optional[str] = None,
        custom_headers: Optional[Dict[str, str]] = None,
        expires: int = DEFAULT_PRESIGNED_URL_TIMEOUT,
    ) -> str:
        """Generate a presigned WebSocket URL for runtime connection.

        Presigned URLs include authentication in query parameters, allowing
        frontend clients to connect without managing AWS credentials.

        Args:
            runtime_arn (str): Full runtime ARN
                (e.g., 'arn:aws:bedrock-agentcore:us-west-2:123:runtime/my-runtime-abc')
            session_id (Optional[str]): Session ID to use. If None, auto-generates a UUID.
            endpoint_name (Optional[str]): Endpoint name to use as 'qualifier' query parameter.
                If provided, adds ?qualifier={endpoint_name} to the URL before signing.
            custom_headers (Optional[Dict[str, str]]): Additional query parameters to include
                in the presigned URL before signing (e.g., {"abc": "pqr"}).
            expires (int): Seconds until URL expires (default: 300, max: 300).

        Returns:
            str: Presigned WebSocket URL with query string parameters including:
                - Original query params (qualifier, custom_headers)
                - SigV4 auth params (X-Amz-Algorithm, X-Amz-Credential, etc.)

        Raises:
            ValueError: If expires exceeds maximum (300 seconds).
            RuntimeError: If URL generation fails or no credentials found.

        Example:
            >>> client = AgentCoreRuntimeClient('us-west-2')
            >>> presigned_url = client.generate_presigned_url(
            ...     runtime_arn='arn:aws:bedrock-agentcore:us-west-2:123:runtime/my-runtime',
            ...     endpoint_name='DEFAULT',
            ...     custom_headers={'abc': 'pqr'},
            ...     expires=300
            ... )
        """
        self.logger.info("Generating presigned WebSocket URL...")

        # Validate expires parameter
        if expires > MAX_PRESIGNED_URL_TIMEOUT:
            raise ValueError(f"Expiry timeout cannot exceed {MAX_PRESIGNED_URL_TIMEOUT} seconds, got {expires}")

        # Validate ARN
        self._parse_runtime_arn(runtime_arn)

        # Auto-generate session ID if not provided
        if not session_id:
            session_id = str(uuid.uuid4())
            self.logger.debug("Auto-generated session ID: %s", session_id)

        # Add session_id to custom_headers (which become query params)
        if custom_headers is None:
            custom_headers = {}
        custom_headers["X-Amzn-Bedrock-AgentCore-Runtime-Session-Id"] = session_id

        # Build WebSocket URL with query parameters
        ws_url = self._build_websocket_url(runtime_arn, endpoint_name, custom_headers)

        # Convert wss:// to https:// for signing
        https_url = ws_url.replace("wss://", "https://")

        # Parse URL
        url = urlparse(https_url)

        # Get AWS credentials
        credentials = self.session.get_credentials()
        if not credentials:
            raise RuntimeError("No AWS credentials found")

        frozen_credentials = credentials.get_frozen_credentials()

        # Create the request to sign
        request = AWSRequest(method="GET", url=https_url, headers={"host": url.hostname})

        # Sign the request with SigV4QueryAuth
        signer = SigV4QueryAuth(
            credentials=frozen_credentials,
            service_name="bedrock-agentcore",
            region_name=self.region,
            expires=expires,
        )
        signer.add_auth(request)

        if not request.url:
            raise RuntimeError("Failed to generate presigned URL")

        # Convert back to wss:// for WebSocket connection
        presigned_url = request.url.replace("https://", "wss://")

        self.logger.info("✓ Presigned URL generated (expires in %s seconds, Session: %s)", expires, session_id)
        return presigned_url

    def generate_ws_connection_oauth(
        self,
        runtime_arn: str,
        bearer_token: str,
        session_id: Optional[str] = None,
        endpoint_name: Optional[str] = None,
    ) -> Tuple[str, Dict[str, str]]:
        """Generate WebSocket URL and OAuth headers for runtime connection.

        This method uses OAuth bearer token authentication instead of AWS SigV4.
        Suitable for scenarios where OAuth tokens are used for authentication.

        Args:
            runtime_arn (str): Full runtime ARN
                (e.g., 'arn:aws:bedrock-agentcore:us-west-2:123:runtime/my-runtime-abc')
            bearer_token (str): OAuth bearer token for authentication.
            session_id (Optional[str]): Session ID to use. If None, auto-generates one.
            endpoint_name (Optional[str]): Endpoint name to use as 'qualifier' query parameter.
                If provided, adds ?qualifier={endpoint_name} to the URL.

        Returns:
            Tuple[str, Dict[str, str]]: A tuple containing:
                - WebSocket URL (wss://...) with query parameters
                - Headers dictionary with OAuth authentication

        Raises:
            ValueError: If runtime_arn format is invalid or bearer_token is empty.

        Example:
            >>> client = AgentCoreRuntimeClient('us-west-2')
            >>> ws_url, headers = client.generate_ws_connection_oauth(
            ...     runtime_arn='arn:aws:bedrock-agentcore:us-west-2:123:runtime/my-runtime',
            ...     bearer_token='eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...',
            ...     endpoint_name='DEFAULT'
            ... )
        """
        self.logger.info("Generating WebSocket connection with OAuth authentication...")

        # Validate inputs
        if not bearer_token:
            raise ValueError("Bearer token cannot be empty")

        # Validate ARN
        self._parse_runtime_arn(runtime_arn)

        # Auto-generate session ID if not provided
        if not session_id:
            session_id = str(uuid.uuid4())
            self.logger.debug("Auto-generated session ID: %s", session_id)

        # Build WebSocket URL
        ws_url = self._build_websocket_url(runtime_arn, endpoint_name)

        # Convert wss:// to https:// to get host
        https_url = ws_url.replace("wss://", "https://")
        parsed = urlparse(https_url)

        # Generate WebSocket key
        ws_key = base64.b64encode(secrets.token_bytes(16)).decode()

        # Build OAuth headers
        headers = {
            "Authorization": f"Bearer {bearer_token}",
            "X-Amzn-Bedrock-AgentCore-Runtime-Session-Id": session_id,
            "Host": parsed.netloc,
            "Connection": "Upgrade",
            "Upgrade": "websocket",
            "Sec-WebSocket-Key": ws_key,
            "Sec-WebSocket-Version": "13",
            "User-Agent": "OAuth-WebSocket-Client/1.0",
        }

        self.logger.info("✓ OAuth WebSocket connection credentials generated (Session: %s)", session_id)
        self.logger.debug("Bearer token length: %d characters", len(bearer_token))

        return ws_url, headers

__init__(region, session=None)

Initialize an AgentCoreRuntime client for the specified AWS region.

Parameters:

Name Type Description Default
region str

The AWS region to use for the AgentCore Runtime service.

required
session Optional[Session]

Optional boto3 session. If not provided, a new session will be created using default credentials.

None
Source code in bedrock_agentcore/runtime/agent_core_runtime_client.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def __init__(self, region: str, session: Optional[boto3.Session] = None) -> None:
    """Initialize an AgentCoreRuntime client for the specified AWS region.

    Args:
        region (str): The AWS region to use for the AgentCore Runtime service.
        session (Optional[boto3.Session]): Optional boto3 session. If not provided,
            a new session will be created using default credentials.
    """
    self.region = region
    self.logger = logging.getLogger(__name__)

    if session is None:
        session = boto3.Session()

    self.session = session

generate_presigned_url(runtime_arn, session_id=None, endpoint_name=None, custom_headers=None, expires=DEFAULT_PRESIGNED_URL_TIMEOUT)

Generate a presigned WebSocket URL for runtime connection.

Presigned URLs include authentication in query parameters, allowing frontend clients to connect without managing AWS credentials.

Parameters:

Name Type Description Default
runtime_arn str

Full runtime ARN (e.g., 'arn:aws:bedrock-agentcore:us-west-2:123:runtime/my-runtime-abc')

required
session_id Optional[str]

Session ID to use. If None, auto-generates a UUID.

None
endpoint_name Optional[str]

Endpoint name to use as 'qualifier' query parameter. If provided, adds ?qualifier={endpoint_name} to the URL before signing.

None
custom_headers Optional[Dict[str, str]]

Additional query parameters to include in the presigned URL before signing (e.g., {"abc": "pqr"}).

None
expires int

Seconds until URL expires (default: 300, max: 300).

DEFAULT_PRESIGNED_URL_TIMEOUT

Returns:

Name Type Description
str str

Presigned WebSocket URL with query string parameters including: - Original query params (qualifier, custom_headers) - SigV4 auth params (X-Amz-Algorithm, X-Amz-Credential, etc.)

Raises:

Type Description
ValueError

If expires exceeds maximum (300 seconds).

RuntimeError

If URL generation fails or no credentials found.

Example

client = AgentCoreRuntimeClient('us-west-2') presigned_url = client.generate_presigned_url( ... runtime_arn='arn:aws:bedrock-agentcore:us-west-2:123:runtime/my-runtime', ... endpoint_name='DEFAULT', ... custom_headers={'abc': 'pqr'}, ... expires=300 ... )

Source code in bedrock_agentcore/runtime/agent_core_runtime_client.py
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
def generate_presigned_url(
    self,
    runtime_arn: str,
    session_id: Optional[str] = None,
    endpoint_name: Optional[str] = None,
    custom_headers: Optional[Dict[str, str]] = None,
    expires: int = DEFAULT_PRESIGNED_URL_TIMEOUT,
) -> str:
    """Generate a presigned WebSocket URL for runtime connection.

    Presigned URLs include authentication in query parameters, allowing
    frontend clients to connect without managing AWS credentials.

    Args:
        runtime_arn (str): Full runtime ARN
            (e.g., 'arn:aws:bedrock-agentcore:us-west-2:123:runtime/my-runtime-abc')
        session_id (Optional[str]): Session ID to use. If None, auto-generates a UUID.
        endpoint_name (Optional[str]): Endpoint name to use as 'qualifier' query parameter.
            If provided, adds ?qualifier={endpoint_name} to the URL before signing.
        custom_headers (Optional[Dict[str, str]]): Additional query parameters to include
            in the presigned URL before signing (e.g., {"abc": "pqr"}).
        expires (int): Seconds until URL expires (default: 300, max: 300).

    Returns:
        str: Presigned WebSocket URL with query string parameters including:
            - Original query params (qualifier, custom_headers)
            - SigV4 auth params (X-Amz-Algorithm, X-Amz-Credential, etc.)

    Raises:
        ValueError: If expires exceeds maximum (300 seconds).
        RuntimeError: If URL generation fails or no credentials found.

    Example:
        >>> client = AgentCoreRuntimeClient('us-west-2')
        >>> presigned_url = client.generate_presigned_url(
        ...     runtime_arn='arn:aws:bedrock-agentcore:us-west-2:123:runtime/my-runtime',
        ...     endpoint_name='DEFAULT',
        ...     custom_headers={'abc': 'pqr'},
        ...     expires=300
        ... )
    """
    self.logger.info("Generating presigned WebSocket URL...")

    # Validate expires parameter
    if expires > MAX_PRESIGNED_URL_TIMEOUT:
        raise ValueError(f"Expiry timeout cannot exceed {MAX_PRESIGNED_URL_TIMEOUT} seconds, got {expires}")

    # Validate ARN
    self._parse_runtime_arn(runtime_arn)

    # Auto-generate session ID if not provided
    if not session_id:
        session_id = str(uuid.uuid4())
        self.logger.debug("Auto-generated session ID: %s", session_id)

    # Add session_id to custom_headers (which become query params)
    if custom_headers is None:
        custom_headers = {}
    custom_headers["X-Amzn-Bedrock-AgentCore-Runtime-Session-Id"] = session_id

    # Build WebSocket URL with query parameters
    ws_url = self._build_websocket_url(runtime_arn, endpoint_name, custom_headers)

    # Convert wss:// to https:// for signing
    https_url = ws_url.replace("wss://", "https://")

    # Parse URL
    url = urlparse(https_url)

    # Get AWS credentials
    credentials = self.session.get_credentials()
    if not credentials:
        raise RuntimeError("No AWS credentials found")

    frozen_credentials = credentials.get_frozen_credentials()

    # Create the request to sign
    request = AWSRequest(method="GET", url=https_url, headers={"host": url.hostname})

    # Sign the request with SigV4QueryAuth
    signer = SigV4QueryAuth(
        credentials=frozen_credentials,
        service_name="bedrock-agentcore",
        region_name=self.region,
        expires=expires,
    )
    signer.add_auth(request)

    if not request.url:
        raise RuntimeError("Failed to generate presigned URL")

    # Convert back to wss:// for WebSocket connection
    presigned_url = request.url.replace("https://", "wss://")

    self.logger.info("✓ Presigned URL generated (expires in %s seconds, Session: %s)", expires, session_id)
    return presigned_url

generate_ws_connection(runtime_arn, session_id=None, endpoint_name=None)

Generate WebSocket URL and SigV4 signed headers for runtime connection.

Parameters:

Name Type Description Default
runtime_arn str

Full runtime ARN (e.g., 'arn:aws:bedrock-agentcore:us-west-2:123:runtime/my-runtime-abc')

required
session_id Optional[str]

Session ID to use. If None, auto-generates a UUID.

None
endpoint_name Optional[str]

Endpoint name to use as 'qualifier' query parameter. If provided, adds ?qualifier={endpoint_name} to the URL.

None

Returns:

Type Description
Tuple[str, Dict[str, str]]

Tuple[str, Dict[str, str]]: A tuple containing: - WebSocket URL (wss://...) with query parameters - Headers dictionary with SigV4 signature

Raises:

Type Description
RuntimeError

If no AWS credentials are found.

ValueError

If runtime_arn format is invalid.

Example

client = AgentCoreRuntimeClient('us-west-2') ws_url, headers = client.generate_ws_connection( ... runtime_arn='arn:aws:bedrock-agentcore:us-west-2:123:runtime/my-runtime', ... endpoint_name='DEFAULT' ... )

Source code in bedrock_agentcore/runtime/agent_core_runtime_client.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
def generate_ws_connection(
    self,
    runtime_arn: str,
    session_id: Optional[str] = None,
    endpoint_name: Optional[str] = None,
) -> Tuple[str, Dict[str, str]]:
    """Generate WebSocket URL and SigV4 signed headers for runtime connection.

    Args:
        runtime_arn (str): Full runtime ARN
            (e.g., 'arn:aws:bedrock-agentcore:us-west-2:123:runtime/my-runtime-abc')
        session_id (Optional[str]): Session ID to use. If None, auto-generates a UUID.
        endpoint_name (Optional[str]): Endpoint name to use as 'qualifier' query parameter.
            If provided, adds ?qualifier={endpoint_name} to the URL.

    Returns:
        Tuple[str, Dict[str, str]]: A tuple containing:
            - WebSocket URL (wss://...) with query parameters
            - Headers dictionary with SigV4 signature

    Raises:
        RuntimeError: If no AWS credentials are found.
        ValueError: If runtime_arn format is invalid.

    Example:
        >>> client = AgentCoreRuntimeClient('us-west-2')
        >>> ws_url, headers = client.generate_ws_connection(
        ...     runtime_arn='arn:aws:bedrock-agentcore:us-west-2:123:runtime/my-runtime',
        ...     endpoint_name='DEFAULT'
        ... )
    """
    self.logger.info("Generating WebSocket connection credentials...")

    # Validate ARN
    self._parse_runtime_arn(runtime_arn)

    # Auto-generate session ID if not provided
    if not session_id:
        session_id = str(uuid.uuid4())
        self.logger.debug("Auto-generated session ID: %s", session_id)

    # Build WebSocket URL
    ws_url = self._build_websocket_url(runtime_arn, endpoint_name)

    # Get AWS credentials
    credentials = self.session.get_credentials()
    if not credentials:
        raise RuntimeError("No AWS credentials found")

    frozen_credentials = credentials.get_frozen_credentials()

    # Convert wss:// to https:// for signing
    https_url = ws_url.replace("wss://", "https://")
    parsed = urlparse(https_url)
    host = parsed.netloc

    # Create the request to sign
    request = AWSRequest(
        method="GET",
        url=https_url,
        headers={
            "host": host,
            "x-amz-date": datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%dT%H%M%SZ"),
        },
    )

    # Sign the request with SigV4
    auth = SigV4Auth(frozen_credentials, "bedrock-agentcore", self.region)
    auth.add_auth(request)

    # Build headers for WebSocket connection
    headers = {
        "Host": host,
        "X-Amz-Date": request.headers["x-amz-date"],
        "Authorization": request.headers["Authorization"],
        "X-Amzn-Bedrock-AgentCore-Runtime-Session-Id": session_id,
        "Upgrade": "websocket",
        "Connection": "Upgrade",
        "Sec-WebSocket-Version": "13",
        "Sec-WebSocket-Key": base64.b64encode(secrets.token_bytes(16)).decode(),
        "User-Agent": "AgentCoreRuntimeClient/1.0",
    }

    # Add session token if present
    if frozen_credentials.token:
        headers["X-Amz-Security-Token"] = frozen_credentials.token

    self.logger.info("✓ WebSocket connection credentials generated (Session: %s)", session_id)
    return ws_url, headers

generate_ws_connection_oauth(runtime_arn, bearer_token, session_id=None, endpoint_name=None)

Generate WebSocket URL and OAuth headers for runtime connection.

This method uses OAuth bearer token authentication instead of AWS SigV4. Suitable for scenarios where OAuth tokens are used for authentication.

Parameters:

Name Type Description Default
runtime_arn str

Full runtime ARN (e.g., 'arn:aws:bedrock-agentcore:us-west-2:123:runtime/my-runtime-abc')

required
bearer_token str

OAuth bearer token for authentication.

required
session_id Optional[str]

Session ID to use. If None, auto-generates one.

None
endpoint_name Optional[str]

Endpoint name to use as 'qualifier' query parameter. If provided, adds ?qualifier={endpoint_name} to the URL.

None

Returns:

Type Description
Tuple[str, Dict[str, str]]

Tuple[str, Dict[str, str]]: A tuple containing: - WebSocket URL (wss://...) with query parameters - Headers dictionary with OAuth authentication

Raises:

Type Description
ValueError

If runtime_arn format is invalid or bearer_token is empty.

Example

client = AgentCoreRuntimeClient('us-west-2') ws_url, headers = client.generate_ws_connection_oauth( ... runtime_arn='arn:aws:bedrock-agentcore:us-west-2:123:runtime/my-runtime', ... bearer_token='eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...', ... endpoint_name='DEFAULT' ... )

Source code in bedrock_agentcore/runtime/agent_core_runtime_client.py
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
def generate_ws_connection_oauth(
    self,
    runtime_arn: str,
    bearer_token: str,
    session_id: Optional[str] = None,
    endpoint_name: Optional[str] = None,
) -> Tuple[str, Dict[str, str]]:
    """Generate WebSocket URL and OAuth headers for runtime connection.

    This method uses OAuth bearer token authentication instead of AWS SigV4.
    Suitable for scenarios where OAuth tokens are used for authentication.

    Args:
        runtime_arn (str): Full runtime ARN
            (e.g., 'arn:aws:bedrock-agentcore:us-west-2:123:runtime/my-runtime-abc')
        bearer_token (str): OAuth bearer token for authentication.
        session_id (Optional[str]): Session ID to use. If None, auto-generates one.
        endpoint_name (Optional[str]): Endpoint name to use as 'qualifier' query parameter.
            If provided, adds ?qualifier={endpoint_name} to the URL.

    Returns:
        Tuple[str, Dict[str, str]]: A tuple containing:
            - WebSocket URL (wss://...) with query parameters
            - Headers dictionary with OAuth authentication

    Raises:
        ValueError: If runtime_arn format is invalid or bearer_token is empty.

    Example:
        >>> client = AgentCoreRuntimeClient('us-west-2')
        >>> ws_url, headers = client.generate_ws_connection_oauth(
        ...     runtime_arn='arn:aws:bedrock-agentcore:us-west-2:123:runtime/my-runtime',
        ...     bearer_token='eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...',
        ...     endpoint_name='DEFAULT'
        ... )
    """
    self.logger.info("Generating WebSocket connection with OAuth authentication...")

    # Validate inputs
    if not bearer_token:
        raise ValueError("Bearer token cannot be empty")

    # Validate ARN
    self._parse_runtime_arn(runtime_arn)

    # Auto-generate session ID if not provided
    if not session_id:
        session_id = str(uuid.uuid4())
        self.logger.debug("Auto-generated session ID: %s", session_id)

    # Build WebSocket URL
    ws_url = self._build_websocket_url(runtime_arn, endpoint_name)

    # Convert wss:// to https:// to get host
    https_url = ws_url.replace("wss://", "https://")
    parsed = urlparse(https_url)

    # Generate WebSocket key
    ws_key = base64.b64encode(secrets.token_bytes(16)).decode()

    # Build OAuth headers
    headers = {
        "Authorization": f"Bearer {bearer_token}",
        "X-Amzn-Bedrock-AgentCore-Runtime-Session-Id": session_id,
        "Host": parsed.netloc,
        "Connection": "Upgrade",
        "Upgrade": "websocket",
        "Sec-WebSocket-Key": ws_key,
        "Sec-WebSocket-Version": "13",
        "User-Agent": "OAuth-WebSocket-Client/1.0",
    }

    self.logger.info("✓ OAuth WebSocket connection credentials generated (Session: %s)", session_id)
    self.logger.debug("Bearer token length: %d characters", len(bearer_token))

    return ws_url, headers

BedrockAgentCoreApp

Bases: Starlette

Bedrock AgentCore application class that extends Starlette for AI agent deployment.

Source code in bedrock_agentcore/runtime/app.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
class BedrockAgentCoreApp(Starlette):
    """Bedrock AgentCore application class that extends Starlette for AI agent deployment."""

    def __init__(
        self,
        debug: bool = False,
        lifespan: Optional[Lifespan] = None,
        middleware: Sequence[Middleware] | None = None,
    ):
        """Initialize Bedrock AgentCore application.

        Args:
            debug: Enable debug actions for task management (default: False)
            lifespan: Optional lifespan context manager for startup/shutdown
            middleware: Optional sequence of Starlette Middleware objects (or Middleware(...) entries)
        """
        self.handlers: Dict[str, Callable] = {}
        self._ping_handler: Optional[Callable] = None
        self._websocket_handler: Optional[Callable] = None
        self._prompt_key: Optional[str] = None
        self._response_key: Optional[str] = None
        self._active_tasks: Dict[int, Dict[str, Any]] = {}
        self._task_counter_lock: threading.Lock = threading.Lock()
        self._forced_ping_status: Optional[PingStatus] = None
        self._last_status_update_time: float = time.time()
        self._worker_loop: Optional[asyncio.AbstractEventLoop] = None
        self._worker_thread: Optional[threading.Thread] = None
        self._worker_loop_lock: threading.Lock = threading.Lock()

        routes = [
            Route("/invocations", self._handle_invocation, methods=["POST"]),
            Route("/ping", self._handle_ping, methods=["GET"]),
            WebSocketRoute("/ws", self._handle_websocket),
        ]
        super().__init__(routes=routes, lifespan=lifespan, middleware=middleware)
        self.debug = debug  # Set after super().__init__ to avoid override

        self.logger = logging.getLogger("bedrock_agentcore.app")
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = RequestContextFormatter()
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
            self.logger.setLevel(logging.DEBUG if self.debug else logging.INFO)

    def entrypoint(
        self, func: Callable = None, *, prompt_key: Optional[str] = None, response_key: Optional[str] = None
    ) -> Callable:
        """Decorator to register a function as the main entrypoint.

        Args:
            func: The function to register as entrypoint
            prompt_key: Optional key to extract user prompt from the payload dict.
                If not specified, tries common keys in order (prompt, input, query,
                message, question, user_input) then falls back to JSON serialization.
            response_key: Optional key to extract agent response from the result dict.
                If not specified, the full result is used.

        Returns:
            The decorated function with added serve method

        Examples:
            @app.entrypoint
            def handler(payload):
                ...

            @app.entrypoint(prompt_key="user_input")
            def handler(payload):
                ...
        """

        def decorator(f: Callable) -> Callable:
            self.handlers["main"] = f
            self._prompt_key = prompt_key
            self._response_key = response_key
            f.run = lambda port=8080, host=None: self.run(port, host)
            return f

        if func is not None:
            # Called as @app.entrypoint without arguments
            return decorator(func)
        # Called as @app.entrypoint(...) with arguments
        return decorator

    def ping(self, func: Callable) -> Callable:
        """Decorator to register a custom ping status handler.

        Args:
            func: The function to register as ping status handler

        Returns:
            The decorated function
        """
        self._ping_handler = func
        return func

    def websocket(self, func: Callable) -> Callable:
        """Decorator to register a WebSocket handler at /ws endpoint.

        Args:
            func: The function to register as WebSocket handler

        Returns:
            The decorated function

        Example:
            @app.websocket
            async def handler(websocket, context):
                await websocket.accept()
                # ... handle messages ...
        """
        self._websocket_handler = func
        return func

    def async_task(self, func: Callable) -> Callable:
        """Decorator to track async tasks for ping status.

        When a function is decorated with @async_task, it will:
        - Set ping status to HEALTHY_BUSY while running
        - Revert to HEALTHY when complete
        """
        if not _is_async_callable(func):
            raise ValueError("@async_task can only be applied to async functions")

        async def wrapper(*args, **kwargs):
            task_id = self.add_async_task(func.__name__)

            try:
                self.logger.debug("Starting async task: %s", func.__name__)
                start_time = time.time()
                result = await func(*args, **kwargs)
                duration = time.time() - start_time
                self.logger.info("Async task completed: %s (%.3fs)", func.__name__, duration)
                return result
            except Exception:
                duration = time.time() - start_time
                self.logger.exception("Async task failed: %s (%.3fs)", func.__name__, duration)
                raise
            finally:
                self.complete_async_task(task_id)

        wrapper.__name__ = func.__name__
        return wrapper

    def get_current_ping_status(self) -> PingStatus:
        """Get current ping status (forced > custom > automatic)."""
        current_status = None

        if self._forced_ping_status is not None:
            current_status = self._forced_ping_status
        elif self._ping_handler:
            try:
                result = self._ping_handler()
                if isinstance(result, str):
                    current_status = PingStatus(result)
                else:
                    current_status = result
            except Exception as e:
                self.logger.warning(
                    "Custom ping handler failed, falling back to automatic: %s: %s", type(e).__name__, e
                )

        if current_status is None:
            current_status = PingStatus.HEALTHY_BUSY if self._active_tasks else PingStatus.HEALTHY
        if not hasattr(self, "_last_known_status") or self._last_known_status != current_status:
            self._last_known_status = current_status
            self._last_status_update_time = time.time()

        return current_status

    def force_ping_status(self, status: PingStatus):
        """Force ping status to a specific value."""
        self._forced_ping_status = status

    def clear_forced_ping_status(self):
        """Clear forced status and resume automatic."""
        self._forced_ping_status = None

    def get_async_task_info(self) -> Dict[str, Any]:
        """Get info about running async tasks."""
        running_jobs = []
        for t in self._active_tasks.values():
            try:
                running_jobs.append(
                    {"name": t.get("name", "unknown"), "duration": time.time() - t.get("start_time", time.time())}
                )
            except Exception as e:
                self.logger.warning("Caught exception, continuing...: %s", e)
                continue

        return {"active_count": len(self._active_tasks), "running_jobs": running_jobs}

    def add_async_task(self, name: str, metadata: Optional[Dict] = None) -> int:
        """Register an async task for interactive health tracking.

        This method provides granular control over async task lifecycle,
        allowing developers to interactively start tracking tasks for health monitoring.
        Use this when you need precise control over when tasks begin and end.

        Args:
            name: Human-readable task name for monitoring
            metadata: Optional additional task metadata

        Returns:
            Task ID for tracking and completion

        Example:
            task_id = app.add_async_task("file_processing", {"file": "data.csv"})
            # ... do background work ...
            app.complete_async_task(task_id)
        """
        with self._task_counter_lock:
            task_id = hash(str(uuid.uuid4()))  # Generate truly unique hash-based ID

            # Register task start with same structure as @async_task decorator
            task_info = {"name": name, "start_time": time.time()}
            if metadata:
                task_info["metadata"] = metadata

            self._active_tasks[task_id] = task_info

        self.logger.info("Async task started: %s (ID: %s)", name, task_id)
        return task_id

    def complete_async_task(self, task_id: int) -> bool:
        """Mark an async task as complete for interactive health tracking.

        This method provides granular control over async task lifecycle,
        allowing developers to interactively complete tasks for health monitoring.
        Call this when your background work finishes.

        Args:
            task_id: Task ID returned from add_async_task

        Returns:
            True if task was found and completed, False otherwise

        Example:
            task_id = app.add_async_task("file_processing")
            # ... do background work ...
            completed = app.complete_async_task(task_id)
        """
        with self._task_counter_lock:
            task_info = self._active_tasks.pop(task_id, None)
            if task_info:
                task_name = task_info.get("name", "unknown")
                duration = time.time() - task_info.get("start_time", time.time())

                self.logger.info("Async task completed: %s (ID: %s, Duration: %.2fs)", task_name, task_id, duration)
                return True
            else:
                self.logger.warning("Attempted to complete unknown task ID: %s", task_id)
                return False

    def _build_request_context(self, request) -> RequestContext:
        """Build request context and setup all context variables."""
        try:
            headers = request.headers
            request_id = headers.get(REQUEST_ID_HEADER)
            if not request_id:
                request_id = str(uuid.uuid4())

            session_id = headers.get(SESSION_HEADER)
            BedrockAgentCoreContext.set_request_context(request_id, session_id)

            agent_identity_token = headers.get(ACCESS_TOKEN_HEADER)
            if agent_identity_token:
                BedrockAgentCoreContext.set_workload_access_token(agent_identity_token)

            oauth2_callback_url = headers.get(OAUTH2_CALLBACK_URL_HEADER)
            if oauth2_callback_url:
                BedrockAgentCoreContext.set_oauth2_callback_url(oauth2_callback_url)

            # Collect relevant request headers (Authorization + Custom headers)
            request_headers = {}

            # Add Authorization header if present
            authorization_header = headers.get(AUTHORIZATION_HEADER)
            if authorization_header is not None:
                request_headers[AUTHORIZATION_HEADER] = authorization_header

            # Add custom headers with the specified prefix
            for header_name, header_value in headers.items():
                if header_name.lower().startswith(CUSTOM_HEADER_PREFIX.lower()):
                    request_headers[header_name] = header_value

            # Set in context if any headers were found
            if request_headers:
                BedrockAgentCoreContext.set_request_headers(request_headers)

            # Get the headers from context to pass to RequestContext
            req_headers = BedrockAgentCoreContext.get_request_headers()

            return RequestContext(
                session_id=session_id,
                request_headers=req_headers,
                request=request,  # Pass through the Starlette request object
            )
        except Exception as e:
            self.logger.warning("Failed to build request context: %s: %s", type(e).__name__, e)
            request_id = str(uuid.uuid4())
            BedrockAgentCoreContext.set_request_context(request_id, None)
            return RequestContext(session_id=None, request=None)

    def _takes_context(self, handler: Callable) -> bool:
        try:
            params = list(inspect.signature(handler).parameters.keys())
            return len(params) >= 2 and params[1] == "context"
        except Exception:
            return False

    def _emit_invocation_otel_attributes(self, payload: Any, result: Any) -> None:
        """Emit OTEL span attributes with the invocation input and output.

        These attributes provide a canonical, framework-agnostic source of the
        user's prompt and the agent's response for AgentCore Evaluation. They
        enable evaluation of agents that use custom state schemas (e.g. workflow
        agents with TypedDict states) where the default MessagesState-based
        extraction in the evaluation mapper would fail.

        The attributes are set on the current active span (typically the root
        POST /invocations span created by ADOT auto-instrumentation).

        When prompt_key or response_key is configured via @app.entrypoint(),
        those specific keys are used to extract from dict payloads/results.
        Otherwise, a heuristic tries common keys in priority order.
        """
        try:
            from opentelemetry import trace as otel_trace

            span = otel_trace.get_current_span()
            if not span or not span.is_recording():
                return

            # Extract user prompt from payload
            user_prompt = None
            if isinstance(payload, dict):
                if self._prompt_key is not None:
                    value = payload.get(self._prompt_key)
                    if isinstance(value, str):
                        user_prompt = value
                else:
                    for key in ("prompt", "input", "query", "message", "question", "user_input"):
                        if key in payload and isinstance(payload[key], str):
                            user_prompt = payload[key]
                            break
                if user_prompt is None:
                    user_prompt = json.dumps(payload, ensure_ascii=False, default=str)
            elif isinstance(payload, str):
                user_prompt = payload
            else:
                user_prompt = str(payload)

            # Extract agent response from result
            agent_response = None
            if isinstance(result, str):
                agent_response = result
            elif isinstance(result, Response):
                pass  # Skip for streaming/custom responses — cannot capture full output
            elif isinstance(result, dict) and self._response_key is not None:
                value = result.get(self._response_key)
                if isinstance(value, str):
                    agent_response = value
                elif value is not None:
                    try:
                        agent_response = json.dumps(value, ensure_ascii=False, default=str)
                    except Exception:
                        agent_response = str(value)
            elif result is not None:
                try:
                    agent_response = json.dumps(result, ensure_ascii=False, default=str)
                except Exception:
                    agent_response = str(result)

            if user_prompt:
                span.set_attribute("agentcore.invocation.user_prompt", user_prompt[:16384])
            if agent_response:
                span.set_attribute("agentcore.invocation.agent_response", agent_response[:16384])
        except ImportError:
            pass  # OpenTelemetry not installed — silently skip
        except Exception:
            self.logger.debug("Failed to emit invocation OTEL attributes", exc_info=True)

    async def _handle_invocation(self, request):
        request_context = self._build_request_context(request)

        start_time = time.time()

        try:
            payload = await request.json()
            self.logger.debug("Processing invocation request")

            if self.debug:
                task_response = self._handle_task_action(payload)
                if task_response:
                    duration = time.time() - start_time
                    self.logger.info("Debug action completed (%.3fs)", duration)
                    return task_response

            handler = self.handlers.get("main")
            if not handler:
                self.logger.error("No entrypoint defined")
                return JSONResponse({"error": "No entrypoint defined"}, status_code=500)

            takes_context = self._takes_context(handler)

            handler_name = handler.__name__ if hasattr(handler, "__name__") else "unknown"
            self.logger.debug("Invoking handler: %s", handler_name)
            result = await self._invoke_handler(handler, request_context, takes_context, payload)

            self._emit_invocation_otel_attributes(payload, result)

            duration = time.time() - start_time
            if inspect.isgenerator(result):
                self.logger.info("Returning streaming response (generator) (%.3fs)", duration)
                return StreamingResponse(self._sync_stream_with_error_handling(result), media_type="text/event-stream")
            elif inspect.isasyncgen(result):
                self.logger.info("Returning streaming response (async generator) (%.3fs)", duration)
                return StreamingResponse(self._stream_with_error_handling(result), media_type="text/event-stream")

            # If handler returned a Starlette Response directly, pass it through.
            # This lets handlers control status codes (e.g. JSONResponse(data, status_code=404)).
            if isinstance(result, Response):
                status = getattr(result, "status_code", 200)
                # Log at warning level for error responses so operators can distinguish
                # intentional error responses from successful invocations in logs.
                if status >= 400:
                    self.logger.warning("Invocation returned HTTP %d (%.3fs)", status, duration)
                else:
                    self.logger.info("Invocation completed successfully (%.3fs)", duration)
                return result

            self.logger.info("Invocation completed successfully (%.3fs)", duration)
            # Use safe serialization for consistency with streaming paths
            safe_json_string = self._safe_serialize_to_json_string(result)
            return Response(safe_json_string, media_type="application/json")

        except json.JSONDecodeError as e:
            duration = time.time() - start_time
            self.logger.warning("Invalid JSON in request (%.3fs): %s", duration, e)
            return JSONResponse({"error": "Invalid JSON", "details": str(e)}, status_code=400)
        except UnicodeDecodeError as e:
            duration = time.time() - start_time
            self.logger.warning("Invalid encoding in request (%.3fs): %s", duration, e)
            return JSONResponse({"error": "Invalid encoding", "details": str(e)}, status_code=400)
        except HTTPException as e:
            duration = time.time() - start_time
            # Use error level for 5xx to match the generic Exception handler's severity,
            # since server errors warrant the same urgency regardless of how they're raised.
            # Use warning for 4xx since those are intentional client-error responses.
            if e.status_code >= 500:
                self.logger.error("HTTP %d (%.3fs): %s", e.status_code, duration, e.detail)
            else:
                self.logger.warning("HTTP %d (%.3fs): %s", e.status_code, duration, e.detail)
            return JSONResponse({"error": e.detail}, status_code=e.status_code)
        except Exception as e:
            duration = time.time() - start_time
            self.logger.exception("Invocation failed (%.3fs)", duration)
            return JSONResponse({"error": str(e)}, status_code=500)

    def _handle_ping(self, request):
        try:
            status = self.get_current_ping_status()
            self.logger.debug("Ping request - status: %s", status.value)
            return JSONResponse({"status": status.value, "time_of_last_update": int(self._last_status_update_time)})
        except Exception:
            self.logger.exception("Ping endpoint failed")
            return JSONResponse({"status": PingStatus.HEALTHY.value, "time_of_last_update": int(time.time())})

    async def _handle_websocket(self, websocket: WebSocket):
        """Handle WebSocket connections."""
        request_context = self._build_request_context(websocket)

        try:
            handler = self._websocket_handler
            if not handler:
                self.logger.error("No WebSocket handler defined")
                await websocket.close(code=1011)
                return

            self.logger.debug("WebSocket connection established")
            await handler(websocket, request_context)

        except WebSocketDisconnect:
            self.logger.debug("WebSocket disconnected")
        except Exception:
            self.logger.exception("WebSocket handler failed")
            try:
                await websocket.close(code=1011)
            except Exception:
                pass

    def run(self, port: int = 8080, host: Optional[str] = None, **kwargs):
        """Start the Bedrock AgentCore server.

        Args:
            port: Port to serve on, defaults to 8080
            host: Host to bind to, auto-detected if None
            **kwargs: Additional arguments passed to uvicorn.run()
        """
        import os

        import uvicorn

        if host is None:
            if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_CONTAINER"):
                host = "0.0.0.0"  # nosec B104 - Docker needs this to expose the port
            else:
                host = "127.0.0.1"

        # Set default uvicorn parameters, allow kwargs to override
        uvicorn_params = {
            "host": host,
            "port": port,
            "access_log": self.debug,
            "log_level": "info" if self.debug else "warning",
        }
        uvicorn_params.update(kwargs)

        uvicorn.run(self, **uvicorn_params)

    def _ensure_worker_loop(self) -> asyncio.AbstractEventLoop:
        """Lazily create and start a dedicated worker event loop in a background thread.

        The worker loop isolates async handler execution from the main event loop,
        ensuring that blocking async handlers do not prevent /ping from responding.
        """
        if self._worker_loop is not None and self._worker_loop.is_running():
            return self._worker_loop
        with self._worker_loop_lock:
            if self._worker_loop is None or not self._worker_loop.is_running():
                self._worker_loop = asyncio.new_event_loop()
                self._worker_thread = threading.Thread(
                    target=self._run_worker_loop,
                    daemon=True,
                    name="agentcore-worker-loop",
                )
                self._worker_thread.start()
        return self._worker_loop

    def _run_worker_loop(self) -> None:
        """Entry point for the worker loop background thread."""
        asyncio.set_event_loop(self._worker_loop)
        self._worker_loop.run_forever()

    @staticmethod
    async def _run_with_context(coro: Any, ctx: contextvars.Context) -> Any:
        """Run a coroutine after restoring context variables from a snapshot."""
        _restore_context(ctx)
        return await coro

    def _async_gen_to_sync_gen(self, async_gen: Any, ctx: contextvars.Context) -> Any:
        """Bridge an async generator through the worker loop as a sync generator.

        The async generator is iterated on the worker loop. Chunks are sent to
        a thread-safe queue and yielded synchronously. Starlette's StreamingResponse
        iterates this sync generator via iterate_in_threadpool, so the main event
        loop is never blocked.
        """
        worker_loop = self._ensure_worker_loop()
        q: queue.Queue = queue.Queue(maxsize=100)
        _DONE = object()

        async def _produce() -> None:
            _restore_context(ctx)
            try:
                async for chunk in async_gen:
                    q.put((True, chunk))
                q.put((True, _DONE))
            except BaseException as e:
                q.put((False, e))

        worker_loop.call_soon_threadsafe(lambda: worker_loop.create_task(_produce()))

        while True:
            ok, value = q.get()
            if not ok:
                raise value
            if value is _DONE:
                break
            yield value

    async def _invoke_handler(self, handler: Callable, request_context: Any, takes_context: bool, payload: Any) -> Any:
        """Dispatch handler execution based on handler type.

        - Async generator functions: bridged through the worker loop as a sync generator
        - Regular async functions: run on the dedicated worker event loop
        - Sync functions (including sync generators): run in the thread pool

        This ensures the main event loop stays responsive for /ping health checks
        regardless of whether handlers contain blocking operations.
        """
        try:
            args = (payload, request_context) if takes_context else (payload,)
            ctx = contextvars.copy_context()

            if _is_async_gen_callable(handler):
                return self._async_gen_to_sync_gen(handler(*args), ctx)
            elif _is_async_callable(handler):
                worker_loop = self._ensure_worker_loop()
                future = asyncio.run_coroutine_threadsafe(self._run_with_context(handler(*args), ctx), worker_loop)
                result = await asyncio.wrap_future(future)
                if inspect.isasyncgen(result):
                    return self._async_gen_to_sync_gen(result, ctx)
                return result
            else:
                return await run_in_threadpool(ctx.run, handler, *args)
        except Exception:
            handler_name = getattr(handler, "__name__", "unknown")
            self.logger.debug("Handler '%s' execution failed", handler_name)
            raise

    def _handle_task_action(self, payload: dict) -> Optional[JSONResponse]:
        """Handle task management actions if present in payload."""
        action = payload.get("_agent_core_app_action")
        if not action:
            return None

        self.logger.debug("Processing debug action: %s", action)

        try:
            actions = {
                TASK_ACTION_PING_STATUS: lambda: JSONResponse(
                    {
                        "status": self.get_current_ping_status().value,
                        "time_of_last_update": int(self._last_status_update_time),
                    }
                ),
                TASK_ACTION_JOB_STATUS: lambda: JSONResponse(self.get_async_task_info()),
                TASK_ACTION_FORCE_HEALTHY: lambda: (
                    self.force_ping_status(PingStatus.HEALTHY),
                    self.logger.info("Ping status forced to Healthy"),
                    JSONResponse({"forced_status": "Healthy"}),
                )[2],
                TASK_ACTION_FORCE_BUSY: lambda: (
                    self.force_ping_status(PingStatus.HEALTHY_BUSY),
                    self.logger.info("Ping status forced to HealthyBusy"),
                    JSONResponse({"forced_status": "HealthyBusy"}),
                )[2],
                TASK_ACTION_CLEAR_FORCED_STATUS: lambda: (
                    self.clear_forced_ping_status(),
                    self.logger.info("Forced ping status cleared"),
                    JSONResponse({"forced_status": "Cleared"}),
                )[2],
            }

            if action in actions:
                response = actions[action]()
                self.logger.debug("Debug action '%s' completed successfully", action)
                return response

            self.logger.warning("Unknown debug action requested: %s", action)
            return JSONResponse({"error": f"Unknown action: {action}"}, status_code=400)

        except Exception as e:
            self.logger.exception("Debug action '%s' failed", action)
            return JSONResponse({"error": "Debug action failed", "details": str(e)}, status_code=500)

    async def _stream_with_error_handling(self, generator):
        """Wrap async generator to handle errors and convert to SSE format."""
        try:
            async for value in generator:
                yield self._convert_to_sse(value)
        except Exception as e:
            self.logger.exception("Error in async streaming")
            error_event = {
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "An error occurred during streaming",
            }
            yield self._convert_to_sse(error_event)

    def _safe_serialize_to_json_string(self, obj):
        """Safely serialize object directly to JSON string with progressive fallback handling.

        This method eliminates double JSON encoding by returning the JSON string directly,
        avoiding the test-then-encode pattern that leads to redundant json.dumps() calls.
        Used by both streaming and non-streaming responses for consistent behavior.

        Returns:
            str: JSON string representation of the object
        """
        try:
            # First attempt: direct JSON serialization with Unicode support
            return json.dumps(obj, ensure_ascii=False)
        except (TypeError, ValueError, UnicodeEncodeError):
            try:
                # Second attempt: convert to serializable dictionaries, then JSON encode the dictionaries
                converted_obj = convert_complex_objects(obj)
                return json.dumps(converted_obj, ensure_ascii=False)
            except Exception:
                try:
                    # Third attempt: convert to string, then JSON encode the string
                    return json.dumps(str(obj), ensure_ascii=False)
                except Exception as e:
                    # Final fallback: JSON encode error object with ASCII fallback for problematic Unicode
                    self.logger.warning("Failed to serialize object: %s: %s", type(e).__name__, e)
                    error_obj = {"error": "Serialization failed", "original_type": type(obj).__name__}
                    return json.dumps(error_obj, ensure_ascii=False)

    def _convert_to_sse(self, obj) -> bytes:
        """Convert object to Server-Sent Events format using safe serialization.

        Args:
            obj: Object to convert to SSE format

        Returns:
            bytes: SSE-formatted data ready for streaming
        """
        json_string = self._safe_serialize_to_json_string(obj)
        sse_data = f"data: {json_string}\n\n"
        return sse_data.encode("utf-8")

    def _sync_stream_with_error_handling(self, generator):
        """Wrap sync generator to handle errors and convert to SSE format."""
        try:
            for value in generator:
                yield self._convert_to_sse(value)
        except Exception as e:
            self.logger.exception("Error in sync streaming")
            error_event = {
                "error": str(e),
                "error_type": type(e).__name__,
                "message": "An error occurred during streaming",
            }
            yield self._convert_to_sse(error_event)

__init__(debug=False, lifespan=None, middleware=None)

Initialize Bedrock AgentCore application.

Parameters:

Name Type Description Default
debug bool

Enable debug actions for task management (default: False)

False
lifespan Optional[Lifespan]

Optional lifespan context manager for startup/shutdown

None
middleware Sequence[Middleware] | None

Optional sequence of Starlette Middleware objects (or Middleware(...) entries)

None
Source code in bedrock_agentcore/runtime/app.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def __init__(
    self,
    debug: bool = False,
    lifespan: Optional[Lifespan] = None,
    middleware: Sequence[Middleware] | None = None,
):
    """Initialize Bedrock AgentCore application.

    Args:
        debug: Enable debug actions for task management (default: False)
        lifespan: Optional lifespan context manager for startup/shutdown
        middleware: Optional sequence of Starlette Middleware objects (or Middleware(...) entries)
    """
    self.handlers: Dict[str, Callable] = {}
    self._ping_handler: Optional[Callable] = None
    self._websocket_handler: Optional[Callable] = None
    self._prompt_key: Optional[str] = None
    self._response_key: Optional[str] = None
    self._active_tasks: Dict[int, Dict[str, Any]] = {}
    self._task_counter_lock: threading.Lock = threading.Lock()
    self._forced_ping_status: Optional[PingStatus] = None
    self._last_status_update_time: float = time.time()
    self._worker_loop: Optional[asyncio.AbstractEventLoop] = None
    self._worker_thread: Optional[threading.Thread] = None
    self._worker_loop_lock: threading.Lock = threading.Lock()

    routes = [
        Route("/invocations", self._handle_invocation, methods=["POST"]),
        Route("/ping", self._handle_ping, methods=["GET"]),
        WebSocketRoute("/ws", self._handle_websocket),
    ]
    super().__init__(routes=routes, lifespan=lifespan, middleware=middleware)
    self.debug = debug  # Set after super().__init__ to avoid override

    self.logger = logging.getLogger("bedrock_agentcore.app")
    if not self.logger.handlers:
        handler = logging.StreamHandler()
        formatter = RequestContextFormatter()
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.DEBUG if self.debug else logging.INFO)

add_async_task(name, metadata=None)

Register an async task for interactive health tracking.

This method provides granular control over async task lifecycle, allowing developers to interactively start tracking tasks for health monitoring. Use this when you need precise control over when tasks begin and end.

Parameters:

Name Type Description Default
name str

Human-readable task name for monitoring

required
metadata Optional[Dict]

Optional additional task metadata

None

Returns:

Type Description
int

Task ID for tracking and completion

Example

task_id = app.add_async_task("file_processing", {"file": "data.csv"})

... do background work ...

app.complete_async_task(task_id)

Source code in bedrock_agentcore/runtime/app.py
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
def add_async_task(self, name: str, metadata: Optional[Dict] = None) -> int:
    """Register an async task for interactive health tracking.

    This method provides granular control over async task lifecycle,
    allowing developers to interactively start tracking tasks for health monitoring.
    Use this when you need precise control over when tasks begin and end.

    Args:
        name: Human-readable task name for monitoring
        metadata: Optional additional task metadata

    Returns:
        Task ID for tracking and completion

    Example:
        task_id = app.add_async_task("file_processing", {"file": "data.csv"})
        # ... do background work ...
        app.complete_async_task(task_id)
    """
    with self._task_counter_lock:
        task_id = hash(str(uuid.uuid4()))  # Generate truly unique hash-based ID

        # Register task start with same structure as @async_task decorator
        task_info = {"name": name, "start_time": time.time()}
        if metadata:
            task_info["metadata"] = metadata

        self._active_tasks[task_id] = task_info

    self.logger.info("Async task started: %s (ID: %s)", name, task_id)
    return task_id

async_task(func)

Decorator to track async tasks for ping status.

When a function is decorated with @async_task, it will: - Set ping status to HEALTHY_BUSY while running - Revert to HEALTHY when complete

Source code in bedrock_agentcore/runtime/app.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
def async_task(self, func: Callable) -> Callable:
    """Decorator to track async tasks for ping status.

    When a function is decorated with @async_task, it will:
    - Set ping status to HEALTHY_BUSY while running
    - Revert to HEALTHY when complete
    """
    if not _is_async_callable(func):
        raise ValueError("@async_task can only be applied to async functions")

    async def wrapper(*args, **kwargs):
        task_id = self.add_async_task(func.__name__)

        try:
            self.logger.debug("Starting async task: %s", func.__name__)
            start_time = time.time()
            result = await func(*args, **kwargs)
            duration = time.time() - start_time
            self.logger.info("Async task completed: %s (%.3fs)", func.__name__, duration)
            return result
        except Exception:
            duration = time.time() - start_time
            self.logger.exception("Async task failed: %s (%.3fs)", func.__name__, duration)
            raise
        finally:
            self.complete_async_task(task_id)

    wrapper.__name__ = func.__name__
    return wrapper

clear_forced_ping_status()

Clear forced status and resume automatic.

Source code in bedrock_agentcore/runtime/app.py
278
279
280
def clear_forced_ping_status(self):
    """Clear forced status and resume automatic."""
    self._forced_ping_status = None

complete_async_task(task_id)

Mark an async task as complete for interactive health tracking.

This method provides granular control over async task lifecycle, allowing developers to interactively complete tasks for health monitoring. Call this when your background work finishes.

Parameters:

Name Type Description Default
task_id int

Task ID returned from add_async_task

required

Returns:

Type Description
bool

True if task was found and completed, False otherwise

Example

task_id = app.add_async_task("file_processing")

... do background work ...

completed = app.complete_async_task(task_id)

Source code in bedrock_agentcore/runtime/app.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
def complete_async_task(self, task_id: int) -> bool:
    """Mark an async task as complete for interactive health tracking.

    This method provides granular control over async task lifecycle,
    allowing developers to interactively complete tasks for health monitoring.
    Call this when your background work finishes.

    Args:
        task_id: Task ID returned from add_async_task

    Returns:
        True if task was found and completed, False otherwise

    Example:
        task_id = app.add_async_task("file_processing")
        # ... do background work ...
        completed = app.complete_async_task(task_id)
    """
    with self._task_counter_lock:
        task_info = self._active_tasks.pop(task_id, None)
        if task_info:
            task_name = task_info.get("name", "unknown")
            duration = time.time() - task_info.get("start_time", time.time())

            self.logger.info("Async task completed: %s (ID: %s, Duration: %.2fs)", task_name, task_id, duration)
            return True
        else:
            self.logger.warning("Attempted to complete unknown task ID: %s", task_id)
            return False

entrypoint(func=None, *, prompt_key=None, response_key=None)

Decorator to register a function as the main entrypoint.

Parameters:

Name Type Description Default
func Callable

The function to register as entrypoint

None
prompt_key Optional[str]

Optional key to extract user prompt from the payload dict. If not specified, tries common keys in order (prompt, input, query, message, question, user_input) then falls back to JSON serialization.

None
response_key Optional[str]

Optional key to extract agent response from the result dict. If not specified, the full result is used.

None

Returns:

Type Description
Callable

The decorated function with added serve method

Examples:

@app.entrypoint def handler(payload): ...

@app.entrypoint(prompt_key="user_input") def handler(payload): ...

Source code in bedrock_agentcore/runtime/app.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
def entrypoint(
    self, func: Callable = None, *, prompt_key: Optional[str] = None, response_key: Optional[str] = None
) -> Callable:
    """Decorator to register a function as the main entrypoint.

    Args:
        func: The function to register as entrypoint
        prompt_key: Optional key to extract user prompt from the payload dict.
            If not specified, tries common keys in order (prompt, input, query,
            message, question, user_input) then falls back to JSON serialization.
        response_key: Optional key to extract agent response from the result dict.
            If not specified, the full result is used.

    Returns:
        The decorated function with added serve method

    Examples:
        @app.entrypoint
        def handler(payload):
            ...

        @app.entrypoint(prompt_key="user_input")
        def handler(payload):
            ...
    """

    def decorator(f: Callable) -> Callable:
        self.handlers["main"] = f
        self._prompt_key = prompt_key
        self._response_key = response_key
        f.run = lambda port=8080, host=None: self.run(port, host)
        return f

    if func is not None:
        # Called as @app.entrypoint without arguments
        return decorator(func)
    # Called as @app.entrypoint(...) with arguments
    return decorator

force_ping_status(status)

Force ping status to a specific value.

Source code in bedrock_agentcore/runtime/app.py
274
275
276
def force_ping_status(self, status: PingStatus):
    """Force ping status to a specific value."""
    self._forced_ping_status = status

get_async_task_info()

Get info about running async tasks.

Source code in bedrock_agentcore/runtime/app.py
282
283
284
285
286
287
288
289
290
291
292
293
294
def get_async_task_info(self) -> Dict[str, Any]:
    """Get info about running async tasks."""
    running_jobs = []
    for t in self._active_tasks.values():
        try:
            running_jobs.append(
                {"name": t.get("name", "unknown"), "duration": time.time() - t.get("start_time", time.time())}
            )
        except Exception as e:
            self.logger.warning("Caught exception, continuing...: %s", e)
            continue

    return {"active_count": len(self._active_tasks), "running_jobs": running_jobs}

get_current_ping_status()

Get current ping status (forced > custom > automatic).

Source code in bedrock_agentcore/runtime/app.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
def get_current_ping_status(self) -> PingStatus:
    """Get current ping status (forced > custom > automatic)."""
    current_status = None

    if self._forced_ping_status is not None:
        current_status = self._forced_ping_status
    elif self._ping_handler:
        try:
            result = self._ping_handler()
            if isinstance(result, str):
                current_status = PingStatus(result)
            else:
                current_status = result
        except Exception as e:
            self.logger.warning(
                "Custom ping handler failed, falling back to automatic: %s: %s", type(e).__name__, e
            )

    if current_status is None:
        current_status = PingStatus.HEALTHY_BUSY if self._active_tasks else PingStatus.HEALTHY
    if not hasattr(self, "_last_known_status") or self._last_known_status != current_status:
        self._last_known_status = current_status
        self._last_status_update_time = time.time()

    return current_status

ping(func)

Decorator to register a custom ping status handler.

Parameters:

Name Type Description Default
func Callable

The function to register as ping status handler

required

Returns:

Type Description
Callable

The decorated function

Source code in bedrock_agentcore/runtime/app.py
188
189
190
191
192
193
194
195
196
197
198
def ping(self, func: Callable) -> Callable:
    """Decorator to register a custom ping status handler.

    Args:
        func: The function to register as ping status handler

    Returns:
        The decorated function
    """
    self._ping_handler = func
    return func

run(port=8080, host=None, **kwargs)

Start the Bedrock AgentCore server.

Parameters:

Name Type Description Default
port int

Port to serve on, defaults to 8080

8080
host Optional[str]

Host to bind to, auto-detected if None

None
**kwargs

Additional arguments passed to uvicorn.run()

{}
Source code in bedrock_agentcore/runtime/app.py
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
def run(self, port: int = 8080, host: Optional[str] = None, **kwargs):
    """Start the Bedrock AgentCore server.

    Args:
        port: Port to serve on, defaults to 8080
        host: Host to bind to, auto-detected if None
        **kwargs: Additional arguments passed to uvicorn.run()
    """
    import os

    import uvicorn

    if host is None:
        if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_CONTAINER"):
            host = "0.0.0.0"  # nosec B104 - Docker needs this to expose the port
        else:
            host = "127.0.0.1"

    # Set default uvicorn parameters, allow kwargs to override
    uvicorn_params = {
        "host": host,
        "port": port,
        "access_log": self.debug,
        "log_level": "info" if self.debug else "warning",
    }
    uvicorn_params.update(kwargs)

    uvicorn.run(self, **uvicorn_params)

websocket(func)

Decorator to register a WebSocket handler at /ws endpoint.

Parameters:

Name Type Description Default
func Callable

The function to register as WebSocket handler

required

Returns:

Type Description
Callable

The decorated function

Example

@app.websocket async def handler(websocket, context): await websocket.accept() # ... handle messages ...

Source code in bedrock_agentcore/runtime/app.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def websocket(self, func: Callable) -> Callable:
    """Decorator to register a WebSocket handler at /ws endpoint.

    Args:
        func: The function to register as WebSocket handler

    Returns:
        The decorated function

    Example:
        @app.websocket
        async def handler(websocket, context):
            await websocket.accept()
            # ... handle messages ...
    """
    self._websocket_handler = func
    return func

BedrockAgentCoreContext

Unified context manager for Bedrock AgentCore.

Source code in bedrock_agentcore/runtime/context.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class BedrockAgentCoreContext:
    """Unified context manager for Bedrock AgentCore."""

    _workload_access_token: ContextVar[Optional[str]] = ContextVar("workload_access_token")
    _oauth2_callback_url: ContextVar[Optional[str]] = ContextVar("oauth2_callback_url")
    _request_id: ContextVar[Optional[str]] = ContextVar("request_id")
    _session_id: ContextVar[Optional[str]] = ContextVar("session_id")
    _request_headers: ContextVar[Optional[Dict[str, str]]] = ContextVar("request_headers")

    @classmethod
    def set_workload_access_token(cls, token: str):
        """Set the workload access token in the context."""
        cls._workload_access_token.set(token)

    @classmethod
    def get_workload_access_token(cls) -> Optional[str]:
        """Get the workload access token from the context."""
        try:
            return cls._workload_access_token.get()
        except LookupError:
            return None

    @classmethod
    def set_oauth2_callback_url(cls, workload_callback_url: str):
        """Set the oauth2 callback url in the context."""
        cls._oauth2_callback_url.set(workload_callback_url)

    @classmethod
    def get_oauth2_callback_url(cls) -> Optional[str]:
        """Get the oauth2 callback url from the context."""
        try:
            return cls._oauth2_callback_url.get()
        except LookupError:
            return None

    @classmethod
    def set_request_context(cls, request_id: str, session_id: Optional[str] = None):
        """Set request-scoped identifiers."""
        cls._request_id.set(request_id)
        cls._session_id.set(session_id)

    @classmethod
    def get_request_id(cls) -> Optional[str]:
        """Get current request ID."""
        try:
            return cls._request_id.get()
        except LookupError:
            return None

    @classmethod
    def get_session_id(cls) -> Optional[str]:
        """Get current session ID."""
        try:
            return cls._session_id.get()
        except LookupError:
            return None

    @classmethod
    def set_request_headers(cls, headers: Dict[str, str]):
        """Set request headers in the context."""
        cls._request_headers.set(headers)

    @classmethod
    def get_request_headers(cls) -> Optional[Dict[str, str]]:
        """Get request headers from the context."""
        try:
            return cls._request_headers.get()
        except LookupError:
            return None

get_oauth2_callback_url() classmethod

Get the oauth2 callback url from the context.

Source code in bedrock_agentcore/runtime/context.py
52
53
54
55
56
57
58
@classmethod
def get_oauth2_callback_url(cls) -> Optional[str]:
    """Get the oauth2 callback url from the context."""
    try:
        return cls._oauth2_callback_url.get()
    except LookupError:
        return None

get_request_headers() classmethod

Get request headers from the context.

Source code in bedrock_agentcore/runtime/context.py
87
88
89
90
91
92
93
@classmethod
def get_request_headers(cls) -> Optional[Dict[str, str]]:
    """Get request headers from the context."""
    try:
        return cls._request_headers.get()
    except LookupError:
        return None

get_request_id() classmethod

Get current request ID.

Source code in bedrock_agentcore/runtime/context.py
66
67
68
69
70
71
72
@classmethod
def get_request_id(cls) -> Optional[str]:
    """Get current request ID."""
    try:
        return cls._request_id.get()
    except LookupError:
        return None

get_session_id() classmethod

Get current session ID.

Source code in bedrock_agentcore/runtime/context.py
74
75
76
77
78
79
80
@classmethod
def get_session_id(cls) -> Optional[str]:
    """Get current session ID."""
    try:
        return cls._session_id.get()
    except LookupError:
        return None

get_workload_access_token() classmethod

Get the workload access token from the context.

Source code in bedrock_agentcore/runtime/context.py
39
40
41
42
43
44
45
@classmethod
def get_workload_access_token(cls) -> Optional[str]:
    """Get the workload access token from the context."""
    try:
        return cls._workload_access_token.get()
    except LookupError:
        return None

set_oauth2_callback_url(workload_callback_url) classmethod

Set the oauth2 callback url in the context.

Source code in bedrock_agentcore/runtime/context.py
47
48
49
50
@classmethod
def set_oauth2_callback_url(cls, workload_callback_url: str):
    """Set the oauth2 callback url in the context."""
    cls._oauth2_callback_url.set(workload_callback_url)

set_request_context(request_id, session_id=None) classmethod

Set request-scoped identifiers.

Source code in bedrock_agentcore/runtime/context.py
60
61
62
63
64
@classmethod
def set_request_context(cls, request_id: str, session_id: Optional[str] = None):
    """Set request-scoped identifiers."""
    cls._request_id.set(request_id)
    cls._session_id.set(session_id)

set_request_headers(headers) classmethod

Set request headers in the context.

Source code in bedrock_agentcore/runtime/context.py
82
83
84
85
@classmethod
def set_request_headers(cls, headers: Dict[str, str]):
    """Set request headers in the context."""
    cls._request_headers.set(headers)

set_workload_access_token(token) classmethod

Set the workload access token in the context.

Source code in bedrock_agentcore/runtime/context.py
34
35
36
37
@classmethod
def set_workload_access_token(cls, token: str):
    """Set the workload access token in the context."""
    cls._workload_access_token.set(token)

PingStatus

Bases: str, Enum

Ping status enum for health check responses.

Source code in bedrock_agentcore/runtime/models.py
 9
10
11
12
13
class PingStatus(str, Enum):
    """Ping status enum for health check responses."""

    HEALTHY = "Healthy"
    HEALTHY_BUSY = "HealthyBusy"

RequestContext

Bases: BaseModel

Request context containing metadata from HTTP requests.

Source code in bedrock_agentcore/runtime/context.py
12
13
14
15
16
17
18
19
20
21
22
class RequestContext(BaseModel):
    """Request context containing metadata from HTTP requests."""

    session_id: Optional[str] = Field(None)
    request_headers: Optional[Dict[str, str]] = Field(None)
    request: Optional[Any] = Field(None, description="The underlying Starlette request object")

    class Config:
        """Allow non-serializable types like Starlette Request."""

        arbitrary_types_allowed = True

Config

Allow non-serializable types like Starlette Request.

Source code in bedrock_agentcore/runtime/context.py
19
20
21
22
class Config:
    """Allow non-serializable types like Starlette Request."""

    arbitrary_types_allowed = True

__getattr__(name)

Lazy imports for A2A and AG-UI symbols so optional dependencies are not required at import time.

Source code in bedrock_agentcore/runtime/__init__.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def __getattr__(name: str):
    """Lazy imports for A2A and AG-UI symbols so optional dependencies are not required at import time."""
    _a2a_exports = {"BedrockCallContextBuilder", "build_a2a_app", "build_runtime_url", "serve_a2a"}
    if name in _a2a_exports:
        from . import a2a as _a2a_module

        return getattr(_a2a_module, name)

    _ag_ui_exports = {"AGUIApp", "build_ag_ui_app", "serve_ag_ui"}
    if name in _ag_ui_exports:
        from . import ag_ui as _ag_ui_module

        return getattr(_ag_ui_module, name)

    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")