python类AnsibleError()的实例源码

_ansible.py 文件源码 项目:0ops.exed 作者: whisperaven 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _run_tasks(self, play, reaper):
        """ Init TQM and run play. """
        tqm = TaskQueueManager(inventory=self._inventory,
            variable_manager=self._varmanager, 
            loader=self._loader,
            options=self._opts,
            passwords=None,
            stdout_callback=reaper)
        # with multiprocessing, the parent cannot handle exception riased
        #   by the child process.
        # which means, the try/except in the `runner._async_deploy` cannot
        #   known what happened here, and cause the entire celery worker
        #   process stop working without exit.
        # Solution:
        #   1, handle ansible exception here (inside executor).
        #   2, cannot raise other exception in `except` block, because of
        #       this piece of code may be run under other `fork()`.
        #   3, because of <2>, we use `reaper` to tell outside something going wrong.
        try:
            tqm.run(play)
        except AnsibleError:
            reaper.reaper_exception(ExecutorPrepareError(str(excinst())))
        finally:
            tqm.cleanup()
            reaper.done()
__init__.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def preprocess_vars(a):
    '''
    Ensures that vars contained in the parameter passed in are
    returned as a list of dictionaries, to ensure for instance
    that vars loaded from a file conform to an expected state.
    '''

    if a is None:
        return None
    elif not isinstance(a, list):
        data = [ a ]
    else:
        data = a

    for item in data:
        if not isinstance(item, MutableMapping):
            raise AnsibleError("variable files must contain either a dictionary of variables, or a list of dictionaries. Got: %s (%s)" % (a, type(a)))

    return data
__init__.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _preprocess_vars(self, a):
        '''
        Ensures that vars contained in the parameter passed in are
        returned as a list of dictionaries, to ensure for instance
        that vars loaded from a file conform to an expected state.
        '''

        if a is None:
            return None
        elif not isinstance(a, list):
            data = [ a ]
        else:
            data = a

        for item in data:
            if not isinstance(item, MutableMapping):
                raise AnsibleError("variable files must contain either a dictionary of variables, or a list of dictionaries. Got: %s (%s)" % (a, type(a)))

        return data
login.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_credentials(self):
        display.display(u'\n\n' + "We need your " + stringc("Github login",'bright cyan') + 
            " to identify you.", screen_only=True)
        display.display("This information will " + stringc("not be sent to Galaxy",'bright cyan') + 
            ", only to " + stringc("api.github.com.","yellow"), screen_only=True)
        display.display("The password will not be displayed." + u'\n\n', screen_only=True)
        display.display("Use " + stringc("--github-token",'yellow') + 
            " if you do not want to enter your password." + u'\n\n', screen_only=True)

        try:
            self.github_username = raw_input("Github Username: ")
        except:
            pass

        try:
            self.github_password = getpass.getpass("Password for %s: " % self.github_username)
        except:
            pass

        if not self.github_username or not self.github_password:
            raise AnsibleError("Invalid Github credentials. Username and password are required.")
login.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def remove_github_token(self):
        '''
        If for some reason an ansible-galaxy token was left from a prior login, remove it. We cannot
        retrieve the token after creation, so we are forced to create a new one.
        '''
        try:
            tokens = json.load(open_url(self.GITHUB_AUTH, url_username=self.github_username,
                url_password=self.github_password, force_basic_auth=True,))
        except HTTPError as e:
            res = json.load(e)
            raise AnsibleError(res['message'])

        for token in tokens:
            if token['note'] == 'ansible-galaxy login':
                display.vvvvv('removing token: %s' % token['token_last_eight'])
                try: 
                    open_url('https://api.github.com/authorizations/%d' % token['id'], url_username=self.github_username,
                        url_password=self.github_password, method='DELETE', force_basic_auth=True,)
                except HTTPError as e:
                    res = json.load(e)
                    raise AnsibleError(res['message'])
api.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _get_server_api_version(self):
        """
        Fetches the Galaxy API current version to ensure
        the API server is up and reachable.
        """
        url = '%s/api/' % self._api_server
        try:
            return_data = open_url(url, validate_certs=self._validate_certs)
        except Exception as e:
            raise AnsibleError("Failed to get data from the API server (%s): %s " % (url, to_native(e)))

        try:
            data = json.load(return_data)
        except Exception as e:
            raise AnsibleError("Could not process data from the API server (%s): %s " % (url, to_native(e)))

        if 'current_version' not in data:
            raise AnsibleError("missing required 'current_version' from server response (%s)" % url)

        return data['current_version']
