def get_package_loader(self, package, package_path):
    """Build a loader that serves resources bundled inside *package*.

    :param package: package name handed to ``pkg_resources.get_provider``.
    :param package_path: '/'-separated directory inside the package that
                         requested paths are joined onto.
    :return: a ``loader(path)`` callable returning ``(basename, opener)``,
             or ``(None, None)`` when *path* is ``None`` or the provider
             has no such resource.
    """
    from pkg_resources import DefaultProvider, ResourceManager, \
        get_provider

    # Captured once; reported as the timestamp of every streamed resource.
    created_at = datetime.utcnow()
    resource_provider = get_provider(package)
    resource_manager = ResourceManager()
    on_filesystem = isinstance(resource_provider, DefaultProvider)

    def loader(path):
        if path is None:
            return None, None
        resource = posixpath.join(package_path, path)
        if not resource_provider.has_resource(resource):
            return None, None
        name = posixpath.basename(resource)
        if not on_filesystem:
            # No real file behind the resource: hand back a callable that
            # produces (stream, timestamp, 0).
            return name, lambda: (
                resource_provider.get_resource_stream(
                    resource_manager, resource),
                created_at,
                0
            )
        real_path = resource_provider.get_resource_filename(
            resource_manager, resource)
        return name, self._opener(real_path)

    return loader
# Example source code using Python's join()
def unpack_directory(filename, extract_dir, progress_filter=default_filter):
    """Copy a directory tree into `extract_dir` using the archive interface.

    For every file found, `progress_filter` is called with the file's
    '/'-separated name relative to `filename` and the proposed target path;
    the file is copied (contents and stat info) to whatever path the filter
    returns, or skipped when the filter returns a false value.

    Raises ``UnrecognizedFormat`` if `filename` is not a directory
    """
    if not os.path.isdir(filename):
        raise UnrecognizedFormat("%s is not a directory" % filename)

    # Maps each walked source directory to a pair of
    # (relative name prefix ending in '/', destination directory).
    mapping = {filename: ('', extract_dir)}
    for current, subdirs, filenames in os.walk(filename):
        rel_prefix, dest_dir = mapping[current]
        for subdir in subdirs:
            mapping[os.path.join(current, subdir)] = (
                rel_prefix + subdir + '/',
                os.path.join(dest_dir, subdir),
            )
        for name in filenames:
            target = progress_filter(
                rel_prefix + name, os.path.join(dest_dir, name))
            if not target:
                # the filter rejected this entry
                continue
            ensure_directory(target)
            source = os.path.join(current, name)
            shutil.copyfile(source, target)
            shutil.copystat(source, target)
def make_temp_file_path_from_url(temp_dir_path, url):
    """Derive a temporary file path from the last path component of *url*.

    :param temp_dir_path: directory the resulting name is joined onto.
    :param url: URL whose path component supplies the file name.
    :return: ``posixpath.join(temp_dir_path, <name>)``.
    :raises InvalidFilePathError: if *url* cannot be parsed, its path or
        derived file name is a null string, or *temp_dir_path* cannot be
        joined.
    """
    try:
        parsed_path = urlparse(url).path
    except AttributeError:
        raise InvalidFilePathError("url must be a string")

    if typepy.is_null_string(parsed_path):
        raise InvalidFilePathError(
            "invalid URL path: {}".format(parsed_path))

    file_name = os.path.basename(parsed_path.rstrip("/"))
    if typepy.is_null_string(file_name):
        # Only a null name is passed through replace_symbol; if it is still
        # null afterwards the URL is rejected.
        file_name = pathvalidate.replace_symbol(
            file_name, replacement_text="_")
        if typepy.is_null_string(file_name):
            raise InvalidFilePathError("invalid URL: {}".format(url))

    try:
        return posixpath.join(temp_dir_path, file_name)
    except (TypeError, AttributeError):
        raise InvalidFilePathError("temp_dir_path must be a string")
