def test_realpath_symlink_loops(self):
    """Check that realpath() returns the path unchanged for symlink loops."""
    # Bug #930024, return the path unchanged if we get into an infinite
    # symlink loop.
    # Record the working directory *before* entering the try block: if this
    # lookup were inside the try and failed, the finally clause would hit an
    # unbound ``old_path`` and hide the real error.
    old_path = abspath('.')
    try:
        # A link pointing at itself.
        os.symlink(ABSTFN, ABSTFN)
        self.assertEqual(realpath(ABSTFN), ABSTFN)

        # Two links pointing at each other.
        os.symlink(ABSTFN+"1", ABSTFN+"2")
        os.symlink(ABSTFN+"2", ABSTFN+"1")
        self.assertEqual(realpath(ABSTFN+"1"), ABSTFN+"1")
        self.assertEqual(realpath(ABSTFN+"2"), ABSTFN+"2")

        # Test using relative path as well.
        os.chdir(dirname(ABSTFN))
        self.assertEqual(realpath(basename(ABSTFN)), ABSTFN)
    finally:
        # Restore the working directory and remove every link created above.
        os.chdir(old_path)
        support.unlink(ABSTFN)
        support.unlink(ABSTFN+"1")
        support.unlink(ABSTFN+"2")
# Example source code using Python's basename()
def _find_url_name(self, index_url, url_name, req):
    """Find the true URL name of a package when the given name is slightly off.

    This is usually used to implement case-insensitivity. The page at
    *index_url* is fetched via ``self._get_page`` and the base name of each
    link path is compared, after ``normalize_name``, with ``req.url_name``.

    Note that *index_url* is modified in place: a trailing '/' is appended
    to its ``url`` attribute when it is missing.

    Returns the matching base name, or None when the page could not be
    fetched or no link matches.
    """
    if not index_url.url.endswith('/'):
        # Vaguely part of the PyPI API... weird but true.
        ## FIXME: bad to modify this?
        index_url.url += '/'

    page = self._get_page(index_url, req)
    if page is None:
        logger.fatal('Cannot fetch index base URL %s' % index_url)
        return None

    wanted = normalize_name(req.url_name)
    for link in page.links:
        candidate = posixpath.basename(link.path.rstrip('/'))
        if wanted != normalize_name(candidate):
            continue
        logger.notify('Real name of requirement %s is %s' % (url_name, candidate))
        return candidate
    return None
def get_package_loader(self, package, package_path):
    """Return a ``loader(path)`` callable for resources inside *package*.

    ``loader`` joins the requested path onto *package_path* (POSIX style)
    and asks the ``pkg_resources`` provider of *package* for it. It returns
    ``(None, None)`` when the path is None or the provider has no such
    resource. Otherwise it returns ``(basename, opener)``: for a
    ``DefaultProvider`` the opener is the result of calling
    ``self._opener`` with the resource's filename; for any other provider
    it is a zero-argument callable returning
    ``(resource stream, load time, 0)``.
    """
    from pkg_resources import DefaultProvider, ResourceManager, get_provider

    # Taken once, when the loader is built, and reused for every
    # non-filesystem resource it serves.
    created_at = datetime.utcnow()
    resource_provider = get_provider(package)
    resource_manager = ResourceManager()
    on_filesystem = isinstance(resource_provider, DefaultProvider)
    not_found = (None, None)

    def loader(path):
        if path is None:
            return not_found
        resource = posixpath.join(package_path, path)
        if not resource_provider.has_resource(resource):
            return not_found
        name = posixpath.basename(resource)

        if not on_filesystem:
            def open_stream():
                return (
                    resource_provider.get_resource_stream(
                        resource_manager, resource),
                    created_at,
                    0,
                )
            return name, open_stream

        filename = resource_provider.get_resource_filename(
            resource_manager, resource)
        return name, self._opener(filename)

    return loader
