python类AnsibleError()的实例源码

__init__.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _execute_remote_stat(self, path, all_vars, follow, tmp=None):
        '''
        Get information from remote file.
        '''
        module_args=dict(
           path=path,
           follow=follow,
           get_md5=False,
           get_checksum=True,
           checksum_algo='sha1',
        )
        mystat = self._execute_module(module_name='stat', module_args=module_args, task_vars=all_vars, tmp=tmp, delete_remote_tmp=(tmp is None))

        if 'failed' in mystat and mystat['failed']:
            raise AnsibleError('Failed to get information on remote file (%s): %s' % (path, mystat['msg']))

        if not mystat['stat']['exists']:
            # empty might be matched, 1 should never match, also backwards compatible
            mystat['stat']['checksum'] = '1'

        # happens sometimes when it is a dir and not on bsd
        if 'checksum' not in mystat['stat']:
            mystat['stat']['checksum'] = ''

        return mystat['stat']
__init__.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _remote_checksum(self, path, all_vars, follow=False):
        '''
        Produces a remote checksum given a path,
        Returns a number 0-4 for specific errors instead of checksum, also ensures it is different
        0 = unknown error
        1 = file does not exist, this might not be an error
        2 = permissions issue
        3 = its a directory, not a file
        4 = stat module failed, likely due to not finding python
        '''
        x = "0"  # unknown error has occured
        try:
            remote_stat = self._execute_remote_stat(path, all_vars, follow=follow)
            if remote_stat['exists'] and remote_stat['isdir']:
                x = "3"  # its a directory not a file
            else:
                x = remote_stat['checksum']  # if 1, file is missing
        except AnsibleError as e:
            errormsg = to_text(e)
            if errormsg.endswith(u'Permission denied'):
                x = "2"  # cannot read file
            elif errormsg.endswith(u'MODULE FAILURE'):
                x = "4"  # python not found or module uncaught exception
        finally:
            return x
libvirt_lxc.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def put_file(self, in_path, out_path):
        ''' transfer a file from local to lxc '''
        super(Connection, self).put_file(in_path, out_path)
        display.vvv("PUT %s TO %s" % (in_path, out_path), host=self.lxc)

        out_path = pipes.quote(self._prefix_login_path(out_path))
        try:
            with open(to_bytes(in_path, errors='surrogate_or_strict'), 'rb') as in_file:
                try:
                    p = self._buffered_exec_command('dd of=%s bs=%s' % (out_path, BUFSIZE), stdin=in_file)
                except OSError:
                    raise AnsibleError("chroot connection requires dd command in the chroot")
                try:
                    stdout, stderr = p.communicate()
                except:
                    traceback.print_exc()
                    raise AnsibleError("failed to transfer file %s to %s" % (in_path, out_path))
                if p.returncode != 0:
                    raise AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr))
        except IOError:
            raise AnsibleError("file or module does not exist at: %s" % in_path)
libvirt_lxc.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def fetch_file(self, in_path, out_path):
        ''' fetch a file from lxc to local '''
        super(Connection, self).fetch_file(in_path, out_path)
        display.vvv("FETCH %s TO %s" % (in_path, out_path), host=self.lxc)

        in_path = pipes.quote(self._prefix_login_path(in_path))
        try:
            p = self._buffered_exec_command('dd if=%s bs=%s' % (in_path, BUFSIZE))
        except OSError:
            raise AnsibleError("chroot connection requires dd command in the chroot")

        with open(to_bytes(out_path, errors='surrogate_or_strict'), 'wb+') as out_file:
            try:
                chunk = p.stdout.read(BUFSIZE)
                while chunk:
                    out_file.write(chunk)
                    chunk = p.stdout.read(BUFSIZE)
            except:
                traceback.print_exc()
                raise AnsibleError("failed to transfer file %s to %s" % (in_path, out_path))
            stdout, stderr = p.communicate()
            if p.returncode != 0:
                raise AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr))
paramiko_ssh.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def put_file(self, in_path, out_path):
        ''' transfer a file from local to remote '''

        super(Connection, self).put_file(in_path, out_path)

        display.vvv("PUT %s TO %s" % (in_path, out_path), host=self._play_context.remote_addr)

        if not os.path.exists(to_bytes(in_path, errors='surrogate_or_strict')):
            raise AnsibleFileNotFound("file or module does not exist: %s" % in_path)

        try:
            self.sftp = self.ssh.open_sftp()
        except Exception as e:
            raise AnsibleError("failed to open a SFTP connection (%s)" % e)

        try:
            self.sftp.put(to_bytes(in_path, errors='surrogate_or_strict'), to_bytes(out_path, errors='surrogate_or_strict'))
        except IOError:
            raise AnsibleError("failed to transfer file to %s" % out_path)
