python类logerr()的实例源码

ik_solver.py 文件源码 项目:qlearn_baxter 作者: mkrizmancic 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def ik_solve(limb, pos, orient):
    ns = "ExternalTools/" + limb + "/PositionKinematicsNode/IKService"
    iksvc = rospy.ServiceProxy(ns, SolvePositionIK)
    ikreq = SolvePositionIKRequest()

    hdr = Header(stamp=rospy.Time.now(), frame_id='base')
    poses = {
        str(limb): PoseStamped(header=hdr, pose=Pose(position=pos, orientation=orient))}

    ikreq.pose_stamp.append(poses[limb])
    try:
        rospy.wait_for_service(ns, 5.0)
        resp = iksvc(ikreq)
    except (rospy.ServiceException, rospy.ROSException), e:
        rospy.logerr("Service call failed: %s" % (e,))
        return 1

    if resp.isValid[0]:
        limb_joints = dict(zip(resp.joints[0].name, resp.joints[0].position))
        return limb_joints
    else:
        return Errors.RaiseNotReachable(pos)
telemetry_client.py 文件源码 项目:ros-interop 作者: mcgill-robotics 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def update_telemetry(navsat_msg, compass_msg):
    """Telemetry subscription callback.

    Args:
        navsat_msg: sensor_msgs/NavSatFix message.
        compass_msg: std_msgs/Float64 message in degrees.
    """
    try:
        client.post_telemetry(navsat_msg, compass_msg)
    except (ConnectionError, Timeout) as e:
        rospy.logwarn(e)
        return
    except (ValueError, HTTPError) as e:
        rospy.logerr(e)
        return
    except Exception as e:
        rospy.logfatal(e)
        return
proxy.py 文件源码 项目:Cloudroid 作者: cyberdb 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def run(self):
        self.topic_type = wait_topic_ready(self.topic_name, self.url)
    #print str(self.topic_type)+"  self.topic_type"
        if not self.topic_type:
            rospy.logerr('Type of topic %s are not equal in the remote and local sides', self.topic_name)
            return

        topic_type_module, topic_type_name = tuple(self.topic_type.split('/'))
        try:       
            roslib.load_manifest(topic_type_module)
            msg_module = import_module(topic_type_module + '.msg')
            self.rostype = getattr(msg_module, topic_type_name)

            if self.test:
                self.publisher = rospy.Publisher(self.topic_name + '_rb', self.rostype, queue_size = self.queue_size)
            else: 
                self.publisher = rospy.Publisher(self.topic_name, self.rostype, queue_size = self.queue_size)

            self.ws = websocket.WebSocketApp(self.url, on_message = self.on_message, on_error = self.on_error, on_close = self.on_close, on_open = self.on_open)
            rospy.loginfo('Create connection to Rosbridge server %s for subscribed topic %s successfully', self.url, self.topic_name)
            self.ws.run_forever()
        except ResourceNotFound, e:
            rospy.logerr('Proxy for subscribed topic %s init falied. Reason: Could not find the required resource: %s', self.topic_name, str(e))
        except Exception, e:
            rospy.logerr('Proxy for subscribed topic %s init falied. Reason: %s', self.topic_name, str(e))
proxy.py 文件源码 项目:Cloudroid 作者: cyberdb 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def on_message(self, ws, message):
        data = json.loads(message)

        ts = get_message_timestamp(data['msg'])
        if not comp_and_replace_value(data['topic'], self.topic_type, ts):
            return

        rosmsg = self.rostype()
        if not data or data['op'] != 'publish' or data['topic'] != self.topic_name:
            rospy.logerr('Failed to handle message on subscribed topic %s [%s]', self.topic_name, data)
            return

        data.pop('_format', None)      
        try:
            msgconv.populate_instance(data['msg'], rosmsg)
            self.publisher.publish(rosmsg)
        except Exception, e:
            rospy.logerr('Failed to publish message on topic %s. Reason: %s', self.topic_name, str(e))
