def _handle_ns(packageName, path_item):
    """Ensure that named package includes a subpath of path_item (if needed).

    Returns the subpath produced by the namespace handler registered for the
    importer of ``path_item``. Returns ``None`` when ``path_item`` has no
    importer, when that importer cannot find ``packageName``, or when the
    handler has nothing to add.

    Raises ``TypeError`` if ``packageName`` is already present in
    ``sys.modules`` but the module has no ``__path__``.
    """
    importer = get_importer(path_item)
    if importer is None:
        return None
    loader = importer.find_module(packageName)
    if loader is None:
        return None

    module = sys.modules.get(packageName)
    if module is None:
        # Not imported yet: register an empty package shell first.
        module = sys.modules[packageName] = imp.new_module(packageName)
        module.__path__ = []
        _set_parent_ns(packageName)
    elif not hasattr(module, '__path__'):
        raise TypeError("Not a package:", packageName)

    handler = _find_adapter(_namespace_handlers, importer)
    subpath = handler(importer, path_item, packageName, module)
    if subpath is not None:
        path = module.__path__
        path.append(subpath)
        loader.load_module(packageName)
        # Put the extended list back after loading (presumably because
        # loading may rebind ``__path__`` -- confirm against the loaders used).
        module.__path__ = path
    return subpath
python类new_module()的实例源码
def main():
    """Build a synthetic ``livetests`` module and run it with unittest.

    The host configuration comes from ``parse_argv()``; the result of
    ``probe_host()`` is stored on it as ``namespace``. Test classes are
    attached to a freshly created module registered in ``sys.modules`` so
    that ``unittest.main`` can find them by module name.
    """
    host_config = parse_argv()
    namespace = probe_host(host_config)
    host_config.namespace = namespace

    live_test_mod = imp.new_module('livetests')
    sys.modules['livetests'] = live_test_mod

    def add_test_class(klass, name=None):
        """Attach ``klass`` to the synthetic module, optionally renaming it."""
        if name is None:
            name = klass.__name__
        else:
            if not PY3:
                # Presumably Python 2 needs a byte-string class name.
                name = name.encode('ascii')
            klass.__name__ = name
        setattr(live_test_mod, name, klass)

    TestGeneral.conf = host_config
    add_test_class(TestGeneral)
    add_test_class(createUidTestClass(host_config, use_uid=True), 'TestWithUIDs')
    add_test_class(createUidTestClass(host_config, use_uid=False), 'TestWithoutUIDs')

    unittest.main(module='livetests')
def test_wrap_module(self):
    """Check that ``Plugin.wrap_module`` wraps a plain module as a plugin."""
    plugin_module = imp.new_module("test_plugin")
    plugin_module.execute = lambda ctx, arg: "execute {}".format(arg)

    plugin = Plugin.wrap_module(plugin_module)
    # assertEqual: the assertEquals alias was removed in Python 3.12.
    self.assertEqual(plugin.name, "test_plugin")
    plugin.sys_prepare(None)
    self.assertEqual(plugin.execute(None, "cat"), "execute cat")

    # Replace ``execute`` with a zero-argument callable; wrapping is then
    # expected to fail with InvalidPluginException (assuming
    # failUnlessException asserts that the call raises -- confirm).
    plugin_module.execute = lambda: "execute"
    self.failUnlessException(
        InvalidPluginException,
        Plugin.wrap_module,
        plugin_module
    )
def load_module(self, name):
    """Return the module ``name``, executing its stored code if needed.

    ``self.modules`` maps a module name to a ``(code object, pyc path)``
    pair; the entry is consumed (popped) when the module is created.
    Raises ``KeyError`` when ``name`` is neither in ``sys.modules`` nor in
    ``self.modules``.
    """
    # If there is an existing module object named 'fullname' in
    # sys.modules, the loader must use that existing module. (Otherwise,
    # the reload() builtin will not work correctly.)
    if name in sys.modules:
        return sys.modules[name]
    co, pyc = self.modules.pop(name)
    # I wish I could just call imp.load_compiled here, but __file__ has to
    # be set properly. In Python 3.2+, this all would be handled correctly
    # by load_compiled.
    mod = sys.modules[name] = imp.new_module(name)
    try:
        mod.__file__ = co.co_filename
        # Normally, this attribute is 3.2+.
        mod.__cached__ = pyc
        mod.__loader__ = self
        py.builtin.exec_(co, mod.__dict__)
    except:
        # Do not leave a half-initialised module registered; re-raise as is.
        del sys.modules[name]
        raise
    # Read back from sys.modules: the executed code may have replaced
    # its own entry there.
    return sys.modules[name]