def distinfo_dirname(cls, name, version):
    """
    Build the directory name ``<name>-<version><DISTINFO_EXT>``.

    Every ``'-'`` in *name* is replaced with ``'_'`` so that the only
    hyphen contributed by this function is the one separating the name
    from the version. *version* is used exactly as given.

    :parameter name: distribution name
    :type name: string
    :parameter version: version string
    :type version: string
    :returns: directory name
    :rtype: string"""
    escaped_name = name.replace('-', '_')
    stem = '-'.join((escaped_name, version))
    return stem + DISTINFO_EXT
def _get_records(self):
    """
    Read the ``RECORD`` resource of the distribution.

    :return: A list of ``(path, hash, size)`` tuples, one per row. Rows
             with fewer than three fields are padded with ``None``. The
             path is returned exactly as stored in the file.
    """
    records = []
    resource = self.get_distinfo_resource('RECORD')
    with contextlib.closing(resource.as_stream()) as stream:
        with CSVReader(stream=stream) as reader:
            for row in reader:
                # Pad short rows up to three fields; a negative count
                # yields an empty padding list.
                padding = [None] * (3 - len(row))
                path, checksum, size = row + padding
                records.append((path, checksum, size))
    return records
def write_shared_locations(self, paths, dry_run=False):
    """
    Write shared location information to the SHARED file in .dist-info.

    One ``key=path`` line is written for each of 'prefix', 'lib',
    'headers', 'scripts' and 'data' whose path is an existing directory,
    followed by one ``namespace=...`` line per entry under the optional
    'namespace' key.

    :param paths: A dictionary containing all five category keys and,
                  optionally, 'namespace'.
    :param dry_run: If True, the action is logged but no file is actually
                    written.
    :return: The path of the file written to, or ``None`` on a dry run.
    """
    target = os.path.join(self.path, 'SHARED')
    logger.info('creating %s', target)
    if dry_run:
        return None

    categories = ('prefix', 'lib', 'headers', 'scripts', 'data')
    entries = ['%s=%s' % (name, paths[name])
               for name in categories
               if os.path.isdir(paths[name])]
    entries.extend('namespace=%s' % ns for ns in paths.get('namespace', ()))

    with codecs.open(target, 'w', encoding='utf-8') as out:
        out.write('\n'.join(entries))
    return target
def check_installed_files(self):
    """
    Check that the files listed in ``installed-files.txt`` exist.

    Returns a (possibly empty) list of mismatches. Each entry is the tuple
    ``(path, 'exists', True, False)`` for a listed path that is missing on
    disk. The ``installed-files.txt`` entry itself is not checked, and an
    empty list is returned when that file does not exist.
    """
    record_path = os.path.join(self.path, 'installed-files.txt')
    if not os.path.exists(record_path):
        return []
    return [
        (path, 'exists', True, False)
        for path, _, _ in self.list_installed_files()
        if path != record_path and not os.path.exists(path)
    ]
def _get_index_urls_locations(self, project_name):
    """Return one project location per entry in ``self.index_urls``.

    Each location is the index URL joined with the URL-quoted canonical
    form of *project_name*, always ending in a trailing slash.
    """
    def location_for(index_url):
        joined = posixpath.join(
            index_url,
            urllib_parse.quote(canonicalize_name(project_name)))
        # For maximum compatibility with easy_install, make sure the
        # location ends in a slash: it is not required by the spec, but
        # index implementations that relied on easy_install's behavior
        # might break without it.
        return joined if joined.endswith('/') else joined + '/'

    return list(map(location_for, self.index_urls))
def distinfo_dirname(cls, name, version):
    """
    Build the directory name ``<name>-<version><DISTINFO_EXT>``.

    Every ``'-'`` in *name* is replaced with ``'_'`` so that the only
    hyphen contributed by this function is the one separating the name
    from the version. *version* is used exactly as given.

    :parameter name: distribution name
    :type name: string
    :parameter version: version string
    :type version: string
    :returns: directory name
    :rtype: string"""
    escaped_name = name.replace('-', '_')
    stem = '-'.join((escaped_name, version))
    return stem + DISTINFO_EXT
