def load_module(self, name):
# If there is an existing module object named 'fullname' in
# sys.modules, the loader must use that existing module. (Otherwise,
# the reload() builtin will not work correctly.)
if name in sys.modules:
return sys.modules[name]
co, pyc = self.modules.pop(name)
# I wish I could just call imp.load_compiled here, but __file__ has to
# be set properly. In Python 3.2+, this all would be handled correctly
# by load_compiled.
mod = sys.modules[name] = imp.new_module(name)
try:
mod.__file__ = co.co_filename
# Normally, this attribute is 3.2+.
mod.__cached__ = pyc
mod.__loader__ = self
py.builtin.exec_(co, mod.__dict__)
except:
del sys.modules[name]
raise
return sys.modules[name]
python类new_module()的实例源码
def parse_eb(file_path):
# type: (object) -> object file_name) -> dict:
""" interpret easyconfig file with 'exec'. Interperting fails if
undefined constants are within the Easyconfig file.
Add undefined constants to <header>.
"""
header = 'SOURCE_TGZ = "%(name)s-%(version)s.tgz"\n'
header += 'SOURCE_TAR_GZ = "%(name)s-%(version)s.tar.gz"\n'
code = header
eb = imp.new_module("easyconfig")
with open(file_path, "r") as f:
code += f.read()
try:
exec (code, eb.__dict__)
except Exception, e:
print("interperting easyconfig error: %s" % e)
eb = {}
return eb
def parse_eb(self, file_name, primary):
""" interpret easyconfig file with 'exec'. Interperting fails if
constants that are not defined within the easyconfig file. Add
undefined constants to <header>.
"""
header = 'SOURCE_TGZ = "%(name)s-%(version)s.tgz"\n'
header += 'SOURCE_TAR_GZ = "%(name)s-%(version)s.tar.gz"\n'
header += self.prolog
code = header
eb = imp.new_module("easyconfig")
with open(file_name, "r") as f:
code += f.read()
try:
exec (code, eb.__dict__)
except Exception as err:
print("interperting easyconfig error: %s" % err)
sys.exit(1)
if primary: # save original text of source code
self.code = code
self.ptr_head = len(header)
return eb
def import_object_from_string_code(code, object):
"""Used to import an object from arbitrary passed code.
Passed in code is treated as a module and is imported and added
to `sys.modules` with its SHA256 hash as key.
Args:
code (string): Python code to import as module
object (string): Name of object to extract from imported module
"""
sha256 = hashlib.sha256(code.encode('UTF-8')).hexdigest()
module = imp.new_module(sha256)
try:
exec_(code, module.__dict__)
except Exception as e:
raise exceptions.UserError('User code exception', exception_message=str(e))
sys.modules[sha256] = module
try:
return getattr(module, object)
except AttributeError:
raise exceptions.UserError("{} not found in code".format(object))
def import_string_code_as_module(code):
"""Used to run arbitrary passed code as a module
Args:
code (string): Python code to import as module
Returns:
module: Python module
"""
sha256 = hashlib.sha256(code.encode('UTF-8')).hexdigest()
module = imp.new_module(sha256)
try:
exec_(code, module.__dict__)
except Exception as e:
raise exceptions.UserError('User code exception', exception_message=str(e))
sys.modules[sha256] = module
return module
def dbmigrate():
app = Flask(__name__)
conf = app.config.from_object('config.ProductionConfig')
sqlalchemy_migrate_repo = app.config['SQLALCHEMY_MIGRATE_REPO']
sqlalchemy_database_uri = app.config['SQLALCHEMY_DATABASE_URI']
v = api.db_version(sqlalchemy_database_uri, sqlalchemy_migrate_repo)
migration = sqlalchemy_migrate_repo + ('/versions/%03d_migration.py' % (v + 1))
tmp_module = imp.new_module('old_model')
old_model = api.create_model(sqlalchemy_database_uri, sqlalchemy_migrate_repo)
exec(old_model, tmp_module.__dict__)
script = api.make_update_script_for_model(sqlalchemy_database_uri, sqlalchemy_migrate_repo, tmp_module.meta,
db.metadata)
open(migration, "wt").write(script)
api.upgrade(sqlalchemy_database_uri, sqlalchemy_migrate_repo)
v = api.db_version(sqlalchemy_database_uri, sqlalchemy_migrate_repo)
print('New migration saved as ' + migration)
print('Current database version: ' + str(v))
def _handle_ns(packageName, path_item):
"""Ensure that named package includes a subpath of path_item (if needed)"""
importer = get_importer(path_item)
if importer is None:
return None
loader = importer.find_module(packageName)
if loader is None:
return None
module = sys.modules.get(packageName)
if module is None:
module = sys.modules[packageName] = imp.new_module(packageName)
module.__path__ = []; _set_parent_ns(packageName)
elif not hasattr(module,'__path__'):
raise TypeError("Not a package:", packageName)
handler = _find_adapter(_namespace_handlers, importer)
subpath = handler(importer,path_item,packageName,module)
if subpath is not None:
path = module.__path__; path.append(subpath)
loader.load_module(packageName); module.__path__ = path
return subpath
def __init__(self, mod_name = '__main__', launch_file = None):
import threading
ReplBackend.__init__(self)
if mod_name is not None:
if sys.platform == 'cli':
self.exec_mod = Scope()
self.exec_mod.__name__ = '__main__'
else:
sys.modules[mod_name] = self.exec_mod = imp.new_module(mod_name)
else:
self.exec_mod = sys.modules['__main__']
self.launch_file = launch_file
self.code_flags = 0
self.execute_item = None
self.execute_item_lock = threading.Lock()
self.execute_item_lock.acquire() # lock starts acquired (we use it like manual reset event)
def load_module(self, fullname):
"""
Loads the source for a module found at `fullname`.
For most of the module path here, we're going to be generating
placeholders that don't actually have code; they'd be the equivalent of
a bunch of empty dirs with __init__.py's, but ansible doesn't have the
init file in the path.
"""
if fullname in sys.modules:
mod = sys.modules[fullname]
else:
mod = sys.modules.setdefault(fullname, imp.new_module(fullname))
mod.__file__ = os.path.join(self.path, fullname)
mod.__loader__ = self
mod.__path__ = [self.path]
mod.__package__ = '.'.join(fullname.split('.')[:-1])
# We don't want to actually load unless we get to the Python code.
# This is assuming that the guards in the __init__() function worked
if self.basename not in ANSIBLE_DIRS and self.basename not in PLUGINS:
mod = imp.load_source(self.basename, self.path + self.extension)
return mod
def __init__(self, mod_name):
self.mod_name = mod_name
self.module = imp.new_module(mod_name)
self._saved_module = []
def new_module(self, name): return imp.new_module(name)
def add_module(self, name):
d = self.modules_dict()
if name in d: return d[name]
d[name] = m = self.new_module(name)
return m
# sys interface
def _handle_ns(packageName, path_item):
"""Ensure that named package includes a subpath of path_item (if needed)"""
importer = get_importer(path_item)
if importer is None:
return None
loader = importer.find_module(packageName)
if loader is None:
return None
module = sys.modules.get(packageName)
if module is None:
module = sys.modules[packageName] = imp.new_module(packageName)
module.__path__ = []
_set_parent_ns(packageName)
elif not hasattr(module,'__path__'):
raise TypeError("Not a package:", packageName)
handler = _find_adapter(_namespace_handlers, importer)
subpath = handler(importer, path_item, packageName, module)
if subpath is not None:
path = module.__path__
path.append(subpath)
loader.load_module(packageName)
for path_item in path:
if path_item not in module.__path__:
module.__path__.append(path_item)
return subpath
def from_pyfile(self, filename, silent=False):
"""Updates the values in the config from a Python file. This function
behaves as if the file was imported as module with the
:meth:`from_object` function.
:param filename: the filename of the config. This can either be an
absolute filename or a filename relative to the
root path.
:param silent: set to `True` if you want silent failure for missing
files.
.. versionadded:: 0.7
`silent` parameter.
"""
filename = os.path.join(self.root_path, filename)
d = imp.new_module('config')
d.__file__ = filename
try:
with open(filename) as config_file:
exec(compile(config_file.read(), filename, 'exec'), d.__dict__)
except IOError as e:
if silent and e.errno in (errno.ENOENT, errno.EISDIR):
return False
e.strerror = 'Unable to load configuration file (%s)' % e.strerror
raise
self.from_object(d)
return True
def run(task, inputs, outputs, task_inputs, task_outputs, **kwargs):
custom = imp.new_module('__girder_worker__')
custom.__dict__['_job_manager'] = kwargs.get('_job_manager')
custom.__dict__['_tempdir'] = kwargs.get('_tempdir')
custom.__dict__['_celery_task'] = kwargs.get('_celery_task')
for name in inputs:
custom.__dict__[name] = inputs[name]['script_data']
if task.get('write_script', kwargs.get('write_script', False)):
debug_path = tempfile.mktemp()
with open(debug_path, 'wb') as fh:
fh.write(task['script'])
with open(debug_path, 'r') as fh:
exec fh in custom.__dict__
else:
try:
exec task['script'] in custom.__dict__
except Exception, e:
trace = sys.exc_info()[2]
lines = task['script'].split('\n')
lines = [(str(i+1) + ': ' + lines[i]) for i in xrange(len(lines))]
error = (
str(e) + '\nScript:\n' + '\n'.join(lines) +
'\nTask:\n' + json.dumps(task, indent=4)
)
raise Exception(error), None, trace
for name, task_output in task_outputs.iteritems():
outputs[name]['script_data'] = custom.__dict__[name]
def load_module(self, fullname):
# Only if a module is being reloaded and hasn't been scanned recently
# do we force a refresh of the contents of the .sublime-package. This
# allows proper code upgrades using Package Control.
if fullname in imp._RELOADING:
if self.refreshed < time.time() - 5:
self._scan_zip()
source, source_path, mod_file, is_pkg = self._read_source(fullname)
if source is None:
raise ImportError("No module named '%s'" % fullname)
is_new = False
if fullname in sys.modules:
mod = sys.modules[fullname]
old_mod_file = mod.__file__
else:
is_new = True
mod = sys.modules.setdefault(fullname, imp.new_module(fullname))
mod.__name__ = fullname
mod.__path__ = [self.zippath]
mod.__loader__ = self
mod.__file__ = mod_file
if is_pkg:
mod.__package__ = mod.__name__
else:
mod.__package__ = fullname.rpartition('.')[0]
try:
exec(compile(source, source_path, 'exec'), mod.__dict__)
return mod
except:
if is_new:
del sys.modules[fullname]
else:
mod.__file__ = old_mod_file
raise
def load_module(path, encoding=None):
"""
Load a source file as a python module.
:param path: file path
:type path: string
:return: Loaded Python module
:rtype: module
"""
try:
return cache_modules[path]
except KeyError:
pass
module = imp.new_module(WSCRIPT_FILE)
try:
code = Utils.readf(path, m='rU', encoding=encoding)
except EnvironmentError:
raise Errors.WafError('Could not read the file %r' % path)
module_dir = os.path.dirname(path)
sys.path.insert(0, module_dir)
try : exec(compile(code, path, 'exec'), module.__dict__)
finally: sys.path.remove(module_dir)
cache_modules[path] = module
return module
def start(cwd, version, wafdir):
# simple example, the file main.c is hard-coded
try:
os.stat(cwd + os.sep + 'bbit')
except:
print('call from a folder containing a file named "bbit"')
sys.exit(1)
Logs.init_log()
Context.waf_dir = wafdir
Context.top_dir = Context.run_dir = cwd
Context.out_dir = os.path.join(cwd, 'build')
Context.g_module = imp.new_module('wscript')
Context.g_module.root_path = os.path.join(cwd, 'bbit')
Context.Context.recurse = \
lambda x, y: getattr(Context.g_module, x.cmd or x.fun, Utils.nada)(x)
Context.g_module.configure = lambda ctx: ctx.load('g++')
Context.g_module.build = lambda bld: bld.objects(source='main.c')
Options.OptionsContext().execute()
do_config = 'configure' in sys.argv
try:
os.stat(cwd + os.sep + 'build')
except:
do_config = True
if do_config:
Context.create_context('configure').execute()
if 'clean' in sys.argv:
Context.create_context('clean').execute()
if 'build' in sys.argv:
Context.create_context('build').execute()
def start(cwd, version, wafdir):
# this is the entry point of our small build system
# no script file here
Logs.init_log()
Context.waf_dir = wafdir
Context.out_dir = Context.top_dir = Context.run_dir = cwd
Context.g_module = imp.new_module('wscript')
Context.g_module.root_path = cwd
Context.Context.recurse = recurse_rep
Context.g_module.configure = configure
Context.g_module.build = build
Context.g_module.options = options
Context.g_module.top = Context.g_module.out = '.'
Options.OptionsContext().execute()
do_config = 'configure' in sys.argv
try:
os.stat(cwd + os.sep + 'c4che')
except:
do_config = True
if do_config:
Context.create_context('configure').execute()
if 'clean' in sys.argv:
Context.create_context('clean').execute()
if 'build' in sys.argv:
Context.create_context('build').execute()
def start(cwd, version, wafdir):
# simple example, the file main.c is hard-coded
try:
os.stat(cwd + os.sep + 'cbit')
except:
print('call from a folder containing a file named "cbit"')
sys.exit(1)
Logs.init_log()
Context.waf_dir = wafdir
Context.top_dir = Context.run_dir = cwd
Context.out_dir = os.path.join(cwd, 'build')
Context.g_module = imp.new_module('wscript')
Context.g_module.root_path = os.path.join(cwd, 'cbit')
Context.Context.recurse = recurse_rep
# this is a fake module, which looks like a standard wscript file
Context.g_module.options = options
Context.g_module.configure = configure
Context.g_module.build = build
Options.OptionsContext().execute()
do_config = 'configure' in sys.argv
try:
os.stat(cwd + os.sep + 'build')
except:
do_config = True
if do_config:
Context.create_context('configure').execute()
if 'clean' in sys.argv:
Context.create_context('clean').execute()
if 'build' in sys.argv:
Context.create_context('build').execute()