RTS2/AOCS整体框架
在 RTS2 中,将观测的组件抽象成多层的框架,在每层框架中添加功能,并逐层继承,许多功能都是虚函数,需要在子类中继承并各自重新实现。
主要类图结构:
RTS2的关键类包括Object到Device、Connection、DevClient、Command。
- 继承自Object的类遵循一套Event机制
- App维护了一些命令行和日志相关的操作、Block维护了Socket通信操作、Daemon实现守护进程以及socket server
- Connection管理了组件之间的底层通信,主要是实现socket的收发以及通信协议,具体状态逻辑会调用创建的DevClient
- DevClient与Connection互存指针,维护了所连接的设备的状态、信息以及一些逻辑业务
- Command封装了当前向别的组件发的命令以及返回的处理(命令模式?)
这些类都有虚函数甚至纯虚函数,以实现多态。除了Connection类,一般都需要继承并override(覆写,注意与overload、overwrite区别)虚函数。对于RTS2,还要注意要在子类调用父类的对应函数,如postEvent,否则无法实现事件分发以及销毁资源。
此外还有一些工具类比如LogStream
RTS2本质是多进程程序,一个组件一个进程,进程间通信方式是基于TCP的自定义字符流,由于是TCP通信因此可以自然地拓展成分布式的架构。单个RTS2进程一般是一个单线程事件循环(也有的会ConnFork进行子进程间通信),通过select(新版的rts2改用poll)实现文件描述符I/O多路复用。
关于EPICS IOC的集成。目前是通过XML格式的deviceConfig文件定义了pv量和rts2 value的映射关系,centrald监听各设备组件的State,各组件根据Centrald的反馈解析deviceConfig里面相应的设备。这种集成方式目前存在一些问题,后面详述。
RTS2核心类介绍
Connection类
Connection基类以及各种子类都是为不同的组件间连接而定义。所有组件间的通信都是通过socket。每个设备启动时都会监听在一个端口,等待其他设备的接入。针对不同的通信方向,Connection类的子类主要如上图右下角。
Connection类管理本组件与目标组件的连接通信(一般是TCP的socket),存有相应的文件描述符(fd),对负责收发消息,以及解析。Connection存有设备自身的指针Block* master
,也存有维护目标组件信息的指针DevClient* otherDevice
,概念上连接了设备自身和目标设备。Connection存有命令队列std::list<Command*> commandQue
以及正在执行的runningCommand
,以维护命令的收发(命令模式?)。
Connection类的add
方法会被Block的addSelectSocks
调用(新版rts2则改为addPollSocks
),从而将其管理的文件描述符加入到描述符集中。而Connection的receive
方法和writable
会被Block的selectSuccess
调用(新版rts2改为pollSuccess
),在receive中则会read文件描述符,读取目标组件发送过来的信息,并进行解析。RTS2的消息都是以'\r\n'结尾的,Connection会将完整的一条信息取出交给processLine
方法处理,processLine
则进行具体的协议解析(isCommand
),做相应的处理,会调用本组件(Block)master或者目标连接(DevClient)otherDevice的相应方法。
Connection类的setOtherType会被device、client调用,device与client在和目标组件建立connection后会创建相应的DevClient,而setOtherType正是负责调用master的createOtherType(抽象工厂模式?)。
也有因为特殊需要继承自Connection的类,比如ConnImgProcess -> ConnImgOnlyProcess -> ConnExe -> ConnFork -> ConnNoSend -> Connection
的继承链,ConnFork是管理与子进程的通信连接,ConnExe管理与外部脚本的调用,ConnImgProcess则管理具体的外部图片处理进程。即它支持当前组件fork子进程,并通过execv替换成外部进程调用,从而调用处理图片的脚本,并接受处理的反馈。RTS2的scripts以及imgproc会用到相应的机制,一些DevClient会创建诸如ConnImgProcess调用外部脚本处理图片,在根据既定的协议处理返回的结果。
Connection类里面的initEpics解析deviceConfig文件。对于value类型的则ca_create_channel
以及ca_create_substitution
监听pv量,并转换成相应的rts2 value存储,其中State
PV量会和RTS2里面的State概念关联。对于cmd类型的则映射成RTS2的命令,以queEpicsCommand
被发送。对于data类型的则作为一个特殊的pv量,存图。
感觉data pv的实现还有点乱,比如要依赖ArrayCounter_RBV
这个PV量的变化。以及cmd类型要特殊处理expose
,queEpicsCommand这个实现和rts2既有的命令模式不太契合。
Object继承链(到Device、Centrald以及Client)
Device、Centrald以及Client 的功能主要继承链是: Daemon -> Block -> App -> Object
graph LR
Device-->Daemon
Centrald-->Daemon
Client-->Block
Daemon-->Block
Block-->App
App-->Object
Object
在这条继承链中最基本的,该类定义了一个重要的虚函数 virtual void postEvent (Event * event)
,在Object的子类实现postEvent
时,规定必须调用父类的postEvent
,这样完成了消息的层层传递(责任链模式?事件冒泡?),也在Object的postEvent
中将消息对象析构。
App
与具体的设备操作、网络IO无关,作为可执行程序(应用)本身的一些功能支持,比如日志流、信号处理、命令行参数解析、定义初始化入口init
-
addOption
函数可用于添加新的命令行参数定义 -
processOption
虚函数则让每一层子类处理自己关心的命令行参 -
logStream
虚函数可用于记录以及广播自己的日志消息(会发给别的组件?) processArgs
现在添加了关于deviceConfig的路径指定(--device-config),避免了写死文件名,方便调试。
Block
定义了一些协议命令字、管理connections、管理sockets连接、定义通信相关hook、事件循环架构、等。
Block类定义了oneRunLoop
(注意,这个不是虚函数),oneRunLoop是一次事件循环所要执行的流程,具体在后面分析。
-
addSelectSocks/addPollSocks
方法将文件描述符添加到描述符集中,由于该方法是虚函数,因此其子类可以将自己感兴趣的文件描述符添加其中。 - 在
select/poll
超时阻塞调用之后,调用selectSuccess/pollSuccess
进行处理,由于这个函数也是虚函数,因此子类可以对自己添加的文件描述符做处理。 - Block类还定义了
deleteConnection
,connectionRemoved
等函数,可以看做是hook,供子类调用 - Block的
postEvent
会调用每一个Connection的postEvent
,而Connection的postEvent
调用otherDevice->postEvent
,即横向分发事件给不同的DevClient。
主要是在其中依赖 Connection 类。Client 不需要进入子进程的功能,所以直接继承自 Block。block.h文件中有如下代码:
/** Hold list of connections. It is used to store @see Connection objects. */
typedef std::vector < Connection * > connections_t;
Block类的私有成员变量包含:
private:
int port;
long int idle_timeout; // in nsec
// timers - time when they should be executed, event which should be triggered
std::map <double, Event*> timers;
connections_t connections;
// vector which holds connections which were recently added - idle loop will move them to connections
connections_t connections_added;
connections_t centraldConns;
// entries to delete from timers map; delete will happen in idle call
std::vector <std::map <double, Event *>::iterator> toDelete;
// vector which holds connections which were recently added - idle loop will move them to connections
connections_t centraldConns_added;
Block类将所有连接放在vector容器中进行管理,并定义了一系列函数对连接进行操作,如:
void setPort (int in_port);
int getPort (void);
void addConnection (Connection *_conn);
void removeConnection (Connection *_conn);
void addCentraldConnection (Connection *_conn, bool added);
int connectionSize ()
{
return connections.size ();
}
...
Daemon
主要是为设备类 Device 和 Centrald 类提供 fork 子进程的功能(实现守护进程)。同时在Block的基础上还实现了监听端口的功能(TCP Socket Server)以受理别的组件发出的消息。
-
setIdleInfoInterval
设置了idle调用的最大间隔,即当任何I/O事件都没有时仍触发一次循环,并向别的组件发送自己更新的信息
下面是一个简化版(剔除了错误处理的代码)的int Daemon::init ()
int Daemon::init ()
{
int ret;
ret = rts2core::Block::init ();
listen_sock = socket (PF_INET6, SOCK_STREAM, 0);
const int so_reuseaddr = 1;
setsockopt (listen_sock, SOL_SOCKET, SO_REUSEADDR, &so_reuseaddr,
sizeof (so_reuseaddr));
struct sockaddr_in6 server;
server.sin6_family = AF_INET6;
server.sin6_port = htons (getPort ());
server.sin6_addr = in6addr_any;
ret = bind (listen_sock, (struct sockaddr *) &server, sizeof (server));
socklen_t sock_size = sizeof (server);
ret = getsockname (listen_sock, (struct sockaddr *) &server, &sock_size);
setPort (ntohs (server.sin6_port));
ret = listen (listen_sock, 5);
return 0;
}
熟悉socket编程可以看出,daemon建立了一个监听的TCP端口,监听的端口通过getPort ()
函数获得。通常情况下,centrald监听的端口是一个固定值,而其他组件随机监听在某个空闲端口,下面代码也验证了这个事实。
// src/centrald/centrald.cpp
int Centrald::init ()
{
int ret;
setPort (atoi (RTS2_CENTRALD_PORT));
ret = Daemon::init ();
...
}
RTS2_CENTRALD_PORT
是一个宏,通常定义为8618
// lib/rts2/getopt_own.c
char *optarg = 0;
// lib/rts2/daemon.cpp:int Daemon::processOption (int in_opt)
setPort (atoi (optarg));
因为代码中设置port值为0,即每个Daemon监听的端口是系统随机分配的,这样才能保证组件的动态增减(相比与固定端口),又由于Daemon不存在局域网广播机制(毕竟RTS2原本是在WAN上的),这才有了Centrald的必要性。
Device
-
commandAuthorized
虚函数提供了命令接口,受理别的组件发的命令。
Client
Centrald
提供了namesolver以及bad weather机制。促成每个设备点对点的网状结构(去中心化)。
Centrald只负责管理网络内的组件的存活状态,告知彼此的存在,不负责具体的控制以及转发。
其实Centrald存在一个问题,它以及能从socket得到组件的IP、Port了,但是依然要对方提供host信息,这样反而无法让在NAT后面的组件与别的组件互联
Command
一般被queCommand
调用时创建,封装向别的组件发的命令。对于EPICS IOC则有queEpicsCommand
。
DevClient
简直是RTS2最坑爹的东西。。。茫茫多继承和状态实现。小心处理状态同步、事件触发。还有BopState,得看看原生的camd、teld怎么利用这个的。。。
RTS2事件循环机理
从Daemon初始化流程看
Daemon类的入口是函数Daemon::run()
,Daemon类的主要调用关系如下图:
Daemon的init()
已经在上文叙述,主要功能是建立监听端口。
Daemon的InitValues()
通过rts2core::Iniparser
解析配置文件并从中读入Values。
Daemon的主循环实现如下:
while (!getEndLoop ())
oneRunLoop ();
Daemon类没有实现自己的oneRunLoop()
,所以这里调用的是父类Block的oneRunLoop()
。
oneRunLoop()
的大致运行流程如下图:
- 设置select的阻塞时间的主要代码如下:
if (timers.begin () != timers.end () && \
(USEC_SEC * (t_diff = (timers.begin ()->first - getNow ()))) < idle_timeout)
{
if (t_diff <= 0)
{
read_tout.tv_sec = 0;
read_tout.tv_usec = 0;
}
else
{
read_tout.tv_sec = t_diff;
read_tout.tv_usec = (t_diff - floor (t_diff)) * USEC_SEC;
}
}
...
为了不因为select
阻塞调用而错过定时器时间,这里将定时器timer
的第一个元素与现在的时间比较(即代码中的t_diff
),从而可以设置用于select
调用的安全阻塞时间。
-
addSelectSocks
也是一个虚函数,分别调用了Daemon::addSelectSocks
和Daemon::addSelectSocks
。由下面代码可以清楚看出其功能:将监听端口的文件描述符以及所有连接的文件描述符都加入监控集合。
//Daemon
void Daemon::addSelectSocks (fd_set &read_set, fd_set &write_set, fd_set &exp_set)
{
FD_SET (listen_sock, &read_set);
rts2core::Block::addSelectSocks (read_set, write_set, exp_set);
}
//Block
void Block::addSelectSocks (fd_set &read_set, fd_set &write_set, fd_set &exp_set)
{
connections_t::iterator iter;
for (iter = connections.begin (); iter != connections.end (); iter++)
(*iter)->add (&read_set, &write_set, &exp_set);
for (iter = centraldConns.begin (); iter != centraldConns.end (); iter++)
(*iter)->add (&read_set, &write_set, &exp_set);
}
- 如果监控的文件描述符可读或可写,则会进入
selectSuccess
,同样的,selectSuccess
也是一个虚函数,会先进入Daemon::selectSuccess
。
void Daemon::selectSuccess (fd_set &read_set, fd_set &write_set, fd_set &exp_set)
{
int client;
// accept connection on master
if (FD_ISSET (listen_sock, &read_set))
{
struct sockaddr_in6 other_side;
socklen_t addr_size = sizeof (struct sockaddr_in6);
client = accept (listen_sock, (struct sockaddr *) &other_side, &addr_size);
addConnectionSock (client);
}
rts2core::Block::selectSuccess (read_set, write_set, exp_set);
}
如果Daemon监听的listen_sock
被SETS了,则调用accept函数,接受其他client的连接,并且通过addConnectionSock
保存该连接。
接下来进入Block::selectSuccess
,遍历了connections
以及centraldConns
,清除其中断开的连接,代码较长,这里就不列举了。
- 可以通过复写idle()来指定在空闲时候要做的事情。
Daemon::idle()
处理了SIGHUP
信号,实现如下:
int Daemon::idle ()
{
if (doHupIdleLoop)
{
signaledHUP ();
doHupIdleLoop = false;
}
return rts2core::Block::idle ();
}
Block::idle()
则处理了较多的事务。
机制的不足以及改进
-
(其实可能反而是优势)对多线程的支持比较匮乏,比如不支持多个事件循环,对于EPICS的Channel Callback无法及时响应。
在BSST中,因为EPICS的回调函数都是在子线程中调用,因此为了避免锁以及数据冲突,只能先设置一个flag(其实这仍然不严谨),将真正EPICS的事务处理放到idle函数里处理,而且需要调快idle频率或者依赖别的事件。
改进:引入进程内通信机制(self notify),对于Linux可以用
eventfd
,考虑通用性可以用pipe
,每个connectino都存有一个这样的fd。在Block中将该fd加入到read_set中,并在selectSuccess中检查。当子线程有事件时可以激活该fd,从而主线程能及时处理。这种方式其实可以实现子线程向主线程传递信息。如今ioc pv量的变化可以更及时反应到rts2内部,且不依赖别的事件。
-
timer也没有与文件描述符关联
这个可能也不算不足,不过感觉他的实现方式精度确实可能不行。考虑timerfd?
RTS2组件通信流程
关于centrald与组件的连接
centrald与组件连接大致是这么个流程:
在 RTS2 中,最先启动的是 Centrald 核心组件,然后再启动设备组件或服务
组件。每个设备或服务都会主动连接 Centrald。Centrald 会将连接保存在自己的
connection vector 中,设备或其他的组件也会将 centrald 的连接保存在 Centrald
connection vector 中。这样启动过程就完成了。
上面简单介绍了设备及其他组件的启动及连接过程。设备和服务组件启动过
程是一致的,因为他们都是继承自 daemon 类。
首先,设备启动时,会主动连接 Centrald,设备类创建 DevConnectionMaster
连接类,然后会发送注册命令到 centrald:queSend(CommandRegister)。会将设
备名等信息注册到 Centrald 中。Centrald 中注册完后,会将该连接信息发送给所
有其他的连接,同时会将所有连接的信息发送给该连接。设备收到其他设备的信
息之后会相互连接。
————《EPICS 在望远镜控制系统中的应用研究》,吴文庆,2014
将这一过程用更直观的图来表示:
- 设备组件或服务组件连接centrald。设备组件的连接由
DevConnectionMaster
类负责,服务组件的连接由ConnCentraldClient
类负责,前文类图结构也已提及,ConnCentraldClient
和DevConnectionMaster
类都是继承自Connection
类且专门负责组件到centrald的连接。简化后的DevConnectionMaster::init
和ConnCentraldClient::init
如下,都在虚函数init
中实现了到centrald的连接。
int DevConnectionMaster::init ()
{
struct addrinfo hints = {0};
struct addrinfo *master_addr;
int ret;
hints.ai_flags = 0;
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = 0;
ret = getaddrinfo (master_host, RTS2_CENTRALD_PORT, &hints, &master_addr);
sock = socket (master_addr->ai_family, master_addr->ai_socktype, master_addr->ai_protocol);
ret = fcntl (sock, F_SETFL, O_NONBLOCK);
ret = connect (sock, master_addr->ai_addr, master_addr->ai_addrlen);
freeaddrinfo (master_addr);
connConnected ();
return 0;
}
int ConnCentraldClient::init ()
{
int ret;
struct addrinfo hints = {0};
struct addrinfo *master_addr;
hints.ai_flags = 0;
hints.ai_family = AF_UNSPEC;
//hints.ai_family = AF_INET6;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = 0;
ret = getaddrinfo (master_host, master_port, &hints, &master_addr);
sock = socket (master_addr->ai_family, master_addr->ai_socktype, master_addr->ai_protocol);
ret = connect (sock, master_addr->ai_addr, master_addr->ai_addrlen);
freeaddrinfo (master_addr);
setConnState (CONN_CONNECTED);
queCommand (new CommandLogin (master, login, getName (), password));
return 0;
}
在Client
类和Device
类的init
函数中,则分别调用了上面两个连接类的init
int Device::init ()
{
...
std::list <HostString>::iterator iter = centraldHosts.begin ();
for (int i = 0; iter != centraldHosts.end (); iter++, i++)
{
Connection * conn_master = new DevConnectionMaster (this, device_host, getPort (), device_name, device_type, (*iter).getHostname (), (*iter).getPort (), i);
conn_master->init ();
addCentraldConnection (conn_master, true);
}
return initHardware ();
}
int Client::init ()
{
...
ConnCentraldClient *central_conn = createCentralConn ();
while (!getEndLoop ())
{
ret = central_conn->init ();
if (!ret || getEndLoop ())
break;
std::cerr << "Trying to contact centrald\n";
sleep (10);
}
addCentraldConnection (central_conn, true);
return 0;
}
流程分析案例
TODO
开发指南
基于Device类开发
为了实现自己的业务逻辑,可以通过继承Device
类来复写相应的函数。常见的需要override的函数包括commandAuthorized()
、init()
、initHardware()
、postEvent(Event *event)
、idle()
、createOtherType
等等。
-
init()
、initHardware()
实现相关初始化工作 - ``postEvent(Event *event)```,可以定义事件,并且实现对相关事件的处理
-
idle()
,可以事件循环的空闲期做一些检查等操作,比如Camera类在idle()时检查CCD状态,执行自动拍照。 -
commandAuthorized()
,定义命令接口,用于别的RTS2组件调用。 -
createOtherType()
,创建与不同类型设备的DevClient。
此外可能需要继承DevClient进行override,实现相应的postEvent
以及stateChanged
等,从而可以同步作业。
下面的示例代码示范了一些功能,包括:
- 如何定义事件
- 如何override
init()
、postEvent(Event *event)
、idle()
- 如何触发一次性事件以及周期性事件
- 如何正确处理事件
需要提前说明的是,addTimer (double timer_time, Event *event)
用于给定时器Block::timers
添加待处理的事件以及处理时间,其实现也很简单明了。
/**
* Add new user timer.
*
* @param timer_time Timer time in seconds, counted from now.
* @param event Event which will be posted for triger. Event argument
*
* @see Event
*/
void addTimer (double timer_time, Event *event)
{
timers[getNow () + timer_time] = event;
}
一次性事件只需要调用addTimer
一次,并在postEvent(Event *event)
中处理该事件即可;周期性事件则需要在postEvent(Event *event)
中再调用addTimer
,从而可以实现周期性的事件触发。
最后附上示例代码,关键语句已经注释:
#include <iostream>
#include "status.h"
#include "device.h"
#include "event.h"
#define EVENT_DEMO_ONCE (RTS2_LOCAL_EVENT + 10001)
#define EVENT_DEMO_PERIOD (EVENT_DEMO_ONCE + 1)
using namespace rts2core;
class TemplateDev : public Device {
public:
TemplateDev(int argc, char **argv);
virtual ~TemplateDev();
protected:
virtual int init() override;
virtual void postEvent(Event *) override;
int idle() override;
private:
ValueInteger *counter;
ValueDouble *setStep;
};
TemplateDev::TemplateDev(int argc, char **argv) : Device(argc, argv, DEVICE_TYPE_UNKNOW, "TEMPLATE_DEV") {
createValue(counter, "counter", "demo counter", false, RTS2_VALUE_INTEGER, 0);
createValue(setStep, "setStep", "count step value to be set", false, RTS2_VALUE_WRITABLE|RTS2_VALUE_INTEGER, 0);
setStep->setValueInteger(1);
setIdleInfoInterval(5); // update info every 5s
}
TemplateDev::~TemplateDev() {
std::cout << time(NULL) << ":" << "TemplateDev destructed" << std::endl;
}
int TemplateDev::init() {
int ret = Device::init();
std::cout << time(NULL) << ":" << "TemplateDev initializing" << std::endl;
addTimer(1, new Event(EVENT_DEMO_ONCE)); // trigger EVENT_DEMO_ONCE 1s later
addTimer(2, new Event(EVENT_DEMO_PERIOD)); // trigger EVENT_DEMO_PERIOD 2s later
return ret;
}
void TemplateDev::postEvent(Event *event) {
switch ( event->getType() ) {
case EVENT_DEMO_ONCE:
std::cout << time(NULL) << ":" << "Handle TemplateDev EVENT_DEMO_ONCE" << std::endl;
break;
case EVENT_DEMO_PERIOD:
std::cout << time(NULL) << ":" << "Handle TemplateDev EVENT_DEMO_PERIOD" << std::endl;
counter->setValueInteger(counter->getValueInteger() + setStep->getValueInteger());
std::cout << time(NULL) << ":" << "counter value:" << counter->getValueInteger() << std::endl;
addTimer(1, new Event(event)); // Triggered 1s later again
break;
}
//must call parent's postEvent method, which can delete event or memory leaking would happen
return Device::postEvent(event);
}
int TemplateDev::idle() {
std::cout << time(NULL) << ":" << "TemplateDev idle" << std::endl;
return rts2core::Device::idle();
}
int main(int argc, char **argv) {
TemplateDev dev(argc, argv);
return dev.run();
}
运行结果:
BSST模块实现的反思
- 本来的RTS2的Device类及子类如Executor是不存具体的DevClient作为成员变量的,但是我们的执行器类存了,并且会直接调用相应queCommand,类的关系变得微妙,本来的事件驱动、状态机被弱化,更加“中央集权”
- 原来的调焦继承自Focusd,但是Focusd是硬件Focuser的调焦而不是自动调焦。自动调焦似乎更适合继承自Focusc。调焦组件感觉也浪费资源,感觉作为client被临时创建更妥,或者直接执行器负责
- 执行器的模板定义复杂却又不灵活,这也是导致Focus这种流程不确定的得单独剥离开的。感觉当前的模板设计属于反面模式的SoftCoding
- 退一步、作为一个autonomous的模块,是否需要进行调焦不应该是软件自己判断的么。