api.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_list(self, what):
        """
        Fetch the list of items specified.
        """
        try:
            url = '%s/%s/?page_size' % (self.baseurl, what)
            data = self.__call_galaxy(url)
            if "results" in data:
                results = data['results']
            else:
                results = data
            done = True
            if "next" in data:
                done = (data.get('next_link', None) is None)
            while not done:
                url = '%s%s' % (self._api_server, data['next_link'])
                data = self.__call_galaxy(url)
                results += data['results']
                done = (data.get('next_link', None) is None)
            return results
        except Exception as error:
            raise AnsibleError("Failed to download the %s list: %s" % (what, str(error)))
definition.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _load_role_name(self, ds):
        '''
        Returns the role name (either the role: or name: field) from
        the role definition, or (when the role definition is a simple
        string), just that string
        '''

        if isinstance(ds, string_types):
            return ds

        role_name = ds.get('role', ds.get('name'))
        if not role_name or not isinstance(role_name, string_types):
            raise AnsibleError('role definitions must contain a role name', obj=ds)

        # if we have the required datastructures, and if the role_name
        # contains a variable, try and template it now
        if self._variable_manager:
            all_vars = self._variable_manager.get_vars(loader=self._loader, play=self._play)
            templar = Templar(loader=self._loader, variables=all_vars)
            if templar._contains_vars(role_name):
                role_name = templar.template(role_name)

        return role_name
conditional.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def evaluate_conditional(self, templar, all_vars):
        '''
        Loops through the conditionals set on this object, returning
        False if any of them evaluate as such.
        '''

        # since this is a mix-in, it may not have an underlying datastructure
        # associated with it, so we pull it out now in case we need it for
        # error reporting below
        ds = None
        if hasattr(self, '_ds'):
            ds = getattr(self, '_ds')

        try:
            # this allows for direct boolean assignments to conditionals "when: False"
            if isinstance(self.when, bool):
                return self.when

            for conditional in self.when:
                if not self._check_conditional(conditional, templar, all_vars):
                    return False
        except Exception as e:
            raise AnsibleError("The conditional check '%s' failed. The error was: %s" % (to_native(conditional), to_native(e)), obj=ds)

        return True
galaxy.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def execute_remove(self):
        """
        Executes the remove action. The args list contains the list
        of roles to be removed. This list can contain more than one role.
        """

        if len(self.args) == 0:
            raise AnsibleOptionsError('- you must specify at least one role to remove.')

        for role_name in self.args:
            role = GalaxyRole(self.galaxy, role_name)
            try:
                if role.remove():
                    display.display('- successfully removed %s' % role_name)
                else:
                    display.display('- %s is not installed, skipping.' % role_name)
            except Exception as e:
                raise AnsibleError("Failed to remove role %s: %s" % (role_name, str(e)))

        return 0
galaxy.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def execute_delete(self):
        """
        Delete a role from galaxy.ansible.com
        """

        if len(self.args) < 2:
            raise AnsibleError("Missing one or more arguments. Expected: github_user github_repo")

        github_repo = self.args.pop()
        github_user = self.args.pop()
        resp = self.api.delete_role(github_user, github_repo)

        if len(resp['deleted_roles']) > 1:
            display.display("Deleted the following roles:")
            display.display("ID     User            Name")
            display.display("------ --------------- ----------")
            for role in resp['deleted_roles']:
                display.display("%-8s %-15s %s" % (role.id,role.namespace,role.name))

        display.display(resp['status'])

        return True
script.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_host_variables(self, host):
        """ Runs <script> --host <hostname> to determine additional host variables """
        if self.host_vars_from_top is not None:
            try:
                got = self.host_vars_from_top.get(host.name, {})
            except AttributeError as e:
                raise AnsibleError("Improperly formated host information for %s: %s" % (host.name,to_native(e)))
            return got

        cmd = [self.filename, "--host", host.name]
        try:
            sp = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except OSError as e:
            raise AnsibleError("problem running %s (%s)" % (' '.join(cmd), e))
        (out, err) = sp.communicate()
        if out.strip() == '':
            return dict()
        try:
            return json_dict_bytes_to_unicode(self._loader.load(out))
        except ValueError:
            raise AnsibleError("could not parse post variable response: %s, %s" % (cmd, out))
