def subscribePose():
rospy.Subscriber('/initialpose', PoseWithCovarianceStamped, getPoseData)
# rospy.Subscriber('/move_base_simple/goal', PoseStamped, getPoseData)
global background
python类PoseWithCovarianceStamped()的实例源码
def __init__(self):
self.map = None
self.start = None
self.goal = None
self.moves = [Move(0.1, 0), # forward
Move(-0.1, 0), # back
Move(0, 1.5708), # turn left 90
Move(0, -1.5708)] # turn right 90
self.robot = Robot(0.5, 0.5)
self.is_working = False # Replace with mutex after all
self.map_subscriber = rospy.Subscriber("map", OccupancyGrid, self.new_map_callback)
self.start_subscriber = rospy.Subscriber("initialpose", PoseWithCovarianceStamped, self.new_start_callback)
self.goal_subscriber = rospy.Subscriber("goal", PoseStamped, self.new_goal_callback)
self.path_publisher = rospy.Publisher("trajectory", MarkerArray, queue_size=1)
self.pose_publisher = rospy.Publisher("debug_pose", PoseStamped, queue_size=1)
# what will be there. A module goes into variable. Isn't it too much memory consumption. Maybe I should assign function replan() to this variable?
self.planner = planners.astar.replan
def create_pose(pose_obj):
pose_stamped = PoseWithCovarianceStamped()
pose_stamped.header.frame_id = 'map'
pose_stamped.header.stamp = rospy.Time.now()
pose = pose_obj.get("pose")
position = pose.get("position")
orientation = pose.get("orientation")
covariance = pose_obj.get("covariance")
pose_stamped.pose.pose.position.x = position.get("x")
pose_stamped.pose.pose.position.y = position.get("y")
pose_stamped.pose.pose.position.z = position.get("z")
pose_stamped.pose.pose.orientation.y = orientation.get("x")
pose_stamped.pose.pose.orientation.x = orientation.get("y")
pose_stamped.pose.pose.orientation.z = orientation.get("z")
pose_stamped.pose.pose.orientation.w = orientation.get("w")
pose_stamped.pose.covariance = covariance
return pose_stamped
def __init__(self):
self.frame_id = rospy.get_param("~frame_id", "map")
cov_x = rospy.get_param("~cov_x", 0.6)
cov_y = rospy.get_param("~cov_y", 0.6)
cov_z = rospy.get_param("~cov_z", 0.6)
self.cov = self.cov_matrix(cov_x, cov_y, cov_z)
self.ps_pub = rospy.Publisher(
POSE_TOPIC, PoseStamped, queue_size=1)
self.ps_cov_pub = rospy.Publisher(
POSE_COV_TOPIC, PoseWithCovarianceStamped, queue_size=1)
self.ps_pub_3d = rospy.Publisher(
POSE_TOPIC_3D, PoseStamped, queue_size=1)
self.ps_cov_pub_3d = rospy.Publisher(
POSE_COV_TOPIC_3D, PoseWithCovarianceStamped, queue_size=1)
self.last = None
self.listener = tf.TransformListener()
self.tag_range_topics = rospy.get_param("~tag_range_topics")
self.subs = list()
self.ranges = dict()
self.tag_pos = dict()
self.altitude = 0.0
self.last_3d = None
for topic in self.tag_range_topics:
self.subs.append(rospy.Subscriber(topic, Range, self.range_cb))
def publish_position(self, pos, ps_pub, ps_cov_pub, cov):
x, y = pos[0], pos[1]
if len(pos) > 2:
z = pos[2]
else:
z = 0
ps = PoseStamped()
ps_cov = PoseWithCovarianceStamped()
ps.pose.position.x = x
ps.pose.position.y = y
ps.pose.position.z = z
ps.header.frame_id = self.frame_id
ps.header.stamp = rospy.get_rostime()
ps_cov.header = ps.header
ps_cov.pose.pose = ps.pose
ps_cov.pose.covariance = cov
ps_pub.publish(ps)
ps_cov_pub.publish(ps_cov)
def PoseCallback(posedata):
# PoseWithCovarianceStamped data from amcl_pose
global robot_pose # [time, [x,y,yaw]]
header = posedata.header
pose = posedata.pose
if (not robot_pose[0]) or (header.stamp > robot_pose[0]):
# more recent pose data received
robot_pose[0] = header.stamp
# TODO: maybe add covariance check here?
print('robot position update!')
euler = euler_from_quaternion([pose.pose.orientation.x, pose.pose.orientation.y, pose.pose.orientation.z, pose.pose.orientation.w]) #roll, pitch, yaw
robot_pose[1] = [pose.pose.position.x, pose.pose.position.y, euler[2]] # in radians
return robot_pose
#========================================
def SendInitialPose(InitialPosePublisher, initial_pose, time_stamp):
# goal: [x, y, yaw]
InitialPoseMsg = PoseWithCovarianceStamped()
#InitialPoseMsg.header.seq = 0
InitialPoseMsg.header.stamp = time_stamp
InitialPoseMsg.header.frame_id = 'map'
InitialPoseMsg.pose.pose.position.x = initial_pose[0]
InitialPoseMsg.pose.pose.position.y = initial_pose[1]
#InitialPoseMsg.pose.position.z = 0.0
quaternion = quaternion_from_euler(0, 0, initial_pose[2])
InitialPoseMsg.pose.pose.orientation.x = quaternion[0]
InitialPoseMsg.pose.pose.orientation.y = quaternion[1]
InitialPoseMsg.pose.pose.orientation.z = quaternion[2]
InitialPoseMsg.pose.pose.orientation.w = quaternion[3]
InitialPosePublisher.publish(InitialPoseMsg)
#========================================
def PoseCallback(posedata):
# PoseWithCovarianceStamped data from amcl_pose
global robot_pose # [time, [x,y,yaw]]
header = posedata.header
pose = posedata.pose
if (not robot_pose[0]) or (header.stamp > robot_pose[0]):
# more recent pose data received
robot_pose[0] = header.stamp
# TODO: maybe add covariance check here?
print('robot position update!')
euler = euler_from_quaternion([pose.pose.orientation.x, pose.pose.orientation.y, pose.pose.orientation.z, pose.pose.orientation.w]) #roll, pitch, yaw
robot_pose[1] = [pose.pose.position.x, pose.pose.position.y, euler[2]] # in radians
return robot_pose
#========================================
def main():
trans= 0
rot = 0
rospy.init_node('odom_sync')
listener = tf.TransformListener()
pubfiltered = rospy.Publisher('set_pose',PoseWithCovarianceStamped,queue_size = 10)
pubodom = rospy.Publisher('mobile_base/commands/reset_odometry', Empty, queue_size = 10)
serv = rospy.ServiceProxy("set_offset",SetOdomOffset)
rospy.Subscriber('/command_input',UInt8,callback,callback_args=(pubodom,pubfiltered,listener,trans,rot,serv))
rospy.sleep(1)
print "done sleeping"
while not rospy.is_shutdown():
try:
(trans,rot) = listener.lookupTransform('/base_meas', '/odom_meas',rospy.Time(0))
except: continue
rospy.spin()
def subscribePose():
rospy.Subscriber('/initialpose', PoseWithCovarianceStamped, getPoseData)
# rospy.Subscriber('/move_base_simple/goal', PoseStamped, getPoseData)
global background
def __init__(self):
self.lock = threading.Lock()
self.sub_imu = rospy.Subscriber('robot_pose_ekf/odom_combined', PoseWithCovarianceStamped, self.imu_cb)
self.last_imu_angle = 0
self.imu_angle = 0
while not rospy.is_shutdown():
rospy.sleep(0.1)
rospy.loginfo("imu_angle:"+str((self.imu_angle)*180/3.1415926))
def __init__(self):
# Give the node a name
rospy.init_node('odom_ekf', anonymous=False)
# Publisher of type nav_msgs/Odometry
self.ekf_pub = rospy.Publisher('output', Odometry, queue_size=5)
# Wait for the /odom_combined topic to become available
rospy.wait_for_message('input', PoseWithCovarianceStamped)
# Subscribe to the /odom_combined topic
rospy.Subscriber('input', PoseWithCovarianceStamped, self.pub_ekf_odom)
rospy.loginfo("Publishing combined odometry on /odom_ekf")
def execute(self, userdata):
global waypoints
self.initialize_path_queue()
self.path_ready = False
# Start thread to listen for when the path is ready (this function will end then)
def wait_for_path_ready():
"""thread worker function"""
data = rospy.wait_for_message('/path_ready', Empty)
rospy.loginfo('Recieved path READY message')
self.path_ready = True
ready_thread = threading.Thread(target=wait_for_path_ready)
ready_thread.start()
topic = "/initialpose"
rospy.loginfo("Waiting to recieve waypoints via Pose msg on topic %s" % topic)
rospy.loginfo("To start following waypoints: 'rostopic pub /path_ready std_msgs/Empty -1'")
# Wait for published waypoints
while not self.path_ready:
try:
pose = rospy.wait_for_message(topic, PoseWithCovarianceStamped, timeout=1)
except rospy.ROSException as e:
if 'timeout exceeded' in e.message:
continue # no new waypoint within timeout, looping...
else:
raise e
rospy.loginfo("Recieved new waypoint")
waypoints.append(pose)
# publish waypoint queue as pose array so that you can see them in rviz, etc.
self.poseArray_publisher.publish(convert_PoseWithCovArray_to_PoseArray(waypoints))
# Path is ready! return success and move on to the next state (FOLLOW_PATH)
return 'success'
def clicked_pose(self, msg):
'''
Receive pose messages from RViz and initialize the particle distribution in response.
'''
if isinstance(msg, PointStamped):
self.initialize_global()
elif isinstance(msg, PoseWithCovarianceStamped):
self.initialize_particles_pose(msg.pose.pose)
def __init__(self):
# Give the node a name
rospy.init_node('odom_ekf', anonymous=False)
# Publisher of type nav_msgs/Odometry
self.ekf_pub = rospy.Publisher('output', Odometry, queue_size=5)
# Wait for the /odom_combined topic to become available
rospy.wait_for_message('input', PoseWithCovarianceStamped)
# Subscribe to the /odom_combined topic
rospy.Subscriber('input', PoseWithCovarianceStamped, self.pub_ekf_odom)
rospy.loginfo("Publishing combined odometry on /odom_ekf")
def ekf_pub(self, ranges, vel_data, yaw, alt):
z = np.array([])
new_pose = Odometry()
ps_cov = PoseWithCovarianceStamped()
for tag_name in self.tag_order:
measurement = ranges[tag_name]
z = np.append(z, measurement)
if self.last_time is None:
self.last_time = rospy.Time.now().to_sec()
else:
dt = rospy.Time.now().to_sec() - self.last_time
self.predict(dt)
self.update(z, vel_data, yaw, alt)
self.last_time = rospy.Time.now().to_sec()
new_pose.header.stamp = rospy.get_rostime()
new_pose.header.frame_id = self.frame_id
new_pose.pose.pose.position.x = self.x[0]
new_pose.pose.pose.position.y = self.x[1]
new_pose.pose.pose.position.z = self.x[2]
cov = self.P.flatten().tolist()
new_pose.pose.covariance = cov
new_pose.twist.twist.linear.x = self.x[3]
new_pose.twist.twist.linear.y = self.x[4]
new_pose.twist.twist.linear.z = self.x[5]
return new_pose
# @n.publisher(EKF_COV_TOPIC, PoseWithCovarianceStamped)
# def cov_pub(self, ps_cov):
# return ps_cov
def clicked_pose(self, msg):
'''
Receive pose messages from RViz and initialize the particle distribution in response.
'''
if isinstance(msg, PointStamped):
self.initialize_global()
elif isinstance(msg, PoseWithCovarianceStamped):
self.initialize_particles_pose(msg.pose.pose)
def __init__(self):
rospy.Subscriber('/amcl_pose', PoseWithCovarianceStamped, self.monitor_pose)
def PoseCallback(posedata):
# PoseWithCovarianceStamped data from amcl_pose
global robot_pose # [time, [x,y,yaw]]
header = posedata.header
pose = posedata.pose
if (not robot_pose[0]) or (header.stamp > robot_pose[0]):
# more recent pose data received
robot_pose[0] = header.stamp
# TODO: maybe add covariance check here?
print('robot position update!')
euler = euler_from_quaternion([pose.pose.orientation.x, pose.pose.orientation.y, pose.pose.orientation.z, pose.pose.orientation.w]) #roll, pitch, yaw
robot_pose[1] = [pose.pose.position.x, pose.pose.position.y, euler[2]] # in radians
return robot_pose