def _read_fstab(self, line, **params):
    """Parse one fstab line and return its config if it names ``devname``.

    The first field is compared with ``params['devname']`` (a device name
    without the ``/dev/`` prefix) in one of two ways:

    * ``/dev/...`` entries: one level of symlink is followed, the
      ``/dev/`` prefix is removed and the remainder is compared directly.
    * ``UUID=...`` entries: the UUID is compared with the ``uuid`` value
      returned by ``si.Server.partinfo(devname=...)``.

    :param line: a single line of fstab text (may be empty or ``None``).
    :param params: keyword arguments; ``devname`` is required as soon as
        the line names a ``/dev/`` or ``UUID=`` device.
    :returns: a dict with ``dev``, ``mount`` and ``fstype`` keys when the
        line refers to ``devname``, otherwise ``None``.
    :raises KeyError: if a device line is seen and ``devname`` is missing.
    """
    fields = line.split() if line else []
    # Blank / whitespace-only lines, comments and truncated entries do not
    # carry a usable device/mount/fstype triple.
    if len(fields) < 3 or fields[0].startswith('#'):
        return None

    dev = fields[0]
    config = {
        'dev': fields[0],
        'mount': fields[1],
        'fstype': fields[2],
    }

    if dev.startswith('/dev/'):
        try:
            target = os.readlink(dev)
        except OSError:
            # Not a symlink (or it does not exist): use the path as written.
            pass
        else:
            # A relative link target is relative to the directory that
            # holds the link, so join in that order before normalising.
            dev = os.path.abspath(os.path.join(os.path.dirname(dev), target))
        if dev.startswith('/dev/'):
            dev = dev[len('/dev/'):]
        if dev == params['devname']:
            return config
    elif dev.startswith('UUID='):
        uuid = dev[len('UUID='):]
        partinfo = si.Server.partinfo(devname=params['devname'])
        if partinfo['uuid'] == uuid:
            return config
    return None
python类readlink()的实例源码
def lookupmodule(self, filename):
    """Resolve a (possibly incomplete) file or module name to a file name.

    Helper for break/clear parsing; subclasses may override it.
    Returns the resolved file name, or None when no entry of sys.path
    contains the file.
    """
    if os.path.isabs(filename) and os.path.exists(filename):
        return filename
    candidate = os.path.join(sys.path[0], filename)
    if os.path.exists(candidate) and self.canonic(candidate) == self.mainpyfile:
        return candidate
    # No extension given: assume a Python source file.
    if not os.path.splitext(filename)[1]:
        filename += '.py'
    if os.path.isabs(filename):
        return filename
    for entry in sys.path:
        # Follow symlinked path entries through to their target.
        while os.path.islink(entry):
            entry = os.readlink(entry)
        located = os.path.join(entry, filename)
        if os.path.exists(located):
            return located
    return None
def load_models(self):
    """Reload the LDA model, topic graph and candidates at most once a day.

    Nothing is touched while the previous load is less than one day old,
    so candidates already held in ``self.rec_candidates`` stay available
    between reloads. On a reload the following attributes are replaced:
    ``lda_model``, ``topic_graph``, ``rec_candidates`` and
    ``last_model_update``.
    """
    now = datetime.now()
    last_update = self.last_model_update
    if last_update is not None and (now - last_update).days < 1:
        # Models are still fresh: keep what is already in memory.
        return

    self.rec_candidates = []
    print('loading model', now)
    model_path = '{0}/it-topics'.format(settings.PORTRAIT_FOLDER)
    link_path = '{0}/current_lda_model.gensim'.format(model_path)
    # A relative link target is relative to the link's directory, not to
    # the current working directory; join() leaves absolute targets alone.
    lda_filename = os.path.join(model_path, os.readlink(link_path))
    self.lda_model = gensim.models.ldamulticore.LdaMulticore.load(lda_filename)
    self.topic_graph = nx.read_gpickle('{0}/current_topic_graph.nx'.format(model_path))
    with gzip.open('{0}/current_candidates.json.gz'.format(model_path), 'rt') as f:
        self.rec_candidates = json.load(f)
    print('loaded', len(self.rec_candidates), 'candidates')
    self.last_model_update = datetime.now()