path.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def makedirs_safe(path, mode=None):
    '''Safe way to create dirs in muliprocess/thread environments.

    :arg path: A byte or text string representing a directory to be created
    :kwarg mode: If given, the mode to set the directory to
    :raises AnsibleError: If the directory cannot be created and does not already exists.
    :raises UnicodeDecodeError: if the path is not decodable in the utf-8 encoding.
    '''

    rpath = unfrackpath(path)
    b_rpath = to_bytes(rpath)
    if not os.path.exists(b_rpath):
        try:
            if mode:
                os.makedirs(b_rpath, mode)
            else:
                os.makedirs(b_rpath)
        except OSError as e:
            if e.errno != EEXIST:
                raise AnsibleError("Unable to create local directories(%s): %s" % (to_native(rpath), to_native(e)))
display.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def deprecated(self, msg, version=None, removed=False):
        ''' used to print out a deprecation message.'''

        if not removed and not C.DEPRECATION_WARNINGS:
            return

        if not removed:
            if version:
                new_msg = "[DEPRECATION WARNING]: %s.\nThis feature will be removed in version %s." % (msg, version)
            else:
                new_msg = "[DEPRECATION WARNING]: %s.\nThis feature will be removed in a future release." % (msg)
            new_msg = new_msg + " Deprecation warnings can be disabled by setting deprecation_warnings=False in ansible.cfg.\n\n"
        else:
            raise AnsibleError("[DEPRECATED]: %s.\nPlease update your playbooks." % msg)

        wrapped = textwrap.wrap(new_msg, self.columns, replace_whitespace=False, drop_whitespace=False)
        new_msg = "\n".join(wrapped) + "\n"

        if new_msg not in self._deprecations:
            self.display(new_msg.strip(), color=C.COLOR_DEPRECATE, stderr=True)
            self._deprecations[new_msg] = 1
vars.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _validate_mutable_mappings(a, b):
    """
    Internal convenience function to ensure arguments are MutableMappings

    This checks that all arguments are MutableMappings or raises an error

    :raises AnsibleError: if one of the arguments is not a MutableMapping
    """

    # If this becomes generally needed, change the signature to operate on
    # a variable number of arguments instead.

    if not (isinstance(a, MutableMapping) and isinstance(b, MutableMapping)):
        myvars = []
        for x in [a, b]:
            try:
                myvars.append(dumps(x))
            except:
                myvars.append(to_native(x))
        raise AnsibleError("failed to combine variables, expected dicts but got a '{0}' and a '{1}': \n{2}\n{3}".format(
            a.__class__.__name__, b.__class__.__name__, myvars[0], myvars[1])
        )
hashing.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def secure_hash(filename, hash_func=sha1):
    ''' Return a secure hash hex digest of local file, None if file is not present or a directory. '''

    if not os.path.exists(to_bytes(filename, errors='surrogate_or_strict')) or os.path.isdir(to_bytes(filename, errors='strict')):
        return None
    digest = hash_func()
    blocksize = 64 * 1024
    try:
        infile = open(to_bytes(filename, errors='surrogate_or_strict'), 'rb')
        block = infile.read(blocksize)
        while block:
            digest.update(block)
            block = infile.read(blocksize)
        infile.close()
    except IOError as e:
        raise AnsibleError("error while accessing the file %s, error was: %s" % (filename, e))
    return digest.hexdigest()

# The checksum algorithm must match with the algorithm in ShellModule.checksum() method
consul_kv.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def parse_params(self, term):
        params = term.split(' ')

        paramvals = {
            'key': params[0],
            'token': None,
            'recurse': False,
            'index': None
        }

        # parameters specified?
        try:
            for param in params[1:]:
                if param and len(param) > 0:
                    name, value = param.split('=')
                    assert name in paramvals, "% not a valid consul lookup parameter" % name
                    paramvals[name] = value
        except (ValueError, AssertionError) as e:
            raise AnsibleError(e)

        return paramvals
