def test_publish_to_topics(self):
topic_ending = "desired"
rospy.logdebug("Sleeping for 5 seconds to let ROS spin up...")
rospy.sleep(5)
for variable, value in self.variables:
# Publish to each variable/desired topic to see if all of the
# actuators come on as expected.
topic_string = variable + "/" + topic_ending
rospy.logdebug("Testing Publishing to " + topic_string)
pub_desired = rospy.Publisher(topic_string,
Float64, queue_size=10)
sub_desired = rospy.Subscriber(topic_string, Float64, self.callback)
rospy.sleep(2)
print(self.namespace + "/" + topic_string)
for _ in range(NUM_TIMES_TO_PUBLISH):
pub_desired.publish(value)
rospy.sleep(1)
rospy.sleep(2)
pub_desired.publish(0)
python类sleep()的实例源码
def ros_loop(test):
while True:
rospy.Subscriber('human_turn_topic', String, human_turn)
rospy.Subscriber('human_chosen_topic', String, have_chosen)
rospy.Subscriber('human_predict_turn_topic', String, human_predict)
rospy.Subscriber('robot_turn_topic', String, robot_turn)
rospy.Subscriber('robot_chosen_topic', String, have_chosen)
rospy.Subscriber('story_telling', String, new_phrase)
rospy.Subscriber('new_element', String, new_element)
rospy.sleep(0.1)
rospy.spin()
################################################
def ros_loop(test):
while True:
rospy.Subscriber('human_turn_topic', String, human_turn)
rospy.Subscriber('human_chosen_topic', String, have_chosen)
rospy.Subscriber('human_predict_turn_topic', String, human_predict)
rospy.Subscriber('robot_turn_topic', String, robot_turn)
rospy.Subscriber('robot_chosen_topic', String, have_chosen)
rospy.Subscriber('story_telling', String, new_phrase)
rospy.Subscriber('new_element', String, new_element)
rospy.sleep(0.1)
rospy.spin()
################################################
def ros_loop(test):
while True:
rospy.Subscriber('human_turn_topic', String, human_turn)
rospy.Subscriber('human_chosen_topic', String, have_chosen)
rospy.Subscriber('human_predict_turn_topic', String, human_predict)
rospy.Subscriber('robot_turn_topic', String, robot_turn)
rospy.Subscriber('robot_chosen_topic', String, have_chosen)
rospy.Subscriber('story_telling', String, new_phrase)
rospy.Subscriber('new_element', String, new_element)
rospy.sleep(0.1)
rospy.spin()
################################################
def record_motion(self):
for countdown in ['ready?', 3, 2, 1, "go"]:
self.say('{}'.format(countdown), blocking=False)
rospy.sleep(1)
self.arm.recorder.start(10)
rospy.loginfo("Recording...")
choice = ""
while choice != 'stop' and not rospy.is_shutdown():
choice = self.read_user_input(['stop'])
joints, eef = self.arm.recorder.stop()
self.say('Recorded', blocking=True)
if len(joints.joint_trajectory.points) == 0:
self.say('This demo is empty, please retry')
else:
target_id = self.promp.add_demonstration(joints, eef)
self.say('Added to Pro MP {}'.format(target_id), blocking=False)
def __init__(self, context):
super(RqtCalibrationMovements, self).__init__(context)
# Give QObjects reasonable names
self.setObjectName('LocalMover')
rospy.sleep(1.0)
# Process standalone plugin command-line arguments
from argparse import ArgumentParser
parser = ArgumentParser()
# Add argument(s) to the parser.
parser.add_argument("-q", "--quiet", action="store_true",
dest="quiet",
help="Put plugin in silent mode")
args, unknowns = parser.parse_known_args(context.argv())
if not args.quiet:
print 'arguments: ', args
print 'unknowns: ', unknowns
# Create QWidget
self._widget = CalibrationMovementsGUI()
if context.serial_number() > 1:
self._widget.setWindowTitle(self._widget.windowTitle() + (' (%d)' % context.serial_number()))
# Add widget to the user interface
context.add_widget(self._widget)
def test_light_interface(light_name='head_green_light'):
"""Blinks a desired Light on then off."""
l = Lights()
rospy.loginfo("All available lights on this robot:\n{0}\n".format(
', '.join(l.list_all_lights())))
rospy.loginfo("Blinking Light: {0}".format(light_name))
on_off = lambda x: 'ON' if l.get_light_state(x) else 'OFF'
rospy.loginfo("Initial state: {0}".format(on_off(light_name)))
# turn on light
l.set_light_state(light_name, True)
rospy.sleep(1)
rospy.loginfo("New state: {0}".format(on_off(light_name)))
# turn off light
l.set_light_state(light_name, False)
rospy.sleep(1)
rospy.loginfo("New state: {0}".format(on_off(light_name)))
# turn on light
l.set_light_state(light_name, True)
rospy.sleep(1)
rospy.loginfo("New state: {0}".format(on_off(light_name)))
# reset output
l.set_light_state(light_name, False)
rospy.sleep(1)
rospy.loginfo("Final state: {0}".format(on_off(light_name)))
def revalidate(self, timeout, invalidate_state=True, invalidate_config=True):
"""
invalidate the state and config topics, then wait up to timeout
seconds for them to become valid again.
return true if both the state and config topic data are valid
"""
if invalidate_state:
self.invalidate_state()
if invalidate_config:
self.invalidate_config()
timeout_time = rospy.Time.now() + rospy.Duration(timeout)
while not self.is_state_valid() and not rospy.is_shutdown():
rospy.sleep(0.1)
if timeout_time < rospy.Time.now():
rospy.logwarn("Timed out waiting for node interface valid...")
return False
return True
def connect(self):
try:
print "Connecting to Arduino on port", self.port, "..."
self.port = Serial(port=self.port, baudrate=self.baudrate, timeout=self.timeout, writeTimeout=self.writeTimeout)
# The next line is necessary to give the firmware time to wake up.
time.sleep(1)
test = self.get_baud()
if test != self.baudrate:
time.sleep(1)
test = self.get_baud()
if test != self.baudrate:
raise SerialException
print "Connected at", self.baudrate
print "Arduino is ready."
except SerialException:
print "Serial Exception:"
print sys.exc_info()
print "Traceback follows:"
traceback.print_exc(file=sys.stdout)
print "Cannot connect to Arduino!"
os._exit(1)
def handle_wait_event(self, goal):
d = datetime.datetime.fromtimestamp(rospy.get_time())
query_result_count = 0;
while query_result_count == 0 and self.wait_for_event_server.is_active() == True:
if self.wait_for_event_server.is_preempt_requested():
result = WaitForEventResult()
result.result = False
result.error_msg = 'The client cancel goal.'
self.wait_for_event_server.set_preempted(result)
return result
for i in range(len(goal.event_name)):
memory_name = goal.event_name[i]
memory_query = json.loads(goal.query[i])
memory_query['time'] = {"$gte": d}
query_result = self.collector[memory_name].find(memory_query)
query_result_count += query_result.count()
rospy.sleep(0.2)
result = WaitEventResult()
result.result = True
self.wait_for_event_server.set_succeeded(result)
def zeroData(self):
self.RGripRFingerForceMean = None
self.RGripRFingerForceRecent = []
self.accelMean = None
self.accelRecent = []
self.temperatureMean = None
self.temperatureRecent = []
self.contactmicMean = None
self.contactmicRecent = []
self.zeroing = True
self.statePublisher.publish('zeroing')
while self.RGripRFingerForceMean is None or self.accelMean is None or self.temperatureMean is None or self.contactmicMean is None:
rospy.sleep(0.01)
self.statePublisher.publish('stop')
self.zeroing = False
print 'Data zeroed'
def setUp(self):
self.client = UIClient()
self.node = UINode()
self.node.start()
self.msg = None
'''
self.instruct_pub = rospy.Publisher(
nb_channels.Messages.instruct.value,
String,
queue_size=10
)
'''
self.subscribe()
rospy.sleep(0.1)
def setUp(self, MockSequence):
server = TaskServer('needybot')
self.server_client = TaskServerClient('needybot')
self.boot = MockSequence()
server.add_boot_sequence(self.boot)
self.idle = Task("idle")
self.task_one = Task("task_one")
self.idle.steps['load'].entered_handler = MagicMock()
self.idle.instruct = MagicMock()
self.task_one.steps['load'].entered_handler = MagicMock()
self.task_one.steps['abort'].entered_handler = MagicMock()
self.task_one.instruct = MagicMock()
self.server_client.boot(EmptyRequest())
rospy.sleep(0.1)
def monitor(self):
# Wait until subscriber on instruct message is present
notified = False
while not rospy.is_shutdown():
_, subscribers, _ = Master('/needybot').getSystemState()
if dict(subscribers).get(nb_channels.Messages.instruct.value) is not None:
if self.is_connected == False:
self.connected_pub.publish()
self.is_connected = True
else:
if self.is_connected or not notified:
notified = True
self.disconnected_pub.publish()
self.is_connected = False
rospy.sleep(0.1)
def neck_control(self, positions, wait=True):
""" Control all 3 points of the head:
lowerNeckPitch - 0 to 0.5 - looks down
neckYaw - -1.0 to 1.0 - looks left and right
upperNeckPitch - -0.5 to 0.0 - looks up
"""
trajectories = []
for p in positions:
trajectories.append(self.zarj.make_joint(self.MOVE_TIME, p))
msg = NeckTrajectoryRosMessage()
msg.joint_trajectory_messages = trajectories
msg.unique_id = self.zarj.uid
self.neck_publisher.publish(msg)
if wait:
rospy.sleep(self.MOVE_TIME + 0.1)
def set_arm_configuration(self, sidename, joints, movetime = None):
if sidename == 'left':
side = ArmTrajectoryRosMessage.LEFT
elif sidename == 'right':
side = ArmTrajectoryRosMessage.RIGHT
else:
rospy.logerr("Unknown side {}".format(sidename))
return
if movetime is None:
movetime = self.ARM_MOVE_TIME
for i in range(len(joints)):
if math.isnan(joints[i]):
joints[i] = self.last_arms[side][i]
msg = self.make_arm_trajectory(side, ArmTrajectoryRosMessage.OVERRIDE, movetime, joints)
self.last_arms[side] = deepcopy(joints)
log('Setting {} arm to {}'.format(sidename, joints))
self.arm_trajectory_publisher.publish(msg)
rospy.sleep(movetime)
def test(self):
# move_arm(self)
# move_hand(self)
# self.plot_arms()
rospy.sleep(0.1)
# self.mashButton()
rospy.sleep(0.1)
self.close_grasp(RIGHT)
rospy.sleep(0.1)
self.open_grasp(RIGHT)
rospy.sleep(0.1)
self.close_grasp(LEFT)
rospy.sleep(0.1)
self.open_grasp(LEFT)
rospy.sleep(0.1)
self.close_grasp(LEFT)
def start_processing(self):
""" Start processing data """
if self.disabled:
rospy.loginfo("Processing started")
self.disabled = False
if self.sub_left is None:
self.sub_left = rospy.Subscriber(
"/multisense/camera/left/image_color", Image,
self.left_color_detection)
rospy.sleep(0.1)
if self.sub_right is None:
self.sub_right = rospy.Subscriber(
"/multisense/camera/right/image_color", Image,
self.right_color_detection)
rospy.sleep(0.1)
if self.sub_cloud is None:
self.sub_cloud = rospy.Subscriber(
"/multisense/image_points2_color", PointCloud2,
self.stereo_cloud)
def run(self, distance, angle, snap_to, options):
""" Run our code """
rospy.loginfo("Start test code")
self.task_subscriber = rospy.Subscriber("/srcsim/finals/task", Task, self.task_status)
rate = rospy.Rate(1) # 10hz
if self.task_subscriber.get_num_connections() == 0:
rospy.loginfo('waiting for task publisher...')
while self.task_subscriber.get_num_connections() == 0:
rate.sleep()
if distance > 0.0001 or distance < -0.005:
self.zarj_os.walk.forward(distance, True)
while not self.zarj_os.walk.walk_is_done():
rospy.sleep(0.01)
if abs(angle) > 0.0 or "turn" in options:
align = "align" in options
small_forward = 0.005 if align else None
self.zarj_os.walk.turn(angle, snap_to, align_to_snap = align, small_forward = small_forward)
while not self.zarj_os.walk.walk_is_done():
rospy.sleep(0.01)
def start(self, _=None):
""" Start the macro """
joints = deepcopy(LimbTypes.arm_styles['pass_football'])
joints = LimbTypes.invert_arm_configuration('right', joints)
log("Setting right hand to pass a football")
self.fc.zarj.hands.set_arm_configuration('right', joints)
joints = deepcopy(LimbTypes.arm_styles['tuck'])
joints = LimbTypes.invert_arm_configuration('left', joints)
log("Tucking left arm, setting left hand to opposition")
self.fc.zarj.hands.set_arm_configuration('left', joints)
joints = deepcopy(LimbTypes.hand_styles['oppose'])
joints = LimbTypes.invert_hand_configuration('left', joints)
self.fc.zarj.hands.set_hand_configuration('left', joints)
log("Looking down")
self.fc.zarj.neck.neck_control([0.5, 0, 0], True)
rospy.sleep(0.5)
self.fc.send_stereo_camera()
log("Pick the two top corners of the array handle.")
self.done = True
def start(self, _=None):
""" Start the macro """
joints = deepcopy(LimbTypes.hand_styles['open'])
joints = LimbTypes.invert_hand_configuration('left', joints)
self.fc.zarj.hands.set_hand_configuration('left', joints)
log("Looking down and left")
self.fc.zarj.neck.neck_control([0.5, 1.0, 0], True)
rospy.sleep(0.5)
self.fc.clear_points()
if not self.chain_okay:
self.fc.send_stereo_camera()
log("Click the button.")
self.done = True
def start(self, _=None):
""" Start the macro """
anchor = self.fc.assure_anchor("center")
if anchor is None:
raise ZarjConfused("Cannot find the choke")
x = anchor.adjusted[0]
y = anchor.adjusted[1] + (.05 * math.sin(math.radians(anchor.angle)))
z = anchor.adjusted[2] + 0.14
log("Commanding hand above cable end")
msg = ZarjMovePalmCommand('right', False, x, y, z, -1 * anchor.angle, 20, -90, True)
self.fc.process_palm_msg(msg)
self.done = True
log("Giving the hand a long time to settle down.")
rospy.sleep(3.0)
def start(self, _=None):
""" Start the macro """
anchor = self.fc.assure_anchor("plugin_start")
if anchor is None:
raise ZarjConfused("Cannot find the plug")
x = anchor.adjusted[0]
y = anchor.adjusted[1] + 0.05
z = anchor.adjusted[2]
log("Commanding hand above and left of plug; x {}, y {}, z {}".format(x, y, z))
msg = ZarjMovePalmCommand('right', False, x, y, z, 0, 0, 45, True)
self.fc.process_palm_msg(msg)
self.done = True
log("Giving the hand a long time to settle down.")
rospy.sleep(3.0)
def _spin_wheel(self):
""" Spin the wheel. """
joints = deepcopy(LimbTypes.arm_styles['door_out'])
joints = LimbTypes.invert_arm_configuration('left', joints)
self.fc.zarj.hands.set_arm_configuration('left', joints, 1.0)
rospy.sleep(0.5)
joints = deepcopy(LimbTypes.arm_styles['door_top'])
joints = LimbTypes.invert_arm_configuration('left', joints)
self.fc.zarj.hands.set_arm_configuration('left', joints, 1.0)
rospy.sleep(0.5)
joints = deepcopy(LimbTypes.arm_styles['door_bottom'])
joints = LimbTypes.invert_arm_configuration('left', joints)
self.fc.zarj.hands.set_arm_configuration('left', joints, 1.5)
rospy.sleep(0.5)
def start(self, _=None):
""" Start the macro """
joints = deepcopy(LimbTypes.arm_styles['pass_football'])
joints = LimbTypes.invert_arm_configuration('right', joints)
log("Setting right hand to pass a football")
self.fc.zarj.hands.set_arm_configuration('right', joints)
joints = deepcopy(LimbTypes.arm_styles['king_tut'])
joints = LimbTypes.invert_arm_configuration('left', joints)
log("Extending left arm, setting left hand to grab down")
self.fc.zarj.hands.set_arm_configuration('left', joints)
joints = deepcopy(LimbTypes.hand_styles['open_down'])
joints = LimbTypes.invert_hand_configuration('left', joints)
self.fc.zarj.hands.set_hand_configuration('left', joints)
rospy.sleep(0.5)
self.fc.send_stereo_camera()
log("Pick two points along the _GRIP_, long way (antenna <--> base) of "
"the detector. Generally just a little in toward the robot from "
"the center line")
def start(self, _=None):
""" Start the macro """
anchor = self.fc.assure_anchor("center")
if anchor is None:
return False
x = anchor.adjusted[0]
y = anchor.adjusted[1]
z = 0.838 + 0.1
log("Commanding hand to position just above object")
msg = ZarjMovePalmCommand('left', False, x, y, z, -1 * anchor.angle, 25, 90, True)
self.fc.process_palm_msg(msg)
log("Giving it a while to settle")
rospy.sleep(3.0)
log("Commanding hand onto object")
msg = ZarjMovePalmCommand('left', False, x, y, z - .08, -1 * anchor.angle, 25, 90, True)
self.fc.process_palm_msg(msg)
rospy.sleep(0.5)
first_project_state.py 文件源码
项目:master_robot_strage
作者: nwpu-basketball-robot
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def execute(self, ud):
rospy.loginfo('Start Return to home!!')
if self.preempt_requested():
self.service_preempt()
return 'failed'
(current_x,current_y) = self.cmd_position.get_robot_current_x_y()
self.cmd_move.move_to(x = 2-current_x,y = -1.5-current_y)
# self.cmd_move.move_to(y = -current_y)
self.cmd_turn.turn_to(math.pi)
rospy.sleep(0.5)
self.cmd_return.go_close_line()
# self.cmd_move.move_to(x = -2.6)
return 'successed'
#??????????????????????
first_project_state.py 文件源码
项目:master_robot_strage
作者: nwpu-basketball-robot
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def execute(self, ud):
rospy.loginfo("Start Shovel ball!!!!!")
if self.preempt_requested():
self.service_preempt()
return 'failed'
self.cmd_shovel.control_shovel(control_type=0)
rospy.sleep(0.5)
return 'successed'
############################################
###########pass_ball_first##################
############################################
#?????????????????????
second_project_state.py 文件源码
项目:master_robot_strage
作者: nwpu-basketball-robot
项目源码
文件源码
阅读 15
收藏 0
点赞 0
评论 0
def execute(self, ud):
if self.preempt_requested():
self.service_preempt()
return 'failed'
rospy.loginfo("x = %s"%ud.column_x)
rospy.loginfo("theta = %s"%ud.column_theta)
self.move_cmd.move_to(x = ud.column_x - 2.3)
self.move_cmd.move_to( yaw=ud.column_theta)
rospy.sleep(1)
(x,column_theta) = find_cylinder_state.find_cylinder_state().find_cylinder()
# self.move_cmd.move_to(x - 2.3)
self.move_cmd.move_to( yaw= column_theta)
return 'successed'
############################################
###############Find Ball####################
############################################
#???????????????????????
# ball_x ????????x??
# ball_y ????????y??
# ball_theta ?????x????
#??? ???????????????x????????
second_project_state.py 文件源码
项目:master_robot_strage
作者: nwpu-basketball-robot
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def execute(self, ud):
if self.preempt_requested():
self.service_preempt()
return 'failed'
rospy.sleep(0.5)
(x,y,theta) = find_volleyball.find_volleyball().find_volleyball()
rospy.loginfo("x = %s,y = %s ",x,y)
self.move_cmd.turn_to(theta*0.95)
self.move_cmd.move_to(x = math.sqrt(x*x+y*y) - 0.25)
# self.move_cmd.move_to(y = y)
# self.move_cmd.move_to(x = x - 0.2 )
return 'successed'
############################################
################Control#####################
############################################