python类Fault()的实例源码

__init__.py 文件源码 项目:PyPlanet 作者: PyPlanet 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
async def send_loop(self):
        """
        Flush queued calls to the dedicated server in batches, forever.

        Every 0.25 seconds the pending entries of ``self.send_queue`` are copied, the shared queue is cleared and
        the copy is sent as a single ``self.instance.gbx.multicall``. A fault whose text contains 'Login unknown'
        ends the loop; any other exception is logged, reported through ``handle_exception`` and the loop goes on.

        Declared ``async`` because the body awaits; a plain ``def`` containing ``await`` does not compile.
        """
        while True:
            await asyncio.sleep(0.25)
            if not self.send_queue:
                continue

            # Snapshot and clear the shared queue first, so entries appended while the multicall is being awaited
            # stay in ``self.send_queue`` and are sent on a later iteration.
            queue = self.send_queue.copy()
            self.send_queue.clear()

            # Process and push out the queue.
            try:
                await self.instance.gbx.multicall(*queue)
            except Exception as e:
                # NOTE(review): 'Login unknown' presumably means the target player already left -- confirm.
                if isinstance(e, Fault) and 'Login unknown' in str(e):
                    return
                logger.exception(e)
                handle_exception(exception=e, module_name=__name__, func_name='send_loop')
remote.py 文件源码 项目:PyPlanet 作者: PyPlanet 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
async def handle_payload(self, handle_nr, method=None, data=None, fault=None):
        """
        Handle a callback/response payload or fault.

        Dispatch order: a handle registered in ``self.handlers`` goes to ``handle_response``; otherwise a payload
        with a method name and data goes to ``handle_scripted`` (mode script callbacks) or ``handle_callback``;
        otherwise a bare fault is raised; anything else is only reported.

        Declared ``async`` because the body awaits; a plain ``def`` containing ``await`` does not compile.

        :param handle_nr: Handler ID
        :param method: Method name
        :param data: Parsed payload data.
        :param fault: Fault object.
        :raise TransportException: When only a fault is given and the handle has no registered handler.
        """
        if handle_nr in self.handlers:
            await self.handle_response(handle_nr, method, data, fault)
        elif method and data is not None:
            # Both script callback flavours are routed to the same handler.
            if method in ('ManiaPlanet.ModeScriptCallbackArray', 'ManiaPlanet.ModeScriptCallback'):
                await self.handle_scripted(handle_nr, method, data)
            else:
                await self.handle_callback(handle_nr, method, data)
        elif fault is not None:
            raise TransportException('Handle payload got invalid parameters, see fault exception! {}'.format(fault)) from fault
        else:
            print(method, handle_nr, data)
            logging.warning('Received gbx data, but handle wasn\'t known or payload invalid: handle_nr: {}, method: {}'.format(
                handle_nr, method,
            ))
__init__.py 文件源码 项目:PyPlanet 作者: PyPlanet 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
async def podium_start(self, **kwargs):
        """
        Queue the first juked map as the next map and announce it in chat.

        Does nothing when ``self.jukebox`` is empty. When the server answers with a fault saying the map is not in
        the selection or is unknown, a cancellation notice is sent and the following jukebox entry is tried.

        Declared ``async`` because the body awaits; a plain ``def`` containing ``await`` does not compile.

        :param kwargs: Accepted but not used by this method.
        :raise Fault: Any server fault other than the two "map is gone" faults described above.
        """
        if len(self.jukebox) == 0:
            return
        # Named ``upcoming`` so the ``next`` builtin is not shadowed.
        upcoming = self.jukebox.pop(0)
        message = '$fa0The next map will be $fff{}$z$s$fa0 as requested by $fff{}$z$s$fa0.'.format(
            upcoming['map'].name, upcoming['player'].nickname
        )

        # Try to set the map, if not successful it might be that the map is removed while juked!
        try:
            await asyncio.gather(
                self.instance.chat(message),
                self.instance.map_manager.set_next_map(upcoming['map'])
            )
        except Fault as e:
            # It's removed from the server.
            if 'Map not in the selection' in e.faultString or 'Map unknown' in e.faultString:
                await self.instance.chat(
                    '$fa0Setting the next map has been canceled because the map is not on the server anymore!'
                )

                # Retry the next map(s).
                await self.podium_start()
            else:
                raise
