[SPARK-52853][SDP] Prevent imperative PySpark methods in declarative pipelines #51590
Conversation
addressing discussion

@sryza @anishm-db Ready for review. Re-scoped the PR to only block imperative Python methods on the client side via a context manager.
python/pyspark/pipelines/tests/test_block_imperative_construct.py
```python
@contextmanager
def block_imperative_construct() -> Generator[None, None, None]:
```
Suggested change:

```diff
-def block_imperative_construct() -> Generator[None, None, None]:
+def block_imperative_constructs() -> Generator[None, None, None]:
```
Renamed to `block_session_mutations` to be more clear.
```python
{
    "class": RuntimeConf,
    "method": "set",
    "suggestion": "Instead set configuration via the pipeline spec "
```
It would be better to have this text inside of error-conditions.json – that way it's in a central place that can be internationalized more easily. Thoughts on having a sub-error code for each of these? E.g. SET_CURRENT_CATALOG?
Makes sense, added sub-classes for each method in error-conditions.json.
One more tiny nitpick – then LGTM!
"Session mutation <method> is not allowed in declarative pipelines." | ||
], | ||
"sub_class": { | ||
"RUNTIME_CONF_SET": { |
Nitpick: should the `SET` be on the other side of `RUNTIME_CONF` for consistency?
good catch!
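For illustration, the reworked entry might look like the following; the top-level condition name here is an assumption, while the message text and `sub_class` shape come from the snippet above:

```json
"SESSION_MUTATION_IN_DECLARATIVE_PIPELINE": {
  "message": [
    "Session mutation <method> is not allowed in declarative pipelines."
  ],
  "sub_class": {
    "SET_RUNTIME_CONF": {
      "message": [
        "Instead set configuration via the pipeline spec or the 'spark_conf' decorator parameter."
      ]
    }
  }
}
```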
LGTM! Will merge once build is green.
What changes were proposed in this pull request?
This PR adds a context manager `block_imperative_construct()` that prevents the execution of imperative Spark operations within declarative pipeline definitions. When these blocked methods are called, users receive clear error messages with guidance on declarative alternatives.
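As a minimal sketch of the approach, assuming a monkey-patching implementation: the `RuntimeConf` class name comes from the review snippets above, while the import path, error type, and message wording are illustrative rather than this PR's actual code.

```python
from contextlib import contextmanager
from typing import Generator

# Import path assumed; the review snippet only shows the class name.
from pyspark.sql.connect.conf import RuntimeConf


@contextmanager
def block_imperative_construct() -> Generator[None, None, None]:
    """Illustrative sketch: temporarily replace an imperative method so that
    calling it inside a pipeline definition raises a descriptive error."""
    original_set = RuntimeConf.set

    def blocked_set(self: RuntimeConf, *args: object, **kwargs: object) -> None:
        # The real PR raises a PySpark error backed by error-conditions.json;
        # a plain RuntimeError stands in for it here.
        raise RuntimeError(
            "Session mutation 'RuntimeConf.set' is not allowed in declarative "
            "pipelines. Instead set configuration via the pipeline spec or the "
            "'spark_conf' decorator parameter."
        )

    RuntimeConf.set = blocked_set  # patch while the pipeline definition runs
    try:
        yield
    finally:
        RuntimeConf.set = original_set  # always restore the original method
```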
Blocked Methods

Configuration Management
- `spark.conf.set()` → Use the pipeline spec or the `spark_conf` decorator parameter

Catalog Management
- `spark.catalog.setCurrentCatalog()` → Set via the pipeline spec or the dataset decorator `name` argument
- `spark.catalog.setCurrentDatabase()` → Set via the pipeline spec or the dataset decorator `name` argument

Temporary View Management
- `spark.catalog.dropTempView()` → Remove the temporary view definition directly
- `spark.catalog.dropGlobalTempView()` → Remove the temporary view definition directly
- `DataFrame.createTempView()` → Use the `@temporary_view` decorator
- `DataFrame.createOrReplaceTempView()` → Use the `@temporary_view` decorator
- `DataFrame.createGlobalTempView()` → Use the `@temporary_view` decorator
- `DataFrame.createOrReplaceGlobalTempView()` → Use the `@temporary_view` decorator

UDF Registration (see the sketch after this list)
- `spark.udf.register()` → Define and register UDFs before pipeline execution
- `spark.udf.registerJavaFunction()` → Define and register Java UDFs before pipeline execution
- `spark.udf.registerJavaUDAF()` → Define and register Java UDAFs before pipeline execution
Why are the changes needed?

These are imperative constructs that can cause friction and unexpected behavior from within a pipeline declaration. E.g. they make pipeline behavior sensitive to the order in which Python files are imported, which can be unpredictable. There are already existing mechanisms for setting Spark confs for pipelines: the pipeline spec and the `spark_conf` decorator parameter.
Does this PR introduce any user-facing change?
Yes, it blocks imperative session mutations, such as setting Spark confs, in the pipeline definition file.
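For example, code like this in a pipeline definition file now fails fast instead of silently mutating the shared session (the error condition name shown is illustrative):

```python
# Raises an error along the lines of:
#   [SESSION_MUTATION_IN_DECLARATIVE_PIPELINE.SET_RUNTIME_CONF]
#   Session mutation RuntimeConf.set is not allowed in declarative pipelines.
spark.conf.set("spark.sql.shuffle.partitions", "10")
```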
How was this patch tested?

Created a new test suite to verify that the context manager behaves as expected, and ran the `spark-pipelines` CLI manually.
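A sketch of what one such test might look like; the module path is inferred from the test file's location in this PR, and the assertion text is assumed rather than taken from the actual suite:

```python
import unittest

from pyspark.sql import SparkSession

# Module path assumed from python/pyspark/pipelines/tests/test_block_imperative_construct.py
from pyspark.pipelines.block_imperative_construct import block_imperative_construct


class BlockImperativeConstructTest(unittest.TestCase):
    def test_conf_set_is_blocked(self) -> None:
        spark = SparkSession.builder.getOrCreate()
        with block_imperative_construct():
            # Any imperative session mutation should raise inside the context.
            with self.assertRaises(Exception) as ctx:
                spark.conf.set("spark.sql.shuffle.partitions", "10")
        self.assertIn("not allowed in declarative pipelines", str(ctx.exception))

    def test_conf_set_allowed_outside_context(self) -> None:
        spark = SparkSession.builder.getOrCreate()
        # Outside the context manager, the original method is restored.
        spark.conf.set("spark.sql.shuffle.partitions", "10")


if __name__ == "__main__":
    unittest.main()
```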
Was this patch authored or co-authored using generative AI tooling?

No