proxy.py 文件源码 项目:Cloudroid 作者: cyberdb 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def run(self):
        self.service_type, self.service_args = wait_service_ready(self.service_name, self.url)
        if not self.service_type:
            rospy.logerr('Type of service %s are not equal in the remote and local sides', self.service_type)
            return

        service_type_module, service_type_name = tuple(self.service_type.split('/'))
        try:       
            roslib.load_manifest(service_type_module)
            msg_module = import_module(service_type_module + '.srv')
            self.srvtype = getattr(msg_module, service_type_name)

            if self.test:
                self.caller = rospy.Service(self.service_name + '_rb', self.srvtype, self.callback)#, self.queue_size)
            else: 
                self.caller = rospy.Service(self.service_name, self.srvtype, self.callback)#, self.queue_size)

            self.ws = websocket.WebSocketApp(self.url, on_message = self.on_message, on_error = self.on_error, on_close = self.on_close, on_open = self.on_open)
            rospy.loginfo('Create connection to Rosbridge server %s for calling service %s successfully', self.url, self.service_name)
            self.ws.run_forever()
        except ResourceNotFound, e:
            rospy.logerr('Proxy for service %s init falied. Reason: Could not find the required resource: %s', self.service_name, str(e))
        except Exception, e:
            rospy.logerr('Proxy for service %s init falied. Reason: %s', self.service_name, str(e))
proxy.py 文件源码 项目:Cloudroid 作者: cyberdb 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def on_message(self, ws, message):
        data = json.loads(message)
        if not data or data['op'] != 'service_response' or data['service'] != self.service_name:
            rospy.logerr('Failed to handle message on service type %s [%s]', self.service, data)
            return

        try:
            rosmsg = self.srvtype()._response_class()
            msgconv.populate_instance(data['values'], rosmsg)
            # need lock to protect
            call_id = data.get('id').encode('ascii')

            with self.lock:
                self.event_queue[call_id]['result'] = rosmsg
                self.event_queue[call_id]['event'].set()

        except Exception, e:
            rospy.logerr('Failed to call service on %s. Reason: %s', self.service_name, str(e))
proxy.py 文件源码 项目:Cloudroid 作者: cyberdb 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def run(self):
        self.topic_type = wait_topic_ready(self.topic_name, self.url)
        if not self.topic_type:
            rospy.logerr('Type of topic %s are not equal in the remote and local sides', self.topic_name)
            return

        topic_type_module, topic_type_name = tuple(self.topic_type.split('/'))
        try:      
            roslib.load_manifest(topic_type_module)
            msg_module = import_module(topic_type_module + '.msg')
            self.rostype = getattr(msg_module, topic_type_name)

            self.subscriber = rospy.Subscriber(self.topic_name, self.rostype, self.callback)

            self.ws = websocket.WebSocketApp(self.url, on_message = self.on_message, on_error = self.on_error, on_close = self.on_close, on_open = self.on_open)
            rospy.loginfo('Create connection to Rosbridge server for published topic %s successfully', self.topic_name)
            self.ws.run_forever()
        except ResourceNotFound, e:
            rospy.logerr('Could not find the required resource %s', str(e))
        except Exception, e:
            rospy.logerr('Proxy for published topic %s init falied. Reason: %s', self.topic_name, str(e))
proxy.py 文件源码 项目:Cloudroid 作者: cyberdb 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def callback(self, rosmsg):
        if not self.advertised:
            return

        data = msgconv.extract_values(rosmsg)
        ts = get_message_timestamp(data)

        if not comp_and_replace_value(self.rb_topic_name, self.topic_type, ts):
            return
        try:
            self.ws.send(json.dumps({
                'op': 'publish',
                'topic': self.rb_topic_name,
                'msg': data,
            }))
        except Exception, e:
            rospy.logerr('Failed to publish message on topic %s with %s. Reason: %s', self.topic_name, data, str(e))
nao_walker_v2.py 文件源码 项目:nao_slam_amcl 作者: hu7241 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def handleTargetPose(self, data, post=True):
        """handles cmd_pose requests, walks to (x,y,theta) in robot coordinate system"""

        rospy.logdebug("Walk target_pose: %f %f %f", data.x,
                data.y, data.theta)

        self.gotoStartWalkPose()

        try:
            if post:
                self.motionProxy.post.moveTo(data.x, data.y, data.theta, self.footGaitConfig)
            else:
                self.motionProxy.moveTo(data.x, data.y, data.theta, self.footGaitConfig)
            return True
        except RuntimeError,e:
            rospy.logerr("Exception caught in handleTargetPose:\n%s", e)
            return False