def fetch_sideshow_ionex(path,
                         date,
                         work_path=None,
                         templates=(JPLH_TEMPLATE, JPLH_ARCHIVE_TEMPLATE)):
    """
    Download the IONEX file for *date* into the directory *path*.

    Each entry of *templates* is formatted with ``date=date`` to obtain a
    server file name and tried in order; the first one that
    ``update_sideshow_file`` fetches without raising wins. The local file
    name is the base name of the server file name with its last three
    characters removed (presumably a compression suffix -- TODO confirm).

    Return the local file name of the downloaded file. Raise
    ``RuntimeError`` if no template could be downloaded.
    """
    # NOTE(review): the directory yielded by SmartTempDir is not passed on
    # to update_sideshow_file here -- confirm whether that is intended.
    with SmartTempDir(work_path) as work_path:
        for template in templates:
            server_fname = template.format(date=date)
            local_fname = os.path.join(path, posixpath.basename(server_fname)[:-3])
            try:
                update_sideshow_file(local_fname,
                                     server_fname)
            except Exception:
                # Best effort: fall through to the next template. Catching
                # Exception (not a bare except) lets KeyboardInterrupt and
                # SystemExit propagate instead of being swallowed.
                logger.info('could not download {}'.format(server_fname))
                continue
            return local_fname
    raise RuntimeError('could not download IONEX file from sideshow for {:%Y-%m-%d}'.format(date))
def get_package_loader(self, package, package_path):
    """Return a ``loader(path)`` callable for resources inside *package*.

    ``loader`` joins the requested path onto *package_path* (POSIX style)
    and asks the ``pkg_resources`` provider of *package* for it. It returns
    ``(None, None)`` when the path is None or the provider has no such
    resource. Otherwise it returns ``(basename, opener)``: for a
    ``DefaultProvider`` the opener is the result of calling
    ``self._opener`` with the resource's filename; for any other provider
    the resource is read into memory right away and the opener is a
    zero-argument callable returning
    ``(BytesIO of the data, load time, length of the data)``.
    """
    from pkg_resources import DefaultProvider, ResourceManager, get_provider

    # Taken once, when the loader is built, and reused for every
    # non-filesystem resource it serves.
    created_at = datetime.utcnow()
    resource_provider = get_provider(package)
    resource_manager = ResourceManager()
    on_filesystem = isinstance(resource_provider, DefaultProvider)
    not_found = (None, None)

    def loader(path):
        if path is None:
            return not_found
        resource = posixpath.join(package_path, path)
        if not resource_provider.has_resource(resource):
            return not_found
        name = posixpath.basename(resource)

        if on_filesystem:
            filename = resource_provider.get_resource_filename(
                resource_manager, resource)
            return name, self._opener(filename)

        # Read eagerly; every call of the opener wraps the same bytes in a
        # fresh BytesIO.
        data = resource_provider.get_resource_string(resource_manager, resource)

        def open_buffer():
            return BytesIO(data), created_at, len(data)

        return name, open_buffer

    return loader
def get_package_loader(self, package, package_path):
    """Return a ``loader(path)`` callable for resources inside *package*.

    ``loader`` joins the requested path onto *package_path* (POSIX style)
    and asks the ``pkg_resources`` provider of *package* for it. It returns
    ``(None, None)`` when the path is None or the provider has no such
    resource. Otherwise it returns ``(basename, opener)``: for a
    ``DefaultProvider`` the opener is the result of calling
    ``self._opener`` with the resource's filename; for any other provider
    it is a zero-argument callable returning
    ``(resource stream, load time, 0)``.
    """
    from pkg_resources import DefaultProvider, ResourceManager, get_provider

    # Taken once, when the loader is built, and reused for every
    # non-filesystem resource it serves.
    created_at = datetime.utcnow()
    resource_provider = get_provider(package)
    resource_manager = ResourceManager()
    on_filesystem = isinstance(resource_provider, DefaultProvider)
    not_found = (None, None)

    def loader(path):
        if path is None:
            return not_found
        resource = posixpath.join(package_path, path)
        if not resource_provider.has_resource(resource):
            return not_found
        name = posixpath.basename(resource)

        if not on_filesystem:
            def open_stream():
                return (
                    resource_provider.get_resource_stream(
                        resource_manager, resource),
                    created_at,
                    0,
                )
            return name, open_stream

        filename = resource_provider.get_resource_filename(
            resource_manager, resource)
        return name, self._opener(filename)

    return loader