def readlink(path):
    """Wrapper around os.readlink() that cleans up the returned target."""
    assert isinstance(path, basestring), path
    target = os.readlink(path)
    # readlink() might return paths containing null bytes ('\x00'),
    # which later trigger "TypeError: must be encoded string without
    # NULL bytes, not str" when handed to other fs-related functions
    # (os.*, open(), ...). Everything after '\x00' is apparently
    # garbage (' (deleted)', 'new' and possibly others), see:
    # https://github.com/giampaolo/psutil/issues/717
    target = target.partition('\x00')[0]
    # Certain paths have ' (deleted)' appended. Usually this is bogus
    # as the file actually exists. Even if it doesn't we don't care.
    deleted_suffix = ' (deleted)'
    if target.endswith(deleted_suffix) and not path_exists_strict(target):
        target = target[:-len(deleted_suffix)]
    return target
def get_proc_inodes(self, pid):
    """Map socket inodes opened by *pid* to lists of ``(pid, fd)`` tuples.

    Every entry of ``<procfs>/<pid>/fd`` is read as a link; only targets
    of the form ``socket:[<inode>]`` are recorded.
    """
    # ENOENT / ESRCH == file which is gone in the meantime;
    # os.stat('/proc/%s' % self.pid) will be done later to force NSP
    # (if it's the case). EINVAL == not a link.
    skippable = (errno.ENOENT, errno.ESRCH, errno.EINVAL)
    inodes = defaultdict(list)
    fd_dir = "%s/%s/fd" % (self._procfs_path, pid)
    for fd in os.listdir(fd_dir):
        try:
            inode = readlink("%s/%s" % (fd_dir, fd))
        except OSError as err:
            if err.errno in skippable:
                continue
            raise
        if inode.startswith('socket:['):
            # the process is using a socket; keep only the inode number
            inode = inode[8:-1]
            inodes[inode].append((pid, int(fd)))
    return inodes
def exe(self):
    """Return the target of the ``<procfs>/<pid>/exe`` link.

    Returns "" when the link cannot be read but the process directory
    still exists. Raises NoSuchProcess, ZombieProcess or AccessDenied
    depending on the errno of the failure.
    """
    try:
        return readlink("%s/%s/exe" % (self._procfs_path, self.pid))
    except OSError as err:
        code = err.errno
        if code in (errno.ENOENT, errno.ESRCH):
            # no such file error; might be raised also if the
            # path actually exists for system processes with
            # low pids (about 0-20)
            if os.path.lexists("%s/%s" % (self._procfs_path, self.pid)):
                return ""
            if not pid_exists(self.pid):
                raise NoSuchProcess(self.pid, self._name)
            raise ZombieProcess(self.pid, self._name, self._ppid)
        if code in (errno.EPERM, errno.EACCES):
            raise AccessDenied(self.pid, self._name)
        raise
def cwd(self):
    """Return process current working directory."""
    # sometimes we get an empty string, in which case we turn
    # it into None
    if OPENBSD and self.pid == 0:
        return None  # ...else it would raise EINVAL
    if NETBSD:
        with wrap_exceptions_procfs(self):
            return os.readlink("/proc/%s/cwd" % self.pid)
        # Reached only if the context manager swallowed an exception.
        return None
    if hasattr(cext, 'proc_open_files'):
        # FreeBSD < 8 does not support functions based on
        # kinfo_getfile() and kinfo_getvmmap()
        return cext.proc_cwd(self.pid) or None
    message = "supported only starting from FreeBSD 8" if FREEBSD else ""
    raise NotImplementedError(message)
def terminal(self):
    """Return the target of the first readable fd link of the process.

    The links ``<procfs>/<pid>/path/<fd>`` are tried for fds 0, 1, 2 and
    255 in that order. Returns None when none of them can be read or
    when the tty value equals ``cext.PRNODEV``.
    """
    procfs_path = self._procfs_path
    hit_enoent = False
    # NOTE(review): wrap_exceptions is applied to a value here rather
    # than to a callable -- confirm this is intended.
    tty = wrap_exceptions(
        cext.proc_basic_info(self.pid, self._procfs_path)[0])
    if tty != cext.PRNODEV:
        for fd in (0, 1, 2, 255):
            link = '%s/%d/path/%d' % (procfs_path, self.pid, fd)
            try:
                return os.readlink(link)
            except OSError as err:
                if err.errno != errno.ENOENT:
                    raise
                hit_enoent = True
    if hit_enoent:
        # raise NSP if the process disappeared on us
        os.stat('%s/%s' % (procfs_path, self.pid))
    return None
