def test_egg_installed_paths(self):
here = os.path.abspath(os.path.dirname(__file__))
expected_prefix = os.path.join(here, 'test_apps')
real_prefix, sys.prefix = sys.prefix, expected_prefix
site_packages = os.path.join(expected_prefix, 'lib', 'python2.5', 'site-packages')
egg_path = os.path.join(site_packages, 'SiteEgg.egg')
sys.path.append(site_packages)
sys.path.append(egg_path)
try:
import site_egg # in SiteEgg.egg
self.assert_equal(site_egg.app.instance_path,
os.path.join(expected_prefix, 'var',
'site_egg-instance'))
finally:
sys.prefix = real_prefix
sys.path.remove(site_packages)
sys.path.remove(egg_path)
if 'site_egg' in sys.modules:
del sys.modules['site_egg']
# Python prefix() source code examples
def build_and_install(self, setup_script, setup_base):
    """Build eggs with ``bdist_egg`` and install every egg produced.

    A temporary ``egg-dist-tmp-*`` directory is created next to
    *setup_script* and passed to the setup script as ``--dist-dir``.  Each
    distribution found in that directory afterwards is handed to
    ``self.install_egg`` together with *setup_base*.

    Returns the list of ``install_egg`` results.  A warning is logged when
    nothing was built and this is not a dry run.  The temporary directory is
    always removed and the log verbosity reset to ``self.verbose``.
    """
    script_dir = os.path.dirname(setup_script)
    dist_dir = tempfile.mkdtemp(prefix='egg-dist-tmp-', dir=script_dir)
    try:
        self._set_fetcher_options(os.path.dirname(setup_script))
        self.run_setup(
            setup_script, setup_base, ['bdist_egg', '--dist-dir', dist_dir])

        built = Environment([dist_dir])
        installed = [
            self.install_egg(dist.location, setup_base)
            for key in built
            for dist in built[key]
        ]
        if not (installed or self.dry_run):
            log.warn("No eggs found in %s (setup script problem?)",
                     dist_dir)
        return installed
    finally:
        rmtree(dist_dir)
        log.set_verbosity(self.verbose)  # restore our log verbosity
def get_resources_dests(resources_root, rules):
    """Map resource files found under *resources_root* to destinations.

    *rules* is an iterable of ``(base, suffix, dest)`` triples.  ``base`` is
    globbed relative to *resources_root*, then ``suffix`` is globbed relative
    to every match of ``base``.  Each file found is keyed by its
    '/'-separated path relative to *resources_root*; its value is ``dest``
    (trailing slashes removed) joined with the file's path relative to the
    matched base.  A rule whose ``dest`` is ``None`` removes any mapping an
    earlier rule created for the files it matches.

    Returns the resulting ``{resource_path: destination_path}`` dict.
    """
    def _relative_to(parent, target):
        # Work on '/'-separated forms, cut off the parent part and drop the
        # slashes left at the front.
        parent = parent.replace(os.path.sep, '/')
        target = target.replace(os.path.sep, '/')
        assert target.startswith(parent)
        return target[len(parent):].lstrip('/')

    mapping = {}
    for base, suffix, dest in rules:
        for matched_base in iglob(os.path.join(resources_root, base)):
            for matched_file in iglob(os.path.join(matched_base, suffix)):
                key = _relative_to(resources_root, matched_file)
                if dest is None:
                    # A None destination withdraws an earlier mapping.
                    mapping.pop(key, None)
                    continue
                tail = _relative_to(matched_base, matched_file)
                target_dir = dest.replace(os.path.sep, '/').rstrip('/')
                mapping[key] = target_dir + '/' + tail
    return mapping
