def marker_from_circle(circle, index=0, linewidth=0.1, color=ColorRGBA(1, 0, 0, 1), z=0., lifetime=10.0):
marker = Marker()
marker.header = make_header("/map")
marker.ns = "Markers_NS"
marker.id = index
marker.type = Marker.CYLINDER
marker.action = 0 # action=0 add/modify object
# marker.color.r = 1.0
# marker.color.g = 0.0
# marker.color.b = 0.0
# marker.color.a = 0.4
marker.color = color
marker.lifetime = rospy.Duration.from_sec(lifetime)
marker.pose = Pose()
marker.pose.position.z = z
marker.pose.position.x = circle.center[0]
marker.pose.position.y = circle.center[1]
marker.scale = Vector3(circle.radius*2.0, circle.radius*2.0, 0)
return marker
python类ColorRGBA()的实例源码
def marker_from_point_radius(point, radius, index=0, linewidth=0.1, color=ColorRGBA(1, 0, 0, 1), z=0., lifetime=10.0):
marker = Marker()
marker.header = make_header("/map")
marker.ns = "Speed_NS"
marker.id = index
marker.type = Marker.CYLINDER
marker.action = 0 # action=0 add/modify object
# marker.color.r = 1.0
# marker.color.g = 0.0
# marker.color.b = 0.0
# marker.color.a = 0.4
marker.color = color
marker.lifetime = rospy.Duration.from_sec(lifetime)
marker.pose = Pose()
marker.pose.position.z = z
marker.pose.position.x = point[0]
marker.pose.position.y = point[1]
marker.scale = Vector3(radius*2.0, radius*2.0, 0.001)
return marker
def publish_speeds(self, duration=0.0, scale=0.7):
should_publish = len(self.speed_profile) > 1
if self.visualize and self.speed_pub.get_num_connections() > 0:
if self.dirty():
self.make_np_array()
markers = [marker_clear_all("/map")]
normed_speeds = np.array(self.speed_profile) / np.max(self.speed_profile)
last_speed = 0.0
for i, speed in enumerate(normed_speeds):
if speed >= last_speed * 0.99:
color = ColorRGBA(0, 1, 0, 0.8)
else:
color = ColorRGBA(1, 0, 0, 0.8)
last_speed = speed
markers.append(marker_from_point_radius(self.np_points[i,:], np.power(speed, 0.8) * scale,
index=i, linewidth=0.05, color=color, lifetime=duration))
marker_array = MarkerArray(markers=markers)
self.speed_pub.publish(marker_array)
def publish_cloud_markers(pub_ns, _arr, c='b', stamp=None, flip_rb=False, frame_id='map'):
"""
Publish point cloud on:
pub_ns: Namespace on which the cloud will be published
arr: numpy array (N x 3) for point cloud data
c: Option 1: 'c', 'b', 'r' etc colors accepted by matplotlibs color
Option 2: float ranging from 0 to 1 via matplotlib's jet colormap
Option 3: numpy array (N x 3) with r,g,b vals ranging from 0 to 1
s: supported only by matplotlib plotting
alpha: supported only by matplotlib plotting
"""
arr, carr = copy_pointcloud_data(_arr, c, flip_rb=flip_rb)
marker = vis_msg.Marker(type=vis_msg.Marker.SPHERE_LIST, ns=pub_ns, action=vis_msg.Marker.ADD)
marker.header.frame_id = frame_id
marker.header.stamp = stamp if stamp is not None else rospy.Time.now()
marker.scale.x = 0.01
marker.scale.y = 0.01
# XYZ
inds, = np.where(~np.isnan(arr).any(axis=1))
marker.points = [geom_msg.Point(arr[j,0], arr[j,1], arr[j,2]) for j in inds]
# RGB (optionally alpha)
N, D = arr.shape[:2]
if D == 3:
marker.colors = [std_msg.ColorRGBA(carr[j,0], carr[j,1], carr[j,2], 1.0)
for j in inds]
elif D == 4:
marker.colors = [std_msg.ColorRGBA(carr[j,0], carr[j,1], carr[j,2], carr[j,3])
for j in inds]
marker.lifetime = rospy.Duration()
_publish_marker(marker)
print 'Publishing marker', N
def publish_line_segments(pub_ns, _arr1, _arr2, c='b', stamp=None, frame_id='camera', flip_rb=False, size=0.002):
"""
Publish point cloud on:
pub_ns: Namespace on which the cloud will be published
arr: numpy array (N x 3) for point cloud data
c: Option 1: 'c', 'b', 'r' etc colors accepted by matplotlibs color
Option 2: float ranging from 0 to 1 via matplotlib's jet colormap
Option 3: numpy array (N x 3) with r,g,b vals ranging from 0 to 1
s: supported only by matplotlib plotting
alpha: supported only by matplotlib plotting
"""
arr1, carr = copy_pointcloud_data(_arr1, c, flip_rb=flip_rb)
arr2 = (deepcopy(_arr2)).reshape(-1,3)
marker = vis_msg.Marker(type=vis_msg.Marker.LINE_LIST, ns=pub_ns, action=vis_msg.Marker.ADD)
if not arr1.shape == arr2.shape: raise AssertionError
marker.header.frame_id = frame_id
marker.header.stamp = stamp if stamp is not None else rospy.Time.now()
marker.scale.x = size
marker.scale.y = size
marker.color.b = 1.0
marker.color.a = 1.0
marker.pose.position = geom_msg.Point(0,0,0)
marker.pose.orientation = geom_msg.Quaternion(0,0,0,1)
# Handle 3D data: [ndarray or list of ndarrays]
arr12 = np.hstack([arr1, arr2])
inds, = np.where(~np.isnan(arr12).any(axis=1))
marker.points = []
for j in inds:
marker.points.extend([geom_msg.Point(arr1[j,0], arr1[j,1], arr1[j,2]),
geom_msg.Point(arr2[j,0], arr2[j,1], arr2[j,2])])
# RGB (optionally alpha)
marker.colors = [std_msg.ColorRGBA(carr[j,0], carr[j,1], carr[j,2], 1.0)
for j in inds]
marker.lifetime = rospy.Duration()
_publish_marker(marker)
def publish_octomap(pub_ns, arr, carr, size=0.1, stamp=None, frame_id='map', flip_rb=False):
"""
Publish cubes list:
"""
marker = vis_msg.Marker(type=vis_msg.Marker.CUBE_LIST, ns=pub_ns, action=vis_msg.Marker.ADD)
marker.header.frame_id = frame_id
marker.header.stamp = stamp if stamp is not None else rospy.Time.now()
# Point width, and height
marker.scale.x = size
marker.scale.y = size
marker.scale.z = size
N, D = arr.shape
# XYZ
inds, = np.where(~np.isnan(arr).any(axis=1))
marker.points = [geom_msg.Point(arr[j,0], arr[j,1], arr[j,2]) for j in inds]
# RGB (optionally alpha)
rax, bax = 0, 2
# carr = carr.astype(np.float32) * 1.0 / 255
if flip_rb: rax, bax = 2, 0
if D == 3:
marker.colors = [std_msg.ColorRGBA(carr[j,rax], carr[j,1], carr[j,bax], 1.0)
for j in inds]
elif D == 4:
marker.colors = [std_msg.ColorRGBA(carr[j,rax], carr[j,1], carr[j,bax], carr[j,3])
for j in inds]
marker.lifetime = rospy.Duration()
_publish_marker(marker)
def handle_image_msg(msg):
assert msg._type == 'sensor_msgs/Image'
with g_fusion_lock:
g_fusion.filter(EmptyObservation(msg.header.stamp.to_sec()))
if g_fusion.last_state_mean is not None:
pose = g_fusion.lidar_observation_function(g_fusion.last_state_mean)
yaw_q = ros_tf.transformations.quaternion_from_euler(0, 0, pose[3])
br = ros_tf.TransformBroadcaster()
br.sendTransform(tuple(pose[:3]), tuple(yaw_q), rospy.Time.now(), 'obj_fuse_centroid', 'velodyne')
if last_known_box_size is not None:
# bounding box
marker = Marker()
marker.header.frame_id = "obj_fuse_centroid"
marker.header.stamp = rospy.Time.now()
marker.type = Marker.CUBE
marker.action = Marker.ADD
marker.scale.x = last_known_box_size[2]
marker.scale.y = last_known_box_size[1]
marker.scale.z = last_known_box_size[0]
marker.color = ColorRGBA(r=0., g=1., b=0., a=0.5)
marker.lifetime = rospy.Duration()
pub = rospy.Publisher("obj_fuse_bbox", Marker, queue_size=10)
pub.publish(marker)
def set_color(self, rgba, link="link", ambient_coeff=0.7, specular_coeff=0.7):
rgba = list(rgba)
if len(rgba) is 3: rgba +=[self.color_range]
r,g,b,a = tuple(rgba)
a = self.color_range if a is None else a
message = MaterialColor()
message.color_type = [COLOR_TYPE['diffuse'], COLOR_TYPE['ambient'], COLOR_TYPE['specular']]
message.color.append(ColorRGBA(r/self.color_range, g/self.color_range, b/self.color_range, a/self.color_range))
message.color.append(ColorRGBA(0.7*r/self.color_range, 0.7*g/self.color_range, 0.7*b/self.color_range, a/self.color_range))
message.color.append(ColorRGBA(1-specular_coeff*(1-r/self.color_range), 1-specular_coeff*(1-g/self.color_range), 1-specular_coeff*(1-b/self.color_range), a/self.color_range))
self.color_publishers[link].publish(message)
def set_color(self, color):
if len(color) is 3: color += [self.color_range]
previous_color = self.color
self.color = ColorRGBA(color[0]/self.color_range, color[1]/self.color_range, color[2]/self.color_range, color[3]/self.color_range)
if self.color != previous_color and self._on : self._send_color_cmd(self.color)
def publish_pose_list(pub_ns, _poses, texts=[], stamp=None, size=0.05, frame_id='camera', seq=1):
"""
Publish Pose List on:
pub_channel: Channel on which the cloud will be published
"""
poses = deepcopy(_poses)
if not len(poses): return
arrs = np.vstack([pose.matrix[:3,:3].T.reshape((1,9)) for pose in poses]) * size
arrX = np.vstack([pose.tvec.reshape((1,3)) for pose in poses])
arrx, arry, arrz = arrs[:,0:3], arrs[:,3:6], arrs[:,6:9]
# Point width, and height
N = len(poses)
markers = vis_msg.Marker(type=vis_msg.Marker.LINE_LIST, ns=pub_ns, action=vis_msg.Marker.ADD)
markers.header.frame_id = frame_id
markers.header.stamp = stamp if stamp is not None else rospy.Time.now()
markers.header.seq = seq
markers.scale.x = size/20 # 0.01
markers.scale.y = size/20 # 0.01
markers.color.a = 1.0
markers.pose.position = geom_msg.Point(0,0,0)
markers.pose.orientation = geom_msg.Quaternion(0,0,0,1)
markers.points = []
markers.lifetime = rospy.Duration()
for j in range(N):
markers.points.extend([geom_msg.Point(arrX[j,0], arrX[j,1], arrX[j,2]),
geom_msg.Point(arrX[j,0] + arrx[j,0],
arrX[j,1] + arrx[j,1],
arrX[j,2] + arrx[j,2])])
markers.colors.extend([std_msg.ColorRGBA(1.0, 0.0, 0.0, 1.0), std_msg.ColorRGBA(1.0, 0.0, 0.0, 1.0)])
markers.points.extend([geom_msg.Point(arrX[j,0], arrX[j,1], arrX[j,2]),
geom_msg.Point(arrX[j,0] + arry[j,0],
arrX[j,1] + arry[j,1],
arrX[j,2] + arry[j,2])])
markers.colors.extend([std_msg.ColorRGBA(0.0, 1.0, 0.0, 1.0), std_msg.ColorRGBA(0.0, 1.0, 0.0, 1.0)])
markers.points.extend([geom_msg.Point(arrX[j,0], arrX[j,1], arrX[j,2]),
geom_msg.Point(arrX[j,0] + arrz[j,0],
arrX[j,1] + arrz[j,1],
arrX[j,2] + arrz[j,2])])
markers.colors.extend([std_msg.ColorRGBA(0.0, 0.0, 1.0, 1.0), std_msg.ColorRGBA(0.0, 0.0, 1.0, 1.0)])
_publish_marker(markers)
def handle_radar_msg(msg, dont_fuse):
assert msg._md5sum == '6a2de2f790cb8bb0e149d45d297462f8'
publish_rviz_topics = True
with g_fusion_lock:
# do we have any estimation?
if g_fusion.last_state_mean is not None:
pose = g_fusion.lidar_observation_function(g_fusion.last_state_mean)
observations = RadarObservation.from_msg(msg, RADAR_TO_LIDAR, 0.9115)
# find nearest observation to current object position estimation
distance_threshold = 4.4
nearest = None
nearest_dist = 1e9
for o in observations:
dist = [o.x - pose[0], o.y - pose[1], o.z - pose[2]]
dist = np.sqrt(np.array(dist).dot(dist))
if dist < nearest_dist and dist < distance_threshold:
nearest_dist = dist
nearest = o
if nearest is not None:
# FUSION
if not dont_fuse:
g_fusion.filter(nearest)
if publish_rviz_topics:
header = Header()
header.frame_id = '/velodyne'
header.stamp = rospy.Time.now()
point = np.array([[nearest.x, nearest.y, nearest.z]], dtype=np.float32)
rospy.Publisher(name='obj_radar_nearest',
data_class=PointCloud2,
queue_size=1).publish(pc2.create_cloud_xyz32(header, point))
#pose = g_fusion.lidar_observation_function(g_fusion.last_state_mean)
#br = ros_tf.TransformBroadcaster()
#br.sendTransform(tuple(pose), (0,0,0,1), rospy.Time.now(), 'car_fuse_centroid', 'velodyne')
# if last_known_box_size is not None:
# # bounding box
# marker = Marker()
# marker.header.frame_id = "car_fuse_centroid"
# marker.header.stamp = rospy.Time.now()
# marker.type = Marker.CUBE
# marker.action = Marker.ADD
# marker.scale.x = last_known_box_size[2]
# marker.scale.y = last_known_box_size[1]
# marker.scale.z = last_known_box_size[0]
# marker.color = ColorRGBA(r=1., g=1., b=0., a=0.5)
# marker.lifetime = rospy.Duration()
# pub = rospy.Publisher("car_fuse_bbox", Marker, queue_size=10)
# pub.publish(marker)
def run(self):
while self.is_looping():
navigation_tf_msg = TFMessage()
try:
motion_to_robot = almath.Pose2D(self.motion.getRobotPosition(True))
localization = self.navigation.getRobotPositionInMap()
exploration_path = self.navigation.getExplorationPath()
except Exception as e:
navigation_tf_msg.transforms.append(self.get_navigation_tf(almath.Pose2D()))
self.publishers["map_tf"].publish(navigation_tf_msg)
self.rate.sleep()
continue
if len(localization) > 0 and len(localization[0]) == 3:
navigation_to_robot = almath.Pose2D(localization[0][0], localization[0][1], localization[0][2])
navigation_to_motion = navigation_to_robot * almath.pinv(motion_to_robot)
navigation_tf_msg.transforms.append(self.get_navigation_tf(navigation_to_motion))
self.publishers["map_tf"].publish(navigation_tf_msg)
if len(localization) > 0:
if self.publishers["uncertainty"].get_num_connections() > 0:
uncertainty = Marker()
uncertainty.header.frame_id = "/base_footprint"
uncertainty.header.stamp = rospy.Time.now()
uncertainty.type = Marker.CYLINDER
uncertainty.scale = Vector3(localization[1][0], localization[1][1], 0.2)
uncertainty.pose.position = Point(0, 0, 0)
uncertainty.pose.orientation = self.get_ros_quaternion(
almath.Quaternion_fromAngleAndAxisRotation(0, 0, 0, 1))
uncertainty.color = ColorRGBA(1, 0.5, 0.5, 0.5)
self.publishers["uncertainty"].publish(uncertainty)
if self.publishers["map"].get_num_connections() > 0:
aggregated_map = None
try:
aggregated_map = self.navigation.getMetricalMap()
except Exception as e:
print("error " + str(e))
if aggregated_map is not None:
map_marker = OccupancyGrid()
map_marker.header.stamp = rospy.Time.now()
map_marker.header.frame_id = "/map"
map_marker.info.resolution = aggregated_map[0]
map_marker.info.width = aggregated_map[1]
map_marker.info.height = aggregated_map[2]
map_marker.info.origin.orientation = self.get_ros_quaternion(
almath.Quaternion_fromAngleAndAxisRotation(-1.57, 0, 0, 1))
map_marker.info.origin.position.x = aggregated_map[3][0]
map_marker.info.origin.position.y = aggregated_map[3][1]
map_marker.data = aggregated_map[4]
self.publishers["map"].publish(map_marker)
if self.publishers["exploration_path"].get_num_connections() > 0:
path = Path()
path.header.stamp = rospy.Time.now()
path.header.frame_id = "/map"
for node in exploration_path:
current_node = PoseStamped()
current_node.pose.position.x = node[0]
current_node.pose.position.y = node[1]
path.poses.append(current_node)
self.publishers["exploration_path"].publish(path)
self.rate.sleep()
def visualize(self):
if self.circle_pub.get_num_connections() > 0 and self.circle_path:
print "CIRCLE PUB"
# root = self.space_explorer.tree_root
# markers = [utils.marker_clear_all("/map")]
# explored = self.non_overlapping_paths(root)
# if type(explored[0]) == list:
# explored = reduce(lambda x,y: x+y, explored)
# print "explored: ", len(explored)
# markers += [utils.marker_from_circle(circle, index=i, linewidth=0.01, color=ColorRGBA(0, 1, 0, 0.1), lifetime=1.0) \
# for i, circle in enumerate(explored)]
# marker_array = MarkerArray(markers=markers)
# self.circle_pub.publish(marker_array)
# print len(self.circle_path.states)
# marker = utils.marker_from_circles(self.circle_path.states, 1, "Markers_NS2", "/map", 3.0, [1.,0.,0.])
# self.circle_pub.publish(marker)
markers = [utils.marker_clear_all("/map")]
markers += [utils.marker_from_circle(circle, index=i, linewidth=0.05, color=ColorRGBA(1, 0, 0, 0.4), \
lifetime=4.0)
for i, circle in enumerate(self.circle_path.states)]
# print len(markers)
marker_array = MarkerArray(markers=markers)
self.circle_pub.publish(marker_array)
if self.should_refine_trajectory and self.fast_circle_pub.get_num_connections() > 0 and self.fast_path:
print "FAST CIRCLE PUB"
# root = self.path_planner.tree_root
# markers = [utils.marker_clear_all("/map")]
# explored = self.non_overlapping_paths(root)
# if type(explored[0]) == list:
# explored = reduce(lambda x,y: x+y, explored)
# print "explored: ", len(explored)
# markers += [utils.marker_from_circle(circle, index=i, linewidth=0.01, color=ColorRGBA(0, 0, 1, 0.1), lifetime=1.0) \
# for i, circle in enumerate(explored)]
markers = [utils.marker_clear_all("/map")]
markers += [utils.marker_from_circle(circle, index=i, linewidth=0.05, color=ColorRGBA(1, 0, 0, 0.4), \
lifetime=4.0)
for i, circle in enumerate(self.fast_path.states)]
marker_array = MarkerArray(markers=markers)
self.fast_circle_pub.publish(marker_array)
if self.show_exploration_buffer and self.exploration_pub.get_num_connections() > 0:
print "Publishing exploration grid"
self.exploration_pub.publish(self.map.get_exploration_occupancy_grid())
# plt.imshow(self.map.exploration_buffer)
# plt.imshow(self.map.permissible_region)
# plt.show()
print "visualize"