消息传递
QNX是个微内核结构的操作系统,靠的是进程间通讯来实现整个系统功能的。那么具体到写一个程序的时候,到底这个通讯是如何完成的呢?这章就是具体介绐最底层的消息传递API的。消息传递是通过内核进行的,所以所谓的API,实际也就是最底层的内核调用了。需要指出的是,真正在QNX上写程序的时候,很少会直接用到这些API,而是利用更高层的API,不过,知道这些底层的API对于将来理解建立在这些API上的界面,应该会有帮助的。
频道(Channel)与连接(Connect)
消息传递是基于服务器与客户端的模式来进行的,那么客户端怎样才能与服务器端通讯呢?最简单的,当然是指定对方的进程号。要发送的一方,将消息加一个头,告诉内核“把这个消息发给 pid 12345"就行了。其实这也是QNX4时候的做法。但QNX6开始完整支持POSIX线程后,这种方法似乎就不太适合了。如果服务器,有两个线程,分别进行不同的服务,那该怎么办呢?或者你会说“把这个消息发给pid 12345 tid 3"就行了。可是,如果某一个服务,不是由单一线程来进行服务的,而是有一组线程进行的,那又怎么办呢?
为此,QNX6抽象出了”频道“(Channel)这个概念。一个频道,就是一个服务的入口;至于这个频道到底具体有多少线程为其服务,那都是服务器端自己的事情。
一个服务器如果有多个服务,它也可以开多个频道。而客户端,在向“频道”发送消息前,需要先建立连接(Connection),然后将消息在连接上发出去。这样同一个客户端,如果需要,可以与同一个频道建立多个连接。所以,大致上通讯的准备过程是这样的:
服务代码:
ChannelId = ChannelCreate(Flags);
客户端代码:
ConnectionId = ConnectAttach(Node, Pid, Chid, Index, Flag);
服务器端就不用解释了,客户端要建立连接的话,它需要Node,这个就是机器号。如果过网络(透明分布处理)时这个值决定了哪一台机器;如果客户端与服务器在同一台机器里时,这个数字是0,或者说ND_LOCAL_NODE;pid是服备器的进程号;而chid就是服务器调用 ChannelCreate()后得到的频道号了。Index与Flag以后再讨论。基本上客户端就是同"Node这台机器里的,Pid这个进程的,Chid频道"做一个连接。有了连接以后,就可以进行消息传递了。
连接的终止是ConnectDetach(),而频道的结束则是ChannelDestroy()了。不过,一般服务器都是长久存在的,不大有需要ChannelDestroy()的时候。
发送(Send),接收(Receive)和应答(Reply)
QNX的消息传递,与我们传统常见的进程间通讯最大的不同,就是这是一个"同步的"消息传递。一个消息传递,都要经过发送,接收和应答三个部份,所谓的 SRR过程。具体来说,客户端在连接上"发送"消息,一旦发送,客户端会被阻塞,服务器端会接收到消息,进行处理,最后,将处理结果"应答"给客户端;只有服务器"应答"了以后,客户端的阻塞状态才会被解除。这种同步的过程,不但保证的客户端与服务器端的时序,也大大简化了编程。具体用API来说,就是这样。
服务代码:
ReceiveId = MsgReceive(ChannelId, ReceiveBuffer, ReceiveBufLength, &MsgInfo);
(... 检查Buffer里的消息进行处理 ...)
MsgReply(RceeiveId, ReplyStatus, ReplyBuf, ReplyLen);
客户端代码:
MsgSend(ConnectionId, SendBuf, SendLen, ReplyBuf, ReplyLen);
(... 由OS将这个线程挂起 ...)
(... 当服务器MsgReply()后,OS 解除线程的阻塞状态, 客户端可以检查自己的 ReceiveBuf 看看应答结果 ...)
服务器端在频道上进行接收,处理完后应答;客户端则是在连接上发送,要注意在发送的同时,客户端还提供了接收应答用的缓冲。如果你细心的话,或许你会问,服务器端的MsgReceive()与客户端的MsgSend()没有同步,会不会有问题呢?比如,如果MsgSend()时,服务器没有在 MsgReceive(),会出什么事呢?答案是OS依然会把发送线程挂起,发送线程从执行状态(RUNNING)转入“发送阻塞”状态(SEND BLOCK),一直等到服务器来MsgReceive()时,再将SendBuf里的东西复制到ReceiveBuffer里去,同时发送线程的状态变成 “应答阻塞”(REPLY BLOCK)。
同样的,如果服务器调用MsgReceive()时,没有客户端,服务器线程也会被挂起,进入“接收阻塞”状态(RECEIVE BLOCK)。
在应答时,还可以用MsgError()来告诉发送方有错误发生了。因为MsgReply()也可以返回一个状态,或许你会问这两者之间有什么区别?MsgReply(rcvid, EINVAL, 0, 0);的结果是,MsgSend() 这个函数的返回值是22(EINVAL);而MsgError(rcvid, EINVAL);的结果,是MsgSend()返回-1,而errno被设为EINVAL。
数据区与iov
除了用线性的缓冲区进行消息传递以外,为了方便使用,还提供了用iov_t来“汇集”数据。也就是说,可以一次传送几块数据。好象下面的图这样子。虽然在客户端蓝色的Header同红色的databuf是两块不相邻的内存,但传递到服务器端的ReceiveBuffer里,就是连续的了。也就是说在服务器端,要想得到原来databuf里的数据,只需要(ReceiveBuffer + sizeof(header))就可以了。(要注意数据结构对其)
SETIOV(&iov[0], &header, sizeof(header));
SETIOV(&iov[1], databuf, datalen);
MsgSendvs(ConnectionId, iov, 2, Replybf, ReplyLen);
"header" 与 "databuf" 是不连续的两块数据。
服务器接收后,"header"与"databuf"被连续地存在ReceiveBuffer里。
ReceiveId = MsgReceive(ChannelId, ReceiveBuffer, ReceiveBufLength, &MsgInfo);
header = (struct header *)ReceiveBUffer;
databuf = (char *)((char *)header + sizeof(*header));
例子
好了,有了以上这些基本函数(内核调用),我们就可以写一个客户端和一个服务器端,进行最基本的通信了。
服务嚣:这个服务器,准务好频道后,就从频道上接收信息。如果信息是字符串”Hello“的话,这个服务器应答一个”World“字符串。如果收到的信处是字符串“Ni Hao", 那么它会应答”Zhong Guo",其它任何消息都用MsgError()回答一个错误。
$ cat simple_server.c
// Simple server
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/neutrino.h>
int main()
{
int chid, rcvid, status;
char buf[128];
if ((chid = ChannelCreate(0)) == -1) {
perror("ChannelCreate");
return -1;
}
printf("Server is ready, pid = %d, chid = %d\n", getpid(), chid);
for (;;) {
if ((rcvid = MsgReceive(chid, buf, sizeof(buf), NULL)) == -1) {
perror("MsgReceive");
return -1;
}
printf("Server: Received '%s'\n", buf);
/* Based on what we receive, return some message */
if (strcmp(buf, "Hello") == 0) {
MsgReply(rcvid, 0, "World", strlen("World") + 1);
} else if (strcmp(buf, "Ni Hao") == 0) {
MsgReply(rcvid, 0, "Zhong Guo", strlen("Zhong Guo") + 1);
} else {
MsgError(rcvid, EINVAL);
}
}
ChannelDestroy(chid);
return 0;
}
客户端:客户端通过从命令行得到的服务器的进程号与频道号,与服务器建立连接。然后向服务器发送三遍"Hello"和”Ni Hao",并检查返回值。最后发一个“unknown"看是不是MsgSend()会得到一个出错返回。
$ cat simple_client.c
//simple client
#include <stdio.h>
#include <string.h>
#include <sys/neutrino.h>
int main(int argc, char **argv)
{
pid_t spid;
int chid, coid, i;
char buf[128];
if (argc < 3) {
fprintf(stderr, "Usage: simple_client <pid> <chid>\n");
return -1;
}
spid = atoi(argv[1]);
chid = atoi(argv[2]);
if ((coid = ConnectAttach(0, spid, chid, 0, 0)) == -1) {
perror("ConnectAttach");
return -1;
}
/* sent 3 pairs of "Hello" and "Ni Hao" */
for (i = 0; i < 3; i++) {
sprintf(buf, "Hello");
printf("client: sent '%s'\n", buf);
if (MsgSend(coid, buf, strlen(buf) + 1, buf, sizeof(buf)) != 0) {
perror("MsgSend");
return -1;
}
printf("client: returned '%s'\n", buf);
sprintf(buf, "Ni Hao");
printf("client: sent '%s'\n", buf);
if (MsgSend(coid, buf, strlen(buf) + 1, buf, sizeof(buf)) != 0) {
perror("MsgSend");
return -1;
}
printf("client: returned '%s'\n", buf);
}
/* sent a bad message, see if we get an error */
sprintf(buf, "Unknown");
printf("client: sent '%s'\n", buf);
if (MsgSend(coid, buf, strlen(buf) + 1, buf, sizeof(buf)) != 0) {
perror("MsgSend");
return -1;
}
ConnectDetach(coid);
return 0;
}
执行结果
$ ./simple_server
Server is ready, pid = 36409378, chid = 2
Server: Received 'Hello'
Server: Received 'Ni Hao'
Server: Received 'Hello'
Server: Received 'Ni Hao'
Server: Received 'Hello'
Server: Received 'Ni Hao'
Server: Received 'Unknown'
Server: Received ''
$ ./simple_client 36409378 2
client: sent 'Hello'
client: returned 'World'
client: sent 'Ni Hao'
client: returned 'Zhong Guo'
client: sent 'Hello'
client: returned 'World'
client: sent 'Ni Hao'
client: returned 'Zhong Guo'
client: sent 'Hello'
client: returned 'World'
client: sent 'Ni Hao'
client: returned 'Zhong Guo'
client: sent 'Unknown'
MsgSend: Invalid argument
可变消息长度
从上面的程序也可以看出来,消息传递的实质是把数据从一个缓冲,复制到(另一个进程的)另一个缓冲里去。问题是,如何确定缓冲的大小呢?上述的例子里,服务器端用了一个128字节的缓冲,万一客户端发送一个比如说512字节的消息,是不是消息传递就会出错了呢?
答案是,传递依然成功,但是,只有SendBuffer的最初的128个字节的数据会被复制。设计思想是,服务器必须发现这样的情形,并设法取得完整的数据。
在MsgRecieve()时,第四个参数是一个 struct _msg_info。内核会在进行消息传递的同时,填充这个结构,从而告诉让你得到一些信息。在这个结构中,"msglen"告诉你这次消息传递你实际收到了多少字节(在我们的例子里,就是128),而"srcmsglen"则告诉你发送方的实际Buffer会有多大(在我们的例子里,是512)。通过比较这两个值,服务器端就可以判断有没有收到全部数据。
一旦服务器知道了还有更多的数据没有收到,那该怎么办呢?QNX提供了 MsgRead()这个特殊函数。服务器端可以用这个函数,从发送缓冲中“读取”数据。MsgRead()基本上就是告诉内核,从发送缓冲的某个指定偏移开始,读取一定长的数据回来。所以服务器端这部份的代码基本上是这样的。
int rcvid;
struct _msg_info info;
char buf[128], *totalmsg;
...
rcvid = MsgReceive(chid, buf, 128, &info);
...
if (info->srcmsglen > info->msglen) {
totalmsg = malloc(info->srcmsglen);
if (!totalmsg) {
MsgError(rcvid, ENOMEM);
continue;
}
memcpy(totalmsg, buf , 128);
if (MsgRead(rcvid, &totalmsg[128], 128, info->srcmsglen - info->msglen) == -1) {
MsgError(rcvid, EINVAL);
continue;
}
} else {
totalmsg = buf;
}
/* Now totalmsg point to a full message, don't forget to free() it later on,
* if totalmsg is malloc()'d here
*/
你或者会问,为什么消息接收都已经结束了,服务器端还能去读取客户端的数据?这是因为从一开始我们就提到的,QNX的消息传递是“同步”的。还记得吗?在服务器端“应答”之前,客户端是被阻塞的;也说是说客户端的发送缓冲会一直保留在那里,不会变化。(另外再开个线程去把这个缓冲搞乱甚至free掉?当然可以。不过,这是你客户端程序的BUG了)
与此相近的,有的时候,服务器需要返回大量的数据给客户端(比如说1M)。服务器不希望 malloc(1024 * 1024),然后MsgReply(),然后再free()。(在嵌入式程序里,经常地进行malloc()/free()不是一个很好的习惯)那么服务器也可以用一个小的定长缓冲,比方说16K,然后把数据“一部份一部份地写回”客户端的应答缓冲里。好象下面的样子。要记得最后还是要做一个 MsgReply() 以让客户端继续运行。
char *buf[16 * 1024];
unsigned offset;
for (offset = 0; offset < 1024 * 1024; offset += 16 * 1024) {
/* moving data into buffer */
MsgWrite(rcvid, buffer, 16 * 1024, offset);
}
/* 1MB returned, Reply() to let client go */
MsgReply(rcvid, 0, 0, 0);
实例
以下是QNX的C库中的read()和write()函数实装,有了前面的基础,应该很好理解了。先不管fd是如何得到的,只要理解fd就是 ConnectAttach()返加的连接号就可以了。虽然read()是从服务器取得数据,而write()是向服务器输出数据,但实质上,它们都是向服务器提出一个请求,由服务器来应答。而对于write()来说,这是一个io_write_t,一个MsgWritev()把请求与要传递的数据一起发给服务器;而对于read()来说,请求被封装在 io_read_t 里,MsgSend()把这请求传给服务器,read()的结果缓冲,则做为应答缓冲,由服务器MsgReply()时填入。
read():
/*
* $QNXLicenseC:
* Copyright 2007, QNX Software Systems. All Rights Reserved.
*
* You must obtain a written license from and pay applicable license fees to QNX
* Software Systems before you may reproduce, modify or distribute this software,
* or any work that includes all or part of this software. Free development
* licenses are available for evaluation and non-commercial purposes. For more
* information visit http://licensing.qnx.com or email licensing@qnx.com.
*
* This file may contain contributions from others. Please review this entire
* file for other proprietary rights or license notices, as well as the QNX
* Development Suite License Guide at http://licensing.qnx.com/license-guide/
* for other information.
* $
*/
#include <unistd.h>
#include <sys/iomsg.h>
ssize_t read(int fd, void *buff, size_t nbytes) {
io_read_t msg;
msg.i.type = _IO_READ;
msg.i.combine_len = sizeof msg.i;
msg.i.nbytes = nbytes;
msg.i.xtype = _IO_XTYPE_NONE;
msg.i.zero = 0;
return MsgSend(fd, &msg.i, sizeof msg.i, buff, nbytes);
}
/*
* $QNXLicenseC:
* Copyright 2007, QNX Software Systems. All Rights Reserved.
*
* You must obtain a written license from and pay applicable license fees to QNX
* Software Systems before you may reproduce, modify or distribute this software,
* or any work that includes all or part of this software. Free development
* licenses are available for evaluation and non-commercial purposes. For more
* information visit http://licensing.qnx.com or email licensing@qnx.com.
*
* This file may contain contributions from others. Please review this entire
* file for other proprietary rights or license notices, as well as the QNX
* Development Suite License Guide at http://licensing.qnx.com/license-guide/
* for other information.
* $
*/
#include <unistd.h>
#include <sys/iomsg.h>
ssize_t write(int fd, const void *buff, size_t nbytes) {
io_write_t msg;
iov_t iov[2];
msg.i.type = _IO_WRITE;
msg.i.combine_len = sizeof msg.i;
msg.i.xtype = _IO_XTYPE_NONE;
msg.i.nbytes = nbytes;
msg.i.zero = 0;
SETIOV(iov + 0, &msg.i, sizeof msg.i);
SETIOV(iov + 1, buff, nbytes);
return MsgSendv(fd, iov, 2, 0, 0);
}
服务器端应该是怎样进行处理的?想想MsgRead()/MsgWrite(),你应该不难想像服务器端是如何工作的吧。
脉冲(Pulse)
脉冲其实更像一个短消息,也是在“连接”上发送的。脉冲最大的特点是它是异步的。发送方不必要等接收方应答,直接可以继续执行。但是,这种异步性也给脉冲带来了限制。脉冲能携带的数据量有限,只有一个8位的"code"域用来区分不同的脉冲,和一个32位的“value"域来携带数据。脉冲最主要的用途就是用来进行“通知”(Notification)。不仅是用户程序,内核也会生成发送特殊的“系统脉冲”到用户程序,以通知某一特殊情况的发生。
脉冲的接收比较简单,如果你知道频道上不会有别的消息,只有脉冲的话,可以用MsgReceivePulse()来只接收脉冲;如果频道既可以接收消息,也可以接收脉冲时,就直接用MsgReceive(),只要确保接收缓冲(ReveiveBuf)至少可以容下一个脉冲(sizeof struct _pulse)就可以了。在后一种情况下,如果MsgReceive()返回的rcvid是0,就代表接收到了一个脉冲,反之,则收到了一个消息。所以,一个既接收脉冲,又接收消息的服务器,可以是这样的。
union {
struct _pulse pulse;
msg_header header;
} msgs;
...
if ((rcvid = MsgReceive(chid, &msgs, sizeof(msgs), &info)) == -1) {
perror("MsgReceive");
continue;
}
if (rcvid == 0) {
process_pulse(&msgs, &info);
} else {
process_message(&msgs, &info);
}
脉冲的发送,最直接的就是MsgSendPulse()。不过,这个函数通常只在一个进程中,用在一个线程要通知另一个线程的情形。在跨进程的时候,通常不会用到这个函数,而是用到下面将要提到的 MsgDeliverEvent()。
与消息传递相比,消息传递永远是在进程间进行的。也就是说,不会有一个进程向内核发送数据的情形。而脉冲就不一样,除了用户进程间可以发脉冲以外,内核也会向用户进程发送“系统脉冲”来通知某一事件的发生.
消息传递的方向与MsgDeliverEvent()
从一开始就提到,QNX的消息传递是客户、服务器型的。也就是说,总是由客户端向服务器端发送请求,等待被回复的。但在现实情况中,客户端与服务器端并不是很容易区分开来的。有的服务器端为了处理客户端的请求,本身就需要向别的服务器发送消息;有的客户端需要从不同的服务器那里得到服务,而不能阻塞在某一特定的服务器上;还有的时候,两个进程间的数据是互相流动的,这应该怎么办呢?
也许有人认为,两个进程互为通讯就可以了。每个进程都建立自己的频道,然后都与对方的频道建一个连接就好了;这样,需要的时候,就可以直接通过连接向对方发送消息了。就好象管道(pipe)或是socketpair一样。请注意,这种设计在QNX的消息传递中是应该避免的。因为很容易就造成死锁。一个常见的情形是这样的。
进程A:MsgSend() 到进程B
进程B:MsgReceive()接收到消息
进程B:处理消息,然后MsgSend()给进程A
因为进程A正在阻塞状态中,无法接收并处理B的请求;所以A会在STATE_REPLY里,而B则会因MsgSend()而进入STATE_SEND,两个进程就互为死锁住了。当然,如果A和B都使用多线程,专门用一个线程来MsgReceive(),这个情形或许可以避免;但你要保证 MsgReceive()的线程不会去MsgSend(),否则一样会死锁。在程序简单的时候或许你还有控制,如果程序变得复杂,又或者你写的只是一个程序库,别人怎么来用你完全没有控制,那么最好还是不要用这种设计。
在QNX中,正确的方法是这样的。
客户端: 准备一个“通知事件”(Notification Event),并把这个事件用MsgSend()发给服务器端,意思是:“如果xxx情况发生的话,请用这个事件通知我”。
服务器: 收到这个消息后,记录下当时的rcvid,和传过来的事件,然后应答“好的,知道了”。
客户端: 因为有了服务器的应答,客户端不再阻塞,可以去做别的事
......
服务器: 在某个时刻,客户端所要求的“xxx情况”满足了,服务器调用 MsgDeliverEvent(rcvid, event);以通知客户端
客户端: 收到通知,再用MsgSend()发关“xxx 情况的数据在哪里?”
服务器: 用MsgReply()把数据返回给客户端
具体的例子,可以参考MsgDeliverEvent()的文档说明。
路径名(Path Name)
现在来回想一下我们最初的例子,客户端与服务器是怎样取得连接的?客户端需要服务器的 nd, pid, chid,才能与服务器正确地建立连接。在我们的例子里,我们是让服务器显示这几个数,然后在客户端的启动时,通过命令行里传给客户端。但是,在一个现实的系统里,进程不断地启动、终止;服务器与客户端的起动过程也无法控制,这种方法显然是行不通的。
QNX的解决办法,是把“路径名”与上述的“服务频道”概念巧妙地结合起来。让服务器进程可以注册一个路径名,与服务频道的nd, pid, chid关联起来。这样,客户端就不需要知道服务器的nd, pid, chid,而只要请求连接版务器路径名就可以了。具体来说 name_attach()就是用来建立一个频道,并为频道注册一个名字的;而name_open()则是用来连接注册过的服务器频道;具体的例子,可以在name_attach()的文档里找到,这里就不再重复了。