def __init__(self, mod_name='__main__'):
    """Initialise the backend and choose the module that code executes in.

    mod_name: name under which a fresh module is created and registered in
        ``sys.modules``. ``None`` reuses the existing
        ``sys.modules['__main__']`` instead.
    """
    import threading
    ReplBackend.__init__(self)
    if mod_name is not None:
        if sys.platform == 'cli':
            # On the 'cli' platform a Scope object named '__main__' is used;
            # mod_name is ignored and nothing is put into sys.modules.
            self.exec_mod = Scope()
            self.exec_mod.__name__ = '__main__'
        else:
            sys.modules[mod_name] = self.exec_mod = imp.new_module(mod_name)
    else:
        self.exec_mod = sys.modules['__main__']

    self.code_flags = 0
    self.execute_item = None
    self.execute_item_lock = threading.Lock()
    self.execute_item_lock.acquire()  # lock starts acquired (we use it like manual reset event)
def __init__(self, mod_name='__main__'):
    """Set up the backend state and select the execution module.

    mod_name: when not ``None``, a new module of that name is created and
        stored in ``sys.modules``; when ``None``, the current
        ``sys.modules['__main__']`` is used as the execution module.
    """
    import threading
    ReplBackend.__init__(self)
    if mod_name is not None:
        if sys.platform == 'cli':
            # The 'cli' platform gets a Scope object named '__main__';
            # mod_name is not used and sys.modules is left untouched.
            self.exec_mod = Scope()
            self.exec_mod.__name__ = '__main__'
        else:
            sys.modules[mod_name] = self.exec_mod = imp.new_module(mod_name)
    else:
        self.exec_mod = sys.modules['__main__']

    self.code_flags = 0
    self.execute_item = None
    self.execute_item_lock = threading.Lock()
    self.execute_item_lock.acquire()  # lock starts acquired (we use it like manual reset event)
def load_module(self, fullname):
    """Return a package-like module object for ``fullname``.

    An existing ``sys.modules`` entry is reused; otherwise a new empty
    module is created and registered. In both cases the import-related
    attributes are (re)assigned before the module is returned.
    """
    # types.ModuleType is the portable equivalent of imp.new_module
    # (the imp module no longer exists on Python 3.12+).
    import types

    print('loading {}'.format(fullname))
    if fullname in sys.modules:
        mod = sys.modules[fullname]
    else:
        mod = sys.modules.setdefault(
            fullname,
            types.ModuleType(fullname))

    # Set a few properties required by PEP 302
    mod.__file__ = fullname
    mod.__name__ = fullname
    # always looks like a package
    mod.__path__ = ['path-entry-goes-here']
    mod.__loader__ = self
    mod.__package__ = '.'.join(fullname.split('.')[:-1])

    return mod
# Install the meta-path finder
def load_ai(filename):
    """Execute the script at ``filename`` inside a fresh module named 'ai'.

    The module namespace is pre-populated with a ``getboard`` helper before
    the script runs. Raises ``Exception`` if the source text contains one of
    the forbidden import statements.

    SECURITY NOTE: this is not a sandbox. The substring check is trivially
    bypassed and the code runs with full builtins; only load trusted files.
    """
    # types.ModuleType is the portable equivalent of imp.new_module
    # (the imp module no longer exists on Python 3.12+).
    import types

    # Read via a context manager so the file handle is always closed.
    with open(filename) as source_file:
        code = source_file.read()

    module = types.ModuleType('ai')
    mdict = module.__dict__

    def getboard(board, x, y):
        """Return ``board[y][x]``, or None when x or y is outside 0..7."""
        if x in range(8) and y in range(8):
            return board[y][x]
        else:
            return None

    mdict['getboard'] = getboard

    # Pop the obvious exploits (a no-op when the names are absent).
    mdict.pop('exec', 0)
    mdict.pop('eval', 0)

    # Lexical analyzer
    # TODO: replace this substring check with real validation.
    forbidden = ['import subprocess', 'import sys', 'import os']
    for word in forbidden:
        if word in code:
            raise Exception('Found security issue in {} while looking for {}'.format(filename, word))

    exec(code, mdict)
    return module
