Data Source: https://archive.ics.uci.edu/ml/datasets/Combined+Cycle+Power+Plant
Features consist of hourly average ambient variables
Temperature (T) in the range 1.81°C to 37.11°C, Ambient Pressure (AP) in the range 992.89-1033.30 millibar, Relative Humidity (RH) in the range 25.56% to 100.16%, Exhaust Vacuum (V) in the range 25.36-81.56 cm Hg, and net hourly electrical energy output (EP) in the range 420.26-495.76 MW. The averages are taken from various sensors located around the plant that record the ambient variables every second. The variables are given without normalization.
Dataset Information:
The dataset contains 9568 data points collected from a Combined Cycle Power Plant over 6 years (2006-2011), while the power plant was operating at full load. Features consist of hourly average ambient variables Temperature (T), Ambient Pressure (AP), Relative Humidity (RH) and Exhaust Vacuum (V), used to predict the net hourly electrical energy output (EP) of the plant. A combined cycle power plant (CCPP) is composed of gas turbines (GT), steam turbines (ST) and heat recovery steam generators. In a CCPP, the electricity is generated by gas and steam turbines, which are combined in one cycle, and is transferred from one turbine to another. While the Vacuum is collected from and has an effect on the Steam Turbine, the other three ambient variables affect the GT performance.
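Before loading the data into Spark, the modelling task can be sketched in plain NumPy: fit a linear map from the four ambient variables to EP. This is a hypothetical illustration, not part of the notebook; the feature ranges come from the description above, but the weights and the synthetic sample are made up.

```python
# Hypothetical sketch: ordinary least squares on a synthetic sample with the
# same four ambient features, showing the linear model the pipeline below
# fits: EP ~ w . [AT, V, AP, RH] + b. Weights here are invented.
import numpy as np

rng = np.random.default_rng(0)
X = rng.uniform([1.81, 25.36, 992.89, 25.56],     # lower bounds of T, V, AP, RH
                [37.11, 81.56, 1033.30, 100.16],  # upper bounds of T, V, AP, RH
                size=(200, 4))
true_w, true_b = np.array([-1.98, -0.23, 0.06, -0.16]), 450.0  # made-up values
y = X @ true_w + true_b

# Closed-form least squares via lstsq, with an appended intercept column.
A = np.hstack([X, np.ones((len(X), 1))])
coef, *_ = np.linalg.lstsq(A, y, rcond=None)
w_hat, b_hat = coef[:4], coef[4]
```

Because the synthetic labels are noiseless, the recovered weights match the true ones; on the real data the fit below leaves residual error, which is what the evaluation cells measure.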
In [1]:
!ls -ltr /data
In [2]:
spark
Out[2]:
In [3]:
df = spark.read.format("csv").option("header","true")\
.option("inferSchema","true").load("/data/Combined_Cycle_Power_Plant.csv")
In [4]:
df.show()
In [5]:
df.cache()
Out[5]:
In [6]:
df.limit(10).toPandas().head()
Out[6]:
In [7]:
from pyspark.ml.feature import *
In [8]:
vectorizer = VectorAssembler()
vectorizer.setInputCols(["AT", "V", "AP", "RH"])
vectorizer.setOutputCol("features")
df_vect = vectorizer.transform(df)
df_vect.show(10, False)
In [9]:
print(vectorizer.explainParams())
In [10]:
from pyspark.ml.regression import LinearRegression
In [11]:
lr = LinearRegression()
print(lr.explainParams())
In [12]:
lr.setLabelCol("EP")
lr.setFeaturesCol("features")
model = lr.fit(df_vect)
In [13]:
type(model)
Out[13]:
In [14]:
print("R2:", model.summary.r2)
print("Intercept: ", model.intercept, "Coefficients", model.coefficients)
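For reference, the R² that `model.summary.r2` reports is the fraction of label variance the model explains. A minimal by-hand sketch on toy numbers (not taken from the dataset):

```python
# R^2 = 1 - SS_res / SS_tot, computed on toy "EP" labels and predictions.
import numpy as np

y      = np.array([420.0, 450.0, 470.0, 495.0])  # toy labels
y_pred = np.array([425.0, 445.0, 472.0, 490.0])  # toy predictions

ss_res = np.sum((y - y_pred) ** 2)       # residual sum of squares
ss_tot = np.sum((y - y.mean()) ** 2)     # total sum of squares
r2 = 1 - ss_res / ss_tot
```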
In [15]:
df_pred = model.transform(df_vect)
df_pred.show()
In [16]:
from pyspark.ml.evaluation import RegressionEvaluator
In [17]:
evaluator = RegressionEvaluator()
print(evaluator.explainParams())
In [18]:
evaluator = RegressionEvaluator(labelCol = "EP",
predictionCol = "prediction",
metricName = "rmse")
evaluator.evaluate(df_pred)
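The metric the evaluator returns here is the root mean squared error between the label column and the prediction column. A by-hand equivalent on toy values (the numbers are illustrative, not from the dataframe):

```python
# RMSE = sqrt(mean((label - prediction)^2)), what metricName="rmse" computes.
import math

labels      = [463.26, 444.37, 488.56]  # toy label values
predictions = [465.00, 442.00, 490.00]  # toy predictions

rmse = math.sqrt(sum((l - p) ** 2 for l, p in zip(labels, predictions))
                 / len(labels))
```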
Out[18]:
In [19]:
from pyspark.ml.pipeline import Pipeline, PipelineModel
In [20]:
pipeline = Pipeline()
print(pipeline.explainParams())
pipeline.setStages([vectorizer, lr])
pipelineModel = pipeline.fit(df)
In [21]:
pipeline.getStages()
Out[21]:
In [22]:
lr_model = pipelineModel.stages[1]
lr_model.coefficients
Out[22]:
In [23]:
pipelineModel.transform(df).show()
In [24]:
evaluator.evaluate(pipelineModel.transform(df))
Out[24]:
In [25]:
pipelineModel.save("/tmp/lr-pipeline")
In [26]:
!tree /tmp/lr-pipeline
In [27]:
saved_model = PipelineModel.load("/tmp/lr-pipeline")
saved_model.stages[1].coefficients
Out[27]:
In [28]:
saved_model.transform(df).show()
In [29]:
df_train, df_test = df.randomSplit(weights=[0.7, 0.3], seed = 200)
In [30]:
pipelineModel = pipeline.fit(df_train)
evaluator.evaluate(pipelineModel.transform(df_test))
Out[30]:
In [31]:
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
In [32]:
paramGrid = ParamGridBuilder()\
.addGrid(lr.regParam, [0.1, 0.01]) \
.addGrid(lr.fitIntercept, [False, True])\
.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
.build()
# In this case the estimator is simply the linear regression.
# A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
tvs = TrainValidationSplit(estimator=lr,
estimatorParamMaps=paramGrid,
evaluator=evaluator,
trainRatio=0.8)
tuned_model = tvs.fit(vectorizer.transform(df_train))
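The search above tries all 2 × 2 × 3 = 12 parameter combinations. Conceptually, `TrainValidationSplit` fits one model per param map on the training portion, scores each on the held-out portion, and keeps the best. A simplified pure-Python sketch of that loop (assumed behaviour; the real implementation splits randomly and works on DataFrames):

```python
# Simplified sketch of TrainValidationSplit: one deterministic split,
# one fit + one score per candidate param map, best model wins.
def train_validation_split(fit, score, param_maps, data, train_ratio=0.8):
    cut = int(len(data) * train_ratio)
    train, valid = data[:cut], data[cut:]
    results = []
    for params in param_maps:
        model = fit(train, **params)
        results.append((score(model, valid), params, model))
    # RMSE-style metrics are "smaller is better", so take the minimum.
    best_metric, best_params, best_model = min(results, key=lambda r: r[0])
    return best_model, [m for m, _, _ in results]

# Toy usage: "models" are just numbers, the score is distance to 3.
data = list(range(10))
fit = lambda train, reg: reg
score = lambda model, valid: abs(model - 3)
best, metrics = train_validation_split(
    fit, score, [{"reg": 1}, {"reg": 3}, {"reg": 5}], data)
```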
In [33]:
tuned_model.bestModel, tuned_model.validationMetrics
Out[33]:
In [34]:
df_test_pred = tuned_model.transform(vectorizer.transform(df_test))
df_test_pred.show()
In [35]:
evaluator.evaluate(df_test_pred)
Out[35]: