Commit 34ea8d6

Merge pull request #211 from aws-samples/feat_python_container
feat: Add Python backend implementation with multi-language CDK support
2 parents 2568d46 + 01b79b1 commit 34ea8d6

25 files changed: +2047 -25 lines changed

samples/speech-to-speech/.gitignore

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,5 @@
11
!jest.config.js
2-
*.d.ts
2+
# *.d.ts
33
node_modules
44

55
# CDK asset staging directory

samples/speech-to-speech/README.md

Lines changed: 45 additions & 11 deletions
@@ -2,17 +2,23 @@
22

33
## Table of Contents
44

5-
- [Overview](#overview)
6-
- [Architecture](#architecture)
7-
- [Project Structure](#project-structure)
8-
- [Prerequisites](#prerequisites)
9-
- [Deployment](#deployment)
10-
- [User creation](#user-creation)
11-
- [Usage](#usage)
12-
- [Load testing](#load-testing)
13-
- [Clean Up](#clean-up)
14-
- [Content Security Legal Disclaimer](#content-security-legal-disclaimer)
15-
- [Operational Metrics Collection](#operational-metrics-collection)
5+
- [Nova Sonic Solution](#nova-sonic-solution)
6+
- [Table of Contents](#table-of-contents)
7+
- [Overview](#overview)
8+
- [Architecture](#architecture)
9+
- [Project Structure](#project-structure)
10+
- [Backend Implementation Options](#backend-implementation-options)
11+
- [Java WebSocket Server (Default)](#java-websocket-server-default)
12+
- [Python WebSocket Server](#python-websocket-server)
13+
- [Language Selection](#language-selection)
14+
- [Prerequisites](#prerequisites)
15+
- [Deployment](#deployment)
16+
- [User creation](#user-creation)
17+
- [Usage](#usage)
18+
- [Load testing](#load-testing)
19+
- [Clean Up](#clean-up)
20+
- [Content Security Legal Disclaimer](#content-security-legal-disclaimer)
21+
- [Operational Metrics Collection](#operational-metrics-collection)
1622

1723
## Overview
1824

@@ -49,11 +55,39 @@ The solution consists of three main components:
4955
├── frontend/ # React + TypeScript frontend application
5056
├── backend/ # AWS CDK infrastructure and Java WebSocket server
5157
│ ├── app/ # Java WebSocket server implementation
58+
│ ├── python_app/ # Python WebSocket server implementation
5259
│ ├── stack/ # CDK infrastructure code
5360
│ └── load-test/ # WebSocket load testing suite
5461
└── images/ # Architecture diagrams and documentation images
5562
```
5663

64+
### Backend Implementation Options
65+
66+
The solution supports two backend implementations with identical functionality:
67+
68+
#### Java WebSocket Server (Default)
69+
- Java-based WebSocket server implementation
70+
- Production-ready with comprehensive error handling
71+
- Jetty WebSocket server with connection pooling
72+
- Real-time speech-to-speech communication
73+
- Cognito token validation
74+
- Connection management and logging
75+
76+
#### Python WebSocket Server
77+
- Python-based alternative implementation
78+
- AWS Bedrock integration with `aws_sdk_bedrock_runtime` beta SDK
79+
- Equivalent functionality to Java implementation
80+
- **Python AWS SDK Notice**: This implementation uses the experimental AWS SDK for Python async clients. The SDK is in early development and may see rapid iteration with potential breaking changes between minor versions. For more details about the experimental AWS SDK, see: https://github.com/awslabs/aws-sdk-python/tree/develop
81+
82+
### Language Selection
83+
84+
The CDK deployment supports backend language selection:
85+
86+
- **Java (default)**: `cdk deploy`
87+
- **Python**: `cdk deploy --context custom:backendLanguage=python`
88+
89+
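The `cdk.json` file defines the context parameter `custom:backendLanguage`. It ships as an empty string, which leaves the default Java backend in place; set it to "python" (or pass the value with `--context` as shown above) to deploy the Python backend.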
90+
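As a rough illustration of the selection rule described above, the following Python sketch resolves a backend language from a context mapping. It is not the project's CDK code: the function name `resolve_backend_language`, the treatment of an empty value as Java, and the rejection of unknown values are all assumptions made for this example.

```python
# Hypothetical helper, not part of this commit: maps the
# `custom:backendLanguage` context value to a backend choice.
CONTEXT_KEY = "custom:backendLanguage"
SUPPORTED_LANGUAGES = ("java", "python")


def resolve_backend_language(context: dict) -> str:
    """Return the backend language for a CDK-style context mapping.

    Assumption: a missing or empty value selects the Java default.
    """
    raw = (context.get(CONTEXT_KEY) or "").strip().lower()
    if not raw:
        return "java"
    if raw not in SUPPORTED_LANGUAGES:
        raise ValueError(
            f"Unsupported backend language {raw!r}; "
            f"expected one of {SUPPORTED_LANGUAGES}"
        )
    return raw


if __name__ == "__main__":
    print(resolve_backend_language({CONTEXT_KEY: ""}))
    print(resolve_backend_language({CONTEXT_KEY: "python"}))
```

Failing fast on an unrecognized value is a design choice of this sketch; the actual stack may silently fall back to Java instead.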
5791
## Prerequisites
5892

5993
- [Python](https://www.python.org/downloads/) 3.11 or higher

samples/speech-to-speech/backend/cdk.json

Lines changed: 1 addition & 0 deletions
@@ -15,6 +15,7 @@
1515
]
1616
},
1717
"context": {
18+
"custom:backendLanguage": "",
1819
"@aws-cdk/aws-lambda:recognizeLayerVersion": true,
1920
"@aws-cdk/core:checkSecretUsage": true,
2021
"@aws-cdk/core:target-partitions": [
Lines changed: 41 additions & 0 deletions
@@ -0,0 +1,41 @@
1+
FROM python:3.13-alpine
2+
3+
WORKDIR /app
4+
5+
# Copy requirements file
6+
COPY requirements.txt ./
7+
8+
# Install dependencies
9+
RUN apk add --update jq curl py-pip inotify-tools
10+
RUN pip install --no-cache-dir -r requirements.txt
11+
12+
# Create non-root user
13+
RUN addgroup -S appuser && adduser -S appuser -G appuser
14+
15+
# Copy all Python files, directories, and entrypoint.sh
16+
# Refactored structure includes core/, clients/, events/, tools/ directories
17+
COPY . .
18+
19+
# Set environment variables
20+
ENV LOGLEVEL=INFO
21+
ENV HOST=0.0.0.0
22+
ENV PORT=8080
23+
ENV AWS_DEFAULT_REGION=us-east-1
24+
ENV PYTHONPATH=/app
25+
26+
# Set execute permission for entrypoint.sh
27+
RUN chmod +x entrypoint.sh
28+
29+
# Change ownership of the application files to the non-root user
30+
RUN chown -R appuser:appuser /app
31+
32+
# Switch to non-root user
33+
USER appuser
34+
35+
# Expose single port
36+
EXPOSE ${PORT}
37+
38+
# Health check
39+
HEALTHCHECK --interval=30s --timeout=5s --start-period=30s --retries=3 CMD curl -f http://localhost:${PORT}/health || exit 1
40+
41+
ENTRYPOINT ["./entrypoint.sh"]
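
The `HEALTHCHECK` instruction above expects the container to answer `GET /health` on `${PORT}` with a successful status. The sketch below shows one minimal way such an endpoint could behave, using only the Python standard library. It is an illustration under assumptions, not the server shipped in this commit: `HealthHandler`, `probe`, and `run_demo` are hypothetical names, and the response body is arbitrary.

```python
import http.client
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer


class HealthHandler(BaseHTTPRequestHandler):
    """Hypothetical handler: answers 200 on /health and 404 elsewhere."""

    def do_GET(self):
        if self.path == "/health":
            body = b"OK"
            self.send_response(200)
            self.send_header("Content-Type", "text/plain")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        else:
            self.send_error(404)

    def log_message(self, format, *args):
        # Silence per-request logging for the demo.
        pass


def probe(port: int, path: str = "/health") -> int:
    """Issue a GET request, as `curl -f` would, and return the status code."""
    conn = http.client.HTTPConnection("127.0.0.1", port, timeout=5)
    try:
        conn.request("GET", path)
        return conn.getresponse().status
    finally:
        conn.close()


def run_demo() -> int:
    """Start the handler on an ephemeral port and probe it once."""
    server = HTTPServer(("127.0.0.1", 0), HealthHandler)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    try:
        return probe(server.server_address[1])
    finally:
        server.shutdown()
        server.server_close()


if __name__ == "__main__":
    print(run_demo())
```

Because `curl -f` exits non-zero on HTTP errors of 400 and above, any such response from `/health` would count as a failed check.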

samples/speech-to-speech/backend/python_app/clients/__init__.py

Whitespace-only changes.
Lines changed: 221 additions & 0 deletions
@@ -0,0 +1,221 @@
1+
#!/usr/bin/env python3
2+
#
3+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance
6+
# with the License. A copy of the License is located at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# or in the 'license' file accompanying this file. This file is distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES
11+
# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions
12+
# and limitations under the License.
13+
#
14+
import json
15+
import logging
16+
import warnings
17+
import os
18+
import time
19+
from aws_sdk_bedrock_runtime.client import (
20+
BedrockRuntimeClient,
21+
InvokeModelWithBidirectionalStreamOperationInput,
22+
)
23+
from aws_sdk_bedrock_runtime.models import (
24+
InvokeModelWithBidirectionalStreamInputChunk,
25+
BidirectionalInputPayloadPart,
26+
)
27+
from aws_sdk_bedrock_runtime.config import (
28+
Config,
29+
HTTPAuthSchemeResolver,
30+
SigV4AuthScheme,
31+
)
32+
from smithy_aws_core.credentials_resolvers.environment import (
33+
EnvironmentCredentialsResolver,
34+
)
35+
36+
# Configure logging
37+
logger = logging.getLogger(__name__)
38+
39+
# Suppress warnings
40+
warnings.filterwarnings("ignore")
41+
42+
43+
class BedrockInteractClient:
44+
"""Client for interacting with AWS Bedrock Nova Sonic model"""
45+
46+
def __init__(self, model_id="amazon.nova-sonic-v1:0", region="us-east-1"):
47+
"""Initialize the Bedrock client.
48+
49+
Args:
50+
model_id (str): Bedrock model ID to use
51+
region (str): AWS region
52+
"""
53+
self.model_id = model_id
54+
self.region = region
55+
self.bedrock_client = None
56+
self.last_credential_check = 0
57+
self.credential_signal_file = "/tmp/credentials_refreshed"
58+
logger.info(
59+
f"Initializing BedrockInteractClient [model_id={model_id}, region={region}]"
60+
)
61+
62+
def _check_credential_refresh(self):
63+
"""Check if credentials have been refreshed and recreate client if needed."""
64+
try:
65+
if os.path.exists(self.credential_signal_file):
66+
signal_mtime = os.path.getmtime(self.credential_signal_file)
67+
68+
if signal_mtime > self.last_credential_check:
69+
# A real credential refresh from background daemon
70+
logger.info("Credential refresh signal detected - recreating Bedrock client")
71+
self.bedrock_client = None # Force recreation
72+
self.last_credential_check = signal_mtime
73+
# Remove the signal file after processing
74+
os.remove(self.credential_signal_file)
75+
except Exception as e:
76+
logger.error(f"Error checking credential refresh signal: {e}")
77+
78+
def initialize_client(self):
79+
"""Initialize the Bedrock client."""
80+
# Check if credentials were refreshed
81+
self._check_credential_refresh()
82+
83+
if self.bedrock_client is not None:
84+
return True
85+
86+
logger.info(f"Initializing Bedrock client for region {self.region}")
87+
try:
88+
config = Config(
89+
endpoint_uri=f"https://bedrock-runtime.{self.region}.amazonaws.com",
90+
region=self.region,
91+
aws_credentials_identity_resolver=EnvironmentCredentialsResolver(),
92+
http_auth_scheme_resolver=HTTPAuthSchemeResolver(),
93+
http_auth_schemes={"aws.auth#sigv4": SigV4AuthScheme()},
94+
)
95+
self.bedrock_client = BedrockRuntimeClient(config=config)
96+
logger.info(
97+
"Bedrock client initialized successfully with EnvironmentCredentialsResolver"
98+
)
99+
return True
100+
except Exception as e:
101+
logger.error(
102+
f"Failed to initialize Bedrock client: {str(e)}", exc_info=True
103+
)
104+
return False
105+
106+
async def refresh_credentials_immediately(self):
107+
"""Refresh credentials immediately by calling the container metadata endpoint"""
108+
try:
109+
logger.info("Refreshing credentials due to ExpiredToken...")
110+
# Get credentials from ECS container metadata endpoint
111+
uri = os.environ.get('AWS_CONTAINER_CREDENTIALS_RELATIVE_URI')
112+
if not uri:
113+
logger.error("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI not found in environment")
114+
return False
115+
116+
try:
117+
import aiohttp
118+
async with aiohttp.ClientSession() as session:
119+
async with session.get(f"http://169.254.170.2{uri}", timeout=aiohttp.ClientTimeout(total=2)) as response:
120+
if not response.ok:
121+
logger.error(f"Failed to fetch credentials: {response.status}")
122+
return False
123+
124+
creds = await response.json()
125+
except ImportError:
126+
# Fall back to requests if aiohttp is not available
127+
import requests
128+
response = requests.get(f"http://169.254.170.2{uri}", timeout=2)
129+
if not response.ok:
130+
logger.error(f"Failed to fetch credentials: {response.status_code}")
131+
return False
132+
133+
creds = response.json()
134+
135+
os.environ['AWS_ACCESS_KEY_ID'] = creds['AccessKeyId']
136+
os.environ['AWS_SECRET_ACCESS_KEY'] = creds['SecretAccessKey']
137+
os.environ['AWS_SESSION_TOKEN'] = creds['Token']
138+
139+
logger.info(f"Successfully refreshed credentials, new key ends with: ...{creds['AccessKeyId'][-4:]}")
140+
141+
# Force client recreation on next use
142+
self.bedrock_client = None
143+
return True
144+
except Exception as e:
145+
logger.error(f"Error refreshing credentials: {str(e)}")
146+
return False
147+
148+
async def create_stream(self):
149+
"""Create a bidirectional stream with Bedrock.
150+
151+
Returns:
152+
stream: Bedrock bidirectional stream
153+
"""
154+
logger.info(f"Creating bidirectional stream for model {self.model_id}")
155+
try:
156+
if not self.bedrock_client:
157+
if not self.initialize_client():
158+
raise Exception("Failed to initialize Bedrock client")
159+
160+
stream = await self.bedrock_client.invoke_model_with_bidirectional_stream(
161+
InvokeModelWithBidirectionalStreamOperationInput(model_id=self.model_id)
162+
)
163+
logger.info("Stream initialized successfully")
164+
return stream
165+
except Exception as e:
166+
if "ExpiredToken" in str(e):
167+
current_key = os.environ.get('AWS_ACCESS_KEY_ID', '')
168+
logger.warning(f"ExpiredToken error occurred with credential ending: ...{current_key[-4:] if len(current_key) >= 4 else 'NONE'}")
169+
170+
# Try to refresh and retry
171+
if await self.refresh_credentials_immediately():
172+
logger.info("Retrying stream creation with new credentials")
173+
# Retry recursively after a successful refresh (no retry limit is enforced)
174+
return await self.create_stream()
175+
176+
logger.error(f"Failed to initialize stream: {str(e)}", exc_info=True)
177+
raise
178+
179+
async def send_event(self, stream, event_data):
180+
"""Send an event to the Bedrock stream.
181+
182+
Args:
183+
stream: Bedrock bidirectional stream
184+
event_data (dict): Event data to send
185+
186+
Returns:
187+
bool: True if successful, False otherwise
188+
"""
189+
try:
190+
# Identify the event type (first key under "event") for debug logging
191+
event_type = next(iter(event_data.get("event", {})), "unknown")
192+
logger.debug(f"Sending event: {event_type}")
193+
194+
event_json = json.dumps(event_data)
195+
event = InvokeModelWithBidirectionalStreamInputChunk(
196+
value=BidirectionalInputPayloadPart(bytes_=event_json.encode("utf-8"))
197+
)
198+
await stream.input_stream.send(event)
199+
return True
200+
except Exception as e:
201+
logger.error(f"Error sending event: {str(e)}", exc_info=True)
202+
return False
203+
204+
async def close_stream(self, stream):
205+
"""Close the Bedrock stream.
206+
207+
Args:
208+
stream: Bedrock bidirectional stream
209+
210+
Returns:
211+
bool: True if successful, False otherwise
212+
"""
213+
try:
214+
if stream:
215+
await stream.input_stream.close()
216+
logger.info("Stream closed successfully")
217+
return True
218+
return False
219+
except Exception as e:
220+
logger.error(f"Error closing stream: {str(e)}", exc_info=True)
221+
return False
