随着复赛今天截止,为期两个月的挑战赛也终于结束了.这两个月里很大一部分时间花在这上面,有过欢乐,也有为分数刷不上去而发愁.作为第一次参加比赛,对比赛结果还算是满意吧.而在这个过程中,对多线程知识,netty,nio等知识的深入认识.下面是对比赛的总结和思考.排名如下
初赛:《Service Mesh Agent for Apache Dubbo (Incubating) 》
- 赛题的思考
题目看起来是让我们实现一个rpc agent.因为官方已经给出了consumer和provider,选手就是要实现两个代理,第一个代理是consumer-agent,负责把consumer的调用通过自定义协议发动给Provider-agent.第二个代理就是provider-agent.他的任务就是接收Consumer-agent通过网络发动过来的消息,然后通过dubbo调用provider.最后把结果返回给consumer-agent.整个系统的调用图如下:
- 设计和实现.
整个调用过程如下所示:
- ①处这里采用Netty Http应用作为服务端,处理Consumer发送过来的http请求.
- ②处这里就是在Consumer-agent开启Netty Client,Provider-agent端开启Netty Server进行请求和响应.
- ③ Provider-agent通Netty Client去调用Provider的服务.
- ④ Provider把结果返回给Consumer-agent.
- ⑤ Consumer-agent把结果封装成HttpResponse返回给客户端.
Provider提供的服务如下:
public interface IHelloService {
/**
* 计算传入参数的哈希值.
*
* @param str 随机字符串
* @return 该字符串的哈希值
*/
int hash(String str);
}
整个代码我放在github中,这里不对整个代码做分析,只分析出关键的点.
负载均衡
如下图,3个provider的负载能力如下,那么我们可以选择负载均衡算法的时候,把这个考虑进去.我选择是随机加权算法.根据大家的一致认同,small:meddium:large = 1:2:2.
所有的服务都运行在docker环境中,而用的etcd作为服务发现的组件.事先并不知道那台机器是small,large,meddium.那么我们可以考虑把参数加上启动参数.一旦服务启动,这些信息,都会注册到etcd中.然后取出来,做相应的判断就行.
在etcd做服务发现的时候,把型号信息转换成比例注册上去
//small 1; meddium和large是2.
if(val.equals("small")) {
endpoints.add(new Endpoint(host, port, 1));
}else{
endpoints.add(new Endpoint(host, port, 2));
}
Consumer在选择那个Provider的时候就可以根据以上的信息,轮询选择一个.
//向endpoints加入5个实例,small一个,meddium和large都是2个.
if (null == endpoints) {
synchronized (ConsumerAgentHttpServerHandler.class) {
if (null == endpoints) {
endpoints = RegistryInstance.getInstance().find("com.alibaba.dubbo.performance.demo.provider.IHelloService");
ListIterator<Endpoint> it = endpoints.listIterator();
while (it.hasNext()){
Endpoint temp = it.next();
if(temp.getSize()==2) {
it.add(temp);
}
}
}
}
}
int id = count.getAndIncrement();
if(id>=4){
count.set(0);
id=4;
}
// 简单的负载均衡,随机取一个
Endpoint endpoint = endpoints.get(id);
这样一个随机加权的算法就实现了.
EventLoop复用
当我们创建Provident-agent的时候,我们是否可以考虑Eventloop的复用,这样每个请求从接收到发动都是用同一个线程处理的,没有上下文切换.另外一个,这样做好处,把channel和Eventloop绑定起来,也就限定了channel的个数,相当于做了一个channel的缓存(因为channel的数量得控制).一举两得.
private void providerServerStart(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup(4);
putMap(workerGroup);
try {
ServerBootstrap sbs = new ServerBootstrap().group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ProviderAgentHttpServerChannelInitializer());
LOGGER.info("provider netty server start");
ChannelFuture future = sbs.bind(port).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
//预先把channel设置好,复用上面的eventloop.
public void putMap(EventLoopGroup group) {
for (EventExecutor executor : group) {
try {
map.put((EventLoop) executor, connecManager.getChannel( (EventLoop) executor));
} catch (Exception e) {
e.printStackTrace();
}
}
}
回调的设计
当Provider返回给结果后,那我们应该如何把结果返回给Consumer-agent呢,也就是它如何记住之前的通道.这里采用的是一个回调的设计.这样就能够记住上下文,也就是记住过来时候的ChannelHandlerContext
,通过这个把结果返回回去.
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
Map<String, String> data = HttpParser.parse(msg);
handle(new RequestWrapper(data.get("interface"),
data.get("method"),
data.get("parameterTypesString"),
data.get("parameter")), (result) -> {
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(result.getBytes()));
response.headers().set(CONTENT_TYPE, "text/plain");
response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes());
ctx.write(response);
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER);
},ctx.channel().eventLoop());
}
当结果返回后,通过回调调用回调函数的逻辑
//拿到结果后,回调
public void done(RpcResponse response){
this.response = response;
sender.accept(new String(response.getBytes()).trim());
}
- 反思和思考
可以批量flush,批量decode(来源于朋友徐靖峰的思想)
Netty 提供了一个方便的解码工具类 ByteToMessageDecoder ,如图上半部分所示,这个类具备 accumulate 批量解包能力,可以尽可能的从 socket 里读取字节,然后同步调用 decode 方法,解码出业务对象,并组成一个 List 。最后再循环遍历该 List ,依次提交到 ChannelPipeline 进行处理。此处我们做了一个细小的改动,如图下半部分所示,即将提交的内容从单个 command ,改为整个 List 一起提交,如此能减少 pipeline 的执行次数,同时提升吞吐量。这个模式在低并发场景,并没有什么优势,而在高并发场景下对提升吞吐量有不小的性能提升。
负载均衡
上面我的做法有点硬编码的意思,而且随机的话,而且不确定性有点大.那是是否可以考虑根据调用的次数来做负载均衡,也就是说,给句每个Provider请求的次数,尽量把请求分给请求量少的Provider,当然这个量还是得加权.实现的复杂性有点高.
限流
经过朋友提醒,是否可以尝试下,限流,也就是说不放那么多请求进取,只通过一部分来请求,待完成之后,再放另外一部分,这个可以尝试用令牌桶来实现.处于理论阶段,没实际尝试过.
编码
我做的处理里面都是采用的jdk自带的编码方式.如果采用kryo,protobuf的方式,性能上也会有一定的提升.
我的代码:https://github.com/maskwang520/springforall.git
复赛:实现一个进程内的队列引擎,单机可支持100万队列以上,能够承受2亿消息的存取.
- 赛题的思考
题目要求有5个:
1.各个阶段线程数在20~30左右
2.发送阶段:消息大小在50字节左右,消息条数在20亿条左右,也即发送总数据在100G左右
3.索引校验阶段:会对所有队列的索引进行随机校验;平均每个队列会校验1~2次;
4.顺序消费阶段:挑选20%的队列进行全部读取和校验;
5.发送阶段最大耗时不能超过1800s;索引校验阶段和顺序消费阶段加在一起,最大耗时也不能超过1800s;超时会被判断为评测失败。
100万个queue,20亿消息,如果放内存是完全不现实的,内存肯定会爆.接下来自然想到把消息存放到文件中,内存中只放索引就行.但是内存存放索引,是20亿消息的消息,索引自然是由(消息起始位置+长度)构成.但是这样的Map<queue,Index>存放的索引有20亿,疯狂的FullGc是不可避免的,Full Gc一多,Tps自然上不去.后来想到,消息按块存储(多个消息存在一个块中),索引的时候按块索引.这样就能把Map里面存的只有100万(queue的个数),示意图如下:
Block的设计
public class Block {
//开始位置
public long startPosition;
//长度
public int length;
//Block中已经存放的消息的条数
public int size;
public Block(Long startPosition, int length) {
this.startPosition = startPosition;
this.length = length;
this.size = 0;
}
}
因为一个queue中可能有多个Block,在消息检索的时候给出的是在队列中的偏移量,那么size这个域方便后面消息检索的时候判断在哪个block中.
消息缓存的设计
因为每当来一个消息都要flush到文件中去,这样Io的时间就太多了,题目的关键点在于如何减少Io的时间.所以可以采用消息的缓存来处理.每当来一个消息,就放入缓存中,当缓存中超过10次消息的时候,就同步写入到文件中去.这样的话,相当于每10次写,才做一次Io.
public class DataCache {
//消息缓存
public ByteBuffer dataBuffer = ByteBuffer.allocate(1024);
public int count;
}
这里将缓存的大小设置为1024Byte,当然你也可以设置成更大.这里有个小Tips.缓存的消息最好设置成Block的大小.这样当缓存满了之后,就可以直接写入到一个Block块中,而不用接着上一个Block写(上面一个Block写),这样设计,写入更简单,每次flush到文件的时候,只要新开辟一个新的Block,而不用管之前的Block.
//以块为索引,一个队列可能有多个块,且块的写入有顺序,所有用List来存Block.
public Map<String, List<Block>> blockMap = new ConcurrentHashMap<>();
public Map<String, DataCache> cacheMap = new ConcurrentHashMap<>();
消息的存储
因为不可能每个队列的消息都用一个文件来存放,所以这里用hash来把文件限定在32个.一个queue的Block必须在一个文件里面.不同queue的Block可以在一个文件里面.
//根据队列的名字hash到对应的文件中,共32个文件
int hashFile(String queueName) {
return queueName.hashCode() & 0x1f;
//return 0;
}
还存在一个问题就是,往一个文件中写入消息的时候,什么位置写,因为按块写.所以已经写过的块不能用.只能从新开辟一个块,块与块之间尽可能紧凑.
//block的大小为1024,根据当前文件已经存在的写的位置,找到下一个比该位置大的,且是1024的倍数
public long getLeastBlockPosition(long length) {
if (length == 0) {
return 0;
}
int initSize = 1 << 10;
int i = 1;
while (i * initSize <= length) {
i++;
}
//定义到可用的块的第一个位置
return i * initSize;
}
消息存放
这里采用的是原生的filechannel
去读写.本打算用mmap去写的,经过一位朋友提醒,mmap在这个场景下不合适.原因是不是长期读写,写完就释放,不是长期的.
public void put(String queueName, byte[] message) {
int hash = hashFile(queueName);
String path = DIRPATH + hash + ".txt";
lock.lock();
//创建文件
File file = new File(path);
if (!file.exists()) {
try {
file.createNewFile();
} catch (Exception e) {
e.printStackTrace();
}
}
if (!blockMap.containsKey(queueName)) {
List<Block> list = new ArrayList();
blockMap.put(queueName, list);
}
if (!cacheMap.containsKey(queueName)) {
DataCache dataCache = new DataCache();
cacheMap.put(queueName, dataCache);
}
DataCache dataCache = cacheMap.get(queueName);
//每10次flush到文件中
if (dataCache.count == 10) {
FileChannel fileChannel = null;
// long fileLength = 0;
try {
fileChannel = new RandomAccessFile(file, "rw").getChannel();
//fileLength = raf.length();
} catch (Exception e) {
e.printStackTrace();
}
long blockPosition;
try {
blockPosition = getLeastBlockPosition(getLeastBlockPosition(fileChannel.size()));
Block block = new Block(blockPosition, dataCache.dataBuffer.position());
block.size = 10;
blockMap.get(queueName).add(block);
dataCache.dataBuffer.flip();
fileChannel.position(blockPosition);
fileChannel.write(dataCache.dataBuffer);
dataCache.dataBuffer.clear();
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
fileChannel.close();
}catch (Exception e){
e.printStackTrace();
}
}
} else {
//放入缓存中
dataCache.dataBuffer.putInt(message.length);
dataCache.dataBuffer.put(message);
dataCache.count++;
}
lock.unlock();
}
消息获取
消息获取的思路是根据队列名,找到该队列对应的List<Block>,然后根据偏移量,找到属于哪个block.找到具体的Block后,然后遍历Block,找到偏移量的开始位置,取相应数量的消息即可.
public Collection<byte[]> get(String queueName, long offset, long num) {
//队列不存在
if (!blockMap.containsKey(queueName)) {
return EMPTY;
}
//消息集合
List<byte[]> msgs = new ArrayList();
List<Block> blocks = blockMap.get(queueName);
int hash = hashFile(queueName);
String path = DIRPATH + hash + ".txt";
FileChannel fileChannel = null;
int size = blocks.get(0).size;
int eleNum = 0;
//记录了目标block所在的下标
int blockNum = 0;
lock.lock();
try {
fileChannel = new RandomAccessFile(new File(path), "rw").getChannel();
for (int i = 1; i < blocks.size() && size < offset; i++, blockNum++) {
size += blocks.get(i).size;
}
size = size - blocks.get(blockNum).size;
for (int i = blockNum; i < blocks.size(); i++) {
//size+=blocks.get(i).size;
// size-=blocks.get(i).size;
int length = blocks.get(i).length;
MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, blocks.get(i).startPosition, length);
int sum = 0;
while (sum < length && size < offset) {
int len = buffer.getInt();
sum += 4;
sum += len;
buffer.position(sum);
size++;
}
if (size >= offset) {
while (buffer.position() < length && eleNum <= num) {
int len = buffer.getInt();
byte[] temp = new byte[len];
buffer.get(temp, 0, len);
eleNum++;
msgs.add(temp);
}
if (eleNum > num) {
break;
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
fileChannel.close();
} catch (Exception e) {
e.printStackTrace();
}
lock.unlock();
}
return msgs;
}
思考
- 将所有的ByteBuf池化,包括缓存的那部分ByteBuf.通过ThreadLocal,将ByteBuf与线程绑定起来,后面申请Buffer,直接从对应的线程里面去申请即可.
- 在写入的时候,可以不同步写,实现异步写.由一个线程去异步flush到文件里面
- 当读取消息块达到临界点的时候,由单线程申请buffer资源来预读后面的消息块存入,并缓存.
我的代码:https://github.com/maskwang520/messagequeue.git