Commit dc687d4

jackywang-db authored and sryza committed
[SPARK-52853][SDP] Prevent imperative PySpark methods in declarative pipelines
### What changes were proposed in this pull request?

This PR adds a context manager, `block_session_mutations()`, that prevents the execution of imperative Spark operations within declarative pipeline definitions. When a blocked method is called, users receive a clear error message with guidance on the declarative alternative.

#### Blocked Methods

##### Configuration Management
- **`spark.conf.set()`** → Use the pipeline spec or the `spark_conf` decorator parameter

##### Catalog Management
- **`spark.catalog.setCurrentCatalog()`** → Set via the pipeline spec or the dataset decorator `name` argument
- **`spark.catalog.setCurrentDatabase()`** → Set via the pipeline spec or the dataset decorator `name` argument

##### Temporary View Management
- **`spark.catalog.dropTempView()`** → Remove the temporary view definition directly
- **`spark.catalog.dropGlobalTempView()`** → Remove the temporary view definition directly
- **`DataFrame.createTempView()`** → Use the `temporary_view` decorator
- **`DataFrame.createOrReplaceTempView()`** → Use the `temporary_view` decorator
- **`DataFrame.createGlobalTempView()`** → Use the `temporary_view` decorator
- **`DataFrame.createOrReplaceGlobalTempView()`** → Use the `temporary_view` decorator

##### UDF Registration
- **`spark.udf.register()`** → Define and register UDFs before pipeline execution
- **`spark.udf.registerJavaFunction()`** → Define and register Java UDFs before pipeline execution
- **`spark.udf.registerJavaUDAF()`** → Define and register Java UDAFs before pipeline execution

### Why are the changes needed?

These are imperative constructs that can cause friction and unexpected behavior when used from within a pipeline declaration. For example, they make pipeline behavior sensitive to the order in which Python files are imported, which can be unpredictable. Declarative mechanisms for setting Spark confs for pipelines already exist: the pipeline spec and the `spark_conf` decorator argument.

### Does this PR introduce _any_ user-facing change?

Yes. Setting Spark confs (and performing the other session mutations listed above) imperatively in a pipeline definition file now raises an error.

### How was this patch tested?

Added a new test suite verifying that the context manager behaves as expected, and ran the `spark-pipelines` CLI manually.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #51590 from JiaqiWang18/SPARK-52853-prevent-py-conf-set.

Authored-by: Jacky Wang <jacky.wang@databricks.com>
Signed-off-by: Sandy Ryza <sandy.ryza@databricks.com>
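For illustration only (not part of the commit), here is a minimal sketch of how a pipeline definition file expresses these settings declaratively instead of mutating the session. The `from pyspark import pipelines as dp` import, the `dp` alias, and the exact decorator signatures are assumptions; only the `temporary_view` decorator, the `spark_conf` argument, and the error class come from the description above.

```python
# Hypothetical pipeline definition file (illustration only): the import path,
# module alias `dp`, and decorator signatures are assumptions; only the
# `temporary_view` decorator and the `spark_conf` argument are named by this PR.
from pyspark import pipelines as dp  # assumed import alias
from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.active()  # assumes the spark-pipelines CLI already started a session


# Declarative replacement for df.createOrReplaceTempView("recent_clicks"):
@dp.temporary_view(name="recent_clicks", spark_conf={"spark.sql.shuffle.partitions": "10"})
def recent_clicks() -> DataFrame:
    return spark.read.table("raw_clicks")


# The imperative equivalents are now blocked while definitions are registered:
#   spark.conf.set("spark.sql.shuffle.partitions", "10")
#   spark.read.table("raw_clicks").createOrReplaceTempView("recent_clicks")
# Both raise PySparkException with an error class of the form
# SESSION_MUTATION_IN_DECLARATIVE_PIPELINE.<sub-class>.
```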
1 parent 5a9929c commit dc687d4

File tree: 5 files changed, +465 −1 lines changed

dev/sparktestsupport/modules.py

Lines changed: 1 addition & 0 deletions
@@ -1520,6 +1520,7 @@ def __hash__(self):
     source_file_regexes=["python/pyspark/pipelines"],
     python_test_goals=[
         "pyspark.pipelines.tests.test_block_connect_access",
+        "pyspark.pipelines.tests.test_block_session_mutations",
         "pyspark.pipelines.tests.test_cli",
         "pyspark.pipelines.tests.test_decorators",
         "pyspark.pipelines.tests.test_graph_element_registry",

python/pyspark/errors/error-conditions.json

Lines changed: 67 additions & 0 deletions
@@ -1007,6 +1007,73 @@
       "Cannot start a remote Spark session because there is a regular Spark session already running."
     ]
   },
+  "SESSION_MUTATION_IN_DECLARATIVE_PIPELINE": {
+    "message": [
+      "Session mutation <method> is not allowed in declarative pipelines."
+    ],
+    "sub_class": {
+      "SET_RUNTIME_CONF": {
+        "message": [
+          "Instead set configuration via the pipeline spec or use the 'spark_conf' argument in various decorators."
+        ]
+      },
+      "SET_CURRENT_CATALOG": {
+        "message": [
+          "Instead set catalog via the pipeline spec or the 'name' argument on the dataset decorators."
+        ]
+      },
+      "SET_CURRENT_DATABASE": {
+        "message": [
+          "Instead set database via the pipeline spec or the 'name' argument on the dataset decorators."
+        ]
+      },
+      "DROP_TEMP_VIEW": {
+        "message": [
+          "Instead remove the temporary view definition directly."
+        ]
+      },
+      "DROP_GLOBAL_TEMP_VIEW": {
+        "message": [
+          "Instead remove the temporary view definition directly."
+        ]
+      },
+      "CREATE_TEMP_VIEW": {
+        "message": [
+          "Instead use the @temporary_view decorator to define temporary views."
+        ]
+      },
+      "CREATE_OR_REPLACE_TEMP_VIEW": {
+        "message": [
+          "Instead use the @temporary_view decorator to define temporary views."
+        ]
+      },
+      "CREATE_GLOBAL_TEMP_VIEW": {
+        "message": [
+          "Instead use the @temporary_view decorator to define temporary views."
+        ]
+      },
+      "CREATE_OR_REPLACE_GLOBAL_TEMP_VIEW": {
+        "message": [
+          "Instead use the @temporary_view decorator to define temporary views."
+        ]
+      },
+      "REGISTER_UDF": {
+        "message": [
+          ""
+        ]
+      },
+      "REGISTER_JAVA_UDF": {
+        "message": [
+          ""
+        ]
+      },
+      "REGISTER_JAVA_UDAF": {
+        "message": [
+          ""
+        ]
+      }
+    }
+  },
   "SESSION_NEED_CONN_STR_OR_BUILDER": {
     "message": [
       "Needs either connection string or channelBuilder (mutually exclusive) to create a new SparkSession."
python/pyspark/pipelines/block_session_mutations.py

Lines changed: 135 additions & 0 deletions

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from contextlib import contextmanager
from typing import Generator, NoReturn, List, Callable

from pyspark.errors import PySparkException
from pyspark.sql.connect.catalog import Catalog
from pyspark.sql.connect.conf import RuntimeConf
from pyspark.sql.connect.dataframe import DataFrame
from pyspark.sql.connect.udf import UDFRegistration

# pyspark methods that should be blocked from executing in python pipeline definition files
ERROR_CLASS = "SESSION_MUTATION_IN_DECLARATIVE_PIPELINE"
BLOCKED_METHODS: List = [
    {
        "class": RuntimeConf,
        "method": "set",
        "error_sub_class": "SET_RUNTIME_CONF",
    },
    {
        "class": Catalog,
        "method": "setCurrentCatalog",
        "error_sub_class": "SET_CURRENT_CATALOG",
    },
    {
        "class": Catalog,
        "method": "setCurrentDatabase",
        "error_sub_class": "SET_CURRENT_DATABASE",
    },
    {
        "class": Catalog,
        "method": "dropTempView",
        "error_sub_class": "DROP_TEMP_VIEW",
    },
    {
        "class": Catalog,
        "method": "dropGlobalTempView",
        "error_sub_class": "DROP_GLOBAL_TEMP_VIEW",
    },
    {
        "class": DataFrame,
        "method": "createTempView",
        "error_sub_class": "CREATE_TEMP_VIEW",
    },
    {
        "class": DataFrame,
        "method": "createOrReplaceTempView",
        "error_sub_class": "CREATE_OR_REPLACE_TEMP_VIEW",
    },
    {
        "class": DataFrame,
        "method": "createGlobalTempView",
        "error_sub_class": "CREATE_GLOBAL_TEMP_VIEW",
    },
    {
        "class": DataFrame,
        "method": "createOrReplaceGlobalTempView",
        "error_sub_class": "CREATE_OR_REPLACE_GLOBAL_TEMP_VIEW",
    },
    {
        "class": UDFRegistration,
        "method": "register",
        "error_sub_class": "REGISTER_UDF",
    },
    {
        "class": UDFRegistration,
        "method": "registerJavaFunction",
        "error_sub_class": "REGISTER_JAVA_UDF",
    },
    {
        "class": UDFRegistration,
        "method": "registerJavaUDAF",
        "error_sub_class": "REGISTER_JAVA_UDAF",
    },
]


def _create_blocked_method(error_method_name: str, error_sub_class: str) -> Callable:
    def blocked_method(*args: object, **kwargs: object) -> NoReturn:
        raise PySparkException(
            errorClass=f"{ERROR_CLASS}.{error_sub_class}",
            messageParameters={
                "method": error_method_name,
            },
        )

    return blocked_method


@contextmanager
def block_session_mutations() -> Generator[None, None, None]:
    """
    Context manager that blocks imperative constructs found in a pipeline python definition file.
    See BLOCKED_METHODS above for a list.
    """
    # Store original methods
    original_methods = {}
    for method_info in BLOCKED_METHODS:
        cls = method_info["class"]
        method_name = method_info["method"]
        original_methods[(cls, method_name)] = getattr(cls, method_name)

    try:
        # Replace methods with blocked versions
        for method_info in BLOCKED_METHODS:
            cls = method_info["class"]
            method_name = method_info["method"]
            error_method_name = f"'{cls.__name__}.{method_name}'"
            blocked_method = _create_blocked_method(
                error_method_name, method_info["error_sub_class"]
            )
            setattr(cls, method_name, blocked_method)

        yield
    finally:
        # Restore original methods
        for method_info in BLOCKED_METHODS:
            cls = method_info["class"]
            method_name = method_info["method"]
            original_method = original_methods[(cls, method_name)]
            setattr(cls, method_name, original_method)
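A minimal usage sketch (assuming an existing Spark Connect session named `spark`, which is not part of the commit): inside the context manager the patched methods raise, and the original methods are restored on exit.

```python
from pyspark.errors import PySparkException
from pyspark.pipelines.block_session_mutations import block_session_mutations

# `spark` is assumed to be an already-created Spark Connect SparkSession.
with block_session_mutations():
    try:
        spark.conf.set("spark.sql.shuffle.partitions", "10")  # blocked
    except PySparkException as e:
        print(e.getErrorClass())
        # SESSION_MUTATION_IN_DECLARATIVE_PIPELINE.SET_RUNTIME_CONF

# Outside the context manager the original method is back in place.
spark.conf.set("spark.sql.shuffle.partitions", "10")
```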

python/pyspark/pipelines/cli.py

Lines changed: 3 additions & 1 deletion
@@ -32,6 +32,7 @@

 from pyspark.errors import PySparkException, PySparkTypeError
 from pyspark.sql import SparkSession
+from pyspark.pipelines.block_session_mutations import block_session_mutations
 from pyspark.pipelines.graph_element_registry import (
     graph_element_registration_context,
     GraphElementRegistry,
@@ -192,7 +193,8 @@ def register_definitions(
             assert (
                 module_spec.loader is not None
             ), f"Module spec has no loader for {file}"
-            module_spec.loader.exec_module(module)
+            with block_session_mutations():
+                module_spec.loader.exec_module(module)
         elif file.suffix == ".sql":
             log_with_curr_timestamp(f"Registering SQL file {file}...")
             with file.open("r") as f:
