

Spark DataFrame の repartition メソッドは何をするものか

Spark DataFrame の repartition(パーティション数, カラム名) とすると指定したカラムで指定したパーティション数にパーティショニングする。パーティション数を省略するとデフォルト値(Spark 2.3.2 では 200)になる。

repartition(numPartitions, *cols)
Returns a new DataFrame partitioned by the given partitioning expressions. The resulting DataFrame is hash partitioned.

numPartitions can be an int to specify the target number of partitions or a Column. If it is a Column, it will be used as the first partitioning column. If not specified, the default number of partitions is used.

pyspark.sql module — PySpark 2.3.2 documentation

Other Configuration Options
The following options can also be used to tune the performance of query execution. It is possible that these options will be deprecated in future release as more optimizations are performed automatically.

Property Name Default Meaning
spark.sql.shuffle.partitions 200 Configures the number of partitions to use when shuffling data for joins or aggregations.
Spark SQL and DataFrames - Spark 2.3.2 Documentation

PySpark を起動して、S3 の JSON を読んで DataFrame を生成する。

$ pyspark


Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.2

Using Python version 2.7.14 (default, May  2 2018 18:31:34)
SparkSession available as 'spark'.
>>> from pyspark.context import SparkContext
>>> from pyspark.sql import SQLContext
>>> from pyspark.sql.functions import year, month, date_format
>>> your_backet_name = "data-bucket"
>>> dataset = "sh10"
>>> in_path = "s3://{your_backet_name}/data/json/{dataset}/sales/*.gz".format(your_backet_name=your_backet_name, dataset=dataset)
>>> out_path = "s3://{your_backet_name}/data/parquet_pyspark/{dataset}/sales/".format(your_backet_name=your_backet_name, dataset=dataset)
>>> sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2")
>>> sqlContext = SQLContext(sc)
>>> sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
>>> df = sqlContext.read.json(in_path)

Dataframe の内容を表示する。

>>> df.show()
|amount_sold|channel_id|courier_org| cust_id|fulfillment_center|prod_id|promo_id|quantity_sold|seller|tax_country|tax_region|            time_id|
|       82.0|         3|       1190|11635215|             13260|     62|     256|         44.0| 10691|         NQ|        HA|1995-01-01 00:00:00|
|       66.0|         3|       1459| 7887314|             14912|    102|     120|         84.0| 10944|         CI|        MU|1995-01-01 00:00:00|
|       16.0|         5|       1124| 7245615|             13368|     66|     136|         16.0| 10337|         YH|          |1995-01-01 00:00:00|
|       26.0|         2|       1319| 5363281|             14091|     59|     445|         93.0| 10863|         LH|        CU|1995-01-01 00:00:00|
|       91.0|         2|       1201|14565805|             13145|    126|     422|         21.0| 10568|         PH|        QW|1995-01-01 00:00:00|
|       72.0|         9|       1497|13073124|             14572|     43|     495|         57.0| 10778|         EW|        JB|1995-01-01 00:00:00|
|       83.0|         4|       1419|13223847|             13466|     15|     185|          6.0| 10153|         DW|          |1995-01-01 00:00:00|
|       28.0|         5|       1083| 8831083|             13148|     72|      91|         75.0| 10038|         NP|          |1995-01-01 00:00:00|
|       79.0|         2|       1364| 1122827|             14594|     76|     307|         83.0| 10896|         HR|          |1995-01-01 00:00:00|
|       36.0|         4|       1408|13016476|             14703|     65|      84|         70.0| 10116|         SB|        RV|1995-01-01 00:00:00|
|       65.0|         3|       1183| 6557310|             14384|     89|     362|         65.0| 10765|         CD|        OC|1995-01-01 00:00:00|
|        9.0|         9|       1023|11633931|             13586|     72|     316|         48.0| 10572|         JN|        LU|1995-01-01 00:00:00|
|        1.0|         5|       1515| 7868183|             14906|     97|     244|         98.0| 10605|         HE|        WO|1995-01-01 00:00:00|
|       93.0|         4|       1243| 4513870|             14918|     66|     170|         83.0| 10336|         XG|        US|1995-01-01 00:00:00|
|       13.0|         4|       1456| 9731389|             14339|     21|      90|         74.0| 10326|         TR|        NO|1995-01-01 00:00:00|
|        8.0|         9|       1541|12388703|             13591|    125|     124|         45.0| 10733|         AU|        AA|1995-01-01 00:00:00|
|       39.0|         9|       1221| 4713346|             14190|    111|     400|         43.0| 10979|         PD|          |1995-01-01 00:00:00|
|       48.0|         5|       1554| 4589564|             13727|    121|     331|         70.0| 10626|         AF|        PV|1995-01-01 00:00:00|
|        4.0|         4|       1048| 1051232|             14155|     48|     511|         36.0| 10224|         TF|          |1995-01-01 00:00:00|
|        7.0|         4|       1031|14342970|             14068|    130|     507|         29.0| 10128|         TU|          |1995-01-01 00:00:00|
only showing top 20 rows

