Skip to content

Commit 7ba59b7

Browse files
committed
Fixing some bugs in example feature repo for spark:
- creating parquet files with timestamps in us units instead of ns (pyspark has problems reading ns timestamps) - removing on-the-fly features that aren't declared before - settings join keys for entities.
1 parent b89fadd commit 7ba59b7

File tree

3 files changed

+4
-12
lines changed

3 files changed

+4
-12
lines changed

sdk/python/feast/templates/spark/bootstrap.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ def bootstrap():
2121
driver_stats_df.to_parquet(
2222
path=str(data_path / "driver_hourly_stats.parquet"),
2323
allow_truncated_timestamps=True,
24+
coerce_timestamps="us",
2425
)
2526

2627
customer_entities = [201, 202, 203]
@@ -30,6 +31,7 @@ def bootstrap():
3031
customer_profile_df.to_parquet(
3132
path=str(data_path / "customer_daily_profile.parquet"),
3233
allow_truncated_timestamps=True,
34+
coerce_timestamps="us",
3335
)
3436

3537

sdk/python/feast/templates/spark/feature_repo/example_repo.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,8 @@
1616

1717

1818
# Entity definitions
19-
driver = Entity(
20-
name="driver",
21-
description="driver id",
22-
)
23-
customer = Entity(
24-
name="customer",
25-
description="customer id",
26-
)
19+
driver = Entity(name="driver", description="driver id", join_keys=["driver_id"])
20+
customer = Entity(name="customer", description="customer id", join_keys=["customer_id"])
2721

2822
# Sources
2923
driver_hourly_stats = SparkSource(

sdk/python/feast/templates/spark/feature_repo/test_workflow.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,6 @@ def fetch_historical_features_entity_df(store: FeatureStore, for_batch_scoring:
8282
"driver_hourly_stats:conv_rate",
8383
"driver_hourly_stats:acc_rate",
8484
"driver_hourly_stats:avg_daily_trips",
85-
"transformed_conv_rate:conv_rate_plus_val1",
86-
"transformed_conv_rate:conv_rate_plus_val2",
8785
],
8886
).to_df()
8987
print(training_df.head())
@@ -109,8 +107,6 @@ def fetch_online_features(store, use_feature_service: bool):
109107
features_to_fetch = [
110108
"driver_hourly_stats:acc_rate",
111109
"driver_hourly_stats:avg_daily_trips",
112-
"transformed_conv_rate:conv_rate_plus_val1",
113-
"transformed_conv_rate:conv_rate_plus_val2",
114110
]
115111
returned_features = store.get_online_features(
116112
features=features_to_fetch,

0 commit comments

Comments
 (0)