def execute(self, motion, duration):
"""
Blocking function executing a motion
:param motion: [{"motor_name": value_in_degrees}, {}...]
:param duration: duration in secs
"""
request = ExecuteTrajectoryRequest()
request.trajectory.joint_names = motion.keys
request.trajectory.time = rospy.Time.now()
motion_length = len(duration)
for point_id, point in enumerate(motion.values()):
point = JointTrajectoryPoint(positions=point,
time_from_start=rospy.Duration(float(point_id*duration)/motion_length))
request.points.append(point)
return self.execute_proxy(request)
python类Duration()的实例源码
def move(self, goal):
# Send the goal pose to the MoveBaseAction server
self.move_base.send_goal(goal)
# Allow 1 minute to get there
finished_within_time = self.move_base.wait_for_result(rospy.Duration(60))
# If we don't get there in time, abort the goal
if not finished_within_time:
self.move_base.cancel_goal()
rospy.loginfo("Timed out achieving goal")
else:
# We made it!
state = self.move_base.get_state()
if state == GoalStatus.SUCCEEDED:
rospy.loginfo("Goal succeeded!")
def _init_params(self):
self.port = rospy.get_param('~port', self.default_port)
self.robot_type = rospy.get_param('~robot_type', 'create')
#self.baudrate = rospy.get_param('~baudrate', self.default_baudrate)
self.update_rate = rospy.get_param('~update_rate', self.default_update_rate)
self.drive_mode = rospy.get_param('~drive_mode', 'twist')
self.has_gyro = rospy.get_param('~has_gyro', True)
self.odom_angular_scale_correction = rospy.get_param('~odom_angular_scale_correction', 1.0)
self.odom_linear_scale_correction = rospy.get_param('~odom_linear_scale_correction', 1.0)
self.cmd_vel_timeout = rospy.Duration(rospy.get_param('~cmd_vel_timeout', 0.6))
self.stop_motors_on_bump = rospy.get_param('~stop_motors_on_bump', True)
self.min_abs_yaw_vel = rospy.get_param('~min_abs_yaw_vel', None)
self.max_abs_yaw_vel = rospy.get_param('~max_abs_yaw_vel', None)
self.publish_tf = rospy.get_param('~publish_tf', False)
self.odom_frame = rospy.get_param('~odom_frame', 'odom')
self.base_frame = rospy.get_param('~base_frame', 'base_footprint')
self.operate_mode = rospy.get_param('~operation_mode', 3)
rospy.loginfo("serial port: %s"%(self.port))
rospy.loginfo("update_rate: %s"%(self.update_rate))
rospy.loginfo("drive mode: %s"%(self.drive_mode))
rospy.loginfo("has gyro: %s"%(self.has_gyro))
def publishShape(self, position, rx, ry, tag, namespace):
marker = Marker()
marker.header.frame_id = '/map'
marker.header.stamp = rospy.Time.now()
marker.ns = namespace
marker.id = int(tag)
marker.action = Marker.ADD
marker.type = self.shape
self.setPosition(marker, position)
self.setRadius(marker, rx, ry)
self.setColor(marker, tag)
marker.lifetime = rospy.Duration(5)
self.publisher.publish(marker)
circumference_publisher.py 文件源码
项目:indoor-localization
作者: luca-morreale
项目源码
文件源码
阅读 16
收藏 0
点赞 0
评论 0
def publishShape(self, position, measurement, tag, namespace):
marker = Marker()
marker.header.frame_id = '/map'
marker.header.stamp = rospy.Time.now()
marker.ns = namespace
marker.id = int(tag)
marker.action = Marker.ADD
marker.type = self.shape
self.setPosition(marker, position)
self.setRadius(marker, measurement)
self.setColor(marker, tag)
marker.lifetime = rospy.Duration(5)
self.publisher.publish(marker)
def __init__(self,dis=0.0):
State.__init__(self, outcomes=['succeeded','preempted','aborted'],input_keys=['target'])
rospy.on_shutdown(self.shutdown)
self.test_distance = target.y-dis
self.rate = 200
self.r = rospy.Rate(self.rate)
self.speed = rospy.get_param('~speed',0.08)
self.tolerance = rospy.get_param('~tolerance', 0.01)
self.cmd_vel = rospy.Publisher('/mobile_base_controller/smooth_cmd_vel',Twist,queue_size=5)
self.base_frame = rospy.get_param('~base_frame', '/base_link')
self.odom_frame = rospy.get_param('~odom_frame', '/odom')
self.tf_listener = tf.TransformListener()
rospy.sleep(1)
self.tf_listener.waitForTransform(self.odom_frame,self.base_frame,rospy.Time(),rospy.Duration(60.0))
#define a bianliang
self.flag = rospy.get_param('~flag', True)
rospy.loginfo(self.test_distance)
#tf get position
#publish cmd_vel
def __init__(self,angle=0):
State.__init__(self, outcomes=['succeeded','aborted','preempted'],input_keys=["angle_in"])
self.speed = rospy.get_param('~speed', 0.10)
self.secretindigal = 1.0
self.tolerance = rospy.get_param('tolerance', math.radians(5))
self.start = True
self.cmd_vel = rospy.Publisher('mobile_base_controller/smooth_cmd_vel', Twist, queue_size=5)
self.base_frame = rospy.get_param('~base_frame', '/base_link')
self.odom_frame = rospy.get_param('~odom_frame', '/odom')
rospy.on_shutdown(self.shutdown)
self.rate = 30
self.start_test = True
self.r = rospy.Rate(self.rate)
self.tf_listener = tf.TransformListener()
rospy.sleep(0.5)
self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60))
def __init__(self, angle=0):
State.__init__(self, outcomes=['succeeded', 'aborted', 'preempted'])
self.speed = rospy.get_param('~speed', 0.03)
self.secretindigal = 1.0
self.tolerance = rospy.get_param('tolerance', math.radians(5))
self.start = True
self.cmd_vel = rospy.Publisher('mobile_base_controller/smooth_cmd_vel', Twist, queue_size=5)
self.base_frame = rospy.get_param('~base_frame', '/base_link')
self.odom_frame = rospy.get_param('~odom_frame', '/odom')
rospy.on_shutdown(self.shutdown)
self.rate = 30
self.start_test = True
self.r = rospy.Rate(self.rate)
self.angle=angle
self.tf_listener = tf.TransformListener()
rospy.sleep(0.5)
self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60))
def __init__(self,dis=0):
State.__init__(self, outcomes=['succeeded','preempted','aborted'])
rospy.on_shutdown(self.shutdown)
self.test_distance = dis
self.rate = 100
self.r = rospy.Rate(self.rate)
self.speed = rospy.get_param('~speed',0.08)
self.tolerance = rospy.get_param('~tolerance', 0.01)
self.cmd_vel = rospy.Publisher('/mobile_base_controller/smooth_cmd_vel',Twist,queue_size=5)
self.base_frame = rospy.get_param('~base_frame', '/base_link')
self.odom_frame = rospy.get_param('~odom_frame', '/odom')
self.tf_listener = tf.TransformListener()
rospy.sleep(1)
self.tf_listener.waitForTransform(self.odom_frame,self.base_frame,rospy.Time(),rospy.Duration(60.0))
self.flag = rospy.get_param('~flag', True)
#tf get position
self.position = Point()
self.position = self.get_position()
self.y_start = self.position.y
self.x_start = self.position.x
#publish cmd_vel
def __init__(self,angle=0):
State.__init__(self, outcomes=['succeeded','aborted','preempted'])
self.speed = rospy.get_param('~speed', 0.20)
self.secretindigal = 1.0
self.tolerance = rospy.get_param('tolerance', math.radians(5))
self.start = True
self.cmd_vel = rospy.Publisher('mobile_base_controller/smooth_cmd_vel', Twist, queue_size=5)
self.base_frame = rospy.get_param('~base_frame', '/base_link')
self.odom_frame = rospy.get_param('~odom_frame', '/odom')
rospy.on_shutdown(self.shutdown)
self.rate = 30
self.start_test = True
self.r = rospy.Rate(self.rate)
self.angle = angle
self.tf_listener = tf.TransformListener()
rospy.sleep(0.5)
self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60))
def __init__(self,angle=0):
State.__init__(self, outcomes=['succeeded','aborted','preempted'])
self.speed = rospy.get_param('~speed', 0.20)
self.secretindigal = 1.0
self.tolerance = rospy.get_param('tolerance', math.radians(5))
self.start = True
self.cmd_vel = rospy.Publisher('mobile_base_controller/smooth_cmd_vel', Twist, queue_size=5)
self.base_frame = rospy.get_param('~base_frame', '/base_link')
self.odom_frame = rospy.get_param('~odom_frame', '/odom')
rospy.on_shutdown(self.shutdown)
self.rate = 30
self.start_test = True
self.r = rospy.Rate(self.rate)
self.angle = angle
self.tf_listener = tf.TransformListener()
rospy.sleep(0.5)
self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60))
def __init__(self, limb):
ns = 'robot/limb/' + limb + '/'
self._client = actionlib.SimpleActionClient(
ns + "follow_joint_trajectory",
FollowJointTrajectoryAction,
)
self._goal = FollowJointTrajectoryGoal()
self._goal_time_tolerance = rospy.Time(0.1)
self._goal.goal_time_tolerance = self._goal_time_tolerance
server_up = self._client.wait_for_server(timeout=rospy.Duration(10.0))
if not server_up:
rospy.logerr("Timed out waiting for Joint Trajectory"
" Action Server to connect. Start the action server"
" before running example.")
rospy.signal_shutdown("Timed out waiting for Action Server")
sys.exit(1)
self.clear(limb)
def __init__(self, limb):
ns = 'robot/limb/' + limb + '/'
self._client = actionlib.SimpleActionClient(
ns + "follow_joint_trajectory",
FollowJointTrajectoryAction,
)
self._goal = FollowJointTrajectoryGoal()
self._goal_time_tolerance = rospy.Time(0.1)
self._goal.goal_time_tolerance = self._goal_time_tolerance
server_up = self._client.wait_for_server(timeout=rospy.Duration(10.0))
if not server_up:
rospy.logerr("Timed out waiting for Joint Trajectory"
" Action Server to connect. Start the action server"
" before running example.")
rospy.signal_shutdown("Timed out waiting for Action Server")
sys.exit(1)
self.clear(limb)
def blink(self,color, timeout=0.0, frequency=2.0, blocking=True): #timeout <= 0 blinks endlessly
"""
Blinks a led for a specific time and frequency (blocking)
:param color: Color of the led (red, green, orange)
:type color: str
:param timeout: Duration to let the led blinking. A value <= 0.0 means infinite time
:type timeout: float
:param frequency: Rate, how often a led blinks in one second
:type frequency: float
"""
if blocking is False:
self.post.blink(color,timeout,frequency,True)
else :
end = rospy.Time.now()+ rospy.Duration(timeout)
self.led_state[name] = 1
while not rospy.is_shutdown():
start = rospy.Time.now()
if (start > end and timeout > 0) or self.led_state[name] == 0:
self.setLed(color, 0)
break
self.setLed(color, 100)
rospy.Rate(frequency*2).sleep()
self.setLed(color, 0)
rospy.Rate(frequency*2).sleep()
def blink(self,name, timeout=0.0, frequency=2.0): #timeout <= 0 blinks endlessly
"""
Blinks a led for a specific time and frequency (blocking)
:param name: Name of the led
:type name: str
:param timeout: Duration to let the led blinking. A value <= 0.0 means infinite time
:type timeout: float
:param frequency: Rate, how often a led blinks in one second
:type frequency: float
"""
end = rospy.Time.now()+ rospy.Duration(timeout)
self.led_state[name] = 1
try:
while not rospy.is_shutdown():
start = rospy.Time.now()
if (start > end and timeout > 0) or self.led_state[name] == 0:
self.led_handle[name].set_output(False)
break
self.led_handle[name].set_output(True)
rospy.Rate(frequency*2).sleep()
self.led_handle[name].set_output(False)
rospy.Rate(frequency*2).sleep()
except:
pass
def execute(self, userdata):
self.goal.target_pose.pose = userdata.waypoint_in
# Send the goal pose to the MoveBaseAction server
self.move_base.send_goal(self.goal)
if self.preempt_requested():
self.move_base.cancel_goal()
self.service_preempt()
return 'preempted'
# Allow 1 minute to get there
finished_within_time = self.move_base.wait_for_result(rospy.Duration(60))
# If we don't get there in time, abort the goal
if not finished_within_time:
self.move_base.cancel_goal()
rospy.loginfo("Timed out achieving goal")
return 'aborted'
else:
# We made it!
state = self.move_base.get_state()
if state == GoalStatus.SUCCEEDED:
rospy.loginfo("Goal succeeded!")
return 'succeeded'
def IK(self, T_goal):
req = moveit_msgs.srv.GetPositionIKRequest()
req.ik_request.group_name = self.group_name
req.ik_request.robot_state = moveit_msgs.msg.RobotState()
req.ik_request.robot_state.joint_state = self.joint_state
req.ik_request.avoid_collisions = True
req.ik_request.pose_stamped = geometry_msgs.msg.PoseStamped()
req.ik_request.pose_stamped.header.frame_id = "world_link"
req.ik_request.pose_stamped.header.stamp = rospy.get_rostime()
req.ik_request.pose_stamped.pose = convert_to_message(T_goal)
req.ik_request.timeout = rospy.Duration(3.0)
res = self.ik_service(req)
q = []
if res.error_code.val == res.error_code.SUCCESS:
q = self.q_from_joint_state(res.solution.joint_state)
return q
def __init__(self):
maki_body = actionlib.SimpleActionClient("/CoRDial/Behavior", BehaviorAction) # the action server of maki_cordial_server
rospy.sleep(2.0)
global loop_count
global loop_time
global flag
self.new_behavior_time = rospy.Time.now()
self.eye_gaze_time = rospy.Time.now()
self.blink_time = rospy.Time.now() + rospy.Duration(10)
while not loop_count == 0 or flag == 1: # while number of times the actions are repeated
print loop_count
loop_count -= 1
if rospy.Time.now() >= self.new_behavior_time:
behav = maki_body_act
self.new_behavior_time = rospy.Time.now() + rospy.Duration(loop_time)
maki_body.send_goal(BehaviorGoal(behavior=behav)) # sending the behavior to maki, name changed to behavior by xuan, March 22th
print "sent behavior: ", behav
sleep(loop_time)
def __init__(self, limb):
ns = 'robot/limb/' + limb + '/'
self._client = actionlib.SimpleActionClient(
ns + "follow_joint_trajectory",
FollowJointTrajectoryAction,
)
self._goal = FollowJointTrajectoryGoal()
self._goal_time_tolerance = rospy.Time(0.1)
self._goal.goal_time_tolerance = self._goal_time_tolerance
server_up = self._client.wait_for_server(timeout=rospy.Duration(10.0))
if not server_up:
rospy.logerr("Timed out waiting for Joint Trajectory"
" Action Server to connect. Start the action server"
" before running example.")
rospy.signal_shutdown("Timed out waiting for Action Server")
sys.exit(1)
self._sub_path = rospy.Subscriber('joint_path', numpy_msg(Float32MultiArray), self.path_cb)
self._sub_traj = rospy.Subscriber('joint_traj', numpy_msg(Float32MultiArray), self.traj_cb)
self._limb_interface = baxter_interface.limb.Limb('right')
self._q_start = np.array([-0.22281071, -0.36470393, 0.36163597, 1.71920897, -0.82719914,
-1.16889336, -0.90888362])
self.clear(limb)
def publish_cloud_markers(pub_ns, _arr, c='b', stamp=None, flip_rb=False, frame_id='map'):
"""
Publish point cloud on:
pub_ns: Namespace on which the cloud will be published
arr: numpy array (N x 3) for point cloud data
c: Option 1: 'c', 'b', 'r' etc colors accepted by matplotlibs color
Option 2: float ranging from 0 to 1 via matplotlib's jet colormap
Option 3: numpy array (N x 3) with r,g,b vals ranging from 0 to 1
s: supported only by matplotlib plotting
alpha: supported only by matplotlib plotting
"""
arr, carr = copy_pointcloud_data(_arr, c, flip_rb=flip_rb)
marker = vis_msg.Marker(type=vis_msg.Marker.SPHERE_LIST, ns=pub_ns, action=vis_msg.Marker.ADD)
marker.header.frame_id = frame_id
marker.header.stamp = stamp if stamp is not None else rospy.Time.now()
marker.scale.x = 0.01
marker.scale.y = 0.01
# XYZ
inds, = np.where(~np.isnan(arr).any(axis=1))
marker.points = [geom_msg.Point(arr[j,0], arr[j,1], arr[j,2]) for j in inds]
# RGB (optionally alpha)
N, D = arr.shape[:2]
if D == 3:
marker.colors = [std_msg.ColorRGBA(carr[j,0], carr[j,1], carr[j,2], 1.0)
for j in inds]
elif D == 4:
marker.colors = [std_msg.ColorRGBA(carr[j,0], carr[j,1], carr[j,2], carr[j,3])
for j in inds]
marker.lifetime = rospy.Duration()
_publish_marker(marker)
print 'Publishing marker', N