python类readlink()的实例源码

sc.py 文件源码 项目:MyPythonLib 作者: BillWang139967 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _read_fstab(self, line, **params):
        if not line or line.startswith('#'): return
        fields = line.split()
        dev = fields[0]
        config = {
            'dev':    fields[0],
            'mount':  fields[1],
            'fstype': fields[2],
        }
        if dev.startswith('/dev/'):
            try:
                devlink = os.readlink(dev)
                dev = os.path.abspath(os.path.join(os.path.dirname(devlink), dev))
            except:
                pass
            dev = dev.replace('/dev/', '')
            if dev == params['devname']:
                return config
        elif dev.startswith('UUID='):
            uuid = dev.replace('UUID=', '')
            partinfo = si.Server.partinfo(devname=params['devname'])
            if partinfo['uuid'] == uuid:
                return config
pdb.py 文件源码 项目:MKFQ 作者: maojingios 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def lookupmodule(self, filename):
        """Helper function for break/clear parsing -- may be overridden.

        lookupmodule() translates (possibly incomplete) file or module name
        into an absolute file name.

        Lookup order: an existing absolute ``filename`` is returned as is;
        then ``sys.path[0]/filename`` if it exists and its canonical form
        equals ``self.mainpyfile``; then ``.py`` is appended when there is no
        extension and every ``sys.path`` entry is probed.  Returns ``None``
        when nothing matches.
        """
        if os.path.isabs(filename) and os.path.exists(filename):
            return filename
        f = os.path.join(sys.path[0], filename)
        if os.path.exists(f) and self.canonic(f) == self.mainpyfile:
            return f
        ext = os.path.splitext(filename)[1]
        if ext == '':
            filename = filename + '.py'
        if os.path.isabs(filename):
            return filename
        for dirname in sys.path:
            # Follow symlinked path entries.  A relative link target is
            # relative to the directory holding the link, not to the current
            # working directory; ``seen`` stops a symlink cycle from looping
            # forever.
            seen = set()
            while os.path.islink(dirname) and dirname not in seen:
                seen.add(dirname)
                dirname = os.path.join(os.path.dirname(dirname),
                                       os.readlink(dirname))
            fullname = os.path.join(dirname, filename)
            if os.path.exists(fullname):
                return fullname
        return None
crawl_portrait_tweets.py 文件源码 项目:aurora 作者: carnby 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def load_models(self):
        """Reload the LDA model, topic graph and candidates when they are stale.

        ``self.rec_candidates`` is reset to an empty list on every call.  When
        ``self.last_model_update`` is ``None`` or at least one day old, the
        files under ``<settings.PORTRAIT_FOLDER>/it-topics`` are loaded into
        ``self.lda_model``, ``self.topic_graph`` and ``self.rec_candidates``,
        and ``self.last_model_update`` is set to the current time.
        """
        now = datetime.now()
        # NOTE(review): this empties the candidates even when no reload
        # happens below -- confirm that callers expect that.
        self.rec_candidates = []

        if self.last_model_update is None or (now - self.last_model_update).days >= 1:
            print('loading model', now)
            model_path = '{0}/it-topics'.format(settings.PORTRAIT_FOLDER)
            lda_link = '{0}/current_lda_model.gensim'.format(model_path)
            # os.readlink() may return a path relative to the link's own
            # directory; anchor it there so loading does not depend on the
            # current working directory.  An absolute target is kept as is.
            lda_filename = os.path.join(os.path.dirname(lda_link),
                                        os.readlink(lda_link))

            self.lda_model = gensim.models.ldamulticore.LdaMulticore.load(lda_filename)
            self.topic_graph = nx.read_gpickle('{0}/current_topic_graph.nx'.format(model_path))

            with gzip.open('{0}/current_candidates.json.gz'.format(model_path), 'rt') as f:
                self.rec_candidates = json.load(f)
                print('loaded', len(self.rec_candidates), 'candidates')

            self.last_model_update = datetime.now()
