注意:该项目只展示部分功能
1.开发环境
发语言:python
采用技术:Spark、Hadoop、Django、Vue、Echarts等技术框架
数据库:MySQL
开发环境:PyCharm
2 系统设计
随着全球人口持续增长和气候变化加剧,农业生产面临着前所未有的挑战,如何提高农作物产量、优化种植结构成为各国政府和农业从业者关注的焦点问题。传统农业生产主要依靠经验判断和简单统计分析,难以准确把握复杂的农业生产规律,特别是在面对多变的气候条件、不同的土壤类型、各种农业投入措施时,缺乏科学有效的数据分析手段。近年来,大数据技术和机器学习算法在各个领域得到广泛应用,为农业数据分析提供了新的技术路径。农业部门积累了大量的生产数据,包括不同区域的作物产量、气候条件、土壤特征、农业措施等信息,这些数据蕴含着丰富的农业生产规律,但由于数据量大、维度复杂,传统分析方法难以有效处理。基于spark大数据的农作物产量数据分析与可视化系统统能够充分挖掘这些数据价值,为农业生产决策提供科学依据,这也是本课题选择农作物产量数据分析作为研究对象的主要背景。
本课题通过构建基于spark大数据的农作物产量数据分析与可视化系统,能够在一定程度上为农业生产实践提供参考价值。系统通过对历史产量数据的深入分析,可以帮助了解不同地区、不同作物的产量特点和影响因素,为种植户在选择作物品种、制定种植计划时提供一些数据支持。通过分析化肥、灌溉等农业措施的效果,能够为农业投入决策提供量化参考,在一定程度上帮助提高资源利用效率。系统的可视化功能使复杂的数据分析结果更加直观易懂,便于农业管理人员和种植户理解和应用。从技术角度来看,本课题将大数据处理技术与农业领域相结合,为相关专业学生提供了实践机会,有助于加深对Spark、Hadoop、机器学习等技术的理解和应用能力。虽然作为毕业设计项目,系统的规模和功能相对有限,但通过这个实践过程,能够培养运用现代信息技术解决实际问题的能力,为今后从事相关工作积累一定的经验基础。
基于spark大数据的农作物产量数据分析与可视化系统是一套完整的大数据处理与分析解决方案,该系统采用Python作为主要开发语言,集成Spark分布式计算框架和Hadoop生态系统,专门针对农作物产量数据进行深度挖掘与智能分析。系统核心功能涵盖五大分析维度:地理环境因素对产量的影响分析、农业生产措施的效益分析、作物种类与生长周期分析、气候条件影响分析以及多维度综合下探与模式挖掘,通过24个具体分析项目全面解析农作物产量的影响因素和变化规律。在技术架构上,系统利用Hadoop分布式文件系统存储海量农业数据,通过Spark强大的内存计算能力实现快速数据处理,结合机器学习算法挖掘技术挖掘隐藏在数据背后的关联规律。前端采用Vue框架构建用户交互界面,集成Echarts可视化库生成丰富的图表展示,包括区域产量对比图、作物生长周期分析图、气候因素关联热力图等多种可视化形式,直观展现分析结果。数据存储采用MySQL数据库,确保数据的持久化和高效查询,整个系统从数据采集、清洗、分析到可视化展示形成完整的数据处理流水线,为农业决策者提供科学的数据支撑和智能化的分析工具。
3 系统展示
3.1 大屏页面
3.2 分析页面
3.3 基础页面
4 更多推荐
计算机专业毕业设计新风向,2026年大数据 + AI前沿60个毕设选题全解析,涵盖Hadoop、Spark、机器学习、AI等类型
基于大数据的脑卒中风险数据分析与可视化系统
基于大数据技术的慢性肾病数据可视化分析系统
基于Hadoop的软科大学排名可视化分析系统
基于大数据和Python的金融风险评估与数据可视化分析系统
5 部分功能代码
spark = SparkSession.builder.appName("CropYieldAnalysis").config("spark.sql.adaptive.enabled", "true").config("spark.sql.adaptive.coalescePartitions.enabled", "true").getOrCreate()
def regional_yield_analysis():
df = spark.read.option("header", "true").option("inferSchema", "true").csv("hdfs://localhost:9000/crop_data/crop_yield.csv")
regional_stats = df.groupBy("Region").agg(
avg("Yield_tons_per_hectare").alias("avg_yield"),
count("*").alias("sample_count"),
max("Yield_tons_per_hectare").alias("max_yield"),
min("Yield_tons_per_hectare").alias("min_yield"),
stddev("Yield_tons_per_hectare").alias("yield_stddev")
)
regional_stats = regional_stats.withColumn("yield_coefficient_variation",
col("yield_stddev") / col("avg_yield") * 100)
regional_crop_analysis = df.groupBy("Region", "Crop").agg(
avg("Yield_tons_per_hectare").alias("crop_avg_yield"),
count("*").alias("crop_sample_count")
)
regional_crop_pivot = regional_crop_analysis.groupBy("Region").pivot("Crop").agg(
first("crop_avg_yield").alias("avg_yield")
)
soil_region_analysis = df.groupBy("Region", "Soil_Type").agg(
avg("Yield_tons_per_hectare").alias("soil_region_yield")
)
final_regional_data = regional_stats.join(
regional_crop_pivot, on="Region", how="left"
).join(
soil_region_analysis.groupBy("Region").agg(
collect_list("Soil_Type").alias("soil_types"),
avg("soil_region_yield").alias("avg_soil_yield")
), on="Region", how="left"
)
result_pandas = final_regional_data.toPandas()
conn = mysql.connector.connect(host='localhost', user='root', password='123456', database='crop_analysis')
cursor = conn.cursor()
cursor.execute("DROP TABLE IF EXISTS regional_analysis")
cursor.execute("""CREATE TABLE regional_analysis (
id INT AUTO_INCREMENT PRIMARY KEY,
region VARCHAR(100),
avg_yield DECIMAL(10,2),
sample_count INT,
max_yield DECIMAL(10,2),
min_yield DECIMAL(10,2),
yield_stddev DECIMAL(10,2),
yield_cv DECIMAL(10,2),
avg_soil_yield DECIMAL(10,2)
)""")
for _, row in result_pandas.iterrows():
cursor.execute("""INSERT INTO regional_analysis
(region, avg_yield, sample_count, max_yield, min_yield, yield_stddev, yield_cv, avg_soil_yield)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
(row['Region'], float(row['avg_yield']), int(row['sample_count']),
float(row['max_yield']), float(row['min_yield']), float(row['yield_stddev']),
float(row['yield_coefficient_variation']), float(row['avg_soil_yield']) if pd.notna(row['avg_soil_yield']) else 0.0))
conn.commit()
cursor.close()
conn.close()
return result_pandas.to_dict('records')
def fertilizer_irrigation_effect_analysis():
df = spark.read.option("header", "true").option("inferSchema", "true").csv("hdfs://localhost:9000/crop_data/crop_yield.csv")
fertilizer_effect = df.groupBy("Fertilizer_Used").agg(
avg("Yield_tons_per_hectare").alias("avg_yield_fertilizer"),
count("*").alias("fertilizer_count"),
stddev("Yield_tons_per_hectare").alias("fertilizer_stddev")
)
irrigation_effect = df.groupBy("Irrigation_Used").agg(
avg("Yield_tons_per_hectare").alias("avg_yield_irrigation"),
count("*").alias("irrigation_count"),
stddev("Yield_tons_per_hectare").alias("irrigation_stddev")
)
combined_effect = df.groupBy("Fertilizer_Used", "Irrigation_Used").agg(
avg("Yield_tons_per_hectare").alias("combined_avg_yield"),
count("*").alias("combined_count"),
max("Yield_tons_per_hectare").alias("combined_max_yield"),
min("Yield_tons_per_hectare").alias("combined_min_yield")
)
crop_fertilizer_analysis = df.groupBy("Crop", "Fertilizer_Used").agg(
avg("Yield_tons_per_hectare").alias("crop_fertilizer_yield"),
count("*").alias("crop_fertilizer_count")
)
fertilizer_improvement = crop_fertilizer_analysis.filter(col("Fertilizer_Used") == "Yes").select(
col("Crop"), col("crop_fertilizer_yield").alias("with_fertilizer")
).join(
crop_fertilizer_analysis.filter(col("Fertilizer_Used") == "No").select(
col("Crop"), col("crop_fertilizer_yield").alias("without_fertilizer")
), on="Crop", how="inner"
).withColumn("improvement_rate",
(col("with_fertilizer") - col("without_fertilizer")) / col("without_fertilizer") * 100)
regional_modernization = df.groupBy("Region").agg(
(sum(when(col("Fertilizer_Used") == "Yes", 1).otherwise(0)) / count("*") * 100).alias("fertilizer_adoption_rate"),
(sum(when(col("Irrigation_Used") == "Yes", 1).otherwise(0)) / count("*") * 100).alias("irrigation_adoption_rate")
).withColumn("modernization_score",
(col("fertilizer_adoption_rate") + col("irrigation_adoption_rate")) / 2)
soil_fertilizer_effect = df.groupBy("Soil_Type", "Fertilizer_Used").agg(
avg("Yield_tons_per_hectare").alias("soil_fertilizer_yield")
)
fertilizer_pandas = fertilizer_effect.toPandas()
irrigation_pandas = irrigation_effect.toPandas()
combined_pandas = combined_effect.toPandas()
improvement_pandas = fertilizer_improvement.toPandas()
modernization_pandas = regional_modernization.toPandas()
conn = mysql.connector.connect(host='localhost', user='root', password='123456', database='crop_analysis')
cursor = conn.cursor()
cursor.execute("DROP TABLE IF EXISTS agricultural_measures_effect")
cursor.execute("""CREATE TABLE agricultural_measures_effect (
id INT AUTO_INCREMENT PRIMARY KEY,
measure_type VARCHAR(50),
measure_value VARCHAR(20),
avg_yield DECIMAL(10,2),
sample_count INT,
yield_stddev DECIMAL(10,2),
analysis_category VARCHAR(50)
)""")
for _, row in fertilizer_pandas.iterrows():
cursor.execute("""INSERT INTO agricultural_measures_effect
(measure_type, measure_value, avg_yield, sample_count, yield_stddev, analysis_category)
VALUES (%s, %s, %s, %s, %s, %s)""",
('Fertilizer', row['Fertilizer_Used'], float(row['avg_yield_fertilizer']),
int(row['fertilizer_count']), float(row['fertilizer_stddev']), 'single_factor'))
for _, row in irrigation_pandas.iterrows():
cursor.execute("""INSERT INTO agricultural_measures_effect
(measure_type, measure_value, avg_yield, sample_count, yield_stddev, analysis_category)
VALUES (%s, %s, %s, %s, %s, %s)""",
('Irrigation', row['Irrigation_Used'], float(row['avg_yield_irrigation']),
int(row['irrigation_count']), float(row['irrigation_stddev']), 'single_factor'))
conn.commit()
cursor.close()
conn.close()
return {
'fertilizer_effect': fertilizer_pandas.to_dict('records'),
'irrigation_effect': irrigation_pandas.to_dict('records'),
'combined_effect': combined_pandas.to_dict('records'),
'crop_improvement': improvement_pandas.to_dict('records'),
'regional_modernization': modernization_pandas.to_dict('records')
}
def machine_learning_yield_prediction():
df = spark.read.option("header", "true").option("inferSchema", "true").csv("hdfs://localhost:9000/crop_data/crop_yield.csv")
df_encoded = df.withColumn("Fertilizer_Used_Encoded", when(col("Fertilizer_Used") == "Yes", 1).otherwise(0)) \
.withColumn("Irrigation_Used_Encoded", when(col("Irrigation_Used") == "Yes", 1).otherwise(0))
region_indexer = df_encoded.select("Region").distinct().rdd.map(lambda row: row[0]).collect()
region_mapping = {region: idx for idx, region in enumerate(region_indexer)}
region_broadcast = spark.sparkContext.broadcast(region_mapping)
def map_region(region):
return region_broadcast.value.get(region, 0)
map_region_udf = udf(map_region, IntegerType())
df_encoded = df_encoded.withColumn("Region_Encoded", map_region_udf(col("Region")))
crop_indexer = df_encoded.select("Crop").distinct().rdd.map(lambda row: row[0]).collect()
crop_mapping = {crop: idx for idx, crop in enumerate(crop_indexer)}
crop_broadcast = spark.sparkContext.broadcast(crop_mapping)
def map_crop(crop):
return crop_broadcast.value.get(crop, 0)
map_crop_udf = udf(map_crop, IntegerType())
df_encoded = df_encoded.withColumn("Crop_Encoded", map_crop_udf(col("Crop")))
soil_indexer = df_encoded.select("Soil_Type").distinct().rdd.map(lambda row: row[0]).collect()
soil_mapping = {soil: idx for idx, soil in enumerate(soil_indexer)}
soil_broadcast = spark.sparkContext.broadcast(soil_mapping)
def map_soil(soil):
return soil_broadcast.value.get(soil, 0)
map_soil_udf = udf(map_soil, IntegerType())
df_encoded = df_encoded.withColumn("Soil_Type_Encoded", map_soil_udf(col("Soil_Type")))
feature_columns = ["Region_Encoded", "Crop_Encoded", "Soil_Type_Encoded",
"Fertilizer_Used_Encoded", "Irrigation_Used_Encoded",
"Rainfall_mm", "Temperature_Celsius", "Days_to_Harvest"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df_assembled = assembler.transform(df_encoded).select("features", "Yield_tons_per_hectare")
train_data, test_data = df_assembled.randomSplit([0.8, 0.2], seed=42)
rf = RandomForestRegressor(featuresCol="features", labelCol="Yield_tons_per_hectare",
numTrees=100, maxDepth=10, seed=42)
rf_model = rf.fit(train_data)
predictions = rf_model.transform(test_data)
evaluator = RegressionEvaluator(labelCol="Yield_tons_per_hectare", predictionCol="prediction")
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
feature_importance = rf_model.featureImportances.toArray()
importance_data = []
for i, importance in enumerate(feature_importance):
importance_data.append({
'feature_name': feature_columns[i],
'importance_score': float(importance),
'rank': i + 1
})
importance_data = sorted(importance_data, key=lambda x: x['importance_score'], reverse=True)
for i, item in enumerate(importance_data):
item['rank'] = i + 1
prediction_results = predictions.select("Yield_tons_per_hectare", "prediction").toPandas()
prediction_results['prediction_error'] = abs(prediction_results['Yield_tons_per_hectare'] - prediction_results['prediction'])
prediction_results['error_percentage'] = (prediction_results['prediction_error'] / prediction_results['Yield_tons_per_hectare']) * 100
conn = mysql.connector.connect(host='localhost', user='root', password='123456', database='crop_analysis')
cursor = conn.cursor()
cursor.execute("DROP TABLE IF EXISTS ml_prediction_results")
cursor.execute("""CREATE TABLE ml_prediction_results (
id INT AUTO_INCREMENT PRIMARY KEY,
model_type VARCHAR(50),
rmse_score DECIMAL(10,4),
mae_score DECIMAL(10,4),
r2_score DECIMAL(10,4),
train_samples INT,
test_samples INT,
created_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)""")
cursor.execute("""INSERT INTO ml_prediction_results
(model_type, rmse_score, mae_score, r2_score, train_samples, test_samples)
VALUES (%s, %s, %s, %s, %s, %s)""",
('RandomForest', float(rmse), float(mae), float(r2),
train_data.count(), test_data.count()))
cursor.execute("DROP TABLE IF EXISTS feature_importance")
cursor.execute("""CREATE TABLE feature_importance (
id INT AUTO_INCREMENT PRIMARY KEY,
feature_name VARCHAR(100),
importance_score DECIMAL(10,6),
feature_rank INT
)""")
for item in importance_data:
cursor.execute("""INSERT INTO feature_importance
(feature_name, importance_score, feature_rank)
VALUES (%s, %s, %s)""",
(item['feature_name'], item['importance_score'], item['rank']))
conn.commit()
cursor.close()
conn.close()
return {
'model_performance': {'rmse': rmse, 'mae': mae, 'r2': r2},
'feature_importance': importance_data,
'prediction_sample': prediction_results.head(100).to_dict('records')
}
源码项目、定制开发、文档报告、PPT、代码答疑
希望和大家多多交流