在UNIX系统所提供的经典进程间通信机制(IPC):管道、FIFO、消息队列、信号量以及共享储存。这些机制允许在同一台计算机上运行的进程可以相互通信。但是当考察到不同计算机(通过网络相连)的进程相互通信时就必须借助网络通信机制(network IPC),在分布式计算环境中,为了集成分布式应用,开发者需要对异构网络环境下的分布式应用提供有效的通信手段。为了管理需要共享的信息,对应用提供公共的信息交换机制是重要的。
设计分布式应用的方法主要有:
- 远程过程调用(PRC)--分布式计算环境(DCE)的基础标准成分之一;
- 对象事务监控(OTM)--基于CORBA的面向对象工业标准与事务处理(TP)监控技术的组合;
- 消息队列(MessageQueue)--构造分布式应用的松耦合方法;
- 消息队列的API调用被嵌入到新的或现存的应用中,通过消息发送到内存或基于磁盘的队列或从它读出而提供信息交换。消息队列可用在应用中以执行多种功能,比如要求服务、交换信息或异步处理等。
项目目标:简单实现精简版消息中间件
项目任务:
- 作为msgsnd系列的扩展服务
- 使msg系列支持跨ip通信能力,而其他使用msg系列接口的应用无感知
项目分析:
- 某client向相应key值的消息队列发送消息
- 服务器在定时器的作用下每1000微秒,就读取自身管理的消息队列数据
- 读取数据根据消息的type值和服务器配置文件,来决定将消息发送到哪一个服务器上
- 当服务器收到远程发送来消息的时候,读取该消息的type值,判断该type是否需要继续转发,如果是本服务器接受的数据是就保留数据,否则就转发。
技术要点:
- 进程间通信 - 消息队列
- TCP socket
- epoll的IO复用
- 自定义通信协议
一、进程间通信 - 消息队列
消息队列的消息的链接表,储存在内核中,有消息队列标示符标示。
1、有关消息队列的相关函数
-
int msgget(key_t key, int flag);
msgget用于创建一个新队列或打开一个现有队列。 -
int msgsnd(int msqid, const void * ptr, size_t nbytes, int flag);
msgsnd将新消息添加到队列尾端。每一个消息包含一个正的长整形类型的字段、一个非负的长度以及实际数据字节数,所有这些都在将消息添加到队列时,传送给msgsnd。 -
int msgctl(int msqid, int cmd, struct msqid_ds * buf);
对队列执行cmd操作,例如:IPC_STAT(读取消息)、IPC_SET(设置消息)、IPC_RMID(删除消息) -
ssize_t msgrcv(int msqid, void * ptr, size_t nbytes, long type,int flag);
msgrcv用于从队列中取消息,我们不一定要已先进先出次序取消息,也可以按照消息的类型字段取消息。
注意点
: 这里唯一值得提醒的是函数msgget
中的key值,每个内核中的IPC(进程间通信)结构都用一个非负整数的标示符加以引用,只需知道其队列标示符。与文件描述符不同,IPC标示符不是小的整数。当IPC结构被创建,然后又被删除时,与这种结构相关的标示符连续加1,知道达到一个整型数的最大值,然后又转到0。每一个IPC对象都与一个键(key)现关联。
当我们启动这个服务后,肯定是通过读取配置文件来确定读取哪一个key值的消息队列,本项目使用的是libxml来解析的xml配置文件保存在一个存有配置文件结构体的数组里。通过配置文件我们可以获知消息的转发路径(和路由表相似)配置文件结构如下:
<?xml version="1.0" encoding="UTF-8"?>
<msgTypeToipInfo>
<listenPort>8787</listenPort> <!-- 本服务监听的端口号 -->
<ip>10.81.12.240</ip> <!-- 本服务监听的IP地址 -->
<alarmSeconds>1000</alarmSeconds> <!-- 取队列的刷新时间 微秒-->
<mesgqKey>10000</mesgqKey> <!-- 本服务维护的消息队列的key值 -->
<msg type="101"> <!-- 消息key值为101时,发送ip为10.81.12.240 端口号为8787的服务器-->
<ipAddr>10.81.12.240</ipAddr>
<portNum>8787</portNum>
</msg>
<msg type="102">
<ipAddr>10.81.12.240</ipAddr>
<portNum>8787</portNum>
</msg>
......
</msgTypeToipInfo>
在该配置文件中只监听了一台服务器,如有需监听多台服务器添加即可。。。
2、定时器刷新
函数alarm设置的定时器只能精确到秒,而以下函数理论上可以精确到微妙:
#include <sys/select.h>
#include <sys/itimer.h>
int getitimer(int which, struct itimerval *value);
int setitimer(int which, const struct itimerval *value, struct itimerval *ovalue);
函数setitimer可以提供三种定时器,它们相互独立,任意一个定时完成都将发送定时信号到进程,并且自动重新计时。参数which确定了定时器的类型,如表所示:
取值 | 含义 | 信号发送 |
---|---|---|
ITIMER_REAL | 定时真实时间,与alarm类型相同。 | SIGALRM |
ITIMER_VIRT | 定时进程在用户态下的实际执行时间。 | SIGVTALRM |
ITIMER_PROF | 定时进程在用户态和核心态下的实际执行时间 | SIGPROF |
这三种定时器定时完成时给进程发送的信号各不相同.
- ITIMER_REAL类定时器发送SIGALRM信号,
- ITIMER_VIRT类定时器发送SIGVTALRM信号,
- ITIMER_REAL类定时器发送SIGPROF信号。
函数alarm本质上设置的是低精确、非重载的ITIMER_REAL类定时器,它只能精确到秒,并且每次设置只能产生一次定时。函数setitimer
设置的定时器则不同,它们不但可以计时到微妙(理论上),还能自动循环定时。在一个Unix进程中,不能同时使用alarm和ITIMER_REAL类定时器。
//结构itimerval描述了定时器的组成:
struct itimerval
{
struct tim. it_interval; /* 下次定时取值 */
struct tim. it_value; /* 本次定时设置值 */
}
//结构tim.描述了一个精确到微妙的时间:
struct tim.
{
long tv_sec; /* 秒(1000000微秒) */
long tv_usec; /* 微妙 */
}
设置定时器代码,如下:
void NoMesgQue::setRefreshTime(double seconds)
{
this->refreshTime = seconds;
//1微秒=10的-6次方秒=0.000001秒
struct itimerval value;
value.it_value.tv_sec = 0;
value.it_value.tv_usec = seconds;
value.it_interval.tv_sec = 0; //val秒
value.it_interval.tv_usec = seconds;
signal(SIGALRM, timeReady);
setitimer(ITIMER_REAL,&value,NULL);
}
设置void timeReady(int signo)
为响应函数,读取队列中的数据
void timeReady(int signo)
{
//在mesgQueue中取数据
s_msg * rebuf = new s_msg();
NoMesgQue * pp = NoMesgQue::getInstance();
int length = sizeof(s_msg) - sizeof(long);
while( msgrcv(pp->getMsgqid(), rebuf, length, 0, IPC_NOWAIT) > 0)
{
cout << "Message : "<<rebuf->mtext << " FromType:"<<rebuf->FromType << " toType:"<<rebuf->type << endl;
int sockfd; //sockfd socket对应的描述符
if((pp->findConfigFileToRemote(rebuf->type,&sockfd) < 0 ) || sockfd < 0)
{
cout<<"Don't find Remote Info from Config or Remote is not connecet"<<endl;
}
else
{
//包装package信息
s_msgPackage * package = new s_msgPackage();
package->data = *rebuf;
(package->packageHead).mesgLength = sizeof(*rebuf);
//将包装好的package信息 放到epoll中write
s_msgFdOfData * fdData = NoMesgQue::getInstance()->getMsgFdOfData();
s_msgFdOfData * thisPP = NULL;
for(int ii = 0; ii < number; ii++)
{
if (sockfd == fdData[ii].sockfd)
{
thisPP = &fdData[ii];
break;
}
}
thisPP->sockfd = sockfd;
memcpy(thisPP->data,package,sizeof(*package));
thisPP->size = sizeof(*package);
cout << "数量 " << number <<"内容fd "<< thisPP->sockfd << endl;
}
printf("socketfd %d\n", sockfd);
}
signal(SIGALRM, timeReady);
}
在while中不断取队列中的数据,当msgrcv(int msqid, void * ptr, size_t nbytes, long type, int flag);
成功执行时,内核会更新与该消息队列相关联的msgid_ds结构,以指示调用者的进程ID(msg_lrpid)和调用时间(msg_rtime),并指示队列中的消息数减少1个。当取到数据后通过读取配置文件与消息type来获取与之相连的sockfd,通过epoll来实现IO复用进而发送数据。当队列中没有数据时,再一次注册信号SIGALRM,就OK了。