def goto_cup(self):
# self.calibrate_obj_det_pub.publish(True)
#
# while self.calibrated == False:
# pass
#
# print("Finger Sensors calibrated")
rate = rospy.Rate(100)
while not rospy.is_shutdown():
try:
trans = self.tfBuffer.lookup_transform('root', 'teaCup_position', rospy.Time())
break
except (tf2_ros.LookupException, tf2_ros.ConnectivityException, tf2_ros.ExtrapolationException):
rate.sleep()
# continue
translation = [trans.transform.translation.x, trans.transform.translation.y, trans.transform.translation.z]
rotation = [trans.transform.rotation.x, trans.transform.rotation.y, trans.transform.rotation.z, trans.transform.rotation.w]
pose_value = translation + rotation
#second arg=0 (absolute movement), arg = '-r' (relative movement)
self.cmmnd_CartesianPosition(pose_value, 0)
python类Rate()的实例源码
2017_Task1.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
2017_Task5.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def cmmnd_makeContact_USBPlug(self, sensitivity):
rate = rospy.Rate(100)
while (self.bump_finger_1 < sensitivity)and not rospy.is_shutdown():
print (self.bump_finger_1)
self.cmmnd_CartesianVelocity([-0.03,0,0,0,0,0,1])
rate.sleep()
print ("contact made with the ground")
# def pick_USBlight_1(self, current_finger_position):
# ii = 0
# rate = rospy.Rate(100)
# while self.touch_finger_3 != True and not rospy.is_shutdown():
# current_finger_position[0] += 5 # slowly close finger_1 until contact is made
# print (current_finger_position[0])
# self.cmmnd_FingerPosition([current_finger_position[0], current_finger_position[1], current_finger_position[2]])
# rate.sleep()
# self.touch_finger_1 = False
# return current_finger_position
2017_Task7.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def cmmd_touchBlock(self,current_finger_position):
ii = 0
rate = rospy.Rate(100)
while self.touch_finger_1 != True and not rospy.is_shutdown():
current_finger_position[0] += 5 # slowly close finger_1 until contact is made
# print (current_finger_position[0])
self.cmmnd_FingerPosition([current_finger_position[0], current_finger_position[1], current_finger_position[2]])
rate.sleep()
self.touch_finger_1 = False
while self.touch_finger_2 != True and not rospy.is_shutdown():
current_finger_position[1] += 5 # slowly close finger_1 until contact is made
# print (current_finger_position[0])
self.cmmnd_FingerPosition([current_finger_position[0], current_finger_position[1], current_finger_position[2]])
rate.sleep()
self.touch_finger_2 = False
return current_finger_position
2017_Task3.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 15
收藏 0
点赞 0
评论 0
def lift_spoon(self):
rate = rospy.Rate(100) # NOTE to publish cmmds to velocity_pub at 100Hz
while self.touch_finger_3 != True:
self.cmmnd_CartesianVelocity([0,0.025,0,0,0,0,1])
rate.sleep()
self.touch_finger_3 = False
# p.cmmnd_CartesianPosition([0.02,0,0,0,0,0,1],'-r')
self.cmmnd_FingerPosition([0, 0, 60])
self.cmmnd_FingerPosition([100, 0, 60])
self.cmmnd_FingerPosition([100, 0, 100])
self.cmmnd_FingerPosition([100, 100, 100])
self.cmmnd_CartesianPosition([0, 0, 0.13, 0, 0, 0, 1],'-r')
# self.cmmnd_FingerPosition([100, 100, 100])
def search_USBlight(self):
if self.listen.frameExists("/j2n6a300_end_effector") and self.listen.frameExists("/root"):
# print ("we are in the search spoon fucntion")
self.listen.waitForTransform('/j2n6a300_end_effector','/root',rospy.Time(),rospy.Duration(100.0))
t = self.listen.getLatestCommonTime("/j2n6a300_end_effector","/root")
translation, quaternion = self.listen.lookupTransform("/j2n6a300_end_effector","/root",t)
matrix1=self.listen.fromTranslationRotation(translation,quaternion)
counter=0
rate=rospy.Rate(100)
while not self.obj_det:
counter = counter + 1
if(counter < 200):
cart_velocities = np.dot(matrix1[:3,:3],np.array([0,0,0.05])[np.newaxis].T) #change in y->x, z->y, x->z
cart_velocities = cart_velocities.T[0].tolist()
self.cmmnd_CartesianVelocity(cart_velocities + [0,0,0,1])
else:
cart_velocities = np.dot(matrix1[:3,:3],np.array([0,0,-0.05])[np.newaxis].T)
cart_velocities = cart_velocities.T[0].tolist()
self.cmmnd_CartesianVelocity(cart_velocities + [0,0,0,1])
rate.sleep()
if(counter >400):
counter=0
def search_straw(self):
if self.listen.frameExists("/j2n6a300_end_effector") and self.listen.frameExists("/root"):
# print ("we are in the search spoon fucntion")
self.listen.waitForTransform('/j2n6a300_end_effector','/root',rospy.Time(),rospy.Duration(100.0))
t = self.listen.getLatestCommonTime("/j2n6a300_end_effector","/root")
translation, quaternion = self.listen.lookupTransform("/j2n6a300_end_effector","/root",t)
matrix1=self.listen.fromTranslationRotation(translation,quaternion)
counter=0
rate=rospy.Rate(100)
while not self.obj_det:
counter = counter + 1
if(counter < 200):
cart_velocities = np.dot(matrix1[:3,:3],np.array([00,0,0.05])[np.newaxis].T) #change in y->x, z->y, x->z
cart_velocities = cart_velocities.T[0].tolist()
self.cmmnd_CartesianVelocity(cart_velocities + [0,0,0,1])
else:
cart_velocities = np.dot(matrix1[:3,:3],np.array([0.0,0,-.05])[np.newaxis].T)
cart_velocities = cart_velocities.T[0].tolist()
self.cmmnd_CartesianVelocity(cart_velocities + [0,0,0,1])
rate.sleep()
if(counter >400):
counter=0
def searchSpoon(self):
if self.listen.frameExists("/j2n6a300_end_effector") and self.listen.frameExists("/root"):
# print ("we are in the search spoon fucntion")
self.listen.waitForTransform('/j2n6a300_end_effector','/root',rospy.Time(),rospy.Duration(100.0))
t = self.listen.getLatestCommonTime("/j2n6a300_end_effector","/root")
translation, quaternion = self.listen.lookupTransform("/j2n6a300_end_effector","/root",t)
matrix1=self.listen.fromTranslationRotation(translation,quaternion)
counter=0
rate=rospy.Rate(100)
while not self.obj_det:
counter = counter + 1
if(counter < 200):
cart_velocities = np.dot(matrix1[:3,:3],np.array([0,0,0.05])[np.newaxis].T) #change in y->x, z->y, x->z
cart_velocities = cart_velocities.T[0].tolist()
self.cmmnd_CartesianVelocity(cart_velocities + [0,0,0,1])
else:
cart_velocities = np.dot(matrix1[:3,:3],np.array([0,0,-0.05])[np.newaxis].T)
cart_velocities = cart_velocities.T[0].tolist()
self.cmmnd_CartesianVelocity(cart_velocities + [0,0,0,1])
rate.sleep()
if(counter >400):
counter=0
action_database.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 16
收藏 0
点赞 0
评论 0
def back_forth_search_xy(self):
rate = rospy.Rate(100)
for i in range(100):
if np.all(np.array(self.finger_detect) == np.array(self.detect_goal)):
return 'found'
msg = self.create_pose_velocity_msg([0.0,-0.05,0.0,0.0,0.0,0.0])
self.jn.kinematic_control(msg)
rate.sleep()
for i in range(200):
if np.all(np.array(self.finger_detect)[[self.fingers]] == np.array(self.detect_goal)):
return 'found'
msg = self.create_pose_velocity_msg([0.0,0.05,0.0,0.0,0.0,0.0])
self.jn.kinematic_control(msg)
rate.sleep()
for i in range(100):
if np.all(np.array(self.finger_detect) == np.array(self.detect_goal)):
return 'found'
msg = self.create_pose_velocity_msg([0.0,-0.05,0.0,0.0,0.0,0.0])
self.jn.kinematic_control(msg)
rate.sleep()
return 'not_found'
def main():
rospy.init_node('issue_com')
pub = rospy.Publisher(POS_COM_TOPIC, PositionCommand, queue_size=10)
test_pub = rospy.Publisher(TEST_TOPIC, Empty, queue_size=10)
sub = rospy.Subscriber(POS_COM_TOPIC, PositionCommand, listen)
#sub = rospy.Subscriber('/joint_states', JointState, listen)
pc = PositionCommand()
pc.mode = JOINT_SPACE
#pc.arm = PositionCommand.LEFT_ARM
pc.arm = 1#PositionCommand.RIGHT_ARM
pc.data = np.zeros(7)
r = rospy.Rate(1)
#while not rospy.is_shutdown():
# pub.publish(pc)
# r.sleep()
# print 'published!'
r.sleep()
test_pub.publish(Empty())
pub.publish(pc)
def depth_callback(self, ros_image):
try:
inImg = self.bridge.imgmsg_to_cv2(ros_image)
except CvBridgeError, e:
print e
inImgarr = np.array(inImg, dtype=np.uint16)
# inImgarr = cv2.GaussianBlur(inImgarr, (3, 3), 0)
# cv2.normalize(inImgarr, inImgarr, 0, 1, cv2.NORM_MINMAX)
self.outImg, self.num_fingers = self.process_depth_image(inImgarr)
# outImg = self.process_depth_image(inImgarr)
# rate = rospy.Rate(10)
self.num_pub.publish(self.num_fingers)
# self.img_pub.publish(self.bridge.cv2_to_imgmsg(self.outImg, "bgr8"))
# rate.sleep()
cv2.imshow("Hand Gesture Recognition", self.outImg)
cv2.waitKey(3)
def __init__(self):
self.joy_pub = rospy.Publisher('sensors/joystick/1', Joy, queue_size=1)
self.joy_pub2 = rospy.Publisher('sensors/joystick/2', Joy, queue_size=1)
self.rospack = RosPack()
self.rate = rospy.Rate(20)
count = len(devices.gamepads)
if count < 2:
rospy.logerr("Sensors: Expecting 2 joysticks but found only {}, exiting".format(count))
sys.exit(-1)
gamepads = sorted(devices.gamepads, key=lambda pad: int(str(pad.name)[-1]))
rospy.loginfo(gamepads)
self.joysticks = [JoystickThread(pad) for pad in gamepads]
[joystick.start() for joystick in self.joysticks]
def __init__(self):
# Load parameters and hack the tuple conversions so that OpenCV is happy
self.rospack = RosPack()
with open(join(self.rospack.get_path('apex_playground'), 'config', 'environment.json')) as f:
self.params = json.load(f)
self.params['tracking']['ball']['lower'] = tuple(self.params['tracking']['ball']['lower'])
self.params['tracking']['ball']['upper'] = tuple(self.params['tracking']['ball']['upper'])
self.params['tracking']['arena']['lower'] = tuple(self.params['tracking']['arena']['lower'])
self.params['tracking']['arena']['upper'] = tuple(self.params['tracking']['arena']['upper'])
self.tracking = BallTracking(self.params)
self.conversions = EnvironmentConversions()
self.ball_pub = rospy.Publisher('environment/ball', CircularState, queue_size=1)
self.light_pub = rospy.Publisher('environment/light', UInt8, queue_size=1)
self.image_pub = rospy.Publisher('environment/image', Float32MultiArray, queue_size=1)
self.sound_pub = rospy.Publisher('environment/sound', Float32, queue_size=1)
self.rate = rospy.Rate(self.params['rate'])
def __init__(self):
self.rospack = RosPack()
with open(join(self.rospack.get_path('apex_playground'), 'config', 'torso.json')) as f:
self.params = json.load(f)
self.publish_rate = rospy.Rate(self.params['publish_rate'])
self.demo = rospy.get_param('demo_mode')
# Protected resources
self.in_rest_pose = False
self.robot_lock = RLock()
# Used services
self.torso = TorsoServices(self.params['robot_name'])
# Proposed services
self.reset_srv_name = 'torso/reset'
self.reset_srv = None
def wait_for_human_interaction(self, arm_threshold=1, joystick_threshold=0.15):
rospy.loginfo("We are waiting for human interaction...")
def detect_arm_variation():
new_effort = np.array(self.topics.torso_l_j.effort)
delta = np.absolute(effort - new_effort)
return np.amax(delta) > arm_threshold
def detect_joy_variation():
return np.amax(np.abs(self.topics.joy1.axes)) > joystick_threshold
effort = np.array(self.topics.torso_l_j.effort)
rate = rospy.Rate(50)
is_joystick_demo = None
while not rospy.is_shutdown():
if detect_arm_variation():
is_joystick_demo = False
break
elif detect_joy_variation():
is_joystick_demo = True
break
rate.sleep()
return is_joystick_demo
################################# Service callbacks
def main():
rospy.init_node('issue_com')
pub = rospy.Publisher(POS_COM_TOPIC, PositionCommand, queue_size=10)
test_pub = rospy.Publisher(TEST_TOPIC, Empty, queue_size=10)
sub = rospy.Subscriber(POS_COM_TOPIC, PositionCommand, listen)
#sub = rospy.Subscriber('/joint_states', JointState, listen)
pc = PositionCommand()
pc.mode = JOINT_SPACE
#pc.arm = PositionCommand.LEFT_ARM
pc.arm = 1#PositionCommand.RIGHT_ARM
pc.data = np.zeros(7)
r = rospy.Rate(1)
#while not rospy.is_shutdown():
# pub.publish(pc)
# r.sleep()
# print 'published!'
r.sleep()
test_pub.publish(Empty())
pub.publish(pc)
def wait_for_server(self):
"""Waits until interoperability server is reachable."""
reachable = False
rate = rospy.Rate(1)
while not reachable and not rospy.is_shutdown():
try:
response = requests.get(
self.url, timeout=self.timeout, verify=self.verify)
response.raise_for_status()
reachable = response.ok
except requests.ConnectionError:
rospy.logwarn_throttle(5.0, "Waiting for server: {}".format(
self.url))
except Exception as e:
rospy.logerr_throttle(
5.0, "Unexpected error waiting for server: {}, {}".format(
self.url, e))
if not reachable:
rate.sleep()
def spin(self):
# @todo: is this excessively hitting the master?
r = rospy.Rate(10.0)
while not rospy.is_shutdown():
for srv in self._local_services:
srv_uri = self._local_manager.lookup_service(srv)
if srv_uri:
self._foreign_manager.advertise_service(srv, srv_uri)
else:
self._foreign_manager.unadvertise_service(srv)
for srv in self._foreign_services:
srv_uri = self._foreign_manager.lookup_service(srv)
if srv_uri:
self._local_manager.advertise_service(srv, srv_uri)
else:
self._local_manager.unadvertise_service(srv)
r.sleep()
if self._local_manager:
self._local_manager.unsubscribe_all()
if self._foreign_manager:
self._foreign_manager.unsubscribe_all()
def __init__(self,dis=0.0):
State.__init__(self, outcomes=['succeeded','preempted','aborted'],input_keys=['target'])
rospy.on_shutdown(self.shutdown)
self.test_distance = target.y-dis
self.rate = 200
self.r = rospy.Rate(self.rate)
self.speed = rospy.get_param('~speed',0.08)
self.tolerance = rospy.get_param('~tolerance', 0.01)
self.cmd_vel = rospy.Publisher('/mobile_base_controller/smooth_cmd_vel',Twist,queue_size=5)
self.base_frame = rospy.get_param('~base_frame', '/base_link')
self.odom_frame = rospy.get_param('~odom_frame', '/odom')
self.tf_listener = tf.TransformListener()
rospy.sleep(1)
self.tf_listener.waitForTransform(self.odom_frame,self.base_frame,rospy.Time(),rospy.Duration(60.0))
#define a bianliang
self.flag = rospy.get_param('~flag', True)
rospy.loginfo(self.test_distance)
#tf get position
#publish cmd_vel
def __init__(self,angle=0):
State.__init__(self, outcomes=['succeeded','aborted','preempted'],input_keys=["angle_in"])
self.speed = rospy.get_param('~speed', 0.10)
self.secretindigal = 1.0
self.tolerance = rospy.get_param('tolerance', math.radians(5))
self.start = True
self.cmd_vel = rospy.Publisher('mobile_base_controller/smooth_cmd_vel', Twist, queue_size=5)
self.base_frame = rospy.get_param('~base_frame', '/base_link')
self.odom_frame = rospy.get_param('~odom_frame', '/odom')
rospy.on_shutdown(self.shutdown)
self.rate = 30
self.start_test = True
self.r = rospy.Rate(self.rate)
self.tf_listener = tf.TransformListener()
rospy.sleep(0.5)
self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60))
def __init__(self, angle=0):
State.__init__(self, outcomes=['succeeded', 'aborted', 'preempted'], input_keys=["angle_in"])
self.speed = rospy.get_param('~speed', 0.03)
self.secretindigal = 1.0
self.tolerance = rospy.get_param('tolerance', math.radians(5))
self.start = True
self.cmd_vel = rospy.Publisher('mobile_base_controller/smooth_cmd_vel', Twist, queue_size=5)
self.base_frame = rospy.get_param('~base_frame', '/base_link')
self.odom_frame = rospy.get_param('~odom_frame', '/odom')
rospy.on_shutdown(self.shutdown)
self.rate = 30
self.start_test = True
self.r = rospy.Rate(self.rate)
self.angle=angle
self.tf_listener = tf.TransformListener()
rospy.sleep(0.5)
self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60))