-
Notifications
You must be signed in to change notification settings - Fork 1.1k
feat: Make dynamo table tagging opt-in #5438
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -15,12 +15,18 @@ | |||||
import contextlib | ||||||
import itertools | ||||||
import logging | ||||||
from collections import OrderedDict, defaultdict | ||||||
from collections import OrderedDict | ||||||
from datetime import datetime | ||||||
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union | ||||||
|
||||||
from aiobotocore.config import AioConfig | ||||||
from pydantic import StrictBool, StrictStr | ||||||
from tenacity import ( | ||||||
retry, | ||||||
retry_if_exception_type, | ||||||
stop_after_attempt, | ||||||
wait_exponential, | ||||||
) | ||||||
|
||||||
from feast import Entity, FeatureView, utils | ||||||
from feast.infra.infra_object import DYNAMODB_INFRA_OBJECT_CLASS_TYPE, InfraObject | ||||||
|
@@ -74,7 +80,10 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel): | |||||
"""Whether to read from Dynamodb by forcing consistent reads""" | ||||||
|
||||||
tags: Union[Dict[str, str], None] = None | ||||||
"""AWS resource tags added to each table""" | ||||||
"""Key-value pairs added to each feature-view""" | ||||||
|
||||||
tag_aws_resources: StrictBool = False | ||||||
"""Add the feature-view tags to the underlying AWS dynamodb tables""" | ||||||
|
||||||
session_based_auth: bool = False | ||||||
"""AWS session based client authentication""" | ||||||
|
@@ -138,38 +147,6 @@ async def close(self): | |||||
def async_supported(self) -> SupportedAsyncMethods: | ||||||
return SupportedAsyncMethods(read=True, write=True) | ||||||
|
||||||
@staticmethod | ||||||
def _table_tags(online_config, table_instance) -> list[dict[str, str]]: | ||||||
table_instance_tags = table_instance.tags or {} | ||||||
online_tags = online_config.tags or {} | ||||||
|
||||||
common_tags = [ | ||||||
{"Key": key, "Value": table_instance_tags.get(key) or value} | ||||||
for key, value in online_tags.items() | ||||||
] | ||||||
table_tags = [ | ||||||
{"Key": key, "Value": value} | ||||||
for key, value in table_instance_tags.items() | ||||||
if key not in online_tags | ||||||
] | ||||||
|
||||||
return common_tags + table_tags | ||||||
|
||||||
@staticmethod | ||||||
def _update_tags(dynamodb_client, table_name: str, new_tags: list[dict[str, str]]): | ||||||
table_arn = dynamodb_client.describe_table(TableName=table_name)["Table"][ | ||||||
"TableArn" | ||||||
] | ||||||
current_tags = dynamodb_client.list_tags_of_resource(ResourceArn=table_arn)[ | ||||||
"Tags" | ||||||
] | ||||||
if current_tags: | ||||||
remove_keys = [tag["Key"] for tag in current_tags] | ||||||
dynamodb_client.untag_resource(ResourceArn=table_arn, TagKeys=remove_keys) | ||||||
|
||||||
if new_tags: | ||||||
dynamodb_client.tag_resource(ResourceArn=table_arn, Tags=new_tags) | ||||||
|
||||||
def update( | ||||||
self, | ||||||
config: RepoConfig, | ||||||
|
@@ -189,59 +166,25 @@ def update( | |||||
""" | ||||||
online_config = config.online_store | ||||||
assert isinstance(online_config, DynamoDBOnlineStoreConfig) | ||||||
dynamodb_client = self._get_dynamodb_client( | ||||||
online_config.region, | ||||||
online_config.endpoint_url, | ||||||
online_config.session_based_auth, | ||||||
) | ||||||
|
||||||
dynamodb_resource = self._get_dynamodb_resource( | ||||||
online_config.region, | ||||||
online_config.endpoint_url, | ||||||
online_config.session_based_auth, | ||||||
) | ||||||
|
||||||
do_tag_updates = defaultdict(bool) | ||||||
for table_instance in tables_to_keep: | ||||||
# Add Tags attribute to creation request only if configured to prevent | ||||||
# TagResource permission issues, even with an empty Tags array. | ||||||
table_tags = self._table_tags(online_config, table_instance) | ||||||
kwargs = {"Tags": table_tags} if table_tags else {} | ||||||
def get_table_manager(table): | ||||||
return _DynamoTableManager( | ||||||
dynamodb_resource=dynamodb_resource, | ||||||
config=config, | ||||||
feature_view=table, | ||||||
) | ||||||
|
||||||
table_name = _get_table_name(online_config, config, table_instance) | ||||||
try: | ||||||
dynamodb_resource.create_table( | ||||||
TableName=table_name, | ||||||
KeySchema=[{"AttributeName": "entity_id", "KeyType": "HASH"}], | ||||||
AttributeDefinitions=[ | ||||||
{"AttributeName": "entity_id", "AttributeType": "S"} | ||||||
], | ||||||
BillingMode="PAY_PER_REQUEST", | ||||||
**kwargs, | ||||||
) | ||||||
for table in tables_to_keep: | ||||||
get_table_manager(table).update() | ||||||
|
||||||
except ClientError as ce: | ||||||
do_tag_updates[table_name] = True | ||||||
|
||||||
# If the table creation fails with ResourceInUseException, | ||||||
# it means the table already exists or is being created. | ||||||
# Otherwise, re-raise the exception | ||||||
if ce.response["Error"]["Code"] != "ResourceInUseException": | ||||||
raise | ||||||
|
||||||
for table_instance in tables_to_keep: | ||||||
table_name = _get_table_name(online_config, config, table_instance) | ||||||
dynamodb_client.get_waiter("table_exists").wait(TableName=table_name) | ||||||
# once table is confirmed to exist, update the tags. | ||||||
# tags won't be updated in the create_table call if the table already exists | ||||||
if do_tag_updates[table_name]: | ||||||
tags = self._table_tags(online_config, table_instance) | ||||||
self._update_tags(dynamodb_client, table_name, tags) | ||||||
|
||||||
for table_to_delete in tables_to_delete: | ||||||
_delete_table_idempotent( | ||||||
dynamodb_resource, | ||||||
_get_table_name(online_config, config, table_to_delete), | ||||||
) | ||||||
for table in tables_to_delete: | ||||||
get_table_manager(table).delete() | ||||||
|
||||||
def teardown( | ||||||
self, | ||||||
|
@@ -265,9 +208,11 @@ def teardown( | |||||
) | ||||||
|
||||||
for table in tables: | ||||||
_delete_table_idempotent( | ||||||
dynamodb_resource, _get_table_name(online_config, config, table) | ||||||
) | ||||||
_DynamoTableManager( | ||||||
dynamodb_resource=dynamodb_resource, | ||||||
config=config, | ||||||
feature_view=table, | ||||||
).delete() | ||||||
|
||||||
def online_write_batch( | ||||||
self, | ||||||
|
@@ -845,3 +790,102 @@ def _latest_data_to_write( | |||||
as_hashable = ((d[0].SerializeToString(), d) for d in data) | ||||||
sorted_data = sorted(as_hashable, key=lambda ah: (ah[0], ah[1][2])) | ||||||
return (v for _, v in OrderedDict((ah[0], ah[1]) for ah in sorted_data).items()) | ||||||
|
||||||
|
||||||
class RetryableBotoError(Exception): | ||||||
pass | ||||||
|
||||||
|
||||||
class LimitExceededException(RetryableBotoError): | ||||||
pass | ||||||
|
||||||
|
||||||
class _DynamoTableManager: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the online store class has become a cluttered god class, move the dynamo table creation/destruction to a helper class |
||||||
def __init__( | ||||||
self, dynamodb_resource, config: RepoConfig, feature_view: FeatureView | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
I would rather keep the parameter name in the context of Database that is table but accept FeatureView object as is suggests. |
||||||
): | ||||||
self.config = config | ||||||
self.feature_view = feature_view | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
self._dynamodb_resource = dynamodb_resource | ||||||
|
||||||
@property | ||||||
def _dynamodb_client(self): | ||||||
return self._dynamodb_resource.meta.client | ||||||
|
||||||
@property | ||||||
def table_name(self) -> str: | ||||||
return _get_table_name(self.config.online_store, self.config, self.feature_view) | ||||||
|
||||||
def table_tags(self) -> list[dict[str, str]]: | ||||||
table_instance_tags = self.feature_view.tags or {} | ||||||
online_tags = self.config.online_store.tags or {} | ||||||
|
||||||
common_tags = [ | ||||||
{"Key": key, "Value": table_instance_tags.get(key) or value} | ||||||
for key, value in online_tags.items() | ||||||
] | ||||||
table_tags = [ | ||||||
{"Key": key, "Value": value} | ||||||
for key, value in table_instance_tags.items() | ||||||
if key not in online_tags | ||||||
] | ||||||
|
||||||
return common_tags + table_tags | ||||||
|
||||||
@retry( | ||||||
wait=wait_exponential(multiplier=1, max=4), | ||||||
retry=retry_if_exception_type(RetryableBotoError), | ||||||
stop=stop_after_attempt(3), | ||||||
reraise=True, | ||||||
) | ||||||
Comment on lines
+835
to
+840
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. tagging a lot of dynamo tables can lead to api rate limiting errors, retry with backoff |
||||||
def _update_tags(self, new_tags: list[dict[str, str]]): | ||||||
table_arn = self._dynamodb_client.describe_table(TableName=self.table_name)[ | ||||||
"Table" | ||||||
]["TableArn"] | ||||||
current_tags = self._dynamodb_client.list_tags_of_resource( | ||||||
ResourceArn=table_arn | ||||||
)["Tags"] | ||||||
if current_tags: | ||||||
remove_keys = [tag["Key"] for tag in current_tags] | ||||||
self._dynamodb_client.untag_resource( | ||||||
ResourceArn=table_arn, TagKeys=remove_keys | ||||||
) | ||||||
|
||||||
if new_tags: | ||||||
try: | ||||||
self._dynamodb_client.tag_resource(ResourceArn=table_arn, Tags=new_tags) | ||||||
except ClientError as ce: | ||||||
if ce.response["Error"]["Code"] == "LimitExceededException": | ||||||
raise LimitExceededException from ce | ||||||
|
||||||
def update(self): | ||||||
# Add Tags attribute to creation request only if configured to prevent | ||||||
# TagResource permission issues, even with an empty Tags array. | ||||||
do_tag_update = self.config.online_store.tag_aws_resources | ||||||
table_tags = self.table_tags() | ||||||
kwargs = {"Tags": table_tags} if table_tags and do_tag_update else {} | ||||||
try: | ||||||
self._dynamodb_resource.create_table( | ||||||
TableName=self.table_name, | ||||||
KeySchema=[{"AttributeName": "entity_id", "KeyType": "HASH"}], | ||||||
AttributeDefinitions=[ | ||||||
{"AttributeName": "entity_id", "AttributeType": "S"} | ||||||
], | ||||||
BillingMode="PAY_PER_REQUEST", | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think billing mode should be taken for parameters and not hardcoded ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||||||
**kwargs, | ||||||
) | ||||||
do_tag_update = False | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are we setting it to False ? Good to annotate ? |
||||||
except ClientError as ce: | ||||||
# If the table creation fails with ResourceInUseException, | ||||||
# it means the table already exists or is being created. | ||||||
# Otherwise, re-raise the exception | ||||||
if ce.response["Error"]["Code"] != "ResourceInUseException": | ||||||
raise | ||||||
|
||||||
# tags won't be updated in the create_table call if the table already exists | ||||||
self._dynamodb_client.get_waiter("table_exists").wait(TableName=self.table_name) | ||||||
if do_tag_update: | ||||||
self._update_tags(table_tags) | ||||||
|
||||||
def delete(self) -> None: | ||||||
_delete_table_idempotent(self._dynamodb_resource, self.table_name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this feature isnt strictly necessary (though a good idea!) and results in a a few extra aws api calls per feature view, so make it opt in.