def LoadPlugins(plugins, verbose):
    """Load and execute every plugin script named in ``plugins``.

    ``plugins`` is a comma-separated string. Each entry is expanded through
    ``ProcessAt`` (defined elsewhere in this file), which must return a list
    of plugin names. For every resulting name:

    * ``.py`` is appended when the name does not already end with it
      (case-insensitive check);
    * a bare filename that does not exist relative to the current directory
      is looked up in the directory of the running script (``sys.argv[0]``);
    * the file is read and executed with ``exec``.

    :param plugins: comma-separated plugin names; ``''`` means "nothing to do"
    :param verbose: when true, an exception raised while loading a plugin is
        re-raised after the error line is printed; otherwise loading
        continues with the next plugin
    """
    if plugins == '':
        return
    scriptPath = os.path.dirname(sys.argv[0])
    for plugin in sum(map(ProcessAt, plugins.split(',')), []):
        try:
            if not plugin.lower().endswith('.py'):
                plugin += '.py'
            if os.path.dirname(plugin) == '':
                if not os.path.exists(plugin):
                    scriptPlugin = os.path.join(scriptPath, plugin)
                    if os.path.exists(scriptPlugin):
                        plugin = scriptPlugin
            # Read through a context manager so the handle is closed even
            # when reading fails, instead of relying on garbage collection.
            with open(plugin, 'r') as pluginFile:
                pluginSource = pluginFile.read()
            # SECURITY: this runs arbitrary code taken from the plugin file.
            # Only pass plugin names that come from a trusted source.
            # Compiling with the filename makes tracebacks point at the plugin.
            exec(compile(pluginSource, plugin, 'exec'))
        except Exception:
            print('Error loading plugin: %s' % plugin)
            if verbose:
                # Bare raise keeps the original traceback intact.
                raise
python类path()的实例源码
def _warn_unsafe_extraction_path(path):
"""
If the default extraction path is overridden and set to an insecure
location, such as /tmp, it opens up an opportunity for an attacker to
replace an extracted file with an unauthorized payload. Warn the user
if a known insecure location is used.
See Distribute #375 for more details.
"""
if os.name == 'nt' and not path.startswith(os.environ['windir']):
# On Windows, permissions are generally restrictive by default
# and temp directories are not writable by other users, so
# bypass the warning.
return
mode = os.stat(path).st_mode
if mode & stat.S_IWOTH or mode & stat.S_IWGRP:
msg = ("%s is writable by group/others and vulnerable to attack "
"when "
"used with get_resource_filename. Consider a more secure "
"location (set with .set_extraction_path or the "
"PYTHON_EGG_CACHE environment variable)." % path)
warnings.warn(msg, UserWarning)
def atomic_writer(file_path, mode):
    """Atomic file writer.

    :param file_path: path of file to write to.
    :type file_path: ``unicode``
    :param mode: same as for :func:`open`
    :type mode: string

    .. versionadded:: 1.12

    Generator that yields a file object opened on ``file_path + '.aw.temp'``.
    When the consumer finishes without raising, the temporary file is closed
    and then renamed onto ``file_path``; if the consumer raises, the target
    is left untouched. The temporary file is removed in either case.

    NOTE(review): this is written to be used as a context manager, i.e.
    wrapped with ``contextlib.contextmanager``; the decorator is not visible
    in this part of the file -- confirm it is applied.
    """
    temp_suffix = '.aw.temp'
    temp_file_path = file_path + temp_suffix
    # Opened outside the try block: if open() itself fails there is nothing
    # of ours to clean up, and a pre-existing temp file is left alone.
    file_obj = open(temp_file_path, mode)
    try:
        with file_obj:
            yield file_obj
        # The file is closed (and therefore flushed) at this point, so the
        # rename publishes complete content. Renaming while the handle was
        # still open could expose a partially written file, and fails
        # outright on platforms that refuse to rename open files.
        os.rename(temp_file_path, file_path)
    finally:
        try:
            # After a successful rename the temp path no longer exists;
            # that error is expected and ignored.
            os.remove(temp_file_path)
        except (OSError, IOError):
            pass