DataFrame に year カラムを追加して表示する。

>>> yearAddedDf = df.withColumn("year", year(df.time_id))
>>> yearAddedDf.show()
|amount_sold|channel_id|courier_org| cust_id|fulfillment_center|prod_id|promo_id|quantity_sold|seller|tax_country|tax_region|            time_id|year|
|       82.0|         3|       1190|11635215|             13260|     62|     256|         44.0| 10691|         NQ|        HA|1995-01-01 00:00:00|1995|
|       66.0|         3|       1459| 7887314|             14912|    102|     120|         84.0| 10944|         CI|        MU|1995-01-01 00:00:00|1995|
|       16.0|         5|       1124| 7245615|             13368|     66|     136|         16.0| 10337|         YH|          |1995-01-01 00:00:00|1995|
|       26.0|         2|       1319| 5363281|             14091|     59|     445|         93.0| 10863|         LH|        CU|1995-01-01 00:00:00|1995|
|       91.0|         2|       1201|14565805|             13145|    126|     422|         21.0| 10568|         PH|        QW|1995-01-01 00:00:00|1995|
|       72.0|         9|       1497|13073124|             14572|     43|     495|         57.0| 10778|         EW|        JB|1995-01-01 00:00:00|1995|
|       83.0|         4|       1419|13223847|             13466|     15|     185|          6.0| 10153|         DW|          |1995-01-01 00:00:00|1995|
|       28.0|         5|       1083| 8831083|             13148|     72|      91|         75.0| 10038|         NP|          |1995-01-01 00:00:00|1995|
|       79.0|         2|       1364| 1122827|             14594|     76|     307|         83.0| 10896|         HR|          |1995-01-01 00:00:00|1995|
|       36.0|         4|       1408|13016476|             14703|     65|      84|         70.0| 10116|         SB|        RV|1995-01-01 00:00:00|1995|
|       65.0|         3|       1183| 6557310|             14384|     89|     362|         65.0| 10765|         CD|        OC|1995-01-01 00:00:00|1995|
|        9.0|         9|       1023|11633931|             13586|     72|     316|         48.0| 10572|         JN|        LU|1995-01-01 00:00:00|1995|
|        1.0|         5|       1515| 7868183|             14906|     97|     244|         98.0| 10605|         HE|        WO|1995-01-01 00:00:00|1995|
|       93.0|         4|       1243| 4513870|             14918|     66|     170|         83.0| 10336|         XG|        US|1995-01-01 00:00:00|1995|
|       13.0|         4|       1456| 9731389|             14339|     21|      90|         74.0| 10326|         TR|        NO|1995-01-01 00:00:00|1995|
|        8.0|         9|       1541|12388703|             13591|    125|     124|         45.0| 10733|         AU|        AA|1995-01-01 00:00:00|1995|
|       39.0|         9|       1221| 4713346|             14190|    111|     400|         43.0| 10979|         PD|          |1995-01-01 00:00:00|1995|
|       48.0|         5|       1554| 4589564|             13727|    121|     331|         70.0| 10626|         AF|        PV|1995-01-01 00:00:00|1995|
|        4.0|         4|       1048| 1051232|             14155|     48|     511|         36.0| 10224|         TF|          |1995-01-01 00:00:00|1995|
|        7.0|         4|       1031|14342970|             14068|    130|     507|         29.0| 10128|         TU|          |1995-01-01 00:00:00|1995|
only showing top 20 rows