def open_files(self):
    """Return ``popenfile`` entries for the regular files the process has open.

    File descriptors are listed from ``<procfs>/<pid>/fd`` and resolved
    through the matching links in ``<procfs>/<pid>/path``.
    """
    opened = []
    vanished = False
    procfs_path = self._procfs_path
    pathdir = '%s/%d/path' % (procfs_path, self.pid)
    for fd in os.listdir('%s/%d/fd' % (procfs_path, self.pid)):
        link = os.path.join(pathdir, fd)
        if not os.path.islink(link):
            continue
        try:
            target = os.readlink(link)
        except OSError as err:
            # ENOENT == file which is gone in the meantime
            if err.errno != errno.ENOENT:
                raise
            vanished = True
            continue
        if isfile_strict(target):
            opened.append(_common.popenfile(target, int(fd)))
    if vanished:
        # raise NSP if the process disappeared on us
        os.stat('%s/%s' % (procfs_path, self.pid))
    return opened
def readlink(path):
    """Wrapper around os.readlink() that cleans up the returned target."""
    assert isinstance(path, basestring), path
    target = os.readlink(path)
    # readlink() might return paths containing null bytes ('\x00'),
    # which later trigger "TypeError: must be encoded string without
    # NULL bytes, not str" when handed to other fs-related functions
    # (os.*, open(), ...). Everything after '\x00' is apparently
    # garbage (' (deleted)', 'new' and possibly others), see:
    # https://github.com/giampaolo/psutil/issues/717
    target = target.partition('\x00')[0]
    # Certain paths have ' (deleted)' appended. Usually this is bogus
    # as the file actually exists. Even if it doesn't we don't care.
    deleted_suffix = ' (deleted)'
    if target.endswith(deleted_suffix) and not path_exists_strict(target):
        target = target[:-len(deleted_suffix)]
    return target
def get_proc_inodes(self, pid):
    """Map socket inodes opened by *pid* to lists of ``(pid, fd)`` tuples.

    Every entry of ``<procfs>/<pid>/fd`` is read as a link; only targets
    of the form ``socket:[<inode>]`` are recorded.
    """
    # ENOENT / ESRCH == file which is gone in the meantime;
    # os.stat('/proc/%s' % self.pid) will be done later to force NSP
    # (if it's the case). EINVAL == not a link.
    skippable = (errno.ENOENT, errno.ESRCH, errno.EINVAL)
    inodes = defaultdict(list)
    fd_dir = "%s/%s/fd" % (self._procfs_path, pid)
    for fd in os.listdir(fd_dir):
        try:
            inode = readlink("%s/%s" % (fd_dir, fd))
        except OSError as err:
            if err.errno in skippable:
                continue
            raise
        if inode.startswith('socket:['):
            # the process is using a socket; keep only the inode number
            inode = inode[8:-1]
            inodes[inode].append((pid, int(fd)))
    return inodes
def exe(self):
    """Return the target of the ``<procfs>/<pid>/exe`` link.

    Returns "" when the link cannot be read but the process directory
    still exists. Raises NoSuchProcess, ZombieProcess or AccessDenied
    depending on the errno of the failure.
    """
    try:
        return readlink("%s/%s/exe" % (self._procfs_path, self.pid))
    except OSError as err:
        code = err.errno
        if code in (errno.ENOENT, errno.ESRCH):
            # no such file error; might be raised also if the
            # path actually exists for system processes with
            # low pids (about 0-20)
            if os.path.lexists("%s/%s" % (self._procfs_path, self.pid)):
                return ""
            if not pid_exists(self.pid):
                raise NoSuchProcess(self.pid, self._name)
            raise ZombieProcess(self.pid, self._name, self._ppid)
        if code in (errno.EPERM, errno.EACCES):
            raise AccessDenied(self.pid, self._name)
        raise