paramiko_ssh.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def fetch_file(self, in_path, out_path):
        ''' save a remote file to the specified path '''

        super(Connection, self).fetch_file(in_path, out_path)

        display.vvv("FETCH %s TO %s" % (in_path, out_path), host=self._play_context.remote_addr)

        try:
            self.sftp = self._connect_sftp()
        except Exception as e:
            raise AnsibleError("failed to open a SFTP connection (%s)", e)

        try:
            self.sftp.get(to_bytes(in_path, errors='surrogate_or_strict'), to_bytes(out_path, errors='surrogate_or_strict'))
        except IOError:
            raise AnsibleError("failed to transfer file from %s" % in_path)
chroot.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, play_context, new_stdin, *args, **kwargs):
        super(Connection, self).__init__(play_context, new_stdin, *args, **kwargs)

        self.chroot = self._play_context.remote_addr

        if os.geteuid() != 0:
            raise AnsibleError("chroot connection requires running as root")

        # we're running as root on the local system so do some
        # trivial checks for ensuring 'host' is actually a chroot'able dir
        if not os.path.isdir(self.chroot):
            raise AnsibleError("%s is not a directory" % self.chroot)

        chrootsh = os.path.join(self.chroot, 'bin/sh')
        # Want to check for a usable bourne shell inside the chroot.
        # is_executable() == True is sufficient.  For symlinks it
        # gets really complicated really fast.  So we punt on finding that
        # out.  As long as it's a symlink we assume that it will work
        if not (is_executable(chrootsh) or (os.path.lexists(chrootsh) and os.path.islink(chrootsh))):
            raise AnsibleError("%s does not look like a chrootable dir (/bin/sh missing)" % self.chroot)

        self.chroot_cmd = distutils.spawn.find_executable('chroot')
        if not self.chroot_cmd:
            raise AnsibleError("chroot command not found in PATH")
chroot.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def put_file(self, in_path, out_path):
        ''' transfer a file from local to chroot '''
        super(Connection, self).put_file(in_path, out_path)
        display.vvv("PUT %s TO %s" % (in_path, out_path), host=self.chroot)

        out_path = pipes.quote(self._prefix_login_path(out_path))
        try:
            with open(to_bytes(in_path, errors='surrogate_or_strict'), 'rb') as in_file:
                try:
                    p = self._buffered_exec_command('dd of=%s bs=%s' % (out_path, BUFSIZE), stdin=in_file)
                except OSError:
                    raise AnsibleError("chroot connection requires dd command in the chroot")
                try:
                    stdout, stderr = p.communicate()
                except:
                    traceback.print_exc()
                    raise AnsibleError("failed to transfer file %s to %s" % (in_path, out_path))
                if p.returncode != 0:
                    raise AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr))
        except IOError:
            raise AnsibleError("file or module does not exist at: %s" % in_path)
chroot.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def fetch_file(self, in_path, out_path):
        ''' fetch a file from chroot to local '''
        super(Connection, self).fetch_file(in_path, out_path)
        display.vvv("FETCH %s TO %s" % (in_path, out_path), host=self.chroot)

        in_path = pipes.quote(self._prefix_login_path(in_path))
        try:
            p = self._buffered_exec_command('dd if=%s bs=%s' % (in_path, BUFSIZE))
        except OSError:
            raise AnsibleError("chroot connection requires dd command in the chroot")

        with open(to_bytes(out_path, errors='surrogate_or_strict'), 'wb+') as out_file:
            try:
                chunk = p.stdout.read(BUFSIZE)
                while chunk:
                    out_file.write(chunk)
                    chunk = p.stdout.read(BUFSIZE)
            except:
                traceback.print_exc()
                raise AnsibleError("failed to transfer file %s to %s" % (in_path, out_path))
            stdout, stderr = p.communicate()
            if p.returncode != 0:
                raise AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr))