def write_exports(exports, stream):
    """Write *exports* to *stream* in INI format.

    *exports* maps a section name to a mapping whose values are entry
    objects with ``name``, ``prefix``, ``suffix`` and ``flags`` attributes.
    Every entry becomes one option named ``entry.name`` whose value is
    ``prefix``, followed by ``:suffix`` when a suffix is set and by
    `` [flag1, flag2]`` when flags are present.

    On Python 3 *stream* is wrapped in a UTF-8 encoding writer before the
    configuration is written to it.
    """
    if sys.version_info[0] >= 3:
        # needs to be a text stream
        stream = codecs.getwriter('utf-8')(stream)

    parser = configparser.ConfigParser()
    for section, entries in exports.items():
        # TODO check section, entries for valid values
        parser.add_section(section)
        for export in entries.values():
            value = export.prefix
            if export.suffix is not None:
                value = '%s:%s' % (value, export.suffix)
            if export.flags:
                value = '%s [%s]' % (value, ', '.join(export.flags))
            parser.set(section, export.name, value)
    parser.write(stream)
def save(self, pypi_version, current_time):
    """Record *pypi_version* and the check time in the JSON state file.

    Nothing is written when ``check_path_owner`` rejects the directory that
    holds ``self.statefile_path``.  Otherwise that directory is created if
    needed and, while holding a lock on the state file, the existing JSON
    content (or an empty dict when the file does not exist) gets its
    ``sys.prefix`` entry replaced with the new ``last_check`` timestamp and
    ``pypi_version`` before being written back in compact, key-sorted form.
    """
    state_path = self.statefile_path
    state_dir = os.path.dirname(state_path)

    # Check to make sure that we own the directory
    if not check_path_owner(state_dir):
        return

    # Now that we've ensured the directory is owned by this user, we'll go
    # ahead and make sure that all our directories are created.
    ensure_dir(state_dir)

    # Attempt to write out our version check file
    with lockfile.LockFile(state_path):
        state = {}
        if os.path.exists(state_path):
            with open(state_path) as existing:
                state = json.load(existing)

        # One entry per interpreter prefix.
        state[sys.prefix] = {
            "last_check": current_time.strftime(SELFCHECK_DATE_FMT),
            "pypi_version": pypi_version,
        }

        with open(state_path, "w") as out:
            json.dump(state, out, sort_keys=True, separators=(",", ":"))
def uninstall(self, auto_confirm=False, verbose=False):
    """
    Uninstall the distribution currently satisfying this requirement.

    When ``check_if_exists()`` reports that nothing is installed, a warning
    is logged and ``None`` is returned.  Otherwise an ``UninstallPathSet``
    is built from ``self.satisfied_by`` (or ``self.conflicts_with`` when the
    former is falsy), its ``remove`` method is called with *auto_confirm*
    and *verbose*, and the path set is returned.

    The original documentation states that the user is prompted before
    files are removed or modified unless *auto_confirm* is True, and that
    files outside of ``sys.prefix`` are never touched, so uninstalling
    inside a virtual environment only modifies that environment even if it
    is linked to global site-packages.  Those guarantees are not implemented
    in this method; presumably they live in ``UninstallPathSet``.
    """
    if self.check_if_exists():
        target = self.satisfied_by or self.conflicts_with
        pathset = UninstallPathSet.from_dist(target)
        pathset.remove(auto_confirm, verbose)
        return pathset

    logger.warning("Skipping %s as it is not installed.", self.name)
    return None
def get_install_args(self, global_options, record_filename, root, prefix):
    """Build the command line used to run ``setup.py install``.

    The command starts with the current interpreter in unbuffered mode
    running ``SETUPTOOLS_SHIM`` formatted with ``self.setup_py``, followed
    by *global_options*, ``install --record <record_filename>`` and
    ``--single-version-externally-managed``.  ``--root`` and ``--prefix``
    are added only when the corresponding argument is not ``None``;
    ``--compile`` or ``--no-compile`` follows ``self.pycompile``.  When
    ``running_under_virtualenv()`` is true, headers are directed to
    ``<sys.prefix>/include/site/pythonX.Y/<self.name>``.

    Returns the argument list.
    """
    cmd = [sys.executable, "-u", '-c', SETUPTOOLS_SHIM % self.setup_py]
    cmd.extend(global_options)
    cmd.extend(['install', '--record', record_filename])
    cmd.append('--single-version-externally-managed')

    for flag, value in (('--root', root), ('--prefix', prefix)):
        if value is not None:
            cmd.extend([flag, value])

    cmd.append("--compile" if self.pycompile else "--no-compile")

    if running_under_virtualenv():
        version_dir = 'python' + sysconfig.get_python_version()
        headers_dir = os.path.join(
            sys.prefix, 'include', 'site', version_dir, self.name)
        cmd.extend(['--install-headers', headers_dir])

    return cmd