def cwd(self):
    """Return process current working directory."""
    # sometimes we get an empty string, in which case we turn
    # it into None
    if OPENBSD and self.pid == 0:
        return None  # ...else it would raise EINVAL
    if NETBSD:
        with wrap_exceptions_procfs(self):
            return os.readlink("/proc/%s/cwd" % self.pid)
        # Reached only if the context manager swallowed an exception.
        return None
    if hasattr(cext, 'proc_open_files'):
        # FreeBSD < 8 does not support functions based on
        # kinfo_getfile() and kinfo_getvmmap()
        return cext.proc_cwd(self.pid) or None
    message = "supported only starting from FreeBSD 8" if FREEBSD else ""
    raise NotImplementedError(message)
def terminal(self):
    """Return the target of the first readable fd link of the process.

    The links ``<procfs>/<pid>/path/<fd>`` are tried for fds 0, 1, 2 and
    255 in that order. Returns None when none of them can be read or
    when the tty value equals ``cext.PRNODEV``.
    """
    procfs_path = self._procfs_path
    hit_enoent = False
    # NOTE(review): wrap_exceptions is applied to a value here rather
    # than to a callable -- confirm this is intended.
    tty = wrap_exceptions(
        cext.proc_basic_info(self.pid, self._procfs_path)[0])
    if tty != cext.PRNODEV:
        for fd in (0, 1, 2, 255):
            link = '%s/%d/path/%d' % (procfs_path, self.pid, fd)
            try:
                return os.readlink(link)
            except OSError as err:
                if err.errno != errno.ENOENT:
                    raise
                hit_enoent = True
    if hit_enoent:
        # raise NSP if the process disappeared on us
        os.stat('%s/%s' % (procfs_path, self.pid))
    return None
def open_files(self):
    """Return ``popenfile`` entries for the regular files the process has open.

    File descriptors are listed from ``<procfs>/<pid>/fd`` and resolved
    through the matching links in ``<procfs>/<pid>/path``.
    """
    opened = []
    vanished = False
    procfs_path = self._procfs_path
    pathdir = '%s/%d/path' % (procfs_path, self.pid)
    for fd in os.listdir('%s/%d/fd' % (procfs_path, self.pid)):
        link = os.path.join(pathdir, fd)
        if not os.path.islink(link):
            continue
        try:
            target = os.readlink(link)
        except OSError as err:
            # ENOENT == file which is gone in the meantime
            if err.errno != errno.ENOENT:
                raise
            vanished = True
            continue
        if isfile_strict(target):
            opened.append(_common.popenfile(target, int(fd)))
    if vanished:
        # raise NSP if the process disappeared on us
        os.stat('%s/%s' % (procfs_path, self.pid))
    return opened
def copyfile(src, dest, symlink=True):
    """Symlink or copy *src* to *dest*, creating parent directories.

    Nothing is done when *src* does not exist (e.g. a dangling symlink)
    or when *dest* already exists. When *symlink* is true, the platform
    has ``os.symlink`` and ``is_win`` is false, a symlink is created and
    ``copyfileordir`` is used only if that fails; otherwise the file is
    copied with ``copyfileordir`` directly.

    :param src: path of the file or directory to link/copy.
    :param dest: path to create.
    :param symlink: prefer a symlink over a copy.
    """
    if not os.path.exists(src):
        # Some bad symlink in the src
        logger.warn('Cannot find file %s (bad symlink)', src)
        return
    if os.path.exists(dest):
        logger.debug('File %s already exists', dest)
        return
    parent = os.path.dirname(dest)
    # dirname() is '' for a bare file name; os.makedirs('') would raise,
    # and the current directory needs no creation anyway.
    if parent and not os.path.exists(parent):
        logger.info('Creating parent directories for %s' % parent)
        os.makedirs(parent)
    if not os.path.islink(src):
        srcpath = os.path.abspath(src)
    else:
        # Re-create the link with the same target the source link has.
        srcpath = os.readlink(src)
    if symlink and hasattr(os, 'symlink') and not is_win:
        logger.info('Symlinking %s', dest)
        try:
            os.symlink(srcpath, dest)
        except (OSError, NotImplementedError):
            logger.info('Symlinking failed, copying to %s', dest)
            copyfileordir(src, dest)
    else:
        logger.info('Copying to %s', dest)
        copyfileordir(src, dest)