jail.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def fetch_file(self, in_path, out_path):
        ''' fetch a file from jail to local '''
        super(Connection, self).fetch_file(in_path, out_path)
        display.vvv("FETCH %s TO %s" % (in_path, out_path), host=self.jail)

        in_path = pipes.quote(self._prefix_login_path(in_path))
        try:
            p = self._buffered_exec_command('dd if=%s bs=%s' % (in_path, BUFSIZE))
        except OSError:
            raise AnsibleError("jail connection requires dd command in the jail")

        with open(to_bytes(out_path, errors='surrogate_or_strict'), 'wb+') as out_file:
            try:
                chunk = p.stdout.read(BUFSIZE)
                while chunk:
                    out_file.write(chunk)
                    chunk = p.stdout.read(BUFSIZE)
            except:
                traceback.print_exc()
                raise AnsibleError("failed to transfer file %s to %s" % (in_path, out_path))
            stdout, stderr = p.communicate()
            if p.returncode != 0:
                raise AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr))
zone.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def fetch_file(self, in_path, out_path):
        ''' fetch a file from zone to local '''
        super(Connection, self).fetch_file(in_path, out_path)
        display.vvv("FETCH %s TO %s" % (in_path, out_path), host=self.zone)

        in_path = pipes.quote(self._prefix_login_path(in_path))
        try:
            p = self._buffered_exec_command('dd if=%s bs=%s' % (in_path, BUFSIZE))
        except OSError:
            raise AnsibleError("zone connection requires dd command in the zone")

        with open(out_path, 'wb+') as out_file:
            try:
                chunk = p.stdout.read(BUFSIZE)
                while chunk:
                    out_file.write(chunk)
                    chunk = p.stdout.read(BUFSIZE)
            except:
                traceback.print_exc()
                raise AnsibleError("failed to transfer file %s to %s" % (in_path, out_path))
            stdout, stderr = p.communicate()
            if p.returncode != 0:
                raise AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr))
jsonfile.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, *args, **kwargs):

        self._timeout = float(C.CACHE_PLUGIN_TIMEOUT)
        self._cache = {}
        self._cache_dir = None

        if C.CACHE_PLUGIN_CONNECTION:
            # expects a dir path
            self._cache_dir = os.path.expanduser(os.path.expandvars(C.CACHE_PLUGIN_CONNECTION))

        if not self._cache_dir:
            raise AnsibleError("error, 'jsonfile' cache plugin requires the 'fact_caching_connection' config option to be set (to a writeable directory path)")

        if not os.path.exists(self._cache_dir):
            try:
                os.makedirs(self._cache_dir)
            except (OSError,IOError) as e:
                display.warning("error in 'jsonfile' cache plugin while trying to create cache dir %s : %s" % (self._cache_dir, to_bytes(e)))
                return None
__init__.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def rekey_file(self, filename, new_password):

        check_prereqs()

        prev = os.stat(filename)
        ciphertext = self.read_data(filename)
        try:
            plaintext = self.vault.decrypt(ciphertext)
        except AnsibleError as e:
            raise AnsibleError("%s for %s" % (to_bytes(e),to_bytes(filename)))

        new_vault = VaultLib(new_password)
        new_ciphertext = new_vault.encrypt(plaintext)

        self.write_data(new_ciphertext, filename)

        # preserve permissions
        os.chmod(filename, prev.st_mode)
        os.chown(filename, prev.st_uid, prev.st_gid)
hashivault.py 文件源码 项目:ansible-modules-hashivault 作者: TerryHowe 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def run(self, terms, variables, **kwargs):
        path = terms[0]
        key = terms[1]
        token = hashivault_default_token()
        authtype = 'token'
        params = {
            'url': self.get_url(),
            'verify': self.get_verify(),
            'token': token,
            'authtype': 'token',
            'secret': path,
            'key': key,
        }
        result = hashivault_read.hashivault_read(params)

        if 'value' not in result:
            raise AnsibleError('Error reading vault %s/%s: %s\n%s' % (path, key, result.get('msg', 'msg not set'), result.get('stack_trace', '')))
        return [result['value']]
manager.py 文件源码 项目:ansible-provider-docs 作者: alibaba 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def preprocess_vars(a):
    '''
    Ensures that vars contained in the parameter passed in are
    returned as a list of dictionaries, to ensure for instance
    that vars loaded from a file conform to an expected state.
    '''

    if a is None:
        return None
    elif not isinstance(a, list):
        data = [a]
    else:
        data = a

    for item in data:
        if not isinstance(item, MutableMapping):
            raise AnsibleError("variable files must contain either a dictionary of variables, or a list of dictionaries. Got: %s (%s)" % (a, type(a)))

    return data
