Skip to content

6.1.15 播放视频

该示例中用到了play_video ,在运行节点程序前,需要先将视频上传到机器人的**交互计算单元(PC3)**上(用户可在其上创建一个用来存储视频的目录,如/var/tmp/videos/),然后将节点程序中的video_path改为需要播放视频的路径。

注意

⚠️ 请注意!交互计算单元(PC3)独立于二开程序所在的开发计算单元(PC2), 音视频文件务必存入交互计算单元(IP: 10.0.1.42)。
⚠️ 音视频文件夹及该文件夹所有父目录应当为所有用户可访问读取(建议在/var/tmp/下创建子目录存放)

功能说明
通过调用PlayVideo服务,可以让机器人在屏幕上播放指定路径的视频文件。请确保视频文件已上传到交互计算单元,否则播放会失败。

python
#!/usr/bin/env python3

import rclpy
import rclpy.logging
from rclpy.node import Node

from aimdk_msgs.srv import PlayVideo

class PlayVideoClient(Node):
    def __init__(self):
        super().__init__('play_video_client')
        self.client = self.create_client(
            PlayVideo, '/face_ui_proxy/play_video')
        self.get_logger().info('✅ PlayVideo client node created.')

        # Wait for the service to become available
        while not self.client.wait_for_service(timeout_sec=2.0):
            self.get_logger().info('⏳ Service unavailable, waiting...')

        self.get_logger().info('🟢 Service available, ready to send request.')

    def send_request(self, video_path, mode, priority):
        req = PlayVideo.Request()

        req.video_path = video_path
        req.mode = mode
        req.priority = priority

        # async call
        self.get_logger().info(
            f'📨 Sending request to play video: mode={mode} video={video_path}')
        for i in range(8):
            req.header.header.stamp = self.get_clock().now().to_msg()
            future = self.client.call_async(req)
            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)

            if future.done():
                break

            # retry as remote peer is NOT handled well by ROS
            self.get_logger().info(f'trying ... [{i}]')

        resp = future.result()
        if resp is None:
            self.get_logger().error('❌ Service call not completed or timed out.')
            return False

        if resp.success:
            self.get_logger().info(
                f'✅ Request to play video recorded successfully: {resp.message}')
            return True
        else:
            self.get_logger().error(
                f'❌ Failed to record play-video request: {resp.message}')
            return False

def main(args=None):
    rclpy.init(args=args)
    node = None

    try:
        # video path and priority can be customized
        video_path = "/agibot/data/home/agi/zhiyuan.mp4"
        priority = 5
        # input play mode
        mode = int(input("Enter video play mode (1: play once, 2: loop): "))
        if mode not in (1, 2):
            raise ValueError(f'invalid mode {mode}')

        node = PlayVideoClient()
        node.send_request(video_path, mode, priority)
    except KeyboardInterrupt:
        pass
    except Exception as e:
        rclpy.logging.get_logger('main').error(
            f'Program exited with exception: {e}')

    if node:
        node.destroy_node()
    if rclpy.ok():
        rclpy.shutdown()

if __name__ == '__main__':
    main()

📡 Gitee 私有仓库 | 📖 Cloudflare Pages | 基于 AGIBOT X2 平台构建