XenAPI.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def xenapi_request(self, methodname, params):
        """Dispatch one API call, taking care of login, logout and session renewal.

        Method names starting with 'login' go to ``self._login`` and the names
        'logout' / 'session.logout' go to ``self._logout``; both return None.
        Any other method is called with the current session prepended to
        ``params``. When the parsed result asks for a reconnect, the last
        login is replayed and the call repeated, at most three times overall.

        Raises xmlrpcclient.Fault 401 when a reconnect is requested but no
        login was recorded, and Fault 500 after three failed attempts.
        """
        if methodname.startswith('login'):
            self._login(methodname, params)
            return None

        if methodname in ('logout', 'session.logout'):
            self._logout()
            return None

        for _attempt in range(3):
            session_params = (self._session,) + params
            outcome = _parse_result(getattr(self, methodname)(*session_params))
            if outcome is not _RECONNECT_AND_RETRY:
                return outcome
            if not self.last_login_method:
                raise xmlrpcclient.Fault(401, 'You must log in')
            # Session was invalidated: log in again, then go round once more.
            self._login(self.last_login_method, self.last_login_params)

        raise xmlrpcclient.Fault(
            500, 'Tried 3 times to get a valid session, but failed')
XenAPI.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _login(self, method, params):
        """Open a session through ``session.<method>`` and remember how.

        On success the session reference, the login method and its parameters
        are stored on the instance and ``self.API_version`` is refreshed from
        ``self._get_api_version()``.

        Raises xmlrpcclient.Fault 500 when the server asks for a reconnect
        during login, and Fault 504 when the socket times out; any other
        socket error is re-raised unchanged.
        """
        try:
            outcome = _parse_result(
                getattr(self, 'session.%s' % method)(*params))
            if outcome is _RECONNECT_AND_RETRY:
                raise xmlrpcclient.Fault(
                    500, 'Received SESSION_INVALID when logging in')
            self._session = outcome
            self.last_login_method = method
            self.last_login_params = params
            self.API_version = self._get_api_version()
        except socket.error as err:
            if err.errno != socket.errno.ETIMEDOUT:
                raise err
            raise xmlrpcclient.Fault(504, 'The connection timed out')
XenAPI.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _parse_result(result):
    """Unwrap a server response dict into its value.

    Returns ``result['Value']`` for a 'Success' status, or the
    ``_RECONNECT_AND_RETRY`` marker when the first error description entry is
    'SESSION_INVALID'.

    Raises xmlrpcclient.Fault 500 for a malformed response (not a dict, or
    missing 'Status', 'Value' or 'ErrorDescription') and Failure carrying the
    error description for any other server-side error.
    """
    if not isinstance(result, dict) or 'Status' not in result:
        # Format the offending response instead of concatenating it: adding a
        # non-str (None, list, dict, ...) to a str raised TypeError and hid
        # the intended Fault.
        raise xmlrpcclient.Fault(
            500, 'Missing Status in response from server: %r' % (result,))

    if result['Status'] == 'Success':
        if 'Value' not in result:
            raise xmlrpcclient.Fault(
                500, 'Missing Value in response from server')
        return result['Value']

    if 'ErrorDescription' not in result:
        raise xmlrpcclient.Fault(
            500, 'Missing ErrorDescription in response from server')
    if result['ErrorDescription'][0] == 'SESSION_INVALID':
        return _RECONNECT_AND_RETRY
    raise Failure(result['ErrorDescription'])


