def rosmsg(self):
""":obj:`sensor_msgs.CamerInfo` : Returns ROS CamerInfo msg
"""
msg_header = Header()
msg_header.frame_id = self._frame
msg_roi = RegionOfInterest()
msg_roi.x_offset = 0
msg_roi.y_offset = 0
msg_roi.height = 0
msg_roi.width = 0
    msg_roi.do_rectify = False
msg = CameraInfo()
msg.header = msg_header
msg.height = self._height
msg.width = self._width
msg.distortion_model = 'plumb_bob'
msg.D = [0.0, 0.0, 0.0, 0.0, 0.0]
msg.K = [self._fx, 0.0, self._cx, 0.0, self._fy, self._cy, 0.0, 0.0, 1.0]
msg.R = [1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0]
    msg.P = [self._fx, 0.0, self._cx, 0.0, 0.0, self._fy, self._cy, 0.0, 0.0, 0.0, 1.0, 0.0]
msg.binning_x = 0
msg.binning_y = 0
msg.roi = msg_roi
return msg
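A minimal usage sketch for the method above (assuming it lives on an intrinsics object exposing the `_fx`, `_fy`, `_cx`, `_cy`, `_width`, `_height`, and `_frame` attributes it reads; the node name, topic, and `intrinsics` object are illustrative):

import rospy
from sensor_msgs.msg import CameraInfo

rospy.init_node('camera_info_publisher')
pub = rospy.Publisher('camera/camera_info', CameraInfo, queue_size=1)
rate = rospy.Rate(10)
while not rospy.is_shutdown():
    msg = intrinsics.rosmsg()  # `intrinsics`: hypothetical object with the rosmsg() method above
    msg.header.stamp = rospy.Time.now()  # stamp each message at publish time
    pub.publish(msg)
    rate.sleep()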
def handle_radar_msg(msg, dont_fuse):
assert msg._md5sum == '6a2de2f790cb8bb0e149d45d297462f8'
publish_rviz_topics = True
with g_fusion_lock:
# do we have any estimation?
if g_fusion.last_state_mean is not None:
pose = g_fusion.lidar_observation_function(g_fusion.last_state_mean)
observations = RadarObservation.from_msg(msg, RADAR_TO_LIDAR, 0.9115)
# find nearest observation to current object position estimation
distance_threshold = 4.4
nearest = None
nearest_dist = 1e9
for o in observations:
dist = [o.x - pose[0], o.y - pose[1], o.z - pose[2]]
dist = np.sqrt(np.array(dist).dot(dist))
if dist < nearest_dist and dist < distance_threshold:
nearest_dist = dist
nearest = o
if nearest is not None:
# FUSION
if not dont_fuse:
g_fusion.filter(nearest)
if publish_rviz_topics:
header = Header()
header.frame_id = '/velodyne'
header.stamp = rospy.Time.now()
point = np.array([[nearest.x, nearest.y, nearest.z]], dtype=np.float32)
rospy.Publisher(name='obj_radar_nearest',
data_class=PointCloud2,
queue_size=1).publish(pc2.create_cloud_xyz32(header, point))
#pose = g_fusion.lidar_observation_function(g_fusion.last_state_mean)
#br = ros_tf.TransformBroadcaster()
#br.sendTransform(tuple(pose), (0,0,0,1), rospy.Time.now(), 'car_fuse_centroid', 'velodyne')
# if last_known_box_size is not None:
# # bounding box
# marker = Marker()
# marker.header.frame_id = "car_fuse_centroid"
# marker.header.stamp = rospy.Time.now()
# marker.type = Marker.CUBE
# marker.action = Marker.ADD
# marker.scale.x = last_known_box_size[2]
# marker.scale.y = last_known_box_size[1]
# marker.scale.z = last_known_box_size[0]
# marker.color = ColorRGBA(r=1., g=1., b=0., a=0.5)
# marker.lifetime = rospy.Duration()
# pub = rospy.Publisher("car_fuse_bbox", Marker, queue_size=10)
# pub.publish(marker)
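One design note on the handler above: it constructs a new rospy.Publisher inside the callback for every radar message, which re-registers the topic each time. The usual pattern is to create the publisher once at module or node scope and reuse it in the callback; a minimal sketch using the topic and type from the code above:

import rospy
from sensor_msgs.msg import PointCloud2

# Created once at node setup, then reused inside handle_radar_msg.
obj_radar_nearest_pub = rospy.Publisher('obj_radar_nearest', PointCloud2, queue_size=1)

# ... and inside the callback:
# obj_radar_nearest_pub.publish(pc2.create_cloud_xyz32(header, point))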
def main_fcn():
pub = rospy.Publisher('joint_states',JointState,queue_size = 10)
pub2 = rospy.Publisher('read_angles',Int32,queue_size = 10)
pub3 = rospy.Publisher('uarm_status',String,queue_size = 100)
rospy.init_node('display',anonymous = True)
rate = rospy.Rate(20)
joint_state_send = JointState()
joint_state_send.header = Header()
joint_state_send.name = ['base_to_base_rot','base_rot_to_link_1','link_1_to_link_2' ,'link_2_to_link_3','link_3_to_link_end']
joint_state_send.name = joint_state_send.name+ ['link_1_to_add_1','link_2_to_add_4','link_3_to_add_2','base_rot_to_add_3','link_2_to_add_5']
angle = {}
trigger = 1
while not rospy.is_shutdown():
joint_state_send.header.stamp = rospy.Time.now()
pub2.publish(1)
if trigger == 1:
pub3.publish("detach")
time.sleep(1)
trigger = 0
angle['s1'] = rospy.get_param('servo_1')*math.pi/180
angle['s2'] = rospy.get_param('servo_2')*math.pi/180
angle['s3'] = rospy.get_param('servo_3')*math.pi/180
angle['s4'] = rospy.get_param('servo_4')*math.pi/180
joint_state_send.position = [angle['s1'],angle['s2'],-angle['s2']-angle['s3'],angle['s3'],angle['s4']]
        joint_state_send.position = joint_state_send.position + [-angle['s2']-angle['s3'], angle['s3'], math.pi/2 - angle['s3']]
        joint_state_send.position = joint_state_send.position + [angle['s2'] - math.pi, angle['s2'] + angle['s3'] - math.pi/2]
        joint_state_send.velocity = []
joint_state_send.effort = []
pub.publish(joint_state_send)
rate.sleep()
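A typical entry point for the display node above (a short sketch; `main_fcn` and the imports are as in the snippet):

if __name__ == '__main__':
    try:
        main_fcn()
    except rospy.ROSInterruptException:
        pass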
Source: baxter_continuous_scan.py, from project cu-perception-manipulation-stack (author: correlllab)
def move_cup(self):
self.gripper.close()
    rotate = math.radians(random.randrange(20, 180))  # random angle in radians
print("rotate: " + str(rotate))
t = self.tl.getLatestCommonTime("/root", "/right_gripper")
cur_position, quaternion = self.tl.lookupTransform("/root",
"/right_gripper",
t)
quaternion = [1,0,0,0]
    required_rot = (0, 0, rotate)  # rotation about the z axis, in radians
    required_trans = (0, 0, 0)
    rotated_pose = self.getOffsetPoses(cur_position, quaternion, required_rot, required_trans)
    trans_5 = tuple(rotated_pose[:3])
    quat_5 = tuple(rotated_pose[3:])
br = tf.TransformBroadcaster()
br.sendTransform(trans_5,
quat_5,
rospy.Time.now(),
"pick_pose",
"/root")
    pick_pose = PoseStamped(Header(0, rospy.Time(0), self.robot.base),
Pose(Point(*trans_5),
Quaternion(*quat_5)))
self.move_ik(pick_pose)
self.gripper.open()
t = self.tl.getLatestCommonTime("/root", "/right_gripper")
cur_position, quaternion = self.tl.lookupTransform("/root",
"/right_gripper",
t)
cur_position = list(cur_position)
cur_position[2] = cur_position[2] + 0.08
pose = Pose(Point(*cur_position),
Quaternion(*quaternion))
while True:
try:
            stamped_pose = PoseStamped(Header(0, rospy.Time(0), self.robot.base), pose)
self.move_ik(stamped_pose)
break
except AttributeError:
print("can't find valid pose for gripper cause I'm dumb")
return False
rospy.sleep(2)
home = PoseStamped(
        Header(0, rospy.Time(0), self.robot.base),
Pose(Point(0.60558, -0.24686, 0.093535),
Quaternion(0.99897, -0.034828, 0.027217, 0.010557)))
self.move_ik(home)
rospy.sleep(2)
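getOffsetPoses is not shown in this snippet. Purely as a hypothetical sketch of what such a helper could look like (the project's real implementation may differ), one way to apply a local rotation and translation offset to a position + quaternion pose with tf.transformations:

import tf

def get_offset_poses(position, quaternion, offset_rpy, offset_xyz):
    # Hypothetical helper: compose a local (roll, pitch, yaw) rotation and an
    # (x, y, z) translation with the given pose; quaternions are (x, y, z, w).
    q_offset = tf.transformations.quaternion_from_euler(*offset_rpy)
    q_new = tf.transformations.quaternion_multiply(quaternion, q_offset)
    p_new = [position[i] + offset_xyz[i] for i in range(3)]
    return p_new + list(q_new)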
Source: safety_stop.py, from project cu-perception-manipulation-stack (author: correlllab)
def control_from_sensor_values(self):
log_values = np.log(self.bx.inside)
    # Match which side is which: the sign of the diff should tell
    # whether the gripper needs to move towards the positive or
    # negative part of the y axis in left_gripper. That is, we need
    # left side - right side (using left/right as in the
    # l_gripper_{l,r}_finger_tip tfs). We take log(values) for a
    # better behaved controller.
inside_diff = log_values[7:] - log_values[:7]
scalar_diff = sum(inside_diff) / len(inside_diff)
# Take negative for one of the sides, so that angles should
# match for a parallel object in the gripper
l_angle, _ = np.polyfit(np.arange(7), log_values[:7], 1)
r_angle, _ = np.polyfit(np.arange(7), -log_values[7:], 1)
rospy.loginfo('Angle computed from l: {}'.format(np.rad2deg(l_angle)))
rospy.loginfo('Angle computed from r: {}'.format(np.rad2deg(r_angle)))
avg_angle = np.arctan((l_angle + r_angle) / 2.0)
# Let's get a frame by the middle of the end effector
# p = Point(0, 0, -0.05)
p = (0, 0, -0.05)
    # geometry_msgs/Quaternion is (x, y, z, w), and ROS's tf.transformations
    # also returns quaternions in (x, y, z, w) order, so the result of
    # quaternion_about_axis below needs no reordering.
q = tf.transformations.quaternion_about_axis(avg_angle, (1, 0, 0))
# We had a lot of trouble sending a transform (p, q) from
# left_gripper, and then asking for a point in the last frame
# in the tf coordinate. Timing issues that I couldn't
# solve. Instead, do it manually here:
m1 = tf.transformations.quaternion_matrix(q)
m1[:3, 3] = p
p = (0, scalar_diff / 100, 0.05)
m2 = np.eye(4)
m2[:3, 3] = p
m = m2.dot(m1)
# Extract pose now
p = Point(*m[:3, 3])
q = Quaternion(*tf.transformations.quaternion_from_matrix(m))
time = rospy.Time(0)
h = Header()
h.frame_id = 'left_gripper'
h.stamp = time
pose = Pose(p, q)
new_endpose = self.tl.transformPose('base', PoseStamped(h, pose))
self.bx.move_ik(new_endpose)
def __init__(self, robot, *robotargs):
super(PickAndPlaceNode, self).__init__('pp_node')
rospy.loginfo("PickAndPlaceNode")
_post_perceive_trans = defaultdict(lambda: self._pick)
_post_perceive_trans.update({'c': self._calibrate})
_preplace = defaultdict(lambda: self._preplace)
self.transition_table = {
# If calibration has already happened, allow skipping it
'initial': {'c': self._calibrate, 'q': self._perceive,
's': self._preplace},
'calibrate': {'q': self._perceive, 'c': self._calibrate},
'perceive': {'p': self._post_perceive, 'q': self._perceive, 's': self._stop_perceive, 'c': self._calibrate},
'post_perceive': _post_perceive_trans,
'postpick': {'1': self._level, '2': self._level, '9': self._level},
'level': _preplace,
'preplace': {'s': self._place},
'place': {'q': self._perceive, 'c': self._calibrate}
}
rospy.loginfo("PickAndPlaceNode1")
if callable(robot):
self.robot = robot(*robotargs)
else:
self.robot = robot
self.robot.level = 1
rospy.loginfo("PickAndPlaceNode2")
# Hardcoded place for now
self.place_pose = PoseStamped(
Header(0, rospy.Time(0), self.robot.base),
Pose(Point(0.526025806, 0.4780144, -0.161326153),
Quaternion(1, 0, 0, 0)))
self.tl = tf.TransformListener()
self.num_objects = 0
# Would this work too? Both tf and tf2 have (c) 2008...
# self.tf2 = tf2_ros.TransformListener()
self.n_objects_sub = rospy.Subscriber("/num_objects", Int64,
self.update_num_objects,
queue_size=1)
self.perception_pub = rospy.Publisher("/perception/enabled",
Bool,
queue_size=1)
self.alignment_pub = rospy.Publisher("/alignment/doit",
Bool,
queue_size=1)
self.br = tf.TransformBroadcaster()
rospy.loginfo("PickAndPlaceNode3")
self.int_marker_server = InteractiveMarkerServer('int_markers')
# Dict to map imarker names and their updated poses
self.int_markers = {}
# Ideally this call would be in a Factory/Metaclass/Parent
self.show_options()
self.perceive = False
# self.robot.home()
# self.move_calib_position()
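A hedged sketch of how the transition table above might be driven; the read_key input source and the assumption that each handler records the node's new state are illustrative, not taken from this snippet:

state = 'initial'
while not rospy.is_shutdown():
    key = read_key()  # hypothetical one-character input source
    handler = node.transition_table.get(state, {}).get(key)
    if handler is None:
        rospy.logwarn('no transition from %r on %r', state, key)
        continue
    handler()
    state = node.state  # assumes handlers update the current state on the node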
def proc_imu(quat1, acc, gyro):
# New info:
# https://github.com/thalmiclabs/myo-bluetooth/blob/master/myohw.h#L292-L295
# Scale values for unpacking IMU data
# define MYOHW_ORIENTATION_SCALE 16384.0f
# See myohw_imu_data_t::orientation
# define MYOHW_ACCELEROMETER_SCALE 2048.0f
# See myohw_imu_data_t::accelerometer
# define MYOHW_GYROSCOPE_SCALE 16.0f
# See myohw_imu_data_t::gyroscope
if not any(quat1):
# If it's all 0's means we got no data, don't do anything
return
h = Header()
h.stamp = rospy.Time.now()
# frame_id is node name without /
h.frame_id = rospy.get_name()[1:]
# We currently don't know the covariance of the sensors with each other
cov = [0, 0, 0, 0, 0, 0, 0, 0, 0]
quat = Quaternion(quat1[0] / 16384.0,
quat1[1] / 16384.0,
quat1[2] / 16384.0,
quat1[3] / 16384.0)
# Normalize the quaternion and accelerometer values
quatNorm = sqrt(quat.x * quat.x + quat.y *
quat.y + quat.z * quat.z + quat.w * quat.w)
normQuat = Quaternion(quat.x / quatNorm,
quat.y / quatNorm,
quat.z / quatNorm,
quat.w / quatNorm)
normAcc = Vector3(acc[0] / 2048.0,
acc[1] / 2048.0,
acc[2] / 2048.0)
normGyro = Vector3(gyro[0] / 16.0, gyro[1] / 16.0, gyro[2] / 16.0)
imu = Imu(h, normQuat, cov, normGyro, cov, normAcc, cov)
imuPub.publish(imu)
roll, pitch, yaw = euler_from_quaternion([normQuat.x,
normQuat.y,
normQuat.z,
normQuat.w])
oriPub.publish(Vector3(roll, pitch, yaw))
oriDegPub.publish(Vector3(degrees(roll), degrees(pitch), degrees(yaw)))
posePub.publish( PoseStamped(h,Pose(Point(0.0,0.0,0.0),normQuat)) )
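The callback above assumes imuPub, oriPub, oriDegPub, and posePub exist at module scope. A minimal setup sketch (node and topic names are illustrative):

import rospy
from sensor_msgs.msg import Imu
from geometry_msgs.msg import Vector3, PoseStamped

rospy.init_node('myo_imu')
imuPub = rospy.Publisher('myo/imu', Imu, queue_size=10)
oriPub = rospy.Publisher('myo/orientation', Vector3, queue_size=10)
oriDegPub = rospy.Publisher('myo/orientation_deg', Vector3, queue_size=10)
posePub = rospy.Publisher('myo/pose', PoseStamped, queue_size=10)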
def from_dict(cls, data, frame, lifetime):
"""Deserializes obstacle data into two MarkerArrays.
Args:
data: A dictionary.
frame: Frame ID of every Marker.
lifetime: Lifetime of every Marker in seconds.
Returns:
Tuple of two visualization_msgs/MarkerArray, MarkerArray) tuple.
The first is of moving obstacles, and the latter is of stationary
obstacles.
"""
# Generate base header.
header = Header()
header.stamp = rospy.get_rostime()
header.frame_id = frame
    # Parse moving obstacles, and populate the sphere array.
moving_obstacles = GeoSphereArrayStamped()
moving_obstacles.header = header
if "moving_obstacles" in data:
for obj in data["moving_obstacles"]:
# Moving obstacles are spheres.
obstacle = GeoSphere()
# Set scale as radius.
obstacle.radius = feet_to_meters(obj["sphere_radius"])
obstacle.center.latitude = obj["latitude"]
obstacle.center.longitude = obj["longitude"]
obstacle.center.altitude = feet_to_meters(obj["altitude_msl"])
moving_obstacles.spheres.append(obstacle)
    # Parse stationary obstacles, and populate the cylinder array.
stationary_obstacles = GeoCylinderArrayStamped()
stationary_obstacles.header = header
if "stationary_obstacles" in data:
for obj in data["stationary_obstacles"]:
# Stationary obstacles are cylinders.
obstacle = GeoCylinder()
# Set scale to define size.
obstacle.radius = feet_to_meters(obj["cylinder_radius"])
obstacle.height = feet_to_meters(obj["cylinder_height"])
obstacle.center.latitude = obj["latitude"]
obstacle.center.longitude = obj["longitude"]
stationary_obstacles.cylinders.append(obstacle)
return (moving_obstacles, stationary_obstacles)
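A usage sketch with a hand-written payload (the owning class name Obstacles is an assumption; the dictionary keys follow the ones read above):

data = {
    "moving_obstacles": [
        {"latitude": 38.141, "longitude": -76.434,
         "altitude_msl": 250.0, "sphere_radius": 50.0},
    ],
    "stationary_obstacles": [
        {"latitude": 38.142, "longitude": -76.435,
         "cylinder_radius": 30.0, "cylinder_height": 750.0},
    ],
}
moving, stationary = Obstacles.from_dict(data, frame='map', lifetime=5.0)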
def get_ik_joints_linear(initial_x, initial_y, initial_z, initial_w2,
target_x, target_y, target_z, target_w2, n_steps):
ns = "ExternalTools/left/PositionKinematicsNode/IKService"
iksvc = rospy.ServiceProxy(ns, SolvePositionIK)
ikreq = SolvePositionIKRequest(seed_mode=SolvePositionIKRequest.SEED_CURRENT)
hdr = Header(stamp=rospy.Time.now(), frame_id='base')
# current_pose = arm.endpoint_pose()
x0 = initial_x
y0 = initial_y
z0 = initial_z
# linear interpolate between current pose and target pose
    for i in range(n_steps):
t = (i + 1) * 1. / n_steps
x = (1. - t) * x0 + t * target_x
y = (1. - t) * y0 + t * target_y
z = (1. - t) * z0 + t * target_z
pose = PoseStamped(
header=hdr,
pose=Pose(
position=Point( x=x, y=y, z=z, ),
# endeffector pointing down
orientation=Quaternion( x=1., y=0., z=0., w=0., ),
),
)
ikreq.pose_stamp.append(pose)
try:
rospy.wait_for_service(ns, 5.0)
resp = iksvc(ikreq)
    except (rospy.ServiceException, rospy.ROSException) as e:
rospy.logerr("Service call failed: %s" % (e,))
return []
js = []
# control w2 separately from other joints
for i, (v, j) in enumerate(zip(resp.isValid, resp.joints)):
t = (i + 1) * 1. / n_steps
if v:
w2 = (1. - t) * initial_w2 + t * target_w2
j.position = j.position[:-1] + (w2,)
js.append(j)
return js
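A hedged usage sketch on a Baxter (baxter_interface.Limb is the standard limb API; the start and target values are illustrative):

import baxter_interface

limb = baxter_interface.Limb('left')
js = get_ik_joints_linear(0.6, 0.3, 0.1, 0.0,
                          0.7, 0.1, 0.0, 0.5, n_steps=10)
for j in js:
    # Each entry is a JointState solution for one interpolated waypoint.
    limb.move_to_joint_positions(dict(zip(j.name, j.position)))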