python类ServiceException()的实例源码

demo_vision_poseest_pickplace.py 文件源码 项目:cu-perception-manipulation-stack 作者: correlllab 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def ik_quaternion(self, quaternion_pose):
        """Take a xyz qxqyqzqw stamped pose and convert it to joint angles
        using IK. You can call self.limb.move_to_joint_positions to
        move to those angles

        """
        node = ("ExternalTools/{}/PositionKinematicsNode/"
                "IKService".format(self.limb_name))
        ik_service = rospy.ServiceProxy(node, SolvePositionIK)
        ik_request = SolvePositionIKRequest()
        ik_request.pose_stamp.append(quaternion_pose)
        try:
            rospy.loginfo('ik: waiting for IK service...')
            rospy.wait_for_service(node, 5.0)
            ik_response = ik_service(ik_request)
        except (rospy.ServiceException, rospy.ROSException) as err:
            rospy.logerr("ik_move: service request failed: %r" % err)
        else:
            if ik_response.isValid[0]:
                rospy.loginfo("ik_move: valid joint configuration found")
                # convert response to joint position control dictionary
                limb_joints = dict(zip(ik_response.joints[0].name,
                                       ik_response.joints[0].position))
                return limb_joints
            else:
                rospy.logerr('ik_move: no valid configuration found')
                return None
demo_graspsuccessExp.py 文件源码 项目:cu-perception-manipulation-stack 作者: correlllab 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def ik_quaternion(self, quaternion_pose):
        """Take a xyz qxqyqzqw stamped pose and convert it to joint angles
        using IK. You can call self.limb.move_to_joint_positions to
        move to those angles

        """
        node = ("ExternalTools/{}/PositionKinematicsNode/"
                "IKService".format(self.limb_name))
        ik_service = rospy.ServiceProxy(node, SolvePositionIK)
        ik_request = SolvePositionIKRequest()
        ik_request.pose_stamp.append(quaternion_pose)
        try:
            rospy.loginfo('ik: waiting for IK service...')
            rospy.wait_for_service(node, 5.0)
            ik_response = ik_service(ik_request)
        except (rospy.ServiceException, rospy.ROSException) as err:
            rospy.logerr("ik_move: service request failed: %r" % err)
        else:
            if ik_response.isValid[0]:
                rospy.loginfo("ik_move: valid joint configuration found")
                # convert response to joint position control dictionary
                limb_joints = dict(zip(ik_response.joints[0].name,
                                       ik_response.joints[0].position))
                return limb_joints
            else:
                rospy.logerr('ik_move: no valid configuration found')
                return None
jaco.py 文件源码 项目:cu-perception-manipulation-stack 作者: correlllab 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def home(self):
        addr = '/{}_driver/in/home_arm'.format(self.robot_type)
        rospy.wait_for_service(addr)
        try:
            serv = rospy.ServiceProxy(addr, HomeArm)
            rep = serv()
            rospy.loginfo(rep)
        except rospy.ServiceException as e:
            rospy.loginfo("Service error {}".format(e))
user.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def update_focus(self):
        """ Updates focused interest."""
        try:
            interest_id = request.form['interestId']
            self.services.set_focus(interest_id)
        except ServiceException as e:
            rospy.logerr("Cannot set focus. " + repr(e))
        return '', 204
user.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def time_travel(self):
        """ Revert experiment state to a given point."""
        try:
            point = request.form['point']
            self.services.set_iteration(point)
        except ServiceException as e:
            rospy.logerr("Cannot set iteration. " + repr(e))
        return '', 204
user.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def update_assessment(self):
        """ Updates assessment. """
        try:
            assessment_id = request.form['assessmentId']
            self.services.assess(assessment_id)
        except ServiceException as e:
            rospy.logerr("Cannot set assessment. " + repr(e))
        return '', 204
