def talker():
pub = rospy.Publisher('chatter', String, queue_size=10)
rospy.init_node('talker', anonymous=True)
rate = rospy.Rate(10) # 10hz
while not rospy.is_shutdown():
hello_str = "hello world %s" % rospy.get_time()
rospy.loginfo(hello_str)
pub.publish(hello_str)
rate.sleep()
# rate = rospy.Rate(10)
# hello_str = "hello world"
# rospy.loginfo(hello_str)
# pub.publish(hello_str)
# rospy.spin()
# exit(0)
python类Rate()的实例源码
def run_driver():
# init moveit commander
moveit_commander.roscpp_initialize(sys.argv)
# specify move group
group = moveit_commander.MoveGroupCommander("arm")
# init ros node
rospy.init_node('real_servo_driver', anonymous=True)
print "============ Waiting for RVIZ..."
rospy.sleep(2)
# move grasper to init position
init_position(group)
# set ros publisher rate, 10hz = 10 seconds for a circle
rate = rospy.Rate(50)
while True:
group.set_random_target()
plan_msg = group.plan()
group.execute(plan_msg=plan_msg, wait=False)
rospy.sleep(5)
if is_exit:
break
# shutdown moveit commander
moveit_commander.roscpp_shutdown()
def init(self):
pygame.init()
clock = pygame.time.Clock()
screen = pygame.display.set_mode((250, 250))
rospy.init_node('highway_teleop')
self.rate = rospy.Rate(rospy.get_param('~hz', 10))
self.acc = rospy.get_param('~acc', 5)
self.yaw = rospy.get_param('~yaw', 0)
self.robot_pub = rospy.Publisher('robot_0/cmd_vel', Twist, queue_size=1)
print "Usage: \n \
up arrow: accelerate \n \
down arrow: decelerate \n \
left arrow: turn left \n \
right arrow: turn right"
def init(self):
pygame.init()
clock = pygame.time.Clock()
screen = pygame.display.set_mode((250, 250))
rospy.init_node('teleop')
self.rate = rospy.Rate(rospy.get_param('~hz', 20))
self.acc = rospy.get_param('~acc', 1)
self.yaw = rospy.get_param('~yaw', 0.25)
self.robot_pub = rospy.Publisher('control_command', controlCommand, queue_size=1)
self.path_record_pub = rospy.Publisher('record_state', \
RecordState, queue_size=1)
self.highway_game_start_pub = rospy.Publisher('highway_game_start', RecordState, queue_size=1)
print "Usage: \n \
up arrow: accelerate \n \
down arrow: decelerate \n \
left arrow: turn left \n \
right arrow: turn right"
def attach_springs(self):
"""
Switches to joint torque mode and attached joint springs to current
joint positions.
"""
# record initial joint angles
self._start_angles = self._limb.joint_angles()
# set control rate
control_rate = rospy.Rate(self._rate)
# for safety purposes, set the control rate command timeout.
# if the specified number of command cycles are missed, the robot
# will timeout and return to Position Control Mode
self._limb.set_command_timeout((1.0 / self._rate) * self._missed_cmds)
# loop at specified rate commanding new joint torques
while not rospy.is_shutdown():
if not self._rs.state().enabled:
rospy.logerr("Joint torque example failed to meet "
"specified control rate timeout.")
break
self._update_forces()
control_rate.sleep()
def wobble(self):
self.set_neutral()
"""
Performs the wobbling
"""
command_rate = rospy.Rate(1)
control_rate = rospy.Rate(100)
start = rospy.get_time()
while not rospy.is_shutdown() and (rospy.get_time() - start < 10.0):
angle = random.uniform(-2.0, 0.95)
while (not rospy.is_shutdown() and
not (abs(self._head.pan() - angle) <=
intera_interface.HEAD_PAN_ANGLE_TOLERANCE)):
self._head.set_pan(angle, speed=0.3, timeout=0)
control_rate.sleep()
command_rate.sleep()
self._done = True
rospy.signal_shutdown("Example finished.")
def robot_listener(self):
'''
rospy.wait_for_service('spawn')
spawner = rospy.ServiceProxy('spawn', turtlesim.srv.Spawn)
spawner(4, 2, 0, 'turtle2')
'''
robot_vel_pub = rospy.Publisher('robot_0/cmd_vel', geometry_msgs.msg.Twist, queue_size=1)
rate = rospy.Rate(10.0)
while not rospy.is_shutdown():
try:
(trans,rot) = self.listener.lookupTransform('/robot_3', '/robot_0', rospy.Time(0))
except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException):
continue
angular = 4 * math.atan2(trans[1], trans[0])
linear = 0.5 * math.sqrt(trans[0] ** 2 + trans[1] ** 2)
cmd = geometry_msgs.msg.Twist()
cmd.linear.x = linear
cmd.angular.z = angular
robot_vel_pub.publish(cmd)
rate.sleep()
def water_potential_hydrogen_callback(msg): # float -1 ~ 1
command = msg.data
# reset state to idle
actuator_state["pump_3_ph_up_1"] = False
actuator_state["pump_4_ph_down_1"] = False
# Set actuator_state based on command
if command > 0:
actuator_state["pump_3_ph_up_1"] = True
elif command < 0:
actuator_state["pump_4_ph_down_1"] = True
# nutrient_flora_duo_a is a "Rate" of dosage, so we can just change the dosage
# without resetting to "idle state" since that doesn't exist.
def compute_static_stability_thread():
rate = rospy.Rate(60)
global kron_com_vertices
while not rospy.is_shutdown():
if 'COM-static' in support_area_handles \
and not gui.show_com_support_area:
delete_support_area_display('COM-static')
if not motion_plan or not motion_plan.started \
or 'COM-static' in support_area_handles \
or not gui.show_com_support_area:
rate.sleep()
continue
color = (0.1, 0.1, 0.1, 0.5)
req = contact_stability.srv.StaticAreaRequest(
contacts=convert_contacts_to_ros(motion_plan.cur_stance.contacts),
mass=robot.mass, z_out=motion_plan.com_height)
try:
res = compute_com_area(req)
vertices = [array([v.x, v.y, v.z]) for v in res.vertices]
update_support_area_display('COM-static', vertices, [], color)
except rospy.ServiceException:
delete_support_area_display('COM-static')
rate.sleep()
def run(self):
r=rospy.Rate(2)
while self.is_looping():
if self.pub_audio_.get_num_connections() == 0:
if self.isSubscribed:
rospy.loginfo('Unsubscribing from audio bridge as nobody listens to the topics.')
self.release()
continue
if not self.isSubscribed:
self.reconfigure(self.config, 0)
r.sleep()
if self.isSubscribed:
self.release()
self.myBroker.shutdown()
def __init__(self):
"""Constructor for the class
initialize topic subscription and
instance variables
"""
self.r = rospy.Rate(5)
self.publisher = rospy.Publisher('/cmd_vel', Twist, queue_size=10)
rospy.Subscriber('/scan', LaserScan, self.process_scan)
# user chooses which parking mode to be performed
self.is_parallel = rospy.get_param('~parallel', False)
# Instance Variables
self.timestamp1 = None
self.timestamp2 = None
self.dist2Neato = None
self.dist2Wall = None
self.widthOfSpot = None
self.twist = None
self.radius = None
# Adjusment to be made before moving along the arc
self.adjustment = 0
self.isAligned = False
def run(self, distance, angle, snap_to, options):
""" Run our code """
rospy.loginfo("Start test code")
self.task_subscriber = rospy.Subscriber("/srcsim/finals/task", Task, self.task_status)
rate = rospy.Rate(1) # 10hz
if self.task_subscriber.get_num_connections() == 0:
rospy.loginfo('waiting for task publisher...')
while self.task_subscriber.get_num_connections() == 0:
rate.sleep()
if distance > 0.0001 or distance < -0.005:
self.zarj_os.walk.forward(distance, True)
while not self.zarj_os.walk.walk_is_done():
rospy.sleep(0.01)
if abs(angle) > 0.0 or "turn" in options:
align = "align" in options
small_forward = 0.005 if align else None
self.zarj_os.walk.turn(angle, snap_to, align_to_snap = align, small_forward = small_forward)
while not self.zarj_os.walk.walk_is_done():
rospy.sleep(0.01)
find_cylinder_state.py 文件源码
项目:master_robot_strage
作者: nwpu-basketball-robot
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def get_cylinder_status(self):
self.cylinder_laser_client.wait_for_service()
self.cylinder_opencv_client.wait_for_service()
flag = 0
r = rospy.Rate(2)
while not rospy.is_shutdown() and flag != 1:
res_opencv = self.cylinder_opencv_client()
res_laser = self.cylinder_laser_client()
x_laser = res_laser.x
theta_laser = res_laser.theta
theta_opencv = res_opencv.theta
if abs(theta_laser - theta_opencv) < 0.01:
flag = 1
break
r.sleep()
return (x_laser,theta_laser)
old_linear_move.py 文件源码
项目:master_robot_strage
作者: nwpu-basketball-robot
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def __init__(self):
rospy.loginfo('[robot_move_pkg]->linear_move is initial')
#???????????????
self.robot_state = robot_state.robot_position_state()
self.cmd_move_pub = rospy.Publisher('/cmd_move', g_msgs.Twist, queue_size = 100)
self.rate = rospy.Rate(150)
#???????????
self.stop_tolerance = config.high_speed_stop_tolerance
self.angular_tolerance = config.high_turn_angular_stop_tolerance
#?????????????
self.accurate_turn_an_angular = turn_an_angular.turn_an_angular()
self.x_speed = 0.0
self.y_speed = 0.0
self.w_speed = config.high_turn_angular_speed
#???????
self.linear_sp = spline_func.growth_curve()
self.amend_speed = 0.12
go_along_circle.py 文件源码
项目:master_robot_strage
作者: nwpu-basketball-robot
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def __init__(self):
#?????????? m/s
self.move_cmd_pub = rospy.Publisher('cmd_move_robot',g_msgs.Twist,queue_size=100)
self.move_speed = config.go_along_circle_speed
self.get_position = robot_state.robot_position_state()
#????????? rad
self.stop_tolerance = config.go_along_circle_angular_tolerance
#????????
rospy.on_shutdown(self.brake)
# ??sleep ??? ???????????
self.rate = 100.0
self.R = rospy.Rate(int(self.rate))
self.MODE = { 1:(-1, 1),
2:( 1,-1),
3:( 1, 1),
4:(-1,-1)}
go_close_line.py 文件源码
项目:master_robot_strage
作者: nwpu-basketball-robot
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def correct_angle(self):
# rospy.loginfo("sadasdafasf")
(x,theta,if_close_line) = self.close_line_cmd.find_line()
r = rospy.Rate(50)
move_velocity = g_msgs.Twist()
if theta > 0:
move_velocity.angular.z = -0.10
else:
move_velocity.angular.z = 0.10
while not rospy.is_shutdown():
(x,theta,if_close_line) = self.close_line_cmd.find_line()
self.cmd_move_pub.publish(move_velocity)
#????????????????
if theta < 0.02 and theta > -0.02:
rospy.loginfo("will Stop!!!!!!!!!!")
self.brake()
break
r.sleep()
#??3??????
go_close_line.py 文件源码
项目:master_robot_strage
作者: nwpu-basketball-robot
项目源码
文件源码
阅读 16
收藏 0
点赞 0
评论 0
def go_to_home(self):
(x,theta,if_close_line) = self.close_line_cmd.find_line()
r = rospy.Rate(50)
move_velocity = g_msgs.Twist()
while not rospy.is_shutdown():
(x,theta,if_close_line) = self.close_line_cmd.find_line()
move_velocity.linear.y = -0.3
self.cmd_move_pub.publish(move_velocity)
rospy.loginfo("python: y = %s",move_velocity.linear.y)
if if_close_line != 0:
rospy.loginfo("will Stop!!!!!!!!!!")
self.brake()
break
r.sleep()
#????????
def __init__(self):
rospy.loginfo('[robot_move_pkg]->linear_move is initial')
#???????????????
self.robot_state = robot_state.robot_position_state()
self.cmd_move_pub = rospy.Publisher('/cmd_move', g_msgs.Twist, queue_size = 100)
self.rate = rospy.Rate(150)
#???????????
self.stop_tolerance = config.high_speed_stop_tolerance
self.angular_tolerance = config.high_turn_angular_stop_tolerance
#?????????????
self.accurate_turn_an_angular = turn_an_angular.turn_an_angular()
self.x_speed = 0.0
self.y_speed = 0.0
self.w_speed = config.high_turn_angular_speed
#???????
self.linear_sp = spline_func.growth_curve()
self.amend_speed = 0.18
move_in_robot.py 文件源码
项目:master_robot_strage
作者: nwpu-basketball-robot
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def turn(self, goal_angular):
# ???????????????,??????????????
# ????,????
rospy.loginfo('[robot_move_pkg]->move_in_robot.turn will turn %s'%goal_angular)
current_angular = start_angular = self.robot_state.get_robot_current_w()#??????????
r = rospy.Rate(100)
delta_angular = current_angular - start_angular
move_velocity = g_msgs.Twist()
while not rospy.is_shutdown() :
if abs(delta_angular - abs(goal_angular)) < self.turn_move_stop_tolerance:
break
current_angular = self.robot_state.get_robot_current_w()
move_velocity.angular.z = math.copysign(self.w_speed, goal_angular)
delta_angular += abs(abs(current_angular) - abs(start_angular) )
start_angular = current_angular
self.cmd_move_pub.publish(move_velocity) #???????????
r.sleep()
self.brake()
find_volleyball.py 文件源码
项目:master_robot_strage
作者: nwpu-basketball-robot
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def find_volleyball(self):
self.find_ball_client.wait_for_service()
res = self.find_ball_client(False)
x = res.z
y = -res.x
theta = -res.theta
if_volleyball = res.if_volleyball
r = rospy.Rate(50)
while not if_volleyball == True:
res = self.find_ball_client(False)
x = res.z
y = -res.x
theta = -res.theta
if_volleyball = res.if_volleyball
if if_volleyball == True:
return (x,y,theta)
#??????????????????????
#??????????
def simulate(self):
loginfo("Beginning simulation")
rate = Rate(2)
self.temperature = 23.0
while not self.increased and not self.finished:
self.temperature += 0.1
rate.sleep()
loginfo(self.temperature)
while not self.decreased and not self.finished:
self.temperature -= 0.1
rate.sleep()
loginfo(self.temperature)
while self.temperature < 22.5 and not self.finished:
self.temperature += 0.1
rate.sleep()
loginfo(self.temperature)
loginfo("Simulation ended.")
def start_simulation(self):
loginfo("Beginning simulation")
rate = rospy.Rate(1)
while not self.on_fire:
rate.sleep()
loginfo("{} persons in building!".format(self.persons))
while not self.persons <= 0:
self.persons -= 1
loginfo("{} persons in building!".format(self.persons))
rate.sleep()
loginfo("Building is empty!")
while not self.fire_department_arrived:
rate.sleep()
loginfo("Fire department has arrived!")
rate.sleep()
self.on_fire = False
loginfo("Simulation ended.")
def __init__(self, loop_time = 0.1, pubs = {}, subs = {}):
"""init args: pubs: dict with topic / [type,], subs: dict with topic / [type, callback]"""
smp_thread.__init__(self, loop_time = loop_time)
# now init ros node
rospy.init_node(self.name, anonymous=True)
# loop frequency / sampling rate
self.rate = rospy.Rate(1./self.loop_time)
# local pub / sub
self.default_queue_size_pub = 2
self.default_queue_size_sub = 2
if len(pubs) == 0 and len(subs) == 0:
self.pub_sub_local_legacy()
else:
self.pub_sub_local(pubs, subs)
# print "smp_thread_ros pubs", self.pub
# print "smp_thread_ros subs", self.sub
def attach_springs(self):
"""
Switches to joint torque mode and attached joint springs to current
joint positions.
"""
# record initial joint angles
self._des_angles = self._limb.joint_angles()
# set control rate
control_rate = rospy.Rate(self._rate)
# for safety purposes, set the control rate command timeout.
# if the specified number of command cycles are missed, the robot
# will timeout and disable
self._limb.set_command_timeout((1.0 / self._rate) * self._missed_cmds)
# loop at specified rate commanding new joint torques
while not rospy.is_shutdown():
if not self._rs.state().enabled:
rospy.logerr("Joint torque example failed to meet "
"specified control rate timeout.")
break
self._update_forces()
control_rate.sleep()
def execute():
# define publisher and its topic
pub = rospy.Publisher('write_angles',Angles,queue_size = 10)
rospy.init_node('write_angles_node',anonymous = True)
rate = rospy.Rate(10)
# write 4 angles
if len(sys.argv) == 5:
s1 = int(sys.argv[1])
s2 = int(sys.argv[2])
s3 = int(sys.argv[3])
s4 = int(sys.argv[4])
pub.publish(s1,s2,s3,s4)
else:
raiseError()
rate.sleep()
# main function
stacking_blocks.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 16
收藏 0
点赞 0
评论 0
def zero_sensor(self):
rospy.loginfo("Zeroing sensor...")
# Wait a little bit until we get a message from the sensor
rospy.sleep(1)
self.tip_offset, self.inside_offset = (np.zeros_like(self.tip),
np.zeros_like(self.inside))
inside_vals, tip_vals = [], []
r = rospy.Rate(10)
while not rospy.is_shutdown() and len(inside_vals) < 10:
inside, tip = self.inside, self.tip
# If there are zero values (most likely becase a message
# has not yet been received), skip that. We could also
# initialize them with nans to find out if there's a
# problem
if all(inside) and all(tip):
inside_vals.append(inside)
tip_vals.append(tip)
r.sleep()
# Center around 5000, so ranges are similar to when not centering
self.inside_offset = np.min(inside_vals, axis=0) - 5000
self.tip_offset = np.min(tip_vals, axis=0) - 5000
rospy.loginfo("Zeroing finished")
demo_graspEventdetect.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def limb_pose(limb_name):
"""Get limb pose at time of OK cuff button press."""
button = CuffOKButton(limb_name)
rate = rospy.Rate(20) # rate at which we check whether button was
# pressed or not
rospy.loginfo(
'Waiting for %s OK cuff button press to save pose' % limb_name)
while not button.pressed and not rospy.is_shutdown():
rate.sleep()
joint_pose = baxter_interface.Limb(limb_name).joint_angles()
# Now convert joint coordinates to end effector cartesian
# coordinates using forward kinematics.
kinematics = baxter_kinematics(limb_name)
endpoint_pose = kinematics.forward_position_kinematics(joint_pose)
# How is this different from
# baxter_interface.Limb(limb_name).endpoint_pose()
print()
print('baxter_interface endpoint pose:')
print(baxter_interface.Limb(limb_name).endpoint_pose())
print('pykdl forward kinematics endpoint pose:')
print(endpoint_pose)
print()
return endpoint_pose
safety_stop.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def zero_sensor(self):
rospy.loginfo("Zeroing sensor...")
# Wait a little bit until we get a message from the sensor
rospy.sleep(1)
self.tip_offset, self.inside_offset = (np.zeros_like(self.tip),
np.zeros_like(self.inside))
inside_vals, tip_vals = [], []
r = rospy.Rate(10)
while not rospy.is_shutdown() and len(inside_vals) < 10:
inside, tip = self.inside, self.tip
# If there are zero values (most likely becase a message
# has not yet been received), skip that. We could also
# initialize them with nans to find out if there's a
# problem
if all(inside) and all(tip):
inside_vals.append(inside)
tip_vals.append(tip)
r.sleep()
# Center around 5000, so ranges are similar to when not centering
self.inside_offset = np.min(inside_vals, axis=0) - 5000
self.tip_offset = np.min(tip_vals, axis=0) - 5000
rospy.loginfo("Zeroing finished")
torque_controller.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def attach_springs(self):
self._start_angles = self._get_cmd_positions()
control_rate = rospy.Rate(self._rate)
# for safety purposes, set the control rate command timeout.
# if the specified number of command cycles are missed, the robot
# will timeout and disable
self._limb.set_command_timeout((1.0 / self._rate) * self._missed_cmds)
error = [self._limb.joint_angles()[key] - self._start_angles[key] for key in self._limb.joint_angles().keys()]
self.sum_sqr_error = sum([error[i]**2 for i in range(len(error))])
print ('-------new set of joint position---------')
print (self.sum_sqr_error)
tolerance = 0.020
# loop at specified rate commanding new joint torques
while self.sum_sqr_error>tolerance and not rospy.is_shutdown():
if not self._rs.state().enabled:
rospy.logerr("Joint torque example failed to meet "
"specified control rate timeout.")
break
self._update_forces()
control_rate.sleep()
demo_graspsuccessExp.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def limb_pose(limb_name):
"""Get limb pose at time of OK cuff button press."""
button = CuffOKButton(limb_name)
rate = rospy.Rate(20) # rate at which we check whether button was
# pressed or not
rospy.loginfo(
'Waiting for %s OK cuff button press to save pose' % limb_name)
while not button.pressed and not rospy.is_shutdown():
rate.sleep()
joint_pose = baxter_interface.Limb(limb_name).joint_angles()
# Now convert joint coordinates to end effector cartesian
# coordinates using forward kinematics.
kinematics = baxter_kinematics(limb_name)
endpoint_pose = kinematics.forward_position_kinematics(joint_pose)
#print (endpoint_pose)
return endpoint_pose