# Based upon _Method from xmlrpclib.
client.py 文件源码 项目:infusionsoft-client 作者: theY4Kman 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __request(self, methodname, args):
        """Perform a request, retrying on transient failures.

        Protocol errors, socket errors and faults whose text contains
        '[InvalidConfig]' are retried until the configured number of retries
        is used up; the last such error is then re-raised. Any other fault is
        re-raised immediately.
        """
        attempts_remaining = self.__retries
        while True:
            try:
                return self.__real_request(methodname, args)
            except (ProtocolError, Fault, socket.error) as exc:
                # InvalidConfig is a false, infrequent fault; every other fault is final.
                final_fault = isinstance(exc, Fault) and '[InvalidConfig]' not in exc.faultString
                if final_fault or attempts_remaining == 0:
                    raise
                attempts_remaining -= 1
publisher.py 文件源码 项目:sphinxcontrib-confluencebuilder 作者: tonybaloney 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def removePage(self, page_id):
        """Delete the page with the given ID over REST or XML-RPC.

        Uses ``self.rest_client`` when ``self.use_rest`` is set, otherwise
        ``self.xmlrpc`` with ``self.token``.

        Raises ConfluencePermissionError when the REST client reports a
        permission error or the XML-RPC fault text mentions
        'NotPermittedException'; other XML-RPC faults are re-raised as-is.
        """
        if not self.use_rest:
            try:
                self.xmlrpc.removePage(self.token, page_id)
            except xmlrpclib.Fault as fault:
                if 'NotPermittedException' in fault.faultString:
                    raise ConfluencePermissionError(
                        "Publish user does not have permission to delete "
                        "from the configured space."
                    )
                raise
            return

        try:
            self.rest_client.delete('content', page_id)
        except ConfluencePermissionError:
            # Re-raised with a message describing the failed operation.
            raise ConfluencePermissionError(
                "Publish user does not have permission to delete "
                "from the configured space."
            )
connection.py 文件源码 项目:python-mysql-pool 作者: LuciferJack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def execute(self, group, command, *args, **kwargs):
        """Execute a Fabric command using the MySQL protocol

        Builds a ``CALL group.command(params)`` statement from the given
        arguments, runs it, and wraps the returned data in a FabricMySQLSet.

        Raises InterfaceError when running the statement or building the
        result set fails with Fault, socket.error or InterfaceError.

        Returns a FabricMySQLSet.
        """
        call_args = self.create_params(*args, **kwargs)
        statement = "CALL {0}.{1}({2})".format(group, command, call_args)

        try:
            return FabricMySQLSet(self._execute_cmd(statement))
        except (Fault, socket.error, InterfaceError) as exc:
            raise InterfaceError(
                "Executing {group}.{command} failed: {error}".format(
                    group=group, command=command, error=str(exc)))
connection.py 文件源码 项目:python-mysql-pool 作者: LuciferJack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def execute(self, group, command, *args, **kwargs):
        """Execute a Fabric command using the XML-RPC protocol

        Looks up ``group.command`` on the handler's proxy, calls it with the
        given arguments, and wraps the returned data in a FabricSet.

        Raises ValueError when the command cannot be looked up, and
        InterfaceError when the call or building the result set fails with
        Fault, socket.error or InterfaceError.

        Returns a FabricSet.
        """
        try:
            remote_call = getattr(getattr(self.handler.proxy, group), command)
        except AttributeError as exc:
            raise ValueError("{group}.{command} not available ({err})".format(
                group=group, command=command, err=str(exc)))

        try:
            return FabricSet(remote_call(*args, **kwargs))
        except (Fault, socket.error, InterfaceError) as exc:
            raise InterfaceError(
                "Executing {group}.{command} failed: {error}".format(
                    group=group, command=command, error=str(exc)))
