Skip to content

Writing a SMACH

This Tutorial will walk you through writing a simple SMACH. SMACH stands for State Machine and is an important concept in the ROS Navigation ecosystem. You can check out the official ROS SMACH Tutorials, as well.

The following SMACH tutorial will perform the circular path you already know from previous tutorials by using path planning with the /mbf_msgs/GetPath.action and execution from within a simple SMACH with /mbf_msgs/ExePath.action.

Code

import rospy
import smach
import smach_ros
import threading

from geometry_msgs.msg import PoseStamped
from nav_msgs.msg import Path

from mbf_msgs.msg import ExePathAction
from mbf_msgs.msg import GetPathAction
from mbf_msgs.msg import RecoveryAction


def create_pose(x, y, z, xx, yy, zz, ww):
    pose = PoseStamped()
    pose.header.frame_id = "map"
    pose.header.stamp = rospy.Time.now()
    pose.pose.position.x = x
    pose.pose.position.y = y
    pose.pose.position.z = z
    pose.pose.orientation.x = xx
    pose.pose.orientation.y = yy
    pose.pose.orientation.z = zz
    pose.pose.orientation.w = ww
    return pose


def iterate_target_poses():
    target_poses = [   
        create_pose(-1.75, 0.74, 0, 0, 0, 0.539, 0.843),
        create_pose(-0.36, 1.92, 0, 0, 0, -0.020, 0.999),
        create_pose(0.957, 1.60, 0, 0, 0, -0.163, 0.987),
        create_pose(1.8741, 0.3830, 0, 0, 0, -0.70, 0.711),
        create_pose(1.752, -0.928, 0, 0, 0, -0.856, 0.517),
        create_pose(0.418, -2.116, 0, 0, 0, 0.998, 0.0619),
        create_pose(-0.775, -1.80, 0, 0, 0, 0.954, 0.300),
        create_pose(-1.990, -0.508, 0, 0, 0, -0.112, 0.999)
    ]

    for target_pose in target_poses:
        yield target_pose

def create_path_goal(path, tolerance_from_action, dist_tolerance, angle_tolerance):
    goal = mbf_msgs.ExePathGoal()
    goal.path = path
    goal.tolerance_from_action = tolerance_from_action
    goal.dist_tolerance = dist_tolerance
    goal.angle_tolerance = angle_tolerance
    return goal

def main():
    rospy.init_node('mbf_state_machine')

    target_poses = iterate_target_poses()

    # Create SMACH state machine
    sm = smach.StateMachine(outcomes=['succeeded', 'aborted', 'preempted'])

    # Define userdata
    sm.userdata.target_pose = None
    sm.userdata.path = None
    sm.userdata.error = None
    sm.userdata.clear_costmap_flag = False
    sm.userdata.error_status = None

    with sm:
        # path callback
        def get_path_callback(userdata, goal):
            try:
                goal.target_pose = next(target_poses)
            except StopIteration:
                rospy.logwarn("Reached last target pose")
                rospy.signal_shutdown("Last goal reached. Shutting down")

        # Get path
        smach.StateMachine.add(
            'GET_PATH',
            smach_ros.SimpleActionState(
                '/move_base_flex/get_path',
                GetPathAction,
                goal_cb=get_path_callback,
                goal_slots=['target_pose'],
                result_slots=['path']
            ),
            transitions={
                'succeeded': 'EXE_PATH',
                'aborted': 'aborted',
                'preempted': 'preempted'
            }
        )

        def path_callback(userdata, goal):
            target_pose = goal.path.poses[-1].pose
            rospy.loginfo("Attempting to reach (%1.3f, %1.3f)", target_pose.position.x, target_pose.position.y)

        # Execute path
        smach.StateMachine.add(
            'EXE_PATH',
            smach_ros.SimpleActionState(
                '/move_base_flex/exe_path',
                ExePathAction,
                goal_cb=path_callback,
                goal_slots=['path']
            ),
            transitions={
                'succeeded': 'GET_PATH',
                'aborted': 'aborted',
                'preempted': 'preempted'
            }
        )

    # Execute SMACH plan
    # Create a thread to execute the smach container
    smach_thread = threading.Thread(target=sm.execute)
    smach_thread.start()

    # Wait for ctrl-c
    rospy.spin()

    # Request the container to preempt
    sm.request_preempt()

    # Block until everything is preempted 
    smach_thread.join()