def cachedir(self):
    """Path to workflow's cache directory.

    Uses the ``workflow_cache`` entry of ``self.alfred_env`` when it is set
    to a non-empty value and falls back to ``self._default_cachedir``
    otherwise. The chosen path is passed through ``self._create`` and that
    call's result is returned.

    The cache directory is a subdirectory of Alfred's own cache directory
    in ``~/Library/Caches``. The full path is:

    ``~/Library/Caches/com.runningwithcrayons.Alfred-X/Workflow Data/<bundle id>``

    ``Alfred-X`` may be ``Alfred-2`` or ``Alfred-3``.

    :returns: full path to workflow's cache directory
    :rtype: ``unicode``
    """
    location = (self.alfred_env.get('workflow_cache')
                or self._default_cachedir)
    return self._create(location)
def datadir(self):
    """Path to workflow's data directory.

    Uses the ``workflow_data`` entry of ``self.alfred_env`` when it is set
    to a non-empty value and falls back to ``self._default_datadir``
    otherwise. The chosen path is passed through ``self._create`` and that
    call's result is returned.

    The data directory is a subdirectory of Alfred's own data directory in
    ``~/Library/Application Support``. The full path is:

    ``~/Library/Application Support/Alfred 2/Workflow Data/<bundle id>``

    :returns: full path to workflow data directory
    :rtype: ``unicode``
    """
    location = (self.alfred_env.get('workflow_data')
                or self._default_datadir)
    return self._create(location)
def _delete_directory_contents(self, dirpath, filter_func):
"""Delete all files in a directory.
:param dirpath: path to directory to clear
:type dirpath: ``unicode`` or ``str``
:param filter_func function to determine whether a file shall be
deleted or not.
:type filter_func ``callable``
"""
if os.path.exists(dirpath):
for filename in os.listdir(dirpath):
if not filter_func(filename):
continue
path = os.path.join(dirpath, filename)
if os.path.isdir(path):
shutil.rmtree(path)
else:
os.unlink(path)
self.logger.debug('Deleted : %r', path)
def test_rar_detection(self):
    """
    Tests the rar file detection process

    Every sample path must match ``RAR_PART_RE`` and expose the expected
    value in its ``part`` group.
    """
    from newsreap.codecs.CodecRar import RAR_PART_RE

    # (sample path, expected content of the 'part' group)
    samples = (
        ('/path/to/test.rar', None),
        ('/path/to/test.RaR', None),
        ('/path/to/test.r01', '01'),
        ('/path/to/test.R200', '200'),
        ('/path/to/test.part65.rar', '65'),
    )

    for sample, expected_part in samples:
        result = RAR_PART_RE.match(sample)
        assert result is not None
        assert result.group('part') == expected_part
def test_7z_detection(self):
    """
    Tests the 7z file detection process

    Every sample path must match ``SEVEN_ZIP_PART_RE`` and expose the
    expected value in the named group listed next to it.
    """
    from newsreap.codecs.Codec7Zip import SEVEN_ZIP_PART_RE

    # (sample path, group to inspect, expected content of that group)
    samples = (
        ('/path/to/test.7z', 'part', None),
        ('/path/to/test.7Z', 'part', None),
        ('/path/to/test.7z.001', 'part0', '001'),
        ('/path/to/test.part65.7z', 'part', '65'),
    )

    for sample, group_name, expected in samples:
        result = SEVEN_ZIP_PART_RE.match(sample)
        assert result is not None
        assert result.group(group_name) == expected
def _build_from_requirements(cls, req_spec):
    """
    Create a working set that satisfies ``req_spec`` and install it as
    ``sys.path``.

    Resolution starts from an empty working set so that entries already on
    ``sys.path`` do not influence it. The resolved distributions are added
    first, then every ``sys.path`` entry that is still missing, and finally
    ``sys.path`` is overwritten in place with the working set's entries.
    """
    working_set = cls([])
    requirements = parse_requirements(req_spec)
    for resolved in working_set.resolve(requirements, Environment()):
        working_set.add(resolved)

    # keep whatever was on sys.path but is not part of the working set yet
    for path_entry in sys.path:
        if path_entry in working_set.entries:
            continue
        working_set.add_entry(path_entry)

    # slice assignment mutates the existing list object
    sys.path[:] = working_set.entries
    return working_set
