def import_it(self, partname, fqname, parent, force_load=0):
    """Import a single name component and return the resulting module.

    Args:
        partname: Name handed to ``self.loader.find_module`` and used as
            the attribute name set on ``parent``.
        fqname: Key looked up in ``self.modules`` and name handed to
            ``self.loader.load_module``.
        parent: Already-imported parent object, or a falsy value when
            there is none.
        force_load: When true, skip the ``self.modules`` lookup.

    Returns:
        ``parent`` when ``partname`` is empty; otherwise the cached or
        freshly loaded module, or ``None`` when ``parent`` has no
        ``__path__`` or the loader finds nothing.
    """
    if not partname:
        # An empty component should only come from 'from . import'
        # or __import__("").
        return parent

    if not force_load:
        try:
            cached = self.modules[fqname]
        except KeyError:
            pass
        else:
            return cached

    try:
        # A falsy parent is passed through unchanged as the search path.
        search_path = parent.__path__ if parent else parent
    except AttributeError:
        return None

    attr_name = str(partname)
    found = self.loader.find_module(attr_name, search_path)
    if not found:
        return None

    module = self.loader.load_module(str(fqname), found)
    if parent:
        setattr(parent, attr_name, module)
    return module
# Example source code for Python's __import__()
def import_it(self, partname, fqname, parent, force_load=0):
    """Import a single name component and return the resulting module.

    Args:
        partname: Name handed to ``self.loader.find_module`` and used as
            the attribute name set on ``parent``.
        fqname: Key looked up in ``self.modules`` and name handed to
            ``self.loader.load_module``.
        parent: Already-imported parent object, or a falsy value when
            there is none.
        force_load: When true, skip the ``self.modules`` lookup.

    Returns:
        ``parent`` when ``partname`` is empty; otherwise the cached or
        freshly loaded module, or ``None`` when ``parent`` has no
        ``__path__`` or the loader finds nothing.
    """
    if not partname:
        # An empty component should only come from 'from . import'
        # or __import__("").
        return parent

    if not force_load:
        try:
            cached = self.modules[fqname]
        except KeyError:
            pass
        else:
            return cached

    try:
        # A falsy parent is passed through unchanged as the search path.
        search_path = parent.__path__ if parent else parent
    except AttributeError:
        return None

    attr_name = str(partname)
    found = self.loader.find_module(attr_name, search_path)
    if not found:
        return None

    module = self.loader.load_module(str(fqname), found)
    if parent:
        setattr(parent, attr_name, module)
    return module
def test_discovery_failed_discovery(self):
    """Check that discovery from an undiscoverable module raises TypeError.

    ``builtins.__import__`` is replaced by a stub that registers a bare
    ``types.ModuleType`` in ``sys.modules`` and returns it. Cleanup
    restores both the original ``__import__`` and every ``sys.modules``
    entry the stub overwrote, so the fake module cannot leak into other
    tests.
    """
    loader = unittest.TestLoader()
    package = types.ModuleType('package')
    orig_import = __import__

    # Sentinel distinguishing "name was absent" from any real module value.
    missing = object()
    displaced = {}

    def _import(packagename, *args, **kwargs):
        # Record the first value seen for each name so cleanup can undo
        # exactly what this stub changed.
        displaced.setdefault(packagename,
                             sys.modules.get(packagename, missing))
        sys.modules[packagename] = package
        return package

    def cleanup():
        builtins.__import__ = orig_import
        for name, previous in displaced.items():
            if previous is missing:
                sys.modules.pop(name, None)
            else:
                sys.modules[name] = previous

    self.addCleanup(cleanup)
    builtins.__import__ = _import

    with self.assertRaises(TypeError) as cm:
        loader.discover('package')
    self.assertEqual(str(cm.exception),
                     'don\'t know how to discover from {0!r}'
                     .format(package))
