Skip to content

6.1.10 键盘控制机器人

本示例实现了通过PC的键盘输入控制机器人的前进后退转弯的功能。

通过 W``A``S``D控制机器人行走方向,增加/减少速度(±0.2 m/s),Q/E增加/减少角速度(±0.1 rad/s),ESC退出程序并释放终端资源,Space立即将速度归零,执行急停。

小心

注意:运行本示例前需要先使用手柄,将机器人切入稳定站立模式。(位控站立/走跑模式时按 R2+X, 其他模式详见模式流转图进行操作),然后在机器人的终端界面使用:aima em stop-app rc关闭遥控器,防止通道占用。

实现键盘控制前需要注册输入源(该示例已实现注册输入源)

python
#!/usr/bin/env python3

import rclpy
import rclpy.logging
from rclpy.node import Node
from aimdk_msgs.msg import McLocomotionVelocity, MessageHeader
from aimdk_msgs.srv import SetMcInputSource
import curses
import time
from functools import partial

class KeyboardVelocityController(Node):
    def __init__(self, stdscr):
        super().__init__('keyboard_velocity_controller')
        self.stdscr = stdscr
        self.forward_velocity = 0.0
        self.lateral_velocity = 0.0
        self.angular_velocity = 0.0
        self.step = 0.2
        self.angular_step = 0.1

        self.publisher = self.create_publisher(
            McLocomotionVelocity, '/aima/mc/locomotion/velocity', 10)
        self.client = self.create_client(
            SetMcInputSource, '/aimdk_5Fmsgs/srv/SetMcInputSource')

        if not self.register_input_source():
            self.get_logger().error("Input source registration failed, exiting")
            raise RuntimeError("Failed to register input source")

        # Configure curses
        curses.cbreak()
        curses.noecho()
        self.stdscr.keypad(True)
        self.stdscr.nodelay(True)

        self.get_logger().info(
            "Control started: W/S forward/backward, A/D strafe, Q/E turn, Space stop, Esc exit")

        # Timer: check keyboard every 50 ms
        self.timer = self.create_timer(0.05, self.check_key_and_publish)

    def register_input_source(self):
        self.get_logger().info("Registering input source...")

        timeout_sec = 8.0
        start = self.get_clock().now().nanoseconds / 1e9

        while not self.client.wait_for_service(timeout_sec=2.0):
            now = self.get_clock().now().nanoseconds / 1e9
            if now - start > timeout_sec:
                self.get_logger().error("Waiting for service timed out")
                return False
            self.get_logger().info("Waiting for input source service...")

        req = SetMcInputSource.Request()
        req.action.value = 1001
        req.input_source.name = "node"
        req.input_source.priority = 40
        req.input_source.timeout = 1000

        for i in range(8):
            req.request.header.stamp = self.get_clock().now().to_msg()
            future = self.client.call_async(req)
            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)

            if future.done():
                break

            # retry as remote peer is NOT handled well by ROS
            self.get_logger().info(f"trying to register input source... [{i}]")

        if future.done():
            try:
                resp = future.result()
                state = resp.response.state.value
                self.get_logger().info(
                    f"Input source set successfully: state={state}, task_id={resp.response.task_id}")
                return True
            except Exception as e:
                self.get_logger().error(f"Service exception: {str(e)}")
                return False
        else:
            self.get_logger().error("Service call failed or timed out")
            return False

    def check_key_and_publish(self):
        try:
            ch = self.stdscr.getch()
        except Exception:
            ch = -1

        if ch != -1:
            if ch == ord(' '):
                self.forward_velocity = 0.0
                self.lateral_velocity = 0.0
                self.angular_velocity = 0.0
            elif ch == ord('w'):
                self.forward_velocity = min(
                    self.forward_velocity + self.step, 1.0)
            elif ch == ord('s'):
                self.forward_velocity = max(
                    self.forward_velocity - self.step, -1.0)
            elif ch == ord('a'):
                self.lateral_velocity = min(
                    self.lateral_velocity + self.step, 1.0)
            elif ch == ord('d'):
                self.lateral_velocity = max(
                    self.lateral_velocity - self.step, -1.0)
            elif ch == ord('q'):
                self.angular_velocity = min(
                    self.angular_velocity + self.angular_step, 1.0)
            elif ch == ord('e'):
                self.angular_velocity = max(
                    self.angular_velocity - self.angular_step, -1.0)
            elif ch == 27:  # ESC
                self.get_logger().info("Exiting control")
                rclpy.shutdown()
                return

        msg = McLocomotionVelocity()
        msg.header = MessageHeader()
        msg.header.stamp = self.get_clock().now().to_msg()
        msg.source = "node"
        msg.forward_velocity = self.forward_velocity
        msg.lateral_velocity = self.lateral_velocity
        msg.angular_velocity = self.angular_velocity

        self.publisher.publish(msg)

        # Update UI
        self.stdscr.clear()
        self.stdscr.addstr(
            0, 0, "W/S: Forward/Backward | A/D: Strafe | Q/E: Turn | Space: Stop | ESC: Exit")
        self.stdscr.addstr(2, 0,
                           f"Speed Status: Forward: {self.forward_velocity:.2f} m/s | "
                           f"Lateral: {self.lateral_velocity:.2f} m/s | "
                           f"Angular: {self.angular_velocity:.2f} rad/s")
        self.stdscr.refresh()

def curses_main(stdscr):
    rclpy.init()
    try:
        node = KeyboardVelocityController(stdscr)
        rclpy.spin(node)
    except Exception as e:
        rclpy.logging.get_logger("main").fatal(
            f"Program exited with exception: {e}")
    finally:
        curses.endwin()
        rclpy.shutdown()

def main():
    curses.wrapper(curses_main)

if __name__ == '__main__':
    main()

📡 Gitee 私有仓库 | 📖 Cloudflare Pages | 基于 AGIBOT X2 平台构建