connection.py 文件源码 项目:python-mysql-pool 作者: LuciferJack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def report_failure(self, server_uuid, errno):
        """Report a failing MySQL server to Fabric

        Does nothing when error reporting is switched off, or when errno
        (converted to int) is in neither REPORT_ERRORS nor
        REPORT_ERRORS_EXTRA. Otherwise the 'threat.report_failure' command is
        executed with the server UUID, this host's fully qualified name and
        the error number. A Fault or socket error while reporting is only
        logged at debug level.
        """
        if not self._report_errors:
            return

        errno = int(errno)
        current_host = socket.getfqdn()

        if errno not in REPORT_ERRORS and errno not in REPORT_ERRORS_EXTRA:
            return

        _LOGGER.debug("Reporting error %d of server %s", errno,
                      server_uuid)
        fabric = self.get_instance()
        try:
            FabricResponse(fabric.execute('threat', 'report_failure',
                                          server_uuid, current_host, errno))
        except (Fault, socket.error) as exc:
            # Not requiring further action
            _LOGGER.debug("Failed reporting server to Fabric (%s)",
                          str(exc))
connection.py 文件源码 项目:python-mysql-pool 作者: LuciferJack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def is_connected(self):
        """Check whether connection with Fabric is valid

        Calls a method that does not exist through the proxy. A Fault in
        response counts as connected; a TypeError or AttributeError, or a
        call that returns normally, counts as not connected.

        Returns True or False.
        """
        try:
            self._proxy._some_nonexisting_method()  # pylint: disable=W0212
        except Fault:
            return True
        except (TypeError, AttributeError):
            pass
        return False
connection.py 文件源码 项目:importacsv 作者: rasertux 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def execute(self, group, command, *args, **kwargs):
        """Execute a Fabric command using the MySQL protocol

        Builds a ``CALL group.command(params)`` statement from the given
        arguments, runs it, and wraps the returned data in a FabricMySQLSet.

        Raises InterfaceError when running the statement or building the
        result set fails with Fault, socket.error or InterfaceError.

        Returns a FabricMySQLSet.
        """
        call_args = self.create_params(*args, **kwargs)
        statement = "CALL {0}.{1}({2})".format(group, command, call_args)

        try:
            return FabricMySQLSet(self._execute_cmd(statement))
        except (Fault, socket.error, InterfaceError) as exc:
            raise InterfaceError(
                "Executing {group}.{command} failed: {error}".format(
                    group=group, command=command, error=str(exc)))
connection.py 文件源码 项目:importacsv 作者: rasertux 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def execute(self, group, command, *args, **kwargs):
        """Execute a Fabric command using the XML-RPC protocol

        Looks up ``group.command`` on the handler's proxy, calls it with the
        given arguments, and wraps the returned data in a FabricSet.

        Raises ValueError when the command cannot be looked up, and
        InterfaceError when the call or building the result set fails with
        Fault, socket.error or InterfaceError.

        Returns a FabricSet.
        """
        try:
            remote_call = getattr(getattr(self.handler.proxy, group), command)
        except AttributeError as exc:
            raise ValueError("{group}.{command} not available ({err})".format(
                group=group, command=command, err=str(exc)))

        try:
            return FabricSet(remote_call(*args, **kwargs))
        except (Fault, socket.error, InterfaceError) as exc:
            raise InterfaceError(
                "Executing {group}.{command} failed: {error}".format(
                    group=group, command=command, error=str(exc)))
connection.py 文件源码 项目:importacsv 作者: rasertux 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def report_failure(self, server_uuid, errno):
        """Report a failing MySQL server to Fabric

        Does nothing when error reporting is switched off, or when errno
        (converted to int) is in neither REPORT_ERRORS nor
        REPORT_ERRORS_EXTRA. Otherwise the 'threat.report_failure' command is
        executed with the server UUID, this host's fully qualified name and
        the error number. A Fault or socket error while reporting is only
        logged at debug level.
        """
        if not self._report_errors:
            return

        errno = int(errno)
        current_host = socket.getfqdn()

        if errno not in REPORT_ERRORS and errno not in REPORT_ERRORS_EXTRA:
            return

        _LOGGER.debug("Reporting error %d of server %s", errno,
                      server_uuid)
        fabric = self.get_instance()
        try:
            FabricResponse(fabric.execute('threat', 'report_failure',
                                          server_uuid, current_host, errno))
        except (Fault, socket.error) as exc:
            # Not requiring further action
            _LOGGER.debug("Failed reporting server to Fabric (%s)",
                          str(exc))
