python类get_param()的实例源码

handstand.py 文件源码 项目:srcsim2017 作者: ZarjRobotics 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def lookupFeet(name, tfBuffer):
    global lf_start_position
    global lf_start_orientation
    global rf_start_position
    global rf_start_orientation

    lfp = "/ihmc_ros/{0}/left_foot_frame_name".format(name)
    rfp = "/ihmc_ros/{0}/right_foot_frame_name".format(name)

    if rospy.has_param(rfp) and rospy.has_param(lfp):
        lfname = rospy.get_param(lfp)
        rfname = rospy.get_param(rfp)

        leftFootWorld = tfBuffer.lookup_transform('world', lfname, rospy.Time())
        lf_start_orientation = leftFootWorld.transform.rotation
        lf_start_position = leftFootWorld.transform.translation

        rightFootWorld = tfBuffer.lookup_transform('world', rfname, rospy.Time())
        rf_start_orientation = rightFootWorld.transform.rotation
        rf_start_position = rightFootWorld.transform.translation
walk.py 文件源码 项目:srcsim2017 作者: ZarjRobotics 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def lookupFeet(name, tfBuffer):
    global lf_start_position
    global lf_start_orientation
    global rf_start_position
    global rf_start_orientation

    lfp = "/ihmc_ros/{0}/left_foot_frame_name".format(name)
    rfp = "/ihmc_ros/{0}/right_foot_frame_name".format(name)

    if rospy.has_param(rfp) and rospy.has_param(lfp):
        lfname = rospy.get_param(lfp)
        rfname = rospy.get_param(rfp)

        leftFootWorld = tfBuffer.lookup_transform('world', lfname, rospy.Time())
        lf_start_orientation = leftFootWorld.transform.rotation
        lf_start_position = leftFootWorld.transform.translation

        rightFootWorld = tfBuffer.lookup_transform('world', rfname, rospy.Time())
        rf_start_orientation = rightFootWorld.transform.rotation
        rf_start_position = rightFootWorld.transform.translation
joint_trajectory_file_playback.py 文件源码 项目:cu-perception-manipulation-stack 作者: correlllab 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def wait(self):
        """
        Waits for and verifies trajectory execution result
        """
        #create a timeout for our trajectory execution
        #total time trajectory expected for trajectory execution plus a buffer
        last_time = self._r_goal.trajectory.points[-1].time_from_start.to_sec()
        time_buffer = rospy.get_param(self._param_ns + 'goal_time', 0.0) + 1.5
        timeout = rospy.Duration(self._slow_move_offset +
                                 last_time +
                                 time_buffer)

        l_finish = self._left_client.wait_for_result(timeout)
        r_finish = self._right_client.wait_for_result(timeout)
        l_result = (self._left_client.get_result().error_code == 0)
        r_result = (self._right_client.get_result().error_code == 0)

        #verify result
        if all([l_finish, r_finish, l_result, r_result]):
            return True
        else:
            msg = ("Trajectory action failed or did not finish before "
                   "timeout/interrupt.")
            rospy.logwarn(msg)
            return False
image_reply.py 文件源码 项目:telegram_robot 作者: uts-magic-lab 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self):
        token = rospy.get_param('/telegram/token', None)
        if token is None:
            rospy.logerr("No token found in /telegram/token param server.")
            exit(0)
        else:
            rospy.loginfo("Got telegram bot token from param server.")

        # Set CvBridge
        self.bridge = CvBridge()

        # Create the EventHandler and pass it your bot's token.
        updater = Updater(token)

        # Get the dispatcher to register handlers
        dp = updater.dispatcher

        # on noncommand i.e message - echo the message on Telegram
        dp.add_handler(MessageHandler(Filters.text, self.pub_received))

        # log all errors
        dp.add_error_handler(self.error)

        # Start the Bot
        updater.start_polling()
