def get_factory_from_template(maintype):
    """Load and return the factory module of a template type.

    The factory source is expected at
    ``BASE_DIR/templates/<maintype>/FACTORY_FILENAME`` and is loaded under
    the module name ``"<maintype>.factory"``.  Which loading mechanism is
    used is selected through ``python_version_gte``.

    :param maintype: Name of the template sub-directory.
    :return: The loaded factory module.
    """
    factory_path = os.path.join(BASE_DIR, 'templates', maintype, FACTORY_FILENAME)
    module_name = "{}.factory".format(maintype)

    if python_version_gte(3, 5):
        # Python 3.5+: build the module from an explicit file spec.
        import importlib.util
        factory_spec = importlib.util.spec_from_file_location(
            module_name, factory_path)
        factory_module = importlib.util.module_from_spec(factory_spec)
        factory_spec.loader.exec_module(factory_module)
        return factory_module

    if python_version_gte(3, 0):
        # Python 3.0 - 3.4: loader based import.
        from importlib.machinery import SourceFileLoader
        return SourceFileLoader(module_name, factory_path).load_module()

    # Python 2 fallback.
    import imp
    return imp.load_source(module_name, factory_path)
# Example source code using Python's importlib.util module
def load_stache(stache_name, possible_dirs):
    """Execute the stache module ``stache_name`` from the first directory that has it.

    Each entry of ``possible_dirs`` is resolved, its parent is put on the
    Python path through ``PythonPathContext``, and the file
    ``<find_last_child(dir)>/<stache_name>.py`` is executed as a module.
    The function returns (``None``) as soon as one stache has been loaded.

    :param stache_name: Base name (without ``.py``) of the stache file.
    :param possible_dirs: Iterable of directories to search, in order.
    :raises Exception: If none of the directories contains the stache.
    """
    for dir_ in possible_dirs:
        path = Path(dir_).resolve()
        python_path = path.parent
        with PythonPathContext(str(python_path)):
            last_child = find_last_child(path)
            stache_file = "{}.py".format(last_child / stache_name)
            # spec_from_file_location() does not check that the file exists,
            # so without this test the first directory would always be
            # selected and exec_module() would fail instead of the search
            # moving on to the next candidate directory.
            if not Path(stache_file).is_file():
                continue
            stache_module = "{}.{}".format(str(last_child), stache_name)
            module_spec = importlib.util.spec_from_file_location(
                stache_module, stache_file)
            if module_spec:
                loaded = importlib.util.module_from_spec(module_spec)
                module_spec.loader.exec_module(loaded)
                return
    raise Exception("No stache with name {} found".format(stache_name))