def get_package_loader(self, package, package_path):
    """Return a ``loader(path)`` callable for resources inside *package*.

    ``loader`` joins the requested path onto *package_path* (POSIX style)
    and asks the ``pkg_resources`` provider of *package* for it. It returns
    ``(None, None)`` when the path is None or the provider has no such
    resource. Otherwise it returns ``(basename, opener)``: for a
    ``DefaultProvider`` the opener is the result of calling
    ``self._opener`` with the resource's filename; for any other provider
    it is a zero-argument callable returning
    ``(resource stream, load time, 0)``.
    """
    from pkg_resources import DefaultProvider, ResourceManager, get_provider

    # Taken once, when the loader is built, and reused for every
    # non-filesystem resource it serves.
    created_at = datetime.utcnow()
    resource_provider = get_provider(package)
    resource_manager = ResourceManager()
    on_filesystem = isinstance(resource_provider, DefaultProvider)
    not_found = (None, None)

    def loader(path):
        if path is None:
            return not_found
        resource = posixpath.join(package_path, path)
        if not resource_provider.has_resource(resource):
            return not_found
        name = posixpath.basename(resource)

        if not on_filesystem:
            def open_stream():
                return (
                    resource_provider.get_resource_stream(
                        resource_manager, resource),
                    created_at,
                    0,
                )
            return name, open_stream

        filename = resource_provider.get_resource_filename(
            resource_manager, resource)
        return name, self._opener(filename)

    return loader
def find_links(this_file):
    """Collect the links found in a markdown file.

    Return a list of (target, title, lineno) tuples. ``title`` is the part
    of the link after the first '#', or None when the link has no '#'.
    A link that starts with '#' (for example ``[blabla](#hi)``) points into
    this file itself, so its target becomes the base name of *this_file*.
    """
    links = []
    with common.slashfix_open(this_file, 'r') as handle:
        for match, lineno in common.find_links(handle):
            target = match.group(2)
            head, hash_sign, tail = target.partition('#')
            if hash_sign:
                # An empty part before '#' means "this very file".
                entry = (head or posixpath.basename(this_file), tail, lineno)
            else:
                entry = (target, None, lineno)
            links.append(entry)
    return links
def get_package_loader(self, package, package_path):
    """Return a ``loader(path)`` callable for resources inside *package*.

    ``loader`` joins the requested path onto *package_path* (POSIX style)
    and asks the ``pkg_resources`` provider of *package* for it. It returns
    ``(None, None)`` when the path is None or the provider has no such
    resource. Otherwise it returns ``(basename, opener)``: for a
    ``DefaultProvider`` the opener is the result of calling
    ``self._opener`` with the resource's filename; for any other provider
    it is a zero-argument callable returning
    ``(resource stream, load time, 0)``.
    """
    from pkg_resources import DefaultProvider, ResourceManager, get_provider

    # Taken once, when the loader is built, and reused for every
    # non-filesystem resource it serves.
    created_at = datetime.utcnow()
    resource_provider = get_provider(package)
    resource_manager = ResourceManager()
    on_filesystem = isinstance(resource_provider, DefaultProvider)
    not_found = (None, None)

    def loader(path):
        if path is None:
            return not_found
        resource = posixpath.join(package_path, path)
        if not resource_provider.has_resource(resource):
            return not_found
        name = posixpath.basename(resource)

        if not on_filesystem:
            def open_stream():
                return (
                    resource_provider.get_resource_stream(
                        resource_manager, resource),
                    created_at,
                    0,
                )
            return name, open_stream

        filename = resource_provider.get_resource_filename(
            resource_manager, resource)
        return name, self._opener(filename)

    return loader