_pslinux.py 文件源码 项目:pupy 作者: ru-faraon 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def readlink(path):
    """Read symlink *path* via os.readlink() and tidy up the result.

    Everything from the first NUL byte onwards is discarded, and a trailing
    ' (deleted)' marker is removed when the marked path does not exist
    according to path_exists_strict().
    """
    assert isinstance(path, basestring), path
    target = os.readlink(path)
    # The raw link text can contain '\x00'; passing such a string on to
    # other filesystem calls (os.*, open(), ...) fails with "TypeError:
    # must be encoded string without NULL bytes, not str".  What follows
    # the NUL is noise (' (deleted)', 'new', ...), see:
    # https://github.com/giampaolo/psutil/issues/717
    target = target.partition('\x00')[0]
    # A ' (deleted)' suffix is usually bogus because the file is still
    # there; it is kept only if a path with that exact name exists.
    marker = ' (deleted)'
    if target.endswith(marker) and not path_exists_strict(target):
        target = target[:-len(marker)]
    return target
_pslinux.py 文件源码 项目:pupy 作者: ru-faraon 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def get_proc_inodes(self, pid):
        """Collect the socket inodes referenced by the fds of process *pid*.

        Returns a ``defaultdict(list)`` mapping the inode string found in a
        ``socket:[<inode>]`` link target to a list of ``(pid, fd)`` tuples.
        """
        # ENOENT / ESRCH: the entry vanished in the meantime (an os.stat on
        # the process dir is done later to force NSP if that is the case);
        # EINVAL: the entry is not a link.  All three are skipped.
        ignorable = (errno.ENOENT, errno.ESRCH, errno.EINVAL)
        sockets = defaultdict(list)
        for fd in os.listdir("%s/%s/fd" % (self._procfs_path, pid)):
            try:
                target = readlink("%s/%s/fd/%s" % (self._procfs_path, pid, fd))
            except OSError as err:
                if err.errno in ignorable:
                    continue
                raise
            if not target.startswith('socket:['):
                continue
            # the process is using a socket: keep what sits between
            # 'socket:[' and the final character
            sockets[target[8:-1]].append((pid, int(fd)))
        return sockets
_pslinux.py 文件源码 项目:pupy 作者: ru-faraon 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def exe(self):
        """Return the target of the process' ``exe`` link.

        Returns "" when the link cannot be read (ENOENT/ESRCH) but the
        process directory still exists.  Raises NoSuchProcess or
        ZombieProcess when that directory is gone, AccessDenied on
        EPERM/EACCES, and re-raises any other OSError.
        """
        try:
            return readlink("%s/%s/exe" % (self._procfs_path, self.pid))
        except OSError as err:
            code = err.errno
            if code in (errno.EPERM, errno.EACCES):
                raise AccessDenied(self.pid, self._name)
            if code not in (errno.ENOENT, errno.ESRCH):
                raise
            # no such file error; might be raised also if the path
            # actually exists for system processes with low pids
            # (about 0-20)
            if os.path.lexists("%s/%s" % (self._procfs_path, self.pid)):
                return ""
            if not pid_exists(self.pid):
                raise NoSuchProcess(self.pid, self._name)
            raise ZombieProcess(self.pid, self._name, self._ppid)
_psbsd.py 文件源码 项目:pupy 作者: ru-faraon 项目源码 文件源码 阅读 48 收藏 0 点赞 0 评论 0
def cwd(self):
        """Return process current working directory.

        Returns None for pid 0 when OPENBSD is set, and None instead of an
        empty string from cext.proc_cwd().  Raises NotImplementedError when
        no branch applies.
        """
        if OPENBSD and self.pid == 0:
            # querying pid 0 would raise EINVAL
            return None
        elif NETBSD:
            with wrap_exceptions_procfs(self):
                return os.readlink("/proc/%s/cwd" % self.pid)
        elif not hasattr(cext, 'proc_open_files'):
            # FreeBSD < 8 does not support functions based on
            # kinfo_getfile() and kinfo_getvmmap()
            if FREEBSD:
                reason = "supported only starting from FreeBSD 8"
            else:
                reason = ""
            raise NotImplementedError(reason)
        else:
            # sometimes we get an empty string, in which case we turn
            # it into None
            return cext.proc_cwd(self.pid) or None
