21
21
22
22
from aiobotocore .config import AioConfig
23
23
from pydantic import StrictBool , StrictStr
24
+ from tenacity import (
25
+ retry ,
26
+ retry_if_exception_type ,
27
+ stop_after_attempt ,
28
+ wait_exponential ,
29
+ )
24
30
25
31
from feast import Entity , FeatureView , utils
26
32
from feast .infra .infra_object import DYNAMODB_INFRA_OBJECT_CLASS_TYPE , InfraObject
@@ -786,6 +792,14 @@ def _latest_data_to_write(
786
792
return (v for _ , v in OrderedDict ((ah [0 ], ah [1 ]) for ah in sorted_data ).items ())
787
793
788
794
795
+ class RetryableBotoError (Exception ):
796
+ pass
797
+
798
+
799
+ class LimitExceededException (RetryableBotoError ):
800
+ pass
801
+
802
+
789
803
class _DynamoTableManager :
790
804
def __init__ (
791
805
self , dynamodb_resource , config : RepoConfig , feature_view : FeatureView
@@ -820,6 +834,12 @@ def table_tags(self) -> list[dict[str, str]]:
820
834
821
835
return common_tags + table_tags
822
836
837
+ @retry (
838
+ wait = wait_exponential (multiplier = 1 , max = 4 ),
839
+ retry = retry_if_exception_type (RetryableBotoError ),
840
+ stop = stop_after_attempt (3 ),
841
+ reraise = True ,
842
+ )
823
843
def _update_tags (self , new_tags : list [dict [str , str ]]):
824
844
table_arn = self ._dynamodb_client .describe_table (TableName = self .table_name )[
825
845
"Table"
@@ -834,7 +854,11 @@ def _update_tags(self, new_tags: list[dict[str, str]]):
834
854
)
835
855
836
856
if new_tags :
837
- self ._dynamodb_client .tag_resource (ResourceArn = table_arn , Tags = new_tags )
857
+ try :
858
+ self ._dynamodb_client .tag_resource (ResourceArn = table_arn , Tags = new_tags )
859
+ except ClientError as ce :
860
+ if ce .response ["Error" ]["Code" ] == "LimitExceededException" :
861
+ raise LimitExceededException from ce
838
862
839
863
def update (self ):
840
864
# Add Tags attribute to creation request only if configured to prevent
0 commit comments