hashi_vault.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run(self, terms, variables, **kwargs):
        vault_args = terms[0].split(' ')
        vault_dict = {}
        ret = []

        for param in vault_args:
            try:
                key, value = param.split('=')
            except ValueError as e:
                raise AnsibleError("hashi_vault plugin needs key=value pairs, but received %s" % terms)
            vault_dict[key] = value

        vault_conn = HashiVault(**vault_dict)

        for term in terms:
           key = term.split()[0]
           value = vault_conn.get()
           ret.append(value)

        return ret
file.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def run(self, terms, variables=None, **kwargs):

        ret = []

        for term in terms:
            display.debug("File lookup term: %s" % term)

            # Find the file in the expected search path
            lookupfile = self.find_file_in_search_path(variables, 'files', term)
            display.vvvv(u"File lookup using %s as file" % lookupfile)
            try:
                if lookupfile:
                    contents, show_data = self._loader._get_file_contents(lookupfile)
                    ret.append(contents.rstrip())
                else:
                    raise AnsibleParserError()
            except AnsibleParserError:
                raise AnsibleError("could not locate file in lookup: %s" % term)

        return ret
url.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run(self, terms, variables=None, **kwargs):

        validate_certs = kwargs.get('validate_certs', True)

        ret = []
        for term in terms:
            display.vvvv("url lookup connecting to %s" % term)
            try:
                response = open_url(term, validate_certs=validate_certs)
            except HTTPError as e:
                raise AnsibleError("Received HTTP error for %s : %s" % (term, str(e)))
            except URLError as e:
                raise AnsibleError("Failed lookup url for %s : %s" % (term, str(e)))
            except SSLValidationError as e:
                raise AnsibleError("Error validating the server's certificate for %s: %s" % (term, str(e)))
            except ConnectionError as e:
                raise AnsibleError("Error connecting to %s: %s" % (term, str(e)))

            for line in response.read().splitlines():
                ret.append(to_text(line))
        return ret