def _get_records(self):
    """
    Read the ``RECORD`` resource of the distribution.

    :return: A list of ``(path, hash, size)`` tuples, one per row. Rows
             with fewer than three fields are padded with ``None``. The
             path is returned exactly as stored in the file.
    """
    records = []
    resource = self.get_distinfo_resource('RECORD')
    with contextlib.closing(resource.as_stream()) as stream:
        with CSVReader(stream=stream) as reader:
            for row in reader:
                # Pad short rows up to three fields; a negative count
                # yields an empty padding list.
                padding = [None] * (3 - len(row))
                path, checksum, size = row + padding
                records.append((path, checksum, size))
    return records
def shared_locations(self):
    """
    Read the SHARED file in the .dist-info directory into a dictionary.

    Each line of the file has the form ``key=value`` and is split on the
    first ``'='``. Values for the 'namespace' key are collected into a
    list, in file order; for any other key the last value seen wins.

    :return: the dictionary of locations, empty when there is no SHARED
             file.
    """
    locations = {}
    shared_file = os.path.join(self.path, 'SHARED')
    if not os.path.isfile(shared_file):
        return locations

    with codecs.open(shared_file, 'r', encoding='utf-8') as fh:
        entries = fh.read().splitlines()
    for entry in entries:
        name, location = entry.split('=', 1)
        if name != 'namespace':
            locations[name] = location
        else:
            locations.setdefault(name, []).append(location)
    return locations
def write_shared_locations(self, paths, dry_run=False):
    """
    Write shared location information to the SHARED file in .dist-info.

    One ``key=path`` line is written for each of 'prefix', 'lib',
    'headers', 'scripts' and 'data' whose path is an existing directory,
    followed by one ``namespace=...`` line per entry under the optional
    'namespace' key.

    :param paths: A dictionary containing all five category keys and,
                  optionally, 'namespace'.
    :param dry_run: If True, the action is logged but no file is actually
                    written.
    :return: The path of the file written to, or ``None`` on a dry run.
    """
    target = os.path.join(self.path, 'SHARED')
    logger.info('creating %s', target)
    if dry_run:
        return None

    categories = ('prefix', 'lib', 'headers', 'scripts', 'data')
    entries = ['%s=%s' % (name, paths[name])
               for name in categories
               if os.path.isdir(paths[name])]
    entries.extend('namespace=%s' % ns for ns in paths.get('namespace', ()))

    with codecs.open(target, 'w', encoding='utf-8') as out:
        out.write('\n'.join(entries))
    return target
def _yield_distributions(self):
    """
    Yield .dist-info and/or .egg(-info) distributions.

    Every entry of ``self.path`` is scanned through its resource finder.
    Entries ending in ``DISTINFO_EXT`` are yielded as ``new_dist_class``
    objects when ``self._include_dist`` is set and a metadata file is
    found; entries ending in ``.egg-info`` or ``.egg`` are yielded as
    ``old_dist_class`` objects when ``self._include_egg`` is set.
    """
    # Resource paths already yielded are remembered because on some Linux
    # systems (e.g. some Debian/Ubuntu variants) symlinks alias other
    # files in the environment.
    seen = set()
    for location in self.path:
        finder = resources.finder_for_path(location)
        if finder is None:
            continue
        root = finder.find('')
        if not (root and root.is_container):
            continue
        for entry in sorted(root.resources):
            found = finder.find(entry)
            if not found or found.path in seen:
                continue
            if self._include_dist and entry.endswith(DISTINFO_EXT):
                # Take the first metadata file that exists; skip the entry
                # when neither candidate is present.
                for candidate in (METADATA_FILENAME,
                                  WHEEL_METADATA_FILENAME):
                    pydist = finder.find(posixpath.join(entry, candidate))
                    if pydist:
                        break
                else:
                    continue
                with contextlib.closing(pydist.as_stream()) as stream:
                    metadata = Metadata(fileobj=stream, scheme='legacy')
                logger.debug('Found %s', found.path)
                seen.add(found.path)
                yield new_dist_class(found.path, metadata=metadata,
                                     env=self)
            elif self._include_egg and entry.endswith(('.egg-info',
                                                       '.egg')):
                logger.debug('Found %s', found.path)
                seen.add(found.path)
                yield old_dist_class(found.path, self)