def load_pupyimporter(self):
    """ load pupyimporter in case it is not

    Does nothing when "pupyimporter" is already listed in the remote
    ``sys.modules``. Otherwise the local pupyimporter.py source is read,
    a bootstrap function is defined on the connection, and that function
    is called with the source.
    """
    if "pupyimporter" not in self.conn.modules.sys.modules:
        with open(os.path.join(ROOT, "packages", "all", "pupyimporter.py"), 'rb') as f:
            pupyimporter_code = f.read()
        # The snippet below runs on the remote side. The body of
        # pupyimporter_preimporter must stay indented relative to its
        # ``def`` line; textwrap.dedent only strips the common margin.
        self.conn.execute(textwrap.dedent(
            """
            import imp
            import sys
            def pupyimporter_preimporter(code):
                mod = imp.new_module("pupyimporter")
                mod.__name__="pupyimporter"
                mod.__file__="<memimport>\\\\pupyimporter"
                mod.__package__="pupyimporter"
                sys.modules["pupyimporter"]=mod
                exec code+"\\n" in mod.__dict__
                mod.install()
            """))
        self.conn.namespace["pupyimporter_preimporter"](pupyimporter_code)
def _setup_package(self, context_id, parent_ids):
    """Create a synthetic ``mitogen`` package around the running main module.

    The current ``sys.modules['__main__']`` is re-registered as
    ``mitogen.core`` and the ``__main__`` entry is then removed. The new
    package object is also bound to the module-level global ``mitogen``.

    context_id: stored as ``mitogen.context_id``.
    parent_ids: stored as ``mitogen.parent_ids``; its first element becomes
        ``mitogen.parent_id`` (so it must be non-empty).
    """
    # types.ModuleType is the portable equivalent of imp.new_module
    # (the imp module no longer exists on Python 3.12+).
    import types

    global mitogen
    mitogen = types.ModuleType('mitogen')
    mitogen.__package__ = 'mitogen'
    mitogen.__path__ = []
    mitogen.__loader__ = self.importer
    mitogen.is_master = False
    mitogen.context_id = context_id
    mitogen.parent_ids = parent_ids
    mitogen.parent_id = parent_ids[0]
    mitogen.core = sys.modules['__main__']
    mitogen.core.__file__ = 'x/mitogen/core.py'  # For inspect.getsource()
    mitogen.core.__loader__ = self.importer
    sys.modules['mitogen'] = mitogen
    sys.modules['mitogen.core'] = mitogen.core
    del sys.modules['__main__']
def _handle_ns(packageName, path_item):
    """Ensure that named package includes a subpath of path_item (if needed).

    Returns the subpath produced by the namespace handler registered for the
    importer of ``path_item``. Returns ``None`` when ``path_item`` has no
    importer, when that importer cannot find ``packageName``, or when the
    handler has nothing to add.

    Raises ``TypeError`` if ``packageName`` is already present in
    ``sys.modules`` but the module has no ``__path__``.
    """
    importer = get_importer(path_item)
    if importer is None:
        return None
    loader = importer.find_module(packageName)
    if loader is None:
        return None

    module = sys.modules.get(packageName)
    if module is None:
        # Not imported yet: register an empty package shell first.
        module = sys.modules[packageName] = imp.new_module(packageName)
        module.__path__ = []
        _set_parent_ns(packageName)
    elif not hasattr(module, '__path__'):
        raise TypeError("Not a package:", packageName)

    handler = _find_adapter(_namespace_handlers, importer)
    subpath = handler(importer, path_item, packageName, module)
    if subpath is not None:
        path = module.__path__
        path.append(subpath)
        loader.load_module(packageName)
        # Put the extended list back after loading (presumably because
        # loading may rebind ``__path__`` -- confirm against the loaders used).
        module.__path__ = path
    return subpath
def load_pupyimporter(self):
    """ load pupyimporter in case it is not

    When "pupyimporter" is missing from the remote ``sys.modules``, read
    the local pupyimporter.py, define a bootstrap function through the
    connection, and call it with the file contents. Otherwise do nothing.
    """
    if "pupyimporter" not in self.conn.modules.sys.modules:
        with open(os.path.join(ROOT, "packages", "all", "pupyimporter.py"), 'rb') as f:
            pupyimporter_code = f.read()
        # This source is executed remotely. The function body has to be
        # indented under its ``def``; textwrap.dedent removes only the
        # margin shared by all lines.
        self.conn.execute(textwrap.dedent(
            """
            import imp
            import sys
            def pupyimporter_preimporter(code):
                mod = imp.new_module("pupyimporter")
                mod.__name__="pupyimporter"
                mod.__file__="<memimport>\\\\pupyimporter"
                mod.__package__="pupyimporter"
                sys.modules["pupyimporter"]=mod
                exec code+"\\n" in mod.__dict__
                mod.install()
            """))
        self.conn.namespace["pupyimporter_preimporter"](pupyimporter_code)
