def to_pose_stamped(self):
pose = PoseStamped()
pose.header.stamp = rospy.Time.now()
pose.header.frame_id = "map"
pose.pose.position.x = self.x
pose.pose.position.y = self.y
pose.pose.position.z = 0.25
quaternion = tf.transformations.quaternion_from_euler(0, 0, self.theta)
pose.pose.orientation.x = quaternion[0]
pose.pose.orientation.y = quaternion[1]
pose.pose.orientation.z = quaternion[2]
pose.pose.orientation.w = quaternion[3]
return pose
python类PoseStamped()的实例源码
def ik_solve(limb, pos, orient):
ns = "ExternalTools/" + limb + "/PositionKinematicsNode/IKService"
iksvc = rospy.ServiceProxy(ns, SolvePositionIK)
ikreq = SolvePositionIKRequest()
hdr = Header(stamp=rospy.Time.now(), frame_id='base')
poses = {
str(limb): PoseStamped(header=hdr, pose=Pose(position=pos, orientation=orient))}
ikreq.pose_stamp.append(poses[limb])
try:
rospy.wait_for_service(ns, 5.0)
resp = iksvc(ikreq)
except (rospy.ServiceException, rospy.ROSException), e:
rospy.logerr("Service call failed: %s" % (e,))
return 1
if resp.isValid[0]:
limb_joints = dict(zip(resp.joints[0].name, resp.joints[0].position))
return limb_joints
else:
return Errors.RaiseNotReachable(pos)
def subscribePose():
rospy.Subscriber('/initialpose', PoseWithCovarianceStamped, getPoseData)
# rospy.Subscriber('/move_base_simple/goal', PoseStamped, getPoseData)
global background
def __init__(self):
self.map = None
self.start = None
self.goal = None
self.moves = [Move(0.1, 0), # forward
Move(-0.1, 0), # back
Move(0, 1.5708), # turn left 90
Move(0, -1.5708)] # turn right 90
self.robot = Robot(0.5, 0.5)
self.is_working = False # Replace with mutex after all
self.map_subscriber = rospy.Subscriber("map", OccupancyGrid, self.new_map_callback)
self.start_subscriber = rospy.Subscriber("initialpose", PoseWithCovarianceStamped, self.new_start_callback)
self.goal_subscriber = rospy.Subscriber("goal", PoseStamped, self.new_goal_callback)
self.path_publisher = rospy.Publisher("trajectory", MarkerArray, queue_size=1)
self.pose_publisher = rospy.Publisher("debug_pose", PoseStamped, queue_size=1)
# what will be there. A module goes into variable. Isn't it too much memory consumption. Maybe I should assign function replan() to this variable?
self.planner = planners.astar.replan
def updatePoseTopic(self, next_index, wait=True):
planning_group = self.planning_groups_keys[self.current_planning_group_index]
topics = self.planning_groups[planning_group]
if next_index >= len(topics):
self.current_eef_index = 0
elif next_index < 0:
self.current_eef_index = len(topics) - 1
else:
self.current_eef_index = next_index
next_topic = topics[self.current_eef_index]
rospy.loginfo("Changed controlled end effector to " + self.planning_groups_tips[planning_group][self.current_eef_index])
self.pose_pub = rospy.Publisher(next_topic, PoseStamped, queue_size=5)
if wait:
self.waitForInitialPose(next_topic)
self.current_pose_topic = next_topic
def markerCB(self, msg):
try:
self.marker_lock.acquire()
if not self.initialize_poses:
return
self.initial_poses = {}
for marker in msg.markers:
if marker.name.startswith("EE:goal_"):
# resolve tf
if marker.header.frame_id != self.frame_id:
ps = PoseStamped(header=marker.header, pose=marker.pose)
try:
transformed_pose = self.tf_listener.transformPose(self.frame_id, ps)
self.initial_poses[marker.name[3:]] = transformed_pose.pose
except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException, e):
rospy.logerr("tf error when resolving tf: %s" % e)
else:
self.initial_poses[marker.name[3:]] = marker.pose #tf should be resolved
finally:
self.marker_lock.release()
def waitForInitialPose(self, next_topic, timeout=None):
counter = 0
while not rospy.is_shutdown():
counter = counter + 1
if timeout and counter >= timeout:
return False
try:
self.marker_lock.acquire()
self.initialize_poses = True
topic_suffix = next_topic.split("/")[-1]
if self.initial_poses.has_key(topic_suffix):
self.pre_pose = PoseStamped(pose=self.initial_poses[topic_suffix])
self.initialize_poses = False
return True
else:
rospy.logdebug(self.initial_poses.keys())
rospy.loginfo("Waiting for pose topic of '%s' to be initialized",
topic_suffix)
rospy.sleep(1)
finally:
self.marker_lock.release()
def __init__(self):
rospy.Subscriber('/mavros/local_position/pose', PoseStamped, self.pose_callback, queue_size=1)
rospy.Subscriber('/marker_data', MarkerArray, self.marker_callback, queue_size=1)
self.vision_position_pub = rospy.Publisher('/mavros/vision_pose/pose', PoseStamped, queue_size=1)
self.offset_pub = rospy.Publisher('~offset', PointStamped, queue_size=1)
self.position_fcu_pub = rospy.Publisher('~position/fcu', PoseStamped, queue_size=1)
self.pose_pub = rospy.Publisher('~position/marker_map', PoseStamped, queue_size=1)
self.vision_position_message = PoseStamped()
self.position_fcu_message = PoseStamped()
self.position_fcu_message.header.frame_id = 'vision_fcu'
self.pose_message = PoseStamped()
self.pose_message.header.frame_id = 'marker_map'
self.marker_position_offset = PointStamped()
def __init__(self):
self.topics = {
"ball": {"topic": "environment/ball", "sub": None, "data": CircularState(), "type": CircularState},
"light": {"topic": "environment/light", "sub": None, "data": UInt8(), "type": UInt8},
"sound": {"topic": "environment/sound", "sub": None, "data": Float32(), "type": Float32},
"ergo_state": {"topic": "ergo/state", "sub": None, "data": CircularState(), "type": CircularState},
"joy1": {"topic": "sensors/joystick/1", "sub": None, "data": Joy(), "type": Joy},
"joy2": {"topic": "sensors/joystick/2", "sub": None, "data": Joy(), "type": Joy},
"torso_l_j": {"topic": "{}/joint_state".format("poppy_torso"), "sub": None, "data": JointState(), "type": JointState},
"torso_l_eef": {"topic": "{}/left_arm/end_effector_pose".format("poppy_torso"), "sub": None, "data": PoseStamped(), "type": PoseStamped},
"torso_r_eef": {"topic": "{}/right_arm/end_effector_pose".format("poppy_torso"), "sub": None, "data": PoseStamped(), "type": PoseStamped}
}
self.topics["ball"]["sub"] = rospy.Subscriber(self.topics["ball"]["topic"], self.topics["ball"]["type"], self.cb_ball)
self.topics["light"]["sub"] = rospy.Subscriber(self.topics["light"]["topic"], self.topics["light"]["type"], self.cb_light)
self.topics["sound"]["sub"] = rospy.Subscriber(self.topics["sound"]["topic"], self.topics["sound"]["type"], self.cb_sound)
self.topics["ergo_state"]["sub"] = rospy.Subscriber(self.topics["ergo_state"]["topic"], self.topics["ergo_state"]["type"], self.cb_ergo)
self.topics["joy1"]["sub"] = rospy.Subscriber(self.topics["joy1"]["topic"], self.topics["joy1"]["type"], self.cb_joy1)
self.topics["joy2"]["sub"] = rospy.Subscriber(self.topics["joy2"]["topic"], self.topics["joy2"]["type"], self.cb_joy2)
self.topics["torso_l_j"]["sub"] = rospy.Subscriber(self.topics["torso_l_j"]["topic"], self.topics["torso_l_j"]["type"], self.cb_torso_l_j)
self.topics["torso_l_eef"]["sub"] = rospy.Subscriber(self.topics["torso_l_eef"]["topic"], self.topics["torso_l_eef"]["type"], self.cb_torso_l_eef)
self.topics["torso_r_eef"]["sub"] = rospy.Subscriber(self.topics["torso_r_eef"]["topic"], self.topics["torso_r_eef"]["type"], self.cb_torso_r_eef)
def __init__(self):
self.frame_id = rospy.get_param("~frame_id", "map")
cov_x = rospy.get_param("~cov_x", 0.6)
cov_y = rospy.get_param("~cov_y", 0.6)
cov_z = rospy.get_param("~cov_z", 0.6)
self.cov = self.cov_matrix(cov_x, cov_y, cov_z)
self.ps_pub = rospy.Publisher(
POSE_TOPIC, PoseStamped, queue_size=1)
self.ps_cov_pub = rospy.Publisher(
POSE_COV_TOPIC, PoseWithCovarianceStamped, queue_size=1)
self.ps_pub_3d = rospy.Publisher(
POSE_TOPIC_3D, PoseStamped, queue_size=1)
self.ps_cov_pub_3d = rospy.Publisher(
POSE_COV_TOPIC_3D, PoseWithCovarianceStamped, queue_size=1)
self.last = None
self.listener = tf.TransformListener()
self.tag_range_topics = rospy.get_param("~tag_range_topics")
self.subs = list()
self.ranges = dict()
self.tag_pos = dict()
self.altitude = 0.0
self.last_3d = None
for topic in self.tag_range_topics:
self.subs.append(rospy.Subscriber(topic, Range, self.range_cb))
def publish_position(self, pos, ps_pub, ps_cov_pub, cov):
x, y = pos[0], pos[1]
if len(pos) > 2:
z = pos[2]
else:
z = 0
ps = PoseStamped()
ps_cov = PoseWithCovarianceStamped()
ps.pose.position.x = x
ps.pose.position.y = y
ps.pose.position.z = z
ps.header.frame_id = self.frame_id
ps.header.stamp = rospy.get_rostime()
ps_cov.header = ps.header
ps_cov.pose.pose = ps.pose
ps_cov.pose.covariance = cov
ps_pub.publish(ps)
ps_cov_pub.publish(ps_cov)
def callback_state(self, data):
for idx, cube in enumerate(data.name):
self.cubes_state.setdefault(cube, [0] * 3)
pose = self.cubes_state[cube]
cube_init = self.CubeMap[cube]["init"]
pose[0] = data.pose[idx].position.x + cube_init[0]
pose[1] = data.pose[idx].position.y + cube_init[1]
pose[2] = data.pose[idx].position.z + cube_init[2]
# def add_cube(self, name):
# p = PoseStamped()
# p.header.frame_id = ros_robot.get_planning_frame()
# p.header.stamp = rospy.Time.now()
#
# # p.pose = self._arm.get_random_pose().pose
# p.pose.position.x = -0.18
# p.pose.position.y = 0
# p.pose.position.z = 0.046
#
# q = quaternion_from_euler(0.0, 0.0, 0.0)
# p.pose.orientation = Quaternion(*q)
# ros_scene.add_box(name, p, (0.02, 0.02, 0.02))
#
# def remove_cube(self, name):
# ros_scene.remove_world_object(name)
def _add_table(self, name):
p = PoseStamped()
p.header.frame_id = self._robot.get_planning_frame()
p.header.stamp = rospy.Time.now()
p.pose.position.x = 0.2
p.pose.position.y = 0.0
p.pose.position.z = 0.1
q = quaternion_from_euler(0.0, 0.0, numpy.deg2rad(90.0))
p.pose.orientation = Quaternion(*q)
# Table size from ~/.gazebo/models/table/model.sdf, using the values
# for the surface link.
self._scene.add_box(name, p, (0.005, 0.005, 0.005))
return p.pose
def _add_grasp_block_(self, name):
p = PoseStamped()
p.header.frame_id = self._robot.get_planning_frame()
p.header.stamp = rospy.Time.now()
# p.pose = self._arm.get_random_pose().pose
p.pose.position.x = 0.021
p.pose.position.y = 0.008
p.pose.position.z = 0.2885
q = quaternion_from_euler(0.0, 0.0, 0.0)
p.pose.orientation = Quaternion(*q)
# Coke can size from ~/.gazebo/models/coke_can/meshes/coke_can.dae,
# using the measure tape tool from meshlab.
# The box is the bounding box of the coke cylinder.
# The values are taken from the cylinder base diameter and height.
self._scene.add_box(name, p, (0.01, 0.01, 0.01))
return p.pose
def set_pose_target(self, pose, end_effector_link = ""):
""" Set the pose of the end-effector, if one is available. The expected input is a Pose message, a PoseStamped message or a list of 6 floats:"""
""" [x, y, z, rot_x, rot_y, rot_z] or a list of 7 floats [x, y, z, qx, qy, qz, qw] """
if len(end_effector_link) > 0 or self.has_end_effector_link():
ok = False
if type(pose) is PoseStamped:
old = self.get_pose_reference_frame()
self.set_pose_reference_frame(pose.header.frame_id)
ok = self._g.set_pose_target(conversions.pose_to_list(pose.pose), end_effector_link)
self.set_pose_reference_frame(old)
elif type(pose) is Pose:
ok = self._g.set_pose_target(conversions.pose_to_list(pose), end_effector_link)
else:
ok = self._g.set_pose_target(pose, end_effector_link)
if not ok:
raise MoveItCommanderException("Unable to set target pose")
else:
raise MoveItCommanderException("There is no end effector to set the pose for")
def place(self, object_name, location=None):
"""Place the named object at a particular location in the environment or somewhere safe in the world if location is not provided"""
result = False
if location is None:
result = self._g.place(object_name)
elif type(location) is PoseStamped:
old = self.get_pose_reference_frame()
self.set_pose_reference_frame(location.header.frame_id)
result = self._g.place(object_name, conversions.pose_to_list(location.pose))
self.set_pose_reference_frame(old)
elif type(location) is Pose:
result = self._g.place(object_name, conversions.pose_to_list(location))
elif type(location) is PlaceLocation:
result = self._g.place(object_name, conversions.msg_to_string(location))
else:
raise MoveItCommanderException("Parameter location must be a Pose, PoseStamped or PlaceLocation object")
return result
def _add_table(self, name):
p = PoseStamped()
p.header.frame_id = self._robot.get_planning_frame()
p.header.stamp = rospy.Time.now()
p.pose.position.x = 0.45
p.pose.position.y = 0.0
p.pose.position.z = 0.22
q = quaternion_from_euler(0.0, 0.0, numpy.deg2rad(90.0))
p.pose.orientation = Quaternion(*q)
# Table size from ~/.gazebo/models/table/model.sdf, using the values
# for the surface link.
self._scene.add_box(name, p, (0.5, 0.4, 0.02))
return p.pose
def _add_grasp_block_(self, name):
p = PoseStamped()
p.header.frame_id = self._robot.get_planning_frame()
p.header.stamp = rospy.Time.now()
p.pose.position.x = 0.25
p.pose.position.y = 0.05
p.pose.position.z = 0.32
q = quaternion_from_euler(0.0, 0.0, 0.0)
p.pose.orientation = Quaternion(*q)
# Coke can size from ~/.gazebo/models/coke_can/meshes/coke_can.dae,
# using the measure tape tool from meshlab.
# The box is the bounding box of the coke cylinder.
# The values are taken from the cylinder base diameter and height.
self._scene.add_box(name, p, (0.03, 0.03, 0.09))
return p.pose
def publish_pose(pose, stamp=None, frame_id='camera'):
msg = geom_msg.PoseStamped();
msg.header.frame_id = frame_id
msg.header.stamp = stamp if stamp is not None else rospy.Time.now()
tvec = pose.tvec
x,y,z,w = pose.quat.to_xyzw()
msg.pose.position = geom_msg.Point(tvec[0],tvec[1],tvec[2])
msg.pose.orientation = geom_msg.Quaternion(x,y,z,w)
_publish_pose(msg)
def rtk_position_to_numpy(msg):
if isinstance(msg, Odometry):
p = msg.pose.pose.position
return np.array([p.x, p.y, p.z])
elif isinstance(msg, PoseStamped) or isinstance(msg, NavSatFix):
p = msg.pose.position
return np.array([p.x, p.y, p.z])
else:
raise ValueError
def rtk_orientation_to_numpy(msg):
if isinstance(msg, Odometry):
p = msg.pose.pose.orientation
return np.array([p.x, p.y, p.z, p.w])
elif isinstance(msg, PoseStamped) or isinstance(msg, NavSatFix):
p = msg.pose.orientation
return np.array([p.x, p.y, p.z, p.w])
else:
raise ValueError
def pose_to_list(pose):
if isinstance(pose, PoseStamped):
plist = [[pose.pose.position.x,
pose.pose.position.y,
pose.pose.position.z],
[pose.pose.orientation.x,
pose.pose.orientation.y,
pose.pose.orientation.z,
pose.pose.orientation.w]]
return plist
raise TypeError("ROSBridge.pose_to_list only accepts Path, got {}".format(type(pose)))
def path_last_point_to_numpy(path):
if isinstance(path, Path):
path = path.poses[-1]
if isinstance(path, PoseStamped):
return ROSBridge.pose_to_list(path)
raise TypeError("ROSBridge.path_last_point_to_numpy only accepts Path or PoseStamped, got {}".format(type(path)))
def subscribePose():
rospy.Subscriber('/initialpose', PoseWithCovarianceStamped, getPoseData)
# rospy.Subscriber('/move_base_simple/goal', PoseStamped, getPoseData)
global background
def __init__(self):
self.target_pose = PoseStamped()
self.target_velocity = Twist()
self.aim = Pose()
self.kick_power = 0.0
self.dribble_power = 0.0
self.velocity_control_enable = False
self.chip_enable = False
self.navigation_enable = True
self._MAX_KICK_POWER = 8.0
self._MAX_DRIBBLE_POWER = 8.0
def moveGripperTo(self, position, rollpitchyaw=[-np.pi, 0.0, 0.0], timeout=1, useInitGuess=False, wait=False, rightArm=True, ret=False):
# Move using IK and joint trajectory controller
# Attach new pose to a frame
poseData = list(position) + list(rollpitchyaw)
frameData = PyKDL.Frame()
poseFrame = dh.array2KDLframe(poseData)
poseFrame = dh.frameConversion(poseFrame, frameData)
pose = dh.KDLframe2Pose(poseFrame)
# Create a PoseStamped message and perform transformation to given frame
ps = PoseStamped()
ps.header.frame_id = self.frame
ps.pose.position = pose.position
ps.pose.orientation = pose.orientation
ps = self.tf.transformPose(self.frame, ps)
# Perform IK
if rightArm:
ikGoal = self.rightArmKdl.inverse(ps.pose, q_guess=self.initRightJointGuess, min_joints=self.rightJointLimitsMin, max_joints=self.rightJointLimitsMax)
# ikGoal = self.rightArmKdl.inverse(ps.pose, q_guess=(self.initRightJointGuess if useInitGuess or self.currentRightJointPositions is None else self.currentRightJointPositions), min_joints=self.rightJointLimitsMin, max_joints=self.rightJointLimitsMax)
else:
ikGoal = self.leftArmKdl.inverse(ps.pose, q_guess=self.initLeftJointGuess, min_joints=self.leftJointLimitsMin, max_joints=self.leftJointLimitsMax)
# ikGoal = self.leftArmKdl.inverse(ps.pose, q_guess=(self.initLeftJointGuess if useInitGuess or self.currentLeftJointPositions is None else self.currentLeftJointPositions), min_joints=self.leftJointLimitsMin, max_joints=self.leftJointLimitsMax)
if ikGoal is not None:
if not ret:
self.moveToJointAngles(ikGoal, timeout=timeout, wait=wait, rightArm=rightArm)
else:
return ikGoal
else:
print 'IK failed'
def ref_cb(msg):
global odom
vehicle_pub.publish(PoseStamped(pose=msg.pose.pose, header=msg.header))
odom = msg
def __init__(self):
NaoqiNode.__init__(self, 'naoqi_moveto_listener')
self.connectNaoQi()
self.listener = tf.TransformListener()
self.goal_sub = rospy.Subscriber('move_base_simple/goal', PoseStamped, self.goalCB)
self.cvel_sub = rospy.Subscriber('cmd_vel', Twist, self.cvelCB)
# (re-) connect to NaoQI:
def get_position(self):
""" Where are we... """
world = self.zarj.transform.lookup_transform('world',
self.zarj.walk.lfname,
rospy.Time())
chest_position = self.zarj.transform.lookup_transform(
'pelvis', 'torso', rospy.Time()).transform
pose = PoseStamped()
pose.header.frame_id = 'pelvis'
pose.header.stamp = rospy.Time()
pose.pose.position = world.transform.translation
pose.pose.orientation = chest_position.rotation
return pose
def turn_body_to(self, angle, wait=True):
"""
Turn the torso to a given angle as related to the pelvis
Angles smaller than 110 or larger than 250 are unstable
"""
log("Turn body to {} degrees".format(angle))
chest_position = self.zarj.transform.lookup_transform(
'pelvis', 'torso', rospy.Time()).transform
msg = ChestTrajectoryRosMessage()
msg.unique_id = self.zarj.uid
msg.execution_mode = msg.OVERRIDE
euler = euler_from_quaternion((chest_position.rotation.w,
chest_position.rotation.x,
chest_position.rotation.y,
chest_position.rotation.z))
qua = quaternion_from_euler(radians(-angle-180), euler[1], euler[2])
pose = PoseStamped()
pose.pose.orientation = Quaternion(qua[1], qua[2], qua[3], qua[0])
pose.header.frame_id = 'pelvis'
pose.header.stamp = rospy.Time()
result = self.zarj.transform.tf_buffer.transform(pose, 'world')
point = SO3TrajectoryPointRosMessage()
point.time = 1.0
point.orientation = result.pose.orientation
point.angular_velocity = Vector3(0.0, 0.0, 0.0)
msg.taskspace_trajectory_points = [point]
self.chest_publisher.publish(msg)
if wait:
rospy.sleep(point.time + 0.1)