def ik_solve(limb, pos, orient):
ns = "ExternalTools/" + limb + "/PositionKinematicsNode/IKService"
iksvc = rospy.ServiceProxy(ns, SolvePositionIK)
ikreq = SolvePositionIKRequest()
hdr = Header(stamp=rospy.Time.now(), frame_id='base')
poses = {
str(limb): PoseStamped(header=hdr, pose=Pose(position=pos, orientation=orient))}
ikreq.pose_stamp.append(poses[limb])
try:
rospy.wait_for_service(ns, 5.0)
resp = iksvc(ikreq)
except (rospy.ServiceException, rospy.ROSException), e:
rospy.logerr("Service call failed: %s" % (e,))
return 1
if resp.isValid[0]:
limb_joints = dict(zip(resp.joints[0].name, resp.joints[0].position))
return limb_joints
else:
return Errors.RaiseNotReachable(pos)
python类Header()的实例源码
def __init__(self):
#init code
rospy.init_node("robotGame")
self.currentDist = 1
self.previousDist = 1
self.reached = False
self.tf = TransformListener()
self.left_joint_names = ['yumi_joint_1_l', 'yumi_joint_2_l', 'yumi_joint_3_l', 'yumi_joint_4_l', 'yumi_joint_5_l', 'yumi_joint_6_l', 'yumi_joint_7_l', 'gripper_l_joint']
self.right_joint_names = ['yumi_joint_1_r', 'yumi_joint_2_r', 'yumi_joint_3_r', 'yumi_joint_4_r', 'yumi_joint_5_r', 'yumi_joint_6_r', 'yumi_joint_7_r', 'gripper_r_joint']
self.left_positions = [-2.01081820427463881, 1.4283937236421274, -1.3593418228836045, -0.19315625641494183, 1.7016501799872579, 0.6573540231496411, 3.404315594906305, 0.0]
self.right_positions = [0.01081820427463881, 2.4283937236421274, 0.3593418228836045, -0.19315625641494183, 1.7016501799872579, 0.6573540231496411, 3.404315594906305, 1.8145107750466565]
self.rjv = []
self.ljv = []
self.pub = rospy.Publisher('my_joint_states', JointState, queue_size=1)
self.js = JointState()
self.js.header = Header()
self.js.name = self.left_joint_names + self.right_joint_names
self.js.velocity = []
self.js.effort = []
self.sub = rospy.Subscriber('/joint_states', JointState, self.jsCB)
self.destPos = np.random.uniform(0,0.25, size =(3))
self.reset()
def publish_interframe_motion(self, last_features, new_features, status, err):
self.good_old = last_features[(status == 1) & (err < 12.0)]
self.good_new = new_features[(status == 1) & (err < 12.0)]
# TODO: clean up these features before publishing
self.pub_keypoint_motion.publish(
header=Header(
stamp=rospy.Time.now(), # TODO: use camera image time
frame_id='tango_camera_2d'
),
prev_x=self.good_old[:, 0],
prev_y=self.good_old[:, 1],
cur_x=self.good_new[:, 0],
cur_y=self.good_new[:, 1]
)
def publish_interframe_motion(self, last_features, new_features, status, err):
self.good_old = last_features[(status == 1) & (err < 12.0)]
self.good_new = new_features[(status == 1) & (err < 12.0)]
# TODO: clean up these features before publishing
self.pub_keypoint_motion.publish(
header=Header(
stamp=rospy.Time.now(), # TODO: use camera image time
frame_id='tango_camera_2d'
),
prev_x=self.good_old[:, 0],
prev_y=self.good_old[:, 1],
cur_x=self.good_new[:, 0],
cur_y=self.good_new[:, 1]
)
def __get_point_msg(cls, data, frame):
"""
Deserializes a location to a message of type PointStamped.
Args:
data: A dictionary containing the location.
frame: Frame for the point.
Returns:
A message of type PointStamped with the location.
"""
header = Header()
header.stamp = rospy.get_rostime()
header.frame_id = frame
msg = GeoPointStamped()
msg.header = header
msg.position.latitude = data["latitude"]
msg.position.longitude = data["longitude"]
return msg
def compute_fk_client(self, group, joint_values, links):
rospy.wait_for_service('compute_fk')
try:
compute_fk = rospy.ServiceProxy('compute_fk', GetPositionFK)
header = Header()
header.stamp = rospy.Time.now()
header.frame_id = group.get_pose_reference_frame()
rs = RobotState()
rs.joint_state.header = header
rs.joint_state.name = group.get_active_joints()
rs.joint_state.position = joint_values
res = compute_fk(header, links, rs)
return res.pose_stamped
except rospy.ServiceException, e:
print "Service call failed: %s" % e
def process_radar_tracks(msg):
assert msg._md5sum == '6a2de2f790cb8bb0e149d45d297462f8'
tracks = RadarObservation.from_msg(msg)
num_tracks = len(tracks)
acc=[]
cloud = np.zeros([num_tracks, 3], dtype=np.float32)
for i, track in enumerate(tracks):
cloud[i] = [track.x, track.y, track.z] - np.array(RADAR_TO_LIDAR)
if np.abs(track.y) < 2:
#acc.append(track.x)
#acc.append(track.power)
print track.vx*3.7, track.vy*3.7
#print x, y, z
#print acc #msg.header.stamp
header = Header()
header.stamp = msg.header.stamp
header.frame_id = 'velodyne'
cloud_msg = pc2.create_cloud_xyz32(header, cloud)
cloud_msg.width = 1
cloud_msg.height = num_tracks
rospy.Publisher('radar_points', PointCloud2, queue_size=1).publish(cloud_msg)
def publish():
header = Header()
header.stamp = rospy.Time.now()
header.frame_id = 'map'
for role, robot_id in WorldModel.assignments.items():
if robot_id is not None:
command = WorldModel.commands[role]
if command.velocity_control_enable:
msg = TwistStamped()
msg.header = header
msg.twist = command.target_velocity
pubs_velocity[robot_id].publish(msg)
else:
send_pose = WorldModel.tf_listener.transformPose("map",command.target_pose)
send_pose.header.stamp = rospy.Time.now()
pubs_position[robot_id].publish(send_pose)
pubs_kick_velocity[robot_id].publish(Float32(command.kick_power))
status = AIStatus()
# TODO(Asit) use navigation_enable instead avoidBall.
status.avoidBall = command.navigation_enable
status.do_chip = command.chip_enable
status.dribble_power = command.dribble_power
pubs_ai_status[robot_id].publish(status)
command.reset_adjustments()
def test_import_absolute_msg(self):
print_importers()
# Verify that files exists and are importable
import std_msgs.msg as std_msgs
self.assert_std_message_classes(std_msgs.Bool, std_msgs.Header)
def test_import_class_from_absolute_msg(self):
"""Verify that"""
print_importers()
# Verify that files exists and are importable
from std_msgs.msg import Bool, Header
self.assert_std_message_classes(Bool, Header)
def test_double_import_uses_cache(self):
print_importers()
import std_msgs.msg as std_msgs
self.assert_std_message_classes(std_msgs.Bool, std_msgs.Header)
import std_msgs.msg as std_msgs2
self.assertTrue(std_msgs == std_msgs2)
def test_import_absolute_msg(self):
print_importers()
# Verify that files exists and are importable
import std_msgs.msg as std_msgs
print_importers_of(std_msgs)
self.assert_std_message_classes(std_msgs.Bool, std_msgs.Header)
def test_import_class_from_absolute_msg(self):
"""Verify that"""
print_importers()
# Verify that files exists and are importable
from std_msgs.msg import Bool, Header
self.assert_std_message_classes(Bool, Header)
baxter_continuous_scan.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def _vector_to(self, vector, to='base'):
h = Header()
h.stamp = rospy.Time(0)
h.frame_id = '{}_gripper'.format(self.limb_name)
v = Vector3(*vector)
v_base = self.tl.transformVector3(to,
Vector3Stamped(h, v)).vector
v_cartesian = [v_base.x, v_base.y, v_base.z, 0, 0, 0]
return v_cartesian
baxter_continuous_scan.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def pick(self, pose, direction=(0, 0, 1), distance=0.1):
pregrasp_pose = self.translate(pose, direction, distance)
while True:
try:
#stamped_pose = PoseStamped( Header(0, rospy.Time(0), n.robot.base), pose)
self.move_ik(pregrasp_pose)
break
except AttributeError:
print("can't find valid pose for gripper cause I'm dumb")
return False
rospy.sleep(0.5)
# We want to block end effector opening so that the next
# movement happens with the gripper fully opened.
#self.gripper.open()
while True:
#rospy.loginfo("Going down to pick (at {})".format(self.tip.max()))
if(not self.sensors_updated()):
return
if self.tip.max() > 2000:
break
else:
scaled_direction = (di / 100 for di in direction)
#print("Scaled direction: ", scaled_direction)
v_cartesian = self._vector_to(scaled_direction)
v_cartesian[2] = -.01
#print("cartesian: ", v_cartesian)
v_joint = self.compute_joint_velocities(v_cartesian)
#print("joint : ", v_joint)
self.limb.set_joint_velocities(v_joint)
rospy.loginfo('Went down!')
stacking_blocks.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def _vector_to(self, vector, to='base'):
h = Header()
h.stamp = rospy.Time(0)
h.frame_id = '{}_gripper'.format(self.limb_name)
v = Vector3(*vector)
v_base = self.tl.transformVector3(to,
Vector3Stamped(h, v)).vector
v_cartesian = [v_base.x, v_base.y, v_base.z, 0, 0, 0]
return v_cartesian
baxter_cube_calibration.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def _vector_to(self, vector, to='base'):
h = Header()
h.stamp = rospy.Time(0)
h.frame_id = '{}_gripper'.format(self.limb_name)
v = Vector3(*vector)
v_base = self.tl.transformVector3(to,
Vector3Stamped(h, v)).vector
v_cartesian = [v_base.x, v_base.y, v_base.z, 0, 0, 0]
return v_cartesian
safety_stop.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def _transform_vector(self, v, mode='speed', to='base'):
h = Header()
h.stamp = rospy.Time(0)
h.frame_id = '{}_gripper'.format(self.bx.limb_name)
v = Vector3(*v)
v_base = self.tl.transformVector3(to,
Vector3Stamped(h, v)).vector
if mode == 'speed':
v_cartesian = np.array([0, 0, 0, v_base.x, v_base.y, v_base.z])
elif mode == 'direction':
v_cartesian = np.array([v_base.x, v_base.y, v_base.z, 0, 0, 0])
return v_cartesian
baxter_scan_object.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def _vector_to(self, vector, to='base'):
h = Header()
h.stamp = rospy.Time(0)
h.frame_id = '{}_gripper'.format(self.limb_name)
v = Vector3(*vector)
v_base = self.tl.transformVector3(to,
Vector3Stamped(h, v)).vector
v_cartesian = [v_base.x, v_base.y, v_base.z, 0, 0, 0]
return v_cartesian
def move_calib_position(self):
# move arm to the calibration position
self.calib_pose = PoseStamped(
Header(0, rospy.Time(0), self.robot.base),
Pose(Point(0.338520675898,-0.175860479474,0.0356775075197),
Quaternion(-0.0183493755758,0.708424150944, 0.704712092876, 0.0343413949013)))
self.robot.move_ik(self.calib_pose)
def move_ik(self, stamped_pose):
"""Take a PoseStamped and move the arm there.
"""
action_address = ('/' + self.robot_type + '_driver/pose_action/tool_pose')
self.client = actionlib.SimpleActionClient(
action_address,
kinova_msgs.msg.ArmPoseAction)
if not isinstance(stamped_pose, PoseStamped):
raise TypeError("No duck typing here? :(")
pose = stamped_pose.pose
position, orientation = pose.position, pose.orientation
# Send a cartesian goal to the action server. Adapted from kinova_demo
rospy.loginfo("Waiting for SimpleAction server")
self.client.wait_for_server()
goal = kinova_msgs.msg.ArmPoseGoal()
goal.pose.header = Header(frame_id=(self.robot_type + '_link_base'))
goal.pose.pose.position = position
goal.pose.pose.orientation = orientation
rospy.loginfo("Sending goal")
print goal
self.client.send_goal(goal)
# rospy.loginfo("Waiting for up to {} s for result".format(t))
if self.client.wait_for_result(rospy.Duration(100)):
rospy.loginfo("Action succeeded")
print self.client.get_result()
return self.client.get_result()
else:
self.client.cancel_all_goals()
rospy.loginfo(' the cartesian action timed-out')
return None
def make_header(frame_id, stamp=None):
''' Creates a Header object for stamped ROS objects '''
if stamp == None:
stamp = rospy.Time.now()
header = Header()
header.stamp = stamp
header.frame_id = frame_id
return header
def __get_flyzone(cls, data, frame):
"""
Deserializes flight boundary data into a FlyZoneArray message.
Args:
data: List of dictionaries.
frame: Frame id for the boundaries.
Returns:
A FlyzoneArray message type which contains an array of FlyZone
messages, which contains a polygon for the boundary, a max
altitude and a min altitude.
"""
# Generate header for all zones.
header = Header()
header.stamp = rospy.get_rostime()
header.frame_id = frame
flyzones = FlyZoneArray()
for zone in data:
flyzone = FlyZone()
flyzone.zone.header = header
flyzone.max_alt = feet_to_meters(zone["altitude_msl_max"])
flyzone.min_alt = feet_to_meters(zone["altitude_msl_min"])
# Change boundary points to ros message of type polygon.
for waypoint in zone["boundary_pts"]:
point = GeoPoint()
point.latitude = waypoint["latitude"]
point.longitude = waypoint["longitude"]
flyzone.zone.polygon.points.append(point)
flyzones.flyzones.append(flyzone)
return flyzones
def __get_waypoints(cls, data, frame):
"""
Deserializes a list of waypoints into a marker message.
Args:
data: List of dictionaries corresponding to waypoints.
frame: Frame of the markers.
Returns:
A marker message of type Points, with a list of points in order
corresponding to each waypoint.
"""
header = Header()
header.stamp = rospy.get_rostime()
header.frame_id = frame
waypoints = WayPoints()
waypoints.header = header
# Ensure there is no rotation by setting w to 1.
for point in data:
waypoint = GeoPoint()
altitude = feet_to_meters(point["altitude_msl"])
waypoint.latitude = point["latitude"]
waypoint.longitude = point["longitude"]
waypoint.altitude = altitude
waypoints.waypoints.append(waypoint)
return waypoints
def __get_search_grid(cls, data, frame):
"""
Deserializes a the search grid into a polygon message.
Args:
data: List of dictionaries corresponding to the search grid points.
frame: Frame for the polygon.
Returns:
Message of type PolygonStamped with the bounds of the search grid.
"""
header = Header()
header.stamp = rospy.get_rostime()
header.frame_id = frame
search_grid = GeoPolygonStamped()
search_grid.header = header
for point in data:
boundary_pnt = GeoPoint()
altitude = feet_to_meters(point["altitude_msl"])
boundary_pnt.latitude = point["latitude"]
boundary_pnt.longitude = point["longitude"]
boundary_pnt.altitude = altitude
search_grid.polygon.points.append(boundary_pnt)
return search_grid
def baxter_ik_move(self, rpy_pose):
# quaternion_pose = conversions.list_to_pose_stamped(rpy_pose, "base")
quaternion_pose = convert_pose_to_msg(rpy_pose, "base")
node = "ExternalTools/" + self.limb + "/PositionKinematicsNode/IKService"
ik_service = rospy.ServiceProxy(node, SolvePositionIK)
ik_request = SolvePositionIKRequest()
hdr = Header(stamp=rospy.Time.now(), frame_id="base")
ik_request.pose_stamp.append(quaternion_pose)
try:
rospy.wait_for_service(node, 5.0)
ik_response = ik_service(ik_request)
except (rospy.ServiceException, rospy.ROSException), error_message:
rospy.logerr("Service request failed: %r" % (error_message,))
print "ERROR - baxter_ik_move - Failed to append pose"
return 1
if (ik_response.isValid[0]):
# convert response to joint position control dictionary
limb_joints = dict(zip(ik_response.joints[0].name, ik_response.joints[0].position))
# move limb
self.limb_interface.move_to_joint_positions(limb_joints)
else:
print "ERROR - baxter_ik_move - No valid joint configuration found"
return 2
self.getPose() #Store current pose into self.pose
print "Move Executed"
return -1
#Because we can't get full moveit_commander on the raspberry pi due to memory limitations, rewritten implementation of the required conversion function
def forward_kinematic(self, joint_state, base='base', links=None):
def compute_fk_client():
try:
header = Header()
header.stamp = rospy.Time.now()
header.frame_id = self.prefix + '/base'
rs = RobotState()
rs.joint_state = joint_state
res = self.compute_fk(header, links, rs)
return res.pose_stamped
except rospy.ServiceException, e:
print "Service call failed: %s" % e
# in case of troubles return 0 pose stamped
return []
if links is None:
links = self.end_effectors['whole_body']
if type(links) is not list:
if links == "all":
links = self.get_link_names('whole_body')
else:
links = [links]
# check that the base is in links
if base != 'base' and base not in links:
links.append(base)
pose_stamped_list = compute_fk_client()
if not pose_stamped_list:
return {}
# transform it in a dict of poses
pose_dict = {}
if base != 'base':
tr_base = transformations.pose_to_list(pose_stamped_list[links.index(base)].pose)
inv_base = transformations.inverse_transform(tr_base)
for i in range(len(links)):
if links[i] != base:
tr = transformations.pose_to_list(pose_stamped_list[i].pose)
pose_dict[links[i]] = transformations.multiply_transform(inv_base, tr)
else:
for i in range(len(links)):
pose_dict[links[i]] = transformations.pose_to_list(pose_stamped_list[i].pose)
return pose_dict
def get_ik_joints(target_x, target_y, initial_z, target_z, target_theta, n_steps):
ns = "ExternalTools/left/PositionKinematicsNode/IKService"
iksvc = rospy.ServiceProxy(ns, SolvePositionIK)
ikreq = SolvePositionIKRequest(seed_mode=SolvePositionIKRequest.SEED_CURRENT)
hdr = Header(stamp=rospy.Time.now(), frame_id='base')
qx = np.sin(target_theta * 0.5)
qy = np.cos(target_theta * 0.5)
for z in np.arange(initial_z, target_z, (target_z-initial_z)/n_steps):
pose = PoseStamped(
header=hdr,
pose=Pose(
position=Point( x=target_x, y=target_y, z=z, ),
orientation=Quaternion( x=qx, y=qy, z=0., w=0., ),
),
)
ikreq.pose_stamp.append(pose)
try:
rospy.wait_for_service(ns, 5.0)
resp = iksvc(ikreq)
except (rospy.ServiceException, rospy.ROSException), e:
rospy.logerr("Service call failed: %s" % (e,))
return 1
return [j for v, j in zip(resp.isValid, resp.joints) if v]
def make_header(frame_id, stamp=None):
if stamp == None:
stamp = rospy.Time.now()
header = Header()
header.stamp = stamp
header.frame_id = frame_id
return header
def make_header(frame_id, stamp=None):
if stamp == None:
stamp = rospy.Time.now()
header = Header()
header.stamp = stamp
header.frame_id = frame_id
return header