connection.py 文件源码 项目:importacsv 作者: rasertux 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def is_connected(self):
        """Check whether connection with Fabric is valid

        Calls a method that does not exist through the proxy. A Fault in
        response counts as connected; a TypeError or AttributeError, or a
        call that returns normally, counts as not connected.

        Returns True or False.
        """
        try:
            self._proxy._some_nonexisting_method()  # pylint: disable=W0212
        except Fault:
            return True
        except (TypeError, AttributeError):
            pass
        return False
dev_rpypacker.py 文件源码 项目:taf 作者: taf3 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _send_request(self, method, *args, **kwargs):
        """Send xmlrpc request to remote pypacker server.

        Positional and keyword arguments are pickled separately and passed to
        the named method of ``self.xmlproxy``; its return value is returned.

        A Fault whose last ``faultString`` entry names a PypackerException is
        re-raised as PypackerException, one that names 'Skipped' skips the
        current test through pytest, and any other Fault is re-raised.

        """
        try:
            return getattr(self.xmlproxy, method)(pickle.dumps(args), pickle.dumps(kwargs))
        except Fault as err:
            # NOTE(review): [-1] presumably picks the last line of a list of
            # traceback lines; on a plain str it would be the last character -- confirm.
            last_entry = err.faultString[-1].strip()
            # Text after the first ": ", without surrounding spaces and quotes.
            detail = last_entry.split(": ", 1)[-1].strip(" '")
            if "PypackerException: " in last_entry:
                raise PypackerException(detail)
            elif "Skipped: " in last_entry:
                pytest.skip(detail)
            else:
                raise
containers.py 文件源码 项目:Kiwi 作者: kiwitcms 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _add(self, testcases):
        """ Add given test cases to the test run

        All cases are sent in one call first. If the server answers with a
        "Duplicate entry" fault the cases are added one at a time and faults
        for individual cases are ignored; any other fault is re-raised.
        """
        # Short info about the action
        names = [case.identifier for case in testcases]
        summary = listed(names, "testcase", max=3)
        log.info("Adding {0} to {1}".format(summary, self._object.identifier))
        # Prepare data and push
        case_ids = [case.id for case in testcases]
        log.data(pretty(case_ids))
        try:
            self._server.TestRun.add_cases(self.id, case_ids)
        except xmlrpclib.Fault as error:
            if "Duplicate entry" not in str(error):
                raise
            log.warn(error)
            # Handle duplicate entry errors by adding test cases one by one
            for case_id in case_ids:
                try:
                    self._server.TestRun.add_cases(self.id, case_id)
                except xmlrpclib.Fault:
                    pass

        # RunCaseRuns will need update ---> erase current data
        self._object.caseruns._init()
test_testbuild.py 文件源码 项目:Kiwi 作者: kiwitcms 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_build_create_with_no_required_fields(self):
        """Build.create is rejected unless both product and name are given."""
        expected_error = 'Product and name are both required'
        payload = {
            "description": "Test Build",
            "is_active": False
        }

        # Neither product nor name.
        with self.assertRaisesRegex(XmlRPCFault, expected_error):
            self.rpc_client.Build.create(payload)

        # Name only.
        payload["name"] = "TB"
        with self.assertRaisesRegex(XmlRPCFault, expected_error):
            self.rpc_client.Build.create(payload)

        # Product only.
        del payload["name"]
        payload["product"] = self.product.pk
        with self.assertRaisesRegex(XmlRPCFault, expected_error):
            self.rpc_client.Build.create(payload)
