def robot_listener(self):
'''
rospy.wait_for_service('spawn')
spawner = rospy.ServiceProxy('spawn', turtlesim.srv.Spawn)
spawner(4, 2, 0, 'turtle2')
'''
robot_vel_pub = rospy.Publisher('robot_0/cmd_vel', geometry_msgs.msg.Twist, queue_size=1)
rate = rospy.Rate(10.0)
while not rospy.is_shutdown():
try:
(trans,rot) = self.listener.lookupTransform('/robot_3', '/robot_0', rospy.Time(0))
except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException):
continue
angular = 4 * math.atan2(trans[1], trans[0])
linear = 0.5 * math.sqrt(trans[0] ** 2 + trans[1] ** 2)
cmd = geometry_msgs.msg.Twist()
cmd.linear.x = linear
cmd.angular.z = angular
robot_vel_pub.publish(cmd)
rate.sleep()
python类ExtrapolationException()的实例源码
def printJointStates(self):
try:
# now = rospy.Time.now()
# self.tf.waitForTransform(self.frame, 'r_gripper_tool_frame', now, rospy.Duration(10.0))
self.tf.waitForTransform(self.frame, 'r_gripper_tool_frame', rospy.Time(), rospy.Duration(10.0))
currentRightPos, currentRightOrient = self.tf.lookupTransform(self.frame, 'r_gripper_tool_frame', rospy.Time(0))
msg = rospy.wait_for_message('/r_arm_controller/state', JointTrajectoryControllerState)
currentRightJointPositions = msg.actual.positions
print 'Right positions:', currentRightPos, currentRightOrient
print 'Right joint positions:', currentRightJointPositions
# now = rospy.Time.now()
# self.tf.waitForTransform(self.frame, 'l_gripper_tool_frame', now, rospy.Duration(10.0))
currentLeftPos, currentLeftOrient = self.tf.lookupTransform(self.frame, 'l_gripper_tool_frame', rospy.Time(0))
msg = rospy.wait_for_message('/l_arm_controller/state', JointTrajectoryControllerState)
currentLeftJointPositions = msg.actual.positions
print 'Left positions:', currentLeftPos, currentLeftOrient
print 'Left joint positions:', currentLeftJointPositions
# print 'Right gripper state:', rospy.wait_for_message('/r_gripper_controller/state', JointControllerState)
except tf.ExtrapolationException:
print 'No transformation available! Failing to record this time step.'
def markerCB(self, msg):
try:
self.marker_lock.acquire()
if not self.initialize_poses:
return
self.initial_poses = {}
for marker in msg.markers:
if marker.name.startswith("EE:goal_"):
# resolve tf
if marker.header.frame_id != self.frame_id:
ps = PoseStamped(header=marker.header, pose=marker.pose)
try:
transformed_pose = self.tf_listener.transformPose(self.frame_id, ps)
self.initial_poses[marker.name[3:]] = transformed_pose.pose
except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException, e):
rospy.logerr("tf error when resolving tf: %s" % e)
else:
self.initial_poses[marker.name[3:]] = marker.pose #tf should be resolved
finally:
self.marker_lock.release()
def goalCB(self, poseStamped):
# reset timestamp because of bug: https://github.com/ros/geometry/issues/82
poseStamped.header.stamp = rospy.Time(0)
try:
robotToTarget1 = self.listener.transformPose("/base_footprint", poseStamped)
except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException) as e:
rospy.logerr("Error while transforming pose: %s", str(e))
return
quat = robotToTarget1.pose.orientation
(roll,pitch,yaw) = euler_from_quaternion((quat.x, quat.y, quat.z, quat.w))
self.motionProxy.moveTo(robotToTarget1.pose.position.x, robotToTarget1.pose.position.y, yaw)
def transform_pose(self, pose):
try:
new_pose = self.listener.transformPose(self.frame_id, pose)
# now new_pose should be the pose of the tag in the odom_meas frame.
# Let's ignore the height, and rotations not about the z-axis
new_pose.pose.position.z = 0
except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException):
return None
return new_pose
def go_to_pose(self, name, T, timeout):
msg = convert_to_trans_message(T)
self.update_marker(T)
start_time = time.time()
done = False
while not done and not rospy.is_shutdown():
self.pub_cmd.publish(msg)
try:
(trans,rot) = self.listener.lookupTransform('/world_link','lwr_arm_7_link',
rospy.Time(0))
except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException):
print "TF Exception!"
continue
TR = numpy.dot(tf.transformations.translation_matrix(trans),
tf.transformations.quaternion_matrix(rot))
if (is_same(T, TR)):
print name + ": Reached desired pose"
done = True
if (time.time() - start_time > timeout) :
print name + ": Robot took too long to reach desired pose"
done = True
else:
rospy.sleep(0.1)
def goto_pose(self, name, T, timeout):
self.server.setPose("move_arm_marker", convert_to_message(T))
self.server.applyChanges()
self.pub_command.publish(convert_to_trans_message(T))
print 'Goal published'
start_time = time.time()
done = False
while not done and not rospy.is_shutdown():
self.mutex.acquire()
last_joint_state = deepcopy(self.joint_state)
self.mutex.release()
if not self.check_validity(last_joint_state):
print 'COLLISION!'
try:
(trans,rot) = self.listener.lookupTransform('/world_link','/lwr_arm_7_link',
rospy.Time(0))
except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException):
print "TF Exception!"
continue
TR = numpy.dot(tf.transformations.translation_matrix(trans),
tf.transformations.quaternion_matrix(rot))
if (is_same(T, TR)):
print name + ": Reached goal"
done = True
if (time.time() - start_time > timeout) :
done = True
print name + ": Robot took too long to reach goal. Grader timed out"
else:
rospy.sleep(0.05)
def establish_tfs(self, relations):
"""
Perform a one-time look up of all the requested
*static* relations between frames (available via /tf)
"""
# Init node and tf listener
rospy.init_node(self.__class__.__name__, disable_signals=True)
tf_listener = tf.TransformListener()
# Create tf decoder
tf_dec = TfDecoderAndPublisher(channel='/tf')
# Establish tf relations
print('{} :: Establishing tfs from ROSBag'.format(self.__class__.__name__))
for self.idx, (channel, msg, t) in enumerate(self.log.read_messages(topics='/tf')):
tf_dec.decode(msg)
for (from_tf, to_tf) in relations:
# If the relations have been added already, skip
if (from_tf, to_tf) in self.relations_map_:
continue
# Check if the frames exist yet
if not tf_listener.frameExists(from_tf) or not tf_listener.frameExists(to_tf):
continue
# Retrieve the transform with common time
try:
tcommon = tf_listener.getLatestCommonTime(from_tf, to_tf)
(trans,rot) = tf_listener.lookupTransform(from_tf, to_tf, tcommon)
self.relations_map_[(from_tf,to_tf)] = RigidTransform(tvec=trans, xyzw=rot)
# print('\tSuccessfully received transform: {:} => {:} {:}'
# .format(from_tf, to_tf, self.relations_map_[(from_tf,to_tf)]))
except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException) as e:
# print e
pass
# Finish up once we've established all the requested tfs
if len(self.relations_map_) == len(relations):
break
try:
tfs = [self.relations_map_[(from_tf,to_tf)] for (from_tf, to_tf) in relations]
for (from_tf, to_tf) in relations:
print('\tSuccessfully received transform:\n\t\t {:} => {:} {:}'
.format(from_tf, to_tf, self.relations_map_[(from_tf,to_tf)]))
except:
raise RuntimeError('Error concerning tf lookup')
print('{} :: Established {:} relations\n'.format(self.__class__.__name__, len(tfs)))
return tfs