DataFrame に month を追加して表示する。

>>> monthAddedDf = yearAddedDf.withColumn("month", month(yearAddedDf.time_id))
>>> monthAddedDf.show()
|amount_sold|channel_id|courier_org| cust_id|fulfillment_center|prod_id|promo_id|quantity_sold|seller|tax_country|tax_region|            time_id|year|month|
|       82.0|         3|       1190|11635215|             13260|     62|     256|         44.0| 10691|         NQ|        HA|1995-01-01 00:00:00|1995|    1|
|       66.0|         3|       1459| 7887314|             14912|    102|     120|         84.0| 10944|         CI|        MU|1995-01-01 00:00:00|1995|    1|
|       16.0|         5|       1124| 7245615|             13368|     66|     136|         16.0| 10337|         YH|          |1995-01-01 00:00:00|1995|    1|
|       26.0|         2|       1319| 5363281|             14091|     59|     445|         93.0| 10863|         LH|        CU|1995-01-01 00:00:00|1995|    1|
|       91.0|         2|       1201|14565805|             13145|    126|     422|         21.0| 10568|         PH|        QW|1995-01-01 00:00:00|1995|    1|
|       72.0|         9|       1497|13073124|             14572|     43|     495|         57.0| 10778|         EW|        JB|1995-01-01 00:00:00|1995|    1|
|       83.0|         4|       1419|13223847|             13466|     15|     185|          6.0| 10153|         DW|          |1995-01-01 00:00:00|1995|    1|
|       28.0|         5|       1083| 8831083|             13148|     72|      91|         75.0| 10038|         NP|          |1995-01-01 00:00:00|1995|    1|
|       79.0|         2|       1364| 1122827|             14594|     76|     307|         83.0| 10896|         HR|          |1995-01-01 00:00:00|1995|    1|
|       36.0|         4|       1408|13016476|             14703|     65|      84|         70.0| 10116|         SB|        RV|1995-01-01 00:00:00|1995|    1|
|       65.0|         3|       1183| 6557310|             14384|     89|     362|         65.0| 10765|         CD|        OC|1995-01-01 00:00:00|1995|    1|
|        9.0|         9|       1023|11633931|             13586|     72|     316|         48.0| 10572|         JN|        LU|1995-01-01 00:00:00|1995|    1|
|        1.0|         5|       1515| 7868183|             14906|     97|     244|         98.0| 10605|         HE|        WO|1995-01-01 00:00:00|1995|    1|
|       93.0|         4|       1243| 4513870|             14918|     66|     170|         83.0| 10336|         XG|        US|1995-01-01 00:00:00|1995|    1|
|       13.0|         4|       1456| 9731389|             14339|     21|      90|         74.0| 10326|         TR|        NO|1995-01-01 00:00:00|1995|    1|
|        8.0|         9|       1541|12388703|             13591|    125|     124|         45.0| 10733|         AU|        AA|1995-01-01 00:00:00|1995|    1|
|       39.0|         9|       1221| 4713346|             14190|    111|     400|         43.0| 10979|         PD|          |1995-01-01 00:00:00|1995|    1|
|       48.0|         5|       1554| 4589564|             13727|    121|     331|         70.0| 10626|         AF|        PV|1995-01-01 00:00:00|1995|    1|
|        4.0|         4|       1048| 1051232|             14155|     48|     511|         36.0| 10224|         TF|          |1995-01-01 00:00:00|1995|    1|
|        7.0|         4|       1031|14342970|             14068|    130|     507|         29.0| 10128|         TU|          |1995-01-01 00:00:00|1995|    1|
only showing top 20 rows

DataFrame に yyyyMM を追加して表示する。