def import_it(self, partname, fqname, parent, force_load=0):
    """Import a single name component and return the resulting module.

    Args:
        partname: Name handed to ``self.loader.find_module`` and used as
            the attribute name set on ``parent``.
        fqname: Key looked up in ``self.modules`` and name handed to
            ``self.loader.load_module``.
        parent: Already-imported parent object, or a falsy value when
            there is none.
        force_load: When true, skip the ``self.modules`` lookup.

    Returns:
        ``parent`` when ``partname`` is empty; otherwise the cached or
        freshly loaded module, or ``None`` when ``parent`` has no
        ``__path__`` or the loader finds nothing.
    """
    if not partname:
        # An empty component should only come from 'from . import'
        # or __import__("").
        return parent

    if not force_load:
        try:
            cached = self.modules[fqname]
        except KeyError:
            pass
        else:
            return cached

    try:
        # A falsy parent is passed through unchanged as the search path.
        search_path = parent.__path__ if parent else parent
    except AttributeError:
        return None

    attr_name = str(partname)
    found = self.loader.find_module(attr_name, search_path)
    if not found:
        return None

    module = self.loader.load_module(str(fqname), found)
    if parent:
        setattr(parent, attr_name, module)
    return module
def import_it(self, partname, fqname, parent, force_load=0):
    """Import a single name component and return the resulting module.

    Args:
        partname: Name handed to ``self.loader.find_module`` and used as
            the attribute name set on ``parent``.
        fqname: Key looked up in ``self.modules`` and name handed to
            ``self.loader.load_module``.
        parent: Already-imported parent object, or a falsy value when
            there is none.
        force_load: When true, skip the ``self.modules`` lookup.

    Returns:
        ``parent`` when ``partname`` is empty; otherwise the cached or
        freshly loaded module, or ``None`` when ``parent`` has no
        ``__path__`` or the loader finds nothing.
    """
    if not partname:
        # An empty component should only come from 'from . import'
        # or __import__("").
        return parent

    if not force_load:
        try:
            cached = self.modules[fqname]
        except KeyError:
            pass
        else:
            return cached

    try:
        # A falsy parent is passed through unchanged as the search path.
        search_path = parent.__path__ if parent else parent
    except AttributeError:
        return None

    attr_name = str(partname)
    found = self.loader.find_module(attr_name, search_path)
    if not found:
        return None

    module = self.loader.load_module(str(fqname), found)
    if parent:
        setattr(parent, attr_name, module)
    return module
def import_it(self, partname, fqname, parent, force_load=0):
    """Import a single name component and return the resulting module.

    Args:
        partname: Name handed to ``self.loader.find_module`` and used as
            the attribute name set on ``parent``.
        fqname: Key looked up in ``self.modules`` and name handed to
            ``self.loader.load_module``.
        parent: Already-imported parent object, or a falsy value when
            there is none.
        force_load: When true, skip the ``self.modules`` lookup.

    Returns:
        ``parent`` when ``partname`` is empty; otherwise the cached or
        freshly loaded module, or ``None`` when ``parent`` has no
        ``__path__`` or the loader finds nothing.
    """
    if not partname:
        # An empty component should only come from 'from . import'
        # or __import__("").
        return parent

    if not force_load:
        try:
            cached = self.modules[fqname]
        except KeyError:
            pass
        else:
            return cached

    try:
        # A falsy parent is passed through unchanged as the search path.
        search_path = parent.__path__ if parent else parent
    except AttributeError:
        return None

    attr_name = str(partname)
    found = self.loader.find_module(attr_name, search_path)
    if not found:
        return None

    module = self.loader.load_module(str(fqname), found)
    if parent:
        setattr(parent, attr_name, module)
    return module