test_testcaserun.py 文件源码 项目:Kiwi 作者: kiwitcms 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_create_with_no_required_fields(self):
        """TestCaseRun.create reports a required-field error for each incomplete payload."""
        staff_pk = self.staff.pk
        status_pk = self.case_run_status.pk
        build_pk = self.build.pk

        incomplete_payloads = [
            {
                "assignee": staff_pk,
                "case_run_status": status_pk,
                "notes": "unit test 2"
            },
            {
                "build": build_pk,
                "assignee": staff_pk,
                "case_run_status": 1,
                "notes": "unit test 2"
            },
            {
                "run": self.test_run.pk,
                "build": build_pk,
                "assignee": staff_pk,
                "case_run_status": status_pk,
                "notes": "unit test 2"
            },
        ]
        for payload in incomplete_payloads:
            with self.assertRaisesRegex(XmlRPCFault, 'This field is required'):
                self.rpc_client.TestCaseRun.create(payload)
test_testcaserun.py 文件源码 项目:Kiwi 作者: kiwitcms 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_create_with_non_exist_fields(self):
        """TestCaseRun.create reports an invalid choice when one reference is an arbitrary ID."""
        run_pk = self.test_run.pk
        build_pk = self.build.pk

        # Each payload swaps exactly one of case / run / build for a made-up ID.
        payloads = [
            {"run": run_pk, "build": build_pk, "case": 111111},
            {"run": 11111, "build": build_pk, "case": self.case.pk},
            {"run": run_pk, "build": 11222222, "case": self.case.pk},
        ]
        for payload in payloads:
            with self.assertRaisesRegex(XmlRPCFault, 'Select a valid choice'):
                self.rpc_client.TestCaseRun.create(payload)
connection.py 文件源码 项目:Chorus 作者: DonaldBough 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def execute(self, group, command, *args, **kwargs):
        """Execute a Fabric command using the MySQL protocol

        Builds a ``CALL group.command(params)`` statement from the given
        arguments, runs it, and wraps the returned data in a FabricMySQLSet.

        Raises InterfaceError when running the statement or building the
        result set fails with Fault, socket.error or InterfaceError.

        Returns a FabricMySQLSet.
        """
        call_args = self.create_params(*args, **kwargs)
        statement = "CALL {0}.{1}({2})".format(group, command, call_args)

        try:
            return FabricMySQLSet(self._execute_cmd(statement))
        except (Fault, socket.error, InterfaceError) as exc:
            raise InterfaceError(
                "Executing {group}.{command} failed: {error}".format(
                    group=group, command=command, error=str(exc)))
connection.py 文件源码 项目:Chorus 作者: DonaldBough 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def execute(self, group, command, *args, **kwargs):
        """Execute a Fabric command using the XML-RPC protocol

        Looks up ``group.command`` on the handler's proxy, calls it with the
        given arguments, and wraps the returned data in a FabricSet.

        Raises ValueError when the command cannot be looked up, and
        InterfaceError when the call or building the result set fails with
        Fault, socket.error or InterfaceError.

        Returns a FabricSet.
        """
        try:
            remote_call = getattr(getattr(self.handler.proxy, group), command)
        except AttributeError as exc:
            raise ValueError("{group}.{command} not available ({err})".format(
                group=group, command=command, err=str(exc)))

        try:
            return FabricSet(remote_call(*args, **kwargs))
        except (Fault, socket.error, InterfaceError) as exc:
            raise InterfaceError(
                "Executing {group}.{command} failed: {error}".format(
                    group=group, command=command, error=str(exc)))
connection.py 文件源码 项目:Chorus 作者: DonaldBough 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def report_failure(self, server_uuid, errno):
        """Report a failing MySQL server to Fabric

        Does nothing when error reporting is switched off, or when errno
        (converted to int) is in neither REPORT_ERRORS nor
        REPORT_ERRORS_EXTRA. Otherwise the 'threat.report_failure' command is
        executed with the server UUID, this host's fully qualified name and
        the error number. A Fault or socket error while reporting is only
        logged at debug level.
        """
        if not self._report_errors:
            return

        errno = int(errno)
        current_host = socket.getfqdn()

        if errno not in REPORT_ERRORS and errno not in REPORT_ERRORS_EXTRA:
            return

        _LOGGER.debug("Reporting error %d of server %s", errno,
                      server_uuid)
        fabric = self.get_instance()
        try:
            FabricResponse(fabric.execute('threat', 'report_failure',
                                          server_uuid, current_host, errno))
        except (Fault, socket.error) as exc:
            # Not requiring further action
            _LOGGER.debug("Failed reporting server to Fabric (%s)",
                          str(exc))
