Hadoop是个高效的工具
介绍了Hadoop的由来和组成,当然提供给你用来分析的数据,以及最快的方式。更重要的是描述第一个Hadoop工程的详细编写过程。
1.认识Hadoop
1.1解决高速增长的存储空间和读取速度不匹配的问题
引入了多个磁盘同时读取数据的技术。但为实现这个技术,又有两个问题需要解决:
- 硬盘故障问题
- 各种分布式系统允许结合不同来源的数据进行分析,很难保证其正确性。
而在Hadoop中对这两个问题都做到处理和解决。对于第一个问题,常用的做法是保存数据副本(replica),Hadoop文件系统(HDFS, Hadoop Distributed FileSystem)的使用原理类似,略有不同。第二个问题Hadoop中引入了MapReduce模型,模型抽象出了硬盘读写问题并将其转换为对一个数据集的计算,同时也具备较高的可靠性。
MapReduce 是一种线性的可伸缩编程模型。使用者要写两个函数,分别是Map函数和Reduce函数,每个函数定义从一个键值对集合到另一个键值对集合到映射。性能方面,MapReduce尽量在计算节点上存储数据,以实现数据的本地快速访问,数据本地化是MapReduce的核心特征,从而获得更好的性能。另外有多种基于MapReduce的高级查询语言(Pig和Hive)供使用。稳定性上,MapReduce采用无共享(shared-nothing)框架,实现了失败检测,所有使用者无需担心系统的部分失效问题。
1.2.气象数据下载
书中的数据分析实例使用的是ncdc的气象数据,在手动编写程序之前,首先要准备好这些数据。最开始找到了ncdc的ftp站点ftp://ftp.ncdc.noaa.gov/pub/data/
,下载经常性的出现断线,下载速度异常缓慢。所以不得不重新搜索新的源,最终找到了https://www1.ncdc.noaa.gov/pub/data/noaa
这个地址,但是在下载时却不像ftp可以批量下载。
而只能通过脚本去抓去数据。这个脚本实现的功能是,按年份批量下载对应地址的压缩包,并将这些数据按年份保存。值得一说的是这个shell脚本使用了并行下载方式,节省了大量的时间。
#! /bin/bash
for i in {1901..2019}
do {
mkdir -p /Users/macos/noaaData/$i
wget --execute robots=off -r -np -nH --cut-dirs=4 -R index.html* https://www1.ncdc.noaa.gov/pub/data/noaa/$i/ -P /Users/macos/noaaData/$i
}&
done
2.第一个Hadoop工程
2.1 安装并运行Hadoop
下载最新2.8.1版本
具体安装方式和配置过程参考官方文档 http://hadoop.apache.org/docs/current/
进入hadoop-x.x.x/sbin目录下运行star-all脚本(中间需要输入root密码)
启动成功验证:
打开浏览器:
http://192.168.8.88:50070 (hdfs管理界面)显示active活跃状态
http://192.168.8.88:8088 (yarn管理界面)
以上两个地址正常显示,则说明启动成功。
2.2 Hadoop程序编写
MapReduce任务过程分为两个处理阶段:
- map阶段
- reduce阶段
每个阶段都以键值对作为输入和输出,类型可供选择。两个处理阶段需要分别编写相应的函数方法,并加上运行作业的代码。
新建Maven项目
- 在pom.xml文件中增加以下依赖关系
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>2.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>2.8.1</version>
</dependency>
</dependencies>
- 编写Map函数、Reduce函数和调用执行代码
Map函数
完成功能:在天气数据中截取温度数据。并写入到contex中为Reduce方法准备好数据。
// cc MaxTemperatureMapper Mapper for maximum temperature example
// vv MaxTemperatureMapper
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MaxTemperatureMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
// ^^ MaxTemperatureMapper
Reduce函数
完成功能:根据Map函数传递来的数据计算最大值,并输出年份和最高温度的键值对。
// cc MaxTemperatureReducer Reducer for maximum temperature example
// vv MaxTemperatureReducer
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MaxTemperatureReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
// ^^ MaxTemperatureReducer
main方法:
完成功能:创建运行Job,传递数据目录并设置Map和Reduce对应class;同时设置输出键值对格式。
// cc MaxTemperature Application to find the maximum temperature in the weather dataset
// vv MaxTemperature
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MaxTemperature {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(MaxTemperature.class);
job.setJobName("Max temperature");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
// ^^ MaxTemperature
- 设置数据输入输出目录
在Run configurations中Program arguments输入框中,设置数据目录和输出目录的绝对路径。
运行会在输出目录下生成两个文件:
_SUCCESS
part-r-00000
第二个文件为我们需要的运行结果如下:
1948 342
1949 311
...
到此我们对Hadoop工程有了一个初步认识,并成功运行了我们的第一个项目。好了,这篇分享就到这了,感兴趣可以持续关注博客更新哦🌹