具身智能从DRL-robot-navigation学起
  【无限机器人】   2025年12月18日   39   0

文章来源:https://www.guyuehome.com/detail?id=1929555287440687105

1. 简介

在这个数字化和智能化日益加速的时代,机器人技术正在逐渐改变我们的生活方式。 DRL-robot-navigation是一个非常不错的入门开源项目,它利用深度强化学习(Deep Reinforcement Learning, DRL)让机器人实现自主导航,通过模拟环境训练机器人,使其能够学习如何在复杂环境中有效地移动,避开障碍物并达到目标位置。其核心技术是基于TensorFlow实现的深度Q网络(Deep Q-Network, DQN),这是一种被广泛应用于强化学习的神经网络模型。

2. 环境安装

2.1 ROS安装

ROS安装参考鱼香ROS的一步安装教程,按照步骤安装Ros1-noetic即可:

wget http://fishros.com/install -O fishros && . fishros

2.2 miniconda安装

应当先安装Ros,后安装miniconda,防止出现兼容问题。

wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
chmod +x Miniconda3-latest-Linux-x86_64.sh
./Miniconda3-latest-Linux-x86_64.sh
echo 'export PATH=$HOME/miniconda3/bin:$PATH' >> ~/.bashrc

虚拟环境中有base环境和DRL环境,很烦的一点是每次打开终端都会默认进入base环境中,输入指令即可默认进入正常终端

conda config --set auto_activate_base false

2.3 DRL-robot-navigation源码配置

由于有路径问题,建议将其安装在 /home 目录下

cd ~
git clone https://github.com/reiniscimurs/DRL-robot-navigation -b Noetic
cd ~/DRL-robot-navigation/catkin_ws
catkin_make_isolated

image.png
其中 **catkin_ws ** 为ROS工作空间,包含了gazebo仿真环境、p3dx小车模型、velody激光雷达模型等。但其主要任务就是打开一个仿真环境
TD3 中包含了文章中介绍的算法,这份程序是一个普通的 Python 文件(train_velodyne_td3.py)在最外层,里面会去调用 catkin_ws 里的 ROS 资源(velodyne_env.py),包括开启 Gazebo、RViz 和 ROS 控制程序。其内部结构如下:!
image.png
velodyne_env 是发送订阅数据、处理数据、获取状态、计算奖励等功能的代码,相当于gym中的环境部分
train_velodyne_td3 是训练代码,其中包括了网络结构以及训练部分。

2.4 环境搭建配置

这里已经建立好 Anaconda 环境,下面就是创建conda环境python 3.8,并且构建 Pytorch-GPU

conda create -n DRL python=3.8
$ conda activate DRL
$ export ROS_HOSTNAME=localhost
$ export ROS_MASTER_URI=http://localhost:11311
$ export ROS_PORT_SIM=11311
$ export GAZEBO_RESOURCE_PATH=/usr/share/gazebo-11/:~/DRL-robot-navigation/catkin_ws/src/multi_robot_scenario/launch
$ cd ~/DRL-robot-navigation/catkin_ws
$ source devel_isolated/setup.bash
$ source /usr/share/gazebo/setup.bash

此外,还需要 Gazebo 模型的数据库:

git clone https://github.com/osrf/gazebo_models.git

接着将 gazebo_model 中所有的文件夹与文件复制到 ~/.gazebo/models 目录中,切记是“里面”的所有文件夹,而不是 gazebo_model 文件夹本身。如果想要显示 Gazebo 界面,可以修改以下文件:

~/DRL-robot-navigation/catkin_ws/src/multi_robot_scenario/launch/empty_world.launch

将 arg 的 gui 改为 true。

<launch> 
    <include file="$(find gazebo_ros)/launch/empty_world.launch">
        <arg name="world_name" value="$(find multi_robot_scenario)/launch/TD3.world"/>
        <arg name="paused" value="false"/>
        <arg name="use_sim_time" value="true"/>
        <arg name="gui" value="true"/>
        <arg name="headless" value="false"/>
        <arg name="debug" value="false"/>
      </include>
</launch>

在编译过程中,可能会出现各种各样的库缺失,具体缺失的内容可以读取报错信息,然后安装

# No module named 'yaml'
conda install -c conda-forge pyyaml
# No module named 'rospkg'
pip install rospkg
# No module named 'squaternion'
pip install squaternion
# No module named 'attr'
pip install attrs
# No module named 'defusedxml'
pip install defusedxml
# No module named 'netifaces'
pip install netifaces

最后,进入到TD3文件夹下,打开终端,cd到路径,python运行。这样就可以进行训练了

cd ~/DRL-robot-navigation/TD3
python3 velodyne_td3.py

在后续的训练过程中,遇到过一个动态链接问题为: ImportError: /lib/x86_64-linux-gnu/libstdc++.so.6: version ‘GLIBCXX_3.4.29’ not found 可以参考动态链接问题这个文章,但是最好不要这么干,因为/lib/x86_64-linux-gnu/是最关键的系统库链接,稍有不慎会有问题。同时需要保持整个对libstdc++.so.6操作都在su下

sudo ln -f -s /usr/local/anaconda3/envs/habitat/lib/libstdc++.so.6 /usr/lib/x86_64-linux-gnu/libstdc++.so.6

2.5 打开仿真环境(在velodyne_env.py函数中调用了multi_robot_scenario.launch)

打开一个新的终端

cd DRL-robot-navigation-main/catkin_ws/
catkin_make
source devel/setup.sh
roslaunch multi_robot_scenario TD3_world.launch 

发现这里并没有我们需要的机器人模型,所以我们还需要打开launch文件夹下的pioneer3dx.gazebo.launch文件,则新建一个终端

cd DRL-robot-navigation-main/catkin_ws/
source devel/setup.sh
roslaunch multi_robot_scenario pioneer3dx.gazebo.launch 