nao_walker_v2.py 文件源码 项目:nao_slam_amcl 作者: hu7241 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def handleStep(self, data):
        rospy.logdebug("Step leg: %d; target: %f %f %f", data.leg, data.pose.x,
                data.pose.y, data.pose.theta)
        try:
            if data.leg == StepTarget.right:
                leg = "RLeg"
            elif data.leg == StepTarget.left:
                leg = "LLeg"
            else:
                rospy.logerr("Received a wrong leg constant: %d, ignoring step command", data.leg)
                return
            self.motionProxy.stepTo(leg, data.pose.x, data.pose.y, data.pose.theta)
            return True
        except RuntimeError, e:
            rospy.logerr("Exception caught in handleStep:\n%s", e)
            return False
ik_service_client.py 文件源码 项目:intera_sdk 作者: RethinkRobotics 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def main():
    """RSDK Inverse Kinematics Example

    A simple example of using the Rethink Inverse Kinematics
    Service which returns the joint angles and validity for
    a requested Cartesian Pose.

    Run this example, the example will use the default limb
    and call the Service with a sample Cartesian
    Pose, pre-defined in the example code, printing the
    response of whether a valid joint solution was found,
    and if so, the corresponding joint angles.
    """
    rospy.init_node("rsdk_ik_service_client")

    if ik_service_client():
        rospy.loginfo("Simple IK call passed!")
    else:
        rospy.logerr("Simple IK call FAILED")

    if ik_service_client(use_advanced_options=True):
        rospy.loginfo("Advanced IK call passed!")
    else:
        rospy.logerr("Advanced IK call FAILED")
joint_torque_springs.py 文件源码 项目:intera_sdk 作者: RethinkRobotics 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def attach_springs(self):
        """
        Switches to joint torque mode and attached joint springs to current
        joint positions.
        """
        # record initial joint angles
        self._start_angles = self._limb.joint_angles()

        # set control rate
        control_rate = rospy.Rate(self._rate)

        # for safety purposes, set the control rate command timeout.
        # if the specified number of command cycles are missed, the robot
        # will timeout and return to Position Control Mode
        self._limb.set_command_timeout((1.0 / self._rate) * self._missed_cmds)

        # loop at specified rate commanding new joint torques
        while not rospy.is_shutdown():
            if not self._rs.state().enabled:
                rospy.logerr("Joint torque example failed to meet "
                             "specified control rate timeout.")
                break
            self._update_forces()
            control_rate.sleep()
fk_service_client.py 文件源码 项目:intera_sdk 作者: RethinkRobotics 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def main():
    """RSDK Forward Kinematics Example

    A simple example of using the Rethink Forward Kinematics
    Service which returns the Cartesian Pose based on the input joint angles.

    Run this example, the example will use the default limb
    and call the Service with a sample Joint Position,
    pre-defined in the example code, printing the
    response of whether a valid Cartesian solution was found,
    and if so, the corresponding Cartesian pose.
    """
    rospy.init_node("rsdk_fk_service_client")

    if fk_service_client():
        rospy.loginfo("Simple FK call passed!")
    else:
        rospy.logerr("Simple FK call FAILED")
calibrate_arm.py 文件源码 项目:intera_sdk 作者: RethinkRobotics 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, limb="right"):
        """
        @param limb: Limb to run CalibrateArm on arm side.
        """
        self._limb=limb
        self._client = actionlib.SimpleActionClient('/calibration_command',
                                   CalibrationCommandAction)
        # Waits until the action server has started up and started
        # listening for goals.
        server_up = self._client.wait_for_server(rospy.Duration(10.0))
        if not server_up:
            rospy.logerr("Timed out waiting for Calibration"
                         " Server to connect. Check your ROS networking"
                         "  and/or reboot the robot.")
            rospy.signal_shutdown("Timed out waiting for Action Server")
            sys.exit(1)
