根据水质监测信息预测水质变化趋势,对水环境的有效防范治理具有重要意义。目前水质预测方法主要分为两类,一类为基于污染物在水环境中的理化过程建立的数值模型,主要包括WASP、QUAL、MIKE等;另一类为基于数据驱动的机器学习方法及深度学习方法,主要包括LSTM、adaboost、随机森林等。本文基于spark分布式计算框架实现随机森林算法进行水质预测。
1、准备数据
将数据上传到HDFS分布式文件系统上,再利用hive建立外部表,建表语句如下:
create external table wayeal_forecast.water (`time` string COMMENT 'from deserializer',
`id` string COMMENT 'from deserializer',
`name` STRING COMMENT 'from deserializer',
`basin` string COMMENT 'from deserializer',
`section` STRING COMMENT 'from deserializer',
`ph` STRING COMMENT 'from deserializer',
`ph_type` string COMMENT 'from deserializer',
`do` string COMMENT 'from deserializer',
`do_type` string COMMENT 'from deserializer',
`nh3_n` string COMMENT 'from deserializer',
`nh3_n_type` STRING COMMENT 'from deserializer',
`codmn` STRING COMMENT 'from deserializer',
`codmn_type` STRING COMMENT 'from deserializer',
`c` STRING COMMENT 'from deserializer',
`c_type` STRING COMMENT 'from deserializer')
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
"separatorChar" = ","
)
部分数据如下所示:
2、模型开发
首先,从hive中读取数据:
data = self.spark.sql(HIVE_SQL).select(WATER_FACTOR)
data1 = data.filter(data['id'] == '78')
然后,由于原始数据为时间序列数据,需将其转换成监督学习数据,代码如下:
data = data.withColumn("id", monotonically_increasing_id())
for colName in SELECT_WATER_FACTOR:
for i in range(1, n_hours + 1, 1):
w = Window.orderBy("id")
data = data.withColumn("{}(t-{})".format(colName, i), lag(colName, i).over(w))
data = data.na.drop()
data = data.drop("id")
最后,利用pipeline封装整个算法流程,并基于ParamGridBuilder及TrainValidationSplit实现网格搜索进行模型调优。代码如下:
(train_data, test_data) = data.randomSplit([0.7, 0.3])
data_col = data.columns
data_col.remove('time')
input_cols = [col for col in data_col if col not in SELECT_WATER_FACTOR]
vector_assembler = VectorAssembler(inputCols=input_cols, outputCol="featureVector")
rf_regressor = RandomForestRegressor()\
.setFeaturesCol("featureVector")\
.setLabelCol("ph")\
.setPredictionCol("prediction")
param_grid = ParamGridBuilder()\
.addGrid(rf_regressor.numTrees, [10, 50, 100, 150, 200, 500])\
.build()
pipeline = Pipeline(stages=[vector_assembler, rf_regressor])
# model = pipeline.fit(train_data)
# predictions = model.transform(test_data)
evaluator = RegressionEvaluator(labelCol="ph", predictionCol="prediction", metricName="rmse")
validator = TrainValidationSplit()\
.setEstimator(pipeline)\
.setEstimatorParamMaps(param_grid)\
.setEvaluator(evaluator)\
.setTrainRatio(0.9)
validator_model = validator.fit(train_data)
best_model = validator_model.bestModel
predictions = best_model.transform(test_data)
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
rf_model = best_model.stages[1]
print(rf_model)
predictions.show(truncate=False)
最优模型结果如下:
Root Mean Squared Error (RMSE) on test data = 0.202249
RandomForestRegressionModel (uid=RandomForestRegressor_8bde32f77a3e) with 150 trees