Skip to content

Commit 8f7b0dc

Browse files
authored
BAEL-3862: Spark differences DS, DF, RDD (eugenp#9976)
1 parent 1b2c8ce commit 8f7b0dc

File tree

6 files changed

+2609
-0
lines changed

6 files changed

+2609
-0
lines changed

apache-spark/data/Tourist.csv

Lines changed: 2247 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package com.baeldung.differences.dataframe.dataset.rdd;
2+
3+
4+
/**
 * JavaBean holding a single row of the Tourist.csv dataset:
 * region, country, year, series, value, footnotes and source.
 * Keeps the standard getter/setter convention so it can be used
 * with bean-based encoders.
 */
public class TouristData {

    private String region;
    private String country;
    private String year;
    private String series;
    private Double value;
    private String footnotes;
    private String source;

    public String getRegion() {
        return region;
    }

    public String getCountry() {
        return country;
    }

    public String getYear() {
        return year;
    }

    public String getSeries() {
        return series;
    }

    public Double getValue() {
        return value;
    }

    public String getFootnotes() {
        return footnotes;
    }

    public String getSource() {
        return source;
    }

    public void setRegion(String region) {
        this.region = region;
    }

    public void setCountry(String country) {
        this.country = country;
    }

    public void setYear(String year) {
        this.year = year;
    }

    public void setSeries(String series) {
        this.series = series;
    }

    public void setValue(Double value) {
        this.value = value;
    }

    public void setFootnotes(String footnotes) {
        this.footnotes = footnotes;
    }

    public void setSource(String source) {
        this.source = source;
    }

    /** Renders every field; {@code %s} prints {@code null} fields as "null", matching plain concatenation. */
    @Override
    public String toString() {
        return String.format(
            "TouristData [region=%s, country=%s, year=%s, series=%s, value=%s, footnotes=%s, source=%s]",
            region, country, year, series, value, footnotes, source);
    }

}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package com.baeldung.differences.rdd;
2+
3+
import static org.junit.Assert.assertEquals;
4+
5+
import java.util.List;
6+
7+
import org.apache.spark.SparkConf;
8+
import org.apache.spark.api.java.JavaPairRDD;
9+
import org.apache.spark.api.java.JavaRDD;
10+
import org.apache.spark.api.java.JavaSparkContext;
11+
import org.junit.AfterClass;
12+
import org.junit.BeforeClass;
13+
import org.junit.Test;
14+
15+
import scala.Tuple2;
16+
17+
public class ActionsUnitTest {
18+
private static JavaRDD<String> tourists;
19+
private static JavaSparkContext sc;
20+
public static final String COMMA_DELIMITER = ",(?=([^\"]*\"[^\"]*\")*[^\"]*$)";
21+
22+
@BeforeClass
23+
public static void init() {
24+
SparkConf conf = new SparkConf().setAppName("reduce")
25+
.setMaster("local[*]");
26+
sc = new JavaSparkContext(conf);
27+
tourists = sc.textFile("data/Tourist.csv").filter(line -> !line.startsWith("Region"));
28+
}
29+
30+
@AfterClass
31+
public static void cleanup() {
32+
sc.close();
33+
}
34+
35+
@Test
36+
public void whenDistinctCount_thenReturnDistinctNumRecords() {
37+
JavaRDD<String> countries = tourists.map(line -> {
38+
String[] columns = line.split(COMMA_DELIMITER);
39+
return columns[1];
40+
})
41+
.distinct();
42+
Long numberOfCountries = countries.count();
43+
System.out.println("Count: " + numberOfCountries);
44+
45+
assertEquals(Long.valueOf(220), numberOfCountries);
46+
}
47+
48+
@Test
49+
public void whenReduceByKeySum_thenTotalValuePerKey() {
50+
JavaRDD<String> touristsExpenditure = tourists.filter(line -> line.split(COMMA_DELIMITER)[3].contains("expenditure"));
51+
52+
JavaPairRDD<String, Double> expenditurePairRdd = touristsExpenditure.mapToPair(line -> {
53+
String[] columns = line.split(COMMA_DELIMITER);
54+
return new Tuple2<>(columns[1], Double.valueOf(columns[6]));
55+
});
56+
List<Tuple2<String, Double>> totalByCountry = expenditurePairRdd.reduceByKey((x, y) -> x + y)
57+
.collect();
58+
System.out.println("Total per Country: " + totalByCountry);
59+
60+
for(Tuple2<String, Double> tuple : totalByCountry) {
61+
if (tuple._1.equals("Mexico")) {
62+
assertEquals(Double.valueOf(99164), tuple._2);
63+
}
64+
}
65+
}
66+
67+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package com.baeldung.differences.rdd;
2+
3+
import static org.apache.spark.sql.functions.col;
4+
import static org.junit.Assert.assertEquals;
5+
import static org.junit.Assert.assertFalse;
6+
import static org.junit.Assert.assertTrue;
7+
8+
import java.util.Arrays;
9+
import java.util.List;
10+
11+
import org.apache.spark.sql.DataFrameReader;
12+
import org.apache.spark.sql.Dataset;
13+
import org.apache.spark.sql.Row;
14+
import org.apache.spark.sql.SparkSession;
15+
import org.junit.AfterClass;
16+
import org.junit.BeforeClass;
17+
import org.junit.Test;
18+
19+
public class DataFrameUnitTest {
20+
private static SparkSession session;
21+
private static Dataset<Row> data;
22+
23+
@BeforeClass
24+
public static void init() {
25+
session = SparkSession.builder()
26+
.appName("TouristDataFrameExample")
27+
.master("local[*]")
28+
.getOrCreate();
29+
DataFrameReader dataFrameReader = session.read();
30+
data = dataFrameReader.option("header", "true")
31+
.csv("data/Tourist.csv");
32+
}
33+
34+
@AfterClass
35+
public static void cleanup() {
36+
session.stop();
37+
}
38+
39+
@Test
40+
public void whenSelectSpecificColumns_thenColumnsFiltered() {
41+
Dataset<Row> selectedData = data.select(col("country"), col("year"), col("value"));
42+
selectedData.show();
43+
44+
List<String> resultList = Arrays.asList(selectedData.columns());
45+
assertTrue(resultList.contains("country"));
46+
assertTrue(resultList.contains("year"));
47+
assertTrue(resultList.contains("value"));
48+
assertFalse(resultList.contains("Series"));
49+
50+
}
51+
52+
@Test
53+
public void whenFilteringByCountry_thenCountryRecordsSelected() {
54+
Dataset<Row> filteredData = data.filter(col("country").equalTo("Mexico"));
55+
filteredData.show();
56+
57+
filteredData.foreach(record -> {
58+
assertEquals("Mexico", record.get(1));
59+
});
60+
61+
}
62+
63+
@Test
64+
public void whenGroupCountByCountry_thenContryTotalRecords() {
65+
Dataset<Row> recordsPerCountry = data.groupBy(col("country"))
66+
.count();
67+
recordsPerCountry.show();
68+
69+
Dataset<Row> filteredData = recordsPerCountry.filter(col("country").equalTo("Sweden"));
70+
assertEquals(new Long(12), filteredData.first()
71+
.get(1));
72+
}
73+
74+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package com.baeldung.differences.rdd;
2+
3+
import static org.apache.spark.sql.functions.col;
4+
import static org.apache.spark.sql.functions.sum;
5+
import static org.junit.Assert.assertEquals;
6+
7+
import org.apache.spark.api.java.function.FilterFunction;
8+
import org.apache.spark.sql.DataFrameReader;
9+
import org.apache.spark.sql.Dataset;
10+
import org.apache.spark.sql.Encoders;
11+
import org.apache.spark.sql.Row;
12+
import org.apache.spark.sql.SparkSession;
13+
import org.junit.AfterClass;
14+
import org.junit.BeforeClass;
15+
import org.junit.Test;
16+
17+
import com.baeldung.differences.dataframe.dataset.rdd.TouristData;
18+
19+
public class DatasetUnitTest {
20+
private static SparkSession session;
21+
private static Dataset<TouristData> typedDataset;
22+
23+
@BeforeClass
24+
public static void init() {
25+
session = SparkSession.builder()
26+
.appName("TouristDatasetExample")
27+
.master("local[*]")
28+
.getOrCreate();
29+
DataFrameReader dataFrameReader = session.read();
30+
Dataset<Row> data = dataFrameReader.option("header", "true")
31+
.csv("data/Tourist.csv");
32+
Dataset<Row> responseWithSelectedColumns = data.select(col("region"),
33+
col("country"), col("year"), col("series"), col("value").cast("double"),
34+
col("footnotes"), col("source"));
35+
typedDataset = responseWithSelectedColumns.as(Encoders.bean(TouristData.class));
36+
}
37+
38+
@AfterClass
39+
public static void cleanup() {
40+
session.stop();
41+
}
42+
43+
@Test
44+
public void whenFilteringByCountry_thenCountryRecordsSelected() {
45+
Dataset<TouristData> selectedData = typedDataset
46+
.filter((FilterFunction<TouristData>) record -> record.getCountry()
47+
.equals("Norway"));
48+
selectedData.show();
49+
50+
selectedData.foreach(record -> {
51+
assertEquals("Norway", record.getCountry());
52+
});
53+
}
54+
55+
@Test
56+
public void whenGroupCountByCountry_thenContryTotalRecords() {
57+
Dataset<Row> countriesCount = typedDataset.groupBy(typedDataset.col("country"))
58+
.count();
59+
countriesCount.show();
60+
61+
assertEquals(Long.valueOf(220), Long.valueOf(countriesCount.count()));
62+
}
63+
64+
@Test
65+
public void whenFilteredByPropertyRange_thenRetreiveValidRecords() {
66+
// Filter records with existing data for years between 2010 and 2017
67+
typedDataset.filter((FilterFunction<TouristData>) record -> record.getYear() != null
68+
&& (Long.valueOf(record.getYear()) > 2010 && Long.valueOf(record.getYear()) < 2017))
69+
.show();
70+
}
71+
72+
@Test
73+
public void whenSumValue_thenRetreiveTotalValue() {
74+
// Total tourist expenditure by country
75+
typedDataset.filter((FilterFunction<TouristData>) record -> record.getValue() != null
76+
&& record.getSeries()
77+
.contains("expenditure"))
78+
.groupBy("country")
79+
.agg(sum("value"))
80+
.show();
81+
}
82+
83+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package com.baeldung.differences.rdd;
2+
3+
4+
import static org.junit.Assert.assertEquals;
5+
import static org.junit.Assert.assertTrue;
6+
7+
import org.apache.commons.lang3.StringUtils;
8+
import org.apache.spark.SparkConf;
9+
import org.apache.spark.api.java.JavaRDD;
10+
import org.apache.spark.api.java.JavaSparkContext;
11+
import org.junit.AfterClass;
12+
import org.junit.BeforeClass;
13+
import org.junit.Test;
14+
15+
public class TransformationsUnitTest {
16+
17+
public static final String COMMA_DELIMITER = ",(?=([^\"]*\"[^\"]*\")*[^\"]*$)";
18+
private static JavaSparkContext sc;
19+
private static JavaRDD<String> tourists;
20+
21+
@BeforeClass
22+
public static void init() {
23+
SparkConf conf = new SparkConf().setAppName("uppercaseCountries")
24+
.setMaster("local[*]");
25+
sc = new JavaSparkContext(conf);
26+
tourists = sc.textFile("data/Tourist.csv")
27+
.filter(line -> !line.startsWith("Region")); //filter header row
28+
}
29+
30+
@AfterClass
31+
public static void cleanup() {
32+
sc.close();
33+
}
34+
35+
@Test
36+
public void whenMapUpperCase_thenCountryNameUppercased() {
37+
JavaRDD<String> upperCaseCountries = tourists.map(line -> {
38+
String[] columns = line.split(COMMA_DELIMITER);
39+
return columns[1].toUpperCase();
40+
})
41+
.distinct();
42+
43+
upperCaseCountries.saveAsTextFile("data/output/uppercase.txt");
44+
45+
upperCaseCountries.foreach(country -> {
46+
//replace non alphanumerical characters
47+
country = country.replaceAll("[^a-zA-Z]", "");
48+
assertTrue(StringUtils.isAllUpperCase(country));
49+
});
50+
}
51+
52+
@Test
53+
public void whenFilterByCountry_thenShowRequestedCountryRecords() {
54+
JavaRDD<String> touristsInMexico = tourists.filter(line -> line.split(COMMA_DELIMITER)[1].equals("Mexico"));
55+
56+
touristsInMexico.saveAsTextFile("data/output/touristInMexico.txt");
57+
58+
touristsInMexico.foreach(record -> {
59+
assertEquals("Mexico", record.split(COMMA_DELIMITER)[1]);
60+
});
61+
}
62+
63+
}

0 commit comments

Comments
 (0)