connection.py 文件源码 项目:Chorus 作者: DonaldBough 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def is_connected(self):
        """Check whether connection with Fabric is valid

        Calls a method that does not exist through the proxy. A Fault in
        response counts as connected; a TypeError or AttributeError, or a
        call that returns normally, counts as not connected.

        Returns True or False.
        """
        try:
            self._proxy._some_nonexisting_method()  # pylint: disable=W0212
        except Fault:
            return True
        except (TypeError, AttributeError):
            pass
        return False
__init__.py 文件源码 项目:osaAPI 作者: ingrammicro 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def Execute(self, method, params=None, server='BM'):
        """Call ``method`` on the given server and return a status dict.

        On success the dict holds ``status`` 0 and ``result``, the last item
        of the 'Result' list in the server's answer. On a fault it holds
        ``status`` -1, the base64-decoded and stripped fault string as
        ``error_message``, the request details (method, params, server,
        host) and ``result`` None. A falsy ``params`` is sent as an empty
        list.
        """
        params = params or []
        request = {
            'Params': params,
            'Server': server,
            'Method': method,
        }
        try:
            result = self.__server__.Execute(request)['Result'].pop()
        except client.Fault as err:
            return {
                'error_message': base64.b64decode(err.faultString).strip(),
                'status': -1,
                'method': method,
                'params': params,
                'server': server,
                'host': self.host,
                'result': None,
            }
        return {
            'status': 0,
            'result': result,
        }
remote.py 文件源码 项目:PyPlanet 作者: PyPlanet 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
async def listen(self):
        """
        Listen to socket.

        Reads frames forever: an 8-byte header holding two little-endian unsigned 32-bit values (body size and
        handle), followed by ``size`` bytes of XML-RPC body. Each decoded frame is handed to ``handle_payload``
        in a new task on ``self.event_loop``. A body that decodes to a fault is passed on as ``fault``; a body
        that is not well-formed XML is reported and skipped.

        Declared ``async`` because the body awaits; a plain ``def`` containing ``await`` does not compile.

        :raise Exception: Any error other than a connection reset is reported and re-raised. A connection reset
            exits the process with code 10.
        """
        try:
            while True:
                head = await self.reader.readexactly(8)
                size, handle = struct.unpack_from('<LL', head)
                body = await self.reader.readexactly(size)
                data = method = fault = None

                try:
                    data, method = loads(body, use_builtin_types=True)
                except Fault as e:
                    fault = e
                except ExpatError as e:
                    # See #121 for this solution.
                    handle_exception(exception=e, module_name=__name__, func_name='listen', extra_data={'body': body})
                    continue

                # Unwrap a single-element payload so handlers get the value itself.
                if data and len(data) == 1:
                    data = data[0]

                self.event_loop.create_task(self.handle_payload(handle, method, data, fault))
        except ConnectionResetError as e:
            logger.critical(
                'Connection with the dedicated server has been closed, we will now close down the subprocess! {}'.format(str(e))
            )
            # When the connection has been reset, we will close the controller process so it can be restarted by the god
            # process. Exit code 10 gives the information to the god process.
            exit(10)
        except Exception as e:
            handle_exception(exception=e, module_name=__name__, func_name='listen')
            raise
