redis版本
redis 6.0+
关键点
- 配置修改。
- 有足够的IO并发压力。
一、配置修改
设置io-thread的值为想要的io线程数,设置io-threads-do-reads yes打开读事件处理的多线程。
二、造成足够的压力
2.1 开启并发写IO的条件参数
通过写IO的堆栈图依次寻找写IO的条件判断函数,找到handleClientsWithPendingWritesUsingThreads函数。
int handleClientsWithPendingWritesUsingThreads(void) {
int processed = listLength(server.clients_pending_write);
if (processed == 0) return 0; /* Return ASAP if there are no clients. */
/* If I/O threads are disabled or we have few clients to serve, don't
* use I/O threads, but thejboring synchronous code. */
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}
/* Start threads if needed. */
if (!io_threads_active) startThreadedIO()
// 多线程处理写数据
}
int stopThreadedIOIfNeeded(void) {
int pending = listLength(server.clients_pending_write);
/* Return ASAP if IO threads are disabled (single threaded mode). */
if (server.io_threads_num == 1) return 1;
if (pending < (server.io_threads_num*2)) {
if (io_threads_active) stopThreadedIO();
return 1;
} else {
return 0;
}
}
void stopThreadedIO(void) {
/* We may have still clients with pending reads when this function
* is called: handle them before stopping the threads. */
handleClientsWithPendingReadsUsingThreads();
for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_lock(&io_threads_mutex[j]);
io_threads_active = 0;
}
根据上面的源码,我们可以很清晰的看到,如果只开启了单线程或者要处理的写事件penging < server.io_threads_num * 2,io_threads_num是前面配置中设置的值,则不开启多线程处理模式,并且要关闭多线程处理。
PS:无论是否开启多线程IO,redis都是采用异步的方式发送数据,在命令处理过程中,将数据缓存在缓冲区,然后统一在handleClientsWithPendingWritesUsingThreads该函数中将所有client的缓存数据发送出去。
2.2 开启并发读事件的条件
开启并发读需要在读事件处理时,不直接处理而是将所有的读事件缓存在server.clients_pending_read中。
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, readlen;
size_t qblen;
/* Check if we want to read from the client later when exiting from
* the event loop. This is the case if threaded I/O is enabled. */
if (postponeClientRead(c)) return;
}
int postponeClientRead(client *c) {
if (io_threads_active && // 在上面写事件处理过程中 如果忙则会被设置为开启
server.io_threads_do_reads && // 上面配置修改对应的字段的值
!ProcessingEventsWhileBlocked && // 默认值是0
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ))) // 避免重复重发
{
c->flags |= CLIENT_PENDING_READ;
listAddNodeHead(server.clients_pending_read,c); // 将client加入待处理队列
return 1;
} else {
return 0;
}
}
由上面的源码可知,如果写事件繁忙, 则设置io_threads_active = 1, 而且配置也配置了开启多线程读事件, 则在处理读事件时, 将client添加到server.clients_pending_read, 延后处理.
再来看多线程并行处理读事件的函数handleClientsWithPendingReadsUsingThreads.
int handleClientsWithPendingReadsUsingThreads(void) {
if (!io_threads_active || !server.io_threads_do_reads) return 0;
int processed = listLength(server.clients_pending_read);
if (processed == 0) return 0;
// 并行处理IO读事件
这里并行处理IO读事件的前提条件和上面将读事件延后处理的条件是一样的.
2.3 分析总结
- 首先必须修改配置, 开启多线程IO和设置开启多线程处理读事件.
- 写事件必须满足并发写的数量concurrent > 2 * io_threads_num, 才会开启多线程处理模式.
- 在并发写不满足上述条件的情况下, 写处理时会主动关闭多线程处理模式.
- 在写事件处理时, 开启了多线程模式的前提下, 才会开启多线程处理读事件.
- 多线程处理模式具有一定的弹性, 只有在读写繁忙时才会触发, 在不繁忙时主动关闭.
2.4 修改源码开启多线程处理模式
上面提到, 要开启多线程处理模式需要并发压力大于 2 * io_threads_num时才能触发, 对于断点调试多线程处理模式的源码是很不方便. 因此考虑调整源码, 强行开启多线程处理模式.
修改stopThreadedIOIfNeeded函数中的判断条件. 修改如下:
// 每次判断是否要关闭多线程模式时都选择否
int stopThreadedIOIfNeeded(void) {
return 0;
int pending = listLength(server.clients_pending_write);
/* Return ASAP if IO threads are disabled (single threaded mode). */
if (server.io_threads_num == 1) return 1;
if (pending <= 0)) {
if (io_threads_active) stopThreadedIO();
return 1;
} else {
return 0;
}
}
附上多线程下处理setCommond的堆栈图, 因为只有一个连接, 因此被分给了主线程(也是第一个IO线程)进行处理, 没有分配给其它IO线程.
总结
- 问: 为什么在单线程的情况下也选择在处理完所有事件后统一写数据?
答: 能提高整体的速度吗? 不能! 但是每个事件的耗时会相对平均, 如果处理一个事件就发送一个, 那么在前面的事件延迟总是更低, 但是后面的事件的延迟总是更高. - 问: 为什么是在写事件处理时检测繁忙状态, 然后选择性的开启多线程处理模式?
答: 还有一个可选方案是在apiPollEvent后, 获得所有触发的读事件, 然后根据读事件的数量选择是否开启多线程处理模式. - redis根据繁忙状态弹性的开启多线程处理模式, 这个做法非常值得借鉴, 并且开启多线程处理模式是在主线程触发, 避免了并发的可能性.