Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 16 additions & 21 deletions python/pyspark/errors/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,11 @@
"<arg_name> index out of range, got '<index>'."
]
},
"INPUT_NOT_FULLY_CONSUMED": {
"message": [
"The input iterator must be fully consumed."
]
},
"INVALID_ARROW_UDTF_RETURN_TYPE": {
"message": [
"The return type of the arrow-optimized Python UDTF should be of type 'pandas.DataFrame', but the '<func>' method returned a value of type <return_type> with value: <value>."
Expand Down Expand Up @@ -830,6 +835,11 @@
],
"sqlState": "0A000"
},
"OUTPUT_EXCEEDS_INPUT_ROWS": {
"message": [
"The number of output rows must not exceed the number of input rows."
]
},
"PACKAGE_NOT_INSTALLED": {
"message": [
"<package_name> >= <minimum_version> must be installed; however, it was not found."
Expand All @@ -842,11 +852,6 @@
"Alternatively set a pandas-on-spark option 'compute.fail_on_ansi_mode' to `False` to force it to work, although it can cause unexpected behavior."
]
},
"PANDAS_UDF_OUTPUT_EXCEEDS_INPUT_ROWS": {
"message": [
"The Pandas SCALAR_ITER UDF outputs more rows than input rows."
]
},
"PIPELINE_SPEC_DICT_KEY_NOT_STRING": {
"message": [
"For pipeline spec field `<field_name>`, key should be a string, got <key_type>."
Expand Down Expand Up @@ -943,24 +948,19 @@
"<error_type> on the server but responses were already received from it."
]
},
"RESULT_COLUMNS_MISMATCH_FOR_ARROW_UDF": {
"message": [
"Column names of the returned pyarrow.Table do not match specified schema.<missing><extra>"
]
},
"RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF": {
"RESULT_COLUMN_NAMES_MISMATCH": {
"message": [
"Column names of the returned pandas.DataFrame do not match specified schema.<missing><extra>"
"Column names of the returned data do not match specified schema.<missing><extra>"
]
},
"RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF": {
"RESULT_COLUMN_SCHEMA_MISMATCH": {
"message": [
"Number of columns of the returned pandas.DataFrame doesn't match specified schema. Expected: <expected> Actual: <actual>"
"Number of columns of the returned data doesn't match specified schema. Expected: <expected> Actual: <actual>"
]
},
"RESULT_LENGTH_MISMATCH_FOR_SCALAR_ITER_PANDAS_UDF": {
"RESULT_ROWS_MISMATCH": {
"message": [
"The length of output in Scalar iterator pandas UDF should be the same with the input's; however, the length of output was <output_length> and the length of input was <input_length>."
"The number of output rows (<output_length>) must match the number of input rows (<input_length>)."
]
},
"RESULT_TYPE_MISMATCH_FOR_ARROW_UDF": {
Expand Down Expand Up @@ -1096,11 +1096,6 @@
"Caught StopIteration thrown from user's code; failing the task: <exc>"
]
},
"STOP_ITERATION_OCCURRED_FROM_SCALAR_ITER_PANDAS_UDF": {
"message": [
"pandas iterator UDF should exhaust the input iterator."
]
},
"STREAMING_CONNECT_SERIALIZATION_ERROR": {
"message": [
"Cannot serialize the function `<name>`. If you accessed the Spark session, or a DataFrame defined outside of the function, or any object that contains a Spark session, please be aware that they are not allowed in Spark Connect. For `foreachBatch`, please access the Spark session using `df.sparkSession`, where `df` is the first parameter in your `foreachBatch` function. For `StreamingQueryListener`, please access the Spark session using `self.spark`. For details please check out the PySpark doc for `foreachBatch` and `StreamingQueryListener`."
Expand Down
5 changes: 2 additions & 3 deletions python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ def stats(key, left, right):
with self.quiet():
with self.assertRaisesRegex(
PythonException,
"Column names of the returned pyarrow.Table do not match specified schema. "
"Column names of the returned data do not match specified schema. "
"Missing: m. Unexpected: v, v2.",
):
# stats returns three columns while here we set schema with two columns
Expand Down Expand Up @@ -227,8 +227,7 @@ def odd_means(key, left, _):
with self.quiet():
with self.assertRaisesRegex(
PythonException,
"Column names of the returned pyarrow.Table do not match specified schema. "
"Missing: m.",
"Column names of the returned data do not match specified schema. Missing: m.",
):
# stats returns one column for even keys while here we set schema with two columns
self.cogrouped.applyInArrow(odd_means, schema="id long, m double").collect()
Expand Down
5 changes: 2 additions & 3 deletions python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ def stats(key, table):
for func_variation in function_variations(stats):
with self.assertRaisesRegex(
PythonException,
"Column names of the returned pyarrow.Table do not match specified schema. "
"Column names of the returned data do not match specified schema. "
"Missing: m. Unexpected: v, v2.",
):
# stats returns three columns while here we set schema with two columns
Expand Down Expand Up @@ -260,8 +260,7 @@ def odd_means(key, table):
with self.quiet():
with self.assertRaisesRegex(
PythonException,
"Column names of the returned pyarrow.Table do not match specified schema. "
"Missing: m.",
"Column names of the returned data do not match specified schema. Missing: m.",
):
# stats returns one column for even keys while here we set schema with two columns
df.groupby("id").applyInArrow(odd_means, schema="id long, m double").collect()
Expand Down
4 changes: 2 additions & 2 deletions python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def merge_pandas(lft, rgt):
self._test_merge_error(
fn=merge_pandas,
errorClass=PythonException,
error_message_regex="Column names of the returned pandas.DataFrame "
error_message_regex="Column names of the returned data "
"do not match specified schema. Unexpected: add, more.",
)

Expand All @@ -229,7 +229,7 @@ def merge_pandas(lft, rgt):
self._test_merge_error(
fn=merge_pandas,
errorClass=PythonException,
error_message_regex="Number of columns of the returned pandas.DataFrame "
error_message_regex="Number of columns of the returned data "
"doesn't match specified schema. Expected: 4 Actual: 6",
)

Expand Down
6 changes: 3 additions & 3 deletions python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ def test_apply_in_pandas_returning_wrong_column_names(self):
def check_apply_in_pandas_returning_wrong_column_names(self):
with self.assertRaisesRegex(
PythonException,
"Column names of the returned pandas.DataFrame do not match specified schema. "
"Column names of the returned data do not match specified schema. "
"Missing: mean. Unexpected: median, std.",
):
self._test_apply_in_pandas(
Expand All @@ -320,7 +320,7 @@ def test_apply_in_pandas_returning_no_column_names_and_wrong_amount(self):
def check_apply_in_pandas_returning_no_column_names_and_wrong_amount(self):
with self.assertRaisesRegex(
PythonException,
"Number of columns of the returned pandas.DataFrame doesn't match "
"Number of columns of the returned data doesn't match "
"specified schema. Expected: 2 Actual: 3",
):
self._test_apply_in_pandas(
Expand Down Expand Up @@ -666,7 +666,7 @@ def invalid_positional_types(pdf):
with self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": False}):
with self.assertRaisesRegex(
PythonException,
"Column names of the returned pandas.DataFrame do not match "
"Column names of the returned data do not match "
"specified schema. Missing: id. Unexpected: iid.",
):
grouped_df.apply(column_name_typo).collect()
Expand Down
24 changes: 12 additions & 12 deletions python/pyspark/sql/tests/pandas/test_pandas_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,8 @@ def dataframes_with_other_column_names(iterator):

with self.assertRaisesRegex(
PythonException,
"PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] "
"Column names of the returned pandas.DataFrame do not match "
"PySparkRuntimeError: \\[RESULT_COLUMN_NAMES_MISMATCH\\] "
"Column names of the returned data do not match "
"specified schema. Missing: id. Unexpected: iid.",
):
(
Expand All @@ -233,8 +233,8 @@ def dataframes_with_other_column_names(iterator):

with self.assertRaisesRegex(
PythonException,
"PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] "
"Column names of the returned pandas.DataFrame do not match "
"PySparkRuntimeError: \\[RESULT_COLUMN_NAMES_MISMATCH\\] "
"Column names of the returned data do not match "
"specified schema. Missing: id2.",
):
(
Expand All @@ -254,17 +254,17 @@ def check_dataframes_with_less_columns(self):

with self.assertRaisesRegex(
PythonException,
"PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] "
"Column names of the returned pandas.DataFrame do not match "
"PySparkRuntimeError: \\[RESULT_COLUMN_NAMES_MISMATCH\\] "
"Column names of the returned data do not match "
"specified schema. Missing: id2.",
):
f = self.identity_dataframes_iter("id", "value")
(df.mapInPandas(f, "id int, id2 long, value int").collect())

with self.assertRaisesRegex(
PythonException,
"PySparkRuntimeError: \\[RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF\\] "
"Number of columns of the returned pandas.DataFrame doesn't match "
"PySparkRuntimeError: \\[RESULT_COLUMN_SCHEMA_MISMATCH\\] "
"Number of columns of the returned data doesn't match "
"specified schema. Expected: 3 Actual: 2",
):
f = self.identity_dataframes_wo_column_names_iter("id", "value")
Expand Down Expand Up @@ -375,8 +375,8 @@ def test_empty_dataframes_with_less_columns(self):
def check_empty_dataframes_with_less_columns(self):
with self.assertRaisesRegex(
PythonException,
"PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] "
"Column names of the returned pandas.DataFrame do not match "
"PySparkRuntimeError: \\[RESULT_COLUMN_NAMES_MISMATCH\\] "
"Column names of the returned data do not match "
"specified schema. Missing: value.",
):
f = self.dataframes_and_empty_dataframe_iter("id")
Expand Down Expand Up @@ -404,8 +404,8 @@ def empty_dataframes_with_other_columns(iterator):

with self.assertRaisesRegex(
PythonException,
"PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] "
"Column names of the returned pandas.DataFrame do not match "
"PySparkRuntimeError: \\[RESULT_COLUMN_NAMES_MISMATCH\\] "
"Column names of the returned data do not match "
"specified schema. Missing: id. Unexpected: iid.",
):
(
Expand Down
23 changes: 10 additions & 13 deletions python/pyspark/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ def verify_pandas_result(result, return_type, assign_cols_by_name, truncate_retu
extra = f" Unexpected: {', '.join(extra)}." if extra else ""

raise PySparkRuntimeError(
errorClass="RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF",
errorClass="RESULT_COLUMN_NAMES_MISMATCH",
messageParameters={
"missing": missing,
"extra": extra,
Expand All @@ -606,7 +606,7 @@ def verify_pandas_result(result, return_type, assign_cols_by_name, truncate_retu
# otherwise the number of columns of the result has to match the return type
elif len(result_columns) != len(return_type):
raise PySparkRuntimeError(
errorClass="RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF",
errorClass="RESULT_COLUMN_SCHEMA_MISMATCH",
messageParameters={
"expected": str(len(return_type)),
"actual": str(len(result.columns)),
Expand Down Expand Up @@ -691,7 +691,7 @@ def verify_arrow_result(result, assign_cols_by_name, expected_cols_and_types):
extra = f" Unexpected: {', '.join(extra)}." if extra else ""

raise PySparkRuntimeError(
errorClass="RESULT_COLUMNS_MISMATCH_FOR_ARROW_UDF",
errorClass="RESULT_COLUMN_NAMES_MISMATCH",
messageParameters={
"missing": missing,
"extra": extra,
Expand Down Expand Up @@ -998,7 +998,7 @@ def verify_element(result):
or (len(result.columns) == 0 and result.empty)
):
raise PySparkRuntimeError(
errorClass="RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF",
errorClass="RESULT_COLUMN_SCHEMA_MISMATCH",
messageParameters={
"expected": str(len(return_type)),
"actual": str(len(result.columns)),
Expand Down Expand Up @@ -2918,29 +2918,26 @@ def process_results():
yield pa.RecordBatch.from_arrays([result], ["_0"])

# Apply row limit check (fail-fast)
# TODO(SPARK-55579): Create Arrow-specific error class (e.g., ARROW_UDF_OUTPUT_EXCEEDS_INPUT_ROWS)
limited = verify_output_row_limit(
process_results(),
lambda: num_input_rows,
error_class="PANDAS_UDF_OUTPUT_EXCEEDS_INPUT_ROWS",
error_class="OUTPUT_EXCEEDS_INPUT_ROWS",
)

# Apply row count match check (final)
# TODO(SPARK-55579): Create Arrow-specific error class (e.g., RESULT_LENGTH_MISMATCH_FOR_SCALAR_ITER_ARROW_UDF)
matched = verify_output_row_count(
limited,
lambda: num_input_rows,
error_class="RESULT_LENGTH_MISMATCH_FOR_SCALAR_ITER_PANDAS_UDF",
error_class="RESULT_ROWS_MISMATCH",
)

# Yield batches
yield from matched

# Verify iterator consumed
# TODO(SPARK-55579): Create Arrow-specific error class (e.g., STOP_ITERATION_OCCURRED_FROM_SCALAR_ITER_ARROW_UDF)
verify_iterator_exhausted(
args_iter,
error_class="STOP_ITERATION_OCCURRED_FROM_SCALAR_ITER_PANDAS_UDF",
error_class="INPUT_NOT_FULLY_CONSUMED",
)

# profiling is not supported for UDF
Expand Down Expand Up @@ -2984,7 +2981,7 @@ def map_batch(batch):
# input length.
if is_scalar_iter and num_output_rows > num_input_rows:
raise PySparkRuntimeError(
errorClass="PANDAS_UDF_OUTPUT_EXCEEDS_INPUT_ROWS", messageParameters={}
errorClass="OUTPUT_EXCEEDS_INPUT_ROWS", messageParameters={}
)
yield (result_batch, result_type)

Expand All @@ -2995,13 +2992,13 @@ def map_batch(batch):
pass
else:
raise PySparkRuntimeError(
errorClass="STOP_ITERATION_OCCURRED_FROM_SCALAR_ITER_PANDAS_UDF",
errorClass="INPUT_NOT_FULLY_CONSUMED",
messageParameters={},
)

if num_output_rows != num_input_rows:
raise PySparkRuntimeError(
errorClass="RESULT_LENGTH_MISMATCH_FOR_SCALAR_ITER_PANDAS_UDF",
errorClass="RESULT_ROWS_MISMATCH",
messageParameters={
"output_length": str(num_output_rows),
"input_length": str(num_input_rows),
Expand Down