def test_passlib(self):
    """Compare cipher headers produced via the hashlib and passlib paths.

    A temporary ``__import__`` hook raises ImportError for
    ``from hashlib import pbkdf2_hmac`` so that reloading
    ``sagecipher.cipher`` exposes ``pbkdf2_passlib`` instead of
    ``pbkdf2_hashlib``. The hook is always removed again in ``finally``
    so it cannot leak into other tests.
    """
    # force import of passlib pbkdf2 function, and compare headers/keys
    # generated from both hashlib and passlib
    self.cipher1 = sagecipher.Cipher()
    try:
        import builtins
    except ImportError:
        import __builtin__ as builtins
    realimport = builtins.__import__

    def myimport(*args, **kwargs):
        # __import__ may be called with fewer than four positional
        # arguments or with keywords, so never index args blindly.
        name = args[0] if args else kwargs.get('name')
        fromlist = args[3] if len(args) > 3 else kwargs.get('fromlist')
        if (name == 'hashlib' and fromlist is not None
                and 'pbkdf2_hmac' in fromlist):
            raise ImportError
        return realimport(*args, **kwargs)

    builtins.__import__ = myimport
    try:
        del(sagecipher.cipher.pbkdf2_hashlib)
        reload(sagecipher.cipher)
        reload(sagecipher)
        self.assertIn('pbkdf2_passlib', dir(sagecipher.cipher))
        self.assertNotIn('pbkdf2_hashlib', dir(sagecipher.cipher))
        self.cipher2 = sagecipher.Cipher(self.cipher1.header())
        self.assertEqual(self.cipher1.header(), self.cipher2.header())
    finally:
        # Restore the real importer even when an assertion fails.
        builtins.__import__ = realimport
def test_hooked_import(self):
# Verify that importing cases.hooked_import still raises SyntaxError while
# builtins.__import__ is replaced by a pass-through wrapper.
# NOTE(review): the bare `end` statements look like block terminators
# supplied by something outside this view -- confirm before moving them.
# It should work even if __import__ is hooked by someone else.
saved_import = builtins.__import__
# The replacement hook forwards every call unchanged to the saved importer.
@functools.wraps(saved_import)
def my_import(*args, **kwargs):
return saved_import(*args, **kwargs)
end
builtins.__import__ = my_import
try:
with self.assertRaises(SyntaxError) as cm:
import cases.hooked_import
end
# The error text must mention the offending file name.
self.assertTrue('hooked_import.py' in str(cm.exception))
finally:
# Always put the saved importer back, even when the assertions fail.
builtins.__import__ = saved_import
end
def import_it(self, partname, fqname, parent, force_load=0):
    """Import a single name component and return the resulting module.

    Args:
        partname: Name handed to ``self.loader.find_module`` and used as
            the attribute name set on ``parent``.
        fqname: Key looked up in ``self.modules`` and name handed to
            ``self.loader.load_module``.
        parent: Already-imported parent object, or a falsy value when
            there is none.
        force_load: When true, skip the ``self.modules`` lookup.

    Returns:
        ``parent`` when ``partname`` is empty; otherwise the cached or
        freshly loaded module, or ``None`` when ``parent`` has no
        ``__path__`` or the loader finds nothing.
    """
    if not partname:
        # An empty component should only come from 'from . import'
        # or __import__("").
        return parent

    if not force_load:
        try:
            cached = self.modules[fqname]
        except KeyError:
            pass
        else:
            return cached

    try:
        # A falsy parent is passed through unchanged as the search path.
        search_path = parent.__path__ if parent else parent
    except AttributeError:
        return None

    attr_name = str(partname)
    found = self.loader.find_module(attr_name, search_path)
    if not found:
        return None

    module = self.loader.load_module(str(fqname), found)
    if parent:
        setattr(parent, attr_name, module)
    return module
def import_it(self, partname, fqname, parent, force_load=0):
    """Import a single name component and return the resulting module.

    Args:
        partname: Name handed to ``self.loader.find_module`` and used as
            the attribute name set on ``parent``.
        fqname: Key looked up in ``self.modules`` and name handed to
            ``self.loader.load_module``.
        parent: Already-imported parent object, or a falsy value when
            there is none.
        force_load: When true, skip the ``self.modules`` lookup.

    Returns:
        ``parent`` when ``partname`` is empty; otherwise the cached or
        freshly loaded module, or ``None`` when ``parent`` has no
        ``__path__`` or the loader finds nothing.
    """
    if not partname:
        # An empty component should only come from 'from . import'
        # or __import__("").
        return parent

    if not force_load:
        try:
            cached = self.modules[fqname]
        except KeyError:
            pass
        else:
            return cached

    try:
        # A falsy parent is passed through unchanged as the search path.
        search_path = parent.__path__ if parent else parent
    except AttributeError:
        return None

    attr_name = str(partname)
    found = self.loader.find_module(attr_name, search_path)
    if not found:
        return None

    module = self.loader.load_module(str(fqname), found)
    if parent:
        setattr(parent, attr_name, module)
    return module
