Skip to content

Commit e18b571

Browse files
holdenk authored and JoshRosen committed
[SPARK-10447][SPARK-3842][PYSPARK] upgrade pyspark to py4j0.9
Upgrade to Py4J 0.9.

Author: Holden Karau <[email protected]>
Author: Holden Karau <[email protected]>

Closes apache#8615 from holdenk/SPARK-10447-upgrade-pyspark-to-py4j0.9.
1 parent 9413955 commit e18b571

File tree

17 files changed

+34
-66
lines changed

17 files changed

+34
-66
lines changed

LICENSE

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
265265
(New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
266266
(The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
267267
(The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
268-
(The New BSD License) Py4J (net.sf.py4j:py4j:0.8.2.1 - http://py4j.sourceforge.net/)
268+
(The New BSD License) Py4J (net.sf.py4j:py4j:0.9 - http://py4j.sourceforge.net/)
269269
(Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
270270
(BSD licence) sbt and sbt-launch-lib.bash
271271
(BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)

bin/pyspark

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ export PYSPARK_PYTHON
6565

6666
# Add the PySpark classes to the Python path:
6767
export PYTHONPATH="$SPARK_HOME/python/:$PYTHONPATH"
68-
export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
68+
export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH"
6969

7070
# Load the PySpark shell.py script when ./pyspark is used interactively:
7171
export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"

bin/pyspark2.cmd

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
3030
)
3131

3232
set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
33-
set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.8.2.1-src.zip;%PYTHONPATH%
33+
set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9-src.zip;%PYTHONPATH%
3434

3535
set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
3636
set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py

core/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@
350350
<dependency>
351351
<groupId>net.sf.py4j</groupId>
352352
<artifactId>py4j</artifactId>
353-
<version>0.8.2.1</version>
353+
<version>0.9</version>
354354
</dependency>
355355
<dependency>
356356
<groupId>org.apache.spark</groupId>

core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ private[spark] object PythonUtils {
3232
val pythonPath = new ArrayBuffer[String]
3333
for (sparkHome <- sys.env.get("SPARK_HOME")) {
3434
pythonPath += Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator)
35-
pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.8.2.1-src.zip").mkString(File.separator)
35+
pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9-src.zip").mkString(File.separator)
3636
}
3737
pythonPath ++= SparkContext.jarOfObject(this)
3838
pythonPath.mkString(File.pathSeparator)

python/docs/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ SPHINXBUILD = sphinx-build
77
PAPER =
88
BUILDDIR = _build
99

10-
export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.8.2.1-src.zip)
10+
export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.9-src.zip)
1111

1212
# User-friendly check for sphinx-build
1313
ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1)

python/lib/py4j-0.8.2.1-src.zip

-36.7 KB
Binary file not shown.

python/lib/py4j-0.9-src.zip

43.8 KB
Binary file not shown.

python/pyspark/streaming/context.py

Lines changed: 8 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -32,48 +32,6 @@
3232
__all__ = ["StreamingContext"]
3333

3434

35-
def _daemonize_callback_server():
36-
"""
37-
Hack Py4J to daemonize callback server
38-
39-
The thread of callback server has daemon=False, it will block the driver
40-
from exiting if it's not shutdown. The following code replace `start()`
41-
of CallbackServer with a new version, which set daemon=True for this
42-
thread.
43-
44-
Also, it will update the port number (0) with real port
45-
"""
46-
# TODO: create a patch for Py4J
47-
import socket
48-
import py4j.java_gateway
49-
logger = py4j.java_gateway.logger
50-
from py4j.java_gateway import Py4JNetworkError
51-
from threading import Thread
52-
53-
def start(self):
54-
"""Starts the CallbackServer. This method should be called by the
55-
client instead of run()."""
56-
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
57-
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
58-
1)
59-
try:
60-
self.server_socket.bind((self.address, self.port))
61-
if not self.port:
62-
# update port with real port
63-
self.port = self.server_socket.getsockname()[1]
64-
except Exception as e:
65-
msg = 'An error occurred while trying to start the callback server: %s' % e
66-
logger.exception(msg)
67-
raise Py4JNetworkError(msg)
68-
69-
# Maybe thread needs to be cleanup up?
70-
self.thread = Thread(target=self.run)
71-
self.thread.daemon = True
72-
self.thread.start()
73-
74-
py4j.java_gateway.CallbackServer.start = start
75-
76-
7735
class StreamingContext(object):
7836
"""
7937
Main entry point for Spark Streaming functionality. A StreamingContext
@@ -123,10 +81,14 @@ def _ensure_initialized(cls):
12381

12482
# start callback server
12583
# getattr will fallback to JVM, so we cannot test by hasattr()
126-
if "_callback_server" not in gw.__dict__:
127-
_daemonize_callback_server()
128-
# use random port
129-
gw._start_callback_server(0)
84+
if "_callback_server" not in gw.__dict__ or gw._callback_server is None:
85+
gw.callback_server_parameters.eager_load = True
86+
gw.callback_server_parameters.daemonize = True
87+
gw.callback_server_parameters.daemonize_connections = True
88+
gw.callback_server_parameters.port = 0
89+
gw.start_callback_server(gw.callback_server_parameters)
90+
cbport = gw._callback_server.server_socket.getsockname()[1]
91+
gw._callback_server.port = cbport
13092
# gateway with real port
13193
gw._python_proxy_port = gw._callback_server.port
13294
# get the GatewayServer object in JVM by ID

python/pyspark/streaming/flume.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from io import BytesIO
2121
else:
2222
from StringIO import StringIO
23-
from py4j.java_gateway import Py4JJavaError
23+
from py4j.protocol import Py4JJavaError
2424

2525
from pyspark.storagelevel import StorageLevel
2626
from pyspark.serializers import PairDeserializer, NoOpSerializer, UTF8Deserializer, read_int

0 commit comments

Comments
 (0)