Dependency Injection with Spark
Dependency injection is a design pattern that lets you write Spark code that's more flexible and easier to test.
This blog post introduces code that has a dependency, shows how to inject the path as a dependency, and then shows how to inject an entire DataFrame.
Code with a dependency
Let's create a withStateFullName method that appends a state_name column to a DataFrame.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast

// Assumes a SparkSession named spark is in scope
def withStateFullName()(df: DataFrame): DataFrame = {
  // Read the state mappings CSV from the path stored in the Config object
  val stateMappingsDF = spark
    .read
    .option("header", true)
    .csv(Config.get("stateMappingsPath"))
  // Broadcast join the small mappings DataFrame to append state_name
  df
    .join(
      broadcast(stateMappingsDF),
      df("state") <=> stateMappingsDF("state_abbreviation"),
      "left_outer"
    )
    .drop("state_abbreviation")
}
The withStateFullName method appends the state_name column with a broadcast join. withStateFullName depends on the Config object, so withStateFullName "has a dependency". This is the dependency that'll be "injected".
The Config object is defined as follows:
object Config {
  val test: Map[String, String] = {
    Map(
      "stateMappingsPath" -> new java.io.File("./src/test/resources/state_mappings.csv").getCanonicalPath
    )
  }

  val production: Map[String, String] = {
    Map(
      "stateMappingsPath" -> "s3a://some-fake-bucket/state_mappings.csv"
    )
  }

  var environment = sys.env.getOrElse("PROJECT_ENV", "production")

  def get(key: String): String = {
    if (environment == "test") {
      test(key)
    } else {
      production(key)
    }
  }
}
See this blog post on environment-specific configuration in Scala projects if you're interested in more details about this design pattern.
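As a quick illustration of how the environment flag drives Config (a sketch using only the object defined above):

// Flip the environment before exercising code in a test suite.
// Running PROJECT_ENV=test sbt test accomplishes the same thing
// via the sys.env lookup in Config.
Config.environment = "test"
// Now resolves to the canonical path of ./src/test/resources/state_mappings.csv
Config.get("stateMappingsPath")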
Let's define a src/test/resources/state_mappings.csv file, so we can run the withStateFullName method on some sample data.
state_name,state_abbreviation
Tennessee,TN
New York,NY
Mississippi,MS
Run the withStateFullName method.
val df = Seq(
  ("john", 23, "TN"),
  ("sally", 48, "NY")
).toDF("first_name", "age", "state")

df
  .transform(withStateFullName())
  .show()

+----------+---+-----+----------+
|first_name|age|state|state_name|
+----------+---+-----+----------+
|      john| 23|   TN| Tennessee|
|     sally| 48|   NY|  New York|
+----------+---+-----+----------+
Let's refactor the withStateFullName method so it does not depend on the Config object. In other words, let's remove the Config dependency from withStateFullName with the dependency injection design pattern.
Injecting a path
Let's create a withStateFullNameInjectPath method that takes the path to the state mappings data as an argument.
def withStateFullNameInjectPath(
  stateMappingsPath: String = Config.get("stateMappingsPath")
)(df: DataFrame): DataFrame = {
  val stateMappingsDF = spark
    .read
    .option("header", true)
    .csv(stateMappingsPath)
  df
    .join(
      broadcast(stateMappingsDF),
      df("state") <=> stateMappingsDF("state_abbreviation"),
      "left_outer"
    )
    .drop("state_abbreviation")
}
The stateMappingsPath argument leverages a smart default, so users can easily use the function without explicitly referring to the path. This code is more flexible because it allows users to override the smart default and use any stateMappingsPath when running the function.
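For example, a caller can point the function at any CSV file (the path below is hypothetical):

// Inject a custom path instead of relying on the Config lookup
df.transform(withStateFullNameInjectPath("./tmp/custom_state_mappings.csv"))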
Let's rely on the smart default and run this code.
val df = Seq(
  ("john", 23, "TN"),
  ("sally", 48, "NY")
).toDF("first_name", "age", "state")

df
  .transform(withStateFullNameInjectPath())
  .show()

+----------+---+-----+----------+
|first_name|age|state|state_name|
+----------+---+-----+----------+
|      john| 23|   TN| Tennessee|
|     sally| 48|   NY|  New York|
+----------+---+-----+----------+
The withStateFullNameInjectPath method does not depend on the Config object.
Injecting an entire DataFrame
Let's refactor the code again to inject the entire DataFrame as an argument, again with a smart default.
def withStateFullNameInjectDF(
  stateMappingsDF: DataFrame = spark
    .read
    .option("header", true)
    .csv(Config.get("stateMappingsPath"))
)(df: DataFrame): DataFrame = {
  df
    .join(
      broadcast(stateMappingsDF),
      df("state") <=> stateMappingsDF("state_abbreviation"),
      "left_outer"
    )
    .drop("state_abbreviation")
}
This code provides the same functionality and is even more flexible. We can now run the function with any DataFrame. We can read a Parquet file and run this code, or create a DataFrame with toDF in our test suite.
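For instance, a production job could build the mappings DataFrame from Parquet instead of CSV (the path below is hypothetical):

// Read the mappings from a hypothetical Parquet file and inject them
val parquetMappingsDF = spark
  .read
  .parquet("s3a://some-fake-bucket/state_mappings.parquet")

df.transform(withStateFullNameInjectDF(parquetMappingsDF))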
Let's override the smart default and run this code in our test suite:
val stateMappingsDF = Seq(
  ("Tennessee", "TN"),
  ("New York", "NY")
).toDF("state_full_name", "state_abbreviation")

val df = Seq(
  ("john", 23, "TN"),
  ("sally", 48, "NY")
).toDF("first_name", "age", "state")

df
  .transform(withStateFullNameInjectDF(stateMappingsDF))
  .show()

+----------+---+-----+---------------+
|first_name|age|state|state_full_name|
+----------+---+-----+---------------+
|      john| 23|   TN|      Tennessee|
|     sally| 48|   NY|       New York|
+----------+---+-----+---------------+
Injecting the entire DataFrame as a dependency allows us to test our code without reading from a file. Avoiding file I/O in your test suite is a great way to make your tests run faster.
This design pattern also makes your tests more readable. Your coworkers won't need to open up random CSV files to understand the tests.
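Here's a sketch of what such a test might look like, assuming ScalaTest's AnyFunSpec and a SparkSession provided by the test harness (the spec name is hypothetical):

import org.scalatest.funspec.AnyFunSpec

class StateMappingsSpec extends AnyFunSpec {
  // Assumes a SparkSession named spark is provided by the test harness
  import spark.implicits._

  it("appends the full state name via an injected DataFrame") {
    val stateMappingsDF = Seq(("Tennessee", "TN"))
      .toDF("state_full_name", "state_abbreviation")
    val actualDF = Seq(("john", 23, "TN"))
      .toDF("first_name", "age", "state")
      .transform(withStateFullNameInjectDF(stateMappingsDF))
    // Collect the single row and verify the appended column
    assert(actualDF.collect().head.getAs[String]("state_full_name") == "Tennessee")
  }
}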
Conclusion
Dependency injection can be used to make code that's more flexible and easier to test.
We went from having code that relied on a CSV file stored in a certain path to code that's flexible enough to be run with any DataFrame.
Before productionalizing this code, it'd be a good idea to run some DataFrame validations (on both the underlying DataFrame and the injected DataFrame) and make the code even more flexible by making it schema independent.
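A minimal sketch of such a validation, using a hypothetical helper that fails fast when required columns are missing:

// Hypothetical guard: fail fast if a DataFrame is missing required columns
def validateColumnsPresent(df: DataFrame, requiredColumns: Seq[String]): Unit = {
  val missing = requiredColumns.diff(df.columns.toSeq)
  require(missing.isEmpty, s"DataFrame is missing required columns: ${missing.mkString(", ")}")
}

validateColumnsPresent(stateMappingsDF, Seq("state_full_name", "state_abbreviation"))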
Make sure to leverage this design pattern so you don't need to read from CSV / Parquet files in your test suite anymore!