def import_it(self, partname, fqname, parent, force_load=0):
    """Import a single name component and return the resulting module.

    Args:
        partname: Name handed to ``self.loader.find_module`` and used as
            the attribute name set on ``parent``.
        fqname: Key looked up in ``self.modules`` and name handed to
            ``self.loader.load_module``.
        parent: Already-imported parent object, or a falsy value when
            there is none.
        force_load: When true, skip the ``self.modules`` lookup.

    Returns:
        ``parent`` when ``partname`` is empty; otherwise the cached or
        freshly loaded module, or ``None`` when ``parent`` has no
        ``__path__`` or the loader finds nothing.
    """
    if not partname:
        # An empty component should only come from 'from . import'
        # or __import__("").
        return parent

    if not force_load:
        try:
            cached = self.modules[fqname]
        except KeyError:
            pass
        else:
            return cached

    try:
        # A falsy parent is passed through unchanged as the search path.
        search_path = parent.__path__ if parent else parent
    except AttributeError:
        return None

    attr_name = str(partname)
    found = self.loader.find_module(attr_name, search_path)
    if not found:
        return None

    module = self.loader.load_module(str(fqname), found)
    if parent:
        setattr(parent, attr_name, module)
    return module
def deep_import_hook(name, globals=None, locals=None, fromlist=None, level=-1):
    """Replacement for __import__().

    Resolves ``name`` by calling ``load_next`` repeatedly until no name
    remains. Returns the first module produced by ``load_next`` when
    ``fromlist`` is empty; otherwise runs ``ensure_fromlist`` on the last
    module produced and returns that one.

    Raises:
        ValueError: If the last module produced is ``None``.
    """
    parent, buf = get_parent(globals, level)

    # A negative level passes no alternate parent to the first lookup.
    if level < 0:
        alt_parent = None
    else:
        alt_parent = parent
    head, remaining, buf = load_next(parent, alt_parent, name, buf)

    tail = head
    while remaining:
        tail, remaining, buf = load_next(tail, tail, remaining, buf)

    # A None tail means both get_parent and load_next found an empty
    # module name: someone called __import__("") or doctored faulty
    # bytecode.
    if tail is None:
        raise ValueError('Empty module name')

    if fromlist:
        ensure_fromlist(tail, fromlist, buf, 0)
        return tail
    return head
def install(self):
    """Install this object's import, reload and unload hooks.

    The current ``__builtin__`` values are stored on ``self`` as
    ``save_import_module``, ``save_reload`` and ``save_unload`` before
    being replaced by ``self.import_module``, ``self.reload`` and
    ``self.unload``.
    """
    namespace = __builtin__
    self.save_import_module = namespace.__import__
    self.save_reload = namespace.reload
    # Make sure an ``unload`` attribute exists so there is always a value
    # to save.
    if not hasattr(namespace, 'unload'):
        namespace.unload = None
    self.save_unload = namespace.unload

    namespace.__import__ = self.import_module
    namespace.reload = self.reload
    namespace.unload = self.unload
def uninstall(self):
    """Put back the ``__builtin__`` hooks saved on this object.

    Writes ``save_import_module``, ``save_reload`` and ``save_unload``
    back to ``__builtin__``; a falsy restored ``unload`` is then deleted
    from ``__builtin__`` again.
    """
    namespace = __builtin__
    namespace.__import__ = self.save_import_module
    namespace.reload = self.save_reload
    namespace.unload = self.save_unload
    # A falsy value is removed instead of being left in place.
    if not namespace.unload:
        del namespace.unload
def _import(self, mod, **kwds):
    """
    Send a synchronous 'import' request for *mod* and return the result.

    The request goes through ``self.send`` with ``request='import'``,
    ``callSync='sync'`` and ``opts={'module': mod}``; any extra keyword
    arguments are forwarded to ``self.send`` unchanged.

    Per the original documentation, the receiving side uses the built-in
    __import__() function with a bit more processing (not visible here):

        _import('module')  =>  returns module
        _import('module.submodule')  =>  returns submodule
                                         (differs from __import__)
        _import('module', fromlist=[name1, name2, ...])
                           =>  returns [module.name1, module.name2, ...]
                               (also differs from __import__)
    """
    request_opts = {'module': mod}
    return self.send(request='import', callSync='sync', opts=request_opts, **kwds)