def __init__(self, symbolizer):
    """Initialise the bookkeeping fields, then start addr2line.

    Args:
      symbolizer: object exposing ``elf_file_path``; stored as
          ``_symbolizer`` and used to derive ``_lib_file_name``.
    """
    self._symbolizer = symbolizer
    self._lib_file_name = posixpath.basename(symbolizer.elf_file_path)

    # Handles for the addr2line subprocess; all unset until
    # _RestartAddr2LineProcess() runs below.
    self._proc = None  # Subprocess.Popen(...) instance.
    self._thread = None  # Threading.thread instance.
    self._out_queue = None  # Queue.Queue instance (buffers a2l stdout).

    # Addresses already pushed to addr2line's stdin whose answers have not
    # yet been read back from stdout.
    self._request_queue = collections.deque()

    # Essentially len(self._request_queue), kept as its own field because
    # computing it turned out to be a perf hot-spot.
    self.queue_size = 0

    # Number of symbols this process has handled so far, tracked so that a
    # single process does not grow too big and use all the memory.
    self._processed_symbols_count = 0

    self._RestartAddr2LineProcess()
def get_package_loader(self, package, package_path):
    """Return a ``loader(path)`` callable for resources inside *package*.

    ``loader`` joins the requested path onto *package_path* (POSIX style)
    and asks the ``pkg_resources`` provider of *package* for it. It returns
    ``(None, None)`` when the path is None or the provider has no such
    resource. Otherwise it returns ``(basename, opener)``: for a
    ``DefaultProvider`` the opener is the result of calling
    ``self._opener`` with the resource's filename; for any other provider
    it is a zero-argument callable returning
    ``(resource stream, load time, 0)``.
    """
    from pkg_resources import DefaultProvider, ResourceManager, get_provider

    # Taken once, when the loader is built, and reused for every
    # non-filesystem resource it serves.
    created_at = datetime.utcnow()
    resource_provider = get_provider(package)
    resource_manager = ResourceManager()
    on_filesystem = isinstance(resource_provider, DefaultProvider)
    not_found = (None, None)

    def loader(path):
        if path is None:
            return not_found
        resource = posixpath.join(package_path, path)
        if not resource_provider.has_resource(resource):
            return not_found
        name = posixpath.basename(resource)

        if not on_filesystem:
            def open_stream():
                return (
                    resource_provider.get_resource_stream(
                        resource_manager, resource),
                    created_at,
                    0,
                )
            return name, open_stream

        filename = resource_provider.get_resource_filename(
            resource_manager, resource)
        return name, self._opener(filename)

    return loader
def get_package_loader(self, package, package_path):
    """Return a ``loader(path)`` callable for resources inside *package*.

    ``loader`` joins the requested path onto *package_path* (POSIX style)
    and asks the ``pkg_resources`` provider of *package* for it. It returns
    ``(None, None)`` when the path is None or the provider has no such
    resource. Otherwise it returns ``(basename, opener)``: for a
    ``DefaultProvider`` the opener is the result of calling
    ``self._opener`` with the resource's filename; for any other provider
    it is a zero-argument callable returning
    ``(resource stream, load time, 0)``.
    """
    from pkg_resources import DefaultProvider, ResourceManager, get_provider

    # Taken once, when the loader is built, and reused for every
    # non-filesystem resource it serves.
    created_at = datetime.utcnow()
    resource_provider = get_provider(package)
    resource_manager = ResourceManager()
    on_filesystem = isinstance(resource_provider, DefaultProvider)
    not_found = (None, None)

    def loader(path):
        if path is None:
            return not_found
        resource = posixpath.join(package_path, path)
        if not resource_provider.has_resource(resource):
            return not_found
        name = posixpath.basename(resource)

        if not on_filesystem:
            def open_stream():
                return (
                    resource_provider.get_resource_stream(
                        resource_manager, resource),
                    created_at,
                    0,
                )
            return name, open_stream

        filename = resource_provider.get_resource_filename(
            resource_manager, resource)
        return name, self._opener(filename)

    return loader
