Skip to content

6.1.6 注册二开输入源

对于v0.7之后的版本,在实现对MC的控制前,需要先注册输入源。该示例中通过 /aimdk_5Fmsgs/srv/SetMcInputSource服务来注册二开的输入源,让MC感知到,只有注册过输入源后才能实现机器人的速度控制

python
#!/usr/bin/env python3

import rclpy
import rclpy.logging
from rclpy.node import Node

from aimdk_msgs.srv import SetMcInputSource
from aimdk_msgs.msg import RequestHeader, McInputAction

class McInputClient(Node):
    def __init__(self):
        super().__init__('set_mc_input_source_client')
        self.client = self.create_client(
            SetMcInputSource, '/aimdk_5Fmsgs/srv/SetMcInputSource'
        )

        self.get_logger().info('✅ SetMcInputSource client node created.')

        # Wait for the service to become available
        while not self.client.wait_for_service(timeout_sec=2.0):
            self.get_logger().info('⏳ Service unavailable, waiting...')

        self.get_logger().info('🟢 Service available, ready to send request.')

    def send_request(self):
        req = SetMcInputSource.Request()

        # header
        req.request.header = RequestHeader()

        # action (e.g. 1001 = register)
        req.action = McInputAction()
        req.action.value = 1001

        # input source info
        req.input_source.name = 'node'
        req.input_source.priority = 40
        req.input_source.timeout = 1000  # ms

        # Send request and wait for response
        self.get_logger().info(
            f'📨 Sending input source request: action_id={req.action.value}, '
            f'name={req.input_source.name}, priority={req.input_source.priority}'
        )
        for i in range(8):
            req.request.header.stamp = self.get_clock().now().to_msg()
            future = self.client.call_async(req)
            rclpy.spin_until_future_complete(
                self, future, timeout_sec=0.25)

            if future.done():
                break

            # retry as remote peer is NOT handled well by ROS
            self.get_logger().info(f'trying ... [{i}]')

        if not future.done():
            self.get_logger().error('❌ Service call failed or timed out.')
            return False

        response = future.result()
        ret_code = response.response.header.code
        task_id = response.response.task_id

        if ret_code == 0:
            self.get_logger().info(
                f'✅ Input source set successfully. task_id={task_id}'
            )
            return True
        else:
            self.get_logger().error(
                f'❌ Input source set failed. ret_code={ret_code}, task_id={task_id} (duplicated ADD? or MODIFY/ENABLE/DISABLE for unknown source?)'
            )
            return False

def main(args=None):
    rclpy.init(args=args)

    node = None
    try:
        node = McInputClient()
        ok = node.send_request()
        if not ok:
            node.get_logger().error('Input source request failed.')
    except KeyboardInterrupt:
        pass
    except Exception as e:
        rclpy.logging.get_logger('main').error(
            f'Program exited with exception: {e}')

    if node:
        node.destroy_node()
    if rclpy.ok():
        rclpy.shutdown()

if __name__ == '__main__':
    main()

📡 Gitee 私有仓库 | 📖 Cloudflare Pages | 基于 AGIBOT X2 平台构建