image.png

3. 代码解析

3.1 velody_env.py文件

这一部分就是重置gazebo仿真器,然后按照TB3信息来规划,并给出reward值。有一个最重要的部分是,我们需要在每一次执行action的时候,让这个action运行那么0.1或者0.05秒,这样我们的小车才能够正常的运行一段距离。如果不设置这个运行时间的话,小车的action就会以最快的速度发布,那么根本就没有办法正常运行,step也会瞬间爆炸。

import math
import os
import random
import subprocess
import time
from os import path

import numpy as np
import rospy
import sensor_msgs.point_cloud2 as pc2
from gazebo_msgs.msg import ModelState
from geometry_msgs.msg import Twist
from nav_msgs.msg import Odometry
from sensor_msgs.msg import PointCloud2
from squaternion import Quaternion
from std_srvs.srv import Empty
from visualization_msgs.msg import Marker
from visualization_msgs.msg import MarkerArray

GOAL_REACHED_DIST = 0.3 # 机器人达到目标的距离阈值
COLLISION_DIST = 0.35 # 机器人与障碍物发生碰撞的距离阈值
TIME_DELTA = 0.1 # 机器人在每次动作后等待的时间# 检查给定的 (x, y) 坐标是否位于障碍物上def check_pos(x, y):
    # 是否在多个预设的障碍物区域内。如果在这些区域内,返回 False
    goal_ok = Trueif -3.8 > x > -6.2 and 6.2 > y > 3.8:
        goal_ok = Falseif -1.3 > x > -2.7 and 4.7 > y > -0.2:
        goal_ok = Falseif -0.3 > x > -4.2 and 2.7 > y > 1.3:
        goal_ok = Falseif -0.8 > x > -4.2 and -2.3 > y > -4.2:
        goal_ok = Falseif -1.3 > x > -3.7 and -0.8 > y > -2.7:
        goal_ok = Falseif 4.2 > x > 0.8 and -1.8 > y > -3.2:
        goal_ok = Falseif 4 > x > 2.5 and 0.7 > y > -3.2:
        goal_ok = Falseif 6.2 > x > 3.8 and -3.3 > y > -4.2:
        goal_ok = Falseif 4.2 > x > 1.3 and 3.7 > y > 1.5:
        goal_ok = Falseif -3.0 > x > -7.2 and 0.5 > y > -1.5:
        goal_ok = Falseif x > 4.5 or x < -4.5 or y > 4.5 or y < -4.5:
        goal_ok = Falsereturn goal_ok


