Skip to content

6.1.14 激光雷达数据订阅

该示例中用到了echo_lidar_data ,通过订阅/aima/hal/sensor/lidar_chest_front/话题来接收机器人的激光雷达数据,支持点云数据和IMU数据两种数据类型。

功能特点:

  • 支持激光雷达点云数据订阅

  • 支持激光雷达IMU数据订阅

  • 实时FPS统计和数据显示

  • 可配置的topic类型选择

  • 详细的数据字段信息输出

支持的数据类型:

  • PointCloud2: 激光雷达点云数据 (sensor_msgs/PointCloud2)

  • Imu: 激光雷达IMU数据 (sensor_msgs/Imu)

技术实现:

  • 使用SensorDataQoS配置(BEST_EFFORT + VOLATILE

  • 支持点云字段信息解析和显示

  • 支持IMU四元数、角速度和线性加速度数据

  • 提供详细的调试日志输出

应用场景:

  • 激光雷达数据采集和分析

  • 点云数据处理和可视化

  • 机器人导航和定位

  • SLAM算法开发

  • 环境感知和建图

python
#!/usr/bin/env python3
"""
Chest LiDAR data subscription example

Supports subscribing to the following topics:
  1. /aima/hal/sensor/lidar_chest_front/lidar_pointcloud
     - Data type: sensor_msgs/PointCloud2
     - frame_id: lidar_chest_front
     - child_frame_id: /
     - Content: LiDAR point cloud data
  2. /aima/hal/sensor/lidar_chest_front/imu
     - Data type: sensor_msgs/Imu
     - frame_id: lidar_imu_chest_front
     - Content: LiDAR IMU data

You can select the topic type to subscribe via startup parameter --ros-args -p topic_type:=<type>:
  - pointcloud: subscribe to LiDAR point cloud
  - imu: subscribe to LiDAR IMU
Default topic_type is pointcloud

Examples:
  ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=pointcloud
  ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=imu
"""

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
from sensor_msgs.msg import PointCloud2, Imu
from collections import deque
import time

class LidarChestEcho(Node):
    def __init__(self):
        super().__init__('lidar_chest_echo')

        # Select the topic type to subscribe
        self.declare_parameter('topic_type', 'pointcloud')
        self.topic_type = self.get_parameter('topic_type').value

        # SensorDataQoS: BEST_EFFORT + VOLATILE
        qos = QoSProfile(
            reliability=QoSReliabilityPolicy.BEST_EFFORT,
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=5
        )

        # Create different subscribers based on topic_type
        if self.topic_type == "pointcloud":
            self.topic_name = "/aima/hal/sensor/lidar_chest_front/lidar_pointcloud"
            self.sub_pointcloud = self.create_subscription(
                PointCloud2, self.topic_name, self.cb_pointcloud, qos)
            self.get_logger().info(
                f"✅ Subscribing LIDAR PointCloud2: {self.topic_name}")

        elif self.topic_type == "imu":
            self.topic_name = "/aima/hal/sensor/lidar_chest_front/imu"
            self.sub_imu = self.create_subscription(
                Imu, self.topic_name, self.cb_imu, qos)
            self.get_logger().info(
                f"✅ Subscribing LIDAR IMU: {self.topic_name}")

        else:
            self.get_logger().error(f"Unknown topic_type: {self.topic_type}")
            raise ValueError("Unknown topic_type")

        # Internal state
        self.last_print = self.get_clock().now()
        self.arrivals = deque()

    def update_arrivals(self):
        """Calculate received FPS"""
        now = self.get_clock().now()
        self.arrivals.append(now)
        while self.arrivals and (now - self.arrivals[0]).nanoseconds * 1e-9 > 1.0:
            self.arrivals.popleft()

    def get_fps(self):
        """Get FPS"""
        return len(self.arrivals)

    def should_print(self):
        """Control print frequency"""
        now = self.get_clock().now()
        if (now - self.last_print).nanoseconds * 1e-9 >= 1.0:
            self.last_print = now
            return True
        return False

    def cb_pointcloud(self, msg: PointCloud2):
        """PointCloud2 callback (LiDAR point cloud)"""
        self.update_arrivals()

        if self.should_print():
            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9

            # Format fields info
            fields_str = " ".join(
                [f"{f.name}({f.datatype})" for f in msg.fields])

            self.get_logger().info(
                f"🟢 LIDAR PointCloud2 received\n"
                f"  • frame_id:        {msg.header.frame_id}\n"
                f"  • stamp (sec):     {stamp_sec:.6f}\n"
                f"  • width x height:  {msg.width} x {msg.height}\n"
                f"  • point_step:      {msg.point_step}\n"
                f"  • row_step:        {msg.row_step}\n"
                f"  • fields:          {fields_str}\n"
                f"  • is_bigendian:    {msg.is_bigendian}\n"
                f"  • is_dense:        {msg.is_dense}\n"
                f"  • data size:       {len(msg.data)}\n"
                f"  • recv FPS (1s):   {self.get_fps():1.1f}"
            )

    def cb_imu(self, msg: Imu):
        """IMU callback (LiDAR IMU)"""
        self.update_arrivals()

        if self.should_print():
            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9

            self.get_logger().info(
                f"🟢 LIDAR IMU received\n"
                f"  • frame_id:        {msg.header.frame_id}\n"
                f"  • stamp (sec):     {stamp_sec:.6f}\n"
                f"  • orientation:     [{msg.orientation.x:.6f}, {msg.orientation.y:.6f}, {msg.orientation.z:.6f}, {msg.orientation.w:.6f}]\n"
                f"  • angular_velocity:[{msg.angular_velocity.x:.6f}, {msg.angular_velocity.y:.6f}, {msg.angular_velocity.z:.6f}]\n"
                f"  • linear_accel:    [{msg.linear_acceleration.x:.6f}, {msg.linear_acceleration.y:.6f}, {msg.linear_acceleration.z:.6f}]\n"
                f"  • recv FPS (1s):   {self.get_fps():.1f}"
            )

def main(args=None):
    rclpy.init(args=args)
    try:
        node = LidarChestEcho()
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(f"Error: {e}")
    finally:
        if 'node' in locals():
            node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

使用说明:

bash
# 订阅激光雷达点云数据
ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=pointcloud

# 订阅激光雷达IMU数据
ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=imu

输出示例:

python
[INFO] [lidar_chest_echo]: ✅ Subscribing LIDAR PointCloud2: /aima/hal/sensor/lidar_chest_front/lidar_pointcloud
[INFO] [lidar_chest_echo]: 🟢 LIDAR PointCloud2 received
  • frame_id:        lidar_chest_front
  • stamp (sec):     1234567890.123456
  • width x height:  1 x 36000
  • point_step:      16
  • row_step:        16
  • fields:          x(7) y(7) z(7) intensity(7)
  • is_bigendian:    False
  • is_dense:        True
  • data size:       576000
  • recv FPS (1s):   10.0

📡 Gitee 私有仓库 | 📖 Cloudflare Pages | 基于 AGIBOT X2 平台构建