def load_user_logic(module_name):
    """Load (once) the user logic cell module ``module_name`` and return it.

    The source file is ``//bgelogic/cells/<module_name>.py`` expanded through
    ``bge.logic.expandPath``.  Loaded modules are memoised in
    ``_loaded_userlogic_files`` keyed by that expanded path, so repeated calls
    return the module that was loaded first.

    :param module_name: Base name of the cell file, also used as module name.
    :return: The loaded module.
    """
    full_path = bge.logic.expandPath("//bgelogic/cells/{}.py".format(module_name))
    loaded_value = _loaded_userlogic_files.get(full_path)
    if loaded_value:
        return loaded_value
    import sys
    if sys.version_info < (3, 3):
        import imp
        loaded_value = imp.load_source(module_name, full_path)
    elif sys.version_info < (3, 5):
        from importlib.machinery import SourceFileLoader
        loaded_value = SourceFileLoader(module_name, full_path).load_module()
    else:
        import importlib.util
        spec = importlib.util.spec_from_file_location(module_name, full_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        loaded_value = module
    # Store under the same key that the lookup above uses; storing under
    # module_name meant the cache could never be hit.
    _loaded_userlogic_files[full_path] = loaded_value
    return loaded_value
def _abs_import(module_name, full_path):
import sys
python_version = sys.version_info
major = python_version[0]
minor = python_version[1]
if (major < 3) or (major == 3 and minor < 3):
import imp
return imp.load_source(module_name, full_path)
elif (major == 3) and (minor < 5):
from importlib.machinery import SourceFileLoader
return SourceFileLoader(module_name, full_path).load_module()
else:
import importlib.util
spec = importlib.util.spec_from_file_location(module_name, full_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
pass
def forget(modname):
    """Make the interpreter 'forget' that a module was ever imported.

    The module is unloaded via ``unload`` and, for every directory on
    ``sys.path``, the legacy ``.pyc``/``.pyo`` files and both PEP 3147 cache
    files that could belong to ``modname`` are unlinked.
    """
    unload(modname)
    for search_dir in sys.path:
        py_path = os.path.join(search_dir, modname + '.py')
        # Whether the files exist is irrelevant: every possible legacy and
        # PEP 3147 byte-code file is handed to unlink().
        for legacy_suffix in ('c', 'o'):
            unlink(py_path + legacy_suffix)
        for debug_flag in (True, False):
            unlink(importlib.util.cache_from_source(
                py_path, debug_override=debug_flag))
# Check whether a gui is actually available
def run(self):
    """Wait until imports have settled, then import the real module.

    Once the module is imported, every module named in ``self.replacements``
    that is present in ``sys.modules`` gets the mapped attribute rebound to
    the real module.
    """
    # Module loading is assumed to be finished once len(sys.modules) has not
    # changed for 5 consecutive polls that are 1 ms apart.
    quiet_polls = 0
    previous_count = -1
    while quiet_polls < 5:
        current_count = len(sys.modules)
        if current_count != previous_count:
            previous_count = current_count
            quiet_polls = 0
        else:
            quiet_polls += 1
        time.sleep(0.001)

    # Remove the placeholder proxy so that the real import takes place.
    qualified = importlib.util.resolve_name(self.name, self.package)
    if isinstance(sys.modules[qualified], BackgroundModuleProxy):
        del sys.modules[qualified]
    real_module = importlib.import_module(self.name, package=self.package)

    # Rebind the names that other modules hold for the proxy.
    for target_name, attr_name in self.replacements.items():
        if target_name in sys.modules:
            setattr(sys.modules[target_name], attr_name, real_module)
def find_loader(fullname):
    """Return the PEP 302 loader for ``fullname``, or None if there is none.

    Backwards compatible wrapper around ``importlib.util.find_spec``: most
    failures are converted into ImportError and only the loader of the spec
    is returned.

    :raises ImportError: For relative names and for lookup failures.
    """
    if fullname.startswith('.'):
        raise ImportError(
            "Relative module name {!r} not supported".format(fullname))
    try:
        found = importlib.util.find_spec(fullname)
    except (ImportError, AttributeError, TypeError, ValueError) as err:
        # importlib raises several error types in situations where pkgutil
        # used to raise ImportError; normalise them here.
        template = "Error while finding loader for {!r} ({}: {})"
        raise ImportError(template.format(fullname, type(err), err)) from err
    if found is None:
        return None
    return found.loader
def test_timestamp_overflow(self):
    """Importing a module whose mtime exceeds 2**32 must work (issue #11235)."""
    huge_mtime = 2 ** 33 - 5
    sys.path.insert(0, os.curdir)
    try:
        source = TESTFN + ".py"
        compiled = importlib.util.cache_from_source(source)
        # Create an empty source file.
        with open(source, 'w'):
            pass
        try:
            os.utime(source, (huge_mtime, huge_mtime))
        except OverflowError:
            self.skipTest("cannot set modification time to large integer")
        except OSError as err:
            if err.errno != getattr(errno, 'EOVERFLOW', None):
                raise
            self.skipTest("cannot set modification time to large integer ({})".format(err))
        __import__(TESTFN)
        # os.stat() raises if the pyc file was not created.
        os.stat(compiled)
    finally:
        del sys.path[0]
        remove_files(TESTFN)
def forget(modname):
    """Make the interpreter 'forget' that a module was ever imported.

    The module is unloaded via ``unload`` and, for every directory on
    ``sys.path``, the legacy ``.pyc``/``.pyo`` files and both PEP 3147 cache
    files that could belong to ``modname`` are unlinked.
    """
    unload(modname)
    for search_dir in sys.path:
        py_path = os.path.join(search_dir, modname + '.py')
        # Whether the files exist is irrelevant: every possible legacy and
        # PEP 3147 byte-code file is handed to unlink().
        for legacy_suffix in ('c', 'o'):
            unlink(py_path + legacy_suffix)
        for debug_flag in (True, False):
            unlink(importlib.util.cache_from_source(
                py_path, debug_override=debug_flag))
# Check whether a gui is actually available
def test_run_name(self):
    """run_module() must honour an explicitly supplied run name."""
    depth = 1
    run_name = "And now for something completely different"
    pkg_dir, mod_fname, mod_name, mod_spec = self._make_pkg(
        example_source, depth)
    forget(mod_name)

    # Namespace expected after execution: the example namespace plus the
    # module attributes that depend on how the module was run.
    module_attrs = {
        "__name__": run_name,
        "__file__": mod_fname,
        "__cached__": importlib.util.cache_from_source(mod_fname),
        "__package__": mod_name.rpartition(".")[0],
        "__spec__": mod_spec,
    }
    expected_ns = example_namespace.copy()
    expected_ns.update(module_attrs)

    def run_with_name(init_globals):
        return run_module(mod_name, init_globals, run_name)

    try:
        self.check_code_execution(run_with_name, expected_ns)
    finally:
        self._del_pkg(pkg_dir, depth, mod_name)
def _check_path_limitations(self, module_name):
    """Skip the test on Windows when the cached module path would be too long."""
    # base directory
    # + 2 * (path separator + `longname`)
    # + path separator + `module_name` + ".py"
    source_path_len = (len(self.here)
                       + 2 * (len(self.longname) + 1)
                       + len(module_name) + 1 + len(".py"))
    # Extra characters a cache path has compared to its source path.
    cache_overhead = len(importlib.util.cache_from_source("x.py")) - len("x.py")
    cached_path_len = source_path_len + cache_overhead

    if os.name != 'nt':
        return
    if cached_path_len >= 258:
        # Under Windows, the max path len is 260 including C's terminating
        # NUL character.
        # (see http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247%28v=vs.85%29.aspx#maxpath)
        self.skipTest("test paths too long (%d characters) for Windows' 260 character limit"
                      % cached_path_len)
    elif verbose:
        print("cached_path_len =", cached_path_len)
def find_loader(fullname):
    """Return the PEP 302 loader for ``fullname``, or None if there is none.

    Backwards compatible wrapper around ``importlib.util.find_spec``: most
    failures are converted into ImportError and only the loader of the spec
    is returned.

    :raises ImportError: For relative names and for lookup failures.
    """
    if fullname.startswith('.'):
        raise ImportError(
            "Relative module name {!r} not supported".format(fullname))
    try:
        found = importlib.util.find_spec(fullname)
    except (ImportError, AttributeError, TypeError, ValueError) as err:
        # importlib raises several error types in situations where pkgutil
        # used to raise ImportError; normalise them here.
        template = "Error while finding loader for {!r} ({}: {})"
        raise ImportError(template.format(fullname, type(err), err)) from err
    if found is None:
        return None
    return found.loader
def test_timestamp_overflow(self):
    """Importing a module whose mtime exceeds 2**32 must work (issue #11235)."""
    huge_mtime = 2 ** 33 - 5
    sys.path.insert(0, os.curdir)
    try:
        source = TESTFN + ".py"
        compiled = importlib.util.cache_from_source(source)
        # Create an empty source file.
        with open(source, 'w'):
            pass
        try:
            os.utime(source, (huge_mtime, huge_mtime))
        except OverflowError:
            self.skipTest("cannot set modification time to large integer")
        except OSError as err:
            if err.errno != getattr(errno, 'EOVERFLOW', None):
                raise
            self.skipTest("cannot set modification time to large integer ({})".format(err))
        __import__(TESTFN)
        # os.stat() raises if the pyc file was not created.
        os.stat(compiled)
    finally:
        del sys.path[0]
        remove_files(TESTFN)
def forget(modname):
    """Make the interpreter 'forget' that a module was ever imported.

    The module is unloaded via ``unload`` and, for every directory on
    ``sys.path``, the legacy ``.pyc``/``.pyo`` files and both PEP 3147 cache
    files that could belong to ``modname`` are unlinked.
    """
    unload(modname)
    for search_dir in sys.path:
        py_path = os.path.join(search_dir, modname + '.py')
        # Whether the files exist is irrelevant: every possible legacy and
        # PEP 3147 byte-code file is handed to unlink().
        for legacy_suffix in ('c', 'o'):
            unlink(py_path + legacy_suffix)
        for debug_flag in (True, False):
            unlink(importlib.util.cache_from_source(
                py_path, debug_override=debug_flag))
# Check whether a gui is actually available
def test_run_name(self):
    """run_module() must honour an explicitly supplied run name."""
    depth = 1
    run_name = "And now for something completely different"
    pkg_dir, mod_fname, mod_name, mod_spec = self._make_pkg(
        example_source, depth)
    forget(mod_name)

    # Namespace expected after execution: the example namespace plus the
    # module attributes that depend on how the module was run.
    module_attrs = {
        "__name__": run_name,
        "__file__": mod_fname,
        "__cached__": importlib.util.cache_from_source(mod_fname),
        "__package__": mod_name.rpartition(".")[0],
        "__spec__": mod_spec,
    }
    expected_ns = example_namespace.copy()
    expected_ns.update(module_attrs)

    def run_with_name(init_globals):
        return run_module(mod_name, init_globals, run_name)

    try:
        self.check_code_execution(run_with_name, expected_ns)
    finally:
        self._del_pkg(pkg_dir, depth, mod_name)
def _check_path_limitations(self, module_name):
    """Skip the test on Windows when the cached module path would be too long."""
    # base directory
    # + 2 * (path separator + `longname`)
    # + path separator + `module_name` + ".py"
    source_path_len = (len(self.here)
                       + 2 * (len(self.longname) + 1)
                       + len(module_name) + 1 + len(".py"))
    # Extra characters a cache path has compared to its source path.
    cache_overhead = len(importlib.util.cache_from_source("x.py")) - len("x.py")
    cached_path_len = source_path_len + cache_overhead

    if os.name != 'nt':
        return
    if cached_path_len >= 258:
        # Under Windows, the max path len is 260 including C's terminating
        # NUL character.
        # (see http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247%28v=vs.85%29.aspx#maxpath)
        self.skipTest("test paths too long (%d characters) for Windows' 260 character limit"
                      % cached_path_len)
    elif verbose:
        print("cached_path_len =", cached_path_len)
def import_python_module_by_filename(name, module_filename):
    """
    Import a file as a Python module under the given name.

    The directory of the file is added to ``sys.path`` (only once, repeated
    calls do not add duplicates) so the imported file can import its
    neighbours.

    :param name: The module name to give to the imported file.
    :param module_filename: The filename to import as a python module.
    :return: The newly imported python module.
    :raises ImportError: If no loader can be determined for the file.
    """
    module_dir = abspath(dirname(module_filename))
    # Appending unconditionally made sys.path grow with every call.
    if module_dir not in sys.path:
        sys.path.append(module_dir)
    spec = importlib.util.spec_from_file_location(
        name,
        location=module_filename)
    if spec is None or spec.loader is None:
        # spec_from_file_location() yields None when the file suffix is not
        # recognised as importable; report that explicitly.
        raise ImportError(
            "Cannot import {!r} as module {!r}".format(module_filename, name))
    imported_module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(imported_module)
    return imported_module
def _load_module(path):
'Helper to load a Python file at path and return as a module'
module_name = os.path.splitext(os.path.basename(path))[0]
module = None
if sys.version_info.minor < 5:
loader = importlib.machinery.SourceFileLoader(module_name, path)
module = loader.load_module()
else:
spec = importlib.util.spec_from_file_location(module_name, path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def find_spec(self, fullname, path, target=None):
    """Return a module spec for script nodes stored as Blender text blocks.

    Only names below ``svrx.nodes.script.`` are handled.  The last name
    component is mapped to a text name with ``text_remap`` and, if such a
    text exists in ``bpy.data.texts``, a spec using ``SvRxLoader`` is
    returned.  In every other case the result is None.
    """
    # The bare "svrx.nodes.script" package itself is not handled here (it
    # currently is a real but empty module) and therefore also yields None.
    if not fullname.startswith("svrx.nodes.script."):
        return None

    short_name = fullname.split(".")[-1]
    text_name = text_remap(short_name, reverse=True)
    if text_name not in bpy.data.texts:
        print("couldn't find file")
        return None
    return importlib.util.spec_from_loader(fullname, SvRxLoader(text_name))
def has_module(name):
    """
    Check if the Python module 'name' is available.

    Parameters
    ----------
    name : str
        The name of the Python module, as used in an import instruction.

    This function uses ideas from this Stack Overflow question:
    http://stackoverflow.com/questions/14050281/

    Returns
    -------
    b : bool
        True if the module exists, or False otherwise.
    """
    if sys.version_info >= (3, 4):
        # importlib.util.find_spec exists since Python 3.4.
        import importlib.util
        try:
            return importlib.util.find_spec(name) is not None
        except ImportError:
            # Raised for a dotted name whose parent package is missing.
            return False
        except ValueError:
            # Raised e.g. when the module sits in sys.modules without a
            # usable __spec__; such a module is available nonetheless.
            return name in sys.modules
    elif sys.version_info >= (3, 3):
        # importlib.find_loader exists since Python 3.3.
        import importlib
        return importlib.find_loader(name) is not None
    else:
        import pkgutil
        return pkgutil.find_loader(name) is not None
def load_module(cls, module_name, module_filename):
    """Execute the file ``module_filename`` as module ``module_name``.

    Nothing is returned.  Any exception raised while loading is reported as
    two warnings through ``Global.LOGGER`` and then suppressed, so a broken
    module is skipped.
    """
    try:
        file_spec = importlib.util.spec_from_file_location(module_name, module_filename)
        loaded = importlib.util.module_from_spec(file_spec)
        file_spec.loader.exec_module(loaded)
    except Exception as err:
        warnings_to_log = (
            f"{err}",
            f"an error occured while importing {module_name}, so the module will be skipped.",
        )
        for message in warnings_to_log:
            Global.LOGGER.warn(message)
def make_legacy_pyc(source):
    """Move a PEP 3147 pyc/pyo file to its legacy pyc/pyo location.

    The choice of .pyc or .pyo extension is done based on the __debug__ flag
    value.

    :param source: The file system path to the source file.  The source file
        does not need to exist, however the PEP 3147 pyc file must exist.
    :return: The file system path to the legacy pyc file.
    """
    pyc_file = importlib.util.cache_from_source(source)
    source_dir = os.path.dirname(os.path.abspath(source))
    # Join only the file name: joining the complete ``source`` duplicated the
    # directory part for relative paths such as "pkg/mod.py".
    legacy_name = os.path.basename(source) + ('c' if __debug__ else 'o')
    legacy_pyc = os.path.join(source_dir, legacy_name)
    os.rename(pyc_file, legacy_pyc)
    return legacy_pyc
def load_module_in_background(name, package=None, debug='DEBUG', env=None,
                              replacements=None):
    """Entry point for loading modules in background thread.

    Parameters
    ----------
    name : str
        Module name to load in background thread.
    package : str or None, optional
        Package name, has the same meaning as in importlib.import_module().
    debug : str, optional
        Debugging symbol name to look up in the environment.
    env : Mapping or None, optional
        Environment; defaults to ``builtins.__xonsh_env__`` if available and
        to os.environ otherwise.
    replacements : Mapping or None, optional
        Dictionary mapping fully qualified module names (eg foo.bar.baz) that
        import the lazily loaded module, with the variable name in that
        module. For example, suppose that foo.bar imports module a as b,
        this dict is then {'foo.bar': 'b'}.

    Returns
    -------
    module : ModuleType
        Either the module already found in sys.modules, the module imported
        directly (when the debug symbol is set in the environment), or a
        ``BackgroundModuleProxy`` registered in sys.modules for the name.
    """
    modname = importlib.util.resolve_name(name, package)
    if modname in sys.modules:
        # Already imported (or already proxied): nothing to do.
        return sys.modules[modname]

    if env is None:
        env = getattr(builtins, '__xonsh_env__', os.environ)
    if env.get(debug, None):
        # Debug mode: import in the foreground.
        return importlib.import_module(name, package=package)

    placeholder = BackgroundModuleProxy(modname)
    sys.modules[modname] = placeholder
    BackgroundModuleLoader(name, package, replacements or {})
    return placeholder
def execute(argv, argv0, engine):
    """Collect the assets of the project and of its apps into ``public``.

    After confirmation (or with ``--no-input``) the ``public`` directory
    below ``settings.BASE_DIR`` is removed if it exists (created otherwise),
    then ``app/assets`` and the ``assets`` directory of every module listed
    in ``settings.APPS`` are copied into it with ``recursive_overwrite``.

    The parameters ``argv``, ``argv0`` and ``engine`` are not used by this
    function body; options are parsed by ``OptionParser.parse_args()``
    without explicit arguments.
    """
    import lib
    # importlib.util is a submodule; importing it explicitly guarantees that
    # the attribute access below works.
    import importlib.util
    import re
    import code
    os.environ.setdefault("AIOWEB_SETTINGS_MODULE", "settings")
    from aioweb import settings
    sys.path.append(settings.BASE_DIR)

    parser = OptionParser()
    parser.add_option("--no-input", help="ask confirmation before collecting", action="store_true", dest="silent")
    (options, args) = parser.parse_args()
    if options.silent:
        agree = 'y'
    else:
        agree = input("Do you want to overwrite your assets dir? [y/n] ")
    if agree.lower() == 'y':
        DEST_DIR = os.path.join(settings.BASE_DIR, 'public')
        if os.path.exists(DEST_DIR):
            shutil.rmtree(DEST_DIR)
        else:
            os.makedirs(DEST_DIR)
        path = os.path.join(settings.BASE_DIR, 'app', 'assets')
        if os.path.exists(path):
            recursive_overwrite(path, DEST_DIR)
        for appName in settings.APPS:
            try:
                spec = importlib.util.find_spec(appName)
            except ImportError:
                # Raised when a parent package of a dotted name is missing.
                spec = None
            if spec is None:
                # find_spec() returns None (it does not raise) for an unknown
                # top-level module.
                print("no such module: %s" % appName)
                continue
            if spec.origin is None:
                # No file location to derive an assets directory from.
                continue
            path = os.path.join(os.path.dirname(spec.origin), 'assets')
            if os.path.exists(path):
                print("collecting %s" % path)
                recursive_overwrite(path, DEST_DIR)
def _get_spec(finder, name):
"""Return the finder-specific module spec."""
# Works with legacy finders.
try:
find_spec = finder.find_spec
except AttributeError:
loader = finder.find_module(name)
if loader is None:
return None
return importlib.util.spec_from_loader(name, loader)
else:
return find_spec(name)
def read_code(stream):
    """Read a code object from an open byte-compiled (.pyc) file.

    This helper is needed in order for the PEP 302 emulation to correctly
    handle compiled files.

    :param stream: Binary file object positioned at the start of the file.
    :return: The unmarshalled code object, or None if the magic number does
        not match the running interpreter.
    """
    import marshal
    import sys
    magic = stream.read(4)
    if magic != importlib.util.MAGIC_NUMBER:
        return None
    # Skip the rest of the header.  Since Python 3.7 (PEP 552) the magic
    # number is followed by 12 bytes (flags, timestamp, size); before that
    # it was 8 bytes (timestamp and size).
    header_rest = 12 if sys.version_info >= (3, 7) else 8
    stream.read(header_rest)
    return marshal.load(stream)
def get_data(package, resource):
    """Get a resource from a package.

    This is a wrapper round the PEP 302 loader get_data API. The package
    argument should be the name of a package, in standard module format
    (foo.bar). The resource argument should be in the form of a relative
    filename, using '/' as the path separator. The parent directory name '..'
    is not allowed, and nor is a rooted name (starting with a '/').

    The function returns a binary string, which is the contents of the
    specified resource.

    For packages located in the filesystem, which have already been imported,
    this is the rough equivalent of

        d = os.path.dirname(sys.modules[package].__file__)
        data = open(os.path.join(d, resource), 'rb').read()

    If the package cannot be located, or it uses a PEP 302 loader which does
    not support get_data(), then None is returned.
    """
    spec = importlib.util.find_spec(package)
    if spec is None:
        return None
    loader = spec.loader
    if loader is None or not hasattr(loader, 'get_data'):
        return None
    # Reuse an already imported package; otherwise import it through the
    # public API instead of the private importlib._bootstrap._SpecMethods,
    # which is not available in current Python versions.
    mod = sys.modules.get(package) or importlib.import_module(package)
    if mod is None or not hasattr(mod, '__file__'):
        return None

    # Modify the resource name to be compatible with the loader.get_data
    # signature - an os.path format "filename" starting with the dirname of
    # the package's __file__
    parts = resource.split('/')
    parts.insert(0, os.path.dirname(mod.__file__))
    resource_name = os.path.join(*parts)
    return loader.get_data(resource_name)
def test_cached_mode_issue_2051(self):
    """Permissions of the .pyc must match those of the .py, whatever the umask."""
    source_mode = 0o600
    with temp_umask(0o022), _ready_to_import() as (name, path):
        cached_path = importlib.util.cache_from_source(path)
        os.chmod(path, source_mode)
        __import__(name)
        if not os.path.exists(cached_path):
            self.fail("__import__ did not result in creation of "
                      "either a .pyc or .pyo file")
        cached_mode = stat.S_IMODE(os.stat(cached_path).st_mode)
    self.assertEqual(oct(cached_mode), oct(source_mode))
def test_cached_readonly(self):
    """The .pyc of a read-only source gets the source mode plus owner write."""
    source_mode = 0o400
    with temp_umask(0o022), _ready_to_import() as (name, path):
        cached_path = importlib.util.cache_from_source(path)
        os.chmod(path, source_mode)
        __import__(name)
        if not os.path.exists(cached_path):
            self.fail("__import__ did not result in creation of "
                      "either a .pyc or .pyo file")
        cached_mode = stat.S_IMODE(os.stat(cached_path).st_mode)
    # Account for fix for issue #6074: the owner write bit is always set.
    expected_mode = source_mode | 0o200
    self.assertEqual(oct(cached_mode), oct(expected_mode))
def test_pyc_always_writable(self):
    """A .pyc created for a read-only source can still be updated later.

    Initially read-only .pyc files on Windows used to cause problems with
    later updates, see issue #6074 for details.
    """
    with _ready_to_import() as (name, path):
        # Write a Python file, make it read-only and import it.
        with open(path, 'w') as src:
            src.write("x = 'original'\n")
        # Move the source mtime into the past so the pyc gets updated later.
        src_stat = os.stat(path)
        os.utime(path, (src_stat.st_atime, src_stat.st_mtime-100000000))
        os.chmod(path, 0o400)
        module = __import__(name)
        self.assertEqual(module.x, 'original')

        # Change the file and then reimport it.
        os.chmod(path, 0o600)
        with open(path, 'w') as src:
            src.write("x = 'rewritten'\n")
        unload(name)
        importlib.invalidate_caches()
        module = __import__(name)
        self.assertEqual(module.x, 'rewritten')

        # Now delete the source file and check the pyc was rewritten.
        unlink(path)
        unload(name)
        importlib.invalidate_caches()
        bytecode_only = path + ("c" if __debug__ else "o")
        os.rename(importlib.util.cache_from_source(path), bytecode_only)
        module = __import__(name)
        self.assertEqual(module.x, 'rewritten')