def __init__(self, search_path=None, platform=get_supported_platform(),
        python=PY_MAJOR):
    """Snapshot distributions available on a search path

    The constructor records ``platform`` and ``python``, starts with an
    empty distribution map and then hands ``search_path`` to ``self.scan``.

    `search_path` should be a sequence of ``sys.path`` items; any
    distributions found on it are added to the environment. If it is not
    supplied, ``sys.path`` is used.

    `platform` is an optional string naming the platform that
    platform-specific distributions must be compatible with; it defaults
    to the current platform. `python` is an optional string naming the
    desired version of Python (e.g. ``'3.3'``); it defaults to the current
    version.

    Pass ``None`` for `platform` (and/or `python`) to map *all*
    distributions, not just those compatible with the running platform or
    Python version.
    """
    self.python = python
    self.platform = platform
    self._distmap = {}
    # scanning must come last: it runs against the attributes set above
    self.scan(search_path)
def get_cache_path(self, archive_name, names=()):
    """Return absolute location in cache for `archive_name` and `names`

    The parent directory of the resulting path will be created if it does
    not already exist. `archive_name` should be the base filename of the
    enclosing egg (which may not be the name of the enclosing zipfile!),
    including its ".egg" extension. `names`, if provided, should be a
    sequence of path name parts "under" the egg's extraction location.

    The base directory is ``self.extraction_path`` when set, otherwise the
    result of ``get_default_cache()``. If creating the parent directory
    fails, ``self.extraction_error()`` is called to report it. The base
    directory is then checked with ``self._warn_unsafe_extraction_path``
    and the returned path is recorded in ``self.cached_files``.

    This method should only be called by resource providers that need to
    obtain an extraction location, and only for names they intend to
    extract, as it tracks the generated names for possible cleanup later.
    """
    extract_path = self.extraction_path or get_default_cache()
    target_path = os.path.join(extract_path, archive_name + '-tmp', *names)
    try:
        _bypass_ensure_directory(target_path)
    except Exception:
        # Deliberately not a bare ``except``: KeyboardInterrupt and
        # SystemExit must propagate instead of being reported as an
        # extraction failure.
        self.extraction_error()

    self._warn_unsafe_extraction_path(extract_path)

    self.cached_files[target_path] = 1
    return target_path
def build(cls, path):
    """
    Map every member name of the zip file at ``path`` to its ZipInfo.

    The result resembles the zipimport directory caches, except that the
    values are ZipInfo objects instead of tuples.

    Forward slashes in member names are replaced with the
    platform-specific separator (``os.sep``) in the keys, for
    compatibility with pypy on Windows.
    """
    with ContextualZipFile(path) as archive:
        return {
            member.replace('/', os.sep): archive.getinfo(member)
            for member in archive.namelist()
        }
def _index(self):
try:
return self._dirindex
except AttributeError:
ind = {}
for path in self.zipinfo:
parts = path.split(os.sep)
while parts:
parent = os.sep.join(parts[:-1])
if parent in ind:
ind[parent].append(parts[-1])
break
else:
ind[parent] = [parts.pop()]
self._dirindex = ind
return ind
def find_eggs_in_zip(importer, path_item, only=False):
    """
    Yield the distributions found in a zip file, including nested eggs.

    Archives whose name ends in ``.whl`` yield nothing. When ``only`` is
    true, eggs nested inside the archive are not searched.
    """
    archive_is_wheel = importer.archive.endswith('.whl')
    if archive_is_wheel:
        # wheels are not supported with this finder
        # they don't have PKG-INFO metadata, and won't ever contain eggs
        return

    egg_metadata = EggMetadata(importer)
    if egg_metadata.has_metadata('PKG-INFO'):
        yield Distribution.from_filename(path_item, metadata=egg_metadata)

    if only:
        # don't yield nested distros
        return

    for entry in egg_metadata.resource_listdir('/'):
        if not _is_unpacked_egg(entry):
            continue
        nested_path = os.path.join(path_item, entry)
        nested_importer = zipimport.zipimporter(nested_path)
        for nested_dist in find_eggs_in_zip(nested_importer, nested_path):
            yield nested_dist
