Skip to content

Gateway Integration Examples

Lambda Function as MCP Tool

from bedrock_agentcore.gateway import GatewayClient
import json

client = GatewayClient(region_name='us-west-2')

# Define Lambda tools with detailed schemas
lambda_config = {
    "arn": "arn:aws:lambda:us-west-2:123:function:DataProcessor",
    "tools": [
        {
            "name": "process_data",
            "description": "Process user data in JSON or CSV format",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "data": {"type": "string"},
                    "format": {"type": "string"}  # Note: enum not supported, document in description
                },
                "required": ["data", "format"]
            }
        },
        {
            "name": "validate_data",
            "description": "Validate data structure",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "data": {"type": "string"}
                },
                "required": ["data"]
            }
        }
    ]
}

# Create Gateway with semantic search enabled
cognito = client.create_oauth_authorizer_with_cognito("data-processor")
gateway = client.setup_gateway(
    gateway_name="data-processor",
    target_source=json.dumps(lambda_config),
    execution_role_arn="arn:aws:iam::123:role/ExecutionRole",
    authorizer_config=cognito['authorizer_config'],
    target_type='lambda',
    enable_semantic_search=True,
    description="Data processing gateway with validation tools"
)

print(f"Gateway created: {gateway.get_mcp_url()}")

OpenAPI Integration

From S3

gateway = client.setup_gateway(
    gateway_name="my-api",
    target_source="s3://my-bucket/api-spec.json",
    execution_role_arn=role_arn,
    authorizer_config=cognito['authorizer_config'],
    target_type='openapi'
)

Inline OpenAPI Specification

openapi_spec = {
    "openapi": "3.0.0",
    "info": {"title": "User API", "version": "1.0.0"},
    "servers": [{"url": "https://api.example.com"}],
    "paths": {
        "/users": {
            "get": {
                "operationId": "listUsers",
                "summary": "List all users",
                "responses": {"200": {"description": "User list"}}
            }
        },
        "/users/{id}": {
            "get": {
                "operationId": "getUser",
                "summary": "Get user by ID",
                "parameters": [{
                    "name": "id",
                    "in": "path",
                    "required": True,
                    "schema": {"type": "string"}
                }],
                "responses": {"200": {"description": "User found"}}
            }
        }
    }
}

gateway = client.setup_gateway(
    gateway_name="user-api",
    target_source=json.dumps(openapi_spec),
    execution_role_arn=role_arn,
    authorizer_config=cognito['authorizer_config'],
    target_type='openapi'
)

YAML OpenAPI (from file)

import yaml

# Load YAML OpenAPI spec
with open('openapi.yaml', 'r') as f:
    yaml_content = f.read()
    openapi_spec = yaml.safe_load(yaml_content)

# Convert to JSON string for inline use
gateway = client.setup_gateway(
    gateway_name="yaml-api",
    target_source=json.dumps(openapi_spec),
    execution_role_arn=role_arn,
    authorizer_config=cognito['authorizer_config'],
    target_type='openapi'
)

# Or use S3 (YAML files work directly)
gateway = client.setup_gateway(
    gateway_name="yaml-api",
    target_source="s3://my-bucket/openapi.yaml",
    execution_role_arn=role_arn,
    authorizer_config=cognito['authorizer_config'],
    target_type='openapi'
)

OAuth Token Management

When integrating Gateway with any agent framework, you'll need to handle OAuth tokens properly:

import os
from datetime import datetime, timedelta
import httpx
import asyncio

class GatewayTokenManager:
    """Manages OAuth tokens with automatic refresh"""

    def __init__(self, client_id, client_secret, token_endpoint, scope):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_endpoint = token_endpoint
        self.scope = scope
        self._token = None
        self._expires_at = None

    async def get_token(self):
        """Get valid token, refreshing if needed"""
        if self._token and self._expires_at > datetime.now():
            return self._token

        # Fetch new token
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.token_endpoint,
                data={
                    'grant_type': 'client_credentials',
                    'client_id': self.client_id,
                    'client_secret': self.client_secret,
                    'scope': self.scope
                },
                headers={'Content-Type': 'application/x-www-form-urlencoded'}
            )
            data = response.json()
            self._token = data['access_token']
            # Buffer expiry by 5 minutes
            expires_in = data.get('expires_in', 3600) - 300
            self._expires_at = datetime.now() + timedelta(seconds=expires_in)
            return self._token