robot_params.py 文件源码 项目:intera_sdk 作者: RethinkRobotics 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_robot_assemblies(self):
        """
        Return the names of the robot's assemblies from ROS parameter.

        @rtype: list [str]
        @return: ordered list of assembly names
                 (e.g. right, left, torso, head). on networked robot
        """
        assemblies = list()
        try:
            assemblies = rospy.get_param("/robot_config/assembly_names")
        except KeyError:
            rospy.logerr("RobotParam:get_robot_assemblies cannot detect assembly names"
                         " under param /robot_config/assembly_names")
        except (socket.error, socket.gaierror):
            self._log_networking_error()
        return assemblies
robot_params.py 文件源码 项目:intera_sdk 作者: RethinkRobotics 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_joint_names(self, limb_name):
        """
        Return the names of the joints for the specified
        limb from ROS parameter.

        @type  limb_name: str
        @param limb_name: name of the limb for which to retrieve joint names

        @rtype: list [str]
        @return: ordered list of joint names from proximal to distal
                 (i.e. shoulder to wrist). joint names for limb
        """
        joint_names = list()
        try:
            joint_names = rospy.get_param(
                            "robot_config/{0}_config/joint_names".format(limb_name))
        except KeyError:
            rospy.logerr(("RobotParam:get_joint_names cannot detect joint_names for"
                          " arm \"{0}\"").format(limb_name))
        except (socket.error, socket.gaierror):
            self._log_networking_error()
        return joint_names
robot_params.py 文件源码 项目:intera_sdk 作者: RethinkRobotics 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_robot_name(self):
        """
        Return the name of class of robot from ROS parameter.

        @rtype: str
        @return: name of the class of robot (eg. "sawyer", "baxter", etc.)
        """
        robot_name = None
        try:
            robot_name = rospy.get_param("/manifest/robot_class")
        except KeyError:
            rospy.logerr("RobotParam:get_robot_name cannot detect robot name"
                         " under param /manifest/robot_class")
        except (socket.error, socket.gaierror):
            self._log_networking_error()
        return robot_name
limb.py 文件源码 项目:intera_sdk 作者: RethinkRobotics 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def move_to_neutral(self, timeout=15.0, speed=0.3):
        """
        Command the Limb joints to a predefined set of "neutral" joint angles.
        From rosparam named_poses/<limb>/poses/neutral.

        @type timeout: float
        @param timeout: seconds to wait for move to finish [15]
        @type speed: float
        @param speed: ratio of maximum joint speed for execution
                      default= 0.3; range= [0.0-1.0]
        """
        try:
            neutral_pose = rospy.get_param("named_poses/{0}/poses/neutral".format(self.name))
        except KeyError:
            rospy.logerr(("Get neutral pose failed, arm: \"{0}\".").format(self.name))
            return
        angles = dict(zip(self.joint_names(), neutral_pose))
        self.set_joint_position_speed(speed)
        return self.move_to_joint_positions(angles, timeout)
cuff.py 文件源码 项目:intera_sdk 作者: RethinkRobotics 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self, limb="right"):
        """
        Constructor.

        @type limb: str
        @param limb: limb side to interface
        """
        params = RobotParams()
        limb_names = params.get_limb_names()
        if limb not in limb_names:
            rospy.logerr("Cannot detect Cuff's limb {0} on this robot."
                         " Valid limbs are {1}. Exiting Cuff.init().".format(
                                                            limb, limb_names))
            return
        self.limb = limb
        self.device = None
        self.name = "cuff"
        self.cuff_config_sub = rospy.Subscriber('/io/robot/cuff/config', IODeviceConfiguration, self._config_callback)
        # Wait for the cuff status to be true
        intera_dataflow.wait_for(
            lambda: not self.device is None, timeout=5.0,
            timeout_msg=("Failed find cuff on limb '{0}'.".format(limb))
            )
        self._cuff_io = IODeviceInterface("robot", self.name)
head_display.py 文件源码 项目:intera_sdk 作者: RethinkRobotics 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _setup_image(self, image_path):
        """
        Load the image located at the specified path

        @type image_path: str
        @param image_path: the relative or absolute file path to the image file

        @rtype: sensor_msgs/Image or None
        @param: Returns sensor_msgs/Image if image convertable and None otherwise
        """
        if not os.access(image_path, os.R_OK):
            rospy.logerr("Cannot read file at '{0}'".format(image_path))
            return None

        img = cv2.imread(image_path)
        # Return msg
        return cv_bridge.CvBridge().cv2_to_imgmsg(img, encoding="bgr8")