pipe.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def run(self, terms, variables, **kwargs):

        ret = []
        for term in terms:
            '''
            http://docs.python.org/2/library/subprocess.html#popen-constructor

            The shell argument (which defaults to False) specifies whether to use the 
            shell as the program to execute. If shell is True, it is recommended to pass 
            args as a string rather than as a sequence

            https://github.com/ansible/ansible/issues/6550
            '''
            term = str(term)

            p = subprocess.Popen(term, cwd=self._loader.get_basedir(), shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
            (stdout, stderr) = p.communicate()
            if p.returncode == 0:
                ret.append(stdout.decode("utf-8").rstrip())
            else:
                raise AnsibleError("lookup_plugin.pipe(%s) returned %d" % (term, p.returncode))
        return ret
nested.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def run(self, terms, variables=None, **kwargs):

        terms = self._lookup_variables(terms, variables)

        my_list = terms[:]
        my_list.reverse()
        result = []
        if len(my_list) == 0:
            raise AnsibleError("with_nested requires at least one element in the nested list")
        result = my_list.pop()
        while len(my_list) > 0:
            result2 = self._combine(result, my_list.pop())
            result  = result2
        new_result = []
        for x in result:
            new_result.append(self._flatten(x))
        return new_result
dnstxt.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def run(self, terms, variables=None, **kwargs):

        if HAVE_DNS == False:
            raise AnsibleError("Can't LOOKUP(dnstxt): module dns.resolver is not installed")

        ret = []
        for term in terms:
            domain = term.split()[0]
            string = []
            try:
                answers = dns.resolver.query(domain, 'TXT')
                for rdata in answers:
                    s = rdata.to_text()
                    string.append(s[1:-1])  # Strip outside quotes on TXT rdata

            except dns.resolver.NXDOMAIN:
                string = 'NXDOMAIN'
            except dns.resolver.Timeout:
                string = ''
            except DNSException as e:
                raise AnsibleError("dns.resolver unhandled exception", e)

            ret.append(''.join(string))

        return ret
credstash.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def run(self, terms, variables, **kwargs):

        if not CREDSTASH_INSTALLED:
            raise AnsibleError('The credstash lookup plugin requires credstash to be installed.')

        ret = []
        for term in terms:
            try:
                version = kwargs.pop('version', '')
                region = kwargs.pop('region', None)
                table = kwargs.pop('table', 'credential-store')
                val = credstash.getSecret(term, version, region, table,
                                          context=kwargs)
            except credstash.ItemNotFound:
                raise AnsibleError('Key {0} not found'.format(term))
            except Exception as e:
                raise AnsibleError('Encountered exception while fetching {0}: {1}'.format(term, e.message))
            ret.append(val)

        return ret
sequence.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def parse_kv_args(self, args):
        """parse key-value style arguments"""
        for arg in ["start", "end", "count", "stride"]:
            try:
                arg_raw = args.pop(arg, None)
                if arg_raw is None:
                    continue
                arg_cooked = int(arg_raw, 0)
                setattr(self, arg, arg_cooked)
            except ValueError:
                raise AnsibleError(
                    "can't parse arg %s=%r as integer"
                        % (arg, arg_raw)
                )
            if 'format' in args:
                self.format = args.pop("format")
        if args:
            raise AnsibleError(
                "unrecognized arguments to with_sequence: %r"
                % args.keys()
            )
sequence.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def sanity_check(self):
        if self.count is None and self.end is None:
            raise AnsibleError( "must specify count or end in with_sequence")
        elif self.count is not None and self.end is not None:
            raise AnsibleError( "can't specify both count and end in with_sequence")
        elif self.count is not None:
            # convert count to end
            if self.count != 0:
                self.end = self.start + self.count * self.stride - 1
            else:
                self.start = 0
                self.end = 0
                self.stride = 0
            del self.count
        if self.stride > 0 and self.end < self.start:
            raise AnsibleError("to count backwards make stride negative")
        if self.stride < 0 and self.end > self.start:
            raise AnsibleError("to count forward don't make stride negative")
        if self.format.count('%') != 1:
            raise AnsibleError("bad formatting string: %s" % self.format)
sequence.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def run(self, terms, variables, **kwargs):
        results = []

        for term in terms:
            try:
                self.reset()  # clear out things for this iteration
                try:
                    if not self.parse_simple_args(term):
                        self.parse_kv_args(parse_kv(term))
                except Exception as e:
                    raise AnsibleError("unknown error parsing with_sequence arguments: %r. Error was: %s" % (term, e))

                self.sanity_check()
                if self.stride != 0:
                    results.extend(self.generate_sequence())
            except AnsibleError:
                raise
            except Exception as e:
                raise AnsibleError(
                    "unknown error generating sequence: %s" % e
                )

        return results
include_vars.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _ignore_file(self, filename):
        """ Return True if a file matches the list of ignore_files.
        Args:
            filename (str): The filename that is being matched against.

        Returns:
            Boolean
        """
        for file_type in self.ignore_files:
            try:
                if re.search(r'{0}$'.format(file_type), filename):
                    return True
            except Exception:
                err_msg = 'Invalid regular expression: {0}'.format(file_type)
                raise AnsibleError(err_msg)
        return False
__init__.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _transfer_data(self, remote_path, data):
        '''
        Copies the module data out to the temporary module path.
        '''

        if isinstance(data, dict):
            data = jsonify(data)

        afd, afile = tempfile.mkstemp()
        afo = os.fdopen(afd, 'wb')
        try:
            data = to_bytes(data, errors='surrogate_or_strict')
            afo.write(data)
        except Exception as e:
            raise AnsibleError("failure writing module data to temporary file for transfer: %s" % to_native(e))

        afo.flush()
        afo.close()

        try:
            self._transfer_file(afile, remote_path)
        finally:
            os.unlink(afile)

        return remote_path
__init__.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _fixup_perms(self, remote_path, remote_user, execute=True, recursive=True):
        """
        We need the files we upload to be readable (and sometimes executable)
        by the user being sudo'd to but we want to limit other people's access
        (because the files could contain passwords or other private
        information.

        Deprecated in favor of _fixup_perms2. Ansible code has been updated to
        use _fixup_perms2. This code is maintained to provide partial support
        for custom actions (non-recursive mode only).

        """

        display.deprecated('_fixup_perms is deprecated. Use _fixup_perms2 instead.', version='2.4', removed=False)

        if recursive:
            raise AnsibleError('_fixup_perms with recursive=True (the default) is no longer supported. ' +
                               'Use _fixup_perms2 if support for previous releases is not required. '
                               'Otherwise use fixup_perms with recursive=False.')

        return self._fixup_perms2([remote_path], remote_user, execute)


问题


面经


文章

微信
公众号

扫码关注公众号