arrows.py 文件源码 项目:telegram_robot 作者: uts-magic-lab 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self):
        self.base_pub = rospy.Publisher("/base_controller/command", Twist,
                                        queue_size=1)
        token = rospy.get_param('/telegram/token', None)

        # Create the Updater and pass it your bot's token.
        updater = Updater(token)

        # Add command and error handlers
        updater.dispatcher.add_handler(CommandHandler('start', self.start))
        updater.dispatcher.add_handler(CommandHandler('help', self.help))
        updater.dispatcher.add_handler(MessageHandler(Filters.text, self.echo))
        updater.dispatcher.add_error_handler(self.error)

        # Start the Bot
        updater.start_polling()
torso.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self):
        self.rospack = RosPack()
        with open(join(self.rospack.get_path('apex_playground'), 'config', 'torso.json')) as f:
            self.params = json.load(f)

        self.publish_rate = rospy.Rate(self.params['publish_rate'])
        self.demo = rospy.get_param('demo_mode')

        # Protected resources
        self.in_rest_pose = False
        self.robot_lock = RLock()

        # Used services
        self.torso = TorsoServices(self.params['robot_name'])

        # Proposed services
        self.reset_srv_name = 'torso/reset'
        self.reset_srv = None
controller.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self):
        self.rospack = RosPack()
        with open(join(self.rospack.get_path('apex_playground'), 'config', 'general.json')) as f:
            self.params = json.load(f)

        with open(join(self.rospack.get_path('apex_playground'), 'config', 'torso.json')) as f:
            self.torso_params = json.load(f)

        self.outside_ros = rospy.get_param('/use_sim_time', False)  # True if work manager <-> controller comm must use ZMQ
        id = search(r"(\d+)", rospy.get_namespace())
        self.worker_id = 0 if id is None else int(id.groups()[0])  # TODO string worker ID
        self.work = WorkManager(self.worker_id, self.outside_ros)
        self.torso = Torso()
        self.ergo = Ergo()
        self.learning = Learning()
        self.perception = Perception()
        self.recorder = Recorder()
        self.demonstrate = ""  # Skill (Target space for Produce) or empty string if not running assessment

        # Served services
        self.service_name_demonstrate = "controller/assess"
        rospy.Service(self.service_name_demonstrate, Assess, self.cb_assess)

        rospy.loginfo('Controller fully started!')
user.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self):
        self.rospack = RosPack()
        self.port = rospy.get_param('ui_port', 80 if os.getuid() == 0 else 5000)
        self.web_app_root = join(self.rospack.get_path('apex_playground'), 'webapp', 'static')
        self.app = Flask(__name__, static_url_path='', static_folder=self.web_app_root)
        self.cors = CORS(self.app, resources={r'/api/*': {'origins': '*'}})
        self.services = UserServices()
        self.window_length = 500
        self.display_point_interval = 1

        self.app.route('/')(self.root)
        self.app.route('/api/interests', methods=['GET'])(self.experiment_status)
        self.app.route('/api/focus', methods=['POST'])(self.update_focus)
        self.app.route('/api/time-travel', methods=['POST'])(self.time_travel)
        self.app.route('/api/reset', methods=['POST'])(self.reset)
        self.app.route('/api/assessment', methods=['POST'])(self.update_assessment)
system_calibrator.py 文件源码 项目:overhead_mobile_tracker 作者: NU-MSR 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self):
        self.marker_id = rospy.get_param("~marker_id", 12)
        self.frame_id = rospy.get_param("~frame_id", "odom_meas")
        self.camera_frame_id = rospy.get_param("~camera_frame_id", "overhead_cam_frame")
        self.count = rospy.get_param("~count", 50)

        # local vars:
        self.calibrate_flag = False
        self.calibrated = False
        self.calibrate_count = 0
        self.kb = kbhit.KBHit()
        self.trans_arr = np.zeros((self.count, 3))
        self.quat_arr = np.zeros((self.count, 4))
        self.trans_ave = np.zeros(3)
        self.quat_ave = np.zeros(3)

        # now create subscribers, timers, and publishers
        self.br = tf.TransformBroadcaster()
        self.kb_timer = rospy.Timer(rospy.Duration(0.1), self.keycb)
        rospy.on_shutdown(self.kb.set_normal_term)
        self.alvar_sub = rospy.Subscriber("ar_pose_marker", AlvarMarkers, self.alvarcb)
        return
