已迁移至掘金社区pika主从复制原理之工作流程
上一篇pika主从复制原理之binlog中介绍了主从复制binlog的元信息、日志的格式及对应的api,本篇介绍下主从复制有关的线程、全量复制过程、增量复制过程。
线程
PikaBinlogReceiverThread: 系统启动时初始化,占用端口port+1000,作为Slave接收Master同步过来的Redis命令。
PikaHeartbeatThread: 系统启动时初始化,占用端口port+2000,作为Master接收Slave发送的ping、spci指令,对所有Slave进行存活检测
PikaTrysyncThread: 系统启动时初始化,无角色,执行slaveof host port命令对应的后台任务包括定期检查是否需要跟某个Master建立连接、db替换、启动或关闭rsync任务等,当启动rsync服务时,占用端口port+3000。
PikaSlavepingThread: 成为某个Master的Slave后,作为Slave的一方初始化,定时向Master发送ping、spci指令,如果超时超过30秒,生成以后关闭Master的后台任务,由PikaBinlogReceiverThread来执行。
BinlogBGWorker: 系统启动时初始化,每个BinlogBGWorker包含一个binlogbg_thread,作为Slave的后台执行模块,接收PikaBinlogReceiverThread的调度、执行具体的redis命令。
PikaBinlogSenderThread: 成为某个Slave的Master后,作为Master的一方初始化,根据slave传过来的filenum、offset消费日志,并发送到PikaBinlogReceiverThread所在的服务端口。
类图
PikaBinlogReceiverThread
PikaBinlogSenderThread
主要API描述
Thread
//初始化
int Thread::InitHandle()
//执行线程内的定时任务
void Thread::CronHandle()
//创建线程,调用RunThread()
int Thread::StartThread()
//处理逻辑入口函数
void *Thread::RunThread(void *arg)
HolyThread
virtual int InitHandle()
1.初始化epoll监听事件,并将套接字加入server_fds
virtual void *ThreadMain()
1.调用CronHandle,检测是否存在需要执行的定时任务,如果有则执行。
2.如果server_fds上存在EPOLLIN事件,则接受连接请求,建立slave-master连接
3.如果是slave-master连接上存在EPOLLIN事件,则调用in_conn->GetRequest()进行具体redis命令处理。
4.如果是slave-master连接上存在EPOLLOUT事件,则调用in_conn->SendReply()发送应答。
PikaBinlogReceiverThread
//执行后台任务,PikaSlavepingThread检测到应该退出或者主挂掉时,添加Kill PikaBinlogSenderThread后台任务。
void PikaBinlogReceiverThread::CronHandle()
PikaMasterConn
int PikaMasterConn::DealMessage()
1.如果存在monitor,则将当前命令发到monitor一份(即redis的monitor命令)。
2.生成全局日志序号serial,保证按顺序并发写入slave binlog。
3.如果是只读,则由PikaBinlogReceiverThread完成binlog日志的追加,否则由PikaBinlogReceiverThread分配的BinlogBGWorker完成。
BinlogBGWorker
void Schedule()
1.第一次调度时启动后台线程。
2.通过DoBinlogBG执行具体的后台任务。
void BinlogBGWorker::DoBinlogBG(void* arg)
1.根据解析的参数获取需要执行的命令。
2.如果slave非只读,则获取一个全局记录锁(应该是用不到的)。
3.记录binlog。
4.执行命令。
5.记录慢日志。
Slaveof流程图
slave
master
slaveof过程
1.如果是slaveof no one,停止rsync服务,删除master,repl_state_ = PIKA_REPL_NO_CONNECT,role=master
2.如果slaveof ip port filenum offset,则将filenum之前的binlog删除,同时如果filenum存在,则将offset之前的文件内容填充空格。
3.role更新为slave,repl_state更新为PIKA_REPL_CONNECT,应答成功。
//后台任务部分
4.PikaTrysyncThread检测到repl_state==PIKA_REPL_CONNECT,在端口port+3000启动rsync服务,准备接受master的db内容。
5.slave建立与master的连接,如果有auth,则主动发送auth命令。
6.发送trysync命令,主动要求master进行数据同步。
7.如果master应答结果为kInnerReplWait,则repl_state_ = PIKA_REPL_WAIT_DBSYNC。
8.slave会一直等待,直到db_sync_path目录下存在info文件时,用新的db替换之前的db,根据info中的filenum、offset更新slave对应的filenum、offset,重置repl_state_ = PIKA_REPL_CONNECT
9.再次执行3-6步骤,master应答kInnerReplOk,更新repl_state_ = PIKA_REPL_CONNECTING,停掉rsync服务,创建PikaSlavepingThread,在PikaSlavepingThread ping master成功以后更新repl_state_ = PIKA_REPL_CONNECTED,主从同步正式建立。
trysync过程
1.根据slave的ip、port构造slave的唯一标识,stage=SLAVE_ITEM_STAGE_ONE。
2.如果master没有找到slave传过来的filenum,则执行bgsave,生成一份db镜像和info信息,通过rsync传给slave,并清理掉master的临时文件
3.根据slave的ip、port,bgsave的filenum、offset构建PikaBinlogSenderThread
4.过滤binlog中可能已经损坏的内容(什么场景可能会用到?可能主从数据不一致吗?)
5.更新role_ = PIKA_ROLE_MASTER,PikaSlavepingThread第二次存活检测时发送spci命令,主根据PikaBinlogSenderThread,更新stage=SLAVE_ITEM_STAGE_TWO,主从同步正式建立。
结语
个人认为主从复制是pika里面体量最大也是最复杂的一个模块,通过采用了类似LevelDB的WAL日志的方式,当然由于redis命令的问题,这个日志非幂等的,不是为了在启动时重放,而是解决了redis中增量复制buffer被打满的问题。