_pssunos.py 文件源码 项目:pupy 作者: ru-faraon 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def terminal(self):
        """Return the link target of the first readable std fd path entry.

        Entries ``<procfs>/<pid>/path/<fd>`` are tried for fd 0, 1, 2 and 255
        in that order; the first successful os.readlink() result is returned.
        Returns None (implicitly) when nothing could be read.
        """
        root = self._procfs_path
        # NOTE(review): the value is passed *through* wrap_exceptions() rather
        # than the call being wrapped by it -- confirm this is intended.
        tty = wrap_exceptions(
            cext.proc_basic_info(self.pid, self._procfs_path)[0])
        saw_enoent = False
        if tty != cext.PRNODEV:
            for fd in (0, 1, 2, 255):
                link = '%s/%d/path/%d' % (root, self.pid, fd)
                try:
                    return os.readlink(link)
                except OSError as err:
                    if err.errno != errno.ENOENT:
                        raise
                    saw_enoent = True
        if saw_enoent:
            # raise NSP if the process disappeared on us
            os.stat('%s/%s' % (root, self.pid))
_pssunos.py 文件源码 项目:pupy 作者: ru-faraon 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def open_files(self):
        """Return a list of popenfile entries for the process' open files.

        For every name in ``<procfs>/<pid>/fd`` the matching symlink in
        ``<procfs>/<pid>/path`` is read; targets accepted by isfile_strict()
        are recorded together with the integer fd.
        """
        root = self._procfs_path
        path_dir = '%s/%d/path' % (root, self.pid)
        opened = []
        saw_enoent = False
        for fd in os.listdir('%s/%d/fd' % (root, self.pid)):
            link = os.path.join(path_dir, fd)
            if not os.path.islink(link):
                continue
            try:
                target = os.readlink(link)
            except OSError as err:
                if err.errno != errno.ENOENT:
                    raise
                # ENOENT == file which is gone in the meantime
                saw_enoent = True
                continue
            if isfile_strict(target):
                opened.append(_common.popenfile(target, int(fd)))
        if saw_enoent:
            # raise NSP if the process disappeared on us
            os.stat('%s/%s' % (root, self.pid))
        return opened
_pslinux.py 文件源码 项目:pupy 作者: ru-faraon 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def readlink(path):
    """Read symlink *path* via os.readlink() and tidy up the result.

    Everything from the first NUL byte onwards is discarded, and a trailing
    ' (deleted)' marker is removed when the marked path does not exist
    according to path_exists_strict().
    """
    assert isinstance(path, basestring), path
    target = os.readlink(path)
    # The raw link text can contain '\x00'; passing such a string on to
    # other filesystem calls (os.*, open(), ...) fails with "TypeError:
    # must be encoded string without NULL bytes, not str".  What follows
    # the NUL is noise (' (deleted)', 'new', ...), see:
    # https://github.com/giampaolo/psutil/issues/717
    target = target.partition('\x00')[0]
    # A ' (deleted)' suffix is usually bogus because the file is still
    # there; it is kept only if a path with that exact name exists.
    marker = ' (deleted)'
    if target.endswith(marker) and not path_exists_strict(target):
        target = target[:-len(marker)]
    return target