def __init__(self, expr):
    """Compile ``expr`` into a two-argument function ``f(x, y)``.

    The generated source star-imports numpy and math, defines the names
    ``avg``, ``sd``, ``xmin``, ``xmax``, ``ymin`` and ``ymax`` as None, and
    binds ``f = lambda x, y: <expr>``. It is executed in a fresh module
    stored on ``self.mod``; the code object is kept on ``self.code``.

    SECURITY NOTE: ``expr`` is compiled and executed verbatim; it must not
    come from an untrusted source.
    """
    # types.ModuleType is the portable equivalent of imp.new_module
    # (the imp module no longer exists on Python 3.12+).
    import types

    # solution found in:
    # http://stackoverflow.com/questions/10303248/true-dynamic-and-anonymous-functions-possible-in-python
    self.code = compile(
        "from numpy import *\n" +
        "from math import *\n" +
        "avg = None\n" +
        "sd = None\n" +
        "xmin = None\n" +
        "xmax = None\n" +
        "ymin = None\n" +
        "ymax = None\n" +
        "f = lambda x, y: " + expr,
        'dyn-mark-string', 'exec')
    self.mod = types.ModuleType("dyn_marker_mod")
    # The call form of exec works on both Python 2 and Python 3.
    exec(self.code, self.mod.__dict__)
def get_env_dict_from_string(s):
    """Execute the source string ``s`` and return the resulting namespace.

    Returns a plain ``dict`` copy of the namespace of a throwaway module
    named 'requester.env'. Returns ``{}`` when ``s`` is empty, or when
    executing it raises (the error is then shown via
    ``sublime.error_message``).

    http://stackoverflow.com/questions/5362771/load-module-from-string-in-python
    """
    # types.ModuleType is the portable equivalent of imp.new_module
    # (the imp module no longer exists on Python 3.12+).
    import types

    # this avoids a subtle bug, DON'T REMOVE
    sys.modules.pop('requester.env', None)

    if not s:
        return {}

    env = types.ModuleType('requester.env')
    try:
        exec(s, env.__dict__)
    except Exception as e:
        sublime.error_message('EnvBlock Error:\n{}'.format(e))
        return {}
    else:
        return dict(env.__dict__)
def test_state_after_failure(self):
# A failed reload should leave the original module intact.
attributes = ('__file__', '__path__', '__package__')
value = '<test>'
name = '_temp'
with source_util.create_modules(name) as mapping:
orig_module = imp.new_module(name)
for attr in attributes:
setattr(orig_module, attr, value)
with open(mapping[name], 'w') as file:
file.write('+++ bad syntax +++')
loader = _bootstrap._SourceFileLoader('_temp', mapping['_temp'])
with self.assertRaises(SyntaxError):
loader.load_module(name)
for attr in attributes:
self.assertEqual(getattr(orig_module, attr), value)
# [syntax error]
def __init__(self, *names, module_code=None):
    """Create one mock module object per entry in ``names``.

    A name ending in '.__init__' denotes a package: the suffix is dropped
    from the import name and the module gets a ``__path__``. Results are
    stored in ``self.modules`` keyed by import name.

    module_code: optional mapping of import name to an arbitrary value;
        entries whose key matches a created module are copied into
        ``self.module_code``.
    """
    # types.ModuleType is the portable equivalent of imp.new_module
    # (the imp module no longer exists on Python 3.12+).
    import types

    # None instead of a shared mutable ``{}`` default.
    if module_code is None:
        module_code = {}
    self.modules = {}
    self.module_code = {}
    for name in names:
        if not name.endswith('.__init__'):
            import_name = name
        else:
            import_name = name[:-len('.__init__')]
        if '.' not in name:
            package = None
        elif import_name == name:
            # Plain submodule: its package is everything before the last dot.
            package = name.rsplit('.', 1)[0]
        else:
            # Package ('.__init__' form): it is its own package.
            package = import_name
        module = types.ModuleType(import_name)
        module.__loader__ = self
        module.__file__ = '<mock __file__>'
        module.__package__ = package
        module.attr = name
        if import_name != name:
            module.__path__ = ['<mock __path__>']
        self.modules[import_name] = module
        if import_name in module_code:
            self.module_code[import_name] = module_code[import_name]
