def __init__(self):
    rospy.init_node('gaze', anonymous=False)

    self.lock = threading.RLock()
    with self.lock:
        self.current_state = GazeState.IDLE
        self.last_state = self.current_state

    # Initialize variables
    self.glance_timeout = 0
    self.glance_timecount = 0
    self.glance_played = False

    self.idle_timeout = 0
    self.idle_timecount = 0
    self.idle_played = False

    rospy.loginfo('\033[92m[%s]\033[0m waiting for bringup social_mind...' % rospy.get_name())
    rospy.wait_for_service('environmental_memory/read_data')
    rospy.wait_for_service('social_events_memory/read_data')

    self.rd_memory = {}
    self.rd_memory['environmental_memory'] = rospy.ServiceProxy('environmental_memory/read_data', ReadData)
    self.rd_memory['social_events_memory'] = rospy.ServiceProxy('social_events_memory/read_data', ReadData)

    rospy.Subscriber('raising_events', RaisingEvents, self.handle_raising_events)
    rospy.Subscriber('gaze_focusing', String, self.handle_gaze_focusing)

    self.pub_gaze_cmd = rospy.Publisher('gaze_command', GazeCommand, queue_size=10)
    self.pub_viz_gaze_cmd = rospy.Publisher('visualization_gaze_cmd', PointStamped, queue_size=10)

    rospy.Timer(rospy.Duration(GAZE_CONTROLLER_PERIOD), self.handle_gaze_controller)
    rospy.loginfo('\033[92m[%s]\033[0m initialized...' % rospy.get_name())
    rospy.spin()
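The controller callback itself is not included in this excerpt. Purely as an illustration of how the visualization publisher set up above might be fed, a hypothetical helper could look like the sketch below; the frame name and the method name are assumptions, not part of the original node.

def publish_viz_point(self, x, y, z, frame_id='base_link'):
    # Hypothetical helper (assumed frame name): stamp a gaze target as a
    # PointStamped so RViz can display it alongside the gaze command.
    viz = PointStamped()
    viz.header.stamp = rospy.Time.now()
    viz.header.frame_id = frame_id
    viz.point.x = x
    viz.point.y = y
    viz.point.z = z
    self.pub_viz_gaze_cmd.publish(viz)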
def lookAt(self, pos, sim=False):
    goal = PointHeadGoal()
    point = PointStamped()
    point.header.frame_id = self.frame
    point.point.x = pos[0]
    point.point.y = pos[1]
    point.point.z = pos[2]
    goal.target = point
    # Point using kinect frame
    goal.pointing_frame = 'head_mount_kinect_rgb_link'
    if sim:
        goal.pointing_frame = 'high_def_frame'
    goal.pointing_axis.x = 1
    goal.pointing_axis.y = 0
    goal.pointing_axis.z = 0
    goal.min_duration = rospy.Duration(1.0)
    goal.max_velocity = 1.0
    self.pointHeadClient.send_goal_and_wait(goal)
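The snippet above assumes an existing self.pointHeadClient but never constructs it. A plausible setup is sketched below; the action name is an assumption (it is the conventional PR2 point-head action server, and older PR2 stacks use pr2_controllers_msgs rather than control_msgs for the action type).

import actionlib
from control_msgs.msg import PointHeadAction

def make_point_head_client(action_name='/head_traj_controller/point_head_action'):
    # Assumed action name; other robots remap or rename this server.
    client = actionlib.SimpleActionClient(action_name, PointHeadAction)
    client.wait_for_server()
    return client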
def find_button(macro):
    macro.fc.send_stereo_camera()
    image, details = macro.fc.zarj.eyes.get_cloud_image_with_details(macro.fc.cloud)
    things = Things(image, details, 2)
    if things.array_button is not None:
        point = PointStamped()
        point.header = macro.fc.cloud.header
        det = things.array_button.computed_center
        point.point.x = det[0]
        point.point.y = det[1]
        point.point.z = det[2]
        button = macro.fc.zarj.transform.tf_buffer.transform(point, macro.fc.zarj.walk.lfname)
        log("button found at: {}/{}/{}".format(button.point.x, button.point.y, button.point.z))
        macro.fc.points[0] = [button.point.x, button.point.y, button.point.z]
        macro.fc.points[1] = None
        return True
    return False
def find_plug(macro):
    macro.fc.send_stereo_camera()
    image, details = macro.fc.zarj.eyes.get_cloud_image_with_details(macro.fc.cloud)
    things = Things(image, details, 2)
    if things.power_plug:
        point = PointStamped()
        point.header = macro.fc.cloud.header
        det = details[things.power_plug.position[1], things.power_plug.position[0]]
        point.point.x = det[0]
        point.point.y = det[1]
        point.point.z = det[2]
        plug = macro.fc.zarj.transform.tf_buffer.transform(point, macro.fc.zarj.walk.lfname)
        log("plug at: {}/{}/{}".format(plug.point.x, plug.point.y, plug.point.z))
        macro.fc.points[0] = [plug.point.x, plug.point.y, plug.point.z]
        macro.fc.points[1] = None
        return True
    return False
def find_repair_button(macro):
    image, details = macro.fc.zarj.eyes.get_cloud_image_with_details(macro.fc.cloud)
    things = Things(image, details, 3)
    if things.repair_button is not None and things.repair_button.computed_center is not None:
        point = PointStamped()
        point.header = macro.fc.cloud.header
        det = things.repair_button.computed_center
        point.point.x = det[0]
        point.point.y = det[1]
        point.point.z = det[2]
        button = macro.fc.zarj.transform.tf_buffer.transform(point, macro.fc.zarj.walk.lfname)
        log("button found at: {}/{}/{}".format(button.point.x, button.point.y, button.point.z))
        return button.point
    return None
def _ensure_leak(self):
    """ If we do not have a leak detected, assume our hand is in the
        right place. """
    global LAST_LEAK, LEAK_SIDE
    if LAST_LEAK is None:
        palm = self.fc.zarj.hands.get_current_hand_center_transform(
            'left', 'world')
        LAST_LEAK = PointStamped()
        LAST_LEAK.header.frame_id = 'world'
        LAST_LEAK.point = palm.translation
        log("No leak previously detected; assuming left hand is correct")
        log('Leak at {}'.format(LAST_LEAK))
        current = self.fc.zarj.hands.get_joint_states('left')
        LEAK_SIDE = current[0]['leftShoulderPitch'][0] > 0
        log('Leak is behind us: {}'.format(LEAK_SIDE))
def __init__(self):
    rospy.Subscriber('/mavros/local_position/pose', PoseStamped, self.pose_callback, queue_size=1)
    rospy.Subscriber('/marker_data', MarkerArray, self.marker_callback, queue_size=1)
    self.vision_position_pub = rospy.Publisher('/mavros/vision_pose/pose', PoseStamped, queue_size=1)
    self.offset_pub = rospy.Publisher('~offset', PointStamped, queue_size=1)
    self.position_fcu_pub = rospy.Publisher('~position/fcu', PoseStamped, queue_size=1)
    self.pose_pub = rospy.Publisher('~position/marker_map', PoseStamped, queue_size=1)
    self.vision_position_message = PoseStamped()
    self.position_fcu_message = PoseStamped()
    self.position_fcu_message.header.frame_id = 'vision_fcu'
    self.pose_message = PoseStamped()
    self.pose_message.header.frame_id = 'marker_map'
    self.marker_position_offset = PointStamped()
def subscribePoint():
    rospy.Subscriber('/clicked_point', PointStamped, printclickpoint)
def build_point(self, x, y, z, header):
    """ Take an X, Y, Z and convert it into a head point """
    point = PointStamped()
    point.header = header
    point.point.x = x
    point.point.y = y
    point.point.z = z
    head_pnt = self.tf.tf_buffer.transform(point, self.frame_id)
    head_pnt.header.seq = header.seq
    return head_pnt
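One detail worth noting for this and the other snippets on this page: tf2_ros.Buffer.transform() can only convert a PointStamped if the geometry conversions are registered, which happens as a side effect of importing tf2_geometry_msgs. A minimal, self-contained sketch of the same conversion (the function name and timeout here are illustrative, not from the original) is:

import rospy
import tf2_ros
import tf2_geometry_msgs  # noqa: F401 -- registers PointStamped with tf2's transform()
from geometry_msgs.msg import PointStamped

def build_point_standalone(tf_buffer, x, y, z, header, target_frame):
    """Hypothetical free-function variant of build_point above."""
    point = PointStamped()
    point.header = header
    point.point.x = x
    point.point.y = y
    point.point.z = z
    # Raises tf2 lookup/extrapolation exceptions if no transform is
    # available at header.stamp within the timeout.
    return tf_buffer.transform(point, target_frame, rospy.Duration(1.0))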
def _find_stair_base(self, cloud, distance, center, width):
    """ Try to find the base of the stairs really exactly """
    _, details = self.zarj.eyes.get_cloud_image_with_details(cloud)
    occ = 0
    pnt = None
    while pnt is None:
        if not np.isnan(details[distance][center - occ][0]):
            pnt = details[distance][center - occ]
            break
        if not np.isnan(details[distance][center + occ][0]):
            pnt = details[distance][center + occ]
            break
        occ += 1
        if occ >= width / 6:
            occ = 0
            distance -= 1
    if pnt is not None:
        point = PointStamped()
        point.header = cloud.header
        point.point.x = pnt[0]
        point.point.y = pnt[1]
        point.point.z = pnt[2]
        pnt = self.zarj.transform.tf_buffer.transform(point,
                                                      self.zarj.walk.lfname)
        return pnt.point.x
    log("ERROR: Could not find stair base")
    return None
def find_button(self):
    cloud = self.zarj.eyes.get_stereo_cloud()
    image, details = self.zarj.eyes.get_cloud_image_with_details(cloud)
    things = Things(image, details, 2, True)
    if things.array_button is not None and things.array_button.computed_center is not None:
        button = PointStamped()
        button.header = cloud.header
        button.point.x = things.array_button.computed_center[0]
        button.point.y = things.array_button.computed_center[1]
        button.point.z = things.array_button.computed_center[2]
        button_in_foot = self.zarj.transform.tf_buffer.transform(button, self.zarj.walk.lfname)
        p = button_in_foot.point
        print("JPW sez button is {}/{}/{}".format(p.x, p.y, p.z))
def handle_mouse(self, event, x, y, flags, param):
    if event == cv2.EVENT_LBUTTONUP:
        point = PointStamped()
        point.header = self.cloud.header
        point.point.x = self.details[y, x][0]
        point.point.y = self.details[y, x][1]
        point.point.z = self.details[y, x][2]
        print(x, y, self.details[y, x])
        result = self.zarj.transform.tf_buffer.transform(point, 'rightIndexFingerPitch1Link')
        print(result)
def find_choke(macro):
    macro.fc.send_stereo_camera()
    image, details = macro.fc.zarj.eyes.get_cloud_image_with_details(macro.fc.cloud)
    things = Things(image, details, 2)
    if things.choke_inner:
        point = PointStamped()
        point.header = macro.fc.cloud.header
        det = details[things.choke_inner.position[1], things.choke_inner.position[0]]
        point.point.x = det[0]
        point.point.y = det[1]
        point.point.z = det[2]
        inner = macro.fc.zarj.transform.tf_buffer.transform(point, macro.fc.zarj.walk.lfname)
        log("inner choke found at: {}/{}/{}".format(inner.point.x, inner.point.y, inner.point.z))
        if things.choke_outer:
            point = PointStamped()
            point.header = macro.fc.cloud.header
            det = details[things.choke_outer.position[1], things.choke_outer.position[0]]
            point.point.x = det[0]
            point.point.y = det[1]
            point.point.z = det[2]
            outer = macro.fc.zarj.transform.tf_buffer.transform(point, macro.fc.zarj.walk.lfname)
            log("outer choke found at: {}/{}/{}".format(outer.point.x, outer.point.y, outer.point.z))
            macro.fc.points[0] = [outer.point.x, outer.point.y, outer.point.z]
            macro.fc.points[1] = [inner.point.x, inner.point.y, inner.point.z]
            return True
    return False
def start(self, _=None):
    """ Start the macro """
    global LAST_LEAK
    self.leak = None
    self.best_leak = 0.01
    sub = rospy.Subscriber("/task3/checkpoint5/leak", Leak,
                           self._leak_update)
    self.fc.zarj.neck.neck_control([0.35, 1.0, 0], True)
    self._check_distance()
    found, stage = self._full_cycle(0)
    if self.stop:
        return
    while found is not None and not found:
        log('Leak not found at: {}'.format(stage))
        found, stage = self._full_cycle(stage)
        if self.stop:
            break
    if found:
        self.fc.process_get_palm_msg(ZarjGetPalmCommand('left'))
        joint_values = self.fc.zarj.hands.get_joint_values('left')
        ack = ZarjGetArmJointsResponse('left', joint_values)
        self.fc.zarj_comm.push_message(ack)
        log('leak found! stage {}'.format(stage))
        palm_transform = self.fc.zarj.hands.get_current_hand_center_transform(
            'left', 'world')
        LAST_LEAK = PointStamped()
        LAST_LEAK.header.frame_id = 'world'
        LAST_LEAK.point = palm_transform.translation
        log('Leak at {}'.format(LAST_LEAK))
        log('Arm and palm positions updated')
    sub.unregister()
def clicked_pose(self, msg):
    '''
    Receive pose messages from RViz and initialize the particle distribution in response.
    '''
    if isinstance(msg, PointStamped):
        self.initialize_global()
    elif isinstance(msg, PoseWithCovarianceStamped):
        self.initialize_particles_pose(msg.pose.pose)
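For context, RViz's Publish Point tool publishes geometry_msgs/PointStamped on /clicked_point and its 2D Pose Estimate tool publishes geometry_msgs/PoseWithCovarianceStamped on /initialpose. The subscriptions that feed a handler like clicked_pose are not shown above, so the following wiring is only a sketch (the helper name is hypothetical):

import rospy
from geometry_msgs.msg import PointStamped, PoseWithCovarianceStamped

def wire_rviz_inputs(node):
    # Both RViz tools funnel into the same handler; clicked_pose() then
    # dispatches on the message type with isinstance().
    rospy.Subscriber('/clicked_point', PointStamped, node.clicked_pose, queue_size=1)
    rospy.Subscriber('/initialpose', PoseWithCovarianceStamped, node.clicked_pose, queue_size=1)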
def __init__(self):
    rospy.init_node('tag_position_publisher', anonymous=True)
    self.rate = rospy.Rate(LocalizationNode.EVERY_SECOND)
    self.basestations = []
    self.extractParams()
    self.frame = '/target_' + str(self.tag)
    self.ekf = ROSEKF(self.tag, self.basestations, selectBestPositions, Poly3(self.measurement_model_coeffs),
                      self.var_z, Poly5(self.measurement_weight_model_coeffs), self.max_selection, self.debug)
    self.publisher = rospy.Publisher(self.frame, PointStamped, queue_size=10)
def createCommunicators(self):
    self.measurement_requester = rospy.Publisher('measurements_request', String, queue_size=10)
    self.measurement_reciver = rospy.Subscriber('measurements', MeasurementList, self.receiveMeasurements)
    self.position_publisher = rospy.Publisher('/target_' + str(self.tag), PointStamped, queue_size=10)
def publishPosition(self):
    msg = PointStamped()
    msg.header.stamp = rospy.Time.now()
    msg.header.frame_id = '/target_' + str(self.tag)
    msg.point = Point(self.estimated_position[0], self.estimated_position[1], 0)
    self.position_publisher.publish(msg)
    self.publishCovarianceMatrix()
def __init__(self, address, x, y):
    self.address = address
    self.position = np.array([x, y])
    self.frame = "basestation"
    self.station_id = address.split('.', 4)[-1]
    self.custom_frame = self.frame + '_' + self.station_id
    self.publisher = rospy.Publisher(self.frame, PointStamped, queue_size=10)

def publishPosition(self):
    msg = PointStamped()
    msg.header.stamp = rospy.Time.now()
    msg.header.frame_id = self.frame
    msg.point = Point(self.position[0], self.position[1], 0)
    self.publisher.publish(msg)
def __init__(self, address, x, y):
    self.address = address
    self.position = np.array([x, y])
    self.frame = "basestation"
    self.station_id = address.split('.', 4)[-1]
    self.namespace = self.frame + '_' + self.station_id
    self.publisher = rospy.Publisher(self.frame, PointStamped, queue_size=10)

def publishPosition(self):
    msg = PointStamped()
    msg.header.stamp = rospy.Time.now()
    msg.header.frame_id = self.frame
    msg.point = Point(self.position[0], self.position[1], 0)
    self.publisher.publish(msg)
def __init__(self):
    self.save_path = os.path.join(rospy.get_param("~save_path"), time.strftime("%Y-%m-%d-%H-%M-%S") + ".traj")
    self.trajectory = LineTrajectory("/built_trajectory")
    # receive either goal points or clicked points
    self.publish_point_sub = rospy.Subscriber("/clicked_point", PointStamped, self.clicked_point_callback, queue_size=1)
    self.nav_goal_sub = rospy.Subscriber("/move_base_simple/goal", PoseStamped, self.clicked_point_callback, queue_size=1)
    # save the built trajectory on shutdown
    rospy.on_shutdown(self.saveTrajectory)
def clicked_point_callback(self, msg):
    if isinstance(msg, PointStamped):
        self.trajectory.addPoint(msg.point)
    elif isinstance(msg, PoseStamped):
        self.trajectory.addPoint(msg.pose.position)
    # publish visualization of the path being built
    self.trajectory.publish_viz()
def clicked_pose(self, msg):
    '''
    Receive pose messages from RViz and initialize the particle distribution in response.
    '''
    if isinstance(msg, PointStamped):
        self.initialize_global()
    elif isinstance(msg, PoseWithCovarianceStamped):
        self.initialize_particles_pose(msg.pose.pose)
def __init__(self):
    self.rate = rospy.get_param("~rate", 20.0)
    self.period = 1.0 / self.rate
    # Angular mode maps angular z directly to the steering angle
    # (adjusted appropriately).
    # Non-angular mode is somewhat suspect, but it turns a linear y into a
    # command to turn just so that the achieved linear x and y match the
    # desired, though the vehicle has to turn to do so.
    # Non-angular mode is not yet supported.
    self.angular_mode = rospy.get_param("~angular_mode", True)

    # Not used yet
    # self.tf_buffer = tf2_ros.Buffer()
    # self.tf = tf2_ros.TransformListener(self.tf_buffer)

    # broadcast odometry
    self.br = tf2_ros.TransformBroadcaster()
    self.ts = TransformStamped()
    self.ts.header.frame_id = "map"
    self.ts.child_frame_id = "base_link"
    self.ts.transform.rotation.w = 1.0
    self.angle = 0

    # The cmd_vel is assumed to be in the base_link frame, centered on the
    # middle of the two driven wheels.
    # This is half the distance between the two drive wheels:
    self.base_radius = rospy.get_param("~base_radius", 0.06)
    self.wheel_radius = rospy.get_param("~wheel_radius", 0.04)
    self.left_wheel_joint = rospy.get_param("~left_wheel_joint", "wheel_front_left_joint")
    self.right_wheel_joint = rospy.get_param("~right_wheel_joint", "wheel_front_right_joint")

    self.point_pub = rospy.Publisher("cmd_vel_spin_center", PointStamped, queue_size=1)
    self.joint_pub = {}
    self.joint_pub['left'] = rospy.Publisher("front_left/joint_state",
                                             JointState, queue_size=1)
    self.joint_pub['right'] = rospy.Publisher("front_right/joint_state",
                                              JointState, queue_size=1)

    # TODO(lucasw) is there a way to get TwistStamped out of standard
    # move_base publishers?
    # TODO(lucasw) make this more generic: read in a list of any number of
    # wheels; the requirement is that they all have to be aligned and share
    # a spin center.
    self.ind = {}
    self.joint_state = {}
    self.joint_state['left'] = JointState()
    self.joint_state['left'].name.append(self.left_wheel_joint)
    self.joint_state['left'].position.append(0.0)
    self.joint_state['left'].velocity.append(0.0)
    self.joint_state['right'] = JointState()
    self.joint_state['right'].name.append(self.right_wheel_joint)
    self.joint_state['right'].position.append(0.0)
    self.joint_state['right'].velocity.append(0.0)

    self.cmd_vel = Twist()
    rospy.Subscriber("cmd_vel", Twist, self.cmd_vel_callback, queue_size=2)
    self.timer = rospy.Timer(rospy.Duration(self.period), self.update)
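The update callback that feeds cmd_vel_spin_center is not part of this excerpt. As a sketch of the underlying geometry only: for a differential drive the turning radius is linear.x / angular.z, and the instantaneous spin center sits on the wheel axle, i.e. on the y axis of base_link. A hypothetical helper (name and threshold are assumptions):

import rospy
from geometry_msgs.msg import PointStamped, Twist

def spin_center_from_twist(cmd_vel, frame_id='base_link'):
    """Hypothetical helper: locate the point the robot pivots around."""
    center = PointStamped()
    center.header.stamp = rospy.Time.now()
    center.header.frame_id = frame_id
    if abs(cmd_vel.angular.z) > 1e-6:
        # Turning radius = v / w; the center lies on the axle (y axis).
        center.point.y = cmd_vel.linear.x / cmd_vel.angular.z
    return center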
def detection(self, hsv_image):
    """ Check for a detection in the image. """
    mask = cv2.inRange(hsv_image, self.color_low, self.color_high)
    if self.baseline_cnt > 0:
        nmask = cv2.bitwise_not(mask)
        if self.baseline is None:
            rospy.loginfo("getting baseline for {}".format(self.name))
            self.baseline = nmask
        else:
            self.baseline = cv2.bitwise_or(nmask, self.baseline)
        mask = cv2.bitwise_and(mask, self.baseline)
        count = cv2.countNonZero(mask) + self.baseline_fuzzy
        self.low_count = max(self.low_count, count)
        self.high_count = self.low_count + self.baseline_fuzzy
        self.baseline_cnt -= 1
        return
    elif self.baseline is not None:
        mask = cv2.bitwise_and(mask, self.baseline)
    count = cv2.countNonZero(mask)
    if count > self.low_count and self.active is None:
        self.active = rospy.get_rostime()
        rospy.loginfo("{} ACTIVE ({})".format(self.name, count))
        self.cloud.reset()
        if self.callbacks[0] is not None:
            self.callbacks[0](self.name)
    elif self.active is not None and count < self.high_count:
        rospy.loginfo("{} GONE ({})".format(self.name, count))
        self.cloud.reset()
        self.active = None
        if self.callbacks[2] is not None:
            self.published = False
            self.report_count = 0
        if self.callbacks[1] is not None:
            self.callbacks[1](self.name)

    # DEBUGGING: show what the masked image for this request looks like
    if self.debug:
        cv2.namedWindow(self.name, cv2.WINDOW_NORMAL)
        if self.baseline is not None:
            cv2.namedWindow(self.name + '_baseline', cv2.WINDOW_NORMAL)
            cv2.imshow(self.name + '_baseline', self.baseline)
        cv2.imshow(self.name, mask)
        cv2.waitKey(1)

    # Check to see if we should notify the location callback
    if self.is_active() and self.report_count > self.min_reports:
        now = rospy.get_rostime()
        if (self.active + self.stablize_time) < now:
            self.published = True
            point = PointStamped()
            center = self.cloud.find_center()
            point.header.seq = 1
            point.header.stamp = now
            point.header.frame_id = self.frame_id
            point.point.x = center[0]
            point.point.y = center[1]
            point.point.z = center[2]
            if self.callbacks[2] is not None:
                self.callbacks[2](self.name, point)
def process_palm_msg(self, msg):
    if math.isnan(msg.x) or math.isnan(msg.y) or math.isnan(msg.z):
        self.oclog("Warning - NaN position found; skipping")
        return
    point_vector = Vector3(msg.x, msg.y, msg.z)
    if not msg.relative:
        point = PointStamped()
        point.point = point_vector
        if self.cloud is None:
            point.header.seq = 1
            point.header.stamp = rospy.get_rostime() - rospy.Duration(0.1)
        else:
            point.header = deepcopy(self.cloud.header)
        if msg.use_left_foot:
            point.header.frame_id = self.zarj.walk.lfname
        desired_frame = self.zarj.transform.lookup_transform('world', point.header.frame_id, point.header.stamp)
        result = self.zarj.transform.tf_buffer.transform(point, 'world')
        if math.isnan(msg.yaw) or math.isnan(msg.pitch) or math.isnan(msg.roll):
            current = self.zarj.hands.get_current_hand_center_transform(msg.sidename)
            rotation = current.rotation
        else:
            desired_rotation = quaternion_multiply(
                msg_q_to_q(desired_frame.transform.rotation),
                yaw_pitch_roll_to_q([msg.yaw, msg.pitch, msg.roll]))
            rotation = q_to_msg_q(desired_rotation)
        self.zarj.hands.move_hand_center_abs(msg.sidename, result.point, rotation)
        self.oclog("Palm move to {}, {}, {}; yaw {}, pitch {}, roll {} commanded".format(
            fmt(msg.x), fmt(msg.y), fmt(msg.z), fmt(msg.yaw), fmt(msg.pitch), fmt(msg.roll)))
    else:
        if math.isnan(msg.yaw):
            msg.yaw = 0.0
        if math.isnan(msg.pitch):
            msg.pitch = 0.0
        if math.isnan(msg.roll):
            msg.roll = 0.0
        self.zarj.hands.move_with_yaw_pitch_roll(msg.sidename, point_vector,
                                                 [msg.yaw, msg.pitch, msg.roll])
        self.oclog("Relative palm move to {}, {}, {} commanded; yaw {}, pitch {}, roll {}".format(
            fmt(msg.x), fmt(msg.y), fmt(msg.z), fmt(msg.yaw), fmt(msg.pitch), fmt(msg.roll)))
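The helpers msg_q_to_q, q_to_msg_q, and yaw_pitch_roll_to_q used above are project-specific and not defined in this excerpt. One plausible implementation in terms of tf.transformations, offered only as a sketch of what they likely do, is:

from geometry_msgs.msg import Quaternion
from tf.transformations import quaternion_from_euler

def msg_q_to_q(q):
    # geometry_msgs/Quaternion -> [x, y, z, w] list used by tf.transformations
    return [q.x, q.y, q.z, q.w]

def q_to_msg_q(q):
    # [x, y, z, w] -> geometry_msgs/Quaternion
    return Quaternion(x=q[0], y=q[1], z=q[2], w=q[3])

def yaw_pitch_roll_to_q(ypr):
    # quaternion_from_euler takes (roll, pitch, yaw) with the default 'sxyz' axes
    return quaternion_from_euler(ypr[2], ypr[1], ypr[0])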