manager.py 文件源码 项目:PyPlanet 作者: PyPlanet 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
async def add_map(self, filename, insert=True, save_matchsettings=True):
        """
        Add or insert map to current online playlist.

        Declared ``async`` because the body awaits; a plain ``def`` containing ``await`` does not compile.

        :param filename: Load from filename relative to the 'Maps' directory on the dedicated host server.
        :param insert: Insert after the current map, this will make it play directly after the current map. True by default.
        :param save_matchsettings: Save match settings as well.
        :type filename: str
        :type insert: bool
        :type save_matchsettings: bool
        :return: Whatever the 'InsertMap' / 'AddMap' call returned.
        :raise: MapNotFound when the server fault mentions 'unknown'.
        :raise: MapException for every other server fault (including 'already' added).
        """
        gbx_method = 'InsertMap' if insert else 'AddMap'

        try:
            result = await self._instance.gbx(gbx_method, filename)
        except Fault as e:
            # Keep the server fault attached as the cause of the map exception.
            if 'unknown' in e.faultString:
                raise MapNotFound('Map is not found on the server.') from e
            elif 'already' in e.faultString:
                raise MapException('Map already added to server.') from e
            raise MapException(e.faultString) from e

        # Try to save match settings. A failure here is only reported; the map has been added already.
        try:
            if save_matchsettings:
                await self.save_matchsettings()
        except Exception as e:
            handle_exception(e, __name__, 'add_map', extra_data={'EXTRAHOOK': 'Map Insert bug, see #306'})

        return result
publisher.py 文件源码 项目:sphinxcontrib-confluencebuilder 作者: tonybaloney 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def getBasePageId(self):
        """Return the ID of the configured parent page, or None.

        Returns None when no parent name is configured. Otherwise the page is
        looked up by space and title over REST or XML-RPC, depending on
        ``self.use_rest``.

        Raises ConfluenceConfigurationError when the page cannot be found,
        when a configured parent ID differs from the ID of the page found, or
        when a parent ID is configured but the page found has no ID.
        """
        if not self.parent_name:
            return None

        if self.use_rest:
            query = {
                'type': 'page',
                'spaceKey': self.space_name,
                'title': self.parent_name,
                'status': 'current'
            }
            rsp = self.rest_client.get('content', query)
            if rsp['size'] == 0:
                raise ConfluenceConfigurationError(
                    "Configured parent page name do not exist.")
            page = rsp['results'][0]
        else:
            try:
                page = self.xmlrpc.getPage(
                    self.token, self.space_name, self.parent_name)
            except xmlrpclib.Fault:
                raise ConfluenceConfigurationError(
                    "Configured parent page name do not exist.")

        # The same ID check applies whichever transport found the page.
        if self.parent_id and page['id'] != str(self.parent_id):
            raise ConfluenceConfigurationError(
                "Configured parent page ID and name do not match.")

        base_page_id = page['id']
        if not base_page_id and self.parent_id:
            raise ConfluenceConfigurationError(
                "Unable to find the parent page matching the ID or name provided.")

        return base_page_id
publisher.py 文件源码 项目:sphinxcontrib-confluencebuilder 作者: tonybaloney 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def updateSpaceHome(self, page_id):
        """Make the given page the homepage of the configured space.

        Does nothing for a falsy ``page_id``. Over REST the page is fetched
        and sent back as the space's 'homepage'; over XML-RPC the space is
        fetched, its 'homePage' is set to ``page_id`` and the space is stored.

        Raises ConfluencePermissionError when the REST client reports a
        permission error or the XML-RPC fault text mentions
        'NotPermittedException'; other XML-RPC faults are re-raised as-is.
        """
        if not page_id:
            return

        if not self.use_rest:
            space = self.xmlrpc.getSpace(self.token, self.space_name)
            space['homePage'] = page_id
            try:
                self.xmlrpc.storeSpace(self.token, space)
            except xmlrpclib.Fault as fault:
                if 'NotPermittedException' in fault.faultString:
                    raise ConfluencePermissionError(
                        "Publish user does not have permission to update "
                        "space's homepage."
                    )
                raise
            return

        page = self.rest_client.get('content/' + page_id, None)
        space_update = {
            'key': self.space_name,
            'name': self.space_display_name,
            'homepage': page
        }
        try:
            self.rest_client.put('space', self.space_name, space_update)
        except ConfluencePermissionError:
            # Re-raised with a message describing the failed operation.
            raise ConfluencePermissionError(
                "Publish user does not have permission to update "
                "space's homepage."
            )


问题


面经


文章

微信
公众号

扫码关注公众号