class GazeboEnv:
    """用于与 Gazebo 环境交互的类,包含机器人控制、状态管理和环境重置等功能
    launchfile:Gazebo 的启动文件路径
    environment_dim:环境的维度,用于表示机器人周围的障碍物
    """def __init__(self, launchfile, environment_dim):
        self.environment_dim = environment_dim
        self.odom_x = 0self.odom_y = 0self.goal_x = 1self.goal_y = 0.0self.upper = 5.0self.lower = -5.0self.velodyne_data = np.ones(self.environment_dim) * 10# 初始化激光雷达数据self.last_odom = Noneself.set_self_state = ModelState()# 初始化机器人的状态self.set_self_state.model_name = "r1"# 机器人的名称self.set_self_state.pose.position.x = 0.0self.set_self_state.pose.position.y = 0.0self.set_self_state.pose.position.z = 0.0self.set_self_state.pose.orientation.x = 0.0self.set_self_state.pose.orientation.y = 0.0self.set_self_state.pose.orientation.z = 0.0self.set_self_state.pose.orientation.w = 1.0self.gaps = [[-np.pi / 2 - 0.03, -np.pi / 2 + np.pi / self.environment_dim]]# 初始化激光雷达的扫描角度for m in range(self.environment_dim - 1):
            self.gaps.append(
                [self.gaps[m][1], self.gaps[m][1] + np.pi / self.environment_dim]
            )# 每个激光雷达的扫描角度self.gaps[-1][-1] += 0.03

        port = "11311"
        subprocess.Popen(["roscore", "-p", port])# 启动 ROScoreprint("Roscore launched!")

        # Launch the simulation with the given launchfile name
        rospy.init_node("gym", anonymous=True)
        if launchfile.startswith("/"):# 如果是绝对路径
            fullpath = launchfile
        else:
            fullpath = os.path.join(os.path.dirname(__file__), "assets", launchfile)
        if not path.exists(fullpath):# 如果文件不存在raise IOError("File " + fullpath + " does not exist")

        subprocess.Popen(["roslaunch", "-p", port, fullpath])# 启动 Gazeboprint("Gazebo launched!")

        # 初始化 Gazebo 环境,设置初始状态和 ROS 相关的发布者和订阅者self.vel_pub = rospy.Publisher("/r1/cmd_vel", Twist, queue_size=1)
        self.set_state = rospy.Publisher(
            "gazebo/set_model_state", ModelState, queue_size=10
        )
        self.unpause = rospy.ServiceProxy("/gazebo/unpause_physics", Empty)# 恢复 Gazebo 的物理仿真self.pause = rospy.ServiceProxy("/gazebo/pause_physics", Empty)# 暂停 Gazebo 的物理仿真self.reset_proxy = rospy.ServiceProxy("/gazebo/reset_world", Empty)# 重置 Gazebo 的仿真世界self.publisher = rospy.Publisher("goal_point", MarkerArray, queue_size=3)# 发布目标点的位置self.publisher2 = rospy.Publisher("linear_velocity", MarkerArray, queue_size=1)# 发布线速度self.publisher3 = rospy.Publisher("angular_velocity", MarkerArray, queue_size=1)# 发布角速度self.velodyne = rospy.Subscriber(
            "/velodyne_points", PointCloud2, self.velodyne_callback, queue_size=1
        )# 订阅 Velodyne 激光雷达的数据self.odom = rospy.Subscriber(
            "/r1/odom", Odometry, self.odom_callback, queue_size=1
        )# 订阅机器人的里程计数据# 处理来自 Velodyne 激光雷达的数据,更新距离信息。def velodyne_callback(self, v):
        data = list(pc2.read_points(v, skip_nans=False, field_names=("x", "y", "z")))# 读取激光雷达的数据self.velodyne_data = np.ones(self.environment_dim) * 10# 初始化激光雷达数据for i in range(len(data)):
            if data[i][2] > -0.2:# 如果激光雷达的高度大于 -0.2
                dot = data[i][0] * 1 + data[i][1] * 0# 计算激光雷达的方向
                mag1 = math.sqrt(math.pow(data[i][0], 2) + math.pow(data[i][1], 2))# 计算激光雷达的距离
                mag2 = math.sqrt(math.pow(1, 2) + math.pow(0, 2))# 计算激光雷达的距离
                beta = math.acos(dot / (mag1 * mag2)) * np.sign(data[i][1])# 计算激光雷达的角度
                dist = math.sqrt(data[i][0] ** 2 + data[i][1] ** 2 + data[i][2] ** 2)# 计算激光雷达的距离for j in range(len(self.gaps)):
                    if self.gaps[j][0] <= beta < self.gaps[j][1]:# 如果激光雷达的角度在指定范围内self.velodyne_data[j] = min(self.velodyne_data[j], dist)# 更新激光雷达的距离break# 处理来自里程计的数据,更新机器人的位置def odom_callback(self, od_data):
        self.last_odom = od_data

    # 执行给定的动作并返回新的状态# action:包含线性和角速度的动作数组def step(self, action):
        target = False# 发布速度命令,等待一段时间,然后暂停物理仿真
        vel_cmd = Twist()
        vel_cmd.linear.x = action[0]
        vel_cmd.angular.z = action[1]
        self.vel_pub.publish(vel_cmd)
        self.publish_markers(action)

        rospy.wait_for_service("/gazebo/unpause_physics")# 等待 Gazebo 恢复物理仿真try:
            self.unpause()# 恢复物理仿真except (rospy.ServiceException) as e:
            print("/gazebo/unpause_physics service call failed")

        # 等待一段时间,然后暂停物理仿真
        time.sleep(TIME_DELTA)

        rospy.wait_for_service("/gazebo/pause_physics")
        try:
            passself.pause()
        except (rospy.ServiceException) as e:
            print("/gazebo/pause_physics service call failed")

        # 读取激光雷达数据,判断是否发生碰撞
        done, collision, min_laser = self.observe_collision(self.velodyne_data)
        v_state = []
        v_state[:] = self.velodyne_data[:]# 读取激光雷达数据
        laser_state = [v_state]# 激光雷达数据的状态# 读取机器人的位置和方向self.odom_x = self.last_odom.pose.pose.position.x
        self.odom_y = self.last_odom.pose.pose.position.y
        quaternion = Quaternion(
            self.last_odom.pose.pose.orientation.w,
            self.last_odom.pose.pose.orientation.x,
            self.last_odom.pose.pose.orientation.y,
            self.last_odom.pose.pose.orientation.z,
        )
        euler = quaternion.to_euler(degrees=False)# 欧拉角
        angle = round(euler[2], 4)# 机器人的角度# 计算机器人到目标的距离
        distance = np.linalg.norm(
            [self.odom_x - self.goal_x, self.odom_y - self.goal_y]
        )

        # 计算机器人的角度和朝向目标的角度之间的夹角
        skew_x = self.goal_x - self.odom_x
        skew_y = self.goal_y - self.odom_y
        dot = skew_x * 1 + skew_y * 0
        mag1 = math.sqrt(math.pow(skew_x, 2) + math.pow(skew_y, 2))
        mag2 = math.sqrt(math.pow(1, 2) + math.pow(0, 2))
        beta = math.acos(dot / (mag1 * mag2))
        if skew_y < 0:# 如果目标点在机器人的左侧if skew_x < 0:# 如果目标点在机器人的后方
                beta = -beta
            else:
                beta = 0 - beta
        theta = beta - angle
        if theta > np.pi:
            theta = np.pi - theta
            theta = -np.pi - theta
        if theta < -np.pi:
            theta = -np.pi - theta
            theta = np.pi - theta

        # 检测目标是否目标已经到达并且给一个大的正奖励if distance < GOAL_REACHED_DIST:
            target = True
            done = True

        robot_state = [distance, theta, action[0], action[1]]# 机器人的状态
        state = np.append(laser_state, robot_state)# 机器人的状态
        reward = self.get_reward(target, collision, action, min_laser)# 计算奖励return state, reward, done, target
    # 重置环境def reset(self):
        # 重置环境的状态并返回初始观察
        rospy.wait_for_service("/gazebo/reset_world")
        try:
            self.reset_proxy()

        except rospy.ServiceException as e:
            print("/gazebo/reset_simulation service call failed")

        angle = np.random.uniform(-np.pi, np.pi)# 随机生成机器人的角度
        quaternion = Quaternion.from_euler(0.0, 0.0, angle)# 生成四元数
        object_state = self.set_self_state

        x = 0
        y = 0
        position_ok = Falsewhile not position_ok:
            x = np.random.uniform(-4.5, 4.5)# 随机生成机器人的位置
            y = np.random.uniform(-4.5, 4.5)
            position_ok = check_pos(x, y)
        object_state.pose.position.x = x
        object_state.pose.position.y = y
        # object_state.pose.position.z = 0.
        object_state.pose.orientation.x = quaternion.x
        object_state.pose.orientation.y = quaternion.y
        object_state.pose.orientation.z = quaternion.z
        object_state.pose.orientation.w = quaternion.w
        self.set_state.publish(object_state)#发布机器人的状态self.odom_x = object_state.pose.position.x
        self.odom_y = object_state.pose.position.y

        # 随机生成目标点的位置self.change_goal()
        # 随机生成障碍物的位置self.random_box()
        self.publish_markers([0.0, 0.0])

        rospy.wait_for_service("/gazebo/unpause_physics")
        try:
            self.unpause()
        except (rospy.ServiceException) as e:
            print("/gazebo/unpause_physics service call failed")

        time.sleep(TIME_DELTA)

        rospy.wait_for_service("/gazebo/pause_physics")
        try:
            self.pause()
        except (rospy.ServiceException) as e:
            print("/gazebo/pause_physics service call failed")
        v_state = []
        v_state[:] = self.velodyne_data[:]
        laser_state = [v_state]

        distance = np.linalg.norm(
            [self.odom_x - self.goal_x, self.odom_y - self.goal_y]
        )#给定机器人到目标的距离

        skew_x = self.goal_x - self.odom_x
        skew_y = self.goal_y - self.odom_y

        dot = skew_x * 1 + skew_y * 0
        mag1 = math.sqrt(math.pow(skew_x, 2) + math.pow(skew_y, 2))
        mag2 = math.sqrt(math.pow(1, 2) + math.pow(0, 2))
        beta = math.acos(dot / (mag1 * mag2))

        if skew_y < 0:
            if skew_x < 0:
                beta = -beta
            else:
                beta = 0 - beta
        theta = beta - angle

        if theta > np.pi:
            theta = np.pi - theta
            theta = -np.pi - theta
        if theta < -np.pi:
            theta = -np.pi - theta
            theta = np.pi - theta

        robot_state = [distance, theta, 0.0, 0.0]
        state = np.append(laser_state, robot_state)
        return state

    # 改变目标点的位置def change_goal(self):
        # 放置一个新的目标点,并检查其位置是否不在障碍物上if self.upper < 10:
            self.upper += 0.004if self.lower > -10:
            self.lower -= 0.004

        goal_ok = Falsewhile not goal_ok:
            self.goal_x = self.odom_x + random.uniform(self.upper, self.lower)
            self.goal_y = self.odom_y + random.uniform(self.upper, self.lower)
            goal_ok = check_pos(self.goal_x, self.goal_y)

    # 随机在环境中放置障碍物(盒子)def random_box(self):
        # Randomly change the location of the boxes in the environment on each reset to randomize the training# environmentfor i in range(4):
            name = "cardboard_box_" + str(i)

            x = 0
            y = 0
            box_ok = False# 在随机位置放置盒子,确保它们不与机器人或目标重叠while not box_ok:
                x = np.random.uniform(-6, 6)
                y = np.random.uniform(-6, 6)
                box_ok = check_pos(x, y)
                distance_to_robot = np.linalg.norm([x - self.odom_x, y - self.odom_y])
                distance_to_goal = np.linalg.norm([x - self.goal_x, y - self.goal_y])
                if distance_to_robot < 1.5 or distance_to_goal < 1.5:
                    box_ok = False
            box_state = ModelState()
            box_state.model_name = name
            box_state.pose.position.x = x
            box_state.pose.position.y = y
            box_state.pose.position.z = 0.0
            box_state.pose.orientation.x = 0.0
            box_state.pose.orientation.y = 0.0
            box_state.pose.orientation.z = 0.0
            box_state.pose.orientation.w = 1.0self.set_state.publish(box_state)

    # 在 Rviz 中发布可视化数据def publish_markers(self, action):
        # 根据机器人的目标位置和动作参数发布标记
        markerArray = MarkerArray()
        marker = Marker()
        marker.header.frame_id = "odom"
        marker.type = marker.CYLINDER
        marker.action = marker.ADD
        marker.scale.x = 0.1
        marker.scale.y = 0.1
        marker.scale.z = 0.01
        marker.color.a = 1.0
        marker.color.r = 0.0
        marker.color.g = 1.0
        marker.color.b = 0.0
        marker.pose.orientation.w = 1.0
        marker.pose.position.x = self.goal_x
        marker.pose.position.y = self.goal_y
        marker.pose.position.z = 0

        markerArray.markers.append(marker)

        self.publisher.publish(markerArray)

        markerArray2 = MarkerArray()
        marker2 = Marker()
        marker2.header.frame_id = "odom"
        marker2.type = marker.CUBE
        marker2.action = marker.ADD
        marker2.scale.x = abs(action[0])
        marker2.scale.y = 0.1
        marker2.scale.z = 0.01
        marker2.color.a = 1.0
        marker2.color.r = 1.0
        marker2.color.g = 0.0
        marker2.color.b = 0.0
        marker2.pose.orientation.w = 1.0
        marker2.pose.position.x = 5
        marker2.pose.position.y = 0
        marker2.pose.position.z = 0

        markerArray2.markers.append(marker2)
        self.publisher2.publish(markerArray2)

        markerArray3 = MarkerArray()
        marker3 = Marker()
        marker3.header.frame_id = "odom"
        marker3.type = marker.CUBE
        marker3.action = marker.ADD
        marker3.scale.x = abs(action[1])
        marker3.scale.y = 0.1
        marker3.scale.z = 0.01
        marker3.color.a = 1.0
        marker3.color.r = 1.0
        marker3.color.g = 0.0
        marker3.color.b = 0.0
        marker3.pose.orientation.w = 1.0
        marker3.pose.position.x = 5
        marker3.pose.position.y = 0.2
        marker3.pose.position.z = 0

        markerArray3.markers.append(marker3)
        self.publisher3.publish(markerArray3)

    # 检测是否发生碰撞    @staticmethoddef observe_collision(laser_data):
        # Detect a collision from laser data
        min_laser = min(laser_data)
        if min_laser < COLLISION_DIST:
            return True, True, min_laser
        return False, False, min_laser

    # 检测是否发生碰撞    @staticmethoddef get_reward(target, collision, action, min_laser):
        if target:
            return 100.0elif collision:
            return -100.0else:
            r3 = lambda x: 1 - x if x < 1 else 0.0return action[0] / 2 - abs(action[1]) / 2 - r3(min_laser) / 2