baxter_arm_control.py 文件源码 项目:baxter_socket_server 作者: AndrewChenUoA 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def baxter_ik_move(self, rpy_pose):   
#        quaternion_pose = conversions.list_to_pose_stamped(rpy_pose, "base")
        quaternion_pose = convert_pose_to_msg(rpy_pose, "base")

        node = "ExternalTools/" + self.limb + "/PositionKinematicsNode/IKService"
        ik_service = rospy.ServiceProxy(node, SolvePositionIK)
        ik_request = SolvePositionIKRequest()
        hdr = Header(stamp=rospy.Time.now(), frame_id="base")

        ik_request.pose_stamp.append(quaternion_pose)
        try:
            rospy.wait_for_service(node, 5.0)
            ik_response = ik_service(ik_request)
        except (rospy.ServiceException, rospy.ROSException), error_message:
            rospy.logerr("Service request failed: %r" % (error_message,))
            print "ERROR - baxter_ik_move - Failed to append pose"
            return 1
        if (ik_response.isValid[0]):
            # convert response to joint position control dictionary
            limb_joints = dict(zip(ik_response.joints[0].name, ik_response.joints[0].position))
            # move limb
            self.limb_interface.move_to_joint_positions(limb_joints)
        else:
            print "ERROR - baxter_ik_move - No valid joint configuration found"
            return 2

        self.getPose() #Store current pose into self.pose
        print "Move Executed"

        return -1

#Because we can't get full moveit_commander on the raspberry pi due to memory limitations, rewritten implementation of the required conversion function
human_model.py 文件源码 项目:human_moveit_config 作者: baxter-flowers 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def forward_kinematic(self, joint_state, base='base', links=None):
        def compute_fk_client():
            try:
                header = Header()
                header.stamp = rospy.Time.now()
                header.frame_id = self.prefix + '/base'
                rs = RobotState()
                rs.joint_state = joint_state
                res = self.compute_fk(header, links, rs)
                return res.pose_stamped
            except rospy.ServiceException, e:
                print "Service call failed: %s" % e
                # in case of troubles return 0 pose stamped
                return []

        if links is None:
            links = self.end_effectors['whole_body']
        if type(links) is not list:
            if links == "all":
                links = self.get_link_names('whole_body')
            else:
                links = [links]
        # check that the base is in links
        if base != 'base' and base not in links:
            links.append(base)
        pose_stamped_list = compute_fk_client()
        if not pose_stamped_list:
            return {}
        # transform it in a dict of poses
        pose_dict = {}
        if base != 'base':
            tr_base = transformations.pose_to_list(pose_stamped_list[links.index(base)].pose)
            inv_base = transformations.inverse_transform(tr_base)
            for i in range(len(links)):
                if links[i] != base:
                    tr = transformations.pose_to_list(pose_stamped_list[i].pose)
                    pose_dict[links[i]] = transformations.multiply_transform(inv_base, tr)
        else:
            for i in range(len(links)):
                pose_dict[links[i]] = transformations.pose_to_list(pose_stamped_list[i].pose)
        return pose_dict
human_model.py 文件源码 项目:human_moveit_config 作者: baxter-flowers 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def inverse_kinematic(self, desired_poses, fixed_joints={}, tolerance=0.001, group_names='whole_body', seed=None):
        def compute_ik_client():
            rospy.wait_for_service('compute_human_ik')
            try:
                compute_ik = rospy.ServiceProxy('compute_human_ik', GetHumanIK)
                res = compute_ik(poses, fixed_joint_state, tolerance, group_names, seed)
                return res.joint_state
            except rospy.ServiceException, e:
                print "Service call failed: %s" % e
                # in case of failure return T pose
                return self.get_initial_state()
        if seed is None:
            seed = self.get_current_state()
        if group_names is not list:
            group_names = [group_names]
        # convert the desired poses to PoseStamped
        poses = []
        for key, value in desired_poses.iteritems():
            pose = transformations.list_to_pose(value)
            pose.header.frame_id = key
            poses.append(pose)
        # convert the fixed joints to joint state
        fixed_joint_state = JointState()
        for key, value in fixed_joints.iteritems():
            fixed_joint_state.name += [key]
            fixed_joint_state.position += [value]
        return compute_ik_client()