def mount_lowerdirs(self):
    """Return the colon-separated list of lower directories for this image.

    The transitive dependencies are collected into a graph keyed by
    ``<imageName>/<path>``, sorted topologically with
    ``toposort_flatten``, reversed, and each entry is joined onto
    ``self.metadata_dir``.
    """
    # First, build a dependency graph in order to avoid duplicate entries
    dependencies = {}

    def dependency_path(dep):
        """Return ``<imageName>/<path>`` for one dependency description."""
        container = self.__class__(dep['imageName'])
        path = dep.get('path', os.readlink(os.path.join(container.path, 'overlay.fs')))
        return os.path.join(dep['imageName'], path)

    pending_deps = set(map(dependency_path, self.dependencies))
    while pending_deps:
        path = pending_deps.pop()
        # The graph is keyed by path, so the "already visited" test must
        # use the path too; testing the image name never matched a key.
        if path in dependencies:
            continue
        name = path.split('/')[-2]
        dependencies[path] = set(map(dependency_path, self.__class__(name).dependencies))
        pending_deps |= dependencies[path]

    # Then sort it topologically. The list is reversed, because overlayfs
    # will check the mounts in order they are given, so the base fs has to
    # be the last one.
    ordered = reversed(list(toposort_flatten(dependencies)))
    return ':'.join(os.path.join(self.metadata_dir, dep) for dep in ordered)
def recursive_set_attributes(module, b_path, follow, file_args):
    """Apply *file_args* to everything below *b_path* and report changes.

    Each directory and file found by ``os.walk`` gets its attributes set
    through ``module.set_fs_attributes_if_different``. For symlinks, when
    *follow* is true, the link target is processed as well, recursing
    into it if it is a directory.

    Returns the accumulated "changed" flag.
    """
    def _apply(b_target, changed_so_far):
        # Work on a copy so the caller's file_args is never modified.
        args = file_args.copy()
        args['path'] = to_native(b_target, errors='surrogate_or_strict')
        return module.set_fs_attributes_if_different(args, changed_so_far)

    changed = False
    for b_root, b_dirs, b_files in os.walk(b_path):
        for b_fsobj in b_dirs + b_files:
            b_fsname = os.path.join(b_root, b_fsobj)
            is_link = os.path.islink(b_fsname)
            changed |= _apply(b_fsname, changed)
            if not (is_link and follow):
                continue
            # Follow the link: handle its target too.
            b_fsname = os.path.join(b_root, os.readlink(b_fsname))
            if os.path.isdir(b_fsname):
                changed |= recursive_set_attributes(module, b_fsname, follow, file_args)
            changed |= _apply(b_fsname, changed)
    return changed
def lcopy(src, dst, no_hardlinks=False):
    """Replicate *src* at *dst* as a symlink, hard link or copy.

    A symlink source is re-created as a symlink with the same target.
    Any other source is copied with ``shutil.copy`` when *no_hardlinks*
    is true and hard-linked otherwise. If *dst* already exists (EEXIST)
    it is removed and the operation is retried with the same options.

    :returns: True if a symlink was created, False otherwise.
    :raises EnvironmentError: for any failure other than EEXIST.
    """
    try:
        if issymlink(src):
            linkto = os.readlink(src)
            os.symlink(linkto, dst)
            return True
        if no_hardlinks:
            shutil.copy(src, dst)
        else:
            hardlink(src, dst)
        return False
    except EnvironmentError as err:
        if err.errno == errno.EEXIST:
            os.unlink(dst)
            # Keep the caller's no_hardlinks choice on the retry.
            return lcopy(src, dst, no_hardlinks)
        raise
def get_file_contents(code: models.File) -> bytes:
    """Get the contents of the given :class:`.models.File`.

    :param code: The file object to read.
    :returns: The contents of the file with newlines.
    :raises APIException: If the file is a directory (400) or if its
        path on disk is a symlink (410).
    """
    if code.is_directory:
        raise APIException(
            'Cannot display this file as it is a directory.',
            f'The selected file with id {code.id} is a directory.',
            APICodes.OBJECT_WRONG_TYPE,
            400,
        )

    disk_path = code.get_diskname()
    if os.path.islink(disk_path):
        # Symlinks are reported, not followed.
        link_target = os.readlink(disk_path)
        raise APIException(
            f'This file is a symlink to `{link_target}`.',
            'The file {} is a symlink'.format(code.id),
            APICodes.INVALID_STATE,
            410,
        )

    with open(disk_path, 'rb') as codefile:
        contents = codefile.read()
    return contents