>>> yyyymmAddedDf = monthAddedDf.withColumn("yyyymm", date_format(monthAddedDf.time_id, 'yyyyMM'))
>>> yyyymmAddedDf.show()
|amount_sold|channel_id|courier_org| cust_id|fulfillment_center|prod_id|promo_id|quantity_sold|seller|tax_country|tax_region|            time_id|year|month|yyyymm|
|       82.0|         3|       1190|11635215|             13260|     62|     256|         44.0| 10691|         NQ|        HA|1995-01-01 00:00:00|1995|    1|199501|
|       66.0|         3|       1459| 7887314|             14912|    102|     120|         84.0| 10944|         CI|        MU|1995-01-01 00:00:00|1995|    1|199501|
|       16.0|         5|       1124| 7245615|             13368|     66|     136|         16.0| 10337|         YH|          |1995-01-01 00:00:00|1995|    1|199501|
|       26.0|         2|       1319| 5363281|             14091|     59|     445|         93.0| 10863|         LH|        CU|1995-01-01 00:00:00|1995|    1|199501|
|       91.0|         2|       1201|14565805|             13145|    126|     422|         21.0| 10568|         PH|        QW|1995-01-01 00:00:00|1995|    1|199501|
|       72.0|         9|       1497|13073124|             14572|     43|     495|         57.0| 10778|         EW|        JB|1995-01-01 00:00:00|1995|    1|199501|
|       83.0|         4|       1419|13223847|             13466|     15|     185|          6.0| 10153|         DW|          |1995-01-01 00:00:00|1995|    1|199501|
|       28.0|         5|       1083| 8831083|             13148|     72|      91|         75.0| 10038|         NP|          |1995-01-01 00:00:00|1995|    1|199501|
|       79.0|         2|       1364| 1122827|             14594|     76|     307|         83.0| 10896|         HR|          |1995-01-01 00:00:00|1995|    1|199501|
|       36.0|         4|       1408|13016476|             14703|     65|      84|         70.0| 10116|         SB|        RV|1995-01-01 00:00:00|1995|    1|199501|
|       65.0|         3|       1183| 6557310|             14384|     89|     362|         65.0| 10765|         CD|        OC|1995-01-01 00:00:00|1995|    1|199501|
|        9.0|         9|       1023|11633931|             13586|     72|     316|         48.0| 10572|         JN|        LU|1995-01-01 00:00:00|1995|    1|199501|
|        1.0|         5|       1515| 7868183|             14906|     97|     244|         98.0| 10605|         HE|        WO|1995-01-01 00:00:00|1995|    1|199501|
|       93.0|         4|       1243| 4513870|             14918|     66|     170|         83.0| 10336|         XG|        US|1995-01-01 00:00:00|1995|    1|199501|
|       13.0|         4|       1456| 9731389|             14339|     21|      90|         74.0| 10326|         TR|        NO|1995-01-01 00:00:00|1995|    1|199501|
|        8.0|         9|       1541|12388703|             13591|    125|     124|         45.0| 10733|         AU|        AA|1995-01-01 00:00:00|1995|    1|199501|
|       39.0|         9|       1221| 4713346|             14190|    111|     400|         43.0| 10979|         PD|          |1995-01-01 00:00:00|1995|    1|199501|
|       48.0|         5|       1554| 4589564|             13727|    121|     331|         70.0| 10626|         AF|        PV|1995-01-01 00:00:00|1995|    1|199501|
|        4.0|         4|       1048| 1051232|             14155|     48|     511|         36.0| 10224|         TF|          |1995-01-01 00:00:00|1995|    1|199501|
|        7.0|         4|       1031|14342970|             14068|    130|     507|         29.0| 10128|         TU|          |1995-01-01 00:00:00|1995|    1|199501|
only showing top 20 rows

yyyymm 列でパーティショニングして表示する。