human_model.py 文件源码 项目:human_moveit_config 作者: baxter-flowers 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def jacobian(self, group_name, joint_state, use_quaternion=False, link=None, ref_point=None):
        def compute_jacobian_srv():
            rospy.wait_for_service('compute_jacobian')
            try:
                compute_jac = rospy.ServiceProxy('compute_jacobian', GetJacobian)
                js = JointState()
                js.name = self.get_joint_names(group_name)
                js.position = self.extract_group_joints(group_name, joint_state)
                p = Point(x=ref_point[0], y=ref_point[1], z=ref_point[2])
                # call the service
                res = compute_jac(group_name, link, js, p, use_quaternion)
                # reorganize the jacobian
                jac_array = np.array(res.jacobian).reshape((res.nb_rows, res.nb_cols))
                # reorder the jacobian wrt to the joint state
                ordered_jac = np.zeros((len(jac_array), len(joint_state.name)))
                for i, n in enumerate(js.name):
                    ordered_jac[:, joint_state.name.index(n)] = jac_array[:, i]
                return ordered_jac
            except rospy.ServiceException, e:
                print "Service call failed: %s" % e
        #  compute the jacobian only for chains
        # if group_name not in ['right_arm', 'left_arm', 'head', 'right_leg', 'left_leg']:
        #     rospy.logerr('The Jacobian matrix can only be computed on kinematic chains')
        #     return []
        # assign values
        if link is None:
            link = self.end_effectors[group_name]
        if ref_point is None:
            ref_point = [0, 0, 0]
        # return the jacobian
        return compute_jacobian_srv()
torch_control.py 文件源码 项目:baxter_dqn 作者: powertj 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def delete_gazebo_models():
    # This will be called on ROS Exit, deleting Gazebo models
    try:
        delete_model = rospy.ServiceProxy('/gazebo/delete_model', DeleteModel)
        resp_delete = delete_model("cafe_table")
        resp_delete = delete_model("object")
    except rospy.ServiceException, e:
        rospy.loginfo("Delete Model service call failed: {0}".format(e))
check_driver_io.py 文件源码 项目:raspimouse_ros 作者: ryuichiueda 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def switch_motors(onoff):
    rospy.wait_for_service('/switch_motors')
    try:
        p = rospy.ServiceProxy('/switch_motors', SwitchMotors)
        res = p(onoff)
        return res.accepted
    except rospy.ServiceException, e:
        print "Service call failed: %s"%e
    else:
        return False
check_driver_io.py 文件源码 项目:raspimouse_ros 作者: ryuichiueda 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def pos_control(left_hz,right_hz,time_ms):
    rospy.wait_for_service('/put_motor_freqs')
    try:
        p = rospy.ServiceProxy('/put_motor_freqs', PutMotorFreqs)
        res = p(left_hz,right_hz,time_ms)
        return res.accepted
    except rospy.ServiceException, e:
        print "Service call failed: %s"%e
    else:
        return False
ik_execute.py 文件源码 项目:robot-grasp 作者: falcondai 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get_ik_joints(target_x, target_y, initial_z, target_z, target_theta, n_steps):
    ns = "ExternalTools/left/PositionKinematicsNode/IKService"
    iksvc = rospy.ServiceProxy(ns, SolvePositionIK)
    ikreq = SolvePositionIKRequest(seed_mode=SolvePositionIKRequest.SEED_CURRENT)
    hdr = Header(stamp=rospy.Time.now(), frame_id='base')

    qx = np.sin(target_theta * 0.5)
    qy = np.cos(target_theta * 0.5)

    for z in np.arange(initial_z, target_z, (target_z-initial_z)/n_steps):
        pose = PoseStamped(
            header=hdr,
            pose=Pose(
                position=Point( x=target_x, y=target_y, z=z, ),
                orientation=Quaternion( x=qx, y=qy, z=0., w=0., ),
            ),
        )
        ikreq.pose_stamp.append(pose)
    try:
        rospy.wait_for_service(ns, 5.0)
        resp = iksvc(ikreq)
    except (rospy.ServiceException, rospy.ROSException), e:
        rospy.logerr("Service call failed: %s" % (e,))
        return 1

    return [j for v, j in zip(resp.isValid, resp.joints) if v]
simple_limb.py 文件源码 项目:baxter_gui 作者: HumaRobotics 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def getAnglesFromPose(self,pose):
        if type(pose)==Pose:
            goal=PoseStamped()
            goal.header.frame_id="/base"
            goal.pose=pose
        else:
            goal=pose


        if not self.ik:
            rospy.logerror("ERROR: Inverse Kinematics service was not loaded")
            return None
        goalstr="%f,%f,%f"%(goal.pose.position.x,goal.pose.position.y,goal.pose.position.z)
        ikreq = SolvePositionIKRequest()

        ikreq.pose_stamp.append(goal)
        try:
            resp = self.iksvc(ikreq)
            if (resp.isValid[0]):
                return resp.joints[0]
            else:
                rospy.logerr("FAILURE - No Valid Joint Solution Found for %s"%goalstr)
                return None
        except rospy.ServiceException,e :
            rospy.loginfo("Service call failed: %s" % (e,))
            return None