def get_resources_dests(resources_root, rules):
    """Map resource files found under *resources_root* to destinations.

    *rules* is an iterable of ``(base, suffix, dest)`` triples.  ``base`` is
    globbed relative to *resources_root*, then ``suffix`` is globbed relative
    to every match of ``base``.  Each file found is keyed by its
    '/'-separated path relative to *resources_root*; its value is ``dest``
    (trailing slashes removed) joined with the file's path relative to the
    matched base.  A rule whose ``dest`` is ``None`` removes any mapping an
    earlier rule created for the files it matches.

    Returns the resulting ``{resource_path: destination_path}`` dict.
    """
    def _relative_to(parent, target):
        # Work on '/'-separated forms, cut off the parent part and drop the
        # slashes left at the front.
        parent = parent.replace(os.path.sep, '/')
        target = target.replace(os.path.sep, '/')
        assert target.startswith(parent)
        return target[len(parent):].lstrip('/')

    mapping = {}
    for base, suffix, dest in rules:
        for matched_base in iglob(os.path.join(resources_root, base)):
            for matched_file in iglob(os.path.join(matched_base, suffix)):
                key = _relative_to(resources_root, matched_file)
                if dest is None:
                    # A None destination withdraws an earlier mapping.
                    mapping.pop(key, None)
                    continue
                tail = _relative_to(matched_base, matched_file)
                target_dir = dest.replace(os.path.sep, '/').rstrip('/')
                mapping[key] = target_dir + '/' + tail
    return mapping
def write_exports(exports, stream):
    """Write *exports* to *stream* in INI format.

    *exports* maps a section name to a mapping whose values are entry
    objects with ``name``, ``prefix``, ``suffix`` and ``flags`` attributes.
    Every entry becomes one option named ``entry.name`` whose value is
    ``prefix``, followed by ``:suffix`` when a suffix is set and by
    `` [flag1, flag2]`` when flags are present.

    On Python 3 *stream* is wrapped in a UTF-8 encoding writer before the
    configuration is written to it.
    """
    if sys.version_info[0] >= 3:
        # needs to be a text stream
        stream = codecs.getwriter('utf-8')(stream)

    parser = configparser.ConfigParser()
    for section, entries in exports.items():
        # TODO check section, entries for valid values
        parser.add_section(section)
        for export in entries.values():
            value = export.prefix
            if export.suffix is not None:
                value = '%s:%s' % (value, export.suffix)
            if export.flags:
                value = '%s [%s]' % (value, ', '.join(export.flags))
            parser.set(section, export.name, value)
    parser.write(stream)
def ETA(self):
    """Return a ``'<label>: <duration>'`` progress string.

    When ``self.done`` is true the label is ``'Done'`` and the duration is
    the elapsed time.  Otherwise the label is ``'ETA '`` and the duration is
    an estimate of the time remaining:

    * ``-1`` when ``self.max`` is ``None`` (no known upper bound);
    * ``0`` when no time has elapsed or no progress past ``self.min`` has
      been made yet;
    * otherwise ``elapsed * (total / completed - 1)``, where ``total`` is
      ``max - min`` and ``completed`` is ``cur - min``.

    The duration is rendered by ``self.format_duration``.
    """
    if self.done:
        label, seconds = 'Done', self.elapsed
    else:
        label = 'ETA '
        if self.max is None:
            seconds = -1
        elif self.elapsed == 0 or self.cur == self.min:
            seconds = 0
        else:
            # Total work divided by completed work, as a float ratio.
            ratio = float(self.max - self.min) / (self.cur - self.min)
            seconds = (ratio - 1) * self.elapsed
    return '%s: %s' % (label, self.format_duration(seconds))
