The idea behind removing duplicate data: identical keys are routed to the same Reducer, so the Reducer only needs to output each key once. Since the value is never used, its type is NullWritable.
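For intuition, the same result on a single machine is just inserting every line into a set; the MapReduce program below distributes this idea, with the shuffle phase playing the role of the set. A minimal local sketch (LocalDistinct is only an illustration, not part of the MapReduce program):
//LocalDistinct.java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Set;
import java.util.TreeSet;
public class LocalDistinct {
    public static void main(String[] args) throws Exception {
        // A TreeSet keeps one copy of each line and, like the MapReduce shuffle,
        // returns the keys in sorted order.
        Set<String> distinct = new TreeSet<>();
        for (String line : Files.readAllLines(Paths.get(args[0]))) {
            distinct.add(line.trim());
        }
        distinct.forEach(System.out::println);
    }
}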
1. Program Source Code
//DistinctMapper.java
package demo.distinct;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class DistinctMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // Input value, e.g.: Hadoop
        String data = value1.toString().trim();
        // Output the string directly as key2; the value is just a NullWritable placeholder
        context.write(new Text(data), NullWritable.get());
    }
}
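If the input file might contain blank lines, a small guard in map() keeps the empty string out of the result. The guard is an optional addition, not part of the listing above:
    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        String data = value1.toString().trim();
        // Skip blank lines so the empty string is not emitted as a key
        if (data.isEmpty()) {
            return;
        }
        context.write(new Text(data), NullWritable.get());
    }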
//DistinctReducer.java
package demo.distinct;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class DistinctReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key3, Iterable<NullWritable> value3, Context context)
            throws IOException, InterruptedException {
        // Output key3 directly as key4; the values are never used
        context.write(key3, NullWritable.get());
    }
}
//DistinctMain.java
package demo.distinct;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class DistinctMain {
    public static void main(String[] args) throws Exception {
        // Create the job
        Job job = Job.getInstance(new Configuration());
        // Specify the entry point of the job
        job.setJarByClass(DistinctMain.class);
        // Specify the Mapper and its output key/value types
        job.setMapperClass(DistinctMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // Specify the Reducer and its output key/value types
        job.setReducerClass(DistinctReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Specify the input and output directories (HDFS paths)
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Run the job and wait for it to finish
        job.waitForCompletion(true);
    }
}
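Because the Reducer simply re-emits each key, it can also be registered as a Combiner: duplicates are then already removed on the map side before the shuffle, so less data travels across the network. This is an optional optimization (not part of the listing above) and needs only one extra line in main():
        // Optional: also run DistinctReducer on the map side as a Combiner,
        // so each map task sends every key at most once to the reducers.
        job.setCombinerClass(DistinctReducer.class);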
2. Packaging and Execution
The duplicate data to be processed:
# hdfs dfs -cat /input/strings.txt
HBase
Hive
Hadoop
Spark
Storm
Pig
ZooKeeper
HUE
MapReduce
HDFS
Kafka
Flume
HBase
HDFS
Package the program as Distinct.jar, upload it to the server, and run it:
# hadoop jar Distinct.jar /input/strings.txt /output/distinct
……
18/11/18 10:04:16 INFO mapreduce.Job: map 0% reduce 0%
18/11/18 10:04:23 INFO mapreduce.Job: map 100% reduce 0%
18/11/18 10:04:27 INFO mapreduce.Job: map 100% reduce 100%
18/11/18 10:04:29 INFO mapreduce.Job: Job job_1542506318955_0001 completed successfully
……
View the result:
# hdfs dfs -ls /output/distinct
Found 2 items
-rw-r--r-- 1 root supergroup 0 2018-11-18 10:04 /output/distinct/_SUCCESS
-rw-r--r-- 1 root supergroup 75 2018-11-18 10:04 /output/distinct/part-r-00000
# hdfs dfs -cat /output/distinct/part-r-00000
Flume
HBase
HDFS
HUE
Hadoop
Hive
Kafka
MapReduce
Pig
Spark
Storm
ZooKeeper
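The duplicates are gone, and the output is also sorted: MapReduce sorts the map output keys during the shuffle, and Text compares keys byte by byte, which is why HUE appears before Hadoop (uppercase letters sort before lowercase ones).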