manager.py 文件源码 项目:ansible-provider-docs 作者: alibaba 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, loader=None, inventory=None):

        self._nonpersistent_fact_cache = defaultdict(dict)
        self._vars_cache = defaultdict(dict)
        self._extra_vars = defaultdict(dict)
        self._host_vars_files = defaultdict(dict)
        self._group_vars_files = defaultdict(dict)
        self._inventory = inventory
        self._loader = loader
        self._hostvars = None
        self._omit_token = '__omit_place_holder__%s' % sha1(os.urandom(64)).hexdigest()
        self._options_vars = defaultdict(dict)

        # bad cache plugin is not fatal error
        try:
            self._fact_cache = FactCache()
        except AnsibleError as e:
            display.warning(to_native(e))
            # fallback to a dict as in memory cache
            self._fact_cache = {}
manager.py 文件源码 项目:ansible-provider-docs 作者: alibaba 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _preprocess_vars(self, a):
        '''
        Ensures that vars contained in the parameter passed in are
        returned as a list of dictionaries, to ensure for instance
        that vars loaded from a file conform to an expected state.
        '''

        if a is None:
            return None
        elif not isinstance(a, list):
            data = [a]
        else:
            data = a

        for item in data:
            if not isinstance(item, MutableMapping):
                raise AnsibleError("variable files must contain either a dictionary of variables, or a list of dictionaries. Got: %s (%s)" % (a, type(a)))

        return data
login.py 文件源码 项目:ansible-provider-docs 作者: alibaba 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def get_credentials(self):
        display.display(u'\n\n' + "We need your " + stringc("Github login", 'bright cyan') +
                        " to identify you.", screen_only=True)
        display.display("This information will " + stringc("not be sent to Galaxy", 'bright cyan') +
                        ", only to " + stringc("api.github.com.", "yellow"), screen_only=True)
        display.display("The password will not be displayed." + u'\n\n', screen_only=True)
        display.display("Use " + stringc("--github-token", 'yellow') +
                        " if you do not want to enter your password." + u'\n\n', screen_only=True)

        try:
            self.github_username = input("Github Username: ")
        except:
            pass

        try:
            self.github_password = getpass.getpass("Password for %s: " % self.github_username)
        except:
            pass

        if not self.github_username or not self.github_password:
            raise AnsibleError("Invalid Github credentials. Username and password are required.")
login.py 文件源码 项目:ansible-provider-docs 作者: alibaba 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def remove_github_token(self):
        '''
        If for some reason an ansible-galaxy token was left from a prior login, remove it. We cannot
        retrieve the token after creation, so we are forced to create a new one.
        '''
        try:
            tokens = json.load(open_url(self.GITHUB_AUTH, url_username=self.github_username,
                               url_password=self.github_password, force_basic_auth=True,))
        except HTTPError as e:
            res = json.load(e)
            raise AnsibleError(res['message'])

        for token in tokens:
            if token['note'] == 'ansible-galaxy login':
                display.vvvvv('removing token: %s' % token['token_last_eight'])
                try:
                    open_url('https://api.github.com/authorizations/%d' % token['id'], url_username=self.github_username,
                             url_password=self.github_password, method='DELETE', force_basic_auth=True)
                except HTTPError as e:
                    res = json.load(e)
                    raise AnsibleError(res['message'])
api.py 文件源码 项目:ansible-provider-docs 作者: alibaba 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _get_server_api_version(self):
        """
        Fetches the Galaxy API current version to ensure
        the API server is up and reachable.
        """
        url = '%s/api/' % self._api_server
        try:
            return_data = open_url(url, validate_certs=self._validate_certs)
        except Exception as e:
            raise AnsibleError("Failed to get data from the API server (%s): %s " % (url, to_native(e)))

        try:
            data = json.loads(to_text(return_data.read(), errors='surrogate_or_strict'))
        except Exception as e:
            raise AnsibleError("Could not process data from the API server (%s): %s " % (url, to_native(e)))

        if 'current_version' not in data:
            raise AnsibleError("missing required 'current_version' from server response (%s)" % url)

        return data['current_version']


问题


面经


文章

微信
公众号

扫码关注公众号