def save(self, pypi_version, current_time):
    """Record *pypi_version* and the check time in the JSON state file.

    Nothing is written when ``check_path_owner`` rejects the directory that
    holds ``self.statefile_path``.  Otherwise that directory is created if
    needed and, while holding a lock on the state file, the existing JSON
    content (or an empty dict when the file does not exist) gets its
    ``sys.prefix`` entry replaced with the new ``last_check`` timestamp and
    ``pypi_version`` before being written back in compact, key-sorted form.
    """
    state_path = self.statefile_path
    state_dir = os.path.dirname(state_path)

    # Check to make sure that we own the directory
    if not check_path_owner(state_dir):
        return

    # Now that we've ensured the directory is owned by this user, we'll go
    # ahead and make sure that all our directories are created.
    ensure_dir(state_dir)

    # Attempt to write out our version check file
    with lockfile.LockFile(state_path):
        state = {}
        if os.path.exists(state_path):
            with open(state_path) as existing:
                state = json.load(existing)

        # One entry per interpreter prefix.
        state[sys.prefix] = {
            "last_check": current_time.strftime(SELFCHECK_DATE_FMT),
            "pypi_version": pypi_version,
        }

        with open(state_path, "w") as out:
            json.dump(state, out, sort_keys=True, separators=(",", ":"))
def get_install_paths(name):
    """
    Return the (distutils) install paths for the named dist.

    The result is a dict with one entry per key in ``install.SCHEME_KEYS``
    (documented originally as 'purelib', 'platlib', 'headers', 'scripts',
    'data'), each taken from the matching ``install_<key>`` attribute of the
    command returned by ``get_install_command(name)``.

    When ``sys.real_prefix`` exists (virtualenv), the 'headers' entry is
    replaced with ``<sys.prefix>/include/site/pythonX.Y/<name>``.
    """
    paths = {}

    install_cmd = get_install_command(name)

    for key in install.SCHEME_KEYS:
        paths[key] = getattr(install_cmd, 'install_' + key)

    # pip uses a similar path as an alternative to the system's (read-only)
    # include directory:
    if hasattr(sys, 'real_prefix'):  # virtualenv
        # Build "pythonX.Y" from version_info: slicing sys.version[:3]
        # truncates two-digit minor versions (3.10 would become "3.1").
        python_dir = 'python%d.%d' % sys.version_info[:2]
        paths['headers'] = os.path.join(sys.prefix,
                                        'include',
                                        'site',
                                        python_dir,
                                        name)

    return paths
def run(self):
    """Install every requested spec and optionally record installed files.

    The log verbosity is switched to ``self.verbose`` when it differs from
    the distribution's setting and is always reset to the distribution's
    setting on exit.  Each entry of ``self.args`` is passed to
    ``self.easy_install`` (dependencies are processed unless
    ``self.no_deps`` is set).  When ``self.record`` is set, the entries of
    ``self.outputs`` are written to that file; if ``self.root`` is set, the
    first ``len(self.root)`` characters are cut from each entry in place
    beforehand.  Finally ``self.warn_deprecated_options()`` is called.
    """
    dist_verbosity = self.distribution.verbose
    if self.verbose != dist_verbosity:
        log.set_verbosity(self.verbose)
    try:
        with_deps = not self.no_deps
        for spec in self.args:
            self.easy_install(spec, with_deps)

        if self.record:
            recorded = self.outputs
            if self.root:  # strip any package prefix
                cut = len(self.root)
                for index, path in enumerate(recorded):
                    recorded[index] = path[cut:]

            from distutils import file_util

            message = ("writing list of installed files to '%s'" %
                       self.record)
            self.execute(
                file_util.write_file, (self.record, recorded), message)

        self.warn_deprecated_options()
    finally:
        log.set_verbosity(self.distribution.verbose)