def _import(self, mod, **kwds):
    """
    Send a synchronous 'import' request for *mod* and return the result.

    The request goes through ``self.send`` with ``request='import'``,
    ``callSync='sync'`` and ``opts={'module': mod}``; any extra keyword
    arguments are forwarded to ``self.send`` unchanged.

    Per the original documentation, the receiving side uses the built-in
    __import__() function with a bit more processing (not visible here):

        _import('module')  =>  returns module
        _import('module.submodule')  =>  returns submodule
                                         (differs from __import__)
        _import('module', fromlist=[name1, name2, ...])
                           =>  returns [module.name1, module.name2, ...]
                               (also differs from __import__)
    """
    request_opts = {'module': mod}
    return self.send(request='import', callSync='sync', opts=request_opts, **kwds)
def install(self):
    """Install this object's import, reload and unload hooks.

    The current ``__builtin__`` values are stored on ``self`` as
    ``save_import_module``, ``save_reload`` and ``save_unload`` before
    being replaced by ``self.import_module``, ``self.reload`` and
    ``self.unload``.
    """
    namespace = __builtin__
    self.save_import_module = namespace.__import__
    self.save_reload = namespace.reload
    # Make sure an ``unload`` attribute exists so there is always a value
    # to save.
    if not hasattr(namespace, 'unload'):
        namespace.unload = None
    self.save_unload = namespace.unload

    namespace.__import__ = self.import_module
    namespace.reload = self.reload
    namespace.unload = self.unload
def uninstall(self):
    """Put back the ``__builtin__`` hooks saved on this object.

    Writes ``save_import_module``, ``save_reload`` and ``save_unload``
    back to ``__builtin__``; a falsy restored ``unload`` is then deleted
    from ``__builtin__`` again.
    """
    namespace = __builtin__
    namespace.__import__ = self.save_import_module
    namespace.reload = self.save_reload
    namespace.unload = self.save_unload
    # A falsy value is removed instead of being left in place.
    if not namespace.unload:
        del namespace.unload
def test_discovery_from_dotted_namespace_packages(self):
    """Check discovery from a dotted name whose module has several paths.

    ``builtins.__import__`` is replaced by a stub that registers a fake
    module with ``__path__`` ``['/a', '/b']`` in ``sys.modules``, and
    ``loader._find_tests`` is replaced by a recorder. Cleanup restores
    both the original ``__import__`` and every ``sys.modules`` entry the
    stub overwrote, so the fake module cannot leak into other tests.

    Raises:
        unittest.SkipTest: If ``types.SimpleNamespace`` is unavailable.
    """
    if not getattr(types, 'SimpleNamespace', None):
        raise unittest.SkipTest('Namespaces not supported')

    loader = unittest.TestLoader()
    orig_import = __import__
    package = types.ModuleType('package')
    package.__path__ = ['/a', '/b']
    package.__spec__ = types.SimpleNamespace(
        loader=None,
        submodule_search_locations=['/a', '/b']
    )

    # Sentinel distinguishing "name was absent" from any real module value.
    missing = object()
    displaced = {}

    def _import(packagename, *args, **kwargs):
        # Record the first value seen for each name so cleanup can undo
        # exactly what this stub changed.
        displaced.setdefault(packagename,
                             sys.modules.get(packagename, missing))
        sys.modules[packagename] = package
        return package

    def cleanup():
        builtins.__import__ = orig_import
        for name, previous in displaced.items():
            if previous is missing:
                sys.modules.pop(name, None)
            else:
                sys.modules[name] = previous

    self.addCleanup(cleanup)
    builtins.__import__ = _import

    _find_tests_args = []

    def _find_tests(start_dir, pattern, namespace=None):
        _find_tests_args.append((start_dir, pattern))
        return ['%s/tests' % start_dir]

    loader._find_tests = _find_tests
    loader.suiteClass = list
    suite = loader.discover('package')
    self.assertEqual(suite, ['/a/tests', '/b/tests'])