def _by_version_descending(names):
    """
    Return the given filenames ordered from highest to lowest version.

    >>> names = 'bar', 'foo', 'Python-2.7.10.egg', 'Python-2.7.2.egg'
    >>> _by_version_descending(names)
    ['Python-2.7.10.egg', 'Python-2.7.2.egg', 'foo', 'bar']
    >>> names = 'Setuptools-1.2.3b1.egg', 'Setuptools-1.2.3.egg'
    >>> _by_version_descending(names)
    ['Setuptools-1.2.3.egg', 'Setuptools-1.2.3b1.egg']
    >>> names = 'Setuptools-1.2.3b1.egg', 'Setuptools-1.2.3.post1.egg'
    >>> _by_version_descending(names)
    ['Setuptools-1.2.3.post1.egg', 'Setuptools-1.2.3b1.egg']
    """
    def version_key(filename):
        """
        Version-parse each dash-separated piece of the stem, then the
        extension
        """
        stem, extension = os.path.splitext(filename)
        pieces = stem.split('-') + [extension]
        return list(map(packaging.version.parse, pieces))

    ordered = list(names)
    ordered.sort(key=version_key, reverse=True)
    return ordered
def register_namespace_handler(importer_type, namespace_handler):
    """Register `namespace_handler` to declare namespace packages

    The handler is stored in the module-level ``_namespace_handlers``
    mapping under `importer_type`, replacing any entry already stored for
    that type.

    `importer_type` is the type or class of a PEP 302 "Importer" (sys.path
    item handler). `namespace_handler` is a callable with this signature::

        def namespace_handler(importer, path_entry, moduleName, module):
            # return a path_entry to use for child packages

    A handler is only called once the importer object has agreed that it
    can handle the relevant path item. It should return a subpath only if
    the module ``__path__`` does not already contain an equivalent one.
    ``pkg_resources.file_ns_handler`` is an example handler.
    """
    _namespace_handlers[importer_type] = namespace_handler
def _handle_ns(packageName, path_item):
"""Ensure that named package includes a subpath of path_item (if needed)"""
importer = get_importer(path_item)
if importer is None:
return None
loader = importer.find_module(packageName)
if loader is None:
return None
module = sys.modules.get(packageName)
if module is None:
module = sys.modules[packageName] = types.ModuleType(packageName)
module.__path__ = []
_set_parent_ns(packageName)
elif not hasattr(module, '__path__'):
raise TypeError("Not a package:", packageName)
handler = _find_adapter(_namespace_handlers, importer)
subpath = handler(importer, path_item, packageName, module)
if subpath is not None:
path = module.__path__
path.append(subpath)
loader.load_module(packageName)
_rebuild_mod_path(path, packageName, module)
return subpath
def check_version_conflict(self):
    """Warn about top-level modules already imported from another location.

    For every name in this distribution's ``top_level.txt`` that is
    present in ``sys.modules``, ``issue_warning`` is called unless the
    module is a namespace package, is one of ``pkg_resources``,
    ``setuptools`` or ``site``, or has a ``__file__`` that starts with this
    distribution's location. Nothing is checked when ``self.key`` is
    ``'setuptools'``.
    """
    if self.key == 'setuptools':
        # ignore the inevitable setuptools self-conflicts :(
        return

    declared_namespaces = dict.fromkeys(
        self._get_metadata('namespace_packages.txt'))
    own_location = normalize_path(self.location)
    always_ignored = ('pkg_resources', 'setuptools', 'site')

    for modname in self._get_metadata('top_level.txt'):
        if modname not in sys.modules:
            continue
        if modname in declared_namespaces or modname in _namespace_packages:
            continue
        if modname in always_ignored:
            continue

        loaded_from = getattr(sys.modules[modname], '__file__', None)
        if loaded_from:
            # compare both the normalized and the raw spelling of the path
            if normalize_path(loaded_from).startswith(own_location):
                continue
            if loaded_from.startswith(self.location):
                continue

        issue_warning(
            "Module %s was already imported from %s, but %s is being added"
            " to sys.path" % (modname, loaded_from, self.location),
        )
