forked from LearningJournal/Spark-Programming-In-Python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathHelloRDD.py
More file actions
37 lines (27 loc) · 1.09 KB
/
HelloRDD.py
File metadata and controls
37 lines (27 loc) · 1.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import sys
from pyspark import SparkConf
from collections import namedtuple
from pyspark.sql import SparkSession
from lib.logger import Log4j
# Immutable record for one survey row: respondent age, gender, and location.
SurveyRecord = namedtuple("SurveyRecord", "Age Gender Country State")
if __name__ == "__main__":
    # Entry point: count survey respondents under 40, grouped by country,
    # using the low-level RDD API. Usage: HelloRDD <csv-file>.
    conf = SparkConf() \
        .setMaster("local[3]") \
        .setAppName("HelloRDD")

    # sc = SparkContext(conf=conf)
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    sc = spark.sparkContext
    logger = Log4j(spark)

    if len(sys.argv) != 2:
        logger.error("Usage: HelloSpark <filename>")
        sys.exit(-1)

    linesRDD = sc.textFile(sys.argv[1])
    partitionedRDD = linesRDD.repartition(2)

    # Strip double quotes and split each line on commas.
    # NOTE(review): assumes no header row and no commas embedded inside
    # quoted fields — confirm, or parse with the csv module otherwise.
    colsRDD = partitionedRDD.map(lambda line: line.replace('"', '').split(","))

    # Column mapping per the original: 1=Age, 2=Gender, 3=Country, 4=State
    # (column 0 is ignored). int() raises ValueError on non-numeric Age.
    selectRDD = colsRDD.map(lambda cols: SurveyRecord(int(cols[1]), cols[2], cols[3], cols[4]))

    # Keep respondents under 40, then count rows per country.
    filteredRDD = selectRDD.filter(lambda r: r.Age < 40)
    kvRDD = filteredRDD.map(lambda r: (r.Country, 1))
    countRDD = kvRDD.reduceByKey(lambda v1, v2: v1 + v2)

    colsList = countRDD.collect()
    for x in colsList:
        logger.info(x)

    # Fix: stop the session so executor/cluster resources are released
    # cleanly instead of being leaked until process teardown.
    spark.stop()