_pslinux.py 文件源码 项目:pupy 作者: ru-faraon 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def get_proc_inodes(self, pid):
        """Collect the socket inodes referenced by the fds of process *pid*.

        Returns a ``defaultdict(list)`` mapping the inode string found in a
        ``socket:[<inode>]`` link target to a list of ``(pid, fd)`` tuples.
        """
        # ENOENT / ESRCH: the entry vanished in the meantime (an os.stat on
        # the process dir is done later to force NSP if that is the case);
        # EINVAL: the entry is not a link.  All three are skipped.
        ignorable = (errno.ENOENT, errno.ESRCH, errno.EINVAL)
        sockets = defaultdict(list)
        for fd in os.listdir("%s/%s/fd" % (self._procfs_path, pid)):
            try:
                target = readlink("%s/%s/fd/%s" % (self._procfs_path, pid, fd))
            except OSError as err:
                if err.errno in ignorable:
                    continue
                raise
            if not target.startswith('socket:['):
                continue
            # the process is using a socket: keep what sits between
            # 'socket:[' and the final character
            sockets[target[8:-1]].append((pid, int(fd)))
        return sockets
_pslinux.py 文件源码 项目:pupy 作者: ru-faraon 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def exe(self):
        """Return the target of the process' ``exe`` link.

        Returns "" when the link cannot be read (ENOENT/ESRCH) but the
        process directory still exists.  Raises NoSuchProcess or
        ZombieProcess when that directory is gone, AccessDenied on
        EPERM/EACCES, and re-raises any other OSError.
        """
        try:
            return readlink("%s/%s/exe" % (self._procfs_path, self.pid))
        except OSError as err:
            code = err.errno
            if code in (errno.EPERM, errno.EACCES):
                raise AccessDenied(self.pid, self._name)
            if code not in (errno.ENOENT, errno.ESRCH):
                raise
            # no such file error; might be raised also if the path
            # actually exists for system processes with low pids
            # (about 0-20)
            if os.path.lexists("%s/%s" % (self._procfs_path, self.pid)):
                return ""
            if not pid_exists(self.pid):
                raise NoSuchProcess(self.pid, self._name)
            raise ZombieProcess(self.pid, self._name, self._ppid)
_psbsd.py 文件源码 项目:pupy 作者: ru-faraon 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def cwd(self):
        """Return process current working directory.

        Returns None for pid 0 when OPENBSD is set, and None instead of an
        empty string from cext.proc_cwd().  Raises NotImplementedError when
        no branch applies.
        """
        if OPENBSD and self.pid == 0:
            # querying pid 0 would raise EINVAL
            return None
        elif NETBSD:
            with wrap_exceptions_procfs(self):
                return os.readlink("/proc/%s/cwd" % self.pid)
        elif not hasattr(cext, 'proc_open_files'):
            # FreeBSD < 8 does not support functions based on
            # kinfo_getfile() and kinfo_getvmmap()
            if FREEBSD:
                reason = "supported only starting from FreeBSD 8"
            else:
                reason = ""
            raise NotImplementedError(reason)
        else:
            # sometimes we get an empty string, in which case we turn
            # it into None
            return cext.proc_cwd(self.pid) or None
_pssunos.py 文件源码 项目:pupy 作者: ru-faraon 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def terminal(self):
        """Return the link target of the first readable std fd path entry.

        Entries ``<procfs>/<pid>/path/<fd>`` are tried for fd 0, 1, 2 and 255
        in that order; the first successful os.readlink() result is returned.
        Returns None (implicitly) when nothing could be read.
        """
        root = self._procfs_path
        # NOTE(review): the value is passed *through* wrap_exceptions() rather
        # than the call being wrapped by it -- confirm this is intended.
        tty = wrap_exceptions(
            cext.proc_basic_info(self.pid, self._procfs_path)[0])
        saw_enoent = False
        if tty != cext.PRNODEV:
            for fd in (0, 1, 2, 255):
                link = '%s/%d/path/%d' % (root, self.pid, fd)
                try:
                    return os.readlink(link)
                except OSError as err:
                    if err.errno != errno.ENOENT:
                        raise
                    saw_enoent = True
        if saw_enoent:
            # raise NSP if the process disappeared on us
            os.stat('%s/%s' % (root, self.pid))