def __init__(self, name=None, delete=None):
# If we were not given an explicit directory, and we were not given an
# explicit delete option, then we'll default to deleting.
if name is None and delete is None:
delete = True
if name is None:
# We realpath here because some systems have their default tmpdir
# symlinked to another directory. This tends to confuse build
# scripts, so we canonicalize the path by traversing potential
# symlinks here.
name = os.path.realpath(tempfile.mkdtemp(prefix="pip-build-"))
# If we were not given an explicit directory, and we were not given
# an explicit delete option, then we'll default to deleting.
if delete is None:
delete = True
self.name = name
self.delete = delete
def check_path_owner(path):
    """Report whether the current user may use ``path``.

    Walks up from ``path`` to the first ancestor that exists (symlinks are
    not followed for the existence check) and decides on that ancestor:

    * when the effective uid is 0, the result is whether the ancestor is
      owned by uid 0 (``False`` if its owner cannot be read);
    * otherwise the result is whether the ancestor is writable.

    :returns: ``True`` on platforms without ``os.geteuid``; ``False`` when
        no existing ancestor is found; otherwise the decision above.
    """
    # If we don't have a way to check the effective uid of this process, then
    # we'll just assume that we own the directory.
    if not hasattr(os, "geteuid"):
        return True

    previous = None
    while path != previous:
        if os.path.lexists(path):
            # Check if path is writable by current user.
            if os.geteuid() == 0:
                # Special handling for root user in order to handle properly
                # cases where users use sudo without -H flag.
                try:
                    path_uid = get_path_uid(path)
                except OSError:
                    return False
                return path_uid == 0
            return os.access(path, os.W_OK)
        # dirname() of a root or of '' returns its argument, which makes
        # ``path == previous`` and ends the loop.
        previous, path = path, os.path.dirname(path)

    # No ancestor exists (e.g. a relative path below a missing directory):
    # answer with an explicit boolean instead of falling through to None.
    return False
def fix_script(path):
    """Replace #!python with #!/path/to/python

    Only the first line is rewritten: when it starts with ``#!python`` it
    is replaced by ``#!`` followed by ``sys.executable`` and the platform
    line separator. The rest of the file is written back unchanged.

    Return True if file was changed, False otherwise (including when
    ``path`` is not a regular file)."""
    # XXX RECORD hashes will need to be updated
    if not os.path.isfile(path):
        # Explicit boolean; previously this case fell through to None.
        return False

    with open(path, 'rb') as script:
        firstline = script.readline()
        if not firstline.startswith(b'#!python'):
            return False
        rest = script.read()

    exename = sys.executable.encode(sys.getfilesystemencoding())
    firstline = b'#!' + exename + os.linesep.encode("ascii")
    with open(path, 'wb') as script:
        script.write(firstline)
        script.write(rest)
    return True
def uninstallation_paths(dist):
    """
    Yield every path that uninstalling ``dist`` should remove, based on a
    RECORD that does not list .pyc files.

    Each RECORD row contributes its first column joined onto
    ``dist.location``. A path ending in ``.py`` is followed by the path of
    the ``.pyc`` of the same name in the same directory.

    UninstallPathSet.add() takes care of the __pycache__ .pyc.
    """
    from pip.utils import FakeFile  # circular import

    record_rows = csv.reader(FakeFile(dist.get_metadata_lines('RECORD')))
    for row in record_rows:
        recorded = os.path.join(dist.location, row[0])
        yield recorded
        if not recorded.endswith('.py'):
            continue
        directory, filename = os.path.split(recorded)
        stem = filename[:-3]  # drop the '.py' suffix
        yield os.path.join(directory, stem + '.pyc')