>>> repartitionedDf = yyyymmAddedDf.repartition("yyyymm")
>>> repartitionedDf.show()
|amount_sold|channel_id|courier_org| cust_id|fulfillment_center|prod_id|promo_id|quantity_sold|seller|tax_country|tax_region|            time_id|year|month|yyyymm|
|       16.0|         3|       1209|10459355|             14599|     87|     532|         28.0| 10706|         RQ|        XO|2000-02-01 00:00:00|2000|    2|200002|
|       77.0|         2|       1412| 5872809|             13794|     17|     286|         10.0| 10566|         LE|        DX|2000-02-01 00:00:00|2000|    2|200002|
|       85.0|         2|       1133| 1736310|             14520|    143|     232|         70.0| 10234|         GK|        RL|2000-02-01 00:00:00|2000|    2|200002|
|       16.0|         4|       1061| 2921916|             13141|    146|     498|         53.0| 10542|         UN|        JT|2000-02-01 00:00:00|2000|    2|200002|
|       74.0|         3|       1169| 7134487|             13887|     55|     198|         67.0| 10557|         XV|          |2000-02-01 00:00:00|2000|    2|200002|
|        1.0|         9|       1465| 9105076|             14640|     48|     363|          9.0| 10349|         LU|        QC|2000-02-01 00:00:00|2000|    2|200002|
|       73.0|         9|       1408| 5426572|             13072|     74|     339|         66.0| 10914|         BQ|        BD|2000-02-01 00:00:00|2000|    2|200002|
|       35.0|         4|       1013|11750382|             14504|    144|      89|         12.0| 10220|         FC|        UY|2000-02-01 00:00:00|2000|    2|200002|
|        3.0|         4|       1467|14292100|             14318|     70|     526|         77.0| 10458|         EW|        HL|2000-02-01 00:00:00|2000|    2|200002|
|       96.0|         4|       1255|14304714|             13334|     77|     460|         34.0| 10409|         EU|          |2000-02-01 00:00:00|2000|    2|200002|
|       74.0|         5|       1373|12033050|             14882|     74|     476|          3.0| 10191|         BC|        US|2000-02-01 00:00:00|2000|    2|200002|
|       81.0|         9|       1363| 5426406|             14751|     84|     195|         14.0| 10506|         RK|        EL|2000-02-01 00:00:00|2000|    2|200002|
|       32.0|         4|       1548| 3004427|             13914|     66|     266|         21.0| 10631|         DD|        TY|2000-02-01 00:00:00|2000|    2|200002|
|       79.0|         3|       1508| 6564626|             14302|     39|     520|         52.0| 10762|         RZ|        CN|2000-02-01 00:00:00|2000|    2|200002|
|       64.0|         3|       1478| 6779800|             13550|    110|     309|         38.0| 10794|         KY|        EN|2000-02-01 00:00:00|2000|    2|200002|
|       67.0|         5|       1542|14438640|             14058|     46|     514|         19.0| 10945|         AY|          |2000-02-01 00:00:00|2000|    2|200002|
|       38.0|         2|       1092|13135799|             13862|     31|     295|         72.0| 10349|         RD|          |2000-02-01 00:00:00|2000|    2|200002|
|       80.0|         5|       1178|  458983|             13180|     99|     526|         64.0| 10931|         IY|        QG|2000-02-01 00:00:00|2000|    2|200002|
|       68.0|         2|       1511| 7818776|             13687|    147|     506|         67.0| 10979|         UY|          |2000-02-01 00:00:00|2000|    2|200002|
|       71.0|         3|       1514|10769032|             14082|    101|     122|         36.0| 10187|         YE|          |2000-02-01 00:00:00|2000|    2|200002|
only showing top 20 rows

パーティショニングのために追加した yyyymm 列を削除する。