Generic Agent Integration

Here's how to integrate Gateway with any agent framework:

import os
import asyncio
from bedrock_agentcore import BedrockAgentCoreApp

# Initialize token manager with Gateway credentials
token_manager = GatewayTokenManager(
    client_id=os.environ['GATEWAY_CLIENT_ID'],
    client_secret=os.environ['GATEWAY_CLIENT_SECRET'],
    token_endpoint=os.environ['GATEWAY_TOKEN_ENDPOINT'],
    scope=os.environ['GATEWAY_SCOPE']
)

# Gateway MCP endpoint
GATEWAY_URL = os.environ['GATEWAY_MCP_URL']

# Generic function to call Gateway tools
async def call_gateway_tool(tool_name: str, arguments: dict):
    """Call any tool exposed through Gateway"""
    token = await token_manager.get_token()

    async with httpx.AsyncClient() as client:
        response = await client.post(
            GATEWAY_URL,
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json"
            },
            json={
                "jsonrpc": "2.0",
                "id": 1,
                "method": "tools/call",
                "params": {
                    "name": tool_name,
                    "arguments": arguments
                }
            }
        )

        result = response.json()
        if 'error' in result:
            raise Exception(f"Tool error: {result['error']}")

        return result.get('result')

# Example: Using in your agent logic
async def process_user_request(user_message: str):
    # Parse intent from user message
    if "weather" in user_message.lower():
        # Extract location (this would be done by your agent's NLU)
        location = extract_location(user_message)
        weather_data = await call_gateway_tool("get_weather", {"location": location})
        return f"The weather in {location} is: {weather_data}"

    elif "user" in user_message.lower():
        # Get user information
        user_id = extract_user_id(user_message)
        user_data = await call_gateway_tool("getUser", {"id": user_id})
        return f"User information: {user_data}"

    return "I couldn't understand your request."

Complete Example: Weather Agent

from bedrock_agentcore.gateway import GatewayClient
import json
import asyncio
import httpx

# Step 1: Create Gateway
async def setup_weather_gateway():
    client = GatewayClient(region_name='us-west-2')

    # Configure Lambda with weather tools
    lambda_config = {
        "arn": "arn:aws:lambda:us-west-2:123:function:WeatherService",
        "tools": [
            {
                "name": "get_current_weather",
                "description": "Get current weather for a city",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "city": {"type": "string"},
                        "country": {"type": "string"}
                    },
                    "required": ["city"]
                }
            },
            {
                "name": "get_forecast",
                "description": "Get 5-day weather forecast",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "city": {"type": "string"},
                        "days": {"type": "number"}
                    },
                    "required": ["city"]
                }
            }
        ]
    }

    # Create Gateway with EZ Auth
    cognito = client.create_oauth_authorizer_with_cognito("weather-service")
    gateway = client.setup_gateway(
        gateway_name="weather-service",
        target_source=json.dumps(lambda_config),
        execution_role_arn="arn:aws:iam::123:role/WeatherExecutionRole",
        authorizer_config=cognito['authorizer_config'],
        target_type='lambda',
        enable_semantic_search=True
    )

    return gateway, cognito['client_info']

# Step 2: Use the Gateway
async def weather_agent():
    gateway, client_info = await setup_weather_gateway()

    # Initialize token manager
    token_manager = GatewayTokenManager(
        client_id=client_info['client_id'],
        client_secret=client_info['client_secret'],
        token_endpoint=client_info['token_endpoint'],
        scope=client_info['scope']
    )

    # Get weather for multiple cities
    cities = ["Seattle", "New York", "London"]

    for city in cities:
        token = await token_manager.get_token()

        async with httpx.AsyncClient() as client:
            response = await client.post(
                gateway.get_mcp_url(),
                headers={"Authorization": f"Bearer {token}"},
                json={
                    "jsonrpc": "2.0",
                    "id": 1,
                    "method": "tools/call",
                    "params": {
                        "name": "get_current_weather",
                        "arguments": {"city": city}
                    }
                }
            )

            result = response.json()
            print(f"Weather in {city}: {result.get('result')}")

# Run the agent
if __name__ == "__main__":
    asyncio.run(weather_agent())