3.2 train_velodyne_td3.py文件

这一部分就是训练网络参数的python文件了。该代码实现了一个基于TD3(Twin Delayed Deep Deterministic Policy Gradient)算法的深度强化学习训练框架,旨在训练一个机器人在环境中进行自主导航。核心操作包括:定义演员(Actor)和评论家(Critic)网络结构,使用重放缓冲区存储经验,进行网络训练以优化行动策略,评估策略性能并记录奖励和碰撞率,最后保存和加载模型。训练过程中,网络根据当前状态选择动作,并通过添加探索噪声来增强探索能力,同时在接近障碍物时采取随机动作以提高安全性和有效性。其中的1-287行是TD3网络结构以及各种参数设定,从288行的while是训练的主要部分。这里我们给出我们的注释。

import os
import time

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from numpy import inf
from torch.utils.tensorboard import SummaryWriter

from replay_buffer import ReplayBuffer
from velodyne_env import GazeboEnv

'''
该函数通过在环境中运行给定的神经网络(策略)来评估其性能,计算平均奖励和碰撞率
network: 用于决策的演员-评论家网络。
epoch: 当前的训练轮数,用于日志记录。
eval_episodes: 评估时运行的回合数(默认为10)
'''def evaluate(network, epoch, eval_episodes=10):
    # 初始化 avg_reward 和 col 以跟踪总奖励和碰撞次数
    avg_reward = 0.0
    col = 0for _ in range(eval_episodes):
        count = 0
        state = env.reset()# 对于每个评估回合,重置环境并迭代选择动作,使用网络进行决策
        done = Falsewhile not done and count < 501:
            action = network.get_action(np.array(state))# 将状态传递给网络以获取动作,然后将动作进行缩放并在环境中执行
            a_in = [(action[0] + 1) / 2, action[1]]
            state, reward, done, _ = env.step(a_in)
            avg_reward += reward
            count += 1if reward < -90:# 累计奖励,如果奖励低于 -90,则统计为碰撞
                col += 1# 最后,打印平均奖励和碰撞率,并返回平均奖励
    avg_reward /= eval_episodes
    avg_col = col / eval_episodes
    print("..............................................")
    print(
        "Average Reward over %i Evaluation Episodes, Epoch %i: %f, %f"
        % (eval_episodes, epoch, avg_reward, avg_col)
    )
    print("..............................................")
    return avg_reward

