Skip to content

Commit 8c2edf4

Browse files
[SPARK-24324][PYTHON][FOLLOW-UP] Rename the Conf to spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName
## What changes were proposed in this pull request?

Add the legacy prefix for spark.sql.execution.pandas.groupedMap.assignColumnsByPosition and rename it to spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName.

## How was this patch tested?

The existing tests.

Closes apache#22540 from gatorsmile/renameAssignColumnsByPosition.

Lead-authored-by: gatorsmile <[email protected]>
Co-authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>
1 parent 9bb3a0c commit 8c2edf4

4 files changed

Lines changed: 18 additions & 19 deletions

File tree

python/pyspark/sql/tests.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5802,7 +5802,8 @@ def test_positional_assignment_conf(self):
58025802
import pandas as pd
58035803
from pyspark.sql.functions import pandas_udf, PandasUDFType
58045804

5805-
with self.sql_conf({"spark.sql.execution.pandas.groupedMap.assignColumnsByPosition": True}):
5805+
with self.sql_conf({
5806+
"spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName": False}):
58065807

58075808
@pandas_udf("a string, b float", PandasUDFType.GROUPED_MAP)
58085809
def foo(_):

python/pyspark/worker.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,9 @@ def verify_result_length(*a):
9797

9898

9999
def wrap_grouped_map_pandas_udf(f, return_type, argspec, runner_conf):
100-
assign_cols_by_pos = runner_conf.get(
101-
"spark.sql.execution.pandas.groupedMap.assignColumnsByPosition", False)
100+
assign_cols_by_name = runner_conf.get(
101+
"spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName", "true")
102+
assign_cols_by_name = assign_cols_by_name.lower() == "true"
102103

103104
def wrapped(key_series, value_series):
104105
import pandas as pd
@@ -119,7 +120,7 @@ def wrapped(key_series, value_series):
119120
"Expected: {} Actual: {}".format(len(return_type), len(result.columns)))
120121

121122
# Assign result columns by schema name if user labeled with strings, else use position
122-
if not assign_cols_by_pos and any(isinstance(name, basestring) for name in result.columns):
123+
if assign_cols_by_name and any(isinstance(name, basestring) for name in result.columns):
123124
return [(result[field.name], to_arrow_type(field.dataType)) for field in return_type]
124125
else:
125126
return [(result[result.columns[i]], to_arrow_type(field.dataType))

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1295,15 +1295,15 @@ object SQLConf {
12951295
.booleanConf
12961296
.createWithDefault(true)
12971297

1298-
val PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_POSITION =
1299-
buildConf("spark.sql.execution.pandas.groupedMap.assignColumnsByPosition")
1298+
val PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_NAME =
1299+
buildConf("spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName")
13001300
.internal()
1301-
.doc("When true, a grouped map Pandas UDF will assign columns from the returned " +
1302-
"Pandas DataFrame based on position, regardless of column label type. When false, " +
1303-
"columns will be looked up by name if labeled with a string and fallback to use " +
1304-
"position if not. This configuration will be deprecated in future releases.")
1301+
.doc("When true, columns will be looked up by name if labeled with a string and fallback " +
1302+
"to use position if not. When false, a grouped map Pandas UDF will assign columns from " +
1303+
"the returned Pandas DataFrame based on position, regardless of column label type. " +
1304+
"This configuration will be deprecated in future releases.")
13051305
.booleanConf
1306-
.createWithDefault(false)
1306+
.createWithDefault(true)
13071307

13081308
val REPLACE_EXCEPT_WITH_FILTER = buildConf("spark.sql.optimizer.replaceExceptWithFilter")
13091309
.internal()
@@ -1915,8 +1915,8 @@ class SQLConf extends Serializable with Logging {
19151915

19161916
def pandasRespectSessionTimeZone: Boolean = getConf(PANDAS_RESPECT_SESSION_LOCAL_TIMEZONE)
19171917

1918-
def pandasGroupedMapAssignColumnssByPosition: Boolean =
1919-
getConf(SQLConf.PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_POSITION)
1918+
def pandasGroupedMapAssignColumnsByName: Boolean =
1919+
getConf(SQLConf.PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_NAME)
19201920

19211921
def replaceExceptWithFilter: Boolean = getConf(REPLACE_EXCEPT_WITH_FILTER)
19221922

sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,8 @@ object ArrowUtils {
131131
} else {
132132
Nil
133133
}
134-
val pandasColsByPosition = if (conf.pandasGroupedMapAssignColumnssByPosition) {
135-
Seq(SQLConf.PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_POSITION.key -> "true")
136-
} else {
137-
Nil
138-
}
139-
Map(timeZoneConf ++ pandasColsByPosition: _*)
134+
val pandasColsByName = Seq(SQLConf.PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_NAME.key ->
135+
conf.pandasGroupedMapAssignColumnsByName.toString)
136+
Map(timeZoneConf ++ pandasColsByName: _*)
140137
}
141138
}

0 commit comments

Comments (0)