turtlebot_node.py 文件源码 项目:mrobot-indigo 作者: ROSClub1 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _init_params(self):
        self.port = rospy.get_param('~port', self.default_port)
        self.robot_type = rospy.get_param('~robot_type', 'create')
        #self.baudrate = rospy.get_param('~baudrate', self.default_baudrate)
        self.update_rate = rospy.get_param('~update_rate', self.default_update_rate)
        self.drive_mode = rospy.get_param('~drive_mode', 'twist')
        self.has_gyro = rospy.get_param('~has_gyro', True)
        self.odom_angular_scale_correction = rospy.get_param('~odom_angular_scale_correction', 1.0)
        self.odom_linear_scale_correction = rospy.get_param('~odom_linear_scale_correction', 1.0)
        self.cmd_vel_timeout = rospy.Duration(rospy.get_param('~cmd_vel_timeout', 0.6))
        self.stop_motors_on_bump = rospy.get_param('~stop_motors_on_bump', True)
        self.min_abs_yaw_vel = rospy.get_param('~min_abs_yaw_vel', None)
        self.max_abs_yaw_vel = rospy.get_param('~max_abs_yaw_vel', None)
        self.publish_tf = rospy.get_param('~publish_tf', False)
        self.odom_frame = rospy.get_param('~odom_frame', 'odom')
        self.base_frame = rospy.get_param('~base_frame', 'base_footprint')
        self.operate_mode = rospy.get_param('~operation_mode', 3)

        rospy.loginfo("serial port: %s"%(self.port))
        rospy.loginfo("update_rate: %s"%(self.update_rate))
        rospy.loginfo("drive mode: %s"%(self.drive_mode))
        rospy.loginfo("has gyro: %s"%(self.has_gyro))
localize.py 文件源码 项目:decawave_localization 作者: mit-drl 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self):
        self.frame_id = rospy.get_param("~frame_id", "map")
        cov_x = rospy.get_param("~cov_x", 0.6)
        cov_y = rospy.get_param("~cov_y", 0.6)
        cov_z = rospy.get_param("~cov_z", 0.6)
        self.cov = self.cov_matrix(cov_x, cov_y, cov_z)
        self.ps_pub = rospy.Publisher(
            POSE_TOPIC, PoseStamped, queue_size=1)
        self.ps_cov_pub = rospy.Publisher(
            POSE_COV_TOPIC, PoseWithCovarianceStamped, queue_size=1)
        self.ps_pub_3d = rospy.Publisher(
            POSE_TOPIC_3D, PoseStamped, queue_size=1)
        self.ps_cov_pub_3d = rospy.Publisher(
            POSE_COV_TOPIC_3D, PoseWithCovarianceStamped, queue_size=1)
        self.last = None
        self.listener = tf.TransformListener()
        self.tag_range_topics = rospy.get_param("~tag_range_topics")
        self.subs = list()
        self.ranges = dict()
        self.tag_pos = dict()
        self.altitude = 0.0
        self.last_3d = None
        for topic in self.tag_range_topics:
            self.subs.append(rospy.Subscriber(topic, Range, self.range_cb))
teleop_key.py 文件源码 项目:cozmo_driver 作者: OTL 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self):
        # setup
        CozmoTeleop.settings = termios.tcgetattr(sys.stdin)
        atexit.register(self.reset_terminal)

        # vars
        self.head_angle = STD_HEAD_ANGLE
        self.lift_height = STD_LIFT_HEIGHT

        # params
        self.lin_vel = rospy.get_param('~lin_vel', 0.2)
        self.ang_vel = rospy.get_param('~ang_vel', 1.5757)

        # pubs
        self._head_pub = rospy.Publisher('head_angle', Float64, queue_size=1)
        self._lift_pub = rospy.Publisher('lift_height', Float64, queue_size=1)
        self._cmd_vel_pub = rospy.Publisher('cmd_vel', Twist, queue_size=1)
