一、SMACH容器
1.1 状态机容器
1.1.1 创建状态机容器
首先引入状态机容器
from smach import StateMachine
由于SMACH状态机还提供状态接口,因此必须在构造时指定其结果和用户数据交互。
sm = StateMachine(outcomes = ['outcome1', 'outcome2'],
input_keys = ['input1', 'input2'],
output_keys = ['output1' , 'output2'])
与SMACH状态接口类似,输入键和输出键是可选的。构造函数签名是:
__init__(self, outcomes, input_keys=[], output_keys = [])
1.1.2 添加状态
向状态机添加状态时,首先指定要向其添加状态的状态机。这可以通过使用Python的“with”语句来实现。你可以把它想象成“打开”容器进行施工。它创建了一个上下文,在该上下文中,所有后续的add*调用都将应用于打开的容器。
例如,要向名为“sm”的状态机添加两个状态,可以编写以下代码:
with sm:
StateMachine.add('FOO',
FooState(),
{'outcome2':'FOO',
'outcome3':'BAR'})
StateMachine.add('BAR',
BarState(),
{'outcome3':'FOO',
'outcome4':'outcome4'})
上面的示例添加了两个标记为“FOO”和“BAR”的状态,分别为“FooState”和“BarState”类型。静态add方法有可选参数。add方法的签名是:
add(label, state, transitions=None, remapping=None)
1.2 并发容器
from smach import Concurrence
1.2.1 指定并发结果
1)并发结果图
SMACH并发的结果映射指定了基于其子级的结果确定并发结果的策略。具体来说,map是一个字典,其中键是并发的潜在结果,值是将子标签映射到子结果的字典。一旦并发中的所有状态都已终止,如果满足其中一个子结果映射,则并发将返回其关联的结果。如果没有满足任何映射,并发将返回其默认结果。
cc = Concurrence(outcomes = ['outcome1', 'outcome2'],
default_outcome = 'outcome1',
input_keys = ['sm_input'],
output_keys = ['sm_output'],
outcome_map = {'succeeded':{'FOO':'succeeded',
'BAR':'outcome2'},
'outcome3':{'FOO':'outcome2'}})
with cc:
Concurrence.add('FOO', Foo())
Concurrence.add('BAR', Bar())
上面的示例指定了以下策略:
- 当'FOO'的结果为'succeed'且'BAR'的结果为'outcome2'时,状态机将以结果为'succeed'退出。
- 当'FOO'有结果“outcome2”时,状态机将以结果“outcome3”退出,与状态栏的结果无关。
1.2.2 回调
如果希望完全控制并发状态机,可以使用它提供的回调、子级终止和结果:
# gets called when ANY child state terminates
def child_term_cb(outcome_map):
# terminate all running states if FOO finished with outcome 'outcome3'
if outcome_map['FOO'] == 'outcome3':
return True
# terminate all running states if BAR finished
if outcome_map['BAR']:
return True
# in all other case, just keep running, don't terminate anything
return False
# gets called when ALL child states are terminated
def out_cb(outcome_map):
if outcome_map['FOO'] == 'succeeded':
return 'outcome1'
else:
return 'outcome2'
# creating the concurrence state machine
sm = Concurrence(outcomes=['outcome1', 'outcome2'],
default_outcome='outcome1',
input_keys=['sm_input'],
output_keys=['sm_output'],
child_termination_cb = child_term_cb,
outcome_cb = out_cb)
with sm:
Concurrence.add('FOO', Foo(),
remapping={'foo_in':'input'})
Concurrence.add('BAR', Bar(),
remapping={'bar_out':'bar_out'})
每当一个子状态终止时,就会调用子状态终止。在回调函数中,您可以决定状态机是应该继续运行(返回False),还是应该抢占所有剩余的运行状态(返回True)。
当最后一个子状态终止时,将调用一次结果_cb。此回调返回并发状态机的结果。
1.3 序列容器
1.3.1 创建序列容器
from smach import Sequence
容器序列有其在构造时指定的结果,以及用于自动转换的“连接器_结果”。构造函数签名是:
__init__(self, outcomes, connector_outcome):
1.3.2 添加状态
向序列添加状态与向容器添加状态相同。
但是,每个添加的状态都会收到一个从它到后面添加的状态的额外转换。过渡将遵循该容器建造时规定的结果。
如果dictionary mapping参数中给出的某个转换为'Sequence.add()'遵循构造函数中指定的连接器结果,提供的转换将覆盖自动生成的连接器转换。
1.3.3 举例
例如,当创建一系列动作状态时,不需要转换映射。所有行动状态都有通常的三重结果,序列也需要有三重结果:
sq = Sequence(
outcomes = ['succeeded','aborted','preempted'],
connector_outcome = 'succeeded')
with sq:
Sequence.add('MOVE_ARM_GRAB_PRE', MoveVerticalGripperPoseActionState())
Sequence.add('MOVE_GRIPPER_OPEN', MoveGripperState(GRIPPER_MAX_WIDTH))
Sequence.add('MOVE_ARM_GRAB', MoveVerticalGripperPoseActionState())
Sequence.add('MOVE_GRIPPER_CLOSE', MoveGripperState(grab_width))
Sequence.add('MOVE_ARM_GRAB_POST', MoveVerticalGripperPoseActionState())
1.4迭代容器
1.4.1 概述
迭代器允许您循环一个或多个状态,直到满足成功条件。本教程演示如何使用迭代器将数字列表排序为偶数和赔率。
import roslib; roslib.load_manifest('smach')
roslib.load_manifest('smach_ros')
import rospy
import smach
from smach import Iterator, StateMachine, CBState
from smach_ros import ConditionState, IntrospectionServer
def construct_sm():
sm = StateMachine(outcomes = ['succeeded','aborted','preempted'])
sm.userdata.nums = range(25, 88, 3)
sm.userdata.even_nums = []
sm.userdata.odd_nums = []
with sm:
tutorial_it = Iterator(outcomes = ['succeeded','preempted','aborted'],
input_keys = ['nums', 'even_nums', 'odd_nums'],
it = lambda: range(0, len(sm.userdata.nums)),
output_keys = ['even_nums', 'odd_nums'],
it_label = 'index',
exhausted_outcome = 'succeeded')
with tutorial_it:
container_sm = StateMachine(outcomes = ['succeeded','preempted','aborted','continue'],
input_keys = ['nums', 'index', 'even_nums', 'odd_nums'],
output_keys = ['even_nums', 'odd_nums'])
with container_sm:
#test wether even or odd
StateMachine.add('EVEN_OR_ODD',
ConditionState(cond_cb = lambda ud:ud.nums[ud.index]%2,
input_keys=['nums', 'index']),
{'true':'ODD',
'false':'EVEN' })
#add even state
@smach.cb_interface(input_keys=['nums', 'index', 'even_nums'],
output_keys=['odd_nums'],
outcomes=['succeeded'])
def even_cb(ud):
ud.even_nums.append(ud.nums[ud.index])
return 'succeeded'
StateMachine.add('EVEN', CBState(even_cb),
{'succeeded':'continue'})
#add odd state
@smach.cb_interface(input_keys=['nums', 'index', 'odd_nums'],
output_keys=['odd_nums'],
outcomes=['succeeded'])
def odd_cb(ud):
ud.odd_nums.append(ud.nums[ud.index])
return 'succeeded'
StateMachine.add('ODD', CBState(odd_cb),
{'succeeded':'continue'})
#close container_sm
Iterator.set_contained_state('CONTAINER_STATE',
container_sm,
loop_outcomes=['continue'])
#close the tutorial_it
StateMachine.add('TUTORIAL_IT',tutorial_it,
{'succeeded':'succeeded',
'aborted':'aborted'})
return sm
def main():
rospy.init_node("iterator_tutorial")
sm_iterator = construct_sm()
# Run state machine introspection server for smach viewer
intro_server = IntrospectionServer('iterator_tutorial',sm_iterator,'/ITERATOR_TUTORIAL')
intro_server.start()
outcome = sm_iterator.execute()
rospy.spin()
intro_server.stop()
if __name__ == "__main__":
main()
程序解释
从构造迭代器开始,构造函数采用以下参数:
__init__(self, outcomes, input_keys, output_keys, it=[], it_label='it_data', exhausted_outcome='exhausted')
在本例中,结果现在包括preempted,这是迭代器的默认结果。it参数是要迭代的对象列表,it_标签是保存it列表中项目的当前值的键。耗尽的参数应该设置为首选状态机结果,在本例中,当迭代器在it列表中循环完成时,迭代器结果成功。
tutorial_it = Iterator(outcomes = ['succeeded','preempted','aborted'],
input_keys = ['nums', 'even_nums', 'odd_nums'],
it = lambda: range(0, len(sm.userdata.nums)),
output_keys = ['even_nums', 'odd_nums'],
it_label = 'index',
exhausted_outcome = 'succeeded')
现在添加一个容器,并创建用于将列表排序为偶数和奇数的状态。
with tutorial_it:
container_sm = StateMachine(outcomes = ['succeeded','preempted','aborted','continue'],
input_keys = ['nums', 'index', 'even_nums', 'odd_nums'],
output_keys = ['even_nums', 'odd_nums'])
with container_sm:
#test wether even or odd
StateMachine.add('EVEN_OR_ODD',
ConditionState(cond_cb = lambda ud:ud.nums[ud.index]%2,
input_keys=['nums', 'index']),
{'true':'ODD',
'false':'EVEN' })
#add even state
@smach.cb_interface(input_keys=['nums', 'index', 'even_nums'],
output_keys=['odd_nums'],
outcomes=['succeeded'])
def even_cb(ud):
ud.even_nums.append(ud.nums[ud.index])
return 'succeeded'
StateMachine.add('EVEN', CBState(even_cb),
{'succeeded':'continue'})
#add odd state
@smach.cb_interface(input_keys=['nums', 'index', 'odd_nums'],
output_keys=['odd_nums'],
outcomes=['succeeded'])
def odd_cb(ud):
ud.odd_nums.append(ud.nums[ud.index])
return 'succeeded'
StateMachine.add('ODD', CBState(odd_cb),
{'succeeded':'continue'})
将容器添加到迭代器:
#close container_sm
Iterator.set_contained_state('CONTAINER_STATE',
container_sm,
loop_outcomes=['continue'])
通过将迭代器添加到顶级状态机来完成:
#close the tutorial_it
StateMachine.add('TUTORIAL_IT',tutorial_it,
{'succeeded':'succeeded',
'aborted':'aborted'})
1.5 使用actionlib包装容器
1.5.1 包装 SMACH 状态
SMACH提供了名为ActionServerWrapper的顶级容器。这一个类播发actionlib操作服务器。当操作服务器收到目标时,其包含的状态将变为活动状态,而不是由父级执行。因此,此容器不会从smach继承。状态基类,不能放在另一个容器中。
action server包装器可以将action server接收到的目标消息注入到包含状态,并在终止时从该状态提取结果消息。在构造操作服务器包装器时,用户指定哪些状态机结果对应于成功、中止或抢占的结果。
考虑这个例子,它将一个SMACH状态机封装为一个动作:
import rospy
from smach import StateMachine
from smach_ros import ActionServerWrapper
# Construct state machine
sm = StateMachine(outcomes=['did_something',
'did_something_else',
'aborted',
'preempted'])
with sm:
### Add states in here...
# Construct action server wrapper
asw = ActionServerWrapper(
'my_action_server_name', MyAction,
wrapped_container = sm,
succeeded_outcomes = ['did_something','did_something_else'],
aborted_outcomes = ['aborted'],
preempted_outcomes = ['preempted'] )
# Run the server in a background thread
asw.run_server()
# Wait for control-c
rospy.spin()
1.5.2 让目标/结果消息进入/退出包含状态
上面的代码将调用 sm.execute(),但它不会将目标加载到包含的状态机中,也不会提取结果。为了做这些事情,您需要告诉动作服务器包装器在 SMACH 的上下文中它应该调用什么目标和结果消息。您可以将操作服务器包装器构造调用替换为以下内容:
# Construct action server wrapper
asw = ActionServerWrapper(
'my_action_server_name', MyAction, sm,
['did_something','did_something_else'], ['aborted'], ['preempted'],
goal_key = 'my_awesome_goal',
result_key = 'egad_its_a_result' )
关键字参数goal_key和result_key是ActionServerWrapper上下文中的 SMACH 用户数据键。与任何其他容器一样,这意味着包装器的包含状态(在本例中为状态机sm)将在调用其 execute() 方法时接收对此用户数据结构的引用。与嵌套状态机中的范围之间传递用户数据的方式类似,在这种情况下,您也需要在状态机sm中设置这些键标识符。
为了从父级复制密钥,您可以将状态机sm的构造调用替换为:
# Construct state machine
sm = StateMachine(
outcomes=['did_something','did_something_else','aborted','preempted'],
input_keys = ['my_awesome_goal'],
output_keys = ['egad_its_a_result'])
完成此操作后,您可以从添加到sm的任何状态访问这些密钥。有关更多目标/结果策略,请参阅ActionServerWrapper API文档。
二.SMACH 状态
2.1 通用状态
from smach import State
创建状态的最通用(但通常效率最低)的方法是从“状态”基类派生。你几乎可以在状态的执行方法中做任何你想做的事情。知道 SMACH 带有一个有用的状态库,您可以使用它,而无需编写整个自定义状态。
class Foo(smach.State):
def __init__(self, outcomes=['outcome1', 'outcome2']):
# Your state initialization goes here
def execute(self, userdata):
# Your state execution goes here
if xxxx:
return 'outcome1'
else:
return 'outcome2'
2.2 CBState
from smach import CBState
2.2.1 描述
此状态在执行状态时仅执行单个回调。这对于在状态中执行任意代码很有用,而无需声明新的状态类。这个类支持使用smach.cb_interface装饰器,它的 API 可以在 这里找到。
CBState 使用至少一个参数调用回调:容器的用户数据。可以在构造时将其他参数和关键字参数提供给 CBState。当执行 CBState 时,这些参数将被传递到回调中。
以下示例将 CBState 添加到 SMACH StateMachine中,该状态机从 userdata 中获取一些参数和变量,并将它们的总和存储在 userdata 键“xyz”中:
@smach.cb_interface(input_keys=['q'],
output_keys=['xyz'],
outcomes=['foo'])
def my_cb(ud, x, y, z):
ud.xyz = ud.q + x + y + z
return 'foo'
...
with sm:
...
StateMachine.add('MY_CB', CBState(my_cb,
cb_args=[10],
cb_kwargs={'z':2,'y':3}),
{'foo':'OTHER_STATE'})
2.3 简单动作状态ROS
见前面
2.4 服务状态(ROS)
from smach_ros import ServiceState
您可以简单地从通用状态调用任何服务,但 SMACH 对调用服务有特定的支持,从而为您节省大量代码!SMACH 提供了一个状态类,作为ROS 服务的代理。状态的实例化采用服务名称、服务类型和一些用于生成服务请求的策略。服务状态的可能结果是“成功”、“抢占”和“中止”。
/!\服务状态几乎与简单动作状态相同。只需将“目标”替换为“请求”,将“结果”替换为“响应”即可。
2.4.1 要求
1)空请求消息
sm = StateMachine(['succeeded','aborted','preempted'])
with sm:
smach.StateMachine.add('TRIGGER_GRIPPER',
ServiceState('service_name',
GripperSrv),
transitions={'succeeded':'APPROACH_PLUG'})
2)固定请求消息
sm = StateMachine(['succeeded','aborted','preempted'])
with sm:
smach.StateMachine.add('TRIGGER_GRIPPER',
ServiceState('service_name',
GripperSrv,
request = GripperSrv(9.0)),
transitions={'succeeded':'APPROACH_PLUG'})
3)来自用书数据的请求
sm = StateMachine(['succeeded','aborted','preempted'])
with sm:
smach.StateMachine.add('TRIGGER_GRIPPER',
ServiceState('service_name',
GripperSrv,
request_slots = ['max_effort',
'position']),
transitions={'succeeded':'APPROACH_PLUG'})
4)请求回调
sm = StateMachine(['succeeded','aborted','preempted'])
with sm:
@smach.cb_interface(input_keys=['gripper_input'])
def gripper_request_cb(userdata, request):
gripper_request = GripperSrv().Request
gripper_request.position.x = 2.0
gripper_request.max_effort = userdata.gripper_input
return gripper_request
smach.StateMachine.add('TRIGGER_GRIPPER',
ServiceState('service_name',
GripperSrv,
request_cb = gripper_request_cb,
input_keys = ['gripper_input']),
transitions={'succeeded':'APPROACH_PLUG'})
2.4.2 回复
1)对用户数据的响应
2)响应回调
2.5 监控状态(ROS)
这是一个使用MonitorState的简单状态机示例。为了从状态 Foo 转换到状态 Bar,向 /sm_reset 主题发送消息:
rostopic pub -1 /sm_reset std_msgs/Empty
1) 代码
#!/usr/bin/env python
import roslib; roslib.load_manifest('smach_MonitorState_example')
import rospy
import smach
import smach_ros
from std_msgs.msg import Empty
class bar(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['bar_succeeded'])
def execute(self, userdata):
rospy.sleep(3.0)
return 'bar_succeeded'
def monitor_cb(ud, msg):
return False
def main():
rospy.init_node("monitor_example")
sm = smach.StateMachine(outcomes=['DONE'])
with sm:
smach.StateMachine.add('FOO', smach_ros.MonitorState("/sm_reset", Empty, monitor_cb), transitions={'invalid':'BAR', 'valid':'FOO', 'preempted':'FOO'})
smach.StateMachine.add('BAR',bar(), transitions={'bar_succeeded':'FOO'})
sis = smach_ros.IntrospectionServer('smach_server', sm, '/SM_ROOT')
sis.start()
sm.execute()
rospy.spin()
sis.stop()
if __name__=="__main__":
main()
2) 带注释的代码
def monitor_cb(ud, msg):
return False
参数是用户数据和消息。当我们希望监视器状态终止时,这需要返回 False。在这种情况下,监视器状态将返回“无效”
sm = smach.StateMachine(outcomes=['DONE'])
with sm:
smach.StateMachine.add('FOO',
smach_ros.MonitorState("/sm_reset",
Empty,
monitor_cb),
transitions={'invalid':'BAR',
'valid':'FOO',
'preempted':'FOO'})
请注意,您的MonitorState需要为所有三种可能的结果(“有效”、“无效”、“抢占”)定义转换。
从MonitorState的源代码:
def __init__(self, topic, msg_type, cond_cb, max_checks=-1)
max_checks 参数是在MonitorState将返回“有效”之前可以调用多少次 monitor_cb 的限制。