>>> droppedDf = repartitionedDf.drop("yyyymm")
>>> droppedDf.show()
|amount_sold|channel_id|courier_org| cust_id|fulfillment_center|prod_id|promo_id|quantity_sold|seller|tax_country|tax_region|            time_id|year|month|
|       64.0|         3|       1238|  240661|             14588|    115|      79|         59.0| 10109|         IC|        QY|2000-02-01 00:00:00|2000|    2|
|       35.0|         5|       1305| 5279056|             13013|    131|      61|         11.0| 10523|         UG|        HU|2000-02-01 00:00:00|2000|    2|
|       18.0|         9|       1149|  562588|             14627|     87|     515|         91.0| 10146|         CE|        CY|2000-02-01 00:00:00|2000|    2|
|       50.0|         4|       1387|12492889|             13649|     73|     289|         67.0| 10447|         ZS|          |2000-02-01 00:00:00|2000|    2|
|       20.0|         4|       1457| 4023446|             14150|    125|     173|         94.0| 10694|         GE|        DB|2000-02-01 00:00:00|2000|    2|
|       24.0|         4|       1212|12139139|             13012|    140|     280|         67.0| 10111|         VV|        DJ|2000-02-01 00:00:00|2000|    2|
|       66.0|         3|       1311| 2633171|             14815|    106|     242|         21.0| 10388|         MJ|        BD|2000-02-01 00:00:00|2000|    2|
|       73.0|         5|       1511|10583767|             14488|    109|     531|         89.0| 10392|         PP|          |2000-02-01 00:00:00|2000|    2|
|       21.0|         3|       1363|  877617|             14501|     29|     311|         28.0| 10962|         GF|        FX|2000-02-01 00:00:00|2000|    2|
|       62.0|         4|       1549| 4429647|             13447|     33|     355|         52.0| 10699|         VC|        BF|2000-02-01 00:00:00|2000|    2|
|       16.0|         3|       1164|11654304|             13910|    125|     274|         25.0| 10337|         JI|        BI|2000-02-01 00:00:00|2000|    2|
|        5.0|         3|       1167| 4927333|             13123|     51|     508|         82.0| 10219|         FB|          |2000-02-01 00:00:00|2000|    2|
|        7.0|         3|       1001| 8434437|             14778|     18|     407|         19.0| 10192|         WD|        OW|2000-02-01 00:00:00|2000|    2|
|       53.0|         3|       1038|14058980|             14742|    111|     160|         92.0| 10527|         BM|        XN|2000-02-01 00:00:00|2000|    2|
|       55.0|         4|       1169| 9633844|             14862|     90|     322|          4.0| 10712|         HJ|        XC|2000-02-01 00:00:00|2000|    2|
|       80.0|         2|       1380| 1516950|             14065|     56|     166|          5.0| 10301|         BF|          |2000-02-01 00:00:00|2000|    2|
|       90.0|         9|       1039| 9921850|             14812|     46|     361|         23.0| 10062|         CY|          |2000-02-01 00:00:00|2000|    2|
|       40.0|         5|       1150| 7263915|             13059|     48|     116|         10.0| 10455|         CL|        OS|2000-02-01 00:00:00|2000|    2|
|       93.0|         4|       1131| 4353455|             13106|    110|     415|         83.0| 10533|         FJ|        AJ|2000-02-01 00:00:00|2000|    2|
|       97.0|         4|       1016|12261020|             14187|     27|     281|          1.0| 10495|         UO|        XY|2000-02-01 00:00:00|2000|    2|
only showing top 20 rows


>>> droppedDf.rdd.getNumPartitions()


withColumn(colName, col)
Returns a new DataFrame by adding a column or replacing the existing column that has the same name.

The column expression must be an expression over this DataFrame; attempting to add a column from some other dataframe will raise an error.

  • Parameters:
    • colName – string, name of the new column.
    • col – a Column expression for the new column.
>>> df.withColumn('age2', df.age + 2).collect()
[Row(age=2, name='Alice', age2=4), Row(age=5, name='Bob', age2=7)]
pyspark.sql module — PySpark 2.3.2 documentation

repartition(numPartitions, *cols)
Returns a new DataFrame partitioned by the given partitioning expressions. The resulting DataFrame is hash partitioned.

numPartitions can be an int to specify the target number of partitions or a Column. If it is a Column, it will be used as the first partitioning column. If not specified, the default number of partitions is used.

Changed in version 1.6: Added optional arguments to specify the partitioning columns. Also made numPartitions optional if partitioning columns are specified.

>>> df.repartition(10).rdd.getNumPartitions()
>>> data = df.union(df).repartition("age")
>>> data.show()
|age| name|
|  5|  Bob|
|  5|  Bob|
|  2|Alice|
|  2|Alice|
>>> data = data.repartition(7, "age")
>>> data.show()
|age| name|
|  2|Alice|
|  5|  Bob|
|  2|Alice|
|  5|  Bob|
>>> data.rdd.getNumPartitions()
>>> data = data.repartition("name", "age")
>>> data.show()
|age| name|
|  5|  Bob|
|  5|  Bob|
|  2|Alice|
|  2|Alice|
pyspark.sql module — PySpark 2.3.2 documentation