def __import__(name, globals=None, locals=None, fromlist=None, level=-1):
    """Compatibility definition for Python 2.4.

    Silently ignore the `level` argument missing in Python < 2.5.

    Args:
        name: Module name forwarded to ``__builtin__.__import__``.
        globals: Forwarded globals mapping; a fresh ``{}`` when omitted.
        locals: Forwarded locals mapping; a fresh ``{}`` when omitted.
        fromlist: Forwarded from-list; a fresh ``[]`` when omitted.
        level: Accepted for signature compatibility and ignored.

    Returns:
        Whatever ``__builtin__.__import__`` returns.
    """
    # we need the level arg because the default changed in Python 3.3
    # Build fresh containers per call instead of sharing mutable defaults.
    if globals is None:
        globals = {}
    if locals is None:
        locals = {}
    if fromlist is None:
        fromlist = []
    return __builtin__.__import__(name, globals, locals, fromlist)
def __import__(name, globals=None, locals=None, fromlist=None, level=-1):
    """Compatibility definition for Python 2.4.

    Silently ignore the `level` argument missing in Python < 2.5.

    Args:
        name: Module name forwarded to ``__builtin__.__import__``.
        globals: Forwarded globals mapping; a fresh ``{}`` when omitted.
        locals: Forwarded locals mapping; a fresh ``{}`` when omitted.
        fromlist: Forwarded from-list; a fresh ``[]`` when omitted.
        level: Accepted for signature compatibility and ignored.

    Returns:
        Whatever ``__builtin__.__import__`` returns.
    """
    # we need the level arg because the default changed in Python 3.3
    # Build fresh containers per call instead of sharing mutable defaults.
    if globals is None:
        globals = {}
    if locals is None:
        locals = {}
    if fromlist is None:
        fromlist = []
    return __builtin__.__import__(name, globals, locals, fromlist)
def local_import_aux(name, reload_force=False, app='welcome'):
    """
    Import ``applications.<app>.modules.<name>`` and return that module.

    Slashes in *name* are treated as dots. The qualified name is imported
    with ``__import__`` and the result is walked attribute by attribute
    down to the last component.

    Args:
        name: Module path inside the application's modules package, for
            example ``'a.b.c'`` or ``'a/b/c'``.
        reload_force: When true, ``reload`` is called on the resolved
            module before it is returned.
        app: Application name inserted into the qualified module name.

    Returns:
        The module object found at the end of the attribute walk.
    """
    dotted = name.replace('/', '.')
    qualified = "applications.%s.modules.%s" % (app, dotted)
    # __import__ is called without a fromlist, so descend from its result
    # through every component after the first.
    target = __import__(qualified)
    for component in qualified.split(".")[1:]:
        target = getattr(target, component)
    if reload_force:
        reload(target)
    return target
def custom_import_install():
    """Install ``custom_importer`` as ``__builtin__.__import__``.

    Does nothing unless the current ``__builtin__.__import__`` equals
    ``NATIVE_IMPORTER``. Before swapping the importer, the names currently
    in ``sys.modules`` are added to ``INVALID_MODULES``.
    """
    if not (__builtin__.__import__ == NATIVE_IMPORTER):
        return
    INVALID_MODULES.update(sys.modules.keys())
    __builtin__.__import__ = custom_importer
def __import__(*args, **kwargs):
    """
    __import__(name, globals=None, locals=None, fromlist=(), level=0) -> object

    Run the wrapped ``_import`` while holding both the ``imp`` import lock
    and a lock obtained from ``__module_lock`` for the module name. The
    original documentation gives the purpose as protecting imports
    against greenlet concurrency in addition to the locking Python does
    itself. When ``__lock_imports`` is false, ``_import`` is called
    without taking any lock.
    """
    if args and not issubclass(type(args[0]), _allowed_module_name_types):
        # A monkey-patched builtin fetched as a bound method still gets
        # 'self' passed in, unlike a real builtin; drop that leading
        # argument here.
        args = args[1:]

    if not __lock_imports:
        return _import(*args, **kwargs)

    name_lock = __module_lock(args[0])  # Lock for this module name
    imp.acquire_lock()
    try:
        name_lock.acquire()
        try:
            return _import(*args, **kwargs)
        finally:
            name_lock.release()
    finally:
        # Released last, i.e. in reverse order of acquisition.
        imp.release_lock()