'''
该类定义了演员网络,负责根据当前状态选择动作
state_dim: 输入状态的维度。
action_dim: 输出动作的维度。
'''class Actor(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Actor, self).__init__()
        # 演员网络由三层线性层组成,前两层使用 ReLU 激活,最后一层使用 Tanh 激活。self.layer_1 = nn.Linear(state_dim, 800)# 24 -> 800self.layer_2 = nn.Linear(800, 600)
        self.layer_3 = nn.Linear(600, action_dim)# 600 -> 2self.tanh = nn.Tanh()

    def forward(self, s):
        s = F.relu(self.layer_1(s))
        s = F.relu(self.layer_2(s))
        a = self.tanh(self.layer_3(s))
        return a

'''
该类定义了评论家网络,负责评估给定状态和动作对的价值
state_dim: 输入状态的维度。
action_dim: 输入动作的维度。
'''class Critic(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Critic, self).__init__()

        self.layer_1 = nn.Linear(state_dim, 800) # 24 -> 800self.layer_2_s = nn.Linear(800, 600)
        self.layer_2_a = nn.Linear(action_dim, 600) # 2 -> 600self.layer_3 = nn.Linear(600, 1)

        self.layer_4 = nn.Linear(state_dim, 800)
        self.layer_5_s = nn.Linear(800, 600)
        self.layer_5_a = nn.Linear(action_dim, 600)
        self.layer_6 = nn.Linear(600, 1)

    def forward(self, s, a):
        s1 = F.relu(self.layer_1(s))#由两组线性层组成,分别处理状态和动作,每个网络都有两个隐藏层self.layer_2_s(s1)
        self.layer_2_a(a)
        s11 = torch.mm(s1, self.layer_2_s.weight.data.t())
        s12 = torch.mm(a, self.layer_2_a.weight.data.t())
        s1 = F.relu(s11 + s12 + self.layer_2_a.bias.data)
        q1 = self.layer_3(s1)# 计算两个 Q 值,分别为 q1 和 q2,表示对输入状态和动作对的价值

        s2 = F.relu(self.layer_4(s))
        self.layer_5_s(s2)
        self.layer_5_a(a)
        s21 = torch.mm(s2, self.layer_5_s.weight.data.t())
        s22 = torch.mm(a, self.layer_5_a.weight.data.t())
        s2 = F.relu(s21 + s22 + self.layer_5_a.bias.data)
        q2 = self.layer_6(s2)
        return q1, q2