def build_and_install(self, setup_script, setup_base):
    """Build eggs with ``bdist_egg`` and install every egg produced.

    A temporary ``egg-dist-tmp-*`` directory is created next to
    *setup_script* and passed to the setup script as ``--dist-dir``.  Each
    distribution found in that directory afterwards is handed to
    ``self.install_egg`` together with *setup_base*.

    Returns the list of ``install_egg`` results.  A warning is logged when
    nothing was built and this is not a dry run.  The temporary directory is
    always removed and the log verbosity reset to ``self.verbose``.
    """
    script_dir = os.path.dirname(setup_script)
    dist_dir = tempfile.mkdtemp(prefix='egg-dist-tmp-', dir=script_dir)
    try:
        self._set_fetcher_options(os.path.dirname(setup_script))
        self.run_setup(
            setup_script, setup_base, ['bdist_egg', '--dist-dir', dist_dir])

        built = Environment([dist_dir])
        installed = [
            self.install_egg(dist.location, setup_base)
            for key in built
            for dist in built[key]
        ]
        if not (installed or self.dry_run):
            log.warn("No eggs found in %s (setup script problem?)",
                     dist_dir)
        return installed
    finally:
        rmtree(dist_dir)
        log.set_verbosity(self.verbose)  # restore our log verbosity
def _expand(self, *attrs):
config_vars = self.get_finalized_command('install').config_vars
if self.prefix:
# Set default install_dir/scripts from --prefix
config_vars = config_vars.copy()
config_vars['base'] = self.prefix
scheme = self.INSTALL_SCHEMES.get(os.name, self.DEFAULT_SCHEME)
for attr, val in scheme.items():
if getattr(self, attr, None) is None:
setattr(self, attr, val)
from distutils.util import subst_vars
for attr in attrs:
val = getattr(self, attr)
if val is not None:
val = subst_vars(val, config_vars)
if os.name == 'posix':
val = os.path.expanduser(val)
setattr(self, attr, val)
def find_config_files(self):
    """Return the config file list, preferring a per-prefix user config.

    Starts from the list returned by ``old_find_config_files(self)``.  If a
    ``.pydistutils.cfg`` (POSIX) or ``pydistutils.cfg`` (elsewhere) file
    exists directly under ``sys.prefix``, every entry of the list ending in
    ``pydistutils.cfg`` is removed and that file is appended instead.
    """
    config_files = old_find_config_files(self)

    # NOTE: the system-wide distutils.cfg path is still computed, but the
    # code that inserted it at the front of the list is disabled.
    system_distutils = os.path.join(distutils_path, 'distutils.cfg')

    # What to call the per-user config file
    basename = ".pydistutils.cfg" if os.name == 'posix' else "pydistutils.cfg"
    prefix_config = os.path.join(sys.prefix, basename)

    if os.path.isfile(prefix_config):
        superseded = [entry for entry in config_files
                      if entry.endswith('pydistutils.cfg')]
        for entry in superseded:
            config_files.remove(entry)
        config_files.append(prefix_config)

    return config_files
def get_resources_dests(resources_root, rules):
    """Map resource files found under *resources_root* to destinations.

    *rules* is an iterable of ``(base, suffix, dest)`` triples.  ``base`` is
    globbed relative to *resources_root*, then ``suffix`` is globbed relative
    to every match of ``base``.  Each file found is keyed by its
    '/'-separated path relative to *resources_root*; its value is ``dest``
    (trailing slashes removed) joined with the file's path relative to the
    matched base.  A rule whose ``dest`` is ``None`` removes any mapping an
    earlier rule created for the files it matches.

    Returns the resulting ``{resource_path: destination_path}`` dict.
    """
    def _relative_to(parent, target):
        # Work on '/'-separated forms, cut off the parent part and drop the
        # slashes left at the front.
        parent = parent.replace(os.path.sep, '/')
        target = target.replace(os.path.sep, '/')
        assert target.startswith(parent)
        return target[len(parent):].lstrip('/')

    mapping = {}
    for base, suffix, dest in rules:
        for matched_base in iglob(os.path.join(resources_root, base)):
            for matched_file in iglob(os.path.join(matched_base, suffix)):
                key = _relative_to(resources_root, matched_file)
                if dest is None:
                    # A None destination withdraws an earlier mapping.
                    mapping.pop(key, None)
                    continue
                tail = _relative_to(matched_base, matched_file)
                target_dir = dest.replace(os.path.sep, '/').rstrip('/')
                mapping[key] = target_dir + '/' + tail
    return mapping
