Skip to content

Commit d7b6994

Browse files
Davies Liurxin
authored and committed
[SPARK-7543] [SQL] [PySpark] split dataframe.py into multiple files
dataframe.py is split into column.py, group.py and dataframe.py: ``` 360 column.py 1223 dataframe.py 183 group.py ``` Author: Davies Liu <[email protected]> Closes apache#6201 from davies/split_df and squashes the following commits: fc8f5ab [Davies Liu] split dataframe.py into multiple files
1 parent adfd366 commit d7b6994

6 files changed

Lines changed: 552 additions & 449 deletions

File tree

python/pyspark/sql/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,9 @@
5555

5656
from pyspark.sql.types import Row
5757
from pyspark.sql.context import SQLContext, HiveContext
58-
from pyspark.sql.dataframe import DataFrame, GroupedData, Column, SchemaRDD, DataFrameNaFunctions
59-
from pyspark.sql.dataframe import DataFrameStatFunctions
58+
from pyspark.sql.column import Column
59+
from pyspark.sql.dataframe import DataFrame, SchemaRDD, DataFrameNaFunctions, DataFrameStatFunctions
60+
from pyspark.sql.group import GroupedData
6061

6162
__all__ = [
6263
'SQLContext', 'HiveContext', 'DataFrame', 'GroupedData', 'Column', 'Row',

python/pyspark/sql/column.py

Lines changed: 360 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,360 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
import sys

# Python 3 removed `basestring` and `long`; alias them so the rest of this
# module can use one spelling for string/integer type checks on both majors.
# Use `sys.version_info` instead of comparing the version *string*: a
# lexicographic compare like `sys.version >= '3'` is fragile (it only works
# by accident for "3.x" and breaks for any two-digit major version).
if sys.version_info[0] >= 3:
    basestring = str
    long = int
23+
24+
from pyspark.context import SparkContext
25+
from pyspark.rdd import ignore_unicode_prefix
26+
from pyspark.sql.types import *
27+
28+
__all__ = ["DataFrame", "Column", "SchemaRDD", "DataFrameNaFunctions",
29+
"DataFrameStatFunctions"]
30+
31+
32+
def _create_column_from_literal(literal):
    """Wrap a Python literal into a JVM Column via ``functions.lit``."""
    active_sc = SparkContext._active_spark_context
    return active_sc._jvm.functions.lit(literal)
35+
36+
37+
def _create_column_from_name(name):
    """Create a JVM Column referring to the column called *name*."""
    active_sc = SparkContext._active_spark_context
    return active_sc._jvm.functions.col(name)
40+
41+
42+
def _to_java_column(col):
    """Return the JVM Column behind *col*, building one from a name if needed."""
    if isinstance(col, Column):
        return col._jc
    return _create_column_from_name(col)
48+
49+
50+
def _to_seq(sc, cols, converter=None):
    """
    Convert a list of Column (or names) into a JVM Seq of Column.

    An optional `converter` could be used to convert items in `cols`
    into JVM Column objects.
    """
    items = [converter(item) for item in cols] if converter else cols
    return sc._jvm.PythonUtils.toSeq(items)
60+
61+
62+
def _unary_op(name, doc="unary operator"):
63+
""" Create a method for given unary operator """
64+
def _(self):
65+
jc = getattr(self._jc, name)()
66+
return Column(jc)
67+
_.__doc__ = doc
68+
return _
69+
70+
71+
def _func_op(name, doc=''):
72+
def _(self):
73+
sc = SparkContext._active_spark_context
74+
jc = getattr(sc._jvm.functions, name)(self._jc)
75+
return Column(jc)
76+
_.__doc__ = doc
77+
return _
78+
79+
80+
def _bin_op(name, doc="binary operator"):
81+
""" Create a method for given binary operator
82+
"""
83+
def _(self, other):
84+
jc = other._jc if isinstance(other, Column) else other
85+
njc = getattr(self._jc, name)(jc)
86+
return Column(njc)
87+
_.__doc__ = doc
88+
return _
89+
90+
91+
def _reverse_op(name, doc="binary operator"):
92+
""" Create a method for binary operator (this object is on right side)
93+
"""
94+
def _(self, other):
95+
jother = _create_column_from_literal(other)
96+
jc = getattr(jother, name)(self._jc)
97+
return Column(jc)
98+
_.__doc__ = doc
99+
return _
100+
101+
102+
class Column(object):

    """
    A column in a DataFrame.

    :class:`Column` instances can be created by::

        # 1. Select a column out of a DataFrame

        df.colName
        df["colName"]

        # 2. Create from an expression
        df.colName + 1
        1 / df.colName
    """

    def __init__(self, jc):
        # `jc` is the underlying py4j JVM Column; every operator below
        # delegates to it.
        self._jc = jc

    # arithmetic operators
    __neg__ = _func_op("negate")
    __add__ = _bin_op("plus")
    __sub__ = _bin_op("minus")
    __mul__ = _bin_op("multiply")
    __div__ = _bin_op("divide")
    __truediv__ = _bin_op("divide")
    __mod__ = _bin_op("mod")
    __radd__ = _bin_op("plus")
    __rsub__ = _reverse_op("minus")
    __rmul__ = _bin_op("multiply")
    __rdiv__ = _reverse_op("divide")
    __rtruediv__ = _reverse_op("divide")
    __rmod__ = _reverse_op("mod")

    # comparison operators
    # NOTE(review): defining __eq__ without __hash__ makes instances
    # unhashable on Python 3 — presumably intentional for expression
    # objects, but worth confirming.
    __eq__ = _bin_op("equalTo")
    __ne__ = _bin_op("notEqual")
    __lt__ = _bin_op("lt")
    __le__ = _bin_op("leq")
    __ge__ = _bin_op("geq")
    __gt__ = _bin_op("gt")

    # `and`, `or`, `not` cannot be overloaded in Python,
    # so use bitwise operators as boolean operators
    __and__ = _bin_op('and')
    __or__ = _bin_op('or')
    __invert__ = _func_op('not')
    __rand__ = _bin_op("and")
    __ror__ = _bin_op("or")

    # container operators
    __contains__ = _bin_op("contains")
    __getitem__ = _bin_op("apply")

    # bitwise operators
    bitwiseOR = _bin_op("bitwiseOR")
    bitwiseAND = _bin_op("bitwiseAND")
    bitwiseXOR = _bin_op("bitwiseXOR")

    def getItem(self, key):
        """An expression that gets an item at position `ordinal` out of a list,
        or gets an item by key out of a dict.

        >>> df = sc.parallelize([([1, 2], {"key": "value"})]).toDF(["l", "d"])
        >>> df.select(df.l.getItem(0), df.d.getItem("key")).show()
        +----+------+
        |l[0]|d[key]|
        +----+------+
        |   1| value|
        +----+------+
        >>> df.select(df.l[0], df.d["key"]).show()
        +----+------+
        |l[0]|d[key]|
        +----+------+
        |   1| value|
        +----+------+
        """
        return self[key]

    def getField(self, name):
        """An expression that gets a field by name in a StructField.

        >>> from pyspark.sql import Row
        >>> df = sc.parallelize([Row(r=Row(a=1, b="b"))]).toDF()
        >>> df.select(df.r.getField("b")).show()
        +----+
        |r[b]|
        +----+
        |   b|
        +----+
        >>> df.select(df.r.a).show()
        +----+
        |r[a]|
        +----+
        |   1|
        +----+
        """
        return self[name]

    def __getattr__(self, item):
        # Dunder lookups (pickling/copy protocols, etc.) must not be
        # interpreted as struct-field accesses.
        if item.startswith("__"):
            raise AttributeError(item)
        return self.getField(item)

    # string methods
    rlike = _bin_op("rlike")
    like = _bin_op("like")
    startswith = _bin_op("startsWith")
    endswith = _bin_op("endsWith")

    @ignore_unicode_prefix
    def substr(self, startPos, length):
        """
        Return a :class:`Column` which is a substring of the column

        :param startPos: start position (int or Column)
        :param length:  length of the substring (int or Column)

        >>> df.select(df.name.substr(1, 3).alias("col")).collect()
        [Row(col=u'Ali'), Row(col=u'Bob')]
        """
        # Both arguments must be either ints or Columns — a mix would call
        # the JVM overload with inconsistent types.
        if type(startPos) != type(length):
            raise TypeError("Can not mix the type")
        if isinstance(startPos, (int, long)):
            jc = self._jc.substr(startPos, length)
        elif isinstance(startPos, Column):
            jc = self._jc.substr(startPos._jc, length._jc)
        else:
            raise TypeError("Unexpected type: %s" % type(startPos))
        return Column(jc)

    __getslice__ = substr

    @ignore_unicode_prefix
    def inSet(self, *cols):
        """ A boolean expression that is evaluated to true if the value of this
        expression is contained by the evaluated values of the arguments.

        >>> df[df.name.inSet("Bob", "Mike")].collect()
        [Row(age=5, name=u'Bob')]
        >>> df[df.age.inSet([1, 2, 3])].collect()
        [Row(age=2, name=u'Alice')]
        """
        # Accept either varargs or a single list/set of values.
        if len(cols) == 1 and isinstance(cols[0], (list, set)):
            cols = cols[0]
        cols = [c._jc if isinstance(c, Column) else _create_column_from_literal(c) for c in cols]
        sc = SparkContext._active_spark_context
        # `in` is a reserved word in Python, hence getattr.
        jc = getattr(self._jc, "in")(_to_seq(sc, cols))
        return Column(jc)

    # order
    asc = _unary_op("asc", "Returns a sort expression based on the"
                    " ascending order of the given column name.")
    desc = _unary_op("desc", "Returns a sort expression based on the"
                     " descending order of the given column name.")

    isNull = _unary_op("isNull", "True if the current expression is null.")
    isNotNull = _unary_op("isNotNull", "True if the current expression is not null.")

    def alias(self, *alias):
        """Returns this column aliased with a new name or names (in the case of expressions that
        return more than one column, such as explode).

        >>> df.select(df.age.alias("age2")).collect()
        [Row(age2=2), Row(age2=5)]
        """
        # `as` is a reserved word in Python, hence getattr.
        if len(alias) == 1:
            return Column(getattr(self._jc, "as")(alias[0]))
        else:
            sc = SparkContext._active_spark_context
            return Column(getattr(self._jc, "as")(_to_seq(sc, list(alias))))

    @ignore_unicode_prefix
    def cast(self, dataType):
        """ Convert the column into type `dataType`

        >>> df.select(df.age.cast("string").alias('ages')).collect()
        [Row(ages=u'2'), Row(ages=u'5')]
        >>> df.select(df.age.cast(StringType()).alias('ages')).collect()
        [Row(ages=u'2'), Row(ages=u'5')]
        """
        if isinstance(dataType, basestring):
            jc = self._jc.cast(dataType)
        elif isinstance(dataType, DataType):
            # A DataType object is serialized to JSON and parsed on the
            # JVM side into the corresponding Catalyst type.
            sc = SparkContext._active_spark_context
            ssql_ctx = sc._jvm.SQLContext(sc._jsc.sc())
            jdt = ssql_ctx.parseDataType(dataType.json())
            jc = self._jc.cast(jdt)
        else:
            raise TypeError("unexpected type: %s" % type(dataType))
        return Column(jc)

    @ignore_unicode_prefix
    def between(self, lowerBound, upperBound):
        """ A boolean expression that is evaluated to true if the value of this
        expression is between the given columns.
        """
        return (self >= lowerBound) & (self <= upperBound)

    @ignore_unicode_prefix
    def when(self, condition, value):
        """Evaluates a list of conditions and returns one of multiple possible result expressions.
        If :func:`Column.otherwise` is not invoked, None is returned for unmatched conditions.

        See :func:`pyspark.sql.functions.when` for example usage.

        :param condition: a boolean :class:`Column` expression.
        :param value: a literal value, or a :class:`Column` expression.

        """
        sc = SparkContext._active_spark_context
        if not isinstance(condition, Column):
            raise TypeError("condition should be a Column")
        v = value._jc if isinstance(value, Column) else value
        jc = sc._jvm.functions.when(condition._jc, v)
        return Column(jc)

    @ignore_unicode_prefix
    def otherwise(self, value):
        """Evaluates a list of conditions and returns one of multiple possible result expressions.
        If :func:`Column.otherwise` is not invoked, None is returned for unmatched conditions.

        See :func:`pyspark.sql.functions.when` for example usage.

        :param value: a literal value, or a :class:`Column` expression.
        """
        v = value._jc if isinstance(value, Column) else value
        # BUG FIX: pass the unwrapped JVM value `v`, not the raw `value`.
        # The original passed `value`, so a Column argument forwarded the
        # Python wrapper object to py4j instead of its JVM column.
        jc = self._jc.otherwise(v)
        return Column(jc)

    def __repr__(self):
        return 'Column<%s>' % self._jc.toString().encode('utf8')
336+
337+
338+
def _test():
    """Run this module's doctests against a fresh local SparkContext."""
    import doctest
    from pyspark.context import SparkContext
    from pyspark.sql import SQLContext
    import pyspark.sql.column

    globs = pyspark.sql.column.__dict__.copy()
    sc = SparkContext('local[4]', 'PythonTest')
    globs['sc'] = sc
    globs['sqlContext'] = SQLContext(sc)
    schema = StructType([StructField('age', IntegerType()),
                         StructField('name', StringType())])
    globs['df'] = sc.parallelize([(2, 'Alice'), (5, 'Bob')]).toDF(schema)

    failure_count, test_count = doctest.testmod(
        pyspark.sql.column, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()

0 commit comments

Comments
 (0)