if __name__=="__main__":
    main()

The Code Explained

Let's define the State Machine with three outcomes: succeeded, aborted and preempted:

# Create SMACH state machine
sm = smach.StateMachine(outcomes=['succeeded', 'aborted', 'preempted'])

Next, we need to add states to the SMACH. In this simple case, all we need is a state to handle path planning, GET_PATH and another to handle the path execution, EXE_PATH.

The states use smach_ros

The smach_ros package contains extensions for the SMACH library to integrate it tightly with ROS. For example, SMACH-ROS can call ROS services, listen to ROS topics, and integrate with actionlib both as a client, and a provider of action servers. SMACH is a new library that takes advantage of very old concepts in order to quickly create robust robot behavior with maintainable and modular code.

GET_PATH State

smach.StateMachine.add(
    'GET_PATH',
    smach_ros.SimpleActionState(
        '/move_base_flex/get_path',
        GetPathAction,
        goal_cb=get_path_callback,
        goal_slots=['target_pose'],
        result_slots=['path']
    ),
    transitions={
        'succeeded': 'EXE_PATH',
        'aborted': 'aborted',
        'preempted': 'preempted'
    },
    remapping={
        'target_pose': 'goal'
    }
)

In our case, we need access to

  • the goal, to tell out robot about the next waypoint: goal_slots=['target_pose']
  • the path, to tell the next state which path to execute: result_slots=['path']

While the goal_slots define the input values, result_slots indicate the output values.

The seconds parameter tells the SMACH which action server to use to receive a path: /move_base_flex/get_path with the mbf_msgs/GetPathAction we already know from the beginner tutorials.

The transitions parameters configures which states are visited next, depending on the result of the state.

Note: If goals/results and userdata are named differently, you can change them with the remapping argument (not necessary in our case). e.g.:

remapping={
    'target_pose': 'goal'
}

Next use this callback to set the goal. After we set the goal, the state calls the /move_base_flex/get_path action server, and relays the result to the next state (Shown shortly).

def get_path_callback(userdata, goal):
    try:
        goal.target_pose = next(target_poses)
    except StopIteration:
        rospy.logwarn("Reached last target pose")
        rospy.signal_shutdown("Last goal reached. Shutting down")

EXE_PATH State

smach.StateMachine.add(
    'EXE_PATH',
    smach_ros.SimpleActionState(
        '/move_base_flex/exe_path',
        ExePathAction,
        goal_cb=path_callback,
        goal_slots=['path']
    ),
    transitions={
        'succeeded': 'GET_PATH',
        'aborted': 'aborted',
        'preempted': 'preempted'
    }
)

the EXE_PATH state receives a path via userdata an is declared with goal_slots=['path']. On success, the path will transition to the GET_PATH state and will try to plan a new path from there.

The exe_path_callback is used for verbosity only

def exe_path_callback(userdata, goal):
        target_pose = goal.path.poses[-1].pose
        rospy.loginfo("Attempting to reach (%1.3f, %1.3f)", target_pose.position.x, target_pose.position.y)

Userdata

Finally, the result and goal slots we used to determine how the data will be passed between states, need to be set in the userdata:

# Define userdata
sm.userdata.goal = None
sm.userdata.path = None

Run the SMACH

Launch gazebo

export TURTLEBOT3_MODEL=burger
roslaunch turtlebot3_gazebo turtlebot3_world.launch

as well as Move Base Flex

export TURTLEBOT3_MODEL=burger
roslaunch mbf_advanced amcl_demo_mbf.launch

Finally, start the SMACH

rosrun mbf_advanced circle_smach.py