def _build_one(self, req, output_dir, python_tag=None):
    """Build one wheel.

    The wheel is built into a fresh temporary directory, which is always
    removed afterwards. On success the first file found there is moved
    into ``output_dir``. If the build or the move fails,
    ``self._clean_one(req)`` is called.

    :return: The filename of the built wheel, or None if the build failed.
    """
    # 'pip-wheel-' is meant as a prefix; passed positionally it would be
    # taken as the suffix.
    tempd = tempfile.mkdtemp(prefix='pip-wheel-')
    try:
        if self.__build_one(req, tempd, python_tag=python_tag):
            try:
                wheel_name = os.listdir(tempd)[0]
                wheel_path = os.path.join(output_dir, wheel_name)
                shutil.move(os.path.join(tempd, wheel_name), wheel_path)
                logger.info('Stored in directory: %s', output_dir)
                return wheel_path
            except Exception:
                # Best effort: a failure to store the wheel is reported to
                # the caller as None below. Not a bare ``except`` so that
                # KeyboardInterrupt / SystemExit still propagate.
                pass
        # Ignore return, we can't do anything else useful.
        self._clean_one(req)
        return None
    finally:
        rmtree(tempd)
def load(self):
    """Populate ``self.data`` from the first existing config file.

    The directories returned by ``load_config_paths('wheel')`` are searched
    in order for ``self.CONFIG_NAME``; only the first file found is read.
    Missing ``signers`` / ``verifiers`` entries become empty lists and a
    missing ``schema`` becomes ``self.SCHEMA``. A ``schema`` that differs
    from ``self.SCHEMA`` raises ``ValueError``.

    :returns: ``self``
    """
    # XXX JSON is not a great database
    for config_dir in load_config_paths('wheel'):
        conf = os.path.join(native(config_dir), self.CONFIG_NAME)
        if not os.path.exists(conf):
            continue

        with open(conf, 'r') as infile:
            self.data = json.load(infile)

        for list_key in ('signers', 'verifiers'):
            if list_key not in self.data:
                self.data[list_key] = []

        if 'schema' not in self.data:
            self.data['schema'] = self.SCHEMA
        elif self.data['schema'] != self.SCHEMA:
            raise ValueError(
                "Bad wheel.json version {0}, expected {1}".format(
                    self.data['schema'], self.SCHEMA))
        # only the first config file found is used
        break
    return self
def _build_from_requirements(cls, req_spec):
    """
    Create a working set that satisfies ``req_spec`` and install it as
    ``sys.path``.

    Resolution starts from an empty working set so that entries already on
    ``sys.path`` do not influence it. The resolved distributions are added
    first, then every ``sys.path`` entry that is still missing, and finally
    ``sys.path`` is overwritten in place with the working set's entries.
    """
    working_set = cls([])
    requirements = parse_requirements(req_spec)
    for resolved in working_set.resolve(requirements, Environment()):
        working_set.add(resolved)

    # keep whatever was on sys.path but is not part of the working set yet
    for path_entry in sys.path:
        if path_entry in working_set.entries:
            continue
        working_set.add_entry(path_entry)

    # slice assignment mutates the existing list object
    sys.path[:] = working_set.entries
    return working_set
def __init__(self, search_path=None, platform=get_supported_platform(),
        python=PY_MAJOR):
    """Snapshot distributions available on a search path

    The constructor records ``platform`` and ``python``, starts with an
    empty distribution map and then hands ``search_path`` to ``self.scan``.

    `search_path` should be a sequence of ``sys.path`` items; any
    distributions found on it are added to the environment. If it is not
    supplied, ``sys.path`` is used.

    `platform` is an optional string naming the platform that
    platform-specific distributions must be compatible with; it defaults
    to the current platform. `python` is an optional string naming the
    desired version of Python (e.g. ``'3.3'``); it defaults to the current
    version.

    Pass ``None`` for `platform` (and/or `python`) to map *all*
    distributions, not just those compatible with the running platform or
    Python version.
    """
    self.python = python
    self.platform = platform
    self._distmap = {}
    # scanning must come last: it runs against the attributes set above
    self.scan(search_path)