def _handle_ns(packageName, path_item):
    """Ensure that named package includes a subpath of path_item (if needed).

    Returns the subpath produced by the namespace handler registered for the
    importer of ``path_item``. Returns ``None`` when ``path_item`` has no
    importer, when that importer cannot find ``packageName``, or when the
    handler has nothing to add.

    Raises ``TypeError`` if ``packageName`` is already present in
    ``sys.modules`` but the module has no ``__path__``.
    """
    importer = get_importer(path_item)
    if importer is None:
        return None
    loader = importer.find_module(packageName)
    if loader is None:
        return None

    module = sys.modules.get(packageName)
    if module is None:
        # Not imported yet: register an empty package shell first.
        module = sys.modules[packageName] = imp.new_module(packageName)
        module.__path__ = []
        _set_parent_ns(packageName)
    elif not hasattr(module, '__path__'):
        raise TypeError("Not a package:", packageName)

    handler = _find_adapter(_namespace_handlers, importer)
    subpath = handler(importer, path_item, packageName, module)
    if subpath is not None:
        path = module.__path__
        path.append(subpath)
        loader.load_module(packageName)
        # Put the extended list back after loading (presumably because
        # loading may rebind ``__path__`` -- confirm against the loaders used).
        module.__path__ = path
    return subpath
def make_plugin_module(version=None, name="test_plugin_name", extra_body=None):
    """Create a plugin module, optionally defining a ``version`` field.

    The module source is ``version = <version>`` (omitted when ``version``
    is None), followed by ``extra_body`` when given. The module is added to
    sys.modules, and both the name and the module are returned.

    Note: ``version`` is interpolated with ``%s`` into source code, so the
    module sees whatever Python literal that text spells.
    """
    # types.ModuleType is the portable equivalent of imp.new_module
    # (the imp module no longer exists on Python 3.12+).
    import types

    # Create a plugin encoded with a version
    plugin_module = types.ModuleType(name)
    if version is not None:
        plugin_module_body = "version = %s" % version
    else:
        plugin_module_body = ""
    if extra_body is not None:
        plugin_module_body = "%s\n\n%s" % (plugin_module_body, extra_body)
    # The call form of exec works on both Python 2 and Python 3.
    exec(plugin_module_body, plugin_module.__dict__)

    # Simulate imported
    sys.modules[name] = plugin_module
    return name, plugin_module
def _handle_ns(packageName, path_item):
    """Ensure that named package includes a subpath of path_item (if needed).

    Returns the subpath produced by the namespace handler registered for the
    importer of ``path_item``. Returns ``None`` when ``path_item`` has no
    importer, when that importer cannot find ``packageName``, or when the
    handler has nothing to add.

    Raises ``TypeError`` if ``packageName`` is already present in
    ``sys.modules`` but the module has no ``__path__``.
    """
    importer = get_importer(path_item)
    if importer is None:
        return None
    loader = importer.find_module(packageName)
    if loader is None:
        return None

    module = sys.modules.get(packageName)
    if module is None:
        # Not imported yet: register an empty package shell first.
        module = sys.modules[packageName] = imp.new_module(packageName)
        module.__path__ = []
        _set_parent_ns(packageName)
    elif not hasattr(module, '__path__'):
        raise TypeError("Not a package:", packageName)

    handler = _find_adapter(_namespace_handlers, importer)
    subpath = handler(importer, path_item, packageName, module)
    if subpath is not None:
        path = module.__path__
        path.append(subpath)
        loader.load_module(packageName)
        # Put the extended list back after loading (presumably because
        # loading may rebind ``__path__`` -- confirm against the loaders used).
        module.__path__ = path
    return subpath