def _find_url_name(self, index_url, url_name, req):
    """Find the true URL name of a package when the given name is slightly off.

    This is usually used to implement case-insensitivity. The page at
    *index_url* is fetched via ``self._get_page`` and the base name of each
    link path is compared, after ``normalize_name``, with ``req.url_name``.

    Note that *index_url* is modified in place: a trailing '/' is appended
    to its ``url`` attribute when it is missing.

    Returns the matching base name, or None when the page could not be
    fetched or no link matches.
    """
    if not index_url.url.endswith('/'):
        # Vaguely part of the PyPI API... weird but true.
        ## FIXME: bad to modify this?
        index_url.url += '/'

    page = self._get_page(index_url, req)
    if page is None:
        logger.fatal('Cannot fetch index base URL %s' % index_url)
        return None

    wanted = normalize_name(req.url_name)
    for link in page.links:
        candidate = posixpath.basename(link.path.rstrip('/'))
        if wanted != normalize_name(candidate):
            continue
        logger.notify('Real name of requirement %s is %s' % (url_name, candidate))
        return candidate
    return None
def get_package_loader(self, package, package_path):
    """Return a ``loader(path)`` callable for resources inside *package*.

    ``loader`` joins the requested path onto *package_path* (POSIX style)
    and asks the ``pkg_resources`` provider of *package* for it. It returns
    ``(None, None)`` when the path is None or the provider has no such
    resource. Otherwise it returns ``(basename, opener)``: for a
    ``DefaultProvider`` the opener is the result of calling
    ``self._opener`` with the resource's filename; for any other provider
    it is a zero-argument callable returning
    ``(resource stream, load time, 0)``.
    """
    from pkg_resources import DefaultProvider, ResourceManager, get_provider

    # Taken once, when the loader is built, and reused for every
    # non-filesystem resource it serves.
    created_at = datetime.utcnow()
    resource_provider = get_provider(package)
    resource_manager = ResourceManager()
    on_filesystem = isinstance(resource_provider, DefaultProvider)
    not_found = (None, None)

    def loader(path):
        if path is None:
            return not_found
        resource = posixpath.join(package_path, path)
        if not resource_provider.has_resource(resource):
            return not_found
        name = posixpath.basename(resource)

        if not on_filesystem:
            def open_stream():
                return (
                    resource_provider.get_resource_stream(
                        resource_manager, resource),
                    created_at,
                    0,
                )
            return name, open_stream

        filename = resource_provider.get_resource_filename(
            resource_manager, resource)
        return name, self._opener(filename)

    return loader
def __init__(self, symbolizer):
    """Initialise the bookkeeping fields, then start addr2line.

    Args:
      symbolizer: object exposing ``elf_file_path``; stored as
          ``_symbolizer`` and used to derive ``_lib_file_name``.
    """
    self._symbolizer = symbolizer
    self._lib_file_name = posixpath.basename(symbolizer.elf_file_path)

    # Handles for the addr2line subprocess; all unset until
    # _RestartAddr2LineProcess() runs below.
    self._proc = None  # Subprocess.Popen(...) instance.
    self._thread = None  # Threading.thread instance.
    self._out_queue = None  # Queue.Queue instance (buffers a2l stdout).

    # Addresses already pushed to addr2line's stdin whose answers have not
    # yet been read back from stdout.
    self._request_queue = collections.deque()

    # Essentially len(self._request_queue), kept as its own field because
    # computing it turned out to be a perf hot-spot.
    self.queue_size = 0

    # Number of symbols this process has handled so far, tracked so that a
    # single process does not grow too big and use all the memory.
    self._processed_symbols_count = 0

    self._RestartAddr2LineProcess()