def get_cache_path(self, archive_name, names=()):
    """Return absolute location in cache for `archive_name` and `names`

    The parent directory of the resulting path will be created if it does
    not already exist. `archive_name` should be the base filename of the
    enclosing egg (which may not be the name of the enclosing zipfile!),
    including its ".egg" extension. `names`, if provided, should be a
    sequence of path name parts "under" the egg's extraction location.

    The base directory is ``self.extraction_path`` when set, otherwise the
    result of ``get_default_cache()``. If creating the parent directory
    fails, ``self.extraction_error()`` is called to report it. The base
    directory is then checked with ``self._warn_unsafe_extraction_path``
    and the returned path is recorded in ``self.cached_files``.

    This method should only be called by resource providers that need to
    obtain an extraction location, and only for names they intend to
    extract, as it tracks the generated names for possible cleanup later.
    """
    extract_path = self.extraction_path or get_default_cache()
    target_path = os.path.join(extract_path, archive_name + '-tmp', *names)
    try:
        _bypass_ensure_directory(target_path)
    except Exception:
        # Deliberately not a bare ``except``: KeyboardInterrupt and
        # SystemExit must propagate instead of being reported as an
        # extraction failure.
        self.extraction_error()

    self._warn_unsafe_extraction_path(extract_path)

    self.cached_files[target_path] = 1
    return target_path
def _warn_unsafe_extraction_path(path):
"""
If the default extraction path is overridden and set to an insecure
location, such as /tmp, it opens up an opportunity for an attacker to
replace an extracted file with an unauthorized payload. Warn the user
if a known insecure location is used.
See Distribute #375 for more details.
"""
if os.name == 'nt' and not path.startswith(os.environ['windir']):
# On Windows, permissions are generally restrictive by default
# and temp directories are not writable by other users, so
# bypass the warning.
return
mode = os.stat(path).st_mode
if mode & stat.S_IWOTH or mode & stat.S_IWGRP:
msg = ("%s is writable by group/others and vulnerable to attack "
"when "
"used with get_resource_filename. Consider a more secure "
"location (set with .set_extraction_path or the "
"PYTHON_EGG_CACHE environment variable)." % path)
warnings.warn(msg, UserWarning)
def run_script(self, script_name, namespace):
    """Execute the metadata script ``scripts/<script_name>`` in ``namespace``.

    ``namespace['__file__']`` is set to the script's filename, computed by
    ``self._fn(self.egg_info, ...)``. If that file exists on disk, its
    content is compiled and executed. Otherwise the text returned by
    ``self.get_metadata`` (with line endings normalized to ``\\n``) is
    executed and also registered in ``linecache.cache`` under the script's
    filename.

    :param script_name: name of the script below ``scripts/``
    :param namespace: dict used as both globals and locals for the script
    :raises ResolutionError: when ``self.has_metadata`` reports that the
        script does not exist
    """
    script = 'scripts/' + script_name
    if not self.has_metadata(script):
        raise ResolutionError("No script named %r" % script_name)
    script_text = self.get_metadata(script).replace('\r\n', '\n')
    script_text = script_text.replace('\r', '\n')
    script_filename = self._fn(self.egg_info, script)
    namespace['__file__'] = script_filename
    if os.path.exists(script_filename):
        # Close the file deterministically instead of leaving the handle
        # to the garbage collector.
        with open(script_filename) as script_file:
            source = script_file.read()
        code = compile(source, script_filename, 'exec')
        exec(code, namespace, namespace)
    else:
        from linecache import cache
        # linecache entry layout: (size, mtime, lines, fullname)
        cache[script_filename] = (
            len(script_text), 0, script_text.split('\n'), script_filename
        )
        script_code = compile(script_text, script_filename, 'exec')
        exec(script_code, namespace, namespace)
def build(cls, path):
    """
    Map every member name of the zip file at ``path`` to its ZipInfo.

    The result resembles the zipimport directory caches, except that the
    values are ZipInfo objects instead of tuples.

    Forward slashes in member names are replaced with the
    platform-specific separator (``os.sep``) in the keys, for
    compatibility with pypy on Windows.
    """
    with ContextualZipFile(path) as archive:
        return {
            member.replace('/', os.sep): archive.getinfo(member)
            for member in archive.namelist()
        }