Skip to content

Commit ef7a5e0

Browse files
gatorsmile authored and Andrew Or committed
[SPARK-14603][SQL][FOLLOWUP] Verification of Metadata Operations by Session Catalog
#### What changes were proposed in this pull request? This follow-up PR is to address the remaining comments in apache#12385 The major change in this PR is to issue better error messages in PySpark by using the mechanism that was proposed by davies in apache#7135 For example, in PySpark, if we input the following statement: ```python >>> l = [('Alice', 1)] >>> df = sqlContext.createDataFrame(l) >>> df.createTempView("people") >>> df.createTempView("people") ``` Before this PR, the exception we will get is like ``` Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/pyspark/sql/dataframe.py", line 152, in createTempView self._jdf.createTempView(name) File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__ File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/pyspark/sql/utils.py", line 63, in deco return f(*a, **kw) File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 312, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o35.createTempView. 
: org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException: Temporary table 'people' already exists; at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createTempView(SessionCatalog.scala:324) at org.apache.spark.sql.SparkSession.createTempView(SparkSession.scala:523) at org.apache.spark.sql.Dataset.createTempView(Dataset.scala:2328) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:280) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:211) at java.lang.Thread.run(Thread.java:745) ``` After this PR, the exception we will get become cleaner: ``` Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/pyspark/sql/dataframe.py", line 152, in createTempView self._jdf.createTempView(name) File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__ File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/pyspark/sql/utils.py", line 75, in deco raise AnalysisException(s.split(': ', 1)[1], stackTrace) pyspark.sql.utils.AnalysisException: u"Temporary table 'people' already exists;" ``` #### How was this patch tested? Fixed an existing PySpark test case Author: gatorsmile <[email protected]> Closes apache#13126 from gatorsmile/followup-14684.
1 parent 9308bf1 commit ef7a5e0

File tree

4 files changed

+14
-7
lines changed

4 files changed

+14
-7
lines changed

python/pyspark/sql/dataframe.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,7 @@ def createTempView(self, name):
144144
>>> df.createTempView("people") # doctest: +IGNORE_EXCEPTION_DETAIL
145145
Traceback (most recent call last):
146146
...
147-
Py4JJavaError: ...
148-
: org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException...
147+
AnalysisException: u"Temporary table 'people' already exists;"
149148
>>> spark.catalog.dropTempView("people")
150149
151150
"""

python/pyspark/sql/utils.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ class ContinuousQueryException(CapturedException):
5151
"""
5252

5353

54+
class QueryExecutionException(CapturedException):
55+
"""
56+
Failed to execute a query.
57+
"""
58+
59+
5460
def capture_sql_exception(f):
5561
def deco(*a, **kw):
5662
try:
@@ -61,12 +67,14 @@ def deco(*a, **kw):
6167
e.java_exception.getStackTrace()))
6268
if s.startswith('org.apache.spark.sql.AnalysisException: '):
6369
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
64-
if s.startswith('org.apache.spark.sql.catalyst.analysis.NoSuchTableException: '):
70+
if s.startswith('org.apache.spark.sql.catalyst.analysis'):
6571
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
6672
if s.startswith('org.apache.spark.sql.catalyst.parser.ParseException: '):
6773
raise ParseException(s.split(': ', 1)[1], stackTrace)
6874
if s.startswith('org.apache.spark.sql.ContinuousQueryException: '):
6975
raise ContinuousQueryException(s.split(': ', 1)[1], stackTrace)
76+
if s.startswith('org.apache.spark.sql.execution.QueryExecutionException: '):
77+
raise QueryExecutionException(s.split(': ', 1)[1], stackTrace)
7078
if s.startswith('java.lang.IllegalArgumentException: '):
7179
raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
7280
raise

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
8787
db: String,
8888
table: String,
8989
specs: Seq[TablePartitionSpec]): Unit = {
90-
specs foreach { s =>
90+
specs.foreach { s =>
9191
if (!partitionExists(db, table, s)) {
9292
throw new NoSuchPartitionException(db = db, table = table, spec = s)
9393
}
@@ -98,7 +98,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
9898
db: String,
9999
table: String,
100100
specs: Seq[TablePartitionSpec]): Unit = {
101-
specs foreach { s =>
101+
specs.foreach { s =>
102102
if (partitionExists(db, table, s)) {
103103
throw new PartitionAlreadyExistsException(db = db, table = table, spec = s)
104104
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,13 +111,13 @@ class SessionCatalog(
111111
fs.makeQualified(hadoopPath)
112112
}
113113

114-
protected[this] def requireDbExists(db: String): Unit = {
114+
private def requireDbExists(db: String): Unit = {
115115
if (!databaseExists(db)) {
116116
throw new NoSuchDatabaseException(db)
117117
}
118118
}
119119

120-
protected[this] def requireTableExists(name: TableIdentifier): Unit = {
120+
private def requireTableExists(name: TableIdentifier): Unit = {
121121
if (!tableExists(name)) {
122122
val db = name.database.getOrElse(currentDb)
123123
throw new NoSuchTableException(db = db, table = name.table)

0 commit comments

Comments (0)