operate_recognize_object.py 文件源码 项目:xm_strategy 作者: xm-project 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self,dis=0.0):
        State.__init__(self, outcomes=['succeeded','preempted','aborted'],input_keys=['target'])
        rospy.on_shutdown(self.shutdown)
        self.test_distance = target.y-dis
        self.rate = 200
        self.r = rospy.Rate(self.rate)
        self.speed = rospy.get_param('~speed',0.08)
        self.tolerance = rospy.get_param('~tolerance', 0.01)
        self.cmd_vel = rospy.Publisher('/mobile_base_controller/smooth_cmd_vel',Twist,queue_size=5)
        self.base_frame = rospy.get_param('~base_frame', '/base_link')
        self.odom_frame = rospy.get_param('~odom_frame', '/odom')
        self.tf_listener = tf.TransformListener()
        rospy.sleep(1)
        self.tf_listener.waitForTransform(self.odom_frame,self.base_frame,rospy.Time(),rospy.Duration(60.0))
        #define a bianliang
        self.flag = rospy.get_param('~flag', True)
        rospy.loginfo(self.test_distance)
        #tf get position

        #publish cmd_vel
operate_recognize_object.py 文件源码 项目:xm_strategy 作者: xm-project 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self,angle=0):
        State.__init__(self, outcomes=['succeeded','aborted','preempted'],input_keys=["angle_in"])
        self.speed = rospy.get_param('~speed', 0.10)
        self.secretindigal = 1.0
        self.tolerance = rospy.get_param('tolerance', math.radians(5))
        self.start = True
        self.cmd_vel = rospy.Publisher('mobile_base_controller/smooth_cmd_vel', Twist, queue_size=5)
        self.base_frame = rospy.get_param('~base_frame', '/base_link')
        self.odom_frame = rospy.get_param('~odom_frame', '/odom')
        rospy.on_shutdown(self.shutdown)
        self.rate = 30
        self.start_test = True
        self.r = rospy.Rate(self.rate)

        self.tf_listener = tf.TransformListener()
        rospy.sleep(0.5)
        self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60))
object_smach.py 文件源码 项目:xm_strategy 作者: xm-project 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, angle=0):
        State.__init__(self, outcomes=['succeeded', 'aborted', 'preempted'], input_keys=["angle_in"])
        self.speed = rospy.get_param('~speed', 0.03)
        self.secretindigal = 1.0
        self.tolerance = rospy.get_param('tolerance', math.radians(5))
        self.start = True
        self.cmd_vel = rospy.Publisher('mobile_base_controller/smooth_cmd_vel', Twist, queue_size=5)
        self.base_frame = rospy.get_param('~base_frame', '/base_link')
        self.odom_frame = rospy.get_param('~odom_frame', '/odom')
        rospy.on_shutdown(self.shutdown)
        self.rate = 30
        self.start_test = True
        self.r = rospy.Rate(self.rate)
        self.angle=angle
        self.tf_listener = tf.TransformListener()
        rospy.sleep(0.5)
        self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60))
object_smach.py 文件源码 项目:xm_strategy 作者: xm-project 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, angle=0):
        State.__init__(self, outcomes=['succeeded', 'aborted', 'preempted'])
        self.speed = rospy.get_param('~speed', 0.03)
        self.secretindigal = 1.0
        self.tolerance = rospy.get_param('tolerance', math.radians(5))
        self.start = True
        self.cmd_vel = rospy.Publisher('mobile_base_controller/smooth_cmd_vel', Twist, queue_size=5)
        self.base_frame = rospy.get_param('~base_frame', '/base_link')
        self.odom_frame = rospy.get_param('~odom_frame', '/odom')
        rospy.on_shutdown(self.shutdown)
        self.rate = 30
        self.start_test = True
        self.r = rospy.Rate(self.rate)
        self.angle=angle
        self.tf_listener = tf.TransformListener()
        rospy.sleep(0.5)
        self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60))