camera.py 文件源码 项目:intera_sdk 作者: RethinkRobotics 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __init__(self):
        """
        Constructor.
        """
        camera_param_dict = RobotParams().get_camera_details()
        camera_list = camera_param_dict.keys()
        # check to make sure cameras is not an empty list
        if not camera_list:
            rospy.logerr(' '.join(["camera list is empty: ", ' , '.join(camera_list)]))
            return
        camera_color_dict = {"mono":['cognex'], "color":['ienso_ethernet']}
        self.cameras_io = dict()
        for camera in camera_list:
            if camera_param_dict[camera]['cameraType'] in camera_color_dict[''
            'color']:
                is_color = True
            else:
                is_color = False
            self.cameras_io[camera] = {'interface': IODeviceInterface("internal"
                "_camera", camera), 'is_color': is_color}
camera.py 文件源码 项目:intera_sdk 作者: RethinkRobotics 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def stop_streaming(self, camera_name):
        """
        Stop camera streaming by given the camera name.

        @type camera_name: str
        @param camera_name: camera name

        @rtype: bool
        @return: False if camera not exists in camera name list or not able
                 to stop streaming camera. True if the camera not is streaming
                 mode or the camera successfully stop streaming.
        """
        if not self.verify_camera_exists(camera_name):
            return False
        elif self._camera_streaming_status(camera_name):
            self.cameras_io[camera_name]['interface'].set_signal_value(
                "camera_streaming", False)
            if self._camera_streaming_status(camera_name):
                rospy.logerr(' '.join(["Failed to stop", camera_name,
                " from streaming on this robot."]))
                return False
            else:
                return True
        else: # Camera not in streaming mode
            return True
io_interface.py 文件源码 项目:intera_sdk 作者: RethinkRobotics 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def set_signal_value(self, signal_name, signal_value, signal_type=None, timeout=5.0):
        """
        set the value for the given signal
        return True if the signal value is set, False if the requested signal is invalid
        """
        if signal_name not in self.list_signal_names():
            rospy.logerr("Cannot find signal '{0}' in this IO Device.".format(signal_name))
            return
        if signal_type == None:
            s_type = self.get_signal_type(signal_name)
            if s_type == None:
                rospy.logerr("Failed to get 'type' for signal '{0}'.".format(signal_name))
                return
        else:
            s_type = signal_type
        set_command = SetCommand().set_signal(signal_name, s_type, signal_value)
        self.publish_command(set_command.op, set_command.args, timeout=timeout)
        # make sure both state and config are valid:
        self.revalidate(timeout, invalidate_state=False, invalidate_config=False)
io_interface.py 文件源码 项目:intera_sdk 作者: RethinkRobotics 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def set_port_value(self, port_name, port_value, port_type=None, timeout=5.0):
        """
        set the value for the given port
        return True if the port value is set, False if the requested port is invalid
        """
        if port_name not in list_port_names():
            rospy.logerr("Cannot find port '{0}' in this IO Device.".format(port_name))
            return
        if port_type == None:
            p_type = self.get_port_type(port_name)
            if p_type == None:
                rospy.logerr("Failed to get 'type' for port '{0}'.".format(port_name))
                return
        else:
            p_type = port_type
        set_command = SetCommand().set_port(port_name, p_type, port_value)
        self.publish_command(set_command.op, set_command.args, timeout=timeout)
        # make sure both state and config are valid:
        self.revalidate(timeout, invalidate_state=False, invalidate_config=False)
server.py 文件源码 项目:needybot-speech 作者: needybot 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def clean_cache(self):
        """
        This method completely cleans the cache directory of this
        ``SpeechActionServer`` instance. Use with care.
        """
        rospy.loginfo('Purging speech cache...')
        for f in os.listdir(self._cache_dir):
            if f == '.gitkeep':
                continue
            try:
                p = os.path.join(self._cache_dir, f)
                if os.path.isfile(p):
                    os.unlink(p)
            except Exception as e:
                rospy.logerr(e)
        rospy.loginfo('Speech cache has been purged.')
        self.cleaned_pub.publish()