def __init__(self, *names):
    """Create one mock module object per entry in ``names``.

    A name ending in '.__init__' denotes a package: the suffix is dropped
    from the import name and the module gets a ``__path__``. Results are
    stored in ``self.modules`` keyed by import name.
    """
    # types.ModuleType is the portable equivalent of imp.new_module
    # (the imp module no longer exists on Python 3.12+).
    import types

    self.modules = {}
    for name in names:
        if not name.endswith('.__init__'):
            import_name = name
        else:
            import_name = name[:-len('.__init__')]
        if '.' not in name:
            package = None
        elif import_name == name:
            # Plain submodule: its package is everything before the last dot.
            package = name.rsplit('.', 1)[0]
        else:
            # Package ('.__init__' form): it is its own package.
            package = import_name
        module = types.ModuleType(import_name)
        module.__loader__ = self
        module.__file__ = '<mock __file__>'
        module.__package__ = package
        module.attr = name
        if import_name != name:
            module.__path__ = ['<mock __path__>']
        self.modules[import_name] = module
def load_module(self, name):
    """Return the module ``name``, executing its stored code if needed.

    ``self.modules`` maps a module name to a ``(code object, pyc path)``
    pair; the entry is consumed (popped) when the module is created.
    Raises ``KeyError`` when ``name`` is neither in ``sys.modules`` nor in
    ``self.modules``.
    """
    # If there is an existing module object named 'fullname' in
    # sys.modules, the loader must use that existing module. (Otherwise,
    # the reload() builtin will not work correctly.)
    if name in sys.modules:
        return sys.modules[name]
    co, pyc = self.modules.pop(name)
    # I wish I could just call imp.load_compiled here, but __file__ has to
    # be set properly. In Python 3.2+, this all would be handled correctly
    # by load_compiled.
    mod = sys.modules[name] = imp.new_module(name)
    try:
        mod.__file__ = co.co_filename
        # Normally, this attribute is 3.2+.
        mod.__cached__ = pyc
        mod.__loader__ = self
        py.builtin.exec_(co, mod.__dict__)
    except:
        # Do not leave a half-initialised module registered; re-raise as is.
        del sys.modules[name]
        raise
    # Read back from sys.modules: the executed code may have replaced
    # its own entry there.
    return sys.modules[name]
def load_module(self, fullname):
    """This method is called by Python if CustomImporter.find_module
    does not return None. fullname is the fully-qualified name
    of the module/package that was requested.

    Returns the existing ``sys.modules`` entry when there is one; otherwise
    creates a ``MockObject`` stand-in, gives it the import-related
    attributes, registers it in ``sys.modules`` and returns it.
    """
    if fullname in sys.modules:
        return sys.modules[fullname]

    logger.debug('Importing Mock Module: {}'.format(fullname))
    mod = MockObject(fullname=fullname)
    mod.__loader__ = self
    mod.__file__ = fullname
    mod.__path__ = [fullname]
    mod.__name__ = fullname
    sys.modules[fullname] = mod
    # NOTE(review): the original author marked this return with
    # "This gives errors"; the cause is not visible here.
    return mod
def _handle_ns(packageName, path_item):
    """Ensure that named package includes a subpath of path_item (if needed).

    Returns the subpath produced by the namespace handler registered for the
    importer of ``path_item``. Returns ``None`` when ``path_item`` has no
    importer, when that importer cannot find ``packageName``, or when the
    handler has nothing to add.

    Raises ``TypeError`` if ``packageName`` is already present in
    ``sys.modules`` but the module has no ``__path__``.
    """
    importer = get_importer(path_item)
    if importer is None:
        return None
    loader = importer.find_module(packageName)
    if loader is None:
        return None

    module = sys.modules.get(packageName)
    if module is None:
        # Not imported yet: register an empty package shell first.
        module = sys.modules[packageName] = imp.new_module(packageName)
        module.__path__ = []
        _set_parent_ns(packageName)
    elif not hasattr(module, '__path__'):
        raise TypeError("Not a package:", packageName)

    handler = _find_adapter(_namespace_handlers, importer)
    subpath = handler(importer, path_item, packageName, module)
    if subpath is not None:
        path = module.__path__
        path.append(subpath)
        loader.load_module(packageName)
        # Put the extended list back after loading (presumably because
        # loading may rebind ``__path__`` -- confirm against the loaders used).
        module.__path__ = path
    return subpath