async def _handle_solo_scout(self):
    """
    Handles a solo scout

    Must be a coroutine (``async def``) because it awaits
    ``self._scout_cards()`` and ``get_one_img(...)``.

    :return: ScoutImage built from the downloaded image bytes and the image
        file name, or None (with ``self.results`` reset to an empty list)
        when no card was returned
    """
    card = await self._scout_cards()

    # Send error message if no card was returned
    if not card:
        self.results = []
        return None

    card = card[0]

    # Use the idolized image when the card has no regular image. The stored
    # values lack a scheme, so "http:" is prepended.
    if card["card_image"] is None:
        url = "http:" + card["card_idolized_image"]
    else:
        url = "http:" + card["card_image"]

    # The image file name is the last component of the URL path.
    fname = basename(urlsplit(url).path)
    image_path = idol_img_path.joinpath(fname)
    bytes_ = await get_one_img(
        url, image_path, self._bot.session_manager)
    return ScoutImage(bytes_, fname)
def test_realpath_resolve_first(self):
    """Check that realpath() resolves a relative first path component."""
    # Bug #1213894: The first component of the path, if not absolute,
    # must be resolved too.
    # Record the working directory *before* entering the try block: if this
    # lookup were inside the try and failed, the finally clause would hit an
    # unbound ``old_path`` and hide the real error.
    old_path = abspath('.')
    try:
        os.mkdir(ABSTFN)
        os.mkdir(ABSTFN + "/k")
        os.symlink(ABSTFN, ABSTFN + "link")

        # Work from the parent directory so the link is addressed by a
        # relative path.
        os.chdir(dirname(ABSTFN))
        base = basename(ABSTFN)
        self.assertEqual(realpath(base + "link"), ABSTFN)
        self.assertEqual(realpath(base + "link/k"), ABSTFN + "/k")
    finally:
        # Restore the working directory, then remove the link and the
        # directories (innermost first).
        os.chdir(old_path)
        support.unlink(ABSTFN + "link")
        safe_rmdir(ABSTFN + "/k")
        safe_rmdir(ABSTFN)
def _find_url_name(self, index_url, url_name, req):
    """Find the true URL name of a package when the given name is slightly off.

    This is usually used to implement case-insensitivity. The page at
    *index_url* is fetched via ``self._get_page`` and the base name of each
    link path is compared, after ``normalize_name``, with ``req.url_name``.

    Note that *index_url* is modified in place: a trailing '/' is appended
    to its ``url`` attribute when it is missing.

    Returns the matching base name, or None when the page could not be
    fetched or no link matches.
    """
    if not index_url.url.endswith('/'):
        # Vaguely part of the PyPI API... weird but true.
        ## FIXME: bad to modify this?
        index_url.url += '/'

    page = self._get_page(index_url, req)
    if page is None:
        logger.fatal('Cannot fetch index base URL %s' % index_url)
        return None

    wanted = normalize_name(req.url_name)
    for link in page.links:
        candidate = posixpath.basename(link.path.rstrip('/'))
        if wanted != normalize_name(candidate):
            continue
        logger.notify('Real name of requirement %s is %s' % (url_name, candidate))
        return candidate
    return None
def score_url(self, url):
    """
    Give an url a score which can be used to choose preferred URLs
    for a given project release.

    The score is the tuple ``(scheme is not https, 'pypi.python.org' is in
    the host, file is a wheel, wheel is compatible, file base name)``.
    Compatibility is only checked for ``.whl`` files (against
    ``self.wheel_tags``) and is True for everything else.
    """
    parts = urlparse(url)
    filename = posixpath.basename(parts.path)
    is_wheel = filename.endswith('.whl')
    if is_wheel:
        compatible = is_compatible(Wheel(filename), self.wheel_tags)
    else:
        compatible = True
    not_https = parts.scheme != 'https'
    on_pypi = 'pypi.python.org' in parts.netloc
    return (not_https, on_pypi, is_wheel, compatible, filename)
def filename(self):
    """Return the unquoted file name that ``self.url`` points at.

    The name is the last component of the URL path (trailing slashes
    ignored); when the path yields nothing, the network location is used
    instead. An empty result trips the assertion.
    """
    parts = urllib_parse.urlsplit(self.url)
    last_component = posixpath.basename(parts.path.rstrip('/'))
    name = urllib_parse.unquote(last_component or parts.netloc)
    assert name, ('URL %r produced no filename' % self.url)
    return name
