1. Creating an empty scene
Launch the simulator
Command-line options are passed in through the following code.
import argparse
from isaaclab.app import AppLauncher
# create argparser
parser = argparse.ArgumentParser(description="Tutorial on creating an empty stage.")
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()
# launch omniverse app
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app
Set up the simulation context
from isaaclab.sim import SimulationCfg, SimulationContext
# Initialize the simulation context
sim_cfg = SimulationCfg(dt=0.01)
sim = SimulationContext(sim_cfg)
# Set main camera
sim.set_camera_view([2.5, 2.5, 2.5], [0.0, 0.0, 0.0])
Run the simulation
# Play the simulator
sim.reset()
# Now we are ready!
print("[INFO]: Setup complete...")
# Simulate physics
while simulation_app.is_running():
    # perform step
    sim.step()
Exit the simulation
# close sim app
simulation_app.close()
2. Spawning prims into the scene
USD concepts:
- Prims: the basic building blocks of a USD scene; they can be thought of as the nodes of the scene graph
- Attributes: the properties of a prim
- Relationships: connections between prims; they can be thought of as pointers between nodes
A USD stage is a composition of all of the above and can be viewed as the container holding every prim in a scene.
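To make these concepts concrete, the current stage can be inspected with the raw USD API. This is a minimal sketch, not part of the tutorial, and assumes the simulator app has already been launched:
import omni.usd
# get the USD stage the simulator is currently using
stage = omni.usd.get_context().get_stage()
for prim in stage.Traverse():
    # every prim is a node of the scene graph
    print(prim.GetPath(), prim.GetTypeName())
    # attributes are the properties attached to the prim
    for attr in prim.GetAttributes():
        print("  ", attr.GetName())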
Spawning a prim usually means first building a configuration object and then calling the corresponding spawner function (or the config's func) to create it.
# Create a configuration class instance
cfg = MyPrimCfg()
prim_path = "/path/to/prim"
# Spawn the prim into the scene using the corresponding spawner function
spawn_my_prim(prim_path, cfg, translation=[0, 0, 0], orientation=[1, 0, 0, 0], scale=[1, 1, 1])
# OR
# Use the spawner function directly from the configuration class
cfg.func(prim_path, cfg, translation=[0, 0, 0], orientation=[1, 0, 0, 0], scale=[1, 1, 1])
Spawning a ground plane
# Ground-plane
cfg_ground = sim_utils.GroundPlaneCfg()
cfg_ground.func("/World/defaultGroundPlane", cfg_ground)
Spawning lights
# spawn distant light
cfg_light_distant = sim_utils.DistantLightCfg(
intensity=3000.0,
color=(0.75, 0.75, 0.75),
)
cfg_light_distant.func("/World/lightDistant", cfg_light_distant, translation=(1, 0, 10))
Spawning primitive shapes
Use an Xform prim to group the prims that will be spawned under it.
# create a new xform prim for all objects to be spawned under
prim_utils.create_prim("/World/Objects", "Xform")
Spawning visual shapes
# spawn a red cone
cfg_cone = sim_utils.ConeCfg(
radius=0.15,
height=0.5,
visual_material=sim_utils.PreviewSurfaceCfg(diffuse_color=(1.0, 0.0, 0.0)),
)
cfg_cone.func("/World/Objects/Cone1", cfg_cone, translation=(-1.0, 1.0, 1.0))
cfg_cone.func("/World/Objects/Cone2", cfg_cone, translation=(-1.0, -1.0, 1.0))
Spawning rigid-body shapes
# spawn a green cone with colliders and rigid body
cfg_cone_rigid = sim_utils.ConeCfg(
radius=0.15,
height=0.5,
rigid_props=sim_utils.RigidBodyPropertiesCfg(),
mass_props=sim_utils.MassPropertiesCfg(mass=1.0),
collision_props=sim_utils.CollisionPropertiesCfg(),
visual_material=sim_utils.PreviewSurfaceCfg(diffuse_color=(0.0, 1.0, 0.0)),
)
cfg_cone_rigid.func(
"/World/Objects/ConeRigid", cfg_cone_rigid, translation=(-0.2, 0.0, 2.0), orientation=(0.5, 0.0, 0.5, 0.0)
)
Spawning from a file actually creates a pointer to that USD file, called a reference; any changes made to it apply only in the current stage and do not modify the file itself.
# spawn a usd file of a table into the scene
cfg = sim_utils.UsdFileCfg(usd_path=f"{ISAAC_NUCLEUS_DIR}/Props/Mounts/SeattleLabTable/table_instanceable.usd")
cfg.func("/World/Objects/Table", cfg, translation=(0.0, 0.0, 1.05))
3. Deep-dive into AppLauncher
The AppLauncher can expose parameters that SimulationApp already supports, such as --height and --width, and additional arguments such as --size can also be added. The former take effect directly in the app, while the latter are accessed through args_cli.size.
import argparse
from isaaclab.app import AppLauncher
# create argparser
parser = argparse.ArgumentParser(description="Tutorial on running IsaacSim via the AppLauncher.")
parser.add_argument("--size", type=float, default=1.0, help="Side-length of cuboid")
# SimulationApp arguments https://docs.omniverse.nvidia.com/py/isaacsim/source/isaacsim.simulation_app/docs/index.html?highlight=simulationapp#isaacsim.simulation_app.SimulationApp
parser.add_argument(
"--width", type=int, default=1280, help="Width of the viewport and generated images. Defaults to 1280"
)
parser.add_argument(
"--height", type=int, default=720, help="Height of the viewport and generated images. Defaults to 720"
)
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()
# launch omniverse app
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app
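As an illustration of the custom argument, the parsed --size value could be used to spawn a cuboid once the app is running. This is a hedged sketch (CuboidCfg and its size field come from isaaclab.sim; the translation value is arbitrary):
import isaaclab.sim as sim_utils
# side length of the cuboid comes from the custom --size argument
cfg_cuboid = sim_utils.CuboidCfg(
    size=(args_cli.size, args_cli.size, args_cli.size),
    visual_material=sim_utils.PreviewSurfaceCfg(diffuse_color=(0.2, 0.4, 0.8)),
)
# spawn it so that it rests on the ground plane
cfg_cuboid.func("/World/Object", cfg_cuboid, translation=(0.0, 0.0, args_cli.size / 2))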
4. Adding a New Robot to Isaac Lab
Define an ArticulationCfg, specifying the spawn configuration and the actuators: the former spawns the articulation, while the latter specifies which parts are controlled by the agent. The code below shows one simple and one more detailed configuration.
import isaaclab.sim as sim_utils
from isaaclab.actuators import ImplicitActuatorCfg
from isaaclab.assets import AssetBaseCfg
from isaaclab.assets.articulation import ArticulationCfg
from isaaclab.scene import InteractiveScene, InteractiveSceneCfg
from isaaclab.utils.assets import ISAAC_NUCLEUS_DIR
JETBOT_CONFIG = ArticulationCfg(
spawn=sim_utils.UsdFileCfg(usd_path=f"{ISAAC_NUCLEUS_DIR}/Robots/NVIDIA/Jetbot/jetbot.usd"),
actuators={"wheel_acts": ImplicitActuatorCfg(joint_names_expr=[".*"], damping=None, stiffness=None)},
)
DOFBOT_CONFIG = ArticulationCfg(
spawn=sim_utils.UsdFileCfg(
usd_path=f"{ISAAC_NUCLEUS_DIR}/Robots/Yahboom/Dofbot/dofbot.usd",
rigid_props=sim_utils.RigidBodyPropertiesCfg(
disable_gravity=False,
max_depenetration_velocity=5.0,
),
articulation_props=sim_utils.ArticulationRootPropertiesCfg(
enabled_self_collisions=True, solver_position_iteration_count=8, solver_velocity_iteration_count=0
),
),
init_state=ArticulationCfg.InitialStateCfg(
joint_pos={
"joint1": 0.0,
"joint2": 0.0,
"joint3": 0.0,
"joint4": 0.0,
},
pos=(0.25, -0.25, 0.0),
),
actuators={
"front_joints": ImplicitActuatorCfg(
joint_names_expr=["joint[1-2]"],
effort_limit_sim=100.0,
velocity_limit_sim=100.0,
stiffness=10000.0,
damping=100.0,
),
"joint3_act": ImplicitActuatorCfg(
joint_names_expr=["joint3"],
effort_limit_sim=100.0,
velocity_limit_sim=100.0,
stiffness=10000.0,
damping=100.0,
),
"joint4_act": ImplicitActuatorCfg(
joint_names_expr=["joint4"],
effort_limit_sim=100.0,
velocity_limit_sim=100.0,
stiffness=10000.0,
damping=100.0,
),
},
)
The robots can then be interacted with by defining an InteractiveSceneCfg.
class NewRobotsSceneCfg(InteractiveSceneCfg):
"""Designs the scene."""
# Ground-plane
ground = AssetBaseCfg(prim_path="/World/defaultGroundPlane", spawn=sim_utils.GroundPlaneCfg())
# lights
dome_light = AssetBaseCfg(
prim_path="/World/Light", spawn=sim_utils.DomeLightCfg(intensity=3000.0, color=(0.75, 0.75, 0.75))
)
# robot
Jetbot = JETBOT_CONFIG.replace(prim_path="{ENV_REGEX_NS}/Jetbot")
Dofbot = DOFBOT_CONFIG.replace(prim_path="{ENV_REGEX_NS}/Dofbot")
Finally, run the simulation:
def run_simulator(sim: sim_utils.SimulationContext, scene: InteractiveScene):
sim_dt = sim.get_physics_dt()
sim_time = 0.0
count = 0
while simulation_app.is_running():
# reset
if count % 500 == 0:
# reset counters
count = 0
# reset the scene entities to their initial positions offset by the environment origins
root_jetbot_state = scene["Jetbot"].data.default_root_state.clone()
root_jetbot_state[:, :3] += scene.env_origins
root_dofbot_state = scene["Dofbot"].data.default_root_state.clone()
root_dofbot_state[:, :3] += scene.env_origins
# copy the default root state to the sim for the jetbot's orientation and velocity
scene["Jetbot"].write_root_pose_to_sim(root_jetbot_state[:, :7])
scene["Jetbot"].write_root_velocity_to_sim(root_jetbot_state[:, 7:])
scene["Dofbot"].write_root_pose_to_sim(root_dofbot_state[:, :7])
scene["Dofbot"].write_root_velocity_to_sim(root_dofbot_state[:, 7:])
# copy the default joint states to the sim
joint_pos, joint_vel = (
scene["Jetbot"].data.default_joint_pos.clone(),
scene["Jetbot"].data.default_joint_vel.clone(),
)
scene["Jetbot"].write_joint_state_to_sim(joint_pos, joint_vel)
joint_pos, joint_vel = (
scene["Dofbot"].data.default_joint_pos.clone(),
scene["Dofbot"].data.default_joint_vel.clone(),
)
scene["Dofbot"].write_joint_state_to_sim(joint_pos, joint_vel)
# clear internal buffers
scene.reset()
print("[INFO]: Resetting Jetbot and Dofbot state...")
# drive around
if count % 100 < 75:
# Drive straight by setting equal wheel velocities
action = torch.Tensor([[10.0, 10.0]])
else:
# Turn by applying different velocities
action = torch.Tensor([[5.0, -5.0]])
scene["Jetbot"].set_joint_velocity_target(action)
# wave
wave_action = scene["Dofbot"].data.default_joint_pos
wave_action[:, 0:4] = 0.25 * np.sin(2 * np.pi * 0.5 * sim_time)
scene["Dofbot"].set_joint_position_target(wave_action)
scene.write_data_to_sim()
sim.step()
sim_time += sim_dt
count += 1
scene.update(sim_dt)
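The entry point that ties this together is not shown above; a minimal sketch of a matching main(), assuming the app has already been launched and args_cli is available as in the earlier snippets:
def main():
    # create the simulation context and set the viewer camera
    sim_cfg = sim_utils.SimulationCfg(device=args_cli.device)
    sim = sim_utils.SimulationContext(sim_cfg)
    sim.set_camera_view([3.5, 0.0, 3.2], [0.0, 0.0, 0.5])
    # build the scene from the configuration class defined above
    scene_cfg = NewRobotsSceneCfg(num_envs=2, env_spacing=2.0)
    scene = InteractiveScene(scene_cfg)
    # play the simulator and enter the loop
    sim.reset()
    print("[INFO]: Setup complete...")
    run_simulator(sim, scene)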
5. Interacting with a rigid object
Configuring the simulator consists of two main steps: designing the scene and running the simulation. It is recommended not to add prims that carry physics schemas once the simulation is already running.
Design the scene
The code below creates several spawn origins and uses a regex in prim_path to spawn an object at each origin.
# Create separate groups called "Origin1", "Origin2", "Origin3"
# Each group will have a robot in it
origins = [[0.25, 0.25, 0.0], [-0.25, 0.25, 0.0], [0.25, -0.25, 0.0], [-0.25, -0.25, 0.0]]
for i, origin in enumerate(origins):
    prim_utils.create_prim(f"/World/Origin{i}", "Xform", translation=origin)
# Rigid Object
cone_cfg = RigidObjectCfg(
prim_path="/World/Origin.*/Cone",
spawn=sim_utils.ConeCfg(
radius=0.1,
height=0.2,
rigid_props=sim_utils.RigidBodyPropertiesCfg(),
mass_props=sim_utils.MassPropertiesCfg(mass=1.0),
collision_props=sim_utils.CollisionPropertiesCfg(),
visual_material=sim_utils.PreviewSurfaceCfg(diffuse_color=(0.0, 1.0, 0.0), metallic=0.2),
),
init_state=RigidObjectCfg.InitialStateCfg(),
)
cone_object = RigidObject(cfg=cone_cfg)
Finally, the entities are returned so that they can be interacted with during the simulation.
# return the scene information
scene_entities = {"cone": cone_object}
return scene_entities, origins
Running the simulation loop
The initial pose and velocity of the rigid object must be set first. These cannot be specified relative to the Xform; they must be given in the simulation world frame, since that is the only frame the physics engine accepts.
The default root state is obtained from assets.RigidObject.data.default_root_state, which can be configured through assets.RigidObjectCfg.init_state. The code is as follows.
# reset root state
root_state = cone_object.data.default_root_state.clone()
# sample a random position on a cylinder around the origins
root_state[:, :3] += origins
root_state[:, :3] += math_utils.sample_cylinder(
radius=0.1, h_range=(0.25, 0.5), size=cone_object.num_instances, device=cone_object.device
)
# write root state to simulation
cone_object.write_root_pose_to_sim(root_state[:, :7])
cone_object.write_root_velocity_to_sim(root_state[:, 7:])
# reset buffers
cone_object.reset()
Stepping the simulation
Before stepping, assets.RigidObject.write_data_to_sim() writes other data, such as external forces, into the simulation buffers. Since no such data exists in this example, the call is not strictly necessary.
# apply sim data
cone_object.write_data_to_sim()
After stepping, update the rigid object's internal buffers so that its new state is reflected in the assets.RigidObject.data attribute.
# update buffers
cone_object.update(sim_dt)
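After the update, the refreshed state can be read back from the data attribute, for example (a small sketch):
# root positions of all cone instances in the simulation world frame
print("Cone root positions:", cone_object.data.root_state_w[:, :3])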
6. Interacting with an articulation
Resetting the simulation
Similar to a rigid body, an articulation also has a root state, which represents the state of the root body in the articulation tree. On top of that, there are the states of the individual joints. The code to reset its pose and velocity is as follows.
# reset the scene entities
# root state
# we offset the root state by the origin since the states are written in simulation world frame
# if this is not done, then the robots will be spawned at the (0, 0, 0) of the simulation world
root_state = robot.data.default_root_state.clone()
root_state[:, :3] += origins
robot.write_root_pose_to_sim(root_state[:, :7])
robot.write_root_velocity_to_sim(root_state[:, 7:])
# set joint positions with some noise
joint_pos, joint_vel = robot.data.default_joint_pos.clone(), robot.data.default_joint_vel.clone()
joint_pos += torch.rand_like(joint_pos) * 0.1
robot.write_joint_state_to_sim(joint_pos, joint_vel)
# clear internal buffers
robot.reset()
Stepping the simulation
This involves two main steps:
- Setting the joint targets: joint position, velocity, or effort targets.
- Writing the data to the simulation: this handles all actuation conversions and writes the converted values to the PhysX buffers.
In this example, the articulation is controlled through joint efforts. This requires setting the joint stiffness and damping to zero, which has already been pre-configured in the robot's configuration.
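For reference, a hedged sketch of what such an actuator configuration can look like inside an ArticulationCfg (the joint names follow the cartpole used later; the exact gains in the shipped robot config may differ):
from isaaclab.actuators import ImplicitActuatorCfg
# with zero stiffness and damping the implicit PD drive contributes nothing,
# so the joints respond only to the effort targets set each step
actuators = {
    "cart_actuator": ImplicitActuatorCfg(joint_names_expr=["slider_to_cart"], stiffness=0.0, damping=0.0),
    "pole_actuator": ImplicitActuatorCfg(joint_names_expr=["cart_to_pole"], stiffness=0.0, damping=0.0),
}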
At every step we apply random joint efforts and write them to the simulation, as shown below.
# Apply random action
# -- generate random joint efforts
efforts = torch.randn_like(robot.data.joint_pos) * 5.0
# -- apply action to the robot
robot.set_joint_effort_target(efforts)
# -- write data to sim
robot.write_data_to_sim()
Updating the state
Each articulation class holds an assets.ArticulationData object that stores the various states; it is updated with the following code.
# Update buffers
robot.update(sim_dt)
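Once updated, the refreshed quantities can be read from robot.data, for example (a small sketch):
# joint positions and velocities are now consistent with the latest physics step
print("Joint positions:", robot.data.joint_pos)
print("Joint velocities:", robot.data.joint_vel)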
7. Interacting with a deformable object
PhysX uses FEM to simulate soft bodies. A deformable body has a simulation mesh (used to simulate deformation) and a collision mesh (used to detect collisions).
Designing the scene
Similar to the rigid-body section.
# Create separate groups called "Origin1", "Origin2", "Origin3"
# Each group will have a robot in it
origins = [[0.25, 0.25, 0.0], [-0.25, 0.25, 0.0], [0.25, -0.25, 0.0], [-0.25, -0.25, 0.0]]
for i, origin in enumerate(origins):
    prim_utils.create_prim(f"/World/Origin{i}", "Xform", translation=origin)
# Deformable Object
cfg = DeformableObjectCfg(
prim_path="/World/Origin.*/Cube",
spawn=sim_utils.MeshCuboidCfg(
size=(0.2, 0.2, 0.2),
deformable_props=sim_utils.DeformableBodyPropertiesCfg(rest_offset=0.0, contact_offset=0.001),
visual_material=sim_utils.PreviewSurfaceCfg(diffuse_color=(0.5, 0.1, 0.0)),
physics_material=sim_utils.DeformableBodyMaterialCfg(poissons_ratio=0.4, youngs_modulus=1e5),
),
init_state=DeformableObjectCfg.InitialStateCfg(pos=(0.0, 0.0, 1.0)),
debug_vis=True,
)
cube_object = DeformableObject(cfg=cfg)
Running the simulation loop
As in the rigid-body section: reset the simulation at regular intervals, apply kinematic commands to the deformable body, step the simulation, and update the deformable object's internal buffers.
Resetting the simulation state
Unlike rigid bodies and articulations, deformable objects use a different state representation, namely nodal positions and velocities, which are defined in the simulation world frame and stored in assets.DeformableObject.data.
# reset the nodal state of the object
nodal_state = cube_object.data.default_nodal_state_w.clone()
# apply random pose to the object
pos_w = torch.rand(cube_object.num_instances, 3, device=sim.device) * 0.1 + origins
quat_w = math_utils.random_orientation(cube_object.num_instances, device=sim.device)
nodal_state[..., :3] = cube_object.transform_nodal_pos(nodal_state[..., :3], pos_w, quat_w)
Then reset.
# write nodal state to simulation
cube_object.write_nodal_state_to_sim(nodal_state)
# Write the nodal state to the kinematic target and free all vertices
nodal_kinematic_target[..., :3] = nodal_state[..., :3]
nodal_kinematic_target[..., 3] = 1.0
cube_object.write_nodal_kinematic_target_to_sim(nodal_kinematic_target)
# reset buffers
cube_object.reset()
Stepping the simulation
Deformable bodies allow the user to kinematically control a subset of the mesh nodes while the remaining nodes are simulated by the FEM solver.
In the tutorial, kinematic commands are used to control two of the cubes.
# update the kinematic target for cubes at index 0 and 3
# we slightly move the cube in the z-direction by picking the vertex at index 0
nodal_kinematic_target[[0, 3], 0, 2] += 0.001
# set vertex at index 0 to be kinematically constrained
# 0: constrained, 1: free
nodal_kinematic_target[[0, 3], 0, 3] = 0.0
# write kinematic target to simulation
cube_object.write_nodal_kinematic_target_to_sim(nodal_kinematic_target)
8. Interacting with a surface gripper
Import and set up the scene.
# Create separate groups called "Origin1", "Origin2"
# Each group will have a robot in it
origins = [[2.75, 0.0, 0.0], [-2.75, 0.0, 0.0]]
# Origin 1
prim_utils.create_prim("/World/Origin1", "Xform", translation=origins[0])
# Origin 2
prim_utils.create_prim("/World/Origin2", "Xform", translation=origins[1])
# Articulation: First we define the robot config
pick_and_place_robot_cfg = PICK_AND_PLACE_CFG.copy()
pick_and_place_robot_cfg.prim_path = "/World/Origin.*/Robot"
pick_and_place_robot = Articulation(cfg=pick_and_place_robot_cfg)
# Surface Gripper: Next we define the surface gripper config
surface_gripper_cfg = SurfaceGripperCfg()
# We need to tell the View which prim to use for the surface gripper
surface_gripper_cfg.prim_expr = "/World/Origin.*/Robot/picker_head/SurfaceGripper"
# We can then set different parameters for the surface gripper, note that if these parameters are not set,
# the View will try to read them from the prim.
surface_gripper_cfg.max_grip_distance = 0.1 # [m] (Maximum distance at which the gripper can grasp an object)
surface_gripper_cfg.shear_force_limit = 500.0 # [N] (Force limit in the direction perpendicular to the gripper's axis)
surface_gripper_cfg.coaxial_force_limit = 500.0 # [N] (Force limit in the direction of the gripper's axis)
surface_gripper_cfg.retry_interval = 0.1 # seconds (Time the gripper will stay in a grasping state)
# We can now spawn the surface gripper
surface_gripper = SurfaceGripper(cfg=surface_gripper_cfg)
Stepping the simulation
# Sample a random command between -1 and 1.
gripper_commands = torch.rand(surface_gripper.num_instances) * 2.0 - 1.0
# The gripper behavior is as follows:
# -1 < command < -0.3 --> Gripper is Opening
# -0.3 < command < 0.3 --> Gripper is Idle
# 0.3 < command < 1 --> Gripper is Closing
print(f"[INFO]: Gripper commands: {gripper_commands}")
mapped_commands = [
"Opening" if command < -0.3 else "Closing" if command > 0.3 else "Idle" for command in gripper_commands
]
print(f"[INFO]: Mapped commands: {mapped_commands}")
# Set the gripper command
surface_gripper.set_grippers_command(gripper_commands)
# Write data to sim
surface_gripper.write_data_to_sim()
9. Using the Interactive Scene
scene.InteractiveScene provides a convenient interface for spawning and managing prims; in other words, it is a collection of scene entities. This example reuses the cartpole from the Interacting with an articulation tutorial, but manages it through an interactive scene.
Subclass InteractiveSceneCfg to configure the scene.
@configclass
class CartpoleSceneCfg(InteractiveSceneCfg):
"""Configuration for a cart-pole scene."""
# ground plane
ground = AssetBaseCfg(prim_path="/World/defaultGroundPlane", spawn=sim_utils.GroundPlaneCfg())
# lights
dome_light = AssetBaseCfg(
prim_path="/World/Light", spawn=sim_utils.DomeLightCfg(intensity=3000.0, color=(0.75, 0.75, 0.75))
)
# articulation
cartpole: ArticulationCfg = CARTPOLE_CFG.replace(prim_path="{ENV_REGEX_NS}/Robot")
Scene instantiation
# Design scene
scene_cfg = CartpoleSceneCfg(num_envs=args_cli.num_envs, env_spacing=2.0)
scene = InteractiveScene(scene_cfg)
Accessing scene elements
# Extract scene entities
# note: we only do this here for readability.
robot = scene["cartpole"]
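The rest of the loop mirrors the earlier articulation tutorial, except that per-asset buffering calls are replaced by scene-level ones. A condensed sketch (torch, sim, and sim_dt as in the previous tutorials):
import torch
# apply a random effort target to the cartpole
efforts = torch.randn_like(robot.data.joint_pos) * 5.0
robot.set_joint_effort_target(efforts)
# write all scene data to the sim, step, and refresh all scene buffers at once
scene.write_data_to_sim()
sim.step()
scene.update(sim_dt)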
10. Creating a Manager-Based Base Environment
envs.ManagerBasedEnv is mainly composed of:
- scene.InteractiveScene - the scene that is used for the simulation.
- managers.ActionManager - the manager that handles actions.
- managers.ObservationManager - the manager that handles observations.
- managers.EventManager - the manager that schedules operations (such as domain randomization) at specified simulation events, e.g. at startup, on reset, or at periodic intervals.
Designing the scene
The configuration mainly contains the scene settings and three basic settings: observations, actions, and events. The action and event terms are mostly imported from the isaaclab.envs.mdp module. Actions are handled by managers.ActionManager; for example, the action setting below applies a force to the cart:
@configclass
class ActionsCfg:
"""Action specifications for the environment."""
joint_efforts = mdp.JointEffortActionCfg(asset_name="robot", joint_names=["slider_to_cart"], scale=5.0)
Observations are computed by managers.ObservationManager. A typical implementation defines a "policy" group, a managers.ObservationGroupCfg class, which both collects the individual observation terms and defines group-wide properties such as enabling noise corruption or concatenating the observations into a single tensor.
@configclass
class ObservationsCfg:
"""Observation specifications for the environment."""
@configclass
class PolicyCfg(ObsGroup):
"""Observations for policy group."""
# observation terms (order preserved)
joint_pos_rel = ObsTerm(func=mdp.joint_pos_rel)
joint_vel_rel = ObsTerm(func=mdp.joint_vel_rel)
def __post_init__(self) -> None:
self.enable_corruption = False
self.concatenate_terms = True
# observation groups
policy: PolicyCfg = PolicyCfg()
Events are managed by managers.EventManager; they mainly cover resetting or randomizing the scene and randomizing physical or visual properties. Isaac Lab provides three common modes for when events are applied:
- "startup" - Event that takes place only once at environment startup.
- "reset" - Event that occurs on environment termination and reset.
- "interval" - Event that are executed at a given interval, i.e., periodically after a certain number of steps.
@configclass
class EventCfg:
"""Configuration for events."""
# on startup: changing the mass is computationally expensive, so it is randomized only once at startup
add_pole_mass = EventTerm(
func=mdp.randomize_rigid_body_mass,
mode="startup",
params={
"asset_cfg": SceneEntityCfg("robot", body_names=["pole"]),
"mass_distribution_params": (0.1, 0.5),
"operation": "add",
},
)
# on reset: reset the joint states of the cart and the pole
reset_cart_position = EventTerm(
func=mdp.reset_joints_by_offset,
mode="reset",
params={
"asset_cfg": SceneEntityCfg("robot", joint_names=["slider_to_cart"]),
"position_range": (-1.0, 1.0),
"velocity_range": (-0.1, 0.1),
},
)
reset_pole_position = EventTerm(
func=mdp.reset_joints_by_offset,
mode="reset",
params={
"asset_cfg": SceneEntityCfg("robot", joint_names=["cart_to_pole"]),
"position_range": (-0.125 * math.pi, 0.125 * math.pi),
"velocity_range": (-0.01 * math.pi, 0.01 * math.pi),
},
)
All of these settings are brought together in an envs.ManagerBasedEnvCfg class; simulation parameters can be set in its __post_init__.
@configclass
class CartpoleEnvCfg(ManagerBasedEnvCfg):
"""Configuration for the cartpole environment."""
# Scene settings
scene = CartpoleSceneCfg(num_envs=1024, env_spacing=2.5)
# Basic settings
observations = ObservationsCfg()
actions = ActionsCfg()
events = EventCfg()
def __post_init__(self):
"""Post initialization."""
# viewer settings
self.viewer.eye = [4.5, 0.0, 6.0]
self.viewer.lookat = [0.0, 0.0, 2.0]
# step settings
self.decimation = 4 # env step every 4 sim steps: 200Hz / 4 = 50Hz
# simulation settings
self.sim.dt = 0.005 # sim step every 5ms: 200Hz
The simulation is run through envs.ManagerBasedEnv.reset() and envs.ManagerBasedEnv.step(). However, envs.ManagerBasedEnv has no notion of termination, so in this example the environment is manually reset every 300 steps.
def main():
"""Main function."""
# parse the arguments
env_cfg = CartpoleEnvCfg()
env_cfg.scene.num_envs = args_cli.num_envs
env_cfg.sim.device = args_cli.device
# setup base environment
env = ManagerBasedEnv(cfg=env_cfg)
# simulate physics
count = 0
while simulation_app.is_running():
with torch.inference_mode():
# reset
if count % 300 == 0:
count = 0
env.reset()
print("-" * 80)
print("[INFO]: Resetting environment...")
# sample random actions
joint_efforts = torch.randn_like(env.action_manager.action)
# step the environment
obs, _ = env.step(joint_efforts)
# print current orientation of pole
print("[Env 0]: Pole joint: ", obs["policy"][0][1].item())
# update counter
count += 1
# close the environment
env.close()
11. Creating a Manager-Based RL Environment
Most of the generic MDP terms can be used through envs.mdp, while task-specific terms for the pre-built tasks live in modules such as isaaclab_tasks.manager_based.classic.cartpole.mdp.
Defining rewards
managers.RewardManager computes the reward terms. managers.RewardTermCfg specifies the function used to compute each reward term together with its weight.
@configclass
class RewardsCfg:
"""Reward terms for the MDP."""
# (1) Constant running reward
alive = RewTerm(func=mdp.is_alive, weight=1.0)
# (2) Failure penalty
terminating = RewTerm(func=mdp.is_terminated, weight=-2.0)
# (3) Primary task: keep pole upright
pole_pos = RewTerm(
func=mdp.joint_pos_target_l2,
weight=-1.0,
params={"asset_cfg": SceneEntityCfg("robot", joint_names=["cart_to_pole"]), "target": 0.0},
)
# (4) Shaping tasks: lower cart velocity
cart_vel = RewTerm(
func=mdp.joint_vel_l1,
weight=-0.01,
params={"asset_cfg": SceneEntityCfg("robot", joint_names=["slider_to_cart"])},
)
# (5) Shaping tasks: lower pole angular velocity
pole_vel = RewTerm(
func=mdp.joint_vel_l1,
weight=-0.005,
params={"asset_cfg": SceneEntityCfg("robot", joint_names=["cart_to_pole"])},
)
Defining termination criteria
managers.TerminationsCfg defines the conditions under which an episode terminates; in this example there are two: reaching the maximum episode length, and the cart going out of bounds.
The flag managers.TerminationsCfg.time_out indicates whether the episode ended because of a time-out.
@configclass
class TerminationsCfg:
"""Termination terms for the MDP."""
# (1) Time out
time_out = DoneTerm(func=mdp.time_out, time_out=True)
# (2) Cart out of bounds
cart_out_of_bounds = DoneTerm(
func=mdp.joint_pos_out_of_manual_limit,
params={"asset_cfg": SceneEntityCfg("robot", joint_names=["slider_to_cart"]), "bounds": (-3.0, 3.0)},
)
Defining commands
Commands are managed by managers.CommandManager, which can resample or update commands at every step. Commands can also be provided to the agent as observations. They are not used in this example.
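For illustration, a hedged sketch of a command term as used in the quadruped velocity-tracking tasks (names from isaaclab.envs.mdp; the values are the ones commonly used there, and field names may vary slightly between versions):
import math
from isaaclab.utils import configclass
import isaaclab.envs.mdp as mdp

@configclass
class CommandsCfg:
    """Command terms for the MDP (velocity-tracking example)."""

    # resample a random base-velocity command every 10 seconds
    base_velocity = mdp.UniformVelocityCommandCfg(
        asset_name="robot",
        resampling_time_range=(10.0, 10.0),
        rel_standing_envs=0.02,
        rel_heading_envs=1.0,
        heading_command=True,
        heading_control_stiffness=0.5,
        debug_vis=True,
        ranges=mdp.UniformVelocityCommandCfg.Ranges(
            lin_vel_x=(-1.0, 1.0), lin_vel_y=(-1.0, 1.0), ang_vel_z=(-1.0, 1.0), heading=(-math.pi, math.pi)
        ),
    )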
Defining curriculum
Training usually starts with a simple task and gradually becomes harder; managers.CurriculumManager can be used to define such a curriculum.
The following is an example from a quadruped velocity-control task:
@configclass
class CurriculumCfg:
"""Curriculum terms for the MDP."""
terrain_levels = CurrTerm(func=mdp.terrain_levels_vel)
command_levels = CurrTerm(
func=mdp.command_levels_vel,
params={
"reward_term_name": "track_lin_vel_xy_exp",
"range_multiplier": (0.1, 1.0),
},
)
Putting it together
ManagerBasedRLEnvCfg is similar to ManagerBasedEnvCfg, but adds settings such as rewards and terminations.
@configclass
class CartpoleEnvCfg(ManagerBasedRLEnvCfg):
"""Configuration for the cartpole environment."""
# Scene settings
scene: CartpoleSceneCfg = CartpoleSceneCfg(num_envs=4096, env_spacing=4.0, clone_in_fabric=True)
# Basic settings
observations: ObservationsCfg = ObservationsCfg()
actions: ActionsCfg = ActionsCfg()
events: EventCfg = EventCfg()
# MDP settings
rewards: RewardsCfg = RewardsCfg()
terminations: TerminationsCfg = TerminationsCfg()
# Post initialization
def __post_init__(self) -> None:
"""Post initialization."""
# general settings
self.decimation = 2
self.episode_length_s = 5
# viewer settings
self.viewer.eye = (8.0, 0.0, 5.0)
# simulation settings
self.sim.dt = 1 / 120
self.sim.render_interval = self.decimation
Running the simulation
This works just like the previous tutorial, except that the step function now returns additional information such as the reward, termination status, and episode length.
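Concretely, only the construction and the unpacking of step() change compared to the previous tutorial. A minimal sketch, assuming env_cfg is the CartpoleEnvCfg above and the app has been launched:
import torch
from isaaclab.envs import ManagerBasedRLEnv

env = ManagerBasedRLEnv(cfg=env_cfg)
while simulation_app.is_running():
    with torch.inference_mode():
        # sample random actions; step() now also returns rewards and termination signals
        joint_efforts = torch.randn_like(env.action_manager.action)
        obs, rew, terminated, truncated, info = env.step(joint_efforts)
env.close()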
12. Creating a Direct Workflow RL Environment
"Direct" means the manager classes are not used; the environment logic is implemented directly, which allows finer-grained control and optimizations such as PyTorch JIT.
Since the env config does not use the action and observation managers, the sizes of the action, observation, and state spaces are defined directly.
@configclass
class CartpoleEnvCfg(DirectRLEnvCfg):
...
action_space = 1
observation_space = 4
state_space = 0
Task-specific attributes can also be defined, such as reward scales and reset conditions.
@configclass
class CartpoleEnvCfg(DirectRLEnvCfg):
...
# reset
max_cart_pos = 3.0
initial_pole_angle_range = [-0.25, 0.25]
# reward scales
rew_scale_alive = 1.0
rew_scale_terminated = -2.0
rew_scale_pole_pos = -1.0
rew_scale_cart_vel = -0.01
rew_scale_pole_vel = -0.005
Define the environment class; variables that the other class methods (such as applying actions and computing resets, rewards, and observations) need to access can also be defined here.
class CartpoleEnv(DirectRLEnv):
cfg: CartpoleEnvCfg
def __init__(self, cfg: CartpoleEnvCfg, render_mode: str | None = None, **kwargs):
super().__init__(cfg, render_mode, **kwargs)
Scene creation
The direct workflow gives full flexibility in scene creation, including adding actors to the stage, cloning the environments, filtering collisions between the environments, registering the actors with the scene, and adding any additional props such as the ground plane and lights.
def _setup_scene(self):
self.cartpole = Articulation(self.cfg.robot_cfg)
# add ground plane
spawn_ground_plane(prim_path="/World/ground", cfg=GroundPlaneCfg())
# clone and replicate
self.scene.clone_environments(copy_from_source=False)
# we need to explicitly filter collisions for CPU simulation
if self.device == "cpu":
self.scene.filter_collisions(global_prim_paths=[])
# add articulation to scene
self.scene.articulations["cartpole"] = self.cartpole
# add lights
light_cfg = sim_utils.DomeLightCfg(intensity=2000.0, color=(0.75, 0.75, 0.75))
light_cfg.func("/World/Light", light_cfg)
Defining rewards
The _get_rewards(self) API defines the reward function and returns the reward buffer.
def _get_rewards(self) -> torch.Tensor:
total_reward = compute_rewards(
self.cfg.rew_scale_alive,
self.cfg.rew_scale_terminated,
self.cfg.rew_scale_pole_pos,
self.cfg.rew_scale_cart_vel,
self.cfg.rew_scale_pole_vel,
self.joint_pos[:, self._pole_dof_idx[0]],
self.joint_vel[:, self._pole_dof_idx[0]],
self.joint_pos[:, self._cart_dof_idx[0]],
self.joint_vel[:, self._cart_dof_idx[0]],
self.reset_terminated,
)
return total_reward
@torch.jit.script
def compute_rewards(
rew_scale_alive: float,
rew_scale_terminated: float,
rew_scale_pole_pos: float,
rew_scale_cart_vel: float,
rew_scale_pole_vel: float,
pole_pos: torch.Tensor,
pole_vel: torch.Tensor,
cart_pos: torch.Tensor,
cart_vel: torch.Tensor,
reset_terminated: torch.Tensor,
):
rew_alive = rew_scale_alive * (1.0 - reset_terminated.float())
rew_termination = rew_scale_terminated * reset_terminated.float()
rew_pole_pos = rew_scale_pole_pos * torch.sum(torch.square(pole_pos), dim=-1)
rew_cart_vel = rew_scale_cart_vel * torch.sum(torch.abs(cart_vel), dim=-1)
rew_pole_vel = rew_scale_pole_vel * torch.sum(torch.abs(pole_vel), dim=-1)
total_reward = rew_alive + rew_termination + rew_pole_pos + rew_cart_vel + rew_pole_vel
return total_reward
Defining Observations
_get_observations(self) computes the observation buffer and returns a dict whose key is "policy" and whose value is the observation buffer. For asymmetric policies, the dict also contains a "critic" key holding the states buffer.
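The cartpole implementation of this method looks roughly as follows (a sketch; joint_pos and joint_vel are the buffers cached in _get_dones below):
def _get_observations(self) -> dict:
    # stack pole position/velocity and cart position/velocity into one tensor
    obs = torch.cat(
        (
            self.joint_pos[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
            self.joint_vel[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
            self.joint_pos[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
            self.joint_vel[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
        ),
        dim=-1,
    )
    # the "policy" key is what the RL agent receives
    observations = {"policy": obs}
    return observations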
Computing Dones and Performing Resets
_get_dones(self) determines which environments need to be reset and which have reached the episode length limit.
def _get_dones(self) -> tuple[torch.Tensor, torch.Tensor]:
self.joint_pos = self.cartpole.data.joint_pos
self.joint_vel = self.cartpole.data.joint_vel
time_out = self.episode_length_buf >= self.max_episode_length - 1
out_of_bounds = torch.any(torch.abs(self.joint_pos[:, self._cart_dof_idx]) > self.cfg.max_cart_pos, dim=1)
out_of_bounds = out_of_bounds | torch.any(torch.abs(self.joint_pos[:, self._pole_dof_idx]) > math.pi / 2, dim=1)
return out_of_bounds, time_out
_reset_idx(self, env_ids) resets the specified environments, sets their initial state, and writes it directly to the simulation.
def _reset_idx(self, env_ids: Sequence[int] | None):
if env_ids is None:
env_ids = self.cartpole._ALL_INDICES
super()._reset_idx(env_ids)
joint_pos = self.cartpole.data.default_joint_pos[env_ids]
joint_pos[:, self._pole_dof_idx] += sample_uniform(
self.cfg.initial_pole_angle_range[0] * math.pi,
self.cfg.initial_pole_angle_range[1] * math.pi,
joint_pos[:, self._pole_dof_idx].shape,
joint_pos.device,
)
joint_vel = self.cartpole.data.default_joint_vel[env_ids]
default_root_state = self.cartpole.data.default_root_state[env_ids]
default_root_state[:, :3] += self.scene.env_origins[env_ids]
self.joint_pos[env_ids] = joint_pos
self.joint_vel[env_ids] = joint_vel
self.cartpole.write_root_pose_to_sim(default_root_state[:, :7], env_ids)
self.cartpole.write_root_velocity_to_sim(default_root_state[:, 7:], env_ids)
self.cartpole.write_joint_state_to_sim(joint_pos, joint_vel, None, env_ids)
Applying Actions
_pre_physics_step(self, actions) receives the actions from the policy, pre-processes the action buffer, and caches the data for the environment in a class variable.
def _pre_physics_step(self, actions: torch.Tensor) -> None:
self.actions = self.action_scale * actions.clone()
The _apply_action(self) API is called decimation times per environment step, once before each physics step.
def _apply_action(self) -> None:
self.cartpole.set_joint_effort_target(self.actions, joint_ids=self._cart_dof_idx)
Domain Randomization
@configclass
class EventCfg:
robot_physics_material = EventTerm(
func=mdp.randomize_rigid_body_material,
mode="reset",
params={
"asset_cfg": SceneEntityCfg("robot", body_names=".*"),
"static_friction_range": (0.7, 1.3),
"dynamic_friction_range": (1.0, 1.0),
"restitution_range": (1.0, 1.0),
"num_buckets": 250,
},
)
robot_joint_stiffness_and_damping = EventTerm(
func=mdp.randomize_actuator_gains,
mode="reset",
params={
"asset_cfg": SceneEntityCfg("robot", joint_names=".*"),
"stiffness_distribution_params": (0.75, 1.5),
"damping_distribution_params": (0.3, 3.0),
"operation": "scale",
"distribution": "log_uniform",
},
)
reset_gravity = EventTerm(
func=mdp.randomize_physics_scene_gravity,
mode="interval",
is_global_time=True,
interval_range_s=(36.0, 36.0), # time_s = num_steps * (decimation * dt)
params={
"gravity_distribution_params": ([0.0, 0.0, 0.0], [0.0, 0.0, 0.4]),
"operation": "add",
"distribution": "gaussian",
},
)
func specifies the randomization function, which can be found in the events module; params specifies the arguments passed to func; asset_cfg specifies which actors and bodies/joints are randomized.
Finally, the event config can be added to the base task config.
@configclass
class MyTaskConfig:
events: EventCfg = EventCfg()
Action and Observation Noise
Noise is added through the action_noise_model and observation_noise_model variables in the task config.
@configclass
class MyTaskConfig:
# at every time-step add gaussian noise + bias. The bias is a gaussian sampled at reset
action_noise_model: NoiseModelWithAdditiveBiasCfg = NoiseModelWithAdditiveBiasCfg(
noise_cfg=GaussianNoiseCfg(mean=0.0, std=0.05, operation="add"),
bias_noise_cfg=GaussianNoiseCfg(mean=0.0, std=0.015, operation="abs"),
)
# at every time-step add gaussian noise + bias. The bias is a gaussian sampled at reset
observation_noise_model: NoiseModelWithAdditiveBiasCfg = NoiseModelWithAdditiveBiasCfg(
noise_cfg=GaussianNoiseCfg(mean=0.0, std=0.002, operation="add"),
bias_noise_cfg=GaussianNoiseCfg(mean=0.0, std=0.0001, operation="abs"),
)
NoiseModelWithAdditiveBiasCfg can be used to sample both correlated and uncorrelated noise.
If only per-step noise is needed, use the following:
@configclass
class MyTaskConfig:
action_noise_model: GaussianNoiseCfg = GaussianNoiseCfg(mean=0.0, std=0.05, operation="add")
With the direct approach, every task would otherwise have to be set up and instantiated by hand; registering environments and creating them through gymnasium.make() (next section) solves this.
13. Registering an Environment
When there are many environments, gymnasium.register() can be used to register them with the gymnasium registry, so that environments can then be created with gymnasium.make().
Manager-Based Environments
import gymnasium as gym
from . import agents
##
# Register Gym environments.
##
gym.register(
id="Isaac-Cartpole-v0",
entry_point="isaaclab.envs:ManagerBasedRLEnv",
disable_env_checker=True,
kwargs={
"env_cfg_entry_point": f"{__name__}.cartpole_env_cfg:CartpoleEnvCfg",
"rl_games_cfg_entry_point": f"{agents.__name__}:rl_games_ppo_cfg.yaml",
"rsl_rl_cfg_entry_point": f"{agents.__name__}.rsl_rl_ppo_cfg:CartpolePPORunnerCfg",
"rsl_rl_with_symmetry_cfg_entry_point": f"{agents.__name__}.rsl_rl_ppo_cfg:CartpolePPORunnerWithSymmetryCfg",
"skrl_cfg_entry_point": f"{agents.__name__}:skrl_ppo_cfg.yaml",
"sb3_cfg_entry_point": f"{agents.__name__}:sb3_ppo_cfg.yaml",
},
)
gym.register(
id="Isaac-Cartpole-RGB-v0",
entry_point="isaaclab.envs:ManagerBasedRLEnv",
disable_env_checker=True,
kwargs={
"env_cfg_entry_point": f"{__name__}.cartpole_camera_env_cfg:CartpoleRGBCameraEnvCfg",
"rl_games_cfg_entry_point": f"{agents.__name__}:rl_games_camera_ppo_cfg.yaml",
},
)
gym.register(
id="Isaac-Cartpole-Depth-v0",
entry_point="isaaclab.envs:ManagerBasedRLEnv",
disable_env_checker=True,
kwargs={
"env_cfg_entry_point": f"{__name__}.cartpole_camera_env_cfg:CartpoleDepthCameraEnvCfg",
"rl_games_cfg_entry_point": f"{agents.__name__}:rl_games_camera_ppo_cfg.yaml",
},
)
gym.register(
id="Isaac-Cartpole-RGB-ResNet18-v0",
entry_point="isaaclab.envs:ManagerBasedRLEnv",
disable_env_checker=True,
kwargs={
"env_cfg_entry_point": f"{__name__}.cartpole_camera_env_cfg:CartpoleResNet18CameraEnvCfg",
"rl_games_cfg_entry_point": f"{agents.__name__}:rl_games_feature_ppo_cfg.yaml",
},
)
gym.register(
id="Isaac-Cartpole-RGB-TheiaTiny-v0",
entry_point="isaaclab.envs:ManagerBasedRLEnv",
disable_env_checker=True,
kwargs={
"env_cfg_entry_point": f"{__name__}.cartpole_camera_env_cfg:CartpoleTheiaTinyCameraEnvCfg",
"rl_games_cfg_entry_point": f"{agents.__name__}:rl_games_feature_ppo_cfg.yaml",
},
)
id is the environment name; it usually starts with Isaac, followed by the task and robot names, and ends with v<N> to distinguish different versions of the environment. entry_point has the form <module>:<class> and is used to import the environment class when creating an instance. env_cfg_entry_point specifies the environment's default configuration, which is loaded by isaaclab_tasks.utils.parse_env_cfg() and then passed to gymnasium.make() to create the environment instance; the configuration can be a YAML file or a Python configuration class.
Direct Environments
The entry_point is not ManagerBasedRLEnv but the environment's own implementation class.
import gymnasium as gym
from . import agents
##
# Register Gym environments.
##
gym.register(
id="Isaac-Cartpole-Direct-v0",
entry_point=f"{__name__}.cartpole_env:CartpoleEnv",
disable_env_checker=True,
kwargs={
"env_cfg_entry_point": f"{__name__}.cartpole_env:CartpoleEnvCfg",
"rl_games_cfg_entry_point": f"{agents.__name__}:rl_games_ppo_cfg.yaml",
"rsl_rl_cfg_entry_point": f"{agents.__name__}.rsl_rl_ppo_cfg:CartpolePPORunnerCfg",
"skrl_cfg_entry_point": f"{agents.__name__}:skrl_ppo_cfg.yaml",
"sb3_cfg_entry_point": f"{agents.__name__}:sb3_ppo_cfg.yaml",
},
)
Creating the environment
import isaaclab_tasks # noqa: F401
from isaaclab_tasks.utils import parse_env_cfg
# PLACEHOLDER: Extension template (do not remove this comment)
def main():
"""Random actions agent with Isaac Lab environment."""
# create environment configuration
env_cfg = parse_env_cfg(
args_cli.task, device=args_cli.device, num_envs=args_cli.num_envs, use_fabric=not args_cli.disable_fabric
)
# create environment
env = gym.make(args_cli.task, cfg=env_cfg)
Specify the task name and the number of environments on the command line:
./isaaclab.sh -p scripts/environments/random_agent.py --task Isaac-Cartpole-v0 --num_envs 32
The full code of random_agent.py:
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Script to an environment with random action agent."""
"""Launch Isaac Sim Simulator first."""
import argparse
from isaaclab.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser(description="Random agent for Isaac Lab environments.")
parser.add_argument(
"--disable_fabric", action="store_true", default=False, help="Disable fabric and use USD I/O operations."
)
parser.add_argument("--num_envs", type=int, default=None, help="Number of environments to simulate.")
parser.add_argument("--task", type=str, default=None, help="Name of the task.")
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()
# launch omniverse app
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app
"""Rest everything follows."""
import gymnasium as gym
import torch
import isaaclab_tasks # noqa: F401
from isaaclab_tasks.utils import parse_env_cfg
# PLACEHOLDER: Extension template (do not remove this comment)
def main():
"""Random actions agent with Isaac Lab environment."""
# create environment configuration
env_cfg = parse_env_cfg(
args_cli.task, device=args_cli.device, num_envs=args_cli.num_envs, use_fabric=not args_cli.disable_fabric
)
# create environment
env = gym.make(args_cli.task, cfg=env_cfg)
# print info (this is vectorized environment)
print(f"[INFO]: Gym observation space: {env.observation_space}")
print(f"[INFO]: Gym action space: {env.action_space}")
# reset environment
env.reset()
# simulate environment
while simulation_app.is_running():
# run everything in inference mode
with torch.inference_mode():
# sample actions from -1 to 1
actions = 2 * torch.rand(env.action_space.shape, device=env.unwrapped.device) - 1
# apply actions
env.step(actions)
# close the simulator
env.close()
if __name__ == "__main__":
# run the main function
main()
# close sim app
simulation_app.close()
14. Training with an RL Agent
The previous tutorials covered defining an RL environment, registering it with the gym registry, and interacting with it using a random agent. Now we train an agent to solve the task.
The inputs and outputs of envs.ManagerBasedRLEnv are torch tensors rather than numpy arrays. In addition, different RL libraries expect different environment interfaces. For this reason, envs.ManagerBasedRLEnv is not tied to any particular learning library; instead, the wrappers in the isaaclab_rl module convert the environment to the interface each library requires.
Code:
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Script to train RL agent with Stable Baselines3."""
"""Launch Isaac Sim Simulator first."""
import argparse
import contextlib
import signal
import sys
from pathlib import Path
from isaaclab.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser(description="Train an RL agent with Stable-Baselines3.")
parser.add_argument("--video", action="store_true", default=False, help="Record videos during training.")
parser.add_argument("--video_length", type=int, default=200, help="Length of the recorded video (in steps).")
parser.add_argument("--video_interval", type=int, default=2000, help="Interval between video recordings (in steps).")
parser.add_argument("--num_envs", type=int, default=None, help="Number of environments to simulate.")
parser.add_argument("--task", type=str, default=None, help="Name of the task.")
parser.add_argument(
"--agent", type=str, default="sb3_cfg_entry_point", help="Name of the RL agent configuration entry point."
)
parser.add_argument("--seed", type=int, default=None, help="Seed used for the environment")
parser.add_argument("--log_interval", type=int, default=100_000, help="Log data every n timesteps.")
parser.add_argument("--checkpoint", type=str, default=None, help="Continue the training from checkpoint.")
parser.add_argument("--max_iterations", type=int, default=None, help="RL Policy training iterations.")
parser.add_argument("--export_io_descriptors", action="store_true", default=False, help="Export IO descriptors.")
parser.add_argument(
"--keep_all_info",
action="store_true",
default=False,
help="Use a slower SB3 wrapper but keep all the extra training info.",
)
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli, hydra_args = parser.parse_known_args()
# always enable cameras to record video
if args_cli.video:
args_cli.enable_cameras = True
# clear out sys.argv for Hydra
sys.argv = [sys.argv[0]] + hydra_args
# launch omniverse app
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app
def cleanup_pbar(*args):
"""
A small helper to stop training and
cleanup progress bar properly on ctrl+c
"""
import gc
tqdm_objects = [obj for obj in gc.get_objects() if "tqdm" in type(obj).__name__]
for tqdm_object in tqdm_objects:
if "tqdm_rich" in type(tqdm_object).__name__:
tqdm_object.close()
raise KeyboardInterrupt
# disable KeyboardInterrupt override
signal.signal(signal.SIGINT, cleanup_pbar)
"""Rest everything follows."""
import gymnasium as gym
import numpy as np
import os
import random
from datetime import datetime
import omni
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import CheckpointCallback, LogEveryNTimesteps
from stable_baselines3.common.vec_env import VecNormalize
from isaaclab.envs import (
DirectMARLEnv,
DirectMARLEnvCfg,
DirectRLEnvCfg,
ManagerBasedRLEnvCfg,
multi_agent_to_single_agent,
)
from isaaclab.utils.dict import print_dict
from isaaclab.utils.io import dump_pickle, dump_yaml
from isaaclab_rl.sb3 import Sb3VecEnvWrapper, process_sb3_cfg
import isaaclab_tasks # noqa: F401
from isaaclab_tasks.utils.hydra import hydra_task_config
# PLACEHOLDER: Extension template (do not remove this comment)
@hydra_task_config(args_cli.task, args_cli.agent)
def main(env_cfg: ManagerBasedRLEnvCfg | DirectRLEnvCfg | DirectMARLEnvCfg, agent_cfg: dict):
"""Train with stable-baselines agent."""
# randomly sample a seed if seed = -1
if args_cli.seed == -1:
args_cli.seed = random.randint(0, 10000)
# override configurations with non-hydra CLI arguments
env_cfg.scene.num_envs = args_cli.num_envs if args_cli.num_envs is not None else env_cfg.scene.num_envs
agent_cfg["seed"] = args_cli.seed if args_cli.seed is not None else agent_cfg["seed"]
# max iterations for training
if args_cli.max_iterations is not None:
agent_cfg["n_timesteps"] = args_cli.max_iterations * agent_cfg["n_steps"] * env_cfg.scene.num_envs
# set the environment seed
# note: certain randomizations occur in the environment initialization so we set the seed here
env_cfg.seed = agent_cfg["seed"]
env_cfg.sim.device = args_cli.device if args_cli.device is not None else env_cfg.sim.device
# directory for logging into
run_info = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_root_path = os.path.abspath(os.path.join("logs", "sb3", args_cli.task))
print(f"[INFO] Logging experiment in directory: {log_root_path}")
# The Ray Tune workflow extracts experiment name using the logging line below, hence, do not change it (see PR #2346, comment-2819298849)
print(f"Exact experiment name requested from command line: {run_info}")
log_dir = os.path.join(log_root_path, run_info)
# dump the configuration into log-directory
dump_yaml(os.path.join(log_dir, "params", "env.yaml"), env_cfg)
dump_yaml(os.path.join(log_dir, "params", "agent.yaml"), agent_cfg)
dump_pickle(os.path.join(log_dir, "params", "env.pkl"), env_cfg)
dump_pickle(os.path.join(log_dir, "params", "agent.pkl"), agent_cfg)
# save command used to run the script
command = " ".join(sys.orig_argv)
(Path(log_dir) / "command.txt").write_text(command)
# post-process agent configuration
agent_cfg = process_sb3_cfg(agent_cfg, env_cfg.scene.num_envs)
# read configurations about the agent-training
policy_arch = agent_cfg.pop("policy")
n_timesteps = agent_cfg.pop("n_timesteps")
# set the IO descriptors output directory if requested
if isinstance(env_cfg, ManagerBasedRLEnvCfg):
env_cfg.export_io_descriptors = args_cli.export_io_descriptors
env_cfg.io_descriptors_output_dir = log_dir
else:
omni.log.warn(
"IO descriptors are only supported for manager based RL environments. No IO descriptors will be exported."
)
# create isaac environment
env = gym.make(args_cli.task, cfg=env_cfg, render_mode="rgb_array" if args_cli.video else None)
# convert to single-agent instance if required by the RL algorithm
if isinstance(env.unwrapped, DirectMARLEnv):
env = multi_agent_to_single_agent(env)
# wrap for video recording
if args_cli.video:
video_kwargs = {
"video_folder": os.path.join(log_dir, "videos", "train"),
"step_trigger": lambda step: step % args_cli.video_interval == 0,
"video_length": args_cli.video_length,
"disable_logger": True,
}
print("[INFO] Recording videos during training.")
print_dict(video_kwargs, nesting=4)
env = gym.wrappers.RecordVideo(env, **video_kwargs)
# wrap around environment for stable baselines
env = Sb3VecEnvWrapper(env, fast_variant=not args_cli.keep_all_info)
norm_keys = {"normalize_input", "normalize_value", "clip_obs"}
norm_args = {}
for key in norm_keys:
if key in agent_cfg:
norm_args[key] = agent_cfg.pop(key)
if norm_args and norm_args.get("normalize_input"):
print(f"Normalizing input, {norm_args=}")
env = VecNormalize(
env,
training=True,
norm_obs=norm_args["normalize_input"],
norm_reward=norm_args.get("normalize_value", False),
clip_obs=norm_args.get("clip_obs", 100.0),
gamma=agent_cfg["gamma"],
clip_reward=np.inf,
)
# create agent from stable baselines
agent = PPO(policy_arch, env, verbose=1, tensorboard_log=log_dir, **agent_cfg)
if args_cli.checkpoint is not None:
agent = agent.load(args_cli.checkpoint, env, print_system_info=True)
# callbacks for agent
checkpoint_callback = CheckpointCallback(save_freq=1000, save_path=log_dir, name_prefix="model", verbose=2)
callbacks = [checkpoint_callback, LogEveryNTimesteps(n_steps=args_cli.log_interval)]
# train the agent
with contextlib.suppress(KeyboardInterrupt):
agent.learn(
total_timesteps=n_timesteps,
callback=callbacks,
progress_bar=True,
log_interval=None,
)
# save the final model
agent.save(os.path.join(log_dir, "model"))
print("Saving to:")
print(os.path.join(log_dir, "model.zip"))
if isinstance(env, VecNormalize):
print("Saving normalization")
env.save(os.path.join(log_dir, "model_vecnormalize.pkl"))
# close the simulator
env.close()
if __name__ == "__main__":
# run the main function
main()
# close sim app
simulation_app.close()
This tutorial uses SB3 as the example and involves three wrappers:
- gymnasium.wrappers.RecordVideo: records videos to a specified directory.
- isaaclab_rl.sb3.Sb3VecEnvWrapper: converts the env into an SB3-compatible vectorized env.
- stable_baselines3.common.vec_env.VecNormalize: normalizes observations and rewards.
Each wrapper wraps the previous one (env = wrapper(env, *args, **kwargs)); the final wrapped env is the one used to train the agent.
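Stripped of the configuration details, the wrapping order is simply (a condensed sketch reusing the variables from the script above):
# create the base Isaac Lab environment, then wrap it layer by layer
env = gym.make(args_cli.task, cfg=env_cfg, render_mode="rgb_array" if args_cli.video else None)
env = gym.wrappers.RecordVideo(env, **video_kwargs)     # optional video recording
env = Sb3VecEnvWrapper(env)                             # SB3-compatible vectorized env
env = VecNormalize(env, training=True, norm_obs=True)   # observation/reward normalization
agent = PPO(policy_arch, env, verbose=1, **agent_cfg)   # the final env is handed to the agent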
Training
Running with --headless skips live rendering and is therefore more efficient. To still get videos in this mode, add --video; the videos are saved under logs/sb3/Isaac-Cartpole-v0/<run-dir>/videos/train.
When not running headless, different render modes can be used while the training is running; they can be switched in the bottom-right corner of the UI.
The training logs can be monitored live. Open a new terminal and run:
# execute from the root directory of the repository
./isaaclab.sh -p -m tensorboard.main --logdir logs/sb3/Isaac-Cartpole-v0
Playing the trained agent
Once training is complete, the agent can be played back:
# execute from the root directory of the repository
./isaaclab.sh -p scripts/reinforcement_learning/sb3/play.py --task Isaac-Cartpole-v0 --num_envs 32 --use_last_checkpoint
With --use_last_checkpoint, the most recent checkpoint under logs/sb3/Isaac-Cartpole-v0 is loaded automatically; a specific checkpoint can also be given with --checkpoint.
15. Configuring an RL Agent
This tutorial covers configuring different RL libraries and different training algorithms for the training process.
gym.register(
id="Isaac-Cartpole-v0",
entry_point="isaaclab.envs:ManagerBasedRLEnv",
disable_env_checker=True,
kwargs={
"env_cfg_entry_point": f"{__name__}.cartpole_env_cfg:CartpoleEnvCfg",
"rl_games_cfg_entry_point": f"{agents.__name__}:rl_games_ppo_cfg.yaml",
"rsl_rl_cfg_entry_point": f"{agents.__name__}.rsl_rl_ppo_cfg:CartpolePPORunnerCfg",
"rsl_rl_with_symmetry_cfg_entry_point": f"{agents.__name__}.rsl_rl_ppo_cfg:CartpolePPORunnerWithSymmetryCfg",
"skrl_cfg_entry_point": f"{agents.__name__}:skrl_ppo_cfg.yaml",
"sb3_cfg_entry_point": f"{agents.__name__}:sb3_ppo_cfg.yaml",
},
The values in the kwargs dictionary can be either YAML files or configuration classes; specifying the class as a string is recommended.
Passing a different --agent on the command line selects a different configuration:
# PPO training with symmetry augmentation
./isaaclab.sh -p scripts/reinforcement_learning/rsl_rl/train.py --task Isaac-Cartpole-v0 --headless \
--agent rsl_rl_with_symmetry_cfg_entry_point \
--run_name ppo_with_symmetry_data_augmentation
# you can use hydra to disable symmetry augmentation but enable mirror loss computation
./isaaclab.sh -p scripts/reinforcement_learning/rsl_rl/train.py --task Isaac-Cartpole-v0 --headless \
--agent rsl_rl_with_symmetry_cfg_entry_point \
--run_name ppo_without_symmetry_data_augmentation \
agent.algorithm.symmetry_cfg.use_data_augmentation=false
16. Modifying an existing Direct RL Environment
Back up the existing code before modifying it. The code used in this tutorial is shown below.
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
from isaaclab_assets import HUMANOID_CFG
import isaaclab.sim as sim_utils
from isaaclab.assets import ArticulationCfg
from isaaclab.envs import DirectRLEnvCfg
from isaaclab.scene import InteractiveSceneCfg
from isaaclab.sim import SimulationCfg
from isaaclab.terrains import TerrainImporterCfg
from isaaclab.utils import configclass
from isaaclab_tasks.direct.locomotion.locomotion_env import LocomotionEnv
@configclass
class HumanoidEnvCfg(DirectRLEnvCfg):
# env
episode_length_s = 15.0
decimation = 2
action_scale = 1.0
action_space = 21
observation_space = 75
state_space = 0
# simulation
sim: SimulationCfg = SimulationCfg(dt=1 / 120, render_interval=decimation)
terrain = TerrainImporterCfg(
prim_path="/World/ground",
terrain_type="plane",
collision_group=-1,
physics_material=sim_utils.RigidBodyMaterialCfg(
friction_combine_mode="average",
restitution_combine_mode="average",
static_friction=1.0,
dynamic_friction=1.0,
restitution=0.0,
),
debug_vis=False,
)
# scene
scene: InteractiveSceneCfg = InteractiveSceneCfg(
num_envs=4096, env_spacing=4.0, replicate_physics=True, clone_in_fabric=True
)
# robot
robot: ArticulationCfg = HUMANOID_CFG.replace(prim_path="/World/envs/env_.*/Robot")
joint_gears: list = [
67.5000, # lower_waist
67.5000, # lower_waist
67.5000, # right_upper_arm
67.5000, # right_upper_arm
67.5000, # left_upper_arm
67.5000, # left_upper_arm
67.5000, # pelvis
45.0000, # right_lower_arm
45.0000, # left_lower_arm
45.0000, # right_thigh: x
135.0000, # right_thigh: y
45.0000, # right_thigh: z
45.0000, # left_thigh: x
135.0000, # left_thigh: y
45.0000, # left_thigh: z
90.0000, # right_knee
90.0000, # left_knee
22.5, # right_foot
22.5, # right_foot
22.5, # left_foot
22.5, # left_foot
]
heading_weight: float = 0.5
up_weight: float = 0.1
energy_cost_scale: float = 0.05
actions_cost_scale: float = 0.01
alive_reward_scale: float = 2.0
dof_vel_scale: float = 0.1
death_cost: float = -1.0
termination_height: float = 0.8
angular_velocity_scale: float = 0.25
contact_force_scale: float = 0.01
class HumanoidEnv(LocomotionEnv):
cfg: HumanoidEnvCfg
def __init__(self, cfg: HumanoidEnvCfg, render_mode: str | None = None, **kwargs):
super().__init__(cfg, render_mode, **kwargs)
Rename both the task class HumanoidEnv and the configuration class HumanoidEnvCfg to avoid conflicts when registering the environment. Then add a new entry to register the task and update __init__.py accordingly. After that, the robot's parameters can also be modified; see the web page for the detailed steps.
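For example, after renaming the classes (to the hypothetical MyHumanoidEnv / MyHumanoidEnvCfg), the new registry entry in the package's __init__.py would follow the same pattern as in the registration tutorial:
import gymnasium as gym

gym.register(
    id="Isaac-MyHumanoid-Direct-v0",  # hypothetical task id
    entry_point=f"{__name__}.my_humanoid_env:MyHumanoidEnv",
    disable_env_checker=True,
    kwargs={
        "env_cfg_entry_point": f"{__name__}.my_humanoid_env:MyHumanoidEnvCfg",
    },
)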
17. Policy Inference in USD Environment
The code is in policy_inference_in_usd.py; note that env_cfg.scene.terrain uses an external USD file.
18. Adding sensors on a robot
Skipped.
19. Using a task-space controller
Mainly relevant for robotic arms; skipped.