'''
该类实现了 TD3(Twin Delayed Deep Deterministic Policy Gradient)算法,包含演员和评论家网络的训练过程
state_dim: 输入状态的维度。
action_dim: 输出动作的维度。
max_action: 动作的最大值。
'''class TD3(object):
    def __init__(self, state_dim, action_dim, max_action):
        # 初始化演员和评论家网络及其目标网络,并设置优化器# Initialize the Actor networkself.actor = Actor(state_dim, action_dim).to(device)
        self.actor_target = Actor(state_dim, action_dim).to(device)
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters())

        # Initialize the Critic networksself.critic = Critic(state_dim, action_dim).to(device)
        self.critic_target = Critic(state_dim, action_dim).to(device)
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters())

        self.max_action = max_action
        self.writer = SummaryWriter()
        self.iter_count = 0# 根据当前状态返回网络输出的动作def get_action(self, state):
        # Function to get the action from the actor
        state = torch.Tensor(state.reshape(1, -1)).to(device)
        return self.actor(state).cpu().data.numpy().flatten()

    # 执行训练循环,包括从重放缓冲区中抽样、计算目标 Q 值、更新评论家网络和演员网络的参数,以及执行软更新以更新目标网络的参数def train(
        self,
        replay_buffer,
        iterations,
        batch_size=100,
        discount=1,
        tau=0.005,
        policy_noise=0.2,  # discount=0.99
        noise_clip=0.5,
        policy_freq=2,
    ):
        av_Q = 0
        max_Q = -inf
        av_loss = 0for it in range(iterations):#迭代次数# sample a batch from the replay buffer
            (
                batch_states,
                batch_actions,
                batch_rewards,
                batch_dones,
                batch_next_states,
            ) = replay_buffer.sample_batch(batch_size)#从重放缓冲区中抽样
            state = torch.Tensor(batch_states).to(device)#将抽样的数据转换为张量
            next_state = torch.Tensor(batch_next_states).to(device)#将抽样的数据转换为张量
            action = torch.Tensor(batch_actions).to(device)#将抽样的数据转换为张量
            reward = torch.Tensor(batch_rewards).to(device)#将抽样的数据转换为张量
            done = torch.Tensor(batch_dones).to(device)#将抽样的数据转换为张量# 识别下一个状态的估计动作,使用演员-目标
            next_action = self.actor_target(next_state)

            # Add noise to the action# 为了增加探索性,对动作添加噪声
            noise = torch.Tensor(batch_actions).data.normal_(0, policy_noise).to(device)
            noise = noise.clamp(-noise_clip, noise_clip)
            next_action = (next_action + noise).clamp(-self.max_action, self.max_action)

            # 计算下一个状态-动作对的评论家目标网络的Q值
            target_Q1, target_Q2 = self.critic_target(next_state, next_action)

            # 选择两个计算值中的最小Q值
            target_Q = torch.min(target_Q1, target_Q2)
            av_Q += torch.mean(target_Q)
            max_Q = max(max_Q, torch.max(target_Q))

            # 计算最终的Q值,使用贝尔曼方程
            target_Q = reward + ((1 - done) * discount * target_Q).detach()

            # 使用当前参数获取基础网络的Q值
            current_Q1, current_Q2 = self.critic(state, action)

            # 计算当前Q值和目标Q值之间的损失
            loss = F.mse_loss(current_Q1, target_Q) + F.mse_loss(current_Q2, target_Q)

            # 执行梯度下降self.critic_optimizer.zero_grad()
            loss.backward()
            self.critic_optimizer.step()

            #如果迭代次数是policy_freq的倍数,则更新演员网络if it % policy_freq == 0:
                # 通过在负Q值上执行梯度下降(本质上执行梯度上升)来最大化演员输出值
                actor_grad, _ = self.critic(state, self.actor(state))
                actor_grad = -actor_grad.mean()
                self.actor_optimizer.zero_grad()
                actor_grad.backward()
                self.actor_optimizer.step()

                # 使用软更新通过注入少量当前参数来更新演员目标网络的参数for param, target_param in zip(
                    self.actor.parameters(), self.actor_target.parameters()
                ):
                    target_param.data.copy_(
                        tau * param.data + (1 - tau) * target_param.data
                    )
                # 使用软更新通过注入少量当前参数来更新评论家目标网络的参数for param, target_param in zip(
                    self.critic.parameters(), self.critic_target.parameters()
                ):
                    target_param.data.copy_(
                        tau * param.data + (1 - tau) * target_param.data
                    )

            av_loss += loss
        self.iter_count += 1# 将新值写入tensorboardself.writer.add_scalar("loss", av_loss / iterations, self.iter_count)
        self.writer.add_scalar("Av. Q", av_Q / iterations, self.iter_count)
        self.writer.add_scalar("Max. Q", max_Q, self.iter_count)

    # 保存和加载模型def save(self, filename, directory):
        torch.save(self.actor.state_dict(), "%s/%s_actor.pth" % (directory, filename))
        torch.save(self.critic.state_dict(), "%s/%s_critic.pth" % (directory, filename))

    # 加载模型def load(self, filename, directory):
        self.actor.load_state_dict(
            torch.load("%s/%s_actor.pth" % (directory, filename))
        )
        self.critic.load_state_dict(
            torch.load("%s/%s_critic.pth" % (directory, filename))
        )

# 设置实现的参数
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # 使用 CUDA 或 CPU
seed = 0  # 随机种子
eval_freq = 5e3  # 每多少步进行一次评估
max_ep = 500  # 每个回合的最大步骤数
eval_ep = 10  # 评估时的回合数
max_timesteps = 5e6  # 最大执行步骤数
expl_noise = 1  # 初始探索噪声值,在 [expl_min ... 1] 范围内
expl_decay_steps = 500000  # 探索噪声衰减的步骤数
expl_min = 0.1  # 衰减后的探索噪声值,在 [0...expl_noise] 范围内
batch_size = 40  # 小批量的大小
discount = 0.99999  # 折扣因子,用于计算未来奖励的折扣(应该接近 1)
tau = 0.005  # 软目标更新变量(应该接近 0)
policy_noise = 0.2  # 为探索添加的噪声
noise_clip = 0.5  # 噪声的最大限制值
policy_freq = 2  # 演员网络更新的频率
buffer_size = 1e6  # 缓冲区的最大大小
file_name = "TD3_velodyne"  # 存储策略的文件名
save_model = True  # 是否保存模型
load_model = False  # 是否加载已存储的模型
random_near_obstacle = True  # 是否在障碍物附近执行随机动作# 创建网络存储文件夹if not os.path.exists("./results"):
    os.makedirs("./results")  # 如果不存在,则创建结果文件夹if save_model and not os.path.exists("./pytorch_models"):
    os.makedirs("./pytorch_models")  # 如果需要保存模型且不存在,则创建模型文件夹# 创建训练环境