def _handle_ns(packageName, path_item):
    """Ensure that named package includes a subpath of path_item (if needed).

    Returns the subpath produced by the namespace handler registered for the
    importer of ``path_item``. Returns ``None`` when ``path_item`` has no
    importer, when that importer cannot find ``packageName``, or when the
    handler has nothing to add.

    Raises ``TypeError`` if ``packageName`` is already present in
    ``sys.modules`` but the module has no ``__path__``.
    """
    importer = get_importer(path_item)
    if importer is None:
        return None
    loader = importer.find_module(packageName)
    if loader is None:
        return None

    module = sys.modules.get(packageName)
    if module is None:
        # Not imported yet: register an empty package shell first.
        module = sys.modules[packageName] = imp.new_module(packageName)
        module.__path__ = []
        _set_parent_ns(packageName)
    elif not hasattr(module, '__path__'):
        raise TypeError("Not a package:", packageName)

    handler = _find_adapter(_namespace_handlers, importer)
    subpath = handler(importer, path_item, packageName, module)
    if subpath is not None:
        path = module.__path__
        path.append(subpath)
        loader.load_module(packageName)
        # Put the extended list back after loading (presumably because
        # loading may rebind ``__path__`` -- confirm against the loaders used).
        module.__path__ = path
    return subpath
def save_main_module(file, module_name):
    """Re-register the current main module and install a fresh ``__main__``.

    The module currently at ``sys.modules['__main__']`` is stored under
    ``module_name`` and renamed to it. A new empty module then takes the
    ``__main__`` slot, inherits ``__loader__`` from the saved module when it
    has one, and gets ``__file__`` set to ``file``.

    Returns the new ``__main__`` module.
    """
    # patch provided by: Scott Schlesier - when script is run, it does not
    # use globals from pydevd:
    # This will prevent the pydevd script from contaminating the namespace for the script to be debugged
    # pretend pydevd is not the main module, and
    # convince the file to be debugged that it was loaded as main
    sys.modules[module_name] = sys.modules['__main__']
    sys.modules[module_name].__name__ = module_name

    # types.ModuleType is the portable equivalent of imp.new_module
    # (the imp module no longer exists on Python 3.12+).
    from types import ModuleType
    m = ModuleType('__main__')
    sys.modules['__main__'] = m
    if hasattr(sys.modules[module_name], '__loader__'):
        setattr(m, '__loader__', getattr(sys.modules[module_name], '__loader__'))
    m.__file__ = file

    return m
def test_state_after_failure(self):
# A failed reload should leave the original module intact.
attributes = ('__file__', '__path__', '__package__')
value = '<test>'
name = '_temp'
with source_util.create_modules(name) as mapping:
orig_module = imp.new_module(name)
for attr in attributes:
setattr(orig_module, attr, value)
with open(mapping[name], 'w') as file:
file.write('+++ bad syntax +++')
loader = machinery.SourceFileLoader('_temp', mapping['_temp'])
with self.assertRaises(SyntaxError):
loader.load_module(name)
for attr in attributes:
self.assertEqual(getattr(orig_module, attr), value)
# [syntax error]
def __init__(self, *names, module_code=None):
    """Build mock modules for ``names`` and index them by import name.

    Names that end in '.__init__' are treated as packages: the suffix is
    stripped to form the import name and a ``__path__`` is attached. Every
    module is stored in ``self.modules``.

    module_code: optional mapping of import name to an arbitrary value;
        entries whose key matches a created module are copied into
        ``self.module_code``.
    """
    # types.ModuleType is the portable equivalent of imp.new_module
    # (the imp module no longer exists on Python 3.12+).
    import types

    # None instead of a shared mutable ``{}`` default.
    if module_code is None:
        module_code = {}
    self.modules = {}
    self.module_code = {}
    for name in names:
        if not name.endswith('.__init__'):
            import_name = name
        else:
            import_name = name[:-len('.__init__')]
        if '.' not in name:
            package = None
        elif import_name == name:
            # Plain submodule: its package is everything before the last dot.
            package = name.rsplit('.', 1)[0]
        else:
            # Package ('.__init__' form): it is its own package.
            package = import_name
        module = types.ModuleType(import_name)
        module.__loader__ = self
        module.__file__ = '<mock __file__>'
        module.__package__ = package
        module.attr = name
        if import_name != name:
            module.__path__ = ['<mock __path__>']
        self.modules[import_name] = module
        if import_name in module_code:
            self.module_code[import_name] = module_code[import_name]
