def lookupmodule(self, filename):
"""Helper function for break/clear parsing -- may be overridden.
lookupmodule() translates (possibly incomplete) file or module name
into an absolute file name.
"""
if os.path.isabs(filename) and os.path.exists(filename):
return filename
f = os.path.join(sys.path[0], filename)
if os.path.exists(f) and self.canonic(f) == self.mainpyfile:
return f
root, ext = os.path.splitext(filename)
if ext == '':
filename = filename + '.py'
if os.path.isabs(filename):
return filename
for dirname in sys.path:
while os.path.islink(dirname):
dirname = os.readlink(dirname)
fullname = os.path.join(dirname, filename)
if os.path.exists(fullname):
return fullname
return None
python类readlink()的实例源码
def _resolve_link(path):
"""Internal helper function. Takes a path and follows symlinks
until we either arrive at something that isn't a symlink, or
encounter a path we've seen before (meaning that there's a loop).
"""
paths_seen = set()
while islink(path):
if path in paths_seen:
# Already seen this path, so we must have a symlink loop
return None
paths_seen.add(path)
# Resolve where the link points to
resolved = os.readlink(path)
if not isabs(resolved):
dir = dirname(path)
path = normpath(join(dir, resolved))
else:
path = normpath(resolved)
return path
def execute(self):
from ranger.container.file import File
new_path = self.rest(1)
cf = self.fm.thisfile
if not new_path:
return self.fm.notify('Syntax: relink <newpath>', bad=True)
if not cf.is_link:
return self.fm.notify('%s is not a symlink!' % cf.relative_path, bad=True)
if new_path == os.readlink(cf.path):
return
try:
os.remove(cf.path)
os.symlink(new_path, cf.path)
except OSError as err:
self.fm.notify(err)
self.fm.reset()
self.fm.thisdir.pointed_obj = cf
self.fm.thisfile = cf
def find_exec(name, error='error', extra=""):
proc = subprocess.Popen(["which", name], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
path = proc.communicate()[0]
if path:
path = path.strip()
if os.path.islink(path):
path = os.readlink(path)
return path
else:
if error in ('error', 'warning'):
sys.stderr.write("%s: can't find %s in $PATH.\n" % (error, name))
if extra:
sys.stderr.write("%s\n" % (extra,))
if error == 'error':
sys.exit()
return False
# Classes
##########
def lookupmodule(self, filename):
"""Helper function for break/clear parsing -- may be overridden.
lookupmodule() translates (possibly incomplete) file or module name
into an absolute file name.
"""
if os.path.isabs(filename) and os.path.exists(filename):
return filename
f = os.path.join(sys.path[0], filename)
if os.path.exists(f) and self.canonic(f) == self.mainpyfile:
return f
root, ext = os.path.splitext(filename)
if ext == '':
filename = filename + '.py'
if os.path.isabs(filename):
return filename
for dirname in sys.path:
while os.path.islink(dirname):
dirname = os.readlink(dirname)
fullname = os.path.join(dirname, filename)
if os.path.exists(fullname):
return fullname
return None
def copy(self, target, mode=False):
""" copy path to target."""
if self.check(file=1):
if target.check(dir=1):
target = target.join(self.basename)
assert self!=target
copychunked(self, target)
if mode:
copymode(self.strpath, target.strpath)
else:
def rec(p):
return p.check(link=0)
for x in self.visit(rec=rec):
relpath = x.relto(self)
newx = target.join(relpath)
newx.dirpath().ensure(dir=1)
if x.check(link=1):
newx.mksymlinkto(x.readlink())
continue
elif x.check(file=1):
copychunked(x, newx)
elif x.check(dir=1):
newx.ensure(dir=1)
if mode:
copymode(x.strpath, newx.strpath)
def _getAppDirConfig(self, appname):
try:
appconfig = self.app.configuration.get("SplunkDeployment.apps")[appname]
appdir = appconfig.get("directory")
if os.path.islink(appdir):
appdir = os.readlink(appdir)
if not os.path.isdir(appdir):
raise AppNotFoundException("The app \"" + appname + "\" does not exist in \"" + appdir + "\".")
return [appdir, appconfig]
except AppNotFoundException as e:
raise e
except:
pass
appconfig = self.app.configuration.get("SplunkDeployment.apps.default")
appdir = os.path.join(
appconfig.get("directory"),
appname
)
if os.path.islink(appdir):
appdir = os.readlink(appdir)
if not os.path.isdir(appdir):
raise AppNotFoundException("The app \"" + appname + "\" does not exist in \"" + appdir + "\".")
return [appdir, appconfig]
def readlink(path):
"""Wrapper around os.readlink()."""
assert isinstance(path, basestring), path
path = os.readlink(path)
# readlink() might return paths containing null bytes ('\x00')
# resulting in "TypeError: must be encoded string without NULL
# bytes, not str" errors when the string is passed to other
# fs-related functions (os.*, open(), ...).
# Apparently everything after '\x00' is garbage (we can have
# ' (deleted)', 'new' and possibly others), see:
# https://github.com/giampaolo/psutil/issues/717
path = path.split('\x00')[0]
# Certain paths have ' (deleted)' appended. Usually this is
# bogus as the file actually exists. Even if it doesn't we
# don't care.
if path.endswith(' (deleted)') and not path_exists_strict(path):
path = path[:-10]
return path
def get_proc_inodes(self, pid):
inodes = defaultdict(list)
for fd in os.listdir("%s/%s/fd" % (self._procfs_path, pid)):
try:
inode = readlink("%s/%s/fd/%s" % (self._procfs_path, pid, fd))
except OSError as err:
# ENOENT == file which is gone in the meantime;
# os.stat('/proc/%s' % self.pid) will be done later
# to force NSP (if it's the case)
if err.errno in (errno.ENOENT, errno.ESRCH):
continue
elif err.errno == errno.EINVAL:
# not a link
continue
else:
raise
else:
if inode.startswith('socket:['):
# the process is using a socket
inode = inode[8:][:-1]
inodes[inode].append((pid, int(fd)))
return inodes
def exe(self):
try:
return readlink("%s/%s/exe" % (self._procfs_path, self.pid))
except OSError as err:
if err.errno in (errno.ENOENT, errno.ESRCH):
# no such file error; might be raised also if the
# path actually exists for system processes with
# low pids (about 0-20)
if os.path.lexists("%s/%s" % (self._procfs_path, self.pid)):
return ""
else:
if not pid_exists(self.pid):
raise NoSuchProcess(self.pid, self._name)
else:
raise ZombieProcess(self.pid, self._name, self._ppid)
if err.errno in (errno.EPERM, errno.EACCES):
raise AccessDenied(self.pid, self._name)
raise
def cwd(self):
"""Return process current working directory."""
# sometimes we get an empty string, in which case we turn
# it into None
if OPENBSD and self.pid == 0:
return None # ...else it would raise EINVAL
elif NETBSD:
with wrap_exceptions_procfs(self):
return os.readlink("/proc/%s/cwd" % self.pid)
elif hasattr(cext, 'proc_open_files'):
# FreeBSD < 8 does not support functions based on
# kinfo_getfile() and kinfo_getvmmap()
return cext.proc_cwd(self.pid) or None
else:
raise NotImplementedError(
"supported only starting from FreeBSD 8" if
FREEBSD else "")
def terminal(self):
procfs_path = self._procfs_path
hit_enoent = False
tty = wrap_exceptions(
cext.proc_basic_info(self.pid, self._procfs_path)[0])
if tty != cext.PRNODEV:
for x in (0, 1, 2, 255):
try:
return os.readlink(
'%s/%d/path/%d' % (procfs_path, self.pid, x))
except OSError as err:
if err.errno == errno.ENOENT:
hit_enoent = True
continue
raise
if hit_enoent:
# raise NSP if the process disappeared on us
os.stat('%s/%s' % (procfs_path, self.pid))
def open_files(self):
retlist = []
hit_enoent = False
procfs_path = self._procfs_path
pathdir = '%s/%d/path' % (procfs_path, self.pid)
for fd in os.listdir('%s/%d/fd' % (procfs_path, self.pid)):
path = os.path.join(pathdir, fd)
if os.path.islink(path):
try:
file = os.readlink(path)
except OSError as err:
# ENOENT == file which is gone in the meantime
if err.errno == errno.ENOENT:
hit_enoent = True
continue
raise
else:
if isfile_strict(file):
retlist.append(_common.popenfile(file, int(fd)))
if hit_enoent:
# raise NSP if the process disappeared on us
os.stat('%s/%s' % (procfs_path, self.pid))
return retlist
def readlink(path):
"""Wrapper around os.readlink()."""
assert isinstance(path, basestring), path
path = os.readlink(path)
# readlink() might return paths containing null bytes ('\x00')
# resulting in "TypeError: must be encoded string without NULL
# bytes, not str" errors when the string is passed to other
# fs-related functions (os.*, open(), ...).
# Apparently everything after '\x00' is garbage (we can have
# ' (deleted)', 'new' and possibly others), see:
# https://github.com/giampaolo/psutil/issues/717
path = path.split('\x00')[0]
# Certain paths have ' (deleted)' appended. Usually this is
# bogus as the file actually exists. Even if it doesn't we
# don't care.
if path.endswith(' (deleted)') and not path_exists_strict(path):
path = path[:-10]
return path
def get_proc_inodes(self, pid):
inodes = defaultdict(list)
for fd in os.listdir("%s/%s/fd" % (self._procfs_path, pid)):
try:
inode = readlink("%s/%s/fd/%s" % (self._procfs_path, pid, fd))
except OSError as err:
# ENOENT == file which is gone in the meantime;
# os.stat('/proc/%s' % self.pid) will be done later
# to force NSP (if it's the case)
if err.errno in (errno.ENOENT, errno.ESRCH):
continue
elif err.errno == errno.EINVAL:
# not a link
continue
else:
raise
else:
if inode.startswith('socket:['):
# the process is using a socket
inode = inode[8:][:-1]
inodes[inode].append((pid, int(fd)))
return inodes
def exe(self):
try:
return readlink("%s/%s/exe" % (self._procfs_path, self.pid))
except OSError as err:
if err.errno in (errno.ENOENT, errno.ESRCH):
# no such file error; might be raised also if the
# path actually exists for system processes with
# low pids (about 0-20)
if os.path.lexists("%s/%s" % (self._procfs_path, self.pid)):
return ""
else:
if not pid_exists(self.pid):
raise NoSuchProcess(self.pid, self._name)
else:
raise ZombieProcess(self.pid, self._name, self._ppid)
if err.errno in (errno.EPERM, errno.EACCES):
raise AccessDenied(self.pid, self._name)
raise
def cwd(self):
"""Return process current working directory."""
# sometimes we get an empty string, in which case we turn
# it into None
if OPENBSD and self.pid == 0:
return None # ...else it would raise EINVAL
elif NETBSD:
with wrap_exceptions_procfs(self):
return os.readlink("/proc/%s/cwd" % self.pid)
elif hasattr(cext, 'proc_open_files'):
# FreeBSD < 8 does not support functions based on
# kinfo_getfile() and kinfo_getvmmap()
return cext.proc_cwd(self.pid) or None
else:
raise NotImplementedError(
"supported only starting from FreeBSD 8" if
FREEBSD else "")
def terminal(self):
procfs_path = self._procfs_path
hit_enoent = False
tty = wrap_exceptions(
cext.proc_basic_info(self.pid, self._procfs_path)[0])
if tty != cext.PRNODEV:
for x in (0, 1, 2, 255):
try:
return os.readlink(
'%s/%d/path/%d' % (procfs_path, self.pid, x))
except OSError as err:
if err.errno == errno.ENOENT:
hit_enoent = True
continue
raise
if hit_enoent:
# raise NSP if the process disappeared on us
os.stat('%s/%s' % (procfs_path, self.pid))
def open_files(self):
retlist = []
hit_enoent = False
procfs_path = self._procfs_path
pathdir = '%s/%d/path' % (procfs_path, self.pid)
for fd in os.listdir('%s/%d/fd' % (procfs_path, self.pid)):
path = os.path.join(pathdir, fd)
if os.path.islink(path):
try:
file = os.readlink(path)
except OSError as err:
# ENOENT == file which is gone in the meantime
if err.errno == errno.ENOENT:
hit_enoent = True
continue
raise
else:
if isfile_strict(file):
retlist.append(_common.popenfile(file, int(fd)))
if hit_enoent:
# raise NSP if the process disappeared on us
os.stat('%s/%s' % (procfs_path, self.pid))
return retlist
def execute(self):
from ranger.container.file import File
new_path = self.rest(1)
cf = self.fm.thisfile
if not new_path:
return self.fm.notify('Syntax: relink <newpath>', bad=True)
if not cf.is_link:
return self.fm.notify('%s is not a symlink!' % cf.relative_path, bad=True)
if new_path == os.readlink(cf.path):
return
try:
os.remove(cf.path)
os.symlink(new_path, cf.path)
except OSError as err:
self.fm.notify(err)
self.fm.reset()
self.fm.thisdir.pointed_obj = cf
self.fm.thisfile = cf
def _SetCurrent(self, image):
filename = '%d.iso' % (image['timestamp'])
path = os.path.join(self._image_dir, filename)
current_path = os.path.join(self._image_dir, 'current')
try:
link = os.readlink(current_path)
link_path = os.path.join(self._image_dir, link)
if link_path == path:
return
except FileNotFoundError:
pass
print('Changing current link to:', filename, flush=True)
temp_path = tempfile.mktemp(dir=self._image_dir)
os.symlink(filename, temp_path)
os.rename(temp_path, current_path)
def get_function_by_ifname(ifname):
"""Get the function by the interface name
Given the device name, returns the PCI address of a device
and returns True if the address is in a physical function.
"""
dev_path = "/sys/class/net/%s/device" % ifname
sriov_totalvfs = 0
if os.path.isdir(dev_path):
try:
# sriov_totalvfs contains the maximum possible VFs for this PF
with open(os.path.join(dev_path, _SRIOV_TOTALVFS)) as fd:
sriov_totalvfs = int(fd.read())
return (os.readlink(dev_path).strip("./"),
sriov_totalvfs > 0)
except (IOError, ValueError):
return os.readlink(dev_path).strip("./"), False
return None, False
def get_vf_num_by_pci_address(pci_addr):
"""Get the VF number based on a VF's pci address
A VF is associated with an VF number, which ip link command uses to
configure it. This number can be obtained from the PCI device filesystem.
"""
VIRTFN_RE = re.compile("virtfn(\d+)")
virtfns_path = "/sys/bus/pci/devices/%s/physfn/virtfn*" % (pci_addr)
vf_num = None
try:
for vf_path in glob.iglob(virtfns_path):
if re.search(pci_addr, os.readlink(vf_path)):
t = VIRTFN_RE.search(vf_path)
vf_num = t.group(1)
break
except Exception:
pass
if vf_num is None:
raise exception.PciDeviceNotFoundById(id=pci_addr)
return vf_num
def test_update_link_to(self):
for ver in six.moves.range(1, 6):
for kind in ALL_FOUR:
self._write_out_kind(kind, ver)
self.assertEqual(ver, self.test_rc.current_version(kind))
# pylint: disable=protected-access
self.test_rc._update_link_to("cert", 3)
self.test_rc._update_link_to("privkey", 2)
self.assertEqual(3, self.test_rc.current_version("cert"))
self.assertEqual(2, self.test_rc.current_version("privkey"))
self.assertEqual(5, self.test_rc.current_version("chain"))
self.assertEqual(5, self.test_rc.current_version("fullchain"))
# Currently we are allowed to update to a version that doesn't exist
self.test_rc._update_link_to("chain", 3000)
# However, current_version doesn't allow querying the resulting
# version (because it's a broken link).
self.assertEqual(os.path.basename(os.readlink(self.test_rc.chain)),
"chain3000.pem")
def get_link_target(link):
"""Get an absolute path to the target of link.
:param str link: Path to a symbolic link
:returns: Absolute path to the target of link
:rtype: str
:raises .CertStorageError: If link does not exists.
"""
try:
target = os.readlink(link)
except OSError:
raise errors.CertStorageError(
"Expected {0} to be a symlink".format(link))
if not os.path.isabs(target):
target = os.path.join(os.path.dirname(link), target)
return os.path.abspath(target)
def _fix_symlinks(self):
"""Fixes symlinks in the event of an incomplete version update.
If there is no problem with the current symlinks, this function
has no effect.
"""
previous_symlinks = self._previous_symlinks()
if all(os.path.exists(link[1]) for link in previous_symlinks):
for kind, previous_link in previous_symlinks:
current_link = getattr(self, kind)
if os.path.lexists(current_link):
os.unlink(current_link)
os.symlink(os.readlink(previous_link), current_link)
for _, link in previous_symlinks:
if os.path.exists(link):
os.unlink(link)
def packet_socket():
'''
Function to return a list of pids and process names utilizing packet sockets.
'''
packetcontent = _packetload()
packetresult = []
for line in packetcontent:
line_array = _remove_empty(line.split(' '))
inode = line_array[8].rstrip()
pid = _get_pid_of_inode(inode)
try:
exe = os.readlink('/proc/' + pid + '/exe')
except BaseException:
exe = None
nline = [pid, exe]
packetresult.append(nline)
return packetresult
def need_to_create_symlink(directory, checksums, filetype, symlink_path):
"""Check if we need to create a symlink for an existing file"""
# If we don't have a symlink path, we don't need to create a symlink
if symlink_path is None:
return False
pattern = EFormats.get_content(filetype)
filename, _ = get_name_and_checksum(checksums, pattern)
full_filename = os.path.join(directory, filename)
symlink_name = os.path.join(symlink_path, filename)
if os.path.islink(symlink_name):
existing_link = os.readlink(symlink_name)
if full_filename == existing_link:
return False
return True
def is_git_dir(d):
""" This is taken from the git setup.c:is_git_directory
function.
@throws WorkTreeRepositoryUnsupported if it sees a worktree directory. It's quite hacky to do that here,
but at least clearly indicates that we don't support it.
There is the unlikely danger to throw if we see directories which just look like a worktree dir,
but are none."""
if osp.isdir(d):
if osp.isdir(osp.join(d, 'objects')) and osp.isdir(osp.join(d, 'refs')):
headref = osp.join(d, 'HEAD')
return osp.isfile(headref) or \
(osp.islink(headref) and
os.readlink(headref).startswith('refs'))
elif (osp.isfile(osp.join(d, 'gitdir')) and
osp.isfile(osp.join(d, 'commondir')) and
osp.isfile(osp.join(d, 'gitfile'))):
raise WorkTreeRepositoryUnsupported(d)
return False
def lookupmodule(self, filename):
"""Helper function for break/clear parsing -- may be overridden.
lookupmodule() translates (possibly incomplete) file or module name
into an absolute file name.
"""
if os.path.isabs(filename) and os.path.exists(filename):
return filename
f = os.path.join(sys.path[0], filename)
if os.path.exists(f) and self.canonic(f) == self.mainpyfile:
return f
root, ext = os.path.splitext(filename)
if ext == '':
filename = filename + '.py'
if os.path.isabs(filename):
return filename
for dirname in sys.path:
while os.path.islink(dirname):
dirname = os.readlink(dirname)
fullname = os.path.join(dirname, filename)
if os.path.exists(fullname):
return fullname
return None