naoqi_node.py 文件源码 项目:spqrel_tools 作者: LCAS 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_proxy(self, name, warn=True):
        """
        Returns a proxy to a specific module. If it has not been created yet, it is created
        :param name: the name of the module to create a proxy for
        :return: a proxy to the corresponding module
        """
        if name in self.__proxies and self.__proxies[name] is not None:
            return self.__proxies[name]

        proxy = None
        try:
            proxy = ALProxy(name,self.pip,self.pport)
        except RuntimeError,e:
            if warn:
                rospy.logerr("Could not create Proxy to \"%s\". \nException message:\n%s",name, e)

        self.__proxies[name] = proxy
        return proxy
localization.py 文件源码 项目:spqrel_tools 作者: LCAS 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def connectNaoQi(self):
        rospy.loginfo("Exploration Node Connecting to NaoQi at %s:%d", self.pip, self.pport)

        self.navigation = self.get_proxy("ALNavigation")
        self.motion = self.get_proxy("ALMotion")
        if self.navigation is None or self.motion is None:
            rospy.logerr("Unable to reach ALMotion and ALNavigation.")
            exit(0)
        version_array = self.get_proxy("ALSystem").systemVersion().split('.')
        if len(version_array) < 3:
            rospy.logerr("Unable to deduce the system version.")
            exit(0)
        version_tuple = (int(version_array[0]), int(version_array[1]), int(version_array[2]))
        min_version = (2, 5, 2)
        if version_tuple < min_version:
            rospy.logerr("Naoqi version " + str(min_version) + " required for localization. Yours is " + str(version_tuple))
            exit(0)
loading_map.py 文件源码 项目:spqrel_tools 作者: LCAS 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def connectNaoQi(self):
        rospy.loginfo("Node Loading map at %s:%d", self.pip, self.pport)

        self.navigation = self.get_proxy("ALNavigation")
        if self.navigation is None:
            rospy.logerr("Unable to reach ALNavigation.")
            exit(0)

        version_array = self.get_proxy("ALSystem").systemVersion().split('.')
        if len(version_array) < 3:
            rospy.logerr("Unable to deduce the system version.")
            exit(0)
        version_tuple = (int(version_array[0]), int(version_array[1]), int(version_array[2]))
        min_version = (2, 5, 2)
        if version_tuple < min_version:
            rospy.logerr("Naoqi version " + str(min_version) +
            " required for localization. Yours is " + str(version_tuple))
            exit(0)
cable.py 文件源码 项目:needybot-core 作者: needybot 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def publish(topic, instructions=None):
    if type(topic) not in [str, nb_channels.Messages]:
        rospy.logerr("** Topic must be of type string or Messages in order to publish to cable **")
        return False

    if instructions and type(instructions) is not dict:
        rospy.logerr("** Instructions must be of type dictionary in order to publish to cable **")
        return False

    # Properly format topic string
    topic_val = topic if type(topic) is str else topic.value 

    try:
        rospy.wait_for_service('ui_send', 0.1)

        client = UIClient()
        return client.send(
            topic_val,
            json.dumps(instructions) 
        )
    except rospy.ROSException:
        return UIMessageResponse(success=False)
hands.py 文件源码 项目:srcsim2017 作者: ZarjRobotics 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def set_arm_configuration(self, sidename, joints, movetime = None):
        if sidename == 'left':
            side = ArmTrajectoryRosMessage.LEFT
        elif sidename == 'right':
            side = ArmTrajectoryRosMessage.RIGHT
        else:
            rospy.logerr("Unknown side {}".format(sidename))
            return

        if movetime is None:
            movetime = self.ARM_MOVE_TIME

        for i in range(len(joints)):
            if math.isnan(joints[i]):
                joints[i] = self.last_arms[side][i]

        msg = self.make_arm_trajectory(side, ArmTrajectoryRosMessage.OVERRIDE, movetime, joints)
        self.last_arms[side] = deepcopy(joints)
        log('Setting {} arm to {}'.format(sidename, joints))
        self.arm_trajectory_publisher.publish(msg)
        rospy.sleep(movetime)


问题


面经


文章

微信
公众号

扫码关注公众号