你是无意穿堂风 偏偏孤倨引山洪
作业内容:
1 远程读取某机器(用ip地址或主机名标识)HDFS目录/user/.../when_you_old.txt文件(保存一首英文诗)
2 统计文件中各个单词出现次数最多的5个(不区分大小写)
3 将统计的单词和次数写回到该机器某目录下 保存为 /user/.../top.txt
作业注:
作业 when_you_old.txt 原文件:
统计结果 top.txt 文件
解答:
解决思路
1 先读取hdfs文件
2 统计
- 大小写转换
- 拆成单词
- 以单词为 key 统计
- 排序求 top 5
3 写到hdfs文件
代码
练习连接hdfs:(非此题解答代码)
import java.net.URI;
import java.net.URISyntaxException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
/**
* 练习连接hdfs 读取内容
* @author hongXkeX
*/
public class Test {
public static void main(String[] args) throws IOException, URISyntaxException {
//创建配置文件
Configuration conf = new Configuration();
//创建需访问的路径
String path ="hdfs://192.168.71.111:9000/user/hadoop/when_you_old.txt";
FileSystem fs = null;
fs = FileSystem.get(URI.create(path),conf);
//打开文件
FSDataInputStream fsr = fs.open(new Path(path));
//创建缓冲流
BufferedReader reader = new BufferedReader(new InputStreamReader(fsr));
String lineTxt = null;
StringBuffer buffer = new StringBuffer();
//逐行读取文件内容
while ((lineTxt = reader.readLine()) != null) {
buffer.append(System.lineSeparator()+lineTxt);
}
//输出
System.out.println(buffer.toString());
reader.close();
fsr.close();
fs.close();
}
}
测试代码运行结果:
此题解答代码:
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class HdfsOperation {
public static void main(String [] args) throws IOException, URISyntaxException {
// 读取并统计各个单词出现次数的 top5
String fileRead = "hdfs://192.168.71.111:9000/user/hadoop/when_you_old.txt";
String statLine = ReadStatHDFS(fileRead, 5);
System.out.println(statLine);
// 将统计结果写回hdfs中的 top.txt 文件
String fileWrite = "hdfs://192.168.71.111:9000/user/hadoop/top.txt";
WriteToHDFS(fileWrite, statLine);
}
/**
* 读取指定文件并统计 top n 结果
* @param file 文件所在的URI
* @param top 指定top n的n值
* @return 返回表示统计结果的字符串
* @throws IOException
*/
public static String ReadStatHDFS(String file, Integer top) throws IOException {
// key存放单词 value存放其出现的次数
HashMap<String, Integer> hasWord = new HashMap<String, Integer>();
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(file), conf);
// 创建需访问的路径
Path path = new Path(file);
// 打开文件
FSDataInputStream hdfsInStream = fs.open(path);
// 创建缓冲流
BufferedReader br = new BufferedReader(new InputStreamReader(hdfsInStream));
try {
// 逐行读取文件内容
String line = br.readLine();
while (line != null){
// 将大写全转换为小写 再用一些特定符号分离出单词
String[] arrLine = line.toLowerCase().trim().split(",|:|;|[.]|[?]|!| ");
// 循环处理一行中获得的单词
for (int i = 0; i < arrLine.length; i++) {
String word = arrLine[i].trim();
if(word == null || word.equals("")){
continue;
}
// 若尚无此单词 新建一个key-1对
if (!hasWord.containsKey(word)) {
hasWord.put(word, 1);
} else { //如果有,就在将次数加1
Integer nCounts = hasWord.get(word);
hasWord.put(word, nCounts + 1);
}
}
// 再读取一行以循环遍历完整个文本
line = br.readLine();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
br.close();
hdfsInStream.close();
fs.close();
}
// 排序
List<Map.Entry<String, Integer>> mapList = new ArrayList<Map.Entry<String, Integer>>(hasWord.entrySet());
Collections.sort(mapList, new Comparator<Map.Entry<String, Integer>>() {
public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
return o2.getValue() - o1.getValue();
}
});
//排序后
String top_line = "";
for(int i = 0; i < Math.min(mapList.size(), top); i++) {
top_line = top_line + mapList.get(i).toString() + "\n";
}
return top_line;
}
/**
* 在指定位置新建一个文件,并写入字符
* @param file
* @param words
* @throws IOException
* @throws URISyntaxException
*/
public static void WriteToHDFS(String file, String words) throws IOException, URISyntaxException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(file), conf);
Path path = new Path(file);
FSDataOutputStream out = fs.create(path); //创建文件
out.write(words.getBytes("UTF-8"));
out.close();
}
}
解答代码运行结果:
项目查看下载:
代码拓展
trim() 去掉字符串首尾的空格
split() 完后返回一个数组
注:
已经配好 hadoop-eclipse-plugin 插件
远程连接调试配置参见:
远程连接调试Hadoop
作业拓展
将以上统计用 hdfs shell 实现
fs -cat /user/hadoop/when_you_old.txt | tr A-Z a-z | tr -s "\t|,| " "\n" | sort | uniq -c | sort -nr | head -5
hadoop fs -cat /user/hadoop/when_you_old.txt | tr '[A-Z]' '[a-z]' | awk 'BEGIN{RS="[,.:;/!?]"}{for(i=1;i<=NF;i++)array[$i]++;}END{for(i in array) print i,"=",array[i]}' | sort -k 3 -r -n | head -5
世界上所有的追求都是因为热爱
一枚爱编码 爱生活 爱分享的IT信徒
— hongXkeX