基础消息队列

在UNIX系统所提供的经典进程间通信机制(IPC):管道FIFO消息队列信号量以及共享储存。这些机制允许在同一台计算机上运行的进程可以相互通信。但是当考察到不同计算机(通过网络相连)的进程相互通信时就必须借助网络通信机制(network IPC),在分布式计算环境中,为了集成分布式应用,开发者需要对异构网络环境下的分布式应用提供有效的通信手段。为了管理需要共享的信息,对应用提供公共的信息交换机制是重要的。
设计分布式应用的方法主要有:

  1. 远程过程调用(PRC)--分布式计算环境(DCE)的基础标准成分之一;
  2. 对象事务监控(OTM)--基于CORBA的面向对象工业标准与事务处理(TP)监控技术的组合;
  3. 消息队列(MessageQueue)--构造分布式应用的松耦合方法;
  • 消息队列的API调用被嵌入到新的或现存的应用中,通过消息发送到内存或基于磁盘的队列或从它读出而提供信息交换。消息队列可用在应用中以执行多种功能,比如要求服务、交换信息或异步处理等。

项目目标:简单实现精简版消息中间件

项目任务:

  1. 作为msgsnd系列的扩展服务
  2. 使msg系列支持跨ip通信能力,而其他使用msg系列接口的应用无感知

项目分析:

  1. 某client向相应key值的消息队列发送消息
  2. 服务器在定时器的作用下每1000微秒,就读取自身管理的消息队列数据
  3. 读取数据根据消息的type值和服务器配置文件,来决定将消息发送到哪一个服务器上
  4. 当服务器收到远程发送来消息的时候,读取该消息的type值,判断该type是否需要继续转发,如果是本服务器接受的数据是就保留数据,否则就转发。

技术要点:

  • 进程间通信 - 消息队列
  • TCP socket
  • epoll的IO复用
  • 自定义通信协议

一、进程间通信 - 消息队列

消息队列的消息的链接表储存在内核中,有消息队列标示符标示。

1、有关消息队列的相关函数

  • int msgget(key_t key, int flag);
    msgget用于创建一个新队列打开一个现有队列
  • int msgsnd(int msqid, const void * ptr, size_t nbytes, int flag);
    msgsnd将新消息添加到队列尾端。每一个消息包含一个正的长整形类型的字段、一个非负的长度以及实际数据字节数,所有这些都在将消息添加到队列时,传送给msgsnd。
  • int msgctl(int msqid, int cmd, struct msqid_ds * buf);
    对队列执行cmd操作,例如:IPC_STAT(读取消息)、IPC_SET(设置消息)、IPC_RMID(删除消息)
  • ssize_t msgrcv(int msqid, void * ptr, size_t nbytes, long type,int flag);
    msgrcv用于从队列中取消息,我们不一定要已先进先出次序取消息,也可以按照消息的类型字段取消息

注意点
: 这里唯一值得提醒的是函数msgget中的key值,每个内核中的IPC(进程间通信)结构都用一个非负整数的标示符加以引用,只需知道其队列标示符。与文件描述符不同,IPC标示符不是小的整数。当IPC结构被创建,然后又被删除时,与这种结构相关的标示符连续加1,知道达到一个整型数的最大值,然后又转到0。每一个IPC对象都与一个键(key)现关联。

当我们启动这个服务后,肯定是通过读取配置文件来确定读取哪一个key值的消息队列,本项目使用的是libxml来解析的xml配置文件保存在一个存有配置文件结构体的数组里。通过配置文件我们可以获知消息的转发路径(和路由表相似)配置文件结构如下:

<?xml version="1.0" encoding="UTF-8"?>
<msgTypeToipInfo>
  <listenPort>8787</listenPort>         <!-- 本服务监听的端口号 -->
  <ip>10.81.12.240</ip>                 <!-- 本服务监听的IP地址 -->
  <alarmSeconds>1000</alarmSeconds>     <!-- 取队列的刷新时间 微秒-->
  <mesgqKey>10000</mesgqKey>            <!-- 本服务维护的消息队列的key值 -->
  <msg type="101">                      <!-- 消息key值为101时,发送ip为10.81.12.240 端口号为8787的服务器-->
    <ipAddr>10.81.12.240</ipAddr>
    <portNum>8787</portNum>
  </msg>
  <msg type="102">
    <ipAddr>10.81.12.240</ipAddr>
    <portNum>8787</portNum>
  </msg>
  ......
</msgTypeToipInfo>

在该配置文件中只监听了一台服务器,如有需监听多台服务器添加即可。。。

2、定时器刷新

函数alarm设置的定时器只能精确到秒,而以下函数理论上可以精确到微妙:

#include  <sys/select.h>
#include  <sys/itimer.h>
int getitimer(int which, struct itimerval *value);
int setitimer(int which, const struct itimerval *value, struct itimerval *ovalue);

函数setitimer可以提供三种定时器,它们相互独立,任意一个定时完成都将发送定时信号到进程,并且自动重新计时。参数which确定了定时器的类型,如表所示:

取值 含义 信号发送
ITIMER_REAL 定时真实时间,与alarm类型相同。 SIGALRM
ITIMER_VIRT 定时进程在用户态下的实际执行时间。 SIGVTALRM
ITIMER_PROF 定时进程在用户态和核心态下的实际执行时间 SIGPROF

这三种定时器定时完成时给进程发送的信号各不相同.

  • ITIMER_REAL类定时器发送SIGALRM信号,
  • ITIMER_VIRT类定时器发送SIGVTALRM信号,
  • ITIMER_REAL类定时器发送SIGPROF信号。

函数alarm本质上设置的是低精确、非重载的ITIMER_REAL类定时器,它只能精确到秒,并且每次设置只能产生一次定时。函数setitimer设置的定时器则不同,它们不但可以计时到微妙(理论上),还能自动循环定时。在一个Unix进程中,不能同时使用alarm和ITIMER_REAL类定时器。

//结构itimerval描述了定时器的组成:
struct itimerval
{
    struct tim.  it_interval;     /* 下次定时取值 */
    struct tim.  it_value;        /* 本次定时设置值 */
}
//结构tim.描述了一个精确到微妙的时间:
struct tim.
{
    long    tv_sec;                 /* 秒(1000000微秒) */
    long    tv_usec;                 /* 微妙 */
}

设置定时器代码,如下:

void NoMesgQue::setRefreshTime(double seconds)
{
    this->refreshTime = seconds;
    //1微秒=10的-6次方秒=0.000001秒
    struct itimerval value;
    value.it_value.tv_sec = 0;
    value.it_value.tv_usec = seconds;
    value.it_interval.tv_sec = 0; //val秒
    value.it_interval.tv_usec = seconds;
    signal(SIGALRM, timeReady);
    setitimer(ITIMER_REAL,&value,NULL);
}

设置void timeReady(int signo)为响应函数,读取队列中的数据

void timeReady(int signo)
{
    //在mesgQueue中取数据
    s_msg * rebuf = new s_msg();
    NoMesgQue * pp = NoMesgQue::getInstance();
    int length = sizeof(s_msg) - sizeof(long);
    while( msgrcv(pp->getMsgqid(), rebuf, length, 0, IPC_NOWAIT) > 0) 
    {
        cout << "Message : "<<rebuf->mtext  << " FromType:"<<rebuf->FromType << " toType:"<<rebuf->type << endl;

        int sockfd;           //sockfd socket对应的描述符
        if((pp->findConfigFileToRemote(rebuf->type,&sockfd) < 0 ) || sockfd < 0)
        {
            cout<<"Don't find Remote Info from Config or Remote is not connecet"<<endl;
        }
        else
        {
            //包装package信息
            s_msgPackage * package = new s_msgPackage();
            package->data = *rebuf;
            (package->packageHead).mesgLength = sizeof(*rebuf);

            //将包装好的package信息 放到epoll中write
            s_msgFdOfData * fdData = NoMesgQue::getInstance()->getMsgFdOfData();

            s_msgFdOfData * thisPP = NULL;
            for(int ii = 0; ii < number; ii++)
            {
                if (sockfd == fdData[ii].sockfd)
                {
                    thisPP = &fdData[ii];
                    break;
                }
            }

            thisPP->sockfd = sockfd;
            memcpy(thisPP->data,package,sizeof(*package));
            thisPP->size = sizeof(*package);
            cout << "数量 " << number <<"内容fd "<< thisPP->sockfd << endl;
        }

        printf("socketfd %d\n", sockfd);
    }
     signal(SIGALRM, timeReady);
}

在while中不断取队列中的数据,当msgrcv(int msqid, void * ptr, size_t nbytes, long type, int flag); 成功执行时,内核会更新与该消息队列相关联的msgid_ds结构,以指示调用者的进程ID(msg_lrpid)和调用时间(msg_rtime),并指示队列中的消息数减少1个。当取到数据后通过读取配置文件与消息type来获取与之相连的sockfd,通过epoll来实现IO复用进而发送数据。当队列中没有数据时,再一次注册信号SIGALRM,就OK了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,591评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,448评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,823评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,204评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,228评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,190评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,078评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,923评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,334评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,550评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,727评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,428评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,022评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,672评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,826评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,734评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,619评论 2 354

推荐阅读更多精彩内容