最近一直在思考一个问题:关于海量数据,如果不借助Hadoop/MapReduce模型该如何处理呢?首先,我们可以先来了解一下MapReduce模型,MapReduce是一个高度抽象易用编程的模型,它是在总结大量应用的共同特点的基础上抽象出来的分布式计算框架,在其编程模型中,通过Map和Reduce俩大组件将任务分解成互相独立的子任务,然后对每个子任务进行处理,最后将处理的结果汇总输出。从上面的描述我们可以看出MapReduce采取的是"分而治之"的思想,那么面对海量数据处理时我们是否也可以借鉴这种思想,答案是肯定的。下面我们借助具体案例来分析。
案列:海量日志访问数据,提取出访问次数Top10的访问地址信息列表
首先我们对问题进行分析:
- IP是32位的,地址最多有2^32=4G种取值情况
- 海量数据超过单台机器处理能力,不能一次将数据全部加载到内存,我们可以将IP地址Hash(IP)/1024到1024个小文件中,每个小文件存放部分IP地址;
- 对于每一个小文件,可以构建一个IP为key,出现次数为value的HashMap,同时记录当前出现次数最多的那个IP地址;
- 可以得到1024个小文件中的出现次数最多的IP,再依据常规的排序算法得到总体上出现次数最多Top10的IP即就是所求IP
- pom.xml依赖
1.pom.xml依赖
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>20.0</version>
</dependency>
- 具体算法实现
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.collect.FluentIterable;
import com.google.common.io.ByteSink;
import com.google.common.io.Files;
import com.sun.istack.internal.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static lombok.Lombok.checkNotNull;
/**
* Created by edwin on 2019/4/29.
* eg:海量日志数据,提取出访问前N次的IP信息
* <p>
* IP是32位的,地址最多有2^32=4G种取值情况,不能完全加载到内存中处理;
* 采用"分而治之"的思想,按照IP地址的Hash(IP)/1024值,把海量IP日志分别切割存储到1024个小文件中,每个小文件最多包含4MB个IP地址;
* 对于每一个小文件,可以构建一个IP为key,出现次数为value的HashMap,同时记录当前出现次数最多的那个IP地址;
* 可以得到1024个小文件中的出现次数最多的IP,再依据常规的排序算法得到总体上出现次数最多TopN的IP;
*
* @author edwin
*/
@Slf4j
public class SimpleTopN {
/***
* 保存每个文件的ByteSink对象
*/
private final Map<Integer, ByteSink> bufferedMap = new HashMap<Integer, ByteSink>();
/***
* 分隔文件-缓存每个小文件存放对象
*/
private final Map<Integer, List<String>> dataMap = new HashMap<Integer, List<String>>();
/***
* 切割文件
* 将源大文件切割成小文件,然后将值Hash到对应的小文件里
* @param sourceFile 源文件
* @param dataShardingPath 小文件分片路径
* @param dataSharding 分片数量
* @throws Exception
*/
public void splitSharding(File sourceFile, String dataShardingPath, int dataSharding) throws Exception {
checkNotNull(sourceFile, "sourceFile must not be null.");
checkNotNull(dataShardingPath, "dataShardingPath must not be null.");
checkNotNull(dataSharding, "dataSharding must not be null.");
Stopwatch stopwatch = Stopwatch.createStarted();
//创建小文件
for (int i = 0; i < dataSharding; i++) {
File file = new File(dataShardingPath + "shard_" + i + ".txt");
if (!file.exists()) {
file.createNewFile();
}
bufferedMap.put(i, Files.asByteSink(file));
dataMap.put(i, new LinkedList<String>());
}
//读取源文件
//readBigDataFileByGuava(sourceFile,dataSharding);
//读取源文件
readBigDataFileByCommonsIO(sourceFile, dataSharding);
long costTimes = stopwatch.elapsed(TimeUnit.MILLISECONDS);
log.info("sharding file finish, total cost time:{} ms.", costTimes);
}
/***
* Guava readLines 方式读取文件
* <p>需全量读入内存,如果数据文件过大会造成内存溢出OutOfMemoryError</p>
* @param sourceFile
* @param dataSharding
*/
private void readBigDataFileByGuava(File sourceFile, int dataSharding) {
try {
List<String> readLines = Files.readLines(sourceFile, Charsets.UTF_8);
for (String ip : readLines) {
//按照IP地址的Hash(IPNode)%1024值,把整个大文件映射为1024个小文件
int fileIndex = hashCode(ip) % dataSharding;
List<String> list = dataMap.get(fileIndex);
list.add(ip + "\n");
if (list.size() % 1000 == 0) {
//将数据写入文件
ByteSink byteSink = bufferedMap.get(fileIndex);
byteSink.write(Joiner.on(" ").join(list.toArray()).getBytes());
}
}
} catch (Exception e) {
log.error("read(Guava readLines) file exception,msg:{}", e.getMessage(), e);
}
}
/****
* Apache Commons IO方式读取文件
* <p>非全量读入内存,资源消耗小</p>
* @param sourceFile 源文件
* @param dataSharding 分片数
*/
private void readBigDataFileByCommonsIO(File sourceFile, int dataSharding) {
LineIterator it = null;
try {
//使用Apache Commons 自定义LineIterator处理IO流
it = FileUtils.lineIterator(sourceFile, "UTF-8");
while (it.hasNext()) {
//读取每一行数据
String ip = it.nextLine();
int fileIndex = hashCode(ip) % dataSharding;
List<String> list = dataMap.get(fileIndex);
list.add(ip + "\n");
if (list.size() % 1000 == 0) {
//将数据写入文件
ByteSink byteSink = bufferedMap.get(fileIndex);
byteSink.write(Joiner.on(" ").join(list.toArray()).getBytes());
}
}
} catch (Exception e) {
log.error("read(Apache Commons IO) file exception,msg:{}", e.getMessage(), e);
} finally {
LineIterator.closeQuietly(it);
}
}
/***
* 分析数据
* @param dataShardingPath 数据文件目录
* @param topNumber 访问前TopN值
* @return
* @throws Exception
*/
private List<Map.Entry<String, Integer>> analysis(String dataShardingPath, int topNumber) throws Exception {
checkNotNull(dataShardingPath, "dataShardingPath must not be null.");
Stopwatch stopwatch = Stopwatch.createStarted();
File shardingFile = new File(dataShardingPath);
//获取Path下所有子目录
//Iterable<File> childrens = Files.fileTreeTraverser().children(shardingFile);
//获取Path目录下所有目录包含 preOrderTraversal(前序遍历) postOrderTraversal(后序遍历) breadthFirstTraversal(广度优先)
FluentIterable<File> childrens = Files.fileTreeTraverser().breadthFirstTraversal(shardingFile).filter(new Predicate<File>() {
@Override
public boolean apply(@Nullable File file) {
//过滤analysis目录
return !file.getName().equals("analysis");
}
});
log.info("scan sharding directory:{}, file total : {}", dataShardingPath, childrens.size());
//存放每个小文件访问最多次数IP集合
Map<String, Integer> collectMap = new HashMap<String, Integer>();
for (File file : childrens) {
//临时存放当前文件所有ip
Map<String, Integer> tempMap = new HashMap<String, Integer>();
List<String> readLines = Files.readLines(file, Charsets.UTF_8);
for (String ip : readLines) {
ip = ip.replaceAll("\r|\n", "").trim();
if (tempMap.containsKey(ip)) {
tempMap.put(ip, tempMap.get(ip) + 1);
} else {
tempMap.put(ip, 1);
}
}
//Collectors.toMap 直接返回排好序的map
tempMap = tempMap.entrySet().stream()
.sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
.collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue(), (x1, x2) -> x2, LinkedHashMap<String, Integer>::new));
//获取分片访问次数最多的IP,并将其汇总到集合
Map.Entry<String, Integer> entry = tempMap.entrySet().iterator().next();
collectMap.put(entry.getKey(), entry.getValue());
}
//将Map转换为List
List<Map.Entry<String, Integer>> list = new ArrayList<Map.Entry<String, Integer>>(collectMap.entrySet());
//倒序排列
Collections.sort(list, (o1, o2) -> o2.getValue().compareTo(o1.getValue()));
//取出TopN的IP信息
List<Map.Entry<String, Integer>> limitList = list.stream().limit(topNumber).collect(Collectors.toList());
long costTimes = stopwatch.elapsed(TimeUnit.MILLISECONDS);
log.info("analysis file finish, total cost time:{} ms.", costTimes);
return limitList;
}
/***
* Hash算法
* @param key
* @return
*/
private int hashCode(String key) {
int hash;
int i;
for (hash = 0, i = 0; i < key.length(); ++i) {
hash += key.charAt(i);
hash += (hash << 10);
hash ^= (hash >> 6);
}
hash += (hash << 3);
hash ^= (hash >> 11);
hash += (hash << 15);
return Math.abs(hash);
}
/***
* Hash算法2
* @param key
* @return
*/
private final int hashCode2(Object key) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}
public void generateIpFiles(String filePathName, int ipCount) {
try {
Stopwatch stopwatch = Stopwatch.createStarted();
File file = new File(filePathName);
file.createNewFile();
StringBuffer ipAddress = new StringBuffer();
for (int i = 0; i < ipCount; i++) {
//文件追加多次I/O比较慢有性能问题,这里将每次生成的ip地址buffer起来,再一次写入文件
//根据自己的机器配置及需要生成的ip数量选择是否需要buffer,如果数据量过大会产生java.lang.OutOfMemoryError
//Files.append(generateRandomIp()+"\n", file, Charsets.UTF_8);
ipAddress.append(generateRandomIp() + "\n");
}
Files.write(ipAddress.toString(), file, Charsets.UTF_8);
long time = stopwatch.elapsed(TimeUnit.MILLISECONDS);
log.info("Generate ip finish, ip count:{} , total cost time:{} ms.", ipCount, time);
} catch (IOException e) {
e.printStackTrace();
}
}
/***
* 生成一个随机IP
* Tips:
* IP范围,IP地址是一个32位的二进制数,通常被分割为4个"8位二进制数"(也就是4个字节).
* IP地址通常用"点分十进制"表示成(a.b.c.d)的形式,其中,a,b,c,d都是0~255之间的十进制整数。
* 例:点分十进IP地址(100.4.5.6),实际上是32位二进制数(11000000.10100111.00010111.00111000)
* @return
*/
private String generateRandomIp() {
int[][] range = {{607649792, 608174079}, // 36.56.0.0-36.63.255.255
{1038614528, 1039007743}, // 61.232.0.0-61.237.255.255
{1783627776, 1784676351}, // 106.80.0.0-106.95.255.255
{2035023872, 2035154943}, // 121.76.0.0-121.77.255.255
{2078801920, 2079064063}, // 123.232.0.0-123.235.255.255
{-1950089216, -1948778497}, // 139.196.0.0-139.215.255.255
{-1425539072, -1425014785}, // 171.8.0.0-171.15.255.255
{-1236271104, -1235419137}, // 182.80.0.0-182.92.255.255
{-770113536, -768606209}, // 210.25.0.0-210.47.255.255
{-569376768, -564133889}, // 222.16.0.0-222.95.255.255
};
Random random = new Random();
int index = random.nextInt(10);
String ip = convert2IpAddress(range[index][0] + new Random().nextInt(range[index][1] - range[index][0]));
return ip;
}
/***
* 将十进制转换成IP地址
* @param ip
* @return
*/
private String convert2IpAddress(int ip) {
int[] ipArray = new int[4];
ipArray[0] = (int) ((ip >> 24) & 0xff);
ipArray[1] = (int) ((ip >> 16) & 0xff);
ipArray[2] = (int) ((ip >> 8) & 0xff);
ipArray[3] = (int) (ip & 0xff);
String realIp = Integer.toString(ipArray[0]) + "." + Integer.toString(ipArray[1]) + "." + Integer.toString(ipArray[2]) + "." + Integer.toString(ipArray[3]);
return realIp;
}
- 测试示例
public static void main(String[] args) throws Exception {
SimpleTopN simpleTopN = new SimpleTopN();
//IP文件切割分片子目录
final String shardingFilePath = "/Users/edwin/smart/home/data/ip/analysis/";
//生成IP文件目录
final String filePathName = "/Users/edwin/smart/home/data/ip/ipAddress.txt";
//生成IP文件
simpleTopN.generateIpFiles(filePathName, 10000000);
//切割文件1024个Sharding
simpleTopN.splitSharding(new File(filePathName), shardingFilePath, 1024);
//获取访问Top10的IP信息
List<Map.Entry<String, Integer>> topList = simpleTopN.analysis(shardingFilePath, 10);
for (Map.Entry<String, Integer> list : topList) {
System.out.println("IP:" + list.getKey() + " ,Count:" + list.getValue());
}
}
面对海量数据我们一般都会有如下几种处理思路,但其本质都是将数据分而治之/hash映射 + hash统计 + 堆/快速/归并排序,如何取舍根据具体的业务场景而定.
- Bloom filter/Bitmap;
- Trie树/数据库/倒排索引;
- 外排序;
- 分布式处理之hadoop/mapreduce