12
12
import requests
13
13
from botocore .client import BaseClient
14
14
from botocore .config import Config
15
+ from botocore .exceptions import ClientError
15
16
from typing_extensions import Tuple , override
16
17
17
18
_PORT : int = 8080
@@ -45,6 +46,10 @@ def do_GET(self):
45
46
self ._handle_kinesis_request ()
46
47
if self .in_path ("bedrock" ):
47
48
self ._handle_bedrock_request ()
49
+ if self .in_path ("secretsmanager" ):
50
+ self ._handle_secretsmanager_request ()
51
+ if self .in_path ("stepfunctions" ):
52
+ self ._handle_stepfunctions_request ()
48
53
49
54
self ._end_request (self .main_status )
50
55
@@ -246,7 +251,11 @@ def _handle_bedrock_request(self) -> None:
246
251
set_main_status (200 )
247
252
bedrock_client .meta .events .register (
248
253
"before-call.bedrock.GetGuardrail" ,
249
- lambda ** kwargs : inject_200_success (guardrailId = "bt4o77i015cu" , ** kwargs ),
254
+ lambda ** kwargs : inject_200_success (
255
+ guardrailId = "bt4o77i015cu" ,
256
+ guardrailArn = "arn:aws:bedrock:us-east-1:000000000000:guardrail/bt4o77i015cu" ,
257
+ ** kwargs ,
258
+ ),
250
259
)
251
260
bedrock_client .get_guardrail (
252
261
guardrailIdentifier = "arn:aws:bedrock:us-east-1:000000000000:guardrail/bt4o77i015cu"
@@ -301,6 +310,69 @@ def _handle_bedrock_request(self) -> None:
301
310
else :
302
311
set_main_status (404 )
303
312
313
+ def _handle_secretsmanager_request (self ) -> None :
314
+ secretsmanager_client = boto3 .client ("secretsmanager" , endpoint_url = _AWS_SDK_ENDPOINT , region_name = _AWS_REGION )
315
+ if self .in_path (_ERROR ):
316
+ set_main_status (400 )
317
+ try :
318
+ error_client = boto3 .client ("secretsmanager" , endpoint_url = _ERROR_ENDPOINT , region_name = _AWS_REGION )
319
+ error_client .describe_secret (
320
+ SecretId = "arn:aws:secretsmanager:us-west-2:000000000000:secret:unExistSecret"
321
+ )
322
+ except Exception as exception :
323
+ print ("Expected exception occurred" , exception )
324
+ elif self .in_path (_FAULT ):
325
+ set_main_status (500 )
326
+ try :
327
+ fault_client = boto3 .client (
328
+ "secretsmanager" , endpoint_url = _FAULT_ENDPOINT , region_name = _AWS_REGION , config = _NO_RETRY_CONFIG
329
+ )
330
+ fault_client .get_secret_value (
331
+ SecretId = "arn:aws:secretsmanager:us-west-2:000000000000:secret:nonexistent-secret"
332
+ )
333
+ except Exception as exception :
334
+ print ("Expected exception occurred" , exception )
335
+ elif self .in_path ("describesecret/my-secret" ):
336
+ set_main_status (200 )
337
+ secretsmanager_client .describe_secret (SecretId = "testSecret" )
338
+ else :
339
+ set_main_status (404 )
340
+
341
+ def _handle_stepfunctions_request (self ) -> None :
342
+ sfn_client = boto3 .client ("stepfunctions" , endpoint_url = _AWS_SDK_ENDPOINT , region_name = _AWS_REGION )
343
+ if self .in_path (_ERROR ):
344
+ set_main_status (400 )
345
+ try :
346
+ error_client = boto3 .client ("stepfunctions" , endpoint_url = _ERROR_ENDPOINT , region_name = _AWS_REGION )
347
+ error_client .describe_state_machine (
348
+ stateMachineArn = "arn:aws:states:us-west-2:000000000000:stateMachine:unExistStateMachine"
349
+ )
350
+ except Exception as exception :
351
+ print ("Expected exception occurred" , exception )
352
+ elif self .in_path (_FAULT ):
353
+ set_main_status (500 )
354
+ try :
355
+ fault_client = boto3 .client ("stepfunctions" , endpoint_url = _FAULT_ENDPOINT , region_name = _AWS_REGION )
356
+ fault_client .meta .events .register (
357
+ "before-call.stepfunctions.ListStateMachineVersions" ,
358
+ lambda ** kwargs : inject_500_error ("ListStateMachineVersions" , ** kwargs ),
359
+ )
360
+ fault_client .list_state_machine_versions (
361
+ stateMachineArn = "arn:aws:states:us-west-2:000000000000:stateMachine:invalid-state-machine" ,
362
+ )
363
+ except Exception as exception :
364
+ print ("Expected exception occurred" , exception )
365
+ elif self .in_path ("describestatemachine/my-state-machine" ):
366
+ set_main_status (200 )
367
+ sfn_client .describe_state_machine (
368
+ stateMachineArn = "arn:aws:states:us-west-2:000000000000:stateMachine:testStateMachine"
369
+ )
370
+ elif self .in_path ("describeactivity/my-activity" ):
371
+ set_main_status (200 )
372
+ sfn_client .describe_activity (activityArn = "arn:aws:states:us-west-2:000000000000:activity:testActivity" )
373
+ else :
374
+ set_main_status (404 )
375
+
304
376
def _end_request (self , status_code : int ):
305
377
self .send_response_only (status_code )
306
378
self .end_headers ()
@@ -310,6 +382,7 @@ def set_main_status(status: int) -> None:
310
382
RequestHandler .main_status = status
311
383
312
384
385
+ # pylint: disable=too-many-locals
313
386
def prepare_aws_server () -> None :
314
387
requests .Request (method = "POST" , url = "http://localhost:4566/_localstack/state/reset" )
315
388
try :
@@ -345,6 +418,57 @@ def prepare_aws_server() -> None:
345
418
# Set up Kinesis so tests can access a stream.
346
419
kinesis_client : BaseClient = boto3 .client ("kinesis" , endpoint_url = _AWS_SDK_ENDPOINT , region_name = _AWS_REGION )
347
420
kinesis_client .create_stream (StreamName = "test_stream" , ShardCount = 1 )
421
+
422
+ # Set up Secrets Manager so tests can access a secret.
423
+ secretsmanager_client : BaseClient = boto3 .client (
424
+ "secretsmanager" , endpoint_url = _AWS_SDK_ENDPOINT , region_name = _AWS_REGION
425
+ )
426
+ secretsmanager_response = secretsmanager_client .list_secrets ()
427
+ secret = next ((s for s in secretsmanager_response ["SecretList" ] if s ["Name" ] == "testSecret" ), None )
428
+ if not secret :
429
+ secretsmanager_client .create_secret (
430
+ Name = "testSecret" , SecretString = "secretValue" , Description = "This is a test secret"
431
+ )
432
+
433
+ # Set up Step Functions so tests can access a state machine and activity.
434
+ sfn_client : BaseClient = boto3 .client ("stepfunctions" , endpoint_url = _AWS_SDK_ENDPOINT , region_name = _AWS_REGION )
435
+ sfn_response = sfn_client .list_state_machines ()
436
+ state_machine_name = "testStateMachine"
437
+ activity_name = "testActivity"
438
+ state_machine = next ((st for st in sfn_response ["stateMachines" ] if st ["name" ] == state_machine_name ), None )
439
+ if not state_machine :
440
+ # create state machine needs an iam role so we create it here
441
+ iam_client : BaseClient = boto3 .client ("iam" , endpoint_url = _AWS_SDK_ENDPOINT , region_name = _AWS_REGION )
442
+ iam_role_name = "testRole"
443
+ iam_role_arn = None
444
+ trust_policy = {
445
+ "Version" : "2012-10-17" ,
446
+ "Statement" : [
447
+ {"Effect" : "Allow" , "Principal" : {"Service" : "states.amazonaws.com" }, "Action" : "sts:AssumeRole" }
448
+ ],
449
+ }
450
+ try :
451
+ iam_response = iam_client .create_role (
452
+ RoleName = iam_role_name , AssumeRolePolicyDocument = json .dumps (trust_policy )
453
+ )
454
+ iam_client .attach_role_policy (
455
+ RoleName = iam_role_name , PolicyArn = "arn:aws:iam::aws:policy/AWSStepFunctionsFullAccess"
456
+ )
457
+ print (f"IAM Role '{ iam_role_name } ' create successfully." )
458
+ iam_role_arn = iam_response ["Role" ]["Arn" ]
459
+ sfn_defintion = {
460
+ "Comment" : "A simple sequential workflow" ,
461
+ "StartAt" : "FirstState" ,
462
+ "States" : {"FirstState" : {"Type" : "Pass" , "Result" : "Hello, World!" , "End" : True }},
463
+ }
464
+ definition_string = json .dumps (sfn_defintion )
465
+ sfn_client .create_state_machine (
466
+ name = state_machine_name , definition = definition_string , roleArn = iam_role_arn
467
+ )
468
+ sfn_client .create_activity (name = activity_name )
469
+ except Exception as exception :
470
+ print ("Something went wrong with Step Functions setup" , exception )
471
+
348
472
except Exception as exception :
349
473
print ("Unexpected exception occurred" , exception )
350
474
@@ -363,6 +487,9 @@ def inject_200_success(**kwargs):
363
487
guardrail_id = kwargs .get ("guardrailId" )
364
488
if guardrail_id is not None :
365
489
response_body ["guardrailId" ] = guardrail_id
490
+ guardrail_arn = kwargs .get ("guardrailArn" )
491
+ if guardrail_arn is not None :
492
+ response_body ["guardrailArn" ] = guardrail_arn
366
493
367
494
HTTPResponse = namedtuple ("HTTPResponse" , ["status_code" , "headers" , "body" ])
368
495
headers = kwargs .get ("headers" , {})
@@ -371,6 +498,16 @@ def inject_200_success(**kwargs):
371
498
return http_response , response_body
372
499
373
500
501
+ def inject_500_error (api_name : str , ** kwargs ):
502
+ raise ClientError (
503
+ {
504
+ "Error" : {"Code" : "InternalServerError" , "Message" : "Internal Server Error" },
505
+ "ResponseMetadata" : {"HTTPStatusCode" : 500 , "RequestId" : "mock-request-id" },
506
+ },
507
+ api_name ,
508
+ )
509
+
510
+
374
511
def main () -> None :
375
512
prepare_aws_server ()
376
513
server_address : Tuple [str , int ] = ("0.0.0.0" , _PORT )
0 commit comments