_pssunos.py 文件源码 项目:pupy 作者: ru-faraon 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def open_files(self):
        """Return a list of popenfile entries for the process' open files.

        For every name in ``<procfs>/<pid>/fd`` the matching symlink in
        ``<procfs>/<pid>/path`` is read; targets accepted by isfile_strict()
        are recorded together with the integer fd.
        """
        root = self._procfs_path
        path_dir = '%s/%d/path' % (root, self.pid)
        opened = []
        saw_enoent = False
        for fd in os.listdir('%s/%d/fd' % (root, self.pid)):
            link = os.path.join(path_dir, fd)
            if not os.path.islink(link):
                continue
            try:
                target = os.readlink(link)
            except OSError as err:
                if err.errno != errno.ENOENT:
                    raise
                # ENOENT == file which is gone in the meantime
                saw_enoent = True
                continue
            if isfile_strict(target):
                opened.append(_common.popenfile(target, int(fd)))
        if saw_enoent:
            # raise NSP if the process disappeared on us
            os.stat('%s/%s' % (root, self.pid))
        return opened
virtualenv.py 文件源码 项目:threatdetectionservice 作者: flyballlabs 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def copyfile(src, dest, symlink=True):
    """Symlink or copy *src* to *dest*, creating parent directories.

    Nothing is done when *src* does not exist or *dest* already exists.
    A symlink is attempted when *symlink* is true, ``os.symlink`` is
    available and ``is_win`` is false; if that attempt fails, or symlinking
    is not selected, copyfileordir() is used instead.
    """
    if not os.path.exists(src):
        # Some bad symlink in the src
        logger.warn('Cannot find file %s (bad symlink)', src)
        return
    if os.path.exists(dest):
        logger.debug('File %s already exists', dest)
        return

    parent = os.path.dirname(dest)
    if not os.path.exists(parent):
        logger.info('Creating parent directories for %s' % parent)
        os.makedirs(parent)

    # A link source is re-pointed at its own target; anything else is
    # referenced by absolute path.
    if os.path.islink(src):
        srcpath = os.readlink(src)
    else:
        srcpath = os.path.abspath(src)

    use_symlink = symlink and hasattr(os, 'symlink') and not is_win
    if not use_symlink:
        logger.info('Copying to %s', dest)
        copyfileordir(src, dest)
        return

    logger.info('Symlinking %s', dest)
    try:
        os.symlink(srcpath, dest)
    except (OSError, NotImplementedError):
        logger.info('Symlinking failed, copying to %s', dest)
        copyfileordir(src, dest)
mount.py 文件源码 项目:kostka 作者: pixers 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def mount_lowerdirs(self):
        """Build the ':'-separated lowerdir list for this container's dependencies.

        Each dependency is identified by ``<imageName>/<path>``, where
        ``path`` is the dependency's ``'path'`` entry or, when absent, the
        target of the ``overlay.fs`` symlink in that container's directory.
        Dependencies are collected transitively, ordered with
        toposort_flatten(), reversed, and joined onto ``self.metadata_dir``.
        """
        def dependency_path(dep):
            container = self.__class__(dep['imageName'])
            if 'path' in dep:
                path = dep['path']
            else:
                # Read the link only when no explicit path is given, so a
                # missing overlay.fs cannot fail an explicit dependency.
                path = os.readlink(os.path.join(container.path, 'overlay.fs'))
            return os.path.join(dep['imageName'], path)

        # First, build a dependency graph in order to avoid duplicate entries
        dependencies = {}
        pending_deps = set(map(dependency_path, self.dependencies))
        while pending_deps:
            path = pending_deps.pop()
            # The graph is keyed by path, so the "already visited" test must
            # use the path too; otherwise nothing is ever skipped and a
            # dependency cycle would never terminate.
            if path in dependencies:
                continue
            name = path.split('/')[-2]
            dependencies[path] = set(map(dependency_path, self.__class__(name).dependencies))
            pending_deps |= dependencies[path]

        # Then sort it topologically. The list is reversed, because overlayfs
        # will check the mounts in order they are given, so the base fs has to
        # be the last one.
        ordered = reversed(list(toposort_flatten(dependencies)))
        return ':'.join(os.path.join(self.metadata_dir, dep) for dep in ordered)