def db_migrate():
    """Write the next numbered migration script and upgrade the database.

    The stored model is rebuilt from the migration repository, compared with
    the current ``metadata``, and the resulting update script is saved as
    ``<repo>/versions/NNN_migration.py`` before ``api.upgrade`` is run.
    """
    # types.ModuleType is the portable equivalent of imp.new_module
    # (the imp module no longer exists on Python 3.12+).
    import types
    from config import config
    # NOTE(review): ``mapping`` is not referenced below; the import is kept
    # in case it is needed for its side effects -- confirm.
    from app.infrastructure.mappings import mapping
    from app.infrastructure.mappings.mapping import metadata

    db = SQLAlchemy()
    config = config[os.getenv('FLASK_CONFIG') or 'default']
    metadata.create_all(db.engine)

    v = api.db_version(config.SQLALCHEMY_DATABASE_URI, config.SQLALCHEMY_MIGRATE_REPO)
    migration = config.SQLALCHEMY_MIGRATE_REPO + ('/versions/%03d_migration.py' % (v+1))

    # Execute the generated model source in a scratch module so that its
    # ``meta`` object can be read back.
    tmp_module = types.ModuleType('old_model')
    old_model = api.create_model(config.SQLALCHEMY_DATABASE_URI, config.SQLALCHEMY_MIGRATE_REPO)
    exec(old_model, tmp_module.__dict__)

    script = api.make_update_script_for_model(config.SQLALCHEMY_DATABASE_URI, config.SQLALCHEMY_MIGRATE_REPO, tmp_module.meta, metadata)
    # Close the script file before the upgrade step reads the repository.
    with open(migration, "wt") as migration_file:
        migration_file.write(script)

    api.upgrade(config.SQLALCHEMY_DATABASE_URI, config.SQLALCHEMY_MIGRATE_REPO)
    v = api.db_version(config.SQLALCHEMY_DATABASE_URI, config.SQLALCHEMY_MIGRATE_REPO)
    print('New migration saved as ' + migration)
    print('Current database version: ' + str(v))
def _handle_ns(packageName, path_item):
    """Ensure that named package includes a subpath of path_item (if needed).

    Returns the subpath produced by the namespace handler registered for the
    importer of ``path_item``. Returns ``None`` when ``path_item`` has no
    importer, when that importer cannot find ``packageName``, or when the
    handler has nothing to add.

    Raises ``TypeError`` if ``packageName`` is already present in
    ``sys.modules`` but the module has no ``__path__``.
    """
    importer = get_importer(path_item)
    if importer is None:
        return None
    loader = importer.find_module(packageName)
    if loader is None:
        return None

    module = sys.modules.get(packageName)
    if module is None:
        # Not imported yet: register an empty package shell first.
        module = sys.modules[packageName] = imp.new_module(packageName)
        module.__path__ = []
        _set_parent_ns(packageName)
    elif not hasattr(module, '__path__'):
        raise TypeError("Not a package:", packageName)

    handler = _find_adapter(_namespace_handlers, importer)
    subpath = handler(importer, path_item, packageName, module)
    if subpath is not None:
        path = module.__path__
        path.append(subpath)
        loader.load_module(packageName)
        # Put the extended list back after loading (presumably because
        # loading may rebind ``__path__`` -- confirm against the loaders used).
        module.__path__ = path
    return subpath
def load_module(self, name):
    """Return the module ``name``, executing its stored code if needed.

    ``self.modules`` maps a module name to a ``(code object, pyc path)``
    pair; the entry is consumed (popped) when the module is created.
    Raises ``KeyError`` when ``name`` is neither in ``sys.modules`` nor in
    ``self.modules``.
    """
    # If there is an existing module object named 'fullname' in
    # sys.modules, the loader must use that existing module. (Otherwise,
    # the reload() builtin will not work correctly.)
    if name in sys.modules:
        return sys.modules[name]
    co, pyc = self.modules.pop(name)
    # I wish I could just call imp.load_compiled here, but __file__ has to
    # be set properly. In Python 3.2+, this all would be handled correctly
    # by load_compiled.
    mod = sys.modules[name] = imp.new_module(name)
    try:
        mod.__file__ = co.co_filename
        # Normally, this attribute is 3.2+.
        mod.__cached__ = pyc
        mod.__loader__ = self
        py.builtin.exec_(co, mod.__dict__)
    except:
        # Do not leave a half-initialised module registered; re-raise as is.
        del sys.modules[name]
        raise
    # Read back from sys.modules: the executed code may have replaced
    # its own entry there.
    return sys.modules[name]