def _unlock_imports():
    """
    Internal function that clears the module-level ``__lock_imports`` flag.

    Per the original documentation, it is called when gevent needs to
    perform imports lazily without knowing the state of the system: taking
    the import lock may be impossible, for example when there are no other
    running greenlets. With the flag cleared, a monkey-patched __import__
    avoids taking any locks until the corresponding call to lock_imports.
    This should only be done for limited amounts of time and when the set
    of imports is statically known to be "safe".
    """
    global __lock_imports
    # A push/pop list or a counter would make this re-entrant, but that
    # complexity should not be needed.
    __lock_imports = False
def __import__(*args, **kwargs):
    """
    __import__(name, globals=None, locals=None, fromlist=(), level=0) -> object

    Run the wrapped ``_import`` while holding both the ``imp`` import lock
    and a lock obtained from ``__module_lock`` for the module name. The
    original documentation gives the purpose as protecting imports
    against greenlet concurrency in addition to the locking Python does
    itself. When ``__lock_imports`` is false, ``_import`` is called
    without taking any lock.
    """
    if args and not issubclass(type(args[0]), _allowed_module_name_types):
        # A monkey-patched builtin fetched as a bound method still gets
        # 'self' passed in, unlike a real builtin; drop that leading
        # argument here.
        args = args[1:]

    if not __lock_imports:
        return _import(*args, **kwargs)

    name_lock = __module_lock(args[0])  # Lock for this module name
    imp.acquire_lock()
    try:
        name_lock.acquire()
        try:
            return _import(*args, **kwargs)
        finally:
            name_lock.release()
    finally:
        # Released last, i.e. in reverse order of acquisition.
        imp.release_lock()
def _unlock_imports():
    """
    Internal function that clears the module-level ``__lock_imports`` flag.

    Per the original documentation, it is called when gevent needs to
    perform imports lazily without knowing the state of the system: taking
    the import lock may be impossible, for example when there are no other
    running greenlets. With the flag cleared, a monkey-patched __import__
    avoids taking any locks until the corresponding call to lock_imports.
    This should only be done for limited amounts of time and when the set
    of imports is statically known to be "safe".
    """
    global __lock_imports
    # A push/pop list or a counter would make this re-entrant, but that
    # complexity should not be needed.
    __lock_imports = False
def custom_import_install():
    """Install ``custom_importer`` as ``__builtin__.__import__``.

    Does nothing unless the current ``__builtin__.__import__`` equals
    ``NATIVE_IMPORTER``. Before swapping the importer, the names currently
    in ``sys.modules`` are added to ``INVALID_MODULES``.
    """
    if not (__builtin__.__import__ == NATIVE_IMPORTER):
        return
    INVALID_MODULES.update(sys.modules.keys())
    __builtin__.__import__ = custom_importer
def _install_import_hook():
    """Point ``builtins.__import__`` at ``_pytypes___import__`` once.

    The module-level flag ``_import_hook_installed`` records that the hook
    is in place, so later calls return without doing anything.
    """
    global _import_hook_installed
    if _import_hook_installed:
        return
    builtins.__import__ = _pytypes___import__
    _import_hook_installed = True
def local_import_aux(name, reload_force=False, app='welcome'):
    """
    Import ``applications.<app>.modules.<name>`` and return that module.

    Slashes in *name* are treated as dots. The qualified name is imported
    with ``__import__`` and the result is walked attribute by attribute
    down to the last component.

    Args:
        name: Module path inside the application's modules package, for
            example ``'a.b.c'`` or ``'a/b/c'``.
        reload_force: When true, ``reload`` is called on the resolved
            module before it is returned.
        app: Application name inserted into the qualified module name.

    Returns:
        The module object found at the end of the attribute walk.
    """
    dotted = name.replace('/', '.')
    qualified = "applications.%s.modules.%s" % (app, dotted)
    # __import__ is called without a fromlist, so descend from its result
    # through every component after the first.
    target = __import__(qualified)
    for component in qualified.split(".")[1:]:
        target = getattr(target, component)
    if reload_force:
        reload(target)
    return target