def build_laser_scan(self, ranges):
result = LaserScan()
result.header.stamp = rospy.Time.now()
result.angle_min = -almath.PI
result.angle_max = almath.PI
if len(ranges[1]) > 0:
result.angle_increment = (result.angle_max - result.angle_min) / len(ranges[1])
result.range_min = 0.0
result.range_max = ranges[0]
for range_it in ranges[1]:
result.ranges.append(range_it[1])
return result
# do it!
python类LaserScan()的实例源码
def __init__(self):
"""Constructor for the class
initialize topic subscription and
instance variables
"""
self.r = rospy.Rate(5)
self.publisher = rospy.Publisher('/cmd_vel', Twist, queue_size=10)
rospy.Subscriber('/scan', LaserScan, self.process_scan)
# user chooses which parking mode to be performed
self.is_parallel = rospy.get_param('~parallel', False)
# Instance Variables
self.timestamp1 = None
self.timestamp2 = None
self.dist2Neato = None
self.dist2Wall = None
self.widthOfSpot = None
self.twist = None
self.radius = None
# Adjusment to be made before moving along the arc
self.adjustment = 0
self.isAligned = False
def __init__(self):
#constants for racecar speed and angle calculations
self.pSpeed = 0.3 #tweak
self.pAngle = 1
#positive charge behind racecar to give it a "kick" (forward vector)
self.propelling_charge = 4.5
#more constants
self.charge = 0.01
self.safety_threshold = 0.3
self.speeds = [1] #Creates a list of speeds
self.stuck_time = 0
self.stuck = False
self.stuck_threshold = 2
rospy.init_node("explorer")
self.sub = rospy.Subscriber('scan', LaserScan, self.laser_callback)
self.pub = rospy.Publisher('/vesc/ackermann_cmd_mux/input/navigation', AckermannDriveStamped)
rospy.on_shutdown(self.shutdown)
def run(self):
""" Run our code """
rospy.loginfo("Start laser test code")
#self.blank_image()
#for x in range(0, 99):
# self.add_point(x * 1.0, x * 1.0, 10)
#cv2.imshow("Hackme", self.img)
#cv2.waitKey(0)
self.joint_subscriber = rospy.Subscriber("/multisense/joint_states", JointState, self.recv_joint_state)
self.scan_subscriber = rospy.Subscriber("/multisense/lidar_scan", LaserScan, self.recv_scan)
self.spindle = rospy.Publisher("/multisense/set_spindle_speed", Float64, queue_size=10)
self.set_x_range(-1.0, 1.0)
self.set_y_range(-1.0, 1.0)
self.set_z_range(0.0, 2.0)
rospy.sleep(0.1)
rospy.loginfo("Setting spindle going")
self.spindle.publish(Float64(1.0))
#self.zarj.zps.look_down()
rospy.loginfo("Spin forever, hopefully receiving data...")
while True:
rospy.sleep(1.0)
#print "Got cloud {}x{}".format(resp.cloud.height, resp.cloud.width)
#img = self.create_image_from_cloud(resp.cloud.points)
#cv2.destroyAllWindows()
#print "New image"
#cv2.imshow("My image", img)
#cv2.waitKey(1)
#print resp
#cv_image = self.bridge.imgmsg_to_cv2(resp.cloud, "bgr8")
#cv2.imshow("Point cloud", cv_image)
def laser_listener():
rospy.init_node('laser_listener', anonymous=True)
rospy.Subscriber("/scan", LaserScan,callback)
rospy.spin()
def publish_scan(self, angles, ranges):
# publish the given angels and ranges as a laser scan message
ls = LaserScan()
ls.header = Utils.make_header("laser", stamp=self.last_stamp)
ls.angle_min = np.min(angles)
ls.angle_max = np.max(angles)
ls.angle_increment = np.abs(angles[0] - angles[1])
ls.range_min = 0
ls.range_max = np.max(ranges)
ls.ranges = ranges
self.pub_fake_scan.publish(ls)
def setup():
""" Setup ROS node. Create listener and talker threads """
global pub_cmd_vel
global srv_reset_positions
print("\n Initiating ROS Node")
rospy.init_node(NODE_NAME, anonymous=True)
# ToDo: Change to 'odom' in ROS launch for Giraff:
rospy.Subscriber('odom_giraff', Odometry, callback_odom)
rospy.Subscriber('laser_scan', LaserScan, callback_base_scan)
pub_cmd_vel = rospy.Publisher(
'cmd_vel', Twist, queue_size=1, tcp_nodelay=True)
srv_reset_positions = rospy.ServiceProxy('reset_positions', Empty)
thread_listener = Thread(target=listener, args=[])
thread_talker = Thread(target=talker, args=[])
thread_listener.start()
thread_talker.start()
rospy.loginfo("Node Initiated: %s", NODE_NAME)
return
def __init__(self):
self.received_data = None
self.parsed_data = None
self.angles = None
self.bins = None
self.averages = None
self.sub = rospy.Subscriber("/scan", LaserScan, self.lidarCB, queue_size=1)
self.pub = rospy.Publisher("/vesc/low_level/ackermann_cmd_mux/input/safety",\
AckermannDriveStamped, queue_size =1 )
self.thread = Thread(target=self.drive)
self.thread.start()
rospy.loginfo("safety node initialized")
def publish_scan(self, angles, ranges):
ls = LaserScan()
ls.header = Utils.make_header("laser", stamp=self.last_stamp)
ls.angle_min = np.min(angles)
ls.angle_max = np.max(angles)
ls.angle_increment = np.abs(angles[0] - angles[1])
ls.range_min = 0
ls.range_max = np.max(ranges)
ls.ranges = ranges
self.pub_fake_scan.publish(ls)
def __init__(self):
#constants for racecar speed and angle calculations
self.pSpeed = 2 #tweak
self.pAngle = .7
#positive charge behind racecar to give it a "kick" (forward vector)
self.propelling_charge = 5
#more constants
self.charge = 0.01
self.safety_threshold = 0.3
self.speeds = [1] #Creates a list of speeds
self.stuck_time = 0
self.stuck = False
self.stuck_threshold = 2
self.e1=0
rospy.init_node("explorer")
self.sub = rospy.Subscriber('scan', LaserScan, self.laser_callback)
self.pub = rospy.Publisher('/vesc/ackermann_cmd_mux/input/navigation', AckermannDriveStamped)
rospy.on_shutdown(self.shutdown)
def __init__(self):
super(Robot, self).__init__()
rospy.init_node('betelgeuse-ai')
self.speed = 0.0
self.turn = 0.0
self.drift = 0.0
self.drive_pub = rospy.Publisher("/ai/ai/drive_command", KamaroDriveCommand, queue_size=1)
self.lidar_front_sub = rospy.Subscriber("/lidar_filtered/lidar_front_scan_filtered", LaserScan, self._on_lidar_front)
self.lidar_back_sub = rospy.Subscriber("/lidar_filtered/lidar_back_scan_filtered", LaserScan, self._on_lidar_back)
self.odom_sub = rospy.Subscriber("/odom/fused", Odometry, self._on_odometry)
def __init__(self, max_random_start,
observation_dims, data_format, display, use_cumulated_reward):
self.max_random_start = max_random_start
self.action_size = 28
self.use_cumulated_reward = use_cumulated_reward
self.display = display
self.data_format = data_format
self.observation_dims = observation_dims
self.screen = np.zeros((self.observation_dims[0],self.observation_dims[1]), np.uint8)
#self.dirToTarget = 0
self.robotPose = Pose()
self.goalPose = Pose()
self.robotRot = 0.0
self.prevDist = 0.0
self.boom = False
self.numWins = 0
self.ep_reward = 0.0
self.terminal = 0
self.sendTerminal = 0
self.readyForNewData = True
rospy.wait_for_service('reset_positions')
self.resetStage = rospy.ServiceProxy('reset_positions', EmptySrv)
# Subscribers:
# rospy.Subscriber('base_scan', LaserScan, self.scanCB,queue_size=1)
# rospy.Subscriber('base_pose_ground_truth', Odometry, self.poseUpdateCB, queue_size=1)
# trying time sync:
sub_scan_ = message_filters.Subscriber('base_scan', LaserScan)
sub_pose_ = message_filters.Subscriber('base_pose_ground_truth', Odometry)
ts = message_filters.TimeSynchronizer( [sub_scan_, sub_pose_], 1)
ts.registerCallback( self.syncedCB)
# publishers:
self.pub_vel_ = rospy.Publisher('cmd_vel', Twist, queue_size = 1)
self.pub_rew_ = rospy.Publisher("lastreward",Float64,queue_size=10)
self.pub_goal_ = rospy.Publisher("goal", Pose, queue_size = 15)
def __init__(self, direction):
if direction not in [RIGHT, LEFT]:
rospy.loginfo("incorect %s wall selected. choose left or right")
rospy.signal_shutdown()
self.direction = direction
if SHOW_VIS:
self.viz = DynamicPlot()
self.viz.initialize()
self.pub = rospy.Publisher("/vesc/high_level/ackermann_cmd_mux/input/nav_0",\
AckermannDriveStamped, queue_size =1 )
self.sub = rospy.Subscriber("/scan", LaserScan, self.lidarCB, queue_size=1)
if PUBLISH_LINE:
self.line_pub = rospy.Publisher("/viz/line_fit", PolygonStamped, queue_size =1 )
# computed control instructions
self.control = None
self.steering_hist = CircularArray(HISTORY_SIZE)
# containers for laser scanner related data
self.data = None
self.xs = None
self.ys = None
self.m = 0
self.c = 0
# flag to indicate the first laser scan has been received
self.received_data = False
# cached constants
self.min_angle = None
self.max_angle = None
self.direction_muliplier = 0
self.laser_angles = None
self.drive_thread = Thread(target=self.apply_control)
self.drive_thread.start()
if SHOW_VIS:
while not rospy.is_shutdown():
self.viz_loop()
rospy.sleep(0.1)