def run(self):
"""Send an action at a 40Hz cycle."""
rospy.init_node('action_manager', anonymous=True)
rospy.Subscriber('action_cmd', Twist, self.update_action)
rospy.Subscriber('termination', Bool, self.set_termination_flag)
rospy.Subscriber('pause', Bool, self.set_pause_flag)
action_publisher = rospy.Publisher('cmd_vel_mux/input/teleop',
Twist,
queue_size=1)
action_pub_rate = rospy.Rate(40)
while not rospy.is_shutdown():
if self.termination_flag:
break
if self.pause_flag is False:
# log action
speeds = (self.action.linear.x, self.action.angular.z)
actn = "linear: {}, angular: {}".format(*speeds)
rospy.logdebug("Sending action to Turtlebot: {}".format(actn))
# send new actions
if self.stop_once:
action_publisher.publish(self.STOP_ACTION)
self.stop_once = False
else:
action_publisher.publish(self.action)
action_pub_rate.sleep()
python类Bool()的实例源码
def publish_state(self):
pub = rospy.Publisher('/env/'+self.name+'/lamp/visual/get_state', Bool, queue_size=1)
rate = rospy.Rate(50) # 50hz
while not rospy.is_shutdown():
pub.publish(self._on)
rate.sleep()
def detect_and_visualize(self, root_dir=None, extension=None,
classes=[], thresh=0.6, show_timer=False):
global imgi
global img_rect
global Frame
global show
global trackpos
global imshow
global acc_pub
global acc_enable
acc_pub = rospy.Publisher("acc_cmd", ACCCommand, queue_size=2)
acc_enable = rospy.Publisher("acc_enable", Bool, queue_size=2)
# rospy.Timer(rospy.Duration(0.02), self.trackCallback)
rospy.Subscriber("/zed/left/image_rect_color/compressed",CompressedImage, self.ImgCcallback, queue_size = 4)
# rospy.Subscriber("/zed/left/image_rect_color",Image, self.Imgcallback, queue_size = 4)
rospy.Subscriber("/zed/depth/depth_registered",Image, self.DepthImgcallback, queue_size = 4)
im_path = '/home/wolfram/mxnet/example/ssd/data/demo/dog.jpg'
with open(im_path, 'rb') as fp:
img_content = fp.read()
trackpos = 0
imshow = 0
imgi = mx.img.imdecode(img_content)
while(1):
# ret,img_rect = cap.read()
dets = self.im_detect(root_dir, extension, show_timer=show_timer)
# if not isinstance(im_list, list):
# im_list = [im_list]
# assert len(dets) == len(im_list)
# for k, det in enumerate(dets):
# img[:, :, (0, 1, 2)] = img[:, :, (2, 1, 0)]
# img = img_rect.copy()
self.visualize_detection(img_rect, dets[0], classes, thresh)
if imshow == 1:
cv2.imshow("tester", show)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
def __init__(self):
self.is_rendering = False
rospy.loginfo('\033[91m[%s]\033[0m waiting for bringup social_mind...'%rospy.get_name())
self.rd_memory = {}
try:
rospy.wait_for_service('social_events_memory/read_data')
self.rd_memory['social_events_memory'] = rospy.ServiceProxy('social_events_memory/read_data', ReadData)
rospy.wait_for_service('environmental_memory/read_data')
self.rd_memory['environmental_memory'] = rospy.ServiceProxy('environmental_memory/read_data', ReadData)
rospy.wait_for_service('system_events_memory/read_data')
self.rd_memory['system_events_memory'] = rospy.ServiceProxy('system_events_memory/read_data', ReadData)
except rospy.exceptions.ROSInterruptException as e:
rospy.logerr(e)
quit()
self.renderer_client = actionlib.SimpleActionClient('render_scene', RenderSceneAction)
rospy.loginfo('\033[91m[%s]\033[0m waiting for motion_renderer to start...'%rospy.get_name())
try:
self.renderer_client.wait_for_server()
except rospy.exceptions.ROSInterruptException as e:
quit()
rospy.Subscriber('reply', Reply, self.handle_domain_reply)
self.pub_log_item = rospy.Publisher('log', LogItem, queue_size=10)
self.pub_start_speech_recognizer = rospy.Publisher('speech_recognizer/start', Empty, queue_size=1)
self.pub_stop_speech_recognizer = rospy.Publisher('speech_recognizer/stop', Empty, queue_size=1)
self.pub_start_robot_speech = rospy.Publisher('robot_speech/start', Empty, queue_size=1)
self.pub_stop_robot_speech = rospy.Publisher('robot_speech/stop', Empty, queue_size=1)
self.is_speaking_started = False
rospy.Subscriber('start_of_speech', Empty, self.handle_start_of_speech)
rospy.Subscriber('end_of_speech', Empty, self.handle_end_of_speech)
# rospy.Subscriber('emotion_status', EmotionStatus, self.handle_emotion_status, queue_size=10)
# self.current_emotion = 'neutral'
# self.current_emotion_intensity = 1.0
self.pub_set_idle_motion = rospy.Publisher('idle_motion/set_enabled', Bool, queue_size=10)
self.pub_set_idle_motion.publish(True)
self.scene_queue = Queue.Queue(MAX_QUEUE_SIZE)
self.scene_handle_thread = Thread(target=self.handle_scene_queue)
self.scene_handle_thread.start()
rospy.loginfo("\033[91m[%s]\033[0m initialized." % rospy.get_name())
def __init__(self, moduleName):
# get connection from command line:
from optparse import OptionParser
parser = OptionParser()
parser.add_option("--ip", dest="ip", default="",
help="IP/hostname of broker. Default is system's default IP address.", metavar="IP")
parser.add_option("--port", dest="port", default=0,
help="IP/hostname of broker. Default is automatic port.", metavar="PORT")
parser.add_option("--pip", dest="pip", default="127.0.0.1",
help="IP/hostname of parent broker. Default is 127.0.0.1.", metavar="IP")
parser.add_option("--pport", dest="pport", default=9559,
help="port of parent broker. Default is 9559.", metavar="PORT")
(options, args) = parser.parse_args()
self.ip = options.ip
self.port = int(options.port)
self.pip = options.pip
self.pport = int(options.pport)
self.moduleName = moduleName
self.init_almodule()
# ROS initialization:
rospy.init_node('naoqi_tactile')
# init. messages:
self.tactile = TactileTouch()
self.bumper = Bumper()
self.tactilePub = rospy.Publisher("tactile_touch", TactileTouch, queue_size=1)
self.bumperPub = rospy.Publisher("bumper", Bumper, queue_size=1)
try:
footContact = self.memProxy.getData("footContact", 0)
except RuntimeError:
footContact = None
if footContact is None:
self.hasFootContactKey = False
rospy.loginfo("Foot contact key is not present in ALMemory, will not publish to foot_contact topic.")
else:
self.hasFootContactKey = True
self.footContactPub = rospy.Publisher("foot_contact", Bool, latch=True, queue_size=1)
self.footContactPub.publish(footContact > 0.0)
# constants in TactileTouch and Bumper will not be available in callback functions
# as they are executed in the parent broker context (i.e. on robot),
# so they have to be copied to member variables
self.tactileTouchFrontButton = TactileTouch.buttonFront;
self.tactileTouchMiddleButton = TactileTouch.buttonMiddle;
self.tactileTouchRearButton = TactileTouch.buttonRear;
self.bumperRightButton = Bumper.right;
self.bumperLeftButton = Bumper.left;
self.subscribe()
rospy.loginfo("nao_tactile initialized")
def __init__(self, robot, *robotargs):
super(PickAndPlaceNode, self).__init__('pp_node')
rospy.loginfo("PickAndPlaceNode")
_post_perceive_trans = defaultdict(lambda: self._pick)
_post_perceive_trans.update({'c': self._calibrate})
_preplace = defaultdict(lambda: self._preplace)
self.transition_table = {
# If calibration has already happened, allow skipping it
'initial': {'c': self._calibrate, 'q': self._perceive,
's': self._preplace},
'calibrate': {'q': self._perceive, 'c': self._calibrate},
'perceive': {'p': self._post_perceive, 'q': self._perceive, 's': self._stop_perceive, 'c': self._calibrate},
'post_perceive': _post_perceive_trans,
'postpick': {'1': self._level, '2': self._level, '9': self._level},
'level': _preplace,
'preplace': {'s': self._place},
'place': {'q': self._perceive, 'c': self._calibrate}
}
rospy.loginfo("PickAndPlaceNode1")
if callable(robot):
self.robot = robot(*robotargs)
else:
self.robot = robot
self.robot.level = 1
rospy.loginfo("PickAndPlaceNode2")
# Hardcoded place for now
self.place_pose = PoseStamped(
Header(0, rospy.Time(0), self.robot.base),
Pose(Point(0.526025806, 0.4780144, -0.161326153),
Quaternion(1, 0, 0, 0)))
self.tl = tf.TransformListener()
self.num_objects = 0
# Would this work too? Both tf and tf2 have (c) 2008...
# self.tf2 = tf2_ros.TransformListener()
self.n_objects_sub = rospy.Subscriber("/num_objects", Int64,
self.update_num_objects,
queue_size=1)
self.perception_pub = rospy.Publisher("/perception/enabled",
Bool,
queue_size=1)
self.alignment_pub = rospy.Publisher("/alignment/doit",
Bool,
queue_size=1)
self.br = tf.TransformBroadcaster()
rospy.loginfo("PickAndPlaceNode3")
self.int_marker_server = InteractiveMarkerServer('int_markers')
# Dict to map imarker names and their updated poses
self.int_markers = {}
# Ideally this call would be in a Factory/Metaclass/Parent
self.show_options()
self.perceive = False
# self.robot.home()
# self.move_calib_position()
def __init__(self):
self.rospack = RosPack()
with open(join(self.rospack.get_path('apex_playground'), 'config', 'learning.json')) as f:
self.params = json.load(f)
self.translator = EnvironmentTranslator()
self.learning = None
# User control and critical resources
self.lock_iteration = RLock()
self.ready_for_interaction = True
self.focus = None
self.set_iteration = -1
# Saved experiment files
self.dir = join(self.rospack.get_path('apex_playground'), 'logs')
if not os.path.isdir(self.dir):
os.makedirs(self.dir)
# self.stamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# self.source_file = join(self.dir, self.source_name + '.pickle')
self.main_experiment = True
self.condition = ""
self.trial = ""
self.task = ""
# Serving these services
self.service_name_perceive = "learning/perceive"
self.service_name_produce = "learning/produce"
self.service_name_set_interest = "learning/set_interest"
self.service_name_set_iteration = "learning/set_iteration"
self.service_name_interests = "learning/get_interests"
# Publishing these topics
self.pub_focus = rospy.Publisher('learning/current_focus', String, queue_size=1, latch=True)
self.pub_user_focus = rospy.Publisher('learning/user_focus', String, queue_size=1, latch=True)
self.pub_ready = rospy.Publisher('learning/ready_for_interaction', Bool, queue_size=1, latch=True)
self.pub_iteration = rospy.Publisher('iteration', UInt32, queue_size=1, latch=True)
# Using these services
self.service_name_get_perception = "perception/get"
for service in [self.service_name_get_perception]:
rospy.loginfo("Learning node is waiting service {}...".format(service))
rospy.wait_for_service(service)
self.get_state = rospy.ServiceProxy(self.service_name_get_perception, GetSensorialState)