def write_exports(exports, stream):
    """Write *exports* to *stream* in INI format.

    *exports* maps a section name to a mapping whose values are entry
    objects with ``name``, ``prefix``, ``suffix`` and ``flags`` attributes.
    Every entry becomes one option named ``entry.name`` whose value is
    ``prefix``, followed by ``:suffix`` when a suffix is set and by
    `` [flag1, flag2]`` when flags are present.

    On Python 3 *stream* is wrapped in a UTF-8 encoding writer before the
    configuration is written to it.
    """
    if sys.version_info[0] >= 3:
        # needs to be a text stream
        stream = codecs.getwriter('utf-8')(stream)

    parser = configparser.ConfigParser()
    for section, entries in exports.items():
        # TODO check section, entries for valid values
        parser.add_section(section)
        for export in entries.values():
            value = export.prefix
            if export.suffix is not None:
                value = '%s:%s' % (value, export.suffix)
            if export.flags:
                value = '%s [%s]' % (value, ', '.join(export.flags))
            parser.set(section, export.name, value)
    parser.write(stream)
def ETA(self):
    """Return a ``'<label>: <duration>'`` progress string.

    When ``self.done`` is true the label is ``'Done'`` and the duration is
    the elapsed time.  Otherwise the label is ``'ETA '`` and the duration is
    an estimate of the time remaining:

    * ``-1`` when ``self.max`` is ``None`` (no known upper bound);
    * ``0`` when no time has elapsed or no progress past ``self.min`` has
      been made yet;
    * otherwise ``elapsed * (total / completed - 1)``, where ``total`` is
      ``max - min`` and ``completed`` is ``cur - min``.

    The duration is rendered by ``self.format_duration``.
    """
    if self.done:
        label, seconds = 'Done', self.elapsed
    else:
        label = 'ETA '
        if self.max is None:
            seconds = -1
        elif self.elapsed == 0 or self.cur == self.min:
            seconds = 0
        else:
            # Total work divided by completed work, as a float ratio.
            ratio = float(self.max - self.min) / (self.cur - self.min)
            seconds = (ratio - 1) * self.elapsed
    return '%s: %s' % (label, self.format_duration(seconds))
def save(self, pypi_version, current_time):
    """Record *pypi_version* and the check time in the JSON state file.

    Nothing is written when ``check_path_owner`` rejects the directory that
    holds ``self.statefile_path``.  Otherwise that directory is created if
    needed and, while holding a lock on the state file, the existing JSON
    content (or an empty dict when the file does not exist) gets its
    ``sys.prefix`` entry replaced with the new ``last_check`` timestamp and
    ``pypi_version`` before being written back in compact, key-sorted form.
    """
    state_path = self.statefile_path
    state_dir = os.path.dirname(state_path)

    # Check to make sure that we own the directory
    if not check_path_owner(state_dir):
        return

    # Now that we've ensured the directory is owned by this user, we'll go
    # ahead and make sure that all our directories are created.
    ensure_dir(state_dir)

    # Attempt to write out our version check file
    with lockfile.LockFile(state_path):
        state = {}
        if os.path.exists(state_path):
            with open(state_path) as existing:
                state = json.load(existing)

        # One entry per interpreter prefix.
        state[sys.prefix] = {
            "last_check": current_time.strftime(SELFCHECK_DATE_FMT),
            "pypi_version": pypi_version,
        }

        with open(state_path, "w") as out:
            json.dump(state, out, sort_keys=True, separators=(",", ":"))