This is a Python notebook from an edX Machine Learning course. The goal is to learn the ins and outs of linear regression and use it to predict power plant power output.
# Power plant power output prediction - Linear Regression, Random Forest
First, some environment setup. The `spark_mooc_meta` package provides `databricks_test_helper`, which the course notebooks rely on:

```python
# %sh pip install --upgrade spark_mooc_meta   # provides databricks_test_helper
```
The dataset is tab-separated with five columns: `AT` (ambient temperature), `V` (exhaust vacuum), `AP` (ambient pressure), `RH` (relative humidity), and `PE` (net hourly electrical energy output). `PE` is the label we want to predict from the other four.
Load the data with the spark-csv reader, inferring the schema from the header:

```python
powerPlantDF = sqlContext.read.format('com.databricks.spark.csv') \
    .options(delimiter='\t', header='true', inferschema='true') \
    .load("/databricks-datasets/power-plant/data")
```
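The cells below reference `customSchema`, `datasetDF`, and `vectorizer`, which were defined in cells lost from this export. Here is a minimal sketch of plausible definitions, assuming the five-column schema described above (treat these as reconstructions, not the course's exact code):

```python
from pyspark.sql.types import StructType, StructField, DoubleType
from pyspark.ml.feature import VectorAssembler

# Assumed: an explicit schema for the five tab-separated double columns
customSchema = StructType([
    StructField("AT", DoubleType(), True),
    StructField("V", DoubleType(), True),
    StructField("AP", DoubleType(), True),
    StructField("RH", DoubleType(), True),
    StructField("PE", DoubleType(), True),
])

# Assumed: the working DataFrame that gets split into train/test below
datasetDF = powerPlantDF

# Assumed: a VectorAssembler packing the four inputs into the "features"
# column that the pipeline stages below expect
vectorizer = VectorAssembler(inputCols=["AT", "V", "AP", "RH"],
                             outputCol="features")
```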
Alternatively, you can supply an explicit schema instead of inferring one. Either way, register the DataFrame as a table and split off a test set:

```python
altPowerPlantDF = sqlContext.read.format('com.databricks.spark.csv') \
    .options(delimiter='\t', header='true', inferschema='false') \
    .load("/databricks-datasets/power-plant/data", schema=customSchema)

# Drop any stale copy of the table, then register the DataFrame
sqlContext.sql("DROP TABLE IF EXISTS power_plant")
dbutils.fs.rm("dbfs:/user/hive/warehouse/power_plant", True)
sqlContext.registerDataFrameAsTable(powerPlantDF, "power_plant")

# We'll hold out 20% of our data for testing and leave 80% for training
seed = 1800009193
(split20DF, split80DF) = datasetDF.randomSplit([0.2, 0.8], seed)

# Cache these datasets for performance
testSetDF = split20DF.cache()
trainingSetDF = split80DF.cache()
```

Now the linear regression model:

```python
# ***** LINEAR REGRESSION MODEL *****
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import LinearRegressionModel
from pyspark.ml import Pipeline

# Initialize our linear regression learner
lr = LinearRegression()

# Use explainParams to dump the parameters we can set
print(lr.explainParams())

# Now set the parameters for the method
lr.setPredictionCol("Predicted_PE") \
  .setLabelCol("PE") \
  .setMaxIter(100) \
  .setRegParam(0.1)

# We use the spark.ml pipeline API; if you have worked with scikit-learn
# this will be very familiar
lrPipeline = Pipeline()
lrPipeline.setStages([vectorizer, lr])

# Train on the training set to see what we get
lrModel = lrPipeline.fit(trainingSetDF)

# Pull out the intercept and the coefficients (i.e., weights)
intercept = lrModel.stages[1].intercept
weights = lrModel.stages[1].coefficients

# Create a list of the column names (without PE)
featuresNoLabel = [col for col in datasetDF.columns if col != "PE"]

# Pair each weight with its feature name, then sort by absolute weight,
# greatest to least
coefficients = list(zip(weights, featuresNoLabel))
coefficients.sort(key=lambda tup: abs(tup[0]), reverse=True)

# Assemble a human-readable equation
equation = "y = {intercept}".format(intercept=intercept)
for weight, name in coefficients:
    symbol = "+" if weight > 0 else "-"
    equation += " {} ({} * {})".format(symbol, abs(weight), name)

# Finally, here is our equation
print("Linear Regression Equation: " + equation)
```

Apply the model to the test data and evaluate the predictions:

```python
# Apply our LR model to the test data and predict power output
predictionsAndLabelsDF = lrModel.transform(testSetDF) \
    .select("AT", "V", "AP", "RH", "PE", "Predicted_PE")

from pyspark.ml.evaluation import RegressionEvaluator

# Create an RMSE evaluator using the label and predicted columns
regEval = RegressionEvaluator(predictionCol="Predicted_PE",
                              labelCol="PE", metricName="rmse")

rmse = regEval.evaluate(predictionsAndLabelsDF)
print("Root Mean Squared Error: %.2f" % rmse)

# The same evaluator computes r2 when we override the metric name
r2 = regEval.evaluate(predictionsAndLabelsDF, {regEval.metricName: "r2"})
print("r2: {0:.2f}".format(r2))
```
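For reference, with $y_i$ the observed `PE` values, $\hat{y}_i$ the predictions, and $\bar{y}$ the mean of the observations over the $n$ test rows, the two metrics are:

$$
\mathrm{RMSE} = \sqrt{\frac{1}{n}\sum_{i=1}^{n}\left(y_i - \hat{y}_i\right)^2},
\qquad
R^2 = 1 - \frac{\sum_{i=1}^{n}\left(y_i - \hat{y}_i\right)^2}{\sum_{i=1}^{n}\left(y_i - \bar{y}\right)^2}
$$

Lower RMSE is better (it is in the units of `PE`); an $R^2$ closer to 1 means more of the variance is explained.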
""" predictionsAndLabelsDF.selectExpr("PE", "Predicted_PE", "PE - Predicted_PE Residual_Error", "(PE - Predicted_PE) / {} Within_RSME".format(rmse)).registerTempTable("Power_Plant_RMSE_Evaluation") %sql SELECT case when Within_RSME <= 1.0 AND Within_RSME >= -1.0 then 1 when Within_RSME <= 2.0 AND Within_RSME >= -2.0 then 2 else 3 end RSME_Multiple, COUNT(*) AS count FROM Power_Plant_RMSE_Evaluation GROUP BY case when Within_RSME <= 1.0 AND Within_RSME >= -1.0 then 1 when Within_RSME <= 2.0 AND Within_RSME >= -2.0 then 2 else 3 end from pyspark.ml.tuning import ParamGridBuilder, CrossValidator # We can reuse the RegressionEvaluator, regEval, to judge the model based on the best Root Mean Squared Error # Let's create our CrossValidator with 3 fold cross validation crossval = CrossValidator(estimator=lrPipeline, evaluator=regEval, numFolds=3) # Let's tune over our regularization parameter from 0.01 to 0.10 regParam = [x / 100.0 for x in range(1, 11)] # We'll create a paramter grid using the ParamGridBuilder, and add the grid to the CrossValidator paramGrid = (ParamGridBuilder() .addGrid(lr.regParam, regParam) .build())crossval.setEstimatorParamMaps(paramGrid) # Now let's find and return the best model cvModel = crossval.fit(trainingSetDF).bestModel # Now let's use cvModel to compute an evaluation metric for our test dataset: testSetDFpredictionsAndLabelsDF = cvModel.transform(testSetDF).select("AT", "V", "AP", "RH", "PE", "Predicted_PE") # Run the previously created RMSE evaluator, regEval, on the predictionsAndLabelsDF DataFrame rmseNew = regEval.evaluate(predictionsAndLabelsDF) # Now let's compute the r2 evaluation metric for our test dataset r2New = regEval.evaluate(predictionsAndLabelsDF, {regEval.metricName: "r2"}) print("Original Root Mean Squared Error: {0:2.2f}".format(rmse)) print("New Root Mean Squared Error: {0:2.2f}".format(rmseNew)) print("Old r2: {0:2.2f}".format(r2))print("New r2: {0:2.2f}".format(r2New)) # Get regularization parameter by reaching through the JVM version of the API print("Regularization parameter of the best model: {0:.2f}". 
The same pipeline-plus-cross-validation machinery works for a decision tree; only the estimator and the parameter grid change:

```python
from pyspark.ml.regression import DecisionTreeRegressor

# Create a DecisionTreeRegressor
dt = DecisionTreeRegressor()
dt.setLabelCol("PE") \
  .setPredictionCol("Predicted_PE") \
  .setFeaturesCol("features") \
  .setMaxBins(100)

# Create a Pipeline and set its stages
dtPipeline = Pipeline()
dtPipeline.setStages([vectorizer, dt])

# Reuse our CrossValidator with the new dtPipeline, the RegressionEvaluator
# regEval, and 3-fold cross validation
crossval.setEstimator(dtPipeline)

# Tune over the dt.maxDepth parameter on the values 2 and 3
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [2, 3])
             .build())
crossval.setEstimatorParamMaps(paramGrid)

# Find and return the best model
dtModel = crossval.fit(trainingSetDF).bestModel

# Use dtModel to compute an evaluation metric for our test dataset: testSetDF
predictionsAndLabelsDF = dtModel.transform(testSetDF) \
    .select("AT", "V", "AP", "RH", "PE", "Predicted_PE")

rmseDT = regEval.evaluate(predictionsAndLabelsDF)
r2DT = regEval.evaluate(predictionsAndLabelsDF, {regEval.metricName: "r2"})

print("LR Root Mean Squared Error: {0:.2f}".format(rmseNew))
print("DT Root Mean Squared Error: {0:.2f}".format(rmseDT))
print("LR r2: {0:.2f}".format(r2New))
print("DT r2: {0:.2f}".format(r2DT))

# DecisionTreeRegressionModel: display the learned tree as if-then clauses
print(dtModel.stages[-1]._java_obj.toDebugString())
```

Finally, a random forest, i.e., an ensemble of decision trees:

```python
from pyspark.ml.regression import RandomForestRegressor

# Create a RandomForestRegressor
rf = RandomForestRegressor()
rf.setLabelCol("PE") \
  .setPredictionCol("Predicted_PE") \
  .setFeaturesCol("features") \
  .setSeed(100088121) \
  .setMaxDepth(8) \
  .setNumTrees(30)

# Create a Pipeline and set its stages
rfPipeline = Pipeline()
rfPipeline.setStages([vectorizer, rf])

# Reuse our CrossValidator with the new rfPipeline, the RegressionEvaluator
# regEval, and 3-fold cross validation
crossval.setEstimator(rfPipeline)

# Tune over the rf.maxBins parameter on the values 50 and 100
paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxBins, [50, 100])
             .build())
crossval.setEstimatorParamMaps(paramGrid)

# Find and return the best model
rfModel = crossval.fit(trainingSetDF).bestModel
```
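The export appears to cut off before the random forest is scored. Mirroring the evaluation pattern used for the linear regression and decision tree models above, the missing cell presumably looks something like this (a reconstruction, not the notebook's verbatim code):

```python
# Score the best random forest on the held-out test set, following the
# same pattern as the earlier LR and DT evaluation cells
predictionsAndLabelsDF = rfModel.transform(testSetDF) \
    .select("AT", "V", "AP", "RH", "PE", "Predicted_PE")

rmseRF = regEval.evaluate(predictionsAndLabelsDF)
r2RF = regEval.evaluate(predictionsAndLabelsDF, {regEval.metricName: "r2"})

print("DT Root Mean Squared Error: {0:.2f}".format(rmseDT))
print("RF Root Mean Squared Error: {0:.2f}".format(rmseRF))
print("DT r2: {0:.2f}".format(r2DT))
print("RF r2: {0:.2f}".format(r2RF))
```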
There you go! It's long but not complicated. Basically you:
- Set up the environment
- Build an ML pipeline
- Apply different algorithms: linear regression, decision tree, and random forest
- Cross-validate
- Grid-search the hyperparameters
- Predict test-set labels

That's a standard end-to-end ML process. Have fun!
