def _add_point(self, positions, side, time):
"""
Appends trajectory with new point
@param positions: joint positions
@param side: limb to command point
@param time: time from start for point in seconds
"""
#creates a point in trajectory with time_from_start and positions
point = JointTrajectoryPoint()
point.positions = copy(positions)
point.time_from_start = rospy.Duration(time)
if side == self.limb:
self.goal.trajectory.points.append(point)
elif self.gripper and side == self.gripper_name:
self.grip.trajectory.points.append(point)
python类Duration()的实例源码
joint_trajectory_file_playback.py 文件源码
项目:intera_sdk
作者: RethinkRobotics
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def __init__(self, limb, joint_names):
self._joint_names = joint_names
ns = 'robot/limb/' + limb + '/'
self._client = actionlib.SimpleActionClient(
ns + "follow_joint_trajectory",
FollowJointTrajectoryAction,
)
self._goal = FollowJointTrajectoryGoal()
self._goal_time_tolerance = rospy.Time(0.1)
self._goal.goal_time_tolerance = self._goal_time_tolerance
server_up = self._client.wait_for_server(timeout=rospy.Duration(10.0))
if not server_up:
rospy.logerr("Timed out waiting for Joint Trajectory"
" Action Server to connect. Start the action server"
" before running example.")
rospy.signal_shutdown("Timed out waiting for Action Server")
sys.exit(1)
self.clear(limb)
joint_trajectory_file_playback.py 文件源码
项目:intera_sdk
作者: RethinkRobotics
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def wait(self):
"""
Waits for and verifies trajectory execution result
"""
#create a timeout for our trajectory execution
#total time trajectory expected for trajectory execution plus a buffer
last_time = self.goal.trajectory.points[-1].time_from_start.to_sec()
time_buffer = rospy.get_param(self._param_ns + 'goal_time', 0.0) + 1.5
timeout = rospy.Duration(self._slow_move_offset +
last_time +
time_buffer)
finish = self._limb_client.wait_for_result(timeout)
result = (self._limb_client.get_result().error_code == 0)
#verify result
if all([finish, result]):
return True
else:
msg = ("Trajectory action failed or did not finish before "
"timeout/interrupt.")
rospy.logwarn(msg)
return False
def __init__(self, limb="right"):
"""
@param limb: Limb to run CalibrateArm on arm side.
"""
self._limb=limb
self._client = actionlib.SimpleActionClient('/calibration_command',
CalibrationCommandAction)
# Waits until the action server has started up and started
# listening for goals.
server_up = self._client.wait_for_server(rospy.Duration(10.0))
if not server_up:
rospy.logerr("Timed out waiting for Calibration"
" Server to connect. Check your ROS networking"
" and/or reboot the robot.")
rospy.signal_shutdown("Timed out waiting for Action Server")
sys.exit(1)
def _get_minjerk_point(self, m_matrix, idx, t, cmd_time, dimensions_dict):
pnt = JointTrajectoryPoint()
pnt.time_from_start = rospy.Duration(cmd_time)
num_joints = m_matrix.shape[0]
pnt.positions = [0.0] * num_joints
if dimensions_dict['velocities']:
pnt.velocities = [0.0] * num_joints
if dimensions_dict['accelerations']:
pnt.accelerations = [0.0] * num_joints
for jnt in range(num_joints):
m_point = minjerk.minjerk_point(m_matrix[jnt, :, :, :], idx, t)
# Positions at specified time
pnt.positions[jnt] = m_point[0]
# Velocities at specified time
if dimensions_dict['velocities']:
pnt.velocities[jnt] = m_point[1]
# Accelerations at specified time
if dimensions_dict['accelerations']:
pnt.accelerations[jnt] = m_point[-1]
return pnt
def revalidate(self, timeout, invalidate_state=True, invalidate_config=True):
"""
invalidate the state and config topics, then wait up to timeout
seconds for them to become valid again.
return true if both the state and config topic data are valid
"""
if invalidate_state:
self.invalidate_state()
if invalidate_config:
self.invalidate_config()
timeout_time = rospy.Time.now() + rospy.Duration(timeout)
while not self.is_state_valid() and not rospy.is_shutdown():
rospy.sleep(0.1)
if timeout_time < rospy.Time.now():
rospy.logwarn("Timed out waiting for node interface valid...")
return False
return True
def move(self, goal):
# Send the goal pose to the MoveBaseAction server
self.move_base.send_goal(goal)
# Allow 1 minute to get there
finished_within_time = self.move_base.wait_for_result(rospy.Duration(60))
# If we don't get there in time, abort the goal
if not finished_within_time:
self.move_base.cancel_goal()
rospy.loginfo("Timed out achieving goal")
else:
# We made it!
state = self.move_base.get_state()
if state == GoalStatus.SUCCEEDED:
rospy.loginfo("Goal succeeded!")
def moveToJointAngles(self, jointStates, timeout=1, wait=False, rightArm=True):
# Create and send trajectory message for new joint angles
trajMsg = JointTrajectoryGoal()
trajPoint = JointTrajectoryPoint()
trajPoint.positions = jointStates
trajPoint.time_from_start = rospy.Duration(timeout)
trajMsg.trajectory.points.append(trajPoint)
trajMsg.trajectory.joint_names = self.rightJointNames if rightArm else self.leftJointNames
if not wait:
if rightArm:
self.rightArmClient.send_goal(trajMsg)
else:
self.leftArmClient.send_goal(trajMsg)
else:
if rightArm:
self.rightArmClient.send_goal_and_wait(trajMsg)
else:
self.leftArmClient.send_goal_and_wait(trajMsg)
def lookAt(self, pos, sim=False):
goal = PointHeadGoal()
point = PointStamped()
point.header.frame_id = self.frame
point.point.x = pos[0]
point.point.y = pos[1]
point.point.z = pos[2]
goal.target = point
# Point using kinect frame
goal.pointing_frame = 'head_mount_kinect_rgb_link'
if sim:
goal.pointing_frame = 'high_def_frame'
goal.pointing_axis.x = 1
goal.pointing_axis.y = 0
goal.pointing_axis.z = 0
goal.min_duration = rospy.Duration(1.0)
goal.max_velocity = 1.0
self.pointHeadClient.send_goal_and_wait(goal)
def printJointStates(self):
try:
# now = rospy.Time.now()
# self.tf.waitForTransform(self.frame, 'r_gripper_tool_frame', now, rospy.Duration(10.0))
self.tf.waitForTransform(self.frame, 'r_gripper_tool_frame', rospy.Time(), rospy.Duration(10.0))
currentRightPos, currentRightOrient = self.tf.lookupTransform(self.frame, 'r_gripper_tool_frame', rospy.Time(0))
msg = rospy.wait_for_message('/r_arm_controller/state', JointTrajectoryControllerState)
currentRightJointPositions = msg.actual.positions
print 'Right positions:', currentRightPos, currentRightOrient
print 'Right joint positions:', currentRightJointPositions
# now = rospy.Time.now()
# self.tf.waitForTransform(self.frame, 'l_gripper_tool_frame', now, rospy.Duration(10.0))
currentLeftPos, currentLeftOrient = self.tf.lookupTransform(self.frame, 'l_gripper_tool_frame', rospy.Time(0))
msg = rospy.wait_for_message('/l_arm_controller/state', JointTrajectoryControllerState)
currentLeftJointPositions = msg.actual.positions
print 'Left positions:', currentLeftPos, currentLeftOrient
print 'Left joint positions:', currentLeftJointPositions
# print 'Right gripper state:', rospy.wait_for_message('/r_gripper_controller/state', JointControllerState)
except tf.ExtrapolationException:
print 'No transformation available! Failing to record this time step.'
def __init__(self):
# ROS initialization:
rospy.init_node('pose_manager')
self.poseLibrary = dict()
self.readInPoses()
self.poseServer = actionlib.SimpleActionServer("body_pose", BodyPoseAction,
execute_cb=self.executeBodyPose,
auto_start=False)
self.trajectoryClient = actionlib.SimpleActionClient("joint_trajectory", JointTrajectoryAction)
if self.trajectoryClient.wait_for_server(rospy.Duration(3.0)):
try:
rospy.wait_for_service("stop_walk_srv", timeout=2.0)
self.stopWalkSrv = rospy.ServiceProxy("stop_walk_srv", Empty)
except:
rospy.logwarn("stop_walk_srv not available, pose_manager will not stop the walker before executing a trajectory. "
+"This is normal if there is no nao_walker running.")
self.stopWalkSrv = None
self.poseServer.start()
rospy.loginfo("pose_manager running, offering poses: %s", self.poseLibrary.keys());
else:
rospy.logfatal("Could not connect to required \"joint_trajectory\" action server, is the nao_controller node running?");
rospy.signal_shutdown("Required component missing");
def parseXapPoses(self, xaplibrary):
try:
poses = xapparser.getpostures(xaplibrary)
except RuntimeError as re:
rospy.logwarn("Error while parsing the XAP file: %s" % str(re))
return
for name, pose in poses.items():
trajectory = JointTrajectory()
trajectory.joint_names = pose.keys()
joint_values = pose.values()
point = JointTrajectoryPoint()
point.time_from_start = Duration(2.0) # hardcoded duration!
point.positions = pose.values()
trajectory.points = [point]
self.poseLibrary[name] = trajectory
def update_step_timer(self, step):
"""
Helper method that shuts the current step timer down, then recreates
the step timer for the next step in the task.
If the step timeout is zero, do not create a new timeout; otherwise
create a new timer using the timeout value.
Args:
step (dict): the dictionary object representing the task step.
"""
if self.step_timer and self.step_timer.is_alive():
self.step_timer.shutdown()
if not self.active:
return
if step and step.timeout > 0:
self.step_timer = rospy.Timer(
rospy.Duration(step.timeout),
self.step_to_handler,
oneshot=True
)
def init_transforms(self):
""" Initialize the tf2 transforms """
rospy.loginfo("Start transform calibration.")
self.tf_buffer = tf2_ros.Buffer(rospy.Duration(300.0))
self.tf_listener = tf2_ros.TransformListener(self.tf_buffer)
while True:
try:
now = rospy.get_rostime()
self.tf_buffer.lookup_transform('head',
'left_camera_optical_frame',
now - rospy.Duration(0.1))
except:
self.rate.sleep()
continue
break
rospy.loginfo('transform calibration finished.')
def move(x, y):
"""
Moves the robot to a place defined by coordinates x and y.
:type x: float
:type y: int
:rtype: NoneType
"""
client = actionlib.SimpleActionClient("move_base",
move_base_msgs.msg.MoveBaseAction)
client.wait_for_server(rospy.Duration(10))
goal = move_base_msgs.msg.MoveBaseGoal()
goal.target_pose.header.frame_id = "map"
goal.target_pose.header.stamp = rospy.Time.now()
goal.target_pose.pose.position.x = x
goal.target_pose.pose.position.y = y
goal.target_pose.pose.orientation.w = 1
client.send_goal(goal)
success = client.wait_for_result(rospy.Duration(60))
loginfo(success)
joint_trajectory_file_playback.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def _add_point(self, positions, side, time):
"""
Appends trajectory with new point
@param positions: joint positions
@param side: limb to command point
@param time: time from start for point in seconds
"""
#creates a point in trajectory with time_from_start and positions
point = JointTrajectoryPoint()
point.positions = copy(positions)
point.time_from_start = rospy.Duration(time)
if side == 'left':
self._l_goal.trajectory.points.append(point)
elif side == 'right':
self._r_goal.trajectory.points.append(point)
elif side == 'left_gripper':
self._l_grip.trajectory.points.append(point)
elif side == 'right_gripper':
self._r_grip.trajectory.points.append(point)
joint_trajectory_file_playback.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def wait(self):
"""
Waits for and verifies trajectory execution result
"""
#create a timeout for our trajectory execution
#total time trajectory expected for trajectory execution plus a buffer
last_time = self._r_goal.trajectory.points[-1].time_from_start.to_sec()
time_buffer = rospy.get_param(self._param_ns + 'goal_time', 0.0) + 1.5
timeout = rospy.Duration(self._slow_move_offset +
last_time +
time_buffer)
l_finish = self._left_client.wait_for_result(timeout)
r_finish = self._right_client.wait_for_result(timeout)
l_result = (self._left_client.get_result().error_code == 0)
r_result = (self._right_client.get_result().error_code == 0)
#verify result
if all([l_finish, r_finish, l_result, r_result]):
return True
else:
msg = ("Trajectory action failed or did not finish before "
"timeout/interrupt.")
rospy.logwarn(msg)
return False
pose_action_client.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def cartesian_pose_client(position, orientation):
"""Send a cartesian goal to the action server."""
action_address = '/j2n6s300_driver/pose_action/tool_pose'
client = actionlib.SimpleActionClient(action_address, kinova_msgs.msg.ArmPoseAction)
client.wait_for_server()
goal = kinova_msgs.msg.ArmPoseGoal()
goal.pose.header = std_msgs.msg.Header(frame_id='j2n6s300_link_base')
goal.pose.pose.position = geometry_msgs.msg.Point(
x=position[0], y=position[1], z=position[2])
goal.pose.pose.orientation = geometry_msgs.msg.Quaternion(
x=orientation[0], y=orientation[1], z=orientation[2], w=orientation[3])
# print('goal.pose in client 1: {}'.format(goal.pose.pose)) # debug
client.send_goal(goal)
if client.wait_for_result(rospy.Duration(15.0)):
return client.get_result()
else:
client.cancel_all_goals()
print(' the cartesian action timed-out')
return None
fingers_action_client.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def gripper_client(finger_positions):
"""Send a gripper goal to the action server."""
action_address = '/' + prefix + 'driver/fingers_action/finger_positions'
client = actionlib.SimpleActionClient(action_address,
kinova_msgs.msg.SetFingersPositionAction)
client.wait_for_server()
goal = kinova_msgs.msg.SetFingersPositionGoal()
goal.fingers.finger1 = float(finger_positions[0])
goal.fingers.finger2 = float(finger_positions[1])
# The MICO arm has only two fingers, but the same action definition is used
if len(finger_positions) < 3:
goal.fingers.finger3 = 0.0
else:
goal.fingers.finger3 = float(finger_positions[2])
client.send_goal(goal)
if client.wait_for_result(rospy.Duration(5.0)):
return client.get_result()
else:
client.cancel_all_goals()
rospy.WARN(' the gripper action timed-out')
return None
joints_action_client.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 16
收藏 0
点赞 0
评论 0
def joint_angle_client(angle_set):
"""Send a joint angle goal to the action server."""
action_address = '/j2n6s300_driver/joints_action/joint_angles'
client = actionlib.SimpleActionClient(action_address,
kinova_msgs.msg.ArmJointAnglesAction)
client.wait_for_server()
goal = kinova_msgs.msg.ArmJointAnglesGoal()
goal.angles.joint1 = angle_set[0]
goal.angles.joint2 = angle_set[1]
goal.angles.joint3 = angle_set[2]
goal.angles.joint4 = angle_set[3]
goal.angles.joint5 = angle_set[4]
goal.angles.joint6 = angle_set[5]
goal.angles.joint7 = angle_set[6]
client.send_goal(goal)
if client.wait_for_result(rospy.Duration(20.0)):
return client.get_result()
else:
print(' the joint angle action timed-out')
client.cancel_all_goals()
return None
def search_USBlight(self):
if self.listen.frameExists("/j2n6a300_end_effector") and self.listen.frameExists("/root"):
# print ("we are in the search spoon fucntion")
self.listen.waitForTransform('/j2n6a300_end_effector','/root',rospy.Time(),rospy.Duration(100.0))
t = self.listen.getLatestCommonTime("/j2n6a300_end_effector","/root")
translation, quaternion = self.listen.lookupTransform("/j2n6a300_end_effector","/root",t)
matrix1=self.listen.fromTranslationRotation(translation,quaternion)
counter=0
rate=rospy.Rate(100)
while not self.obj_det:
counter = counter + 1
if(counter < 200):
cart_velocities = np.dot(matrix1[:3,:3],np.array([0,0,0.05])[np.newaxis].T) #change in y->x, z->y, x->z
cart_velocities = cart_velocities.T[0].tolist()
self.cmmnd_CartesianVelocity(cart_velocities + [0,0,0,1])
else:
cart_velocities = np.dot(matrix1[:3,:3],np.array([0,0,-0.05])[np.newaxis].T)
cart_velocities = cart_velocities.T[0].tolist()
self.cmmnd_CartesianVelocity(cart_velocities + [0,0,0,1])
rate.sleep()
if(counter >400):
counter=0
def search_straw(self):
if self.listen.frameExists("/j2n6a300_end_effector") and self.listen.frameExists("/root"):
# print ("we are in the search spoon fucntion")
self.listen.waitForTransform('/j2n6a300_end_effector','/root',rospy.Time(),rospy.Duration(100.0))
t = self.listen.getLatestCommonTime("/j2n6a300_end_effector","/root")
translation, quaternion = self.listen.lookupTransform("/j2n6a300_end_effector","/root",t)
matrix1=self.listen.fromTranslationRotation(translation,quaternion)
counter=0
rate=rospy.Rate(100)
while not self.obj_det:
counter = counter + 1
if(counter < 200):
cart_velocities = np.dot(matrix1[:3,:3],np.array([00,0,0.05])[np.newaxis].T) #change in y->x, z->y, x->z
cart_velocities = cart_velocities.T[0].tolist()
self.cmmnd_CartesianVelocity(cart_velocities + [0,0,0,1])
else:
cart_velocities = np.dot(matrix1[:3,:3],np.array([0.0,0,-.05])[np.newaxis].T)
cart_velocities = cart_velocities.T[0].tolist()
self.cmmnd_CartesianVelocity(cart_velocities + [0,0,0,1])
rate.sleep()
if(counter >400):
counter=0
def goto_plate(self):
if self.listen.frameExists("/root") and self.listen.frameExists("/plate_position"):
self.listen.waitForTransform('/root','/plate_position',rospy.Time(),rospy.Duration(100.0))
# t1 = self.listen.getLatestCommonTime("/root", "bowl_position")
translation, quaternion = self.listen.lookupTransform("/root", "/plate_position", rospy.Time(0))
translation = list(translation)
quaternion = list(quaternion)
#quaternion = [0.8678189045198146, 0.0003956789257977804, -0.4968799802988633, 0.0006910675928639343]
#quaternion = [0]*4
pose_value = translation + quaternion
#second arg=0 (absolute movement), arg = '-r' (relative movement)
self.cmmnd_CartesianPosition(pose_value, 0)
else:
print ("we DONT have the bowl frame")
def searchSpoon(self):
if self.listen.frameExists("/j2n6a300_end_effector") and self.listen.frameExists("/root"):
# print ("we are in the search spoon fucntion")
self.listen.waitForTransform('/j2n6a300_end_effector','/root',rospy.Time(),rospy.Duration(100.0))
t = self.listen.getLatestCommonTime("/j2n6a300_end_effector","/root")
translation, quaternion = self.listen.lookupTransform("/j2n6a300_end_effector","/root",t)
matrix1=self.listen.fromTranslationRotation(translation,quaternion)
counter=0
rate=rospy.Rate(100)
while not self.obj_det:
counter = counter + 1
if(counter < 200):
cart_velocities = np.dot(matrix1[:3,:3],np.array([0,0,0.05])[np.newaxis].T) #change in y->x, z->y, x->z
cart_velocities = cart_velocities.T[0].tolist()
self.cmmnd_CartesianVelocity(cart_velocities + [0,0,0,1])
else:
cart_velocities = np.dot(matrix1[:3,:3],np.array([0,0,-0.05])[np.newaxis].T)
cart_velocities = cart_velocities.T[0].tolist()
self.cmmnd_CartesianVelocity(cart_velocities + [0,0,0,1])
rate.sleep()
if(counter >400):
counter=0
def scoopSpoon(self):
while not (self.listen.frameExists("/j2n6a300_end_effector") and self.listen.frameExists("/j2n6a300_link_finger_tip_3")):
pass
print ("Publishing transform of figner wrt endEffector frame")
p.listen.waitForTransform('/j2n6a300_end_effector','/j2n6a300_link_finger_tip_3',rospy.Time(),rospy.Duration(100.0))
t = self.listen.getLatestCommonTime("/j2n6a300_end_effector", "/j2n6a300_link_finger_tip_3")
translation, quaternion = self.listen.lookupTransform("/j2n6a300_end_effector", "/j2n6a300_link_finger_tip_3", t)
# print (translation)
matrix2 = self.listen.fromTranslationRotation(translation, quaternion)
# print (matrix2)
# required_cartvelo = [0,0,0,0.1,0,0]
quaternion = pose_action_client.EulerXYZ2Quaternion([0.15,0,0]) # set rot angle HERE (deg)
matrix1 = self.listen.fromTranslationRotation([0,0,0], quaternion)
# print (matrix1)
matrix3 = np.dot(matrix2,matrix1)
scale, shear, rpy_angles, trans, perps = tf.transformations.decompose_matrix(matrix3)
trans = list(trans.tolist())
rpy_angles = list(rpy_angles)
# euler = pose_action_client.Quaternion2EulerXYZ(quat_1)
# pose = trans + rpy_angles
return pose
# return translation, quaternion
def searchdriver(self):
if self.listen.frameExists("/j2n6a300_end_effector") and self.listen.frameExists("/root"):
# print ("we are in the search spoon fucntion")
self.listen.waitForTransform('/j2n6a300_end_effector','/root',rospy.Time(),rospy.Duration(100.0))
t = self.listen.getLatestCommonTime("/j2n6a300_end_effector","/root")
translation, quaternion = self.listen.lookupTransform("/j2n6a300_end_effector","/root",t)
matrix1=self.listen.fromTranslationRotation(translation,quaternion)
counter=0
rate=rospy.Rate(100)
while not self.obj_det:
counter = counter + 1
if(counter < 200):
cart_velocities = np.dot(matrix1[:3,:3],np.array([0,0,0.05])[np.newaxis].T) #change in y->x, z->y, x->z
cart_velocities = cart_velocities.T[0].tolist()
self.cmmnd_CartesianVelocity(cart_velocities + [0,0,0,1])
else:
cart_velocities = np.dot(matrix1[:3,:3],np.array([0,0,-0.05])[np.newaxis].T)
cart_velocities = cart_velocities.T[0].tolist()
self.cmmnd_CartesianVelocity(cart_velocities + [0,0,0,1])
rate.sleep()
if(counter >400):
counter=0
action_database.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 15
收藏 0
点赞 0
评论 0
def create_pose_velocity_msg(self,cart_velo,transform=False):
if transform:
if self.listen.frameExists("/root") and self.listen.frameExists("/j2n6a300_end_effector"):
self.listen.waitForTransform('/root',"/j2n6a300_end_effector",rospy.Time(),rospy.Duration(100.0))
t = self.listen.getLatestCommonTime("/root", "/j2n6a300_end_effector")
translation, quaternion = self.listen.lookupTransform("/root", "/j2n6a300_end_effector", t)
transform = self.transformer.fromTranslationRotation(translation, quaternion)
transformed_vel = np.dot(transform,(cart_velo[0:3]+[1.0]))
print(transformed_vel)
cart_velo[0:3] = transformed_vel[0:3]
msg = PoseVelocity(
twist_linear_x=cart_velo[0],
twist_linear_y=cart_velo[1],
twist_linear_z=cart_velo[2],
twist_angular_x=cart_velo[3],
twist_angular_y=cart_velo[4],
twist_angular_z=cart_velo[5])
return msg
def __init__(self):
# rospy.init_node('nav_test', anonymous=False)
#what to do if shut down (e.g. ctrl + C or failure)
rospy.on_shutdown(self._shutdown)
#tell the action client that we want to spin a thread by default
self.move_base = actionlib.SimpleActionClient("move_base", MoveBaseAction)
rospy.loginfo("waiting for the action server to come up...")
#allow up to 5 seconds for the action server to come up
self.move_base.wait_for_server(rospy.Duration(5))
#we'll send a goal to the robot to tell it to move to a pose that's near the docking station
self.goal = MoveBaseGoal()
self.goal.target_pose.header.frame_id = 'odom'
self.goal.target_pose.header.stamp = rospy.Time.now()
def move_to_pose(self, x1, y1):
# Goal
self.goal.target_pose.pose.position.x = x1
self.goal.target_pose.pose.position.y = y1
self.goal.target_pose.pose.position.z = 0.0
self.goal.target_pose.pose.orientation.x = 0.0
self.goal.target_pose.pose.orientation.y = 0.0
self.goal.target_pose.pose.orientation.z = -0.5
self.goal.target_pose.pose.orientation.w = 0.1
#start moving
self.move_base.send_goal(self.goal)
rospy.loginfo("Moving to desired position...")
#allow TurtleBot up to 60 seconds to complete task
self.success = self.move_base.wait_for_result(rospy.Duration(60))
if not self.success:
self.move_base.cancel_goal()
rospy.loginfo("The base failed to reach the desired position :(")
else:
# We made it!
state = self.move_base.get_state()
if state == GoalStatus.SUCCEEDED:
rospy.loginfo("Destination reached!")
def servo_axis_rotation(self, x):
if abs(x) > self.params['sensitivity_joy']:
x = x if abs(x) > self.params['sensitivity_joy'] else 0
min_x = self.params['bounds'][0][0] + self.params['bounds'][3][0]
max_x = self.params['bounds'][0][1] + self.params['bounds'][3][1]
self.goal = min(max(min_x, self.goal + self.params['speed']*x*self.delta_t), max_x)
if self.goal > self.params['bounds'][0][1]:
new_x_m3 = self.goal - self.params['bounds'][0][1]
new_x = self.params['bounds'][0][1]
elif self.goal < self.params['bounds'][0][0]:
new_x_m3 = self.goal - self.params['bounds'][0][0]
new_x = self.params['bounds'][0][0]
else:
new_x = self.goal
new_x_m3 = 0
new_x_m3 = max(min(new_x_m3, self.params['bounds'][3][1]), self.params['bounds'][3][0])
self.reach({'m1': new_x, 'm4': new_x_m3}, 0) # Duration = 0 means joint teleportation