environment_dim = 20  # 环境的维度
robot_dim = 4  # 机器人状态的维度
env = GazeboEnv("multi_robot_scenario.launch", environment_dim)  # 创建 Gazebo 环境
time.sleep(5)  # 等待环境加载
torch.manual_seed(seed)  # 设置 PyTorch 随机种子
np.random.seed(seed)  # 设置 NumPy 随机种子
state_dim = environment_dim + robot_dim  # 状态的维度 (20 + 4 = 24)
action_dim = 2  # 行动的维度
max_action = 1  # 最大动作值# 创建网络
network = TD3(state_dim, action_dim, max_action)  # 初始化 TD3 网络# 创建重放缓冲区
replay_buffer = ReplayBuffer(buffer_size, seed)  # 初始化重放缓冲区if load_model:
    try:
        network.load(file_name, "./pytorch_models")  # 尝试加载已保存的模型except:
        print(
            "Could not load the stored model parameters, initializing training with random parameters"
        )  # 如果加载失败,则用随机参数初始化训练# 创建评估数据存储
evaluations = []  # 用于存储评估结果

timestep = 0  # 当前时间步
timesteps_since_eval = 0  # 自上次评估以来的时间步数
episode_num = 0  # 当前回合数
done = True  # 回合结束标志
epoch = 1  # 当前训练轮数

count_rand_actions = 0  # 随机动作计数
random_action = []  # 存储随机动作# 开始训练循环while timestep < max_timesteps:

    # 回合结束时if done:
        if timestep != 0:
            network.train(  # 训练网络
                replay_buffer,
                episode_timesteps,
                batch_size,
                discount,
                tau,
                policy_noise,
                noise_clip,
                policy_freq,
            )

        if timesteps_since_eval >= eval_freq:  # 如果达到评估频率print("Validating")
            timesteps_since_eval %= eval_freq  # 重置评估计数
            evaluations.append(
                evaluate(network=network, epoch=epoch, eval_episodes=eval_ep)  # 进行评估
            )
            network.save(file_name, directory="./pytorch_models")  # 保存模型
            np.save("./results/%s" % (file_name), evaluations)  # 保存评估结果
            epoch += 1  # 增加轮数

        state = env.reset()  # 重置环境
        done = False  # 标记回合未结束

        episode_reward = 0  # 当前回合的总奖励
        episode_timesteps = 0  # 当前回合的时间步数
        episode_num += 1  # 增加回合数# 添加一些探索噪声if expl_noise > expl_min:
        expl_noise = expl_noise - ((1 - expl_min) / expl_decay_steps)  # 衰减探索噪声

    action = network.get_action(np.array(state))  # 获取当前状态的动作
    action = (action + np.random.normal(0, expl_noise, size=action_dim)).clip(
        -max_action, max_action
    )  # 添加噪声并限制动作范围# 如果机器人面临障碍物,随机强制其采取一致的随机动作。# 这样可以在障碍物附近增加探索。if random_near_obstacle:
        if (
            np.random.uniform(0, 1) > 0.85  # 随机选择and min(state[4:-8]) < 0.6  # 检查状态是否接近障碍物and count_rand_actions < 1
        ):
            count_rand_actions = np.random.randint(8, 15)  # 设置随机动作的持续时间
            random_action = np.random.uniform(-1, 1, 2)  # 随机生成动作if count_rand_actions > 0:
            count_rand_actions -= 1  # 减少随机动作计数
            action = random_action  # 使用随机动作
            action[0] = -1  # 强制将线速度设为负值# 更新动作以落在 [0,1] 范围内用于线速度,[-1,1] 范围内用于角速度
    a_in = [(action[0] + 1) / 2, action[1]]  # 将线速度归一化到 [0,1] 范围
    next_state, reward, done, target = env.step(a_in)  # 执行动作并获取下一个状态和奖励
    done_bool = 0 if episode_timesteps + 1 == max_ep else int(done)  # 判断回合是否结束
    done = 1 if episode_timesteps + 1 == max_ep else int(done)
    episode_reward += reward  # 累加当前回合的奖励# 将状态、动作、奖励等元组保存到重放缓冲区
    replay_buffer.add(state, action, reward, done_bool, next_state)

    # 更新计数器
    state = next_state  # 更新状态
    episode_timesteps += 1  # 增加当前回合的时间步数
    timestep += 1  # 增加总时间步数
    timesteps_since_eval += 1  # 增加自上次评估以来的时间步数# 训练结束后,评估网络并保存
evaluations.append(evaluate(network=network, epoch=epoch, eval_episodes=eval_ep))  # 最终评估if save_model:
    network.save("%s" % file_name, directory="./models")  # 保存模型
np.save("./results/%s" % file_name, evaluations)  # 保存评估结果

4. 实际机器运行代码

import os
import time
import numpy as np
import torch
import rospy
from std_msgs.msg import Float32
from geometry_msgs.msg import Twist
from sensor_msgs.msg import LaserScan
from nav_msgs.msg import Odometry
import math
from quaternion import Quaternion
from sensor_msgs import point_cloud2 as pc2  # 添加导入# 定义常量
GOAL_REACHED_DIST = 0.1  # 到达目标的距离阈值
COLLISION_DIST = 0.5  # 碰撞的距离阈值class Actor(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Actor, self).__init__()

        self.layer_1 = nn.Linear(state_dim, 800)
        self.layer_2 = nn.Linear(800, 600)
        self.layer_3 = nn.Linear(600, action_dim)
        self.tanh = nn.Tanh()

    def forward(self, s):
        s = F.relu(self.layer_1(s))
        s = F.relu(self.layer_2(s))
        a = self.tanh(self.layer_3(s))
        return a