def write_installed_files(self, paths, prefix, dry_run=False):
    """
    Writes the ``RECORD`` file, using the ``paths`` iterable passed in. Any
    existing ``RECORD`` file is silently overwritten.

    :param paths: iterable of paths to record.
    :param prefix: used to decide when a path is written relative to the
                   parent of the .dist-info directory instead of absolute.
    :param dry_run: If True, the action is logged but nothing is written.
    :return: the RECORD path as written in its own row (relative when it
             lies under the base directory), or ``None`` on a dry run.
    """
    prefix = os.path.join(prefix, '')
    parent_dir = os.path.dirname(self.path)
    # Evaluated before the trailing separator is appended to the base.
    parent_under_prefix = parent_dir.startswith(prefix)
    base = os.path.join(parent_dir, '')
    record_path = self.get_distinfo_file('RECORD')
    logger.info('creating %s', record_path)
    if dry_run:
        return None

    with CSVWriter(record_path) as writer:
        for entry in paths:
            skip_details = (os.path.isdir(entry) or
                            entry.endswith(('.pyc', '.pyo')))
            if skip_details:
                # do not put size and hash, as in PEP-376
                digest = ''
                size = ''
            else:
                size = '%d' % os.path.getsize(entry)
                with open(entry, 'rb') as fp:
                    digest = self.get_hash(fp.read())
            relativize = entry.startswith(base) or (
                parent_under_prefix and entry.startswith(prefix))
            if relativize:
                entry = os.path.relpath(entry, base)
            writer.writerow((entry, digest, size))

        # add the RECORD file itself
        if record_path.startswith(base):
            record_path = os.path.relpath(record_path, base)
        writer.writerow((record_path, '', ''))
    return record_path
def check_installed_files(self):
    """
    Checks that the hashes and sizes of the files in ``RECORD`` are
    matched by the files themselves. Returns a (possibly empty) list of
    mismatches. Each entry in the mismatch list is a tuple of the path,
    one of 'exists', 'size' or 'hash' naming what didn't match (existence
    is checked first, then size, then hash), the expected value and the
    actual value. At most one mismatch is reported per path, and the
    ``RECORD`` file itself is skipped.
    """
    mismatches = []
    base = os.path.dirname(self.path)
    record_path = self.get_distinfo_file('RECORD')
    for path, expected_hash, expected_size in self.list_installed_files():
        if not os.path.isabs(path):
            path = os.path.join(base, path)
        if path == record_path:
            continue
        if not os.path.exists(path):
            mismatches.append((path, 'exists', True, False))
            continue
        if not os.path.isfile(path):
            continue

        actual_size = str(os.path.getsize(path))
        if expected_size and actual_size != expected_size:
            mismatches.append((path, 'size', expected_size, actual_size))
            continue
        if not expected_hash:
            continue

        # A recorded hash of the form "<hasher>=<digest>" names the hasher.
        if '=' in expected_hash:
            hasher = expected_hash.split('=', 1)[0]
        else:
            hasher = None
        with open(path, 'rb') as fh:
            actual_hash = self.get_hash(fh.read(), hasher)
        if actual_hash != expected_hash:
            mismatches.append((path, 'hash', expected_hash, actual_hash))
    return mismatches