1operate_recognize_object.py 文件源码 项目:xm_strategy 作者: xm-project 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __init__(self,dis=0):
        State.__init__(self, outcomes=['succeeded','preempted','aborted'])
        rospy.on_shutdown(self.shutdown)
        self.test_distance = dis
        self.rate = 100
        self.r = rospy.Rate(self.rate)
        self.speed = rospy.get_param('~speed',0.08)
        self.tolerance = rospy.get_param('~tolerance', 0.01)
        self.cmd_vel = rospy.Publisher('/mobile_base_controller/smooth_cmd_vel',Twist,queue_size=5)
        self.base_frame = rospy.get_param('~base_frame', '/base_link')
        self.odom_frame = rospy.get_param('~odom_frame', '/odom')
        self.tf_listener = tf.TransformListener()
        rospy.sleep(1)
        self.tf_listener.waitForTransform(self.odom_frame,self.base_frame,rospy.Time(),rospy.Duration(60.0))
        self.flag = rospy.get_param('~flag', True)
        #tf get position
        self.position = Point()
        self.position = self.get_position()
        self.y_start = self.position.y
        self.x_start = self.position.x
        #publish cmd_vel
1operate_recognize_object.py 文件源码 项目:xm_strategy 作者: xm-project 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __init__(self,dis=0.0):
        State.__init__(self, outcomes=['succeeded','preempted','aborted'])
        rospy.on_shutdown(self.shutdown)
        self.test_distance = target.y-0.0
        self.rate = 100
        self.r = rospy.Rate(self.rate)
        self.speed = rospy.get_param('~speed',0.08)
        self.tolerance = rospy.get_param('~tolerance', 0.01)
        self.cmd_vel = rospy.Publisher('/mobile_base_controller/smooth_cmd_vel',Twist,queue_size=5)
        self.base_frame = rospy.get_param('~base_frame', '/base_link')
        self.odom_frame = rospy.get_param('~odom_frame', '/odom')
        self.tf_listener = tf.TransformListener()
        rospy.sleep(1)
        self.tf_listener.waitForTransform(self.odom_frame,self.base_frame,rospy.Time(),rospy.Duration(60.0))
        #define a bianliang
        self.flag = rospy.get_param('~flag', True)
        rospy.loginfo(self.test_distance)
        #tf get position

        #publish cmd_vel
1operate_recognize_object.py 文件源码 项目:xm_strategy 作者: xm-project 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __init__(self,angle=0):
        State.__init__(self, outcomes=['succeeded','aborted','preempted'])
        self.speed = rospy.get_param('~speed', 0.20)
        self.secretindigal = 1.0
        self.tolerance = rospy.get_param('tolerance', math.radians(5))
        self.start = True
        self.cmd_vel = rospy.Publisher('mobile_base_controller/smooth_cmd_vel', Twist, queue_size=5)
        self.base_frame = rospy.get_param('~base_frame', '/base_link')
        self.odom_frame = rospy.get_param('~odom_frame', '/odom')
        rospy.on_shutdown(self.shutdown)
        self.rate = 30
        self.start_test = True
        self.r = rospy.Rate(self.rate)
        self.angle = angle
        self.tf_listener = tf.TransformListener()
        rospy.sleep(0.5)
        self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60))
test.py 文件源码 项目:xm_strategy 作者: xm-project 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self,angle=0):
        State.__init__(self, outcomes=['succeeded','aborted','preempted'])
        self.speed = rospy.get_param('~speed', 0.20)
        self.secretindigal = 1.0
        self.tolerance = rospy.get_param('tolerance', math.radians(5))
        self.start = True
        self.cmd_vel = rospy.Publisher('mobile_base_controller/smooth_cmd_vel', Twist, queue_size=5)
        self.base_frame = rospy.get_param('~base_frame', '/base_link')
        self.odom_frame = rospy.get_param('~odom_frame', '/odom')
        rospy.on_shutdown(self.shutdown)
        self.rate = 30
        self.start_test = True
        self.r = rospy.Rate(self.rate)
        self.angle = angle
        self.tf_listener = tf.TransformListener()
        rospy.sleep(0.5)
        self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60))


问题


面经


文章

微信
公众号

扫码关注公众号