file.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def recursive_set_attributes(module, b_path, follow, file_args):
    """Apply *file_args* to everything below *b_path*; return whether anything changed.

    Every directory and file found by os.walk() gets
    ``module.set_fs_attributes_if_different()`` called with a copy of
    *file_args* whose ``'path'`` is the entry's native path.  When *follow*
    is true, a symlink's target (joined onto the walk root of the link) is
    processed as well, recursing first if the target is a directory.
    """
    def apply_attrs(b_target, changed_so_far):
        # Work on a copy so the caller's file_args is left untouched.
        args = file_args.copy()
        args['path'] = to_native(b_target, errors='surrogate_or_strict')
        return module.set_fs_attributes_if_different(args, changed_so_far)

    changed = False
    for b_root, b_dirs, b_files in os.walk(b_path):
        for b_entry in b_dirs + b_files:
            b_fsname = os.path.join(b_root, b_entry)
            is_link = os.path.islink(b_fsname)
            # The entry itself is always updated, link or not.
            changed |= apply_attrs(b_fsname, changed)
            if not (is_link and follow):
                continue
            b_fsname = os.path.join(b_root, os.readlink(b_fsname))
            if os.path.isdir(b_fsname):
                changed |= recursive_set_attributes(module, b_fsname, follow, file_args)
            changed |= apply_attrs(b_fsname, changed)
    return changed
utils.py 文件源码 项目:build-calibre 作者: kovidgoyal 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def lcopy(src, dst, no_hardlinks=False):
    """Replicate *src* at *dst*, preserving symlinks.

    A symlink source is recreated as a symlink with the same target and
    ``True`` is returned.  Anything else is copied with shutil.copy() when
    *no_hardlinks* is true, or hard-linked via hardlink() otherwise, and
    ``False`` is returned.

    If the operation fails with EEXIST, *dst* is unlinked and the operation
    is retried with the same *no_hardlinks* setting.  Any other
    EnvironmentError propagates.
    """
    try:
        if issymlink(src):
            linkto = os.readlink(src)
            os.symlink(linkto, dst)
            return True
        if no_hardlinks:
            shutil.copy(src, dst)
        else:
            hardlink(src, dst)
        return False
    except EnvironmentError as err:
        if err.errno == errno.EEXIST:
            os.unlink(dst)
            # Carry no_hardlinks into the retry; dropping it would silently
            # fall back to hard-linking.
            return lcopy(src, dst, no_hardlinks)
        raise
files.py 文件源码 项目:CodeGra.de 作者: CodeGra-de 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def get_file_contents(code: models.File) -> bytes:
    """Read the raw bytes stored on disk for a :class:`.models.File`.

    :param code: The file object whose disk file should be read.
    :returns: The bytes of the file, read in binary mode.
    :raises APIException: If ``code`` is a directory (raised with 400) or
        its disk file is a symlink (raised with 410).
    """
    if code.is_directory:
        raise APIException(
            'Cannot display this file as it is a directory.',
            f'The selected file with id {code.id} is a directory.',
            APICodes.OBJECT_WRONG_TYPE, 400
        )

    filename = code.get_diskname()
    if not os.path.islink(filename):
        with open(filename, 'rb') as codefile:
            return codefile.read()

    # Symlinks are rejected rather than followed.
    message = f'This file is a symlink to `{os.readlink(filename)}`.'
    description = 'The file {} is a symlink'.format(code.id)
    raise APIException(message, description, APICodes.INVALID_STATE, 410)


问题


面经


文章

微信
公众号

扫码关注公众号