estop.py 文件源码 项目:makerfaire-berlin-2016-demos 作者: bitcraze 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def joystickUpdated(data):
    rospy.logfatal("Emergency stop butoon pressed!")
    if data.buttons[0] == 1:
        for e in emergencies:
            try:
                e()
            except ServiceException:
                pass
mavros_driver.py 文件源码 项目:offboard 作者: Stifael 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def set_mode(self, mode):
        if not self.current_state.connected:
            print "No FCU connection"

        elif self.current_state.mode == mode:
            print "Already in " + mode + " mode"

        else:

            # wait for service
            rospy.wait_for_service("mavros/set_mode")   


            # service client
            set_mode = rospy.ServiceProxy("mavros/set_mode", SetMode)


            # set request object
            req = SetModeRequest()
            req.custom_mode = mode


            # zero time 
            t0 = rospy.get_time()


            # check response
            while not rospy.is_shutdown() and (self.current_state.mode != req.custom_mode):
                if rospy.get_time() - t0 > 2.0: # check every 5 seconds

                    try:
                        # request 
                        set_mode.call(req)

                    except rospy.ServiceException, e:
                        print "Service did not process request: %s"%str(e)

                    t0 = rospy.get_time()


            print "Mode: "+self.current_state.mode + " established"
pos_tools.py 文件源码 项目:baxter 作者: destrygomorphous 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def go_to_pose(lmb, pose_msg, timeout=15.0):

    """
    Given a limb (right or left) and a desired pose as a stamped message,
    run inverse kinematics to attempt to find a joint configuration to yield
    the pose and then move limb to the configuration.
    After 5 seconds of attempts to solve the IK, raise an exception.
    """

    node = "ExternalTools/" + lmb + "/PositionKinematicsNode/IKService"
    ik_service = rospy.ServiceProxy(node, SolvePositionIK)
    ik_request = SolvePositionIKRequest()
    ik_request.pose_stamp.append(pose_msg)

    # Allow 5 seconds to find an IK solution
    try:
        rospy.wait_for_service(node, 5.0)
        ik_response = ik_service(ik_request)
    except (rospy.ServiceException, rospy.ROSException), e:
        rospy.logerr("Service call failed: %s" % (e,))
        return 1

    # Check if result valid
    if ik_response.isValid[0]:
        # convert response to joint position control dictionary
        limb_joints = dict(zip(ik_response.joints[0].name,
                               ik_response.joints[0].position))
        # send limb to joint positions
        baxter_interface.limb.Limb(lmb).move_to_joint_positions(
            limb_joints, timeout=timeout)
    else:
        raise Exception("Failed to find valid joint configuration.")
service.py 文件源码 项目:rostrace 作者: ChrisTimperley 项目源码 文件源码 阅读 69 收藏 0 点赞 0 评论 0
def __handler(self, server, service_name, proxy, req):
        time_start = timer()
        client = req._connection_header['callerid']

        # generate a JSON-encodable description of the parameters for this request
        # TODO: will fail with complex, embedded objects
        params = {p: getattr(req, p) for p in req.__slots__}

        # send the request and wait for a response
        success = False
        try:
            ret = proxy(req)
            success = True
            response = {p: getattr(ret, p) for p in ret.__slots__}

        except rospy.ServiceException, e:
            success = False
            response = {'reason': e} 

        # log the service call
        finally:
            time_end = timer()
            time_duration = time_end - time_start

            log = {
                'service': service_name,
                'server': server,
                'client': client,
                'time_start': time_start,
                'time_end': time_end,
                'time_duration': time_duration,
                'params': params,
                'response': response,
                'success': success
            }
            serviceCallPublisher.publish(json.dumps(log))

        return ret
gazeboObject.py 文件源码 项目:arm_scenario_simulator 作者: cmaestre 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def delete(self):
        if not self.spawned: return
        try:
            rospy.loginfo("Deleting "+self.gazebo_name)
            GazeboObject.delete_model_srv.wait_for_service()
            resp_delete = GazeboObject.delete_model_srv(self.gazebo_name)
            self.spawned = False
        except rospy.ServiceException, e: 
            pass # Don't know why, but an exception is raised by ROS whereas the deletion is actually successful ... So ignore the exception


问题


面经


文章

微信
公众号

扫码关注公众号