def __init__(self):
    self.initial_poses = {}
    self.planning_groups_tips = {}
    self.tf_listener = tf.TransformListener()
    self.marker_lock = threading.Lock()
    self.prev_time = rospy.Time.now()
    self.counter = 0
    self.history = StatusHistory(max_length=10)
    self.pre_pose = PoseStamped()
    self.pre_pose.pose.orientation.w = 1
    self.current_planning_group_index = 0
    self.current_eef_index = 0
    self.initialize_poses = False
    self.initialized = False
    self.parseSRDF()
    self.plan_group_pub = rospy.Publisher('/rviz/moveit/select_planning_group', String, queue_size=5)
    self.updatePlanningGroup(0)
    self.updatePoseTopic(0, False)
    self.joy_pose_pub = rospy.Publisher("/joy_pose", PoseStamped, queue_size=1)
    self.plan_pub = rospy.Publisher("/rviz/moveit/plan", Empty, queue_size=5)
    self.execute_pub = rospy.Publisher("/rviz/moveit/execute", Empty, queue_size=5)
    self.update_start_state_pub = rospy.Publisher("/rviz/moveit/update_start_state", Empty, queue_size=5)
    self.update_goal_state_pub = rospy.Publisher("/rviz/moveit/update_goal_state", Empty, queue_size=5)
    self.interactive_marker_sub = rospy.Subscriber("/rviz_moveit_motion_planning_display/robot_interaction_interactive_marker_topic/update_full",
                                                   InteractiveMarkerInit,
                                                   self.markerCB, queue_size=1)
    self.sub = rospy.Subscriber("/joy", Joy, self.joyCB, queue_size=1)
Python PoseStamped() usage examples (source code)
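As an orienting example before the excerpts, here is a minimal self-contained sketch of building and publishing a PoseStamped (standard geometry_msgs API; the node and topic names are illustrative):

import rospy
from geometry_msgs.msg import PoseStamped

rospy.init_node('pose_stamped_demo')   # illustrative node name
pub = rospy.Publisher('demo_pose', PoseStamped, queue_size=1)

msg = PoseStamped()
msg.header.stamp = rospy.Time.now()    # when the pose was observed
msg.header.frame_id = 'base_link'      # frame the pose is expressed in
msg.pose.position.x = 0.5
msg.pose.orientation.w = 1.0           # identity rotation; an all-zero quaternion is invalid
pub.publish(msg)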
def __init__(self):
    self.bridge = CvBridge()
    self.x = 0.0
    self.y = 0.0
    self.ang = 0.0
    self.pos_sub = rospy.Subscriber("/slam_out_pose", PoseStamped, self.pos_callback)
    self.image_sub = rospy.Subscriber("/map_image/full", Image, self.img_callback)
Source file: baxter_continuous_scan.py — project: cu-perception-manipulation-stack, author: correlllab
def pick(self, pose, direction=(0, 0, 1), distance=0.1):
    pregrasp_pose = self.translate(pose, direction, distance)
    while True:
        try:
            # stamped_pose = PoseStamped(Header(0, rospy.Time(0), n.robot.base), pose)
            self.move_ik(pregrasp_pose)
            break
        except AttributeError:
            print("could not find a valid IK solution for the pregrasp pose")
            return False
    rospy.sleep(0.5)
    # We want to block end effector opening so that the next
    # movement happens with the gripper fully opened.
    # self.gripper.open()
    while True:
        # rospy.loginfo("Going down to pick (at {})".format(self.tip.max()))
        if not self.sensors_updated():
            return
        if self.tip.max() > 2000:
            break
        else:
            # Float division: integer division would zero out the step in Python 2.
            scaled_direction = [di / 100.0 for di in direction]
            # print("Scaled direction: ", scaled_direction)
            v_cartesian = self._vector_to(scaled_direction)
            v_cartesian[2] = -0.01
            # print("cartesian: ", v_cartesian)
            v_joint = self.compute_joint_velocities(v_cartesian)
            # print("joint : ", v_joint)
            self.limb.set_joint_velocities(v_joint)
    rospy.loginfo('Went down!')
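The commented-out self.gripper.open() above marks where a blocking open would go. A hedged sketch, assuming a baxter_interface.Gripper-style API whose open() accepts block and timeout keywords:

# Sketch only (method-body fragment): wait until the gripper reports
# fully open before starting the descent.
self.gripper.open(block=True, timeout=2.0)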
def move_calib_position(self):
    # move arm to the calibration position
    self.calib_pose = PoseStamped(
        Header(0, rospy.Time(0), self.robot.base),
        Pose(Point(0.338520675898, -0.175860479474, 0.0356775075197),
             Quaternion(-0.0183493755758, 0.708424150944, 0.704712092876, 0.0343413949013)))
    self.robot.move_ik(self.calib_pose)
def imarker_callback(self, msg):
    # http://docs.ros.org/jade/api/visualization_msgs/html/msg/InteractiveMarkerFeedback.html  # noqa
    rospy.loginfo('imarker_callback called')
    name = msg.marker_name
    new_pose = msg.pose
    self.int_markers[name] = PoseStamped(msg.header, new_pose)
def translate(self, pose, direction, distance):
    """Take a PoseStamped msg and apply a translation direction * distance."""
    # Transform the pose into the base frame, then translate it.
    base_pose = self.tl.transformPose(self.base, pose)
    base_pose.pose.position = Point(*np.array(direction) * distance +
                                    Point2list(base_pose.pose.position))
    return base_pose
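For reference, a minimal self-contained sketch of the same translation math; translated_point is a hypothetical helper, not part of the project:

import numpy as np
from geometry_msgs.msg import Point

def translated_point(position, direction, distance):
    # Offset a geometry_msgs Point along `direction` by `distance` meters.
    p = np.array([position.x, position.y, position.z])
    offset = np.asarray(direction, dtype=float) * distance
    x, y, z = p + offset
    return Point(x, y, z)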
def move_ik(self, stamped_pose):
    """Take a PoseStamped and move the arm there."""
    if not isinstance(stamped_pose, PoseStamped):
        raise TypeError("move_ik expects a PoseStamped, got %r" % type(stamped_pose))
    action_address = '/' + self.robot_type + '_driver/pose_action/tool_pose'
    self.client = actionlib.SimpleActionClient(action_address,
                                               kinova_msgs.msg.ArmPoseAction)
    pose = stamped_pose.pose
    position, orientation = pose.position, pose.orientation
    # Send a cartesian goal to the action server. Adapted from kinova_demo.
    rospy.loginfo("Waiting for SimpleAction server")
    self.client.wait_for_server()
    goal = kinova_msgs.msg.ArmPoseGoal()
    goal.pose.header = Header(frame_id=(self.robot_type + '_link_base'))
    goal.pose.pose.position = position
    goal.pose.pose.orientation = orientation
    rospy.loginfo("Sending goal")
    print(goal)
    self.client.send_goal(goal)
    # rospy.loginfo("Waiting for up to {} s for result".format(t))
    if self.client.wait_for_result(rospy.Duration(100)):
        rospy.loginfo("Action succeeded")
        result = self.client.get_result()
        print(result)
        return result
    else:
        self.client.cancel_all_goals()
        rospy.loginfo('The cartesian action timed out')
        return None
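A hypothetical call, assuming robot_type is e.g. 'j2n6s300' and the Kinova driver is running (frame name and pose values are illustrative):

# Sketch only (method-body fragment):
target = PoseStamped()
target.header.frame_id = 'j2n6s300_link_base'              # assumed frame name
target.pose.position = Point(0.3, -0.2, 0.15)
target.pose.orientation = Quaternion(0.0, 1.0, 0.0, 0.0)   # gripper facing down (assumed)
result = self.move_ik(target)
if result is None:
    rospy.logwarn("Cartesian action timed out")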
Source file: action_database.py — project: cu-perception-manipulation-stack, author: correlllab
def make_pose_stamped_msg(self, translation, orientation, frame='/j2n6a300_link_base'):
    msg = PoseStamped()
    msg.header.frame_id = frame
    msg.header.stamp = rospy.Time.now()
    msg.pose.position.x = translation[0]
    msg.pose.position.y = translation[1]
    msg.pose.position.z = translation[2]
    msg.pose.orientation.x = orientation[0]
    msg.pose.orientation.y = orientation[1]
    msg.pose.orientation.z = orientation[2]
    msg.pose.orientation.w = orientation[3]  # w was missing in the snippet; without it the quaternion defaults to an invalid all-zero value
    return msg
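A hypothetical call (the default frame comes from the function signature; translation in meters, orientation as a unit quaternion):

# Sketch only (method-body fragment):
msg = self.make_pose_stamped_msg((0.3, -0.2, 0.1),
                                 (0.0, 0.707, 0.707, 0.0))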
def make_pose_msg(position, orientation, timestamp):
    pose_msg = PoseStamped()
    pose_msg.header.stamp = timestamp
    pose_msg.header.frame_id = '/dvs_simulator'
    pose_msg.pose.position.x = position[0]
    pose_msg.pose.position.y = position[1]
    pose_msg.pose.position.z = position[2]
    pose_msg.pose.orientation.x = orientation[0]
    pose_msg.pose.orientation.y = orientation[1]
    pose_msg.pose.orientation.z = orientation[2]
    pose_msg.pose.orientation.w = orientation[3]
    return pose_msg
def visualize(self):
    '''
    Publish various visualization messages.
    '''
    if not self.DO_VIZ:
        return
    if self.pose_pub.get_num_connections() > 0 and isinstance(self.inferred_pose, np.ndarray):
        # Publish the inferred pose for visualization
        ps = PoseStamped()
        ps.header = Utils.make_header("map")
        ps.pose.position.x = self.inferred_pose[0]
        ps.pose.position.y = self.inferred_pose[1]
        ps.pose.orientation = Utils.angle_to_quaternion(self.inferred_pose[2])
        self.pose_pub.publish(ps)
    if self.particle_pub.get_num_connections() > 0:
        # publish a downsampled version of the particle distribution to avoid a lot of latency
        if self.MAX_PARTICLES > self.MAX_VIZ_PARTICLES:
            # randomly downsample particles
            proposal_indices = np.random.choice(self.particle_indices, self.MAX_VIZ_PARTICLES, p=self.weights)
            # proposal_indices = np.random.choice(self.particle_indices, self.MAX_VIZ_PARTICLES)
            self.publish_particles(self.particles[proposal_indices, :])
        else:
            self.publish_particles(self.particles)
    if self.pub_fake_scan.get_num_connections() > 0 and isinstance(self.ranges, np.ndarray):
        # generate the scan from the point of view of the inferred position for visualization
        self.viz_queries[:, 0] = self.inferred_pose[0]
        self.viz_queries[:, 1] = self.inferred_pose[1]
        self.viz_queries[:, 2] = self.downsampled_angles + self.inferred_pose[2]
        self.range_method.calc_range_many(self.viz_queries, self.viz_ranges)
        self.publish_scan(self.downsampled_angles, self.viz_ranges)
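The weighted np.random.choice above resamples in proportion to particle weight, so the displayed subset reflects the posterior. A self-contained sketch of the same idea, with illustrative names and sizes:

import numpy as np

np.random.seed(0)
particles = np.random.uniform(-1.0, 1.0, size=(1000, 3))  # columns: x, y, theta
weights = np.random.random(1000)
weights /= weights.sum()                  # p= requires weights that sum to 1
idx = np.random.choice(1000, size=100, p=weights)  # high-weight particles drawn more often
viz_particles = particles[idx, :]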
def __init__(self):
    """Set up class variables, initialize broadcasters, and start subscribers."""
    # Create broadcasters
    self.br_head = tf.TransformBroadcaster()
    self.br_arm = tf.TransformBroadcaster()
    # Create subscribers
    rospy.Subscriber("bax_head/pose", PoseStamped, self.head_callback, queue_size=1)
    rospy.Subscriber("bax_arm/pose", PoseStamped, self.arm_callback, queue_size=1)
    # Main loop: keep the node alive so callbacks can fire.
    rate = rospy.Rate(50)
    while not rospy.is_shutdown():
        rate.sleep()
def __init__(self):
    """Set up class variables, initialize broadcasters, and start subscribers."""
    # Create broadcasters
    self.br_head = tf.TransformBroadcaster()
    self.br_rods = tf.TransformBroadcaster()
    # Create subscribers
    rospy.Subscriber("bax_head/pose", PoseStamped, self.head_callback, queue_size=1)
    rospy.Subscriber("rods/pose", PoseStamped, self.rod_callback, queue_size=1)
    # Main loop: keep the node alive so callbacks can fire.
    rate = rospy.Rate(50)
    while not rospy.is_shutdown():
        rate.sleep()
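The head/arm/rod callbacks are not included in these excerpts. A hedged sketch of what such a callback plausibly does, rebroadcasting the received PoseStamped as a TF frame (the child frame name is assumed; the parent comes from the message):

def head_callback(self, msg):
    # Sketch only: rebroadcast the tracked pose as a TF frame.
    p, q = msg.pose.position, msg.pose.orientation
    self.br_head.sendTransform((p.x, p.y, p.z),
                               (q.x, q.y, q.z, q.w),
                               msg.header.stamp,
                               'bax_head',            # child frame (assumed name)
                               msg.header.frame_id)   # parent frame from the message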
def getAnglesFromPose(self, pose):
    if isinstance(pose, Pose):
        goal = PoseStamped()
        goal.header.frame_id = "/base"
        goal.pose = pose
    else:
        goal = pose
    if not self.ik:
        rospy.logerr("ERROR: Inverse Kinematics service was not loaded")
        return None
    goalstr = "%f,%f,%f" % (goal.pose.position.x, goal.pose.position.y, goal.pose.position.z)
    ikreq = SolvePositionIKRequest()
    ikreq.pose_stamp.append(goal)
    try:
        resp = self.iksvc(ikreq)
        if resp.isValid[0]:
            return resp.joints[0]
        else:
            rospy.logerr("FAILURE - No valid joint solution found for %s" % goalstr)
            return None
    except rospy.ServiceException as e:
        rospy.loginfo("Service call failed: %s" % (e,))
        return None
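A hypothetical call, assuming self.iksvc is a proxy to Baxter's SolvePositionIK service and the usual geometry_msgs imports are in scope:

# Sketch only (method-body fragment):
target = Pose(Point(0.6, 0.0, 0.2), Quaternion(0.0, 1.0, 0.0, 0.0))
joints = self.getAnglesFromPose(target)   # returns joint angles, or None on failure
if joints is not None:
    rospy.loginfo("IK solution found: %s", joints)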
def __init__(self):
    self.save_path = os.path.join(rospy.get_param("~save_path"), time.strftime("%Y-%m-%d-%H-%M-%S") + ".traj")
    self.trajectory = LineTrajectory("/built_trajectory")
    # receive either goal points or clicked points
    self.publish_point_sub = rospy.Subscriber("/clicked_point", PointStamped, self.clicked_point_callback, queue_size=1)
    self.nav_goal_sub = rospy.Subscriber("/move_base_simple/goal", PoseStamped, self.clicked_point_callback, queue_size=1)
    # save the built trajectory on shutdown
    rospy.on_shutdown(self.saveTrajectory)
def clicked_point_callback(self, msg):
    if isinstance(msg, PointStamped):
        self.trajectory.addPoint(msg.point)
    elif isinstance(msg, PoseStamped):
        self.trajectory.addPoint(msg.pose.position)
    # publish visualization of the path being built
    self.trajectory.publish_viz()
def visualize(self):
    '''
    Publish various visualization messages.
    '''
    if not self.DO_VIZ:
        return
    if self.pose_pub.get_num_connections() > 0 and isinstance(self.inferred_pose, np.ndarray):
        ps = PoseStamped()
        ps.header = Utils.make_header("map")
        ps.pose.position.x = self.inferred_pose[0]
        ps.pose.position.y = self.inferred_pose[1]
        ps.pose.orientation = Utils.angle_to_quaternion(self.inferred_pose[2])
        self.pose_pub.publish(ps)
    if self.particle_pub.get_num_connections() > 0:
        if self.MAX_PARTICLES > self.MAX_VIZ_PARTICLES:
            # randomly downsample particles
            proposal_indices = np.random.choice(self.particle_indices, self.MAX_VIZ_PARTICLES, p=self.weights)
            # proposal_indices = np.random.choice(self.particle_indices, self.MAX_VIZ_PARTICLES)
            self.publish_particles(self.particles[proposal_indices, :])
        else:
            self.publish_particles(self.particles)
    if self.pub_fake_scan.get_num_connections() > 0 and isinstance(self.ranges, np.ndarray):
        # generate the scan from the point of view of the inferred position for visualization
        self.viz_queries[:, 0] = self.inferred_pose[0]
        self.viz_queries[:, 1] = self.inferred_pose[1]
        self.viz_queries[:, 2] = self.downsampled_angles + self.inferred_pose[2]
        self.range_method.calc_range_many(self.viz_queries, self.viz_ranges)
        self.publish_scan(self.downsampled_angles, self.viz_ranges)
def SendGoal(GoalPublisher, goal, time_stamp):
    # goal: [x, y, yaw]
    GoalMsg = PoseStamped()
    # GoalMsg.header.seq = 0
    GoalMsg.header.stamp = time_stamp
    GoalMsg.header.frame_id = 'map'
    GoalMsg.pose.position.x = goal[0]
    GoalMsg.pose.position.y = goal[1]
    # GoalMsg.pose.position.z = 0.0
    quaternion = quaternion_from_euler(0, 0, goal[2])
    GoalMsg.pose.orientation.x = quaternion[0]
    GoalMsg.pose.orientation.y = quaternion[1]
    GoalMsg.pose.orientation.z = quaternion[2]
    GoalMsg.pose.orientation.w = quaternion[3]
    GoalPublisher.publish(GoalMsg)
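A hypothetical usage; /move_base_simple/goal is the conventional RViz 2D Nav Goal topic, and the node name is illustrative:

import rospy
from geometry_msgs.msg import PoseStamped

rospy.init_node('goal_sender')   # illustrative node name
pub = rospy.Publisher('/move_base_simple/goal', PoseStamped, queue_size=1)
rospy.sleep(1.0)                 # give the publisher time to connect
SendGoal(pub, [1.0, 2.0, 0.0], rospy.Time.now())  # x=1 m, y=2 m, yaw=0 rad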
Source file: control_crazyflie.py — project: ROS-Robotics-By-Example, author: PacktPublishing
def _Land(self, req):
    rospy.loginfo("Landing requested!")
    self._cf_state = 'land'
    return ()
# This callback function puts the target PoseStamped message into a local variable.
def __init__(self):
    # initialize ROS node
    rospy.init_node('target_detector', anonymous=True)
    # initialize publisher for the target pose (PoseStamped) and set the initial sequence number
    self.pub = rospy.Publisher('target_pose', PoseStamped, queue_size=1)
    self.pub_pose = PoseStamped()
    self.pub_pose.header.seq = 0
    self.rate = rospy.Rate(1.0)  # publish message at 1 Hz
    # initialize values for locating the target on the Kinect v2 image
    self.target_u = 0  # u is pixels left(0) to right(+)
    self.target_v = 0  # v is pixels top(0) to bottom(+)
    self.target_d = 0  # d is distance camera(0) to target(+) from the depth image
    self.target_found = False  # target-found flag
    self.last_d = 0  # last non-zero depth measurement
    # target orientation relative to the Kinect v2 image (Euler angles)
    self.r = 0
    self.p = 0
    self.y = 0
    # converts between ROS image messages and OpenCV images
    self.bridge = CvBridge()
    # wait for the color image topic to become available
    rospy.wait_for_message('/kinect2/qhd/image_color_rect', Image)
    # subscribe to registered color and depth images
    rospy.Subscriber('/kinect2/qhd/image_color_rect', Image, self.image_callback, queue_size=1)
    rospy.Subscriber('/kinect2/qhd/image_depth_rect', Image, self.depth_callback, queue_size=1)
    self.rate.sleep()  # suspend until next cycle
# This callback processes the Kinect color image, looking for the pink target.
def depth_callback(self, msg):
    # process only if the target has been found in the color image
    if self.target_found:
        # create an OpenCV depth image using the default passthrough encoding
        try:
            depth_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='passthrough')
        except CvBridgeError as e:
            print(e)
            return  # cannot proceed without a depth image
        # look up the depth at the target's (v, u) pixel and divide by 1000
        # to convert millimeters to meters (Kinect sensors only)
        self.target_d = depth_image[self.target_v, self.target_u] / 1000.0
        # if the depth value is zero, reuse the last non-zero depth value
        if self.target_d == 0:
            self.target_d = self.last_d
        else:
            self.last_d = self.target_d
        # record the target location and publish the target pose message
        rospy.loginfo("Target depth: u at %d v at %d d at %f", int(self.target_u),
                      int(self.target_v), self.target_d)
        self.update_target_pose(self.target_u, self.target_v, self.target_d)
# This function builds the target PoseStamped message and publishes it.
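update_target_pose itself is not included in this excerpt. A heavily hedged sketch of the typical pattern such a function follows, back-projecting the pixel through assumed camera intrinsics (fx, fy, cx, cy are illustrative Kinect v2 qhd values, not the project's):

def update_target_pose(self, u, v, d):
    # Sketch only: back-project pixel (u, v) at depth d to a 3D point.
    fx, fy, cx, cy = 525.0, 525.0, 479.5, 269.5   # assumed intrinsics
    self.pub_pose.header.stamp = rospy.Time.now()
    self.pub_pose.header.seq += 1
    self.pub_pose.pose.position.x = (u - cx) * d / fx
    self.pub_pose.pose.position.y = (v - cy) * d / fy
    self.pub_pose.pose.position.z = d
    self.pub_pose.pose.orientation.w = 1.0        # orientation left as identity here
    self.pub.publish(self.pub_pose)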