def get_distinfo_file(self, path):
    """
    Returns a path located under the ``.dist-info`` directory. Returns a
    string representing the path.

    :parameter path: a file name listed in ``DIST_FILES``, or a path
                     containing ``os.sep`` whose parent directory name
                     equals the last component of ``self.path``
    :type path: str
    :rtype: str
    :raises DistlibException: if the parent directory name does not match
                              this distribution's directory, or the file
                              name is not in ``DIST_FILES``
    """
    # XXX use relpath, add tests
    if os.sep in path:
        # Only the last two components matter: the containing directory
        # and the file name.
        parent_name, path = path.split(os.sep)[-2:]
        own_name = self.path.split(os.sep)[-1]
        if parent_name != own_name:
            raise DistlibException(
                'dist-info file %r does not belong to the %r %s '
                'distribution' % (path, self.name, self.version))

    # What remains must be one of the known dist-info file names.
    if path not in DIST_FILES:
        raise DistlibException('invalid path for a dist-info file: '
                               '%r at %r' % (path, self.path))

    return os.path.join(self.path, path)
def list_distinfo_files(self):
    """
    Iterates over the ``RECORD`` entries and returns paths for each line if
    the path is pointing to a file located in the ``.dist-info`` directory
    or one of its subdirectories.

    Relative ``RECORD`` paths are resolved against the parent of the
    ``.dist-info`` directory. Paths are yielded as resolved, without
    normalisation.

    :returns: iterator of paths
    """
    base = os.path.dirname(self.path)
    dist_dir = os.path.normpath(self.path)
    # A bare startswith(self.path) would also accept sibling directories
    # that merely share the prefix (e.g. "foo-1.0.dist-info-extra"), so the
    # comparison is made against the directory plus a trailing separator.
    dist_prefix = os.path.join(dist_dir, '')
    for path, _checksum, _size in self._get_records():
        if not os.path.isabs(path):
            path = os.path.join(base, path)
        # Normalise only for the comparison, so that '/'-separated RECORD
        # paths match on platforms with a different os.sep.
        candidate = os.path.normpath(path)
        if candidate == dist_dir or candidate.startswith(dist_prefix):
            yield path
def list_installed_files(self):
    """
    Reads the ``installed-files.txt`` entries and returns a tuple
    ``(path, hash, size)`` for each listed file, where hash is the MD5 hex
    digest of the file's contents and size its size in bytes.

    Listed directories are skipped, as are missing ``.pyc``/``.pyo``
    files; any other missing file makes the call fail when it is opened.
    The last entry is ``(record_path, None, None)`` for
    ``installed-files.txt`` itself. An empty list is returned when that
    file does not exist.

    :returns: a list of (path, hash, size)
    """
    def _digest(filename):
        with open(filename, 'rb') as fh:
            return hashlib.md5(fh.read()).hexdigest()

    record_path = os.path.join(self.path, 'installed-files.txt')
    entries = []
    if not os.path.exists(record_path):
        return entries

    with codecs.open(record_path, 'r', encoding='utf-8') as listing:
        for raw_line in listing:
            candidate = os.path.normpath(
                os.path.join(self.path, raw_line.strip()))
            # "./" is present as a marker between installed files
            # and installation metadata files
            if not os.path.exists(candidate):
                logger.warning('Non-existent file: %s', candidate)
                if candidate.endswith(('.pyc', '.pyo')):
                    continue
                # otherwise fall through and fail
            if os.path.isdir(candidate):
                continue
            entries.append((candidate, _digest(candidate),
                            os.stat(candidate).st_size))
    entries.append((record_path, None, None))
    return entries
def repr_node(self, dist, level=1):
    """Return the text representation of the subgraph rooted at *dist*.

    The first line is the representation of *dist*; each adjacent
    distribution follows on its own line, indented according to its depth
    and suffixed with ``[label]`` when the edge carries a label.
    """
    lines = [self._repr_dist(dist)]
    indent = ' ' * level
    for neighbour, label in self.adjacency_list[dist]:
        text = self._repr_dist(neighbour)
        if label is not None:
            text = '%s [%s]' % (text, label)
        lines.append(indent + str(text))
        # The nested output starts with the neighbour itself, which was
        # just added above, so its first line is dropped.
        nested = self.repr_node(neighbour, level + 1).split('\n')
        lines.extend(nested[1:])
    return '\n'.join(lines)
def __repr__(self):
    """Return the ``repr_node`` output of every node, one after another."""
    return '\n'.join(
        [self.repr_node(dist) for dist in self.adjacency_list])