# TD3 networkclass TD3(object):
    def __init__(self, state_dim, action_dim):
        # Initialize the Actor networkself.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # cuda or cpuself.actor = Actor(state_dim, action_dim)
        self.actor.load_state_dict(torch.load("/home/polya/DRL-robot-navigation-main/pytorch_models180/TD3_final_actor.pth"))
        self.actor = self.actor.to(self.device)
        self.max_action = 1def get_action(self, state):
        # Function to get the action from the actor
        state = torch.Tensor(state.reshape(1, -1)).to(self.device)
        action = self.actor(state).cpu().data.numpy().flatten()
        action = action.clip(-self.max_action,self.max_action)
        linear_v = (action[0]+1)/2
        angle_v = action[1]
        return linear_v,angle_v
    def load(self, filename, directory):
        self.actor.load_state_dict(
            torch.load("%s/%s_actor.pth" % (directory, filename))
        )

class MyCar:
    def __init__(self):
        self.environment_dim = 20  # 设置激光雷达的维度self.velodyne_data = np.ones(self.environment_dim) * 10  # 初始化激光雷达数据self.last_odom = Odometry()  # 机器人位姿self.goal_x = 2.5  # 目标点 x 坐标self.goal_y = 0.5  # 目标点 y 坐标# 订阅激光数据和里程计数据
        rospy.Subscriber("/scan", LaserScan, self.LaserScanCallback)
        rospy.Subscriber("/odom", Odometry, self.OdomCallback)

    def LaserScanCallback(self, v):
        # 处理激光扫描数据
        data = list(pc2.read_points(v, field_names=("x", "y", "z"), skip_nans=True))
        self.velodyne_data = np.ones(self.environment_dim) * 10  # 初始化激光雷达数据for i in range(len(data)):
            if data[i][2] > -0.2:  # 只考虑有效的激光数据
                angle = math.atan2(data[i][1], data[i][0])
                dist = math.sqrt(data[i][0]**2 + data[i][1]**2)
                index = int((angle + math.pi / 2) / (math.pi / self.environment_dim)) % self.environment_dim
                self.velodyne_data[index] = min(self.velodyne_data[index], dist)

    def OdomCallback(self, data):
        # 处理里程计数据self.last_odom = data  # 更新里程计信息def GetStates(self):
        target = False
        v_state = []
        v_state[:] = self.velodyne_data[:]  # 读取激光雷达数据
        laser_state = [v_state]  # 激光雷达数据的状态# 获取机器人的位姿self.odom_x = self.last_odom.pose.pose.position.x
        self.odom_y = self.last_odom.pose.pose.position.y
        quaternion = Quaternion(
            self.last_odom.pose.pose.orientation.w,
            self.last_odom.pose.pose.orientation.x,
            self.last_odom.pose.pose.orientation.y,
            self.last_odom.pose.pose.orientation.z,
        )
        euler = quaternion.to_euler(degrees=False)
        angle = round(euler[2], 4)

        # 计算与目标点的距离和角度
        distance = np.linalg.norm([self.odom_x - self.goal_x, self.odom_y - self.goal_y])
        skew_x = self.goal_x - self.odom_x
        skew_y = self.goal_y - self.odom_y
        dot = skew_x * 1 + skew_y * 0
        mag1 = math.sqrt(skew_x**2 + skew_y**2)
        mag2 = 1  # [1, 0] 的模
        beta = math.acos(dot / (mag1 * mag2)) if mag1 != 0 else 0if skew_y < 0:
            beta = -beta if skew_x >= 0 else 0 - beta
        theta = beta - angle
        theta = ((theta + np.pi) % (2 * np.pi)) - np.pi  # 将 theta 限制在 [-pi, pi]self.dis = distance
        if distance < GOAL_REACHED_DIST:
            target = True# 机器人状态输入
        robot_state = [distance, theta, self.last_odom.twist.twist.linear.x, self.last_odom.twist.twist.angular.z]
        state = np.append(laser_state, robot_state)

        return state, target

# ROS节点初始化
rospy.init_node("my_car")
car = MyCar()  # 实例化获取状态的类# 发布目标点
xpub = rospy.Publisher("/goal_x", Float32, queue_size=10)
ypub = rospy.Publisher("/goal_y", Float32, queue_size=10)
xpub.publish(car.goal_x)
ypub.publish(car.goal_y)

# 初始化TD3网络
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
network = TD3(24, 2, 1).to(device)  # 实例化TD3网络# 发布速度控制命令
pub = rospy.Publisher("/cmd_vel", Twist, queue_size=10)
rate = rospy.Rate(10)  # 控制频率# 主循环while not rospy.is_shutdown():
    state, target = car.GetStates()  # 获取状态信息if not target:
        # 获取模型输出的动作
        linear_x, angular_z = network.get_action(np.array(state))  
        cmd_vel = Twist()  # 创建Twist消息
        cmd_vel.linear.x = linear_x / 5  # 限制线速度
        cmd_vel.angular.z = angular_z / 5
        pub.publish(cmd_vel)  # 发布命令# 碰撞和目标到达检测if min(car.velodyne_data) < COLLISION_DIST:  # 简易碰撞检测
        pub.publish(Twist())  # 停止机器人breakif target:  # 简易目标到达检测
        pub.publish(Twist())  # 停止机器人break

    rate.sleep()  # 控制循环频率# 结束后清理
pub.publish(Twist())  # 确保机器人停止

5. 参考链接

https://blog.csdn.net/weixin_55800047/article/details/137513393
https://hackmd.io/@yiyunGE/rJUs8E4W9?type=view
https://blog.csdn.net/PolypolyA/article/details/128198552

最后一次编辑于 2025年12月29日 0 0

暂无评论