1.4 DDS介绍
Core:提供Qos定义,支持中间件基于通知的交互样式
Domain:包含DomainParticipant
Publisher: 包含Publisher、DataWriter、PublisherListener、DataWriterListener
Subscriber:包含Subscriber、DataReader、SubscriberListener、DataReaderListener
Topic:定义topic和dataTypes,包含Topic、TopicDescription、TypeSupport、TopicListener
1.4.1 Core
该模块定义了其他类将使用的基础结构类和类型,包括Entity类,QoS,Statues
1.4.1.1 Entity
每个entity都有一个唯一的ID
每个entity的行为都可以用一组qos来配置,
Listener充当异步通知系统,允许entity将entity的状态更改通知到应用程序。所有对象都定义了一个抽象监听接口,这个接口包含回调函数,函数的作用是将entity状态的变更传给应用程序。当有时间发生时,由最低级别的实体处理,高级的监听器集成低级监听器。
每个entity都有一系列状态值,状态的值表示通信的状态,这些状态的改变将触发适当的监听器的回调调用,异步通知应用程序
每个entity都有1个静态状态(StatusCondition),在启用状态发生变化时被通知
1.4.1.2 Policy
22个标准QOS,FAST DDS扩展了14个QOS,XTypes 扩展了2个。
每个QOS都包含唯一的ID
Policy介绍
DeadLineQosPolicy[period]:如果DataWriter和DataReader都使用这个QOS,那么DataWriter时间<=DataReader的
DestinationOrderQosPolicy[kind]:如果DataWriter和DataReader都使用这个QOS,那么DataWriter的kind>=DataReader的
DurabilityQosPolicy[kind]:即使没有DataReader,DataWriter也可以通过topic发送数据,这个QOS就规定了在没有DataReader这段时间内,系统如何处理这些数据
VOLATILE_DURABILITY_QOS:过期的数据将被忽略,新连接的DataReader接收匹配之后的数据
VOLATILE_DURABILITY_QOS:DataReader加入时,过期的数据写入history
TRANSIENT_DURABILITY_QOS: DataReader加入时,过期的数据写入history,并且这些数据持久存储中存储
PERSISTENT_DURABILITY_QOS:(还没实现),所有的数据都存储在持久存储中
如果DataWriter和DataReader都使用这个QOS,那么DataWriter的kind>=DataReader的
DurabilityServiceQoSPolicy:还未实现,当DurabilityQoSPolicy设置为TRANSIENT_DURABILITY_QOS或者PERSISTENT_DURABILITY_QOS,该QOS是为了配置虚构的DataReader和DataWriter 的HistoryQosPolicy和 ResourceLimitsQosPolicy 这些对象用于模拟为永久存储,虚构的DataReader读取写在topic上的数据,并将其存储,因此,如果DataWriter没有DataReader请求的数据时,虚构的DataReader将负责发送数据。
EntityFactoryQosPolicy:可以改变实体对象创建后是否是激活状态
默认所有实体对象都在创建时是启用的
GroupDataQosPolicy:可与DataWriter和DataReader的监听器结合使用,类似于PartitionQosPolicy。
HstoryQosPlicy:
KEEP_LAST_HISTORY_QOS:只保留最新值,丢弃旧的值
KEEP_ALL_HISTORY_QOS: 尝试保留所有的值,直到它可以被交付给所有的DataReader
LatencyBudgetQosPolicy:该QOS制定了写入数据到将数据插入到DataReader historyging通知之间的可以接受的最大延迟。默认值为01.4.1.3
1.4.1.3 Status
每个实体都与一组状态对象相关联,其值表示该实体的通信状态。状态值得变化时由于与每个实体相关的通信事件而发生的。状态被分解为几个状态对象,每个状态对象都涉及通信的不同方面,因此每个状态对象都可以独立于其他状态对象而变化。
状态对象的更改会出发相应的Listener回调,允许Entity将事件通知应用程序。
条件和Wait-sets为应用程序提供了一种替代机制,通过状态条件使其意识到状态对象的更改。
通知不是在内部线程上处理的,而在使用Listener时处理的。
1.4.1.4 Conditions and Wait-sets
conditions和wait-sets提供了另一种机制,允许中间件想应用程序通知通信状态更改(包括数据的到达)
应用程序通过attach_condition()方法调用条件对象(GuardCondition、StatusCondition、ReadCondition),并将他们附加到wait-sets,来指示它想要获得哪些相关信息。然后通过调用wait()等待,直到wait_sets中的conditions对象的触发器的值变为TRUE。
1.4.2 Domain
域代表单独的一个通信层。它在共享公共通信基础设施的实体之间创建了逻辑分离。从概念上讲,可以被视为一个虚拟网络,连接在同一个域中的应用程序和其他域中的应用程序隔离开来。通过这种方式,几个独立的分布式应用程序可以在同一个物理网络中共存。每一个域都有一个domainId的唯一标识。要讲应用程序添加到域,必须创建一个domainId和DomainParticipant实例。在域中引入了另一个实体隔离级别是分区。可以将同一个域中的发布者和订阅者隔离开来,分配在不同的区。
DomainParticipant
DomainParticipant是应用程序到域的入口,创建DomainParticipant后就会链接到1个域,DomainParticipant的行为可以通过DomainParticipantQos上指定的Qos值进行修改。
DomainParticipantListener
DomianParticipantListener是一个抽象类,DomainParticipant的状态改变时将触发回调,默认情况下,所有这下回调都是空的,需要用户去实现。
DomainParticipantListener继承自TopicListener、PublisherListener和SubscriberListener。因此,它有能力对报告给任何附加实体的各种事件作出反应。DomainParticipantListener从其他监听器继承的回调只会在没有其他实体能够处理事件的情况下被调用,要么是因为它没有附加的监听器,要么是因为实体上的状态掩码禁用了回调。
DomainParticipantFactory
允许创建和销毁DomainParticipant对象。它的行为可以通过qos值进行修改。
根据DDSI-RTPS V2.2 domainId不能大于200
删除DomainParticipant必须保证它下面所有的对象都被删除了
Partitions
分区在由域引起的物理隔离中引入了逻辑实体隔离级别的概念。它们代表了在域和主题之外分隔发布者和订阅者的另一个层次。要使发布服务器与订阅服务器通信,它们必须至少属于一个公共分区。
如果发布服务器切换了分区,然后需要再次重新发送一些旧的更改,它将把它交付给新的分区集,而不管创建更改时定义了哪些分区。
与Domain和Topic不同,一个端点可以同时属于多个分区。对于要在不同的Topic上共享的某些数据,每个Topic必须有不同的Publisher,每个Topic共享自己的更改历史。另一方面,单个发布者可以使用单个主题数据更改在不同的分区上共享相同的数据,从而减少网络过载。
可以通过通配符设置分区的名称
1.4.3 Publisher
PublisherListener继承自DataWriterListener,因此,它能够给报告给DataWriter的所有事件做出反应,但是也是只有在DataWriter没有附加的Listener或者是StatusMask禁用了回调,才会调用PublisherListener。
删除Publish
// Create a DomainParticipant in the desired domain
DomainParticipant* participant =
DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT);
if (nullptr == participant)
{
// Error
return;
}
// Create a Publisher with default PublisherQos and no Listener
// The value PUBLISHER_QOS_DEFAULT is used to denote the default QoS.
Publisher* publisher_with_default_qos =
participant->create_publisher(PUBLISHER_QOS_DEFAULT);
if (nullptr == publisher_with_default_qos)
{
// Error
return;
}
// A custom PublisherQos can be provided to the creation method
PublisherQos custom_qos;
// Modify QoS attributes
// (...)
Publisher* publisher_with_custom_qos =
participant->create_publisher(custom_qos);
if (nullptr == publisher_with_custom_qos)
{
// Error
return;
}
// Create a Publisher with default QoS and a custom Listener.
// CustomPublisherListener inherits from PublisherListener.
// The value PUBLISHER_QOS_DEFAULT is used to denote the default QoS.
CustomPublisherListener custom_listener;
Publisher* publisher_with_default_qos_and_custom_listener =
participant->create_publisher(PUBLISHER_QOS_DEFAULT, &custom_listener);
if (nullptr == publisher_with_default_qos_and_custom_listener)
{
// Error
return;
}
而且必须先把DataWriter删除了
1.4.3.1 DataWriter
DataWriter创建之前就必须存在topic,且topic必须与发布的数据绑定。
1.4.3.2 DataWriterListener
on_publication_matched(): DataWriter已经找到与主题匹配的DataReader,并且具有公共分区和兼容的QoS,或者已经停止与先前被认为匹配的DataReader进行匹配。
1.4.3.3 Publishing Data
通过DataWriter上的write()成员函数通知数据实例的更改。然后将此更改传递给与DataWriter匹配的DataReader。
// Register the data type in the DomainParticipant.
TypeSupport custom_type_support(new CustomDataType());
custom_type_support.register_type(participant, custom_type_support.get_type_name());
// Create a Topic with the registered type.
Topic* custom_topic =
participant->create_topic("topic_name", custom_type_support.get_type_name(), TOPIC_QOS_DEFAULT);
if (nullptr == custom_topic)
{
// Error
return;
}
// Create a DataWriter
DataWriter* data_writer =
publisher->create_datawriter(custom_topic, DATAWRITER_QOS_DEFAULT);
if (nullptr == data_writer)
{
// Error
return;
}
// Get a data instance
void* data = custom_type_support->createData();
// Fill the data values
// (...)
// Publish the new value, deduce the instance handle
if (data_writer->write(data, eprosima::fastrtps::rtps::InstanceHandle_t()) != ReturnCode_t::RETCODE_OK)
{
// Error
return;
}
// The data instance can be reused to publish new values,
// but delete it at the end to avoid leaks
custom_type_support->deleteData(data);
1.4.4 Subscriber
当Subscriber收到数据,它通知应用程序有新的数据到达,然后应用程序可以使用DataReader来接收数据。
// Create a DomainParticipant in the desired domain
DomainParticipant* participant =
DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT);
if (nullptr == participant)
{
// Error
return;
}
// Create a Subscriber with default SubscriberQos
Subscriber* subscriber =
participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT);
if (nullptr == subscriber)
{
// Error
return;
}
// Get the current QoS or create a new one from scratch
SubscriberQos qos = subscriber->get_qos();
// Modify QoS attributes
qos.entity_factory().autoenable_created_entities = false;
// Assign the new Qos to the object
subscriber->set_qos(qos);
1个subscriber下多个DataReader,多个DataReader之间除了订阅该subscriber的QOS之外,其他没有任何联系。
subscriberListener继承自DataReaderListener
删除全部的DataReader之后才可以删除subscriber
1.4.4.1 DataReader
DataReader自创建的时候就绑定一个topic,topic必须在DataReader创建之前就存在,且有数据存在
// Create a DataReader with default DataReaderQos and no Listener
// The value DATAREADER_QOS_DEFAULT is used to denote the default QoS.
DataReader* data_reader_with_default_qos =
subscriber->create_datareader(topic, DATAREADER_QOS_DEFAULT);
if (nullptr == data_reader_with_default_qos)
{
// Error
return;
}
// A custom DataReaderQos can be provided to the creation method
DataReaderQos custom_qos;
// Modify QoS attributes
// (...)
DataReader* data_reader_with_custom_qos =
subscriber->create_datareader(topic, custom_qos);
if (nullptr == data_reader_with_custom_qos)
{
// Error
return;
}
// Create a DataReader with default QoS and a custom Listener.
// CustomDataReaderListener inherits from DataReaderListener.
// The value DATAREADER_QOS_DEFAULT is used to denote the default QoS.
CustomDataReaderListener custom_listener;
DataReader* data_reader_with_default_qos_and_custom_listener =
subscriber->create_datareader(topic, DATAREADER_QOS_DEFAULT, &custom_listener);
if (nullptr == data_reader_with_default_qos_and_custom_listener)
{
// Error
return;
}
只有当属于DataReader的所有实体都删除之后DataReader才能删除。
1.4.4.2 SampleInfo
在DataReader中返回的样例中,除了示例数据之外还返回一个SampleInfo实例,该对象包含补充返回数据值得附加信息,并帮助解释数据,如原始的DataWriter或发布的时间戳。SampleInfo的数据成员包括:
sample_state:表示以前是否已读取了相应的数据样例,有两个参数(READE:第一次,NOT_READE:以前就已经读过)
view_state:是否是这个数据实例的第一个样例,有两个参数(NEW:是,NOT_NEW:不是)
instance_state:表当前实例是存在还是被释放了,有三个参数(ALIVE:存在,NOT_ALIVE_DISPOSED:远程DataWriter已经释放,NOT_ALIVE_NO_WRITERS:DataReader释放了实例,因为远程的DataWriter不活跃)
disposed_generation_count:实例在释放后变为活跃状态的次数
no_writers_generation_count:实例在被处理为NOT_ALIVE_NO_WRITERS后变为活跃的次数
sample_rank:(还未实现)
generation_rank:(还未实现)
absolute_generation_rank:(还未实现)
source_timestamp:保存DataWriter在样本发布时提供的时间戳
instance_handle:本地实例的句柄
publication_handle:发布数据更改的DataWriter的句柄
valid_data:表示数据样例是否包含值得更改,false表示用于传递实例状态的变化。
应用程序可以通过两种方式获取数据,第一种是依赖listener来获取新的数据值,第二种是设置一个线程用于等待DataReader上有任何可用的数据,可以使用一个等待集(wait-sets)来等待DataAvailable状态的更改。
1.4.5 TOPIC
根据定义。一个topic对应单一的dataType,因此每个与topic相关的data sample都可以理解为dataType所描述的信息的更新,但是也可以包含逻辑分离,在同一个topic中有引用同一DataType的多个data sample。因此,接收到的data sample表示该topic某个特定实例的更新。因此,topic标识单一类型的数据,范围是产品能够单个实例到规定类型的整个实例的集合。
topic可以带key,也可以不带。m_isGetKeyDefined()设置为TRUE。携带key,设置为FALSE,不携带key
三种方式可以实现在TopicDataType 上携带key:
在使用Fast DDS-Gen时,向组成IDL文件中键的成员添加@Key注释。
使用动态主题类型时,向成员及其父成员添加属性Key。
在TopicDataType上手动实现getKey()成员函数,并将m_isGetKeyDefined数据成员值设置为true。
带key的数据类型用于在单个topic上定义数据子流。在同一Topic上具有相同键的数据值表示来自同一子流的数据,而在同一Topic上具有不同键的数据值表示来自不同子流的数据。
同一个topic下不同的data sample可以通过一个或多个数据字段来区分,这些字段构成key。规则为:具有相同key的不同数据值标识同一实例的连续数据样本,具有不同key的不弄数据值表示不同topic的实例。如果没有key,则表示该topic的单个实例。
单个实例的instance_state和view_state的状态图:
1.4.5.1 Topic Description
topic description是一个抽象类,用作描述数据流的所有类的基础。
1.4.5.2 ContentFilteredTopic
是一个具有过滤属性的主题,订阅topic的同时,指定数据集
只可以在创建DataReader时使用,DataWriter不能使用
可以有筛选表达式,也提供表达式参数列表
创建contentFilteredTopic
// Create a DomainParticipant in the desired domain
DomainParticipant* participant =
DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT);
if (nullptr == participant)
{
// Error
return;
}
// Create the Topic.
/* IDL
*
* struct HelloWorld
* {
* long index;
* string message;
* }
*
*/
Topic* topic =
participant->create_topic("HelloWorldTopic", "HelloWorld", TOPIC_QOS_DEFAULT);
if (nullptr == topic)
{
// Error
return;
}
// Create a ContentFilteredTopic using an expression with no parameters
std::string expression = "message like 'Hello*'";
std::vector<std::string> parameters;
ContentFilteredTopic* filter_topic =
participant->create_contentfilteredtopic("HelloWorldFilteredTopic1", topic, expression, parameters);
if (nullptr == filter_topic)
{
// Error
return;
}
// Create a ContentFilteredTopic using an expression with parameters
expression = "message like %0 or index > %1";
parameters.push_back("'*world*'");
parameters.push_back("20");
ContentFilteredTopic* filter_topic_with_parameters =
participant->create_contentfilteredtopic("HelloWorldFilteredTopic2", topic, expression, parameters);
if (nullptr == filter_topic_with_parameters)
{
// Error
return;
}
// The ContentFilteredTopic instances can then be used to create DataReader objects.
Subscriber* subscriber =
participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT);
if (nullptr == subscriber)
{
// Error
return;
}
DataReader* reader_on_filter = subscriber->create_datareader(filter_topic, DATAREADER_QOS_DEFAULT);
if (nullptr == reader_on_filter)
{
// Error
return;
}
DataReader* reader_on_filter_with_parameters =
subscriber->create_datareader(filter_topic_with_parameters, DATAREADER_QOS_DEFAULT);
if (nullptr == reader_on_filter_with_parameters)
{
// Error
return;
}
更新r expression and parameters
// This lambda prints all the information of a ContentFilteredTopic
auto print_filter_info = [](
const ContentFilteredTopic* filter_topic)
{
std::cout << "ContentFilteredTopic info for '" << filter_topic->get_name() << "':" << std::endl;
std::cout << " - Related Topic: " << filter_topic->get_related_topic()->get_name() << std::endl;
std::cout << " - Expression: " << filter_topic->get_filter_expression() << std::endl;
std::cout << " - Parameters:" << std::endl;
std::vector<std::string> parameters;
filter_topic->get_expression_parameters(parameters);
size_t i = 0;
for (const std::string& parameter : parameters)
{
std::cout << " " << i++ << ": " << parameter << std::endl;
}
};
// Create a DomainParticipant in the desired domain
DomainParticipant* participant =
DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT);
if (nullptr == participant)
{
// Error
return;
}
// Create a Topic
/* IDL
*
* struct HelloWorld
* {
* long index;
* string message;
* }
*
*/
Topic* topic =
participant->create_topic("HelloWorldTopic", "HelloWorldTopic", TOPIC_QOS_DEFAULT);
if (nullptr == topic)
{
// Error
return;
}
// Create a ContentFilteredTopic
ContentFilteredTopic* filter_topic =
participant->create_contentfilteredtopic("HelloWorldFilteredTopic", topic, "index > 10", {});
if (nullptr == filter_topic)
{
// Error
return;
}
// Print the information
print_filter_info(filter_topic);
// Use the ContentFilteredTopic on DataReader objects.
// (...)
// Update the expression
if (ReturnCode_t::RETCODE_OK !=
filter_topic->set_filter_expression("message like %0 or index > %1", {"'Hello*'", "15"}))
{
// Error
return;
}
// Print the updated information
print_filter_info(filter_topic);
// Update the parameters
if (ReturnCode_t::RETCODE_OK !=
filter_topic->set_expression_parameters({"'*world*'", "222"}))
{
// Error
return;
}
// Print the updated information
print_filter_info(filter_topic);
1.4.5.3 Definition of data Type
topic中交换的数据类型的定义为两个类:TypeSupport 和TopicDataType.
TopicDataType.描述发布和订阅之间交换的数据类型,即与topic对应的数据。
TypeSupport对象封装了TopicDataType的示例,提供了注册类型和与发布和订阅交互所需的函数。要注册数据类型,请使用TopicDataType实例创建一个新的TypeSupport,并在TypeSupport上使用register_type()成员函数。
不允许在相同的DomainParticipant上以相同的名称注册两个不同的数据类型,但是允许在同一个DomainParticipant中注册相同的数据类型,使用相同或不同的名称。如果相同的数据类型在相同的DomainParticipant上以相同的民初隔行注册两次,那么第二次注册将不起作用,但不会发生错误。
/ Create a DomainParticipant in the desired domain
DomainParticipant* participant =
DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT);
if (nullptr == participant)
{
// Error
return;
}
// Register the data type in the DomainParticipant.
// If nullptr is used as name argument, the one returned by the type itself is used
TypeSupport custom_type_support(new CustomDataType());
custom_type_support.register_type(participant, nullptr);
// The previous instruction is equivalent to the following one
// Even if we are registering the same data type with the same name twice, no error will be issued
custom_type_support.register_type(participant, custom_type_support.get_type_name());
// Create a Topic with the registered type.
Topic* topic =
participant->create_topic("topic_name", custom_type_support.get_type_name(), TOPIC_QOS_DEFAULT);
if (nullptr == topic)
{
// Error
return;
}
// Create an alias for the same data type using a different name.
custom_type_support.register_type(participant, "data_type_name");
// We can now use the aliased name to If no name is given, it uses the name returned by the type itself
Topic* another_topic =
participant->create_topic("other_topic_name", "data_type_name", TOPIC_QOS_DEFAULT);
if (nullptr == another_topic)
{
// Error
return;
}
1.4.5.4 动态DataType
可以根据DDS接口的OMG可扩展和动态主题类型动态定义数据类型。
// Create a DomainParticipant in the desired domain
DomainParticipant* participant =
DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT);
if (nullptr == participant)
{
// Error
return;
}
// Load the XML file with the type description
eprosima::fastrtps::xmlparser::XMLProfileManager::loadXMLFile("example_type.xml");
// Retrieve the an instance of the desired type and register it
eprosima::fastrtps::types::DynamicType_ptr dyn_type =
eprosima::fastrtps::xmlparser::XMLProfileManager::getDynamicTypeByName("DynamicType")->build();
TypeSupport dyn_type_support(new eprosima::fastrtps::types::DynamicPubSubType(dyn_type));
dyn_type_support.register_type(participant, nullptr);
// Create a Topic with the registered type.
Topic* topic =
participant->create_topic("topic_name", dyn_type_support.get_type_name(), TOPIC_QOS_DEFAULT);
if (nullptr == topic)
{
// Error
return;
}
1.4.5.5 创建topic
// Create a DomainParticipant in the desired domain
DomainParticipant* participant =
DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT);
if (nullptr == participant)
{
// Error
return;
}
// Create a Topic with default TopicQos and no Listener
// The symbol TOPIC_QOS_DEFAULT is used to denote the default QoS.
Topic* topic_with_default_qos =
participant->create_topic("TopicName", "DataTypeName", TOPIC_QOS_DEFAULT);
if (nullptr == topic_with_default_qos)
{
// Error
return;
}
// A custom TopicQos can be provided to the creation method
TopicQos custom_qos;
// Modify QoS attributes
// (...)
Topic* topic_with_custom_qos =
participant->create_topic("TopicName", "DataTypeName", custom_qos);
if (nullptr == topic_with_custom_qos)
{
// Error
return;
}
// Create a Topic with default QoS and a custom Listener.
// CustomTopicListener inherits from TopicListener.
// The symbol TOPIC_QOS_DEFAULT is used to denote the default QoS.
CustomTopicListener custom_listener;
Topic* topic_with_default_qos_and_custom_listener =
participant->create_topic("TopicName", "DataTypeName", TOPIC_QOS_DEFAULT, &custom_listener);
if (nullptr == topic_with_default_qos_and_custom_listener)
{
// Error
return;
}
通过xml方式创建
// First load the XML with the profiles
DomainParticipantFactory::get_instance()->load_XML_profiles_file("profiles.xml");
// Create a DomainParticipant in the desired domain
DomainParticipant* participant =
DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT);
if (nullptr == participant)
{
// Error
return;
}
// Create a Topic using a profile and no Listener
Topic* topic_with_profile =
participant->create_topic_with_profile("TopicName", "DataTypeName", "topic_profile");
if (nullptr == topic_with_profile)
{
// Error
return;
}
// Create a Topic using a profile and a custom Listener.
// CustomTopicListener inherits from TopicListener.
CustomTopicListener custom_listener;
Topic* topic_with_profile_and_custom_listener =
participant->create_topic_with_profile("TopicName", "DataTypeName", "topic_profile", &custom_listener);
if (nullptr == topic_with_profile_and_custom_listener)
{
// Error
return;
}
1.4.5.6 过滤Topic 的数据
创建ContentFilteredTopic
// Create a DomainParticipant in the desired domain
DomainParticipant* participant =
DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT);
if (nullptr == participant)
{
// Error
return;
}
// Create the Topic.
/* IDL
*
* struct HelloWorld
* {
* long index;
* string message;
* }
*
*/
Topic* topic =
participant->create_topic("HelloWorldTopic", "HelloWorld", TOPIC_QOS_DEFAULT);
if (nullptr == topic)
{
// Error
return;
}
// Create a ContentFilteredTopic using an expression with no parameters
std::string expression = "message like 'Hello*'";
std::vector<std::string> parameters;
ContentFilteredTopic* filter_topic =
participant->create_contentfilteredtopic("HelloWorldFilteredTopic1", topic, expression, parameters);
if (nullptr == filter_topic)
{
// Error
return;
}
// Create a ContentFilteredTopic using an expression with parameters
expression = "message like %0 or index > %1";
parameters.push_back("'*world*'");
parameters.push_back("20");
ContentFilteredTopic* filter_topic_with_parameters =
participant->create_contentfilteredtopic("HelloWorldFilteredTopic2", topic, expression, parameters);
if (nullptr == filter_topic_with_parameters)
{
// Error
return;
}
// The ContentFilteredTopic instances can then be used to create DataReader objects.
Subscriber* subscriber =
participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT);
if (nullptr == subscriber)
{
// Error
return;
}
DataReader* reader_on_filter = subscriber->create_datareader(filter_topic, DATAREADER_QOS_DEFAULT);
if (nullptr == reader_on_filter)
{
// Error
return;
}
DataReader* reader_on_filter_with_parameters =
subscriber->create_datareader(filter_topic_with_parameters, DATAREADER_QOS_DEFAULT);
if (nullptr == reader_on_filter_with_parameters)
{
// Error
return;
}
更新过滤条件和表达式
// This lambda prints all the information of a ContentFilteredTopic
auto print_filter_info = [](
const ContentFilteredTopic* filter_topic)
{
std::cout << "ContentFilteredTopic info for '" << filter_topic->get_name() << "':" << std::endl;
std::cout << " - Related Topic: " << filter_topic->get_related_topic()->get_name() << std::endl;
std::cout << " - Expression: " << filter_topic->get_filter_expression() << std::endl;
std::cout << " - Parameters:" << std::endl;
std::vector<std::string> parameters;
filter_topic->get_expression_parameters(parameters);
size_t i = 0;
for (const std::string& parameter : parameters)
{
std::cout << " " << i++ << ": " << parameter << std::endl;
}
};
// Create a DomainParticipant in the desired domain
DomainParticipant* participant =
DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT);
if (nullptr == participant)
{
// Error
return;
}
// Create a Topic
/* IDL
*
* struct HelloWorld
* {
* long index;
* string message;
* }
*
*/
Topic* topic =
participant->create_topic("HelloWorldTopic", "HelloWorldTopic", TOPIC_QOS_DEFAULT);
if (nullptr == topic)
{
// Error
return;
}
// Create a ContentFilteredTopic
ContentFilteredTopic* filter_topic =
participant->create_contentfilteredtopic("HelloWorldFilteredTopic", topic, "index > 10", {});
if (nullptr == filter_topic)
{
// Error
return;
}
// Print the information
print_filter_info(filter_topic);
// Use the ContentFilteredTopic on DataReader objects.
// (...)
// Update the expression
if (ReturnCode_t::RETCODE_OK !=
filter_topic->set_filter_expression("message like %0 or index > %1", {"'Hello*'", "15"}))
{
// Error
return;
}
// Print the updated information
print_filter_info(filter_topic);
// Update the parameters
if (ReturnCode_t::RETCODE_OK !=
filter_topic->set_expression_parameters({"'*world*'", "222"}))
{
// Error
return;
}
// Print the updated information
print_filter_info(filter_topic);
1.4.5.7 过滤应该安装到writer端还是reader端?
两边都可以。
DataWriter可以在发现过程中从DataReader中获得过滤器的表达式。
也可以在DataWriter中以增加CPU的使用率为代价的方式节省网络带宽。
写入端过滤的条件:
当满足以下所有条件时,DataWriter将在DataReader中执行过滤器评估;过滤将由DataReader执行。
DataWriter具有无限的活力
与DataReader的通信既不是进程内的,也不是数据共享的。
DataReader没有使用多播。
DataWriter过滤的datareader数量不超过reader_filters_allocation上设置的最大值。
DataWriterQos上有一个资源限制策略。控制写入端资源的分配行为。最大值为0时将禁用写入端的过滤,最大值为32时写入端将执行最大的过滤数。
1.4.5.8 发现竞态条件
在更新过滤器表达式和参数的应用程序中,DataWriter会应用旧版本的过滤器,直到通过发现接收到更新的信息。这就意味着在DataReader更新过滤器很短的时间内,但在DataReader接收到更新的信息之前所做的发布可能不会被发送到DataReader,即使更新的过滤器会告知相反的结果。接收到更新之后的信息之后,才使用新的。
如果不想接受这个过程,就将reader_filters_allocation设置为0,禁用写侧端的过滤。
1.4.6 源码生成工具FastDDS-GEN
简化了将数据类型的IDL规范转换为函数实现的过程。该工具自动生成使用IDL定义的数据类型的源码。
输出文件:
假设IDL文件的名称为“MyType"
MyType.cxx/.h:类型定义。
MyTypePubSubTpe.cxx/.h:数据类型的序列化和反序列化源代码。它还定义了MyTypePubSubType类的getKey()成员函数,以防主题实现键(参见带键的数据类型)。