QNX相关历史文章:
The Bones of a Resource Manager
这篇文章将从服务器端和客户端两侧来描述大体的框架和分层,并会给出实例。
1. Under the covers
1.1 Under the client's covers
当一个客户端调用需要路径名解析的函数时(比如open()/rename()/stat()/unlink()
),它会同时向进程管理器和对应的资源管理器发送消息,进而去获取文件描述符,并通过这个文件描述符向与路径名关联的设备发送消息。
/*
* In this stage, the client talks
* to the process manager and the resource manager.
*/
fd = open("/dev/ser1", O_RDWR);
/*
* In this stage, the client talks directly to the
* resource manager.
*/
for (packet = 0; packet < npackets; packet++)
{
write(fd, packets[packet], PACKET_SIZE);
}
close(fd);
上边的这个代码,背后的逻辑是怎么实现的呢?我们假设这是名为devc-ser8250
资源管理器管理的串口设备,注册的路径名为dev/ser1
:
客户端发送
query
消息,open()
函数会向进程管理器发送消息,请求查找名字,比如/dev/ser1
;进程管理器会返回与路径名关联的
nd/pid/child/handle
。当devc-ser8250
资源管理器使用/dev/ser1
注册时,进程管理器会维护一个类似于下边的条目信息:
0, 47167, 1, 0, 0, /dev/ser1
其中条目的各项分别代表:
-
0
, Node descriptor(nd
),用于描述在网络中的节点; -
47167
, Process ID(pid
),资源管理器的进程ID号; -
1
, Channel ID(chid
),资源管理器用于接收消息的通道; -
0
, Handle的索引号,由资源管理器注册的Handle; -
0
, 在名称注册期间传递的打开类型; -
/dev/ser1
,注册的路径名;
在进程管理器中会维护一个条目表,用于记录各个资源管理器的信息。在名字匹配的时候会选择最长匹配。
- 客户端需要向资源管理器发送一个
connect
消息,首先它需要创建一个连接的通道:
fd = ConnectAttach(nd, pid, chid, 0, 0);
客户端也是使用这个fd
向资源管理器发送连接消息,请求打开/dev/ser1
。当资源管理器收到连接消息后,会进行权限检查。
资源管理器通常会回应通过或失败。
当客户端获取到文件描述符后,可以直接通过它向设备发送消息。示例代码看起来像是客户端直接向设备写入,实际上在调用
write()
时,会先发送一个_IO_WRITE
消息给资源管理器,请求数据写入,客户端调用close()
时,会发送_IO_CLOSE_DUP
消息给资源管理器,完成最后资源的回收清理工作。
1.2 Under the resource manager's covers
资源管理器就是一个服务器,通过send/receive/reply
消息协议来接收和回复消息,伪代码如下:
initialize the resource manager
register the name with the process manager
DO forever
receive a message
SWITCH on the type of message
CASE _IO_CONNECT:
call io_open handler
ENDCASE
CASE _IO_READ:
call io_read handler
ENDCASE
CASE _IO_WRITE:
call io_write handler
ENDCASE
. /* etc. handle all other messages */
. /* that may occur, performing */
. /* processing as appropriate */
ENDSWITCH
ENDDO
事实上,上述代码中的很多细节在实现一个资源管理器中可能用不到,因为有一些库进行了封装,但是自己还是需要去实现对消息的回复。
2. Layers in a resource manager
资源管理器由四层组成,从下到上分别为:
- iofunc layer
- resmgr layer
- dispatch layer
- thread pool layer
2.1 iofunc layer
这一层的主要功能是提供POSIX特性,用户编写资源管理器时不需要太关心向外界提供POSIX文件系统所涉及的细节。
这一层由一组函数iofunc_*
组成,包含了默认处理程序,如果没有提供自己的处理程序的话,就会使用默认提供的程序。比如,如果没有提供io_open
处理程序的话,就会调用iofunc_open_default()
。同时也包含了默认处理程序调用的帮助函数,当自己实现处理程序时,也可以调用这些帮助函数。
2.2 resmgr layer
这一层负责管理资源管理器库的大部分细节:
- 检查传入消息
-
调用合适的处理程序来处理消息
如果不使用这一层的话,则需要用户自己解析消息,大部分资源管理器都会用到这一层。
2.3 dispatch layer
这一层充当多种事件类型的阻塞点,使用这一层,可以处理:
- `IO* 消息, 使用resmgr层;
-
select()
,注册一个处理函数,当数据包到达时调用,这里的函数是select_*()
函数; -
Pulses
,注册一个处理函数,当pulses
来的时候调用,这里的函数是pulse_*()
函数; - 其他消息,可以提供自己创建的一系列消息类型和处理函数,当消息到达时调用对应的处理函数,这里的函数是
message_*()
函数;
消息进行分发时,会进行查找和匹配,然后会调用到匹配的Handler函数,如果没有找到匹配的,则返回MsgError
。
2.4 thread pool layer
这一层允许用户创建单线程或多线程资源管理器,这意味着可以一个线程读,一个线程写。需要为线程提供阻塞函数,以及阻塞函数返回时需要调用的处理函数。大多数情况下,可以将dispatch layer
的任务分配到线程上来,当然,也可以是resmgr layer
任务或自己实现的功能。
3. Simple examples of device resource managers
3.1 Single-threaded device resource managers
先来看一份完整的单线程设备资源管理器代码吧:
#include <errno.h>
#include <stdio.h>
#include <stddef.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/iofunc.h>
#include <sys/dispatch.h>
static resmgr_connect_funcs_t connect_funcs;
static resmgr_io_funcs_t io_funcs;
static iofunc_attr_t attr;
main(int argc, char **argv)
{
/* declare variables we'll be using */
resmgr_attr_t resmgr_attr;
dispatch_t *dpp;
dispatch_context_t *ctp;
int id;
/* initialize dispatch interface */
if((dpp = dispatch_create()) == NULL) {
fprintf(stderr,
"%s: Unable to allocate dispatch handle.\n",
argv[0]);
return EXIT_FAILURE;
}
/* initialize resource manager attributes */
memset(&resmgr_attr, 0, sizeof resmgr_attr);
resmgr_attr.nparts_max = 1;
resmgr_attr.msg_max_size = 2048;
/* initialize functions for handling messages */
iofunc_func_init(_RESMGR_CONNECT_NFUNCS, &connect_funcs,
_RESMGR_IO_NFUNCS, &io_funcs);
/* initialize attribute structure used by the device */
iofunc_attr_init(&attr, S_IFNAM | 0666, 0, 0);
/* attach our device name */
id = resmgr_attach(
dpp, /* dispatch handle */
&resmgr_attr, /* resource manager attrs */
"/dev/sample", /* device name */
_FTYPE_ANY, /* open type */
0, /* flags */
&connect_funcs, /* connect routines */
&io_funcs, /* I/O routines */
&attr); /* handle */
if(id == -1) {
fprintf(stderr, "%s: Unable to attach name.\n", argv[0]);
return EXIT_FAILURE;
}
/* allocate a context structure */
ctp = dispatch_context_alloc(dpp);
/* start the resource manager message loop */
while(1) {
if((ctp = dispatch_block(ctp)) == NULL) {
fprintf(stderr, "block error\n");
return EXIT_FAILURE;
}
dispatch_handler(ctp);
}
}
上述代码完成以下功能:
初始化dispatch接口
通过调用dispatch_create()
接口创建dispatch_t
结构,这个结构中包含了channel ID
,但channel ID
实际的创建是在附加其他内容的时候,比如调用resmgr_attach()/message_attach()/pulse_attach()
等。初始化资源管理器属性
当调用resmgr_attach()
时会传入resmgr_attr_t
结构,在这个示例中主要配置:
-
nparts_max
,可供服务器回复的IOV的个数; -
msg_max_size
,最大接收缓冲大小;
- 初始化消息处理函数
在这个例子中提供了两张表,用于指定在特定消息到达时调用哪个函数:
- 连接函数表;
- I/O函数表;
可以调用iofunc_func_init()
接口来配置默认操作函数。
- 初始化设备使用的属性结构
调用iofunc_attr_init()
接口来设置,属性结构中至少应该包括以下信息:
- 权限和设备类型
- 组ID和属主ID
在名字空间中注册一个名字
调用resmgr_attach()
来注册资源管理器的路径。在资源管理器能接收到其他程序的消息之前,需要通过进程管理器通知其他程序它与路径名的绑定关系。分配context结构体
调用dispatch_context_alloc()
接口来分配,context
结构体包含了用于接收消息的缓冲,也包含了可用于回复消息的IOVs
缓冲。开始资源管理器消息循环
当进入循环后,资源管理器便在dispatch_block()
中接收消息,并调用dispatch_handler()
进行分发处理,选择连接函数表和I/O函数表中的合适函数进行执行。当执行完毕后,会再进入dispatch_block()
来等待接收其他消息。
3.2 Multi-threaded device resource managers
下边是多线程设备资源管理器示例代码:
#include <errno.h>
#include <stdio.h>
#include <stddef.h>
#include <stdlib.h>
#include <unistd.h>
/*
* Define THREAD_POOL_PARAM_T such that we can avoid a compiler
* warning when we use the dispatch_*() functions below
*/
#define THREAD_POOL_PARAM_T dispatch_context_t
#include <sys/iofunc.h>
#include <sys/dispatch.h>
static resmgr_connect_funcs_t connect_funcs;
static resmgr_io_funcs_t io_funcs;
static iofunc_attr_t attr;
main(int argc, char **argv)
{
/* declare variables we'll be using */
thread_pool_attr_t pool_attr;
resmgr_attr_t resmgr_attr;
dispatch_t *dpp;
thread_pool_t *tpp;
dispatch_context_t *ctp;
int id;
/* initialize dispatch interface */
if((dpp = dispatch_create()) == NULL) {
fprintf(stderr, "%s: Unable to allocate dispatch handle.\n",
argv[0]);
return EXIT_FAILURE;
}
/* initialize resource manager attributes */
memset(&resmgr_attr, 0, sizeof resmgr_attr);
resmgr_attr.nparts_max = 1;
resmgr_attr.msg_max_size = 2048;
/* initialize functions for handling messages */
iofunc_func_init(_RESMGR_CONNECT_NFUNCS, &connect_funcs,
_RESMGR_IO_NFUNCS, &io_funcs);
/* initialize attribute structure used by the device */
iofunc_attr_init(&attr, S_IFNAM | 0666, 0, 0);
/* attach our device name */
id = resmgr_attach(dpp, /* dispatch handle */
&resmgr_attr, /* resource manager attrs */
"/dev/sample", /* device name */
_FTYPE_ANY, /* open type */
0, /* flags */
&connect_funcs, /* connect routines */
&io_funcs, /* I/O routines */
&attr); /* handle */
if(id == -1) {
fprintf(stderr, "%s: Unable to attach name.\n", argv[0]);
return EXIT_FAILURE;
}
/* initialize thread pool attributes */
memset(&pool_attr, 0, sizeof pool_attr);
pool_attr.handle = dpp;
pool_attr.context_alloc = dispatch_context_alloc;
pool_attr.block_func = dispatch_block;
pool_attr.unblock_func = dispatch_unblock;
pool_attr.handler_func = dispatch_handler;
pool_attr.context_free = dispatch_context_free;
pool_attr.lo_water = 2;
pool_attr.hi_water = 4;
pool_attr.increment = 1;
pool_attr.maximum = 50;
/* allocate a thread pool handle */
if((tpp = thread_pool_create(&pool_attr,
POOL_FLAG_EXIT_SELF)) == NULL) {
fprintf(stderr, "%s: Unable to initialize thread pool.\n",
argv[0]);
return EXIT_FAILURE;
}
/* start the threads; will not return */
thread_pool_start(tpp);
}
从代码中可以看出,大部分代码与单线程示例一样。在这个代码中,线程在阻塞循环中用到了dispatch_*()
函数(dispatch layer
)。
初始化线程池的属性
给pool_attr
的各个字段赋值,用于告知线程在阻塞循环时调用哪些函数,以及线程池需要维护多少线程。分配一个线程池的
handle
调用thread_pool_create()
接口来分配,这个handle
用于控制整个线程池。开启线程
调用thread_pool_start()
接口,启动线程池。每个新创建的线程都会使用在属性结构中给出的context_alloc()
函数来分配THREAD_POOL_PARAM_T
定义类型的context
结构。
3.3 Using MsgSend() and MsgReply()
可以不使用read()
和write()
接口来与资源管理器交互,可以调用MsgSend()
来发送消息,下边的示例将分两部分:服务器和客户端。服务器端需要使能PROCMGR_AID_PATHSPACE
,用于确保能调用resmgr_attach()
函数。
- 服务器代码:
/*
* ResMgr and Message Server Process
*/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/neutrino.h>
#include <sys/iofunc.h>
#include <sys/dispatch.h>
resmgr_connect_funcs_t ConnectFuncs;
resmgr_io_funcs_t IoFuncs;
iofunc_attr_t IoFuncAttr;
typedef struct
{
uint16_t msg_no;
char msg_data[255];
} server_msg_t;
int message_callback( message_context_t *ctp, int type, unsigned flags,
void *handle )
{
server_msg_t *msg;
int num;
char msg_reply[255];
/* Cast a pointer to the message data */
msg = (server_msg_t *)ctp->msg;
/* Print some useful information about the message */
printf( "\n\nServer Got Message:\n" );
printf( " type: %d\n" , type );
printf( " data: %s\n\n", msg->msg_data );
/* Build the reply message */
num = type - _IO_MAX;
snprintf( msg_reply, 254, "Server Got Message Code: _IO_MAX + %d", num );
/* Send a reply to the waiting (blocked) client */
MsgReply( ctp->rcvid, EOK, msg_reply, strlen( msg_reply ) + 1 );
return 0;
}
int main( int argc, char **argv )
{
resmgr_attr_t resmgr_attr;
message_attr_t message_attr;
dispatch_t *dpp;
dispatch_context_t *ctp, *ctp_ret;
int resmgr_id, message_id;
/* Create the dispatch interface */
dpp = dispatch_create();
if( dpp == NULL )
{
fprintf( stderr, "dispatch_create() failed: %s\n",
strerror( errno ) );
return EXIT_FAILURE;
}
memset( &resmgr_attr, 0, sizeof( resmgr_attr ) );
resmgr_attr.nparts_max = 1;
resmgr_attr.msg_max_size = 2048;
/* Setup the default I/O functions to handle open/read/write/... */
iofunc_func_init( _RESMGR_CONNECT_NFUNCS, &ConnectFuncs,
_RESMGR_IO_NFUNCS, &IoFuncs );
/* Setup the attribute for the entry in the filesystem */
iofunc_attr_init( &IoFuncAttr, S_IFNAM | 0666, 0, 0 );
resmgr_id = resmgr_attach( dpp, &resmgr_attr, "serv", _FTYPE_ANY,
0, &ConnectFuncs, &IoFuncs, &IoFuncAttr );
if( resmgr_id == -1 )
{
fprintf( stderr, "resmgr_attach() failed: %s\n", strerror( errno ) );
return EXIT_FAILURE;
}
/* Setup our message callback */
memset( &message_attr, 0, sizeof( message_attr ) );
message_attr.nparts_max = 1;
message_attr.msg_max_size = 4096;
/* Attach a callback (handler) for two message types */
message_id = message_attach( dpp, &message_attr, _IO_MAX + 1,
_IO_MAX + 2, message_callback, NULL );
if( message_id == -1 )
{
fprintf( stderr, "message_attach() failed: %s\n", strerror( errno ) );
return EXIT_FAILURE;
}
/* Setup a context for the dispatch layer to use */
ctp = dispatch_context_alloc( dpp );
if( ctp == NULL )
{
fprintf( stderr, "dispatch_context_alloc() failed: %s\n",
strerror( errno ) );
return EXIT_FAILURE;
}
/* The "Data Pump" - get and process messages */
while( 1 )
{
ctp_ret = dispatch_block( ctp );
if( ctp_ret )
{
dispatch_handler( ctp );
}
else
{
fprintf( stderr, "dispatch_block() failed: %s\n",
strerror( errno ) );
return EXIT_FAILURE;
}
}
return EXIT_SUCCESS;
}
- 首先是调用
dispatch_create()
接口来创建dpp
,通过这个handle
将接收到的消息转发到合适的层(resmgr, message, pulse
); - 设置调用
resmgr_attach()
所需要的变量; - 调用
iofunc_func_init()
来设置默认处理函数,调用iofunc_attr_init()
来设置属性结构; - 调用
resmgr_attach()
,注意这时候注册的路径不是绝对路径,因此默认在它的执行路径下。. - 告诉
dispatch layer
,除了由resmgr layer
处理的标准I/O和连接消息之外,还需要处理自己的消息。 - 调用
message_attach()
接口来注册自己的消息处理函数。 - 当调用
dispatch_block()
接收消息后,调用dispatch_handler()
来处理,实际上会在dispatch_handler()
中调用message_callback()
函数。当消息类型为_IO_MAX + 1
或_IO_MAX + 2
时,就会回调函数。
- 客户端代码:
/*
* Message Client Process
*/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include <sys/neutrino.h>
#include <sys/iofunc.h>
#include <sys/dispatch.h>
typedef struct
{
uint16_t msg_no;
char msg_data[255];
} client_msg_t;
int main( int argc, char **argv )
{
int fd;
int c;
client_msg_t msg;
int ret;
int num;
char msg_reply[255];
num = 1;
/* Process any command line arguments */
while( ( c = getopt( argc, argv, "n:" ) ) != -1 )
{
if( c == 'n' )
{
num = strtol( optarg, 0, 0 );
}
}
/* Open a connection to the server (fd == coid) */
fd = open( "serv", O_RDWR );
if( fd == -1 )
{
fprintf( stderr, "Unable to open server connection: %s\n",
strerror( errno ) );
return EXIT_FAILURE;
}
/* Clear the memory for the msg and the reply */
memset( &msg, 0, sizeof( msg ) );
memset( &msg_reply, 0, sizeof( msg_reply ) );
/* Set up the message data to send to the server */
msg.msg_no = _IO_MAX + num;
snprintf( msg.msg_data, 254, "client %d requesting reply.", getpid() );
printf( "client: msg_no: _IO_MAX + %d\n", num );
fflush( stdout );
/* Send the data to the server and get a reply */
ret = MsgSend( fd, &msg, sizeof( msg ), msg_reply, 255 );
if( ret == -1 )
{
fprintf( stderr, "Unable to MsgSend() to server: %s\n", strerror( errno ) );
return EXIT_FAILURE;
}
/* Print out the reply data */
printf( "client: server replied: %s\n", msg_reply );
close( fd );
return EXIT_SUCCESS;
}
客户端通过open()
接口获取connection id
,调用MsgSend()
接口去往服务器上发送消息,并且等待回复。
上述服务器和客户端的例子非常简单,但覆盖了大部分的内容,基于这个基础的框架,还可以做其他的事情:
- 基于不同的消息类型,注册不同的消息回调函数;
- 除了消息之外,使用
pulse_attach()
来接收pulse
; - 重写默认的I/O消息处理函数以便客户端也能使用
read()
和write()
来与服务器交互; - 使用线程池来编写多线程服务器;