Skip to content

6.1.8 控制机器人走跑

该示例中用到了mc_locomotion_velocity,以下示例通过发布 /aima/mc/locomotion/velocity话题控制机器人的行走,对于v0.7之后的版本,实现速度控制前需要注册输入源(该示例已实现注册输入源),具体注册步骤可参考代码。

进入稳定站立模式后执行本程序

python
#!/usr/bin/env python3

import rclpy
from rclpy.node import Node
import time
import signal
import sys

from aimdk_msgs.msg import McLocomotionVelocity, MessageHeader
from aimdk_msgs.srv import SetMcInputSource

class DirectVelocityControl(Node):
    def __init__(self):
        super().__init__('direct_velocity_control')

        self.publisher = self.create_publisher(
            McLocomotionVelocity, '/aima/mc/locomotion/velocity', 10)
        self.client = self.create_client(
            SetMcInputSource, '/aimdk_5Fmsgs/srv/SetMcInputSource')

        self.forward_velocity = 0.0
        self.lateral_velocity = 0.0
        self.angular_velocity = 0.0

        self.max_forward_speed = 1.0
        self.min_forward_speed = 0.2

        self.max_lateral_speed = 1.0
        self.min_lateral_speed = 0.2

        self.max_angular_speed = 1.0
        self.min_angular_speed = 0.1

        self.timer = None

        self.get_logger().info("Direct velocity control node started!")

    def start_publish(self):
        if not self.timer:
            self.timer = self.create_timer(0.02, self.publish_velocity)

    def register_input_source(self):
        self.get_logger().info("Registering input source...")

        timeout_sec = 8.0
        start = self.get_clock().now().nanoseconds / 1e9

        while not self.client.wait_for_service(timeout_sec=2.0):
            now = self.get_clock().now().nanoseconds / 1e9
            if now - start > timeout_sec:
                self.get_logger().error("Waiting for service timed out")
                return False
            self.get_logger().info("Waiting for input source service...")

        req = SetMcInputSource.Request()
        req.action.value = 1001
        req.input_source.name = "node"
        req.input_source.priority = 40
        req.input_source.timeout = 1000

        for i in range(8):
            req.request.header.stamp = self.get_clock().now().to_msg()
            future = self.client.call_async(req)
            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)

            if future.done():
                break

            # retry as remote peer is NOT handled well by ROS
            self.get_logger().info(f"trying to register input source... [{i}]")

        if future.done():
            try:
                response = future.result()
                state = response.response.state.value
                self.get_logger().info(
                    f"Input source set successfully: state={state}, task_id={response.response.task_id}")
                return True
            except Exception as e:
                self.get_logger().error(f"Service call exception: {str(e)}")
                return False
        else:
            self.get_logger().error("Service call failed or timed out")
            return False

    def publish_velocity(self):
        msg = McLocomotionVelocity()
        msg.header = MessageHeader()
        msg.header.stamp = self.get_clock().now().to_msg()
        msg.source = "node"
        msg.forward_velocity = self.forward_velocity
        msg.lateral_velocity = self.lateral_velocity
        msg.angular_velocity = self.angular_velocity

        self.publisher.publish(msg)

        self.get_logger().info(
            f"Publishing velocity: forward {self.forward_velocity:.2f} m/s, "
            f"lateral {self.lateral_velocity:.2f} m/s, "
            f"angular {self.angular_velocity:.2f} rad/s"
        )

    def set_forward(self, forward):
        # check value range, mc has thresholds to start movement
        if abs(forward) < 0.005:
            self.forward_velocity = 0.0
            return True
        elif abs(forward) > self.max_forward_speed or abs(forward) < self.min_forward_speed:
            raise ValueError("out of range")
        else:
            self.forward_velocity = forward
            return True

    def set_lateral(self, lateral):
        # check value range, mc has thresholds to start movement
        if abs(lateral) < 0.005:
            self.lateral_velocity = 0.0
            return True
        elif abs(lateral) > self.max_lateral_speed or abs(lateral) < self.min_lateral_speed:
            raise ValueError("out of range")
        else:
            self.lateral_velocity = lateral
            return True

    def set_angular(self, angular):
        # check value range, mc has thresholds to start movement
        if abs(angular) < 0.005:
            self.angular_velocity = 0.0
            return True
        elif abs(angular) > self.max_angular_speed or abs(angular) < self.min_angular_speed:
            raise ValueError("out of range")
        else:
            self.angular_velocity = angular
            return True

    def clear_velocity(self):
        self.forward_velocity = 0.0
        self.lateral_velocity = 0.0
        self.angular_velocity = 0.0

# Global node instance for signal handling
global_node = None

def signal_handler(sig, frame):
    global global_node
    if global_node is not None:
        global_node.clear_velocity()
        global_node.get_logger().info(
            f"Received signal {sig}, clearing velocity and shutting down")
    rclpy.shutdown()
    sys.exit(0)

def main():
    global global_node
    rclpy.init()

    node = DirectVelocityControl()
    global_node = node

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    if not node.register_input_source():
        node.get_logger().error("Input source registration failed, exiting")
        rclpy.shutdown()
        return

    # get and check control values
    # notice that mc has thresholds to start movement
    try:
        # get input forward
        forward = float(
            input("Please enter forward velocity 0 or ±(0.2 ~ 1.0) m/s: "))
        node.set_forward(forward)
        # get input lateral
        lateral = float(
            input("Please enter lateral velocity 0 or ±(0.2 ~ 1.0) m/s: "))
        node.set_lateral(lateral)
        # get input angular
        angular = float(
            input("Please enter angular velocity 0 or ±(0.1 ~ 1.0) rad/s: "))
        node.set_angular(angular)
    except Exception as e:
        node.get_logger().error(f"Invalid input: {e}")
        rclpy.shutdown()
        return

    node.get_logger().info("Setting velocity, moving for 5 seconds")
    node.start_publish()

    start = node.get_clock().now()
    while (node.get_clock().now() - start).nanoseconds / 1e9 < 5.0:
        rclpy.spin_once(node, timeout_sec=0.1)
        time.sleep(0.001)

    node.clear_velocity()
    node.get_logger().info("5-second motion finished, robot stopped")

    rclpy.spin(node)
    rclpy.shutdown()

if __name__ == '__main__':
    main()

📡 Gitee 私有仓库 | 📖 Cloudflare Pages | 基于 AGIBOT X2 平台构建