def splitext(self):
    """Apply the module-level ``splitext`` to the base name of ``self.path``.

    Trailing slashes are stripped from the path before its last component
    is taken.
    """
    trimmed = self.path.rstrip('/')
    last_component = posixpath.basename(trimmed)
    return splitext(last_component)
def show_url(self):
    """Return the base name of ``self.url`` without fragment or query."""
    without_fragment = self.url.split('#', 1)[0]
    without_query = without_fragment.split('?', 1)[0]
    return posixpath.basename(without_query)
def score_url(self, url):
    """
    Give an url a score which can be used to choose preferred URLs
    for a given project release.

    The score is the tuple ``(scheme is not https, 'pypi.python.org' is in
    the host, file is a wheel, wheel is compatible, file base name)``.
    Compatibility is only checked for ``.whl`` files (against
    ``self.wheel_tags``) and is True for everything else.
    """
    parsed = urlparse(url)
    name = posixpath.basename(parsed.path)
    wheel = name.endswith('.whl')
    # Non-wheel files are always treated as compatible.
    usable = is_compatible(Wheel(name), self.wheel_tags) if wheel else True
    return (
        parsed.scheme != 'https',
        'pypi.python.org' in parsed.netloc,
        wheel,
        usable,
        name,
    )
def filename(self):
    """Return the unquoted file name that ``self.url`` points at.

    The name is the last component of the URL path (trailing slashes
    ignored); when the path yields nothing, the network location is used
    instead. An empty result trips the assertion.
    """
    split_url = urllib_parse.urlsplit(self.url)
    raw_name = posixpath.basename(split_url.path.rstrip('/'))
    if not raw_name:
        # No usable path component: fall back to the host part.
        raw_name = split_url.netloc
    name = urllib_parse.unquote(raw_name)
    assert name, ('URL %r produced no filename' % self.url)
    return name
def splitext(self):
    """Apply the module-level ``splitext`` to the base name of ``self.path``.

    Trailing slashes are stripped from the path before its last component
    is taken.
    """
    base = posixpath.basename(self.path.rstrip('/'))
    return splitext(base)
def show_url(self):
    """Return the base name of ``self.url`` without fragment or query."""
    # Keep only what precedes the first '#', then what precedes the
    # first '?'.
    bare_url = self.url.partition('#')[0].partition('?')[0]
    return posixpath.basename(bare_url)
def score_url(self, url):
    """
    Give an url a score which can be used to choose preferred URLs
    for a given project release.

    The score is the tuple ``(scheme is not https, 'pypi.python.org' is in
    the host, base name of the URL path)``.
    """
    parts = urlparse(url)
    not_https = parts.scheme != 'https'
    on_pypi = 'pypi.python.org' in parts.netloc
    filename = posixpath.basename(parts.path)
    return (not_https, on_pypi, filename)
def filename(self):
    """Return the file name that ``self.url`` points at.

    The name is the last component of the URL path (trailing slashes
    ignored); when the path yields nothing, the network location is used
    instead. An empty result trips the assertion.
    """
    parts = urlparse.urlsplit(self.url)
    name = posixpath.basename(parts.path.rstrip('/'))
    if not name:
        # No usable path component: fall back to the host part.
        name = parts.netloc
    assert name, ('URL %r produced no filename' % self.url)
    return name
def splitext(self):
    """Apply the module-level ``splitext`` to the base name of ``self.path``.

    Trailing slashes are stripped from the path before its last component
    is taken.
    """
    path_without_slash = self.path.rstrip('/')
    return splitext(posixpath.basename(path_without_slash))
def show_url(self):
    """Return the base name of ``self.url`` without fragment or query."""
    url = self.url
    # Drop the fragment first, then the query string.
    for separator in ('#', '?'):
        url = url.split(separator, 1)[0]
    return posixpath.basename(url)