def get_processor(aid):
"""
Returns the processor module for a given achievement.
Args:
aid: the achievement id
Returns:
The processor module
"""
try:
path = get_achievement(aid=aid, show_disabled=True)["processor"]
base_path = api.config.get_settings()["achievements"]["processor_base_path"]
return imp.load_source(path[:-3], join(base_path, path))
except FileNotFoundError:
raise InternalException("Achievement processor is offline.")
python类load_source()的实例源码
def cdnImport(uri, name):
import imp
from resources.lib.modules import client
path = os.path.join(dataPath, 'py' + name)
path = path.decode('utf-8')
deleteDir(os.path.join(path, ''), force=True)
makeFile(dataPath) ; makeFile(path)
r = client.request(uri)
p = os.path.join(path, name + '.py')
f = openFile(p, 'w') ; f.write(r) ; f.close()
m = imp.load_source(name, p)
deleteDir(os.path.join(path, ''), force=True)
return m
def get_factory_from_template(maintype):
path = os.path.join(BASE_DIR, 'templates', maintype, FACTORY_FILENAME)
if (python_version_gte(3, 5)):
# Python 3.5 code in this block
import importlib.util
spec = importlib.util.spec_from_file_location(
"{}.factory".format(maintype), path)
foo = importlib.util.module_from_spec(spec)
spec.loader.exec_module(foo)
return foo
elif (python_version_gte(3, 0)):
from importlib.machinery import SourceFileLoader
foo = SourceFileLoader(
"{}.factory".format(maintype), path).load_module()
return foo
else:
# Python 2 code in this block
import imp
foo = imp.load_source("{}.factory".format(maintype), path)
return foo
def load_plugin(self, file_name):
# XAPI plugins run in a py24 environment and may be not compatible with
# py34's syntax. In order to prevent unit test scanning the source file
# under py34 environment, the plugins will be imported with this
# function at run time.
plugin_path = self._get_plugin_path()
# add plugin path into search path.
if plugin_path not in sys.path:
sys.path.append(plugin_path)
# be sure not to create c files next to the plugins
sys.dont_write_bytecode = True
name = file_name.split('.')[0]
path = os.path.join(plugin_path, file_name)
return imp.load_source(name, path)
def __checkUpload(self,msg):
if msg.text()=="Yes":
name = QtGui.QFileDialog.getOpenFileName(filter=".py (*.py)")
file_name = name[0]
if len(file_name) > 0:
if hasattr(self,"i10"):
self.algorithmArgTree.removeTopLevelItem(self.i10)
delattr(self,"i10")
delattr(self,"myFilter")
self.myFilter = imp.load_source("filter_design",file_name)
self.i10 = pg.TreeWidgetItem([""])
myFilterWgtt = QtGui.QPushButton()
myFilterWgtt.setText("myFilter")
myFilterWgtt.setFixedWidth(60)
myFilterWgtt.setStyleSheet("background-color: rgb(190,190,190);border-radius: 6px;")
# rpWgtt.clicked.connect(self.__setrp)
self.i10.setWidget(1,myFilterWgtt)
self.algorithmArgTree.addTopLevelItem(self.i10)
elif msg.text()=="Cancel":
pass
# This class is used to draw multiple lines for the AnalogFilter class in a memory-efficient way
def __checkUpload(self,msg):
if msg.text()=="Yes":
name = QtGui.QFileDialog.getOpenFileName(filter=".py (*.py)")
file_name = name[0]
if len(file_name) > 0:
if hasattr(self,"i10"):
self.algorithmArgTree.removeTopLevelItem(self.i10)
delattr(self,"i10")
delattr(self,"myFilter")
self.myFilter = imp.load_source("filter_design",file_name)
self.i10 = pg.TreeWidgetItem([""])
myFilterWgtt = QtGui.QPushButton()
myFilterWgtt.setText("myFilter")
myFilterWgtt.setFixedWidth(60)
myFilterWgtt.setStyleSheet("background-color: rgb(190,190,190);border-radius: 6px;")
# rpWgtt.clicked.connect(self.__setrp)
self.i10.setWidget(1,myFilterWgtt)
self.algorithmArgTree.addTopLevelItem(self.i10)
elif msg.text()=="Cancel":
pass
# This class is used to draw multiple lines for the AnalogFilter class in a memory-efficient way
def test_exit_cleanly(self):
# Make sure handler fun is called on clean interpreter exit.
ret = pyrun(textwrap.dedent(
"""
import os, imp
mod = imp.load_source("mod", r"{}")
def foo():
with open(r"{}", "ab") as f:
f.write(b"1")
mod.register_exit_fun(foo)
""".format(os.path.abspath(__file__), TESTFN)
))
self.assertEqual(ret, 0)
with open(TESTFN, "rb") as f:
self.assertEqual(f.read(), b"1")
def test_exception(self):
# Make sure handler fun is called on exception.
ret = pyrun(textwrap.dedent(
"""
import os, imp, sys
mod = imp.load_source("mod", r"{}")
def foo():
with open(r"{}", "ab") as f:
f.write(b"1")
mod.register_exit_fun(foo)
sys.stderr = os.devnull
1 / 0
""".format(os.path.abspath(__file__), TESTFN)
))
self.assertEqual(ret, 1)
with open(TESTFN, "rb") as f:
self.assertEqual(f.read(), b"1")
def test_kinterrupt(self):
# Simulates CTRL + C and make sure the exit function is called.
ret = pyrun(textwrap.dedent(
"""
import os, imp, sys
mod = imp.load_source("mod", r"{}")
def foo():
with open(r"{}", "ab") as f:
f.write(b"1")
mod.register_exit_fun(foo)
sys.stderr = os.devnull
raise KeyboardInterrupt
""".format(os.path.abspath(__file__), TESTFN)
))
self.assertEqual(ret, 1)
with open(TESTFN, "rb") as f:
self.assertEqual(f.read(), b"1")
def test_called_once(self):
# Make sure the registered fun is called once.
ret = pyrun(textwrap.dedent(
"""
import os, imp
mod = imp.load_source("mod", r"{}")
def foo():
with open(r"{}", "ab") as f:
f.write(b"1")
mod.register_exit_fun(foo)
""".format(os.path.abspath(__file__), TESTFN)
))
self.assertEqual(ret, 0)
with open(TESTFN, "rb") as f:
self.assertEqual(f.read(), b"1")
def test_cascade(self):
# Register 2 functions and make sure the last registered
# function is executed first.
ret = pyrun(textwrap.dedent(
"""
import functools, os, imp
mod = imp.load_source("mod", r"{}")
def foo(s):
with open(r"{}", "ab") as f:
f.write(s)
mod.register_exit_fun(functools.partial(foo, b'1'))
mod.register_exit_fun(functools.partial(foo, b'2'))
""".format(os.path.abspath(__file__), TESTFN)
))
self.assertEqual(ret, 0)
with open(TESTFN, "rb") as f:
self.assertEqual(f.read(), b"21")
def test_all_exit_sigs_with_sig(self):
# Same as above but the process is terminated by a signal
# instead of exiting cleanly.
for sig in TEST_SIGNALS:
ret = pyrun(textwrap.dedent(
"""
import functools, os, signal, imp
mod = imp.load_source("mod", r"{modname}")
def foo(s):
with open(r"{testfn}", "ab") as f:
f.write(s)
signal.signal({sig}, functools.partial(foo, b'0'))
mod.register_exit_fun(functools.partial(foo, b'1'))
mod.register_exit_fun(functools.partial(foo, b'2'))
os.kill(os.getpid(), {sig})
""".format(modname=os.path.abspath(__file__),
testfn=TESTFN, sig=sig)
))
self.assertEqual(ret, sig)
with open(TESTFN, "rb") as f:
self.assertEqual(f.read(), b"210")
safe_remove(TESTFN)
def test_as_deco(self):
ret = pyrun(textwrap.dedent(
"""
import imp
mod = imp.load_source("mod", r"{}")
@mod.register_exit_fun
def foo():
with open(r"{}", "ab") as f:
f.write(b"1")
""".format(os.path.abspath(__file__), TESTFN)
))
self.assertEqual(ret, 0)
with open(TESTFN, "rb") as f:
self.assertEqual(f.read(), b"1")
def load_node(node, file_path):
path_var = "node.%s"%node
main_module = imp.load_source(path_var, file_path)
for c in main_module.__dict__.values():
try:
if issubclass(c, Node) and c.__module__ is main_module.__name__:
nodes[node] = c
if c.char and isinstance(c.char, str):
c.char = bytearray([ord(c.char[-1].encode()) | (0x80 * (len(c.char) == 2))])
elif isinstance(c.char, bytes):
c.char = bytearray(c.char)
c.ignore_dot = True
elif c.char == "":
c.char = b""
return c
except TypeError:
pass
def importModule(modulePath, name=""):
if isDottedPath(modulePath) or not os.path.exists(modulePath):
try:
return __import__(modulePath, fromlist="dummy")
except ImportError:
logger.debug("failed to load module->%s" % modulePath, exc_info=True)
try:
if os.path.exists(modulePath):
if not name:
name = os.path.splitext(os.path.basename(modulePath))[0]
if name in sys.modules:
return sys.modules[name]
if os.path.isdir(modulePath):
modulePath = os.path.join(modulePath, "__init__.py")
if not os.path.exists(modulePath):
raise ValueError("Cannot find modulepath: {}".format(modulePath))
return imp.load_source(name, os.path.realpath(modulePath))
except ImportError:
logger.error("Failed to load module {}".format(modulePath))
raise
def get_processor(aid):
"""
Returns the processor module for a given achievement.
Args:
aid: the achievement id
Returns:
The processor module
"""
try:
path = get_achievement(aid=aid, show_disabled=True)["processor"]
base_path = api.config.get_settings()["achievements"]["processor_base_path"]
return imp.load_source(path[:-3], join(base_path, path))
except FileNotFoundError:
raise InternalException("Achievement processor is offline.")
def load_module(self, name, stuff):
file, filename, info = stuff
(suff, mode, type) = info
try:
if type == BUILTIN_MODULE:
return self.hooks.init_builtin(name)
if type == FROZEN_MODULE:
return self.hooks.init_frozen(name)
if type == C_EXTENSION:
m = self.hooks.load_dynamic(name, filename, file)
elif type == PY_SOURCE:
m = self.hooks.load_source(name, filename, file)
elif type == PY_COMPILED:
m = self.hooks.load_compiled(name, filename, file)
elif type == PKG_DIRECTORY:
m = self.hooks.load_package(name, filename, file)
else:
raise ImportError, "Unrecognized module type (%r) for %s" % \
(type, name)
finally:
if file: file.close()
m.__file__ = filename
return m
def get_generator(pid):
"""
Gets a handle on a problem generator module.
Args:
pid: the problem pid
Returns:
The loaded module
"""
generator_path = get_generator_path(pid)
if not path.isfile(generator_path):
raise InternalException("Could not find {}.".format(generator_path))
return imp.load_source(generator_path[:-3], generator_path)
def get_logger(logger_name):
app_conf = imp.load_source('app_conf', os.getenv('EAGLE_HOME', '..') + '/eagle_cfg.py')
_logger = logging.getLogger(logger_name)
file_formatter = Formatter(
'%(levelname)s | %(asctime)s | %(name)s | %(message)s | %(pathname)s:%(lineno)d'
)
time_rotating_handler = TimedRotatingFileHandler(\
'{0}/{1}.log'.format(app_conf.LOG_PATH, logger_name), when="midnight", encoding='utf-8')
time_rotating_handler.suffix = "%Y-%m-%d"
time_rotating_handler.setFormatter(file_formatter)
stream_handler = StreamHandler(stream=sys.stdout)
echo_formatter = Formatter('[%(levelname)s][%(name)s][in %(filename)s:%(lineno)d] %(message)s')
stream_handler.setFormatter(echo_formatter)
_logger.addHandler(time_rotating_handler)
_logger.addHandler(stream_handler)
_logger.setLevel(logging.DEBUG)
return _logger
def __new__(cls, *args, **kwargs):
if cls._defined is None:
script_files = glob.glob('{}{}*.py'.format(SCRIPT_PATH, os.path.sep))
for i in script_files:
try:
s = imp.load_source('script', os.path.join(SCRIPT_PATH, i))
script_class = getattr(s, 'Script')()
if cls.check(script_class):
cls.scripts.append(script_class)
print_success('Load script {} successfully.'.format(i))
except:
print_warning('Load script {} failed, ignored'.format(i))
if os.getenv('DEBUG_SCRIPT'):
traceback.print_exc()
# self.scripts = filter(self._check_followed, self.scripts)
# self.scripts = filter(self._check_bangumi, self.scripts)
cls._defined = super(ScriptRunner, cls).__new__(cls, *args, **kwargs)
return cls._defined
def get_version_info():
# Adding the git rev number needs to be done inside
# write_version_py(), otherwise the import of pyfunt.version messes
# up the build under Python 3.
FULLVERSION = VERSION
if os.path.exists('.git'):
GIT_REVISION = git_version()
elif os.path.exists('pyfunt/version.py'):
# must be a source distribution, use existing version file
# load it as a separate module to not load pyfunt/__init__.py
import imp
version = imp.load_source('pyfunt.version', 'pyfunt/version.py')
GIT_REVISION = version.git_revision
else:
GIT_REVISION = "Unknown"
if not ISRELEASED:
FULLVERSION += '.dev0+' + GIT_REVISION[:7]
return FULLVERSION, GIT_REVISION
def _runscript(self, filename):
# The script has to run in __main__ namespace (clear it)
import __main__
import imp
filename = os.path.abspath(filename)
__main__.__dict__.clear()
__main__.__dict__.update({"__name__" : "__main__",
"__file__" : filename,
"__builtins__": __builtins__,
"imp" : imp, # need for run
})
# avoid stopping before we reach the main script
self._wait_for_mainpyfile = 1
self.mainpyfile = self.canonic(filename)
self._user_requested_quit = 0
if sys.version_info>(3,0):
statement = 'imp.load_source("__main__", "%s")' % filename
else:
statement = 'execfile(%r)' % filename
self.startup()
self.run(statement)
def installXl(self, args = None):
fd = open(self.config['template'])
lvmDev = imp.load_source('xlConfig', '', fd).disk[0].split(',')[0][4:]
fd.close()
templateArgs = args.split(' ') if args is not None else []
xl = subprocess.Popen(
['/usr/share/xen/templates/xen-%s' % (self.template), '--hostname=%s' % (self.name), '--dev=%s' % (lvmDev)] + templateArgs,
stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE
)
stdOut, stdErr = xl.communicate()
if xl.returncode == 0:
if not os.path.exists('/var/lib/xen/%s' % (self.name)):
os.mkdir('/var/lib/xen/%s' % (self.name), 0750)
copyfile(self.config['template'], self.config['file'])
self.state = self.getXlState()
return True
else:
return False
def load():
"""Load the active experiment."""
if os.getcwd() not in sys.path:
sys.path.append(os.getcwd())
try:
exp = imp.load_source('dallinger_experiment', "dallinger_experiment.py")
classes = inspect.getmembers(exp, inspect.isclass)
exps = [c for c in classes
if (c[1].__bases__[0].__name__ in "Experiment")]
this_experiment = exps[0][0]
mod = __import__('dallinger_experiment', fromlist=[this_experiment])
return getattr(mod, this_experiment)
except ImportError:
logger.error('Could not import experiment.')
raise
def register_extra_parameters(self):
extra_parameters = None
cwd = os.getcwd()
sys.path.append(cwd)
path_index = len(sys.path) - 1
try:
from dallinger_experiment import extra_parameters
except ImportError:
try:
exp = imp.load_source('dallinger_experiment', "dallinger_experiment.py")
extra_parameters = getattr(exp, 'extra_parameters', None)
except IOError:
pass
if extra_parameters is None:
try:
# We may be in the original source directory, try experiment.py
exp = imp.load_source('dallinger_experiment', "experiment.py")
extra_parameters = getattr(exp, 'extra_parameters', None)
except IOError:
pass
if extra_parameters is not None and getattr(extra_parameters, 'loaded', None) is None:
extra_parameters()
extra_parameters.loaded = True
# Remove path element we added
sys.path.pop(path_index)
def _load_templates(self):
"""
Loads provider template from TEMPLATE_DIR
:rtype: dict:
:return: All templates available in TEMPLATE_DIR
"""
template_base = os.path.dirname(os.path.realpath(__file__)) + TEMPLATE_DIR
modules = glob.glob(os.path.join(template_base, '*.py'))
template_files = [os.path.basename(f)[:-3] for f in modules if not f.endswith('__init__.py')]
templates = {}
for template in template_files:
infos = imp.load_source(template, os.path.join(template_base, template + '.py'))
templates[infos.TEMPLATE.pop('email')] = infos.TEMPLATE
return templates
def load_url_map(path, package, log=None):
url_map = []
our_dir = path[0]
for dirpath, dirnames, filenames in os.walk(our_dir):
for fname in filenames:
root, ext = os.path.splitext(fname)
if ext != '.py' or root == '__init__':
continue
class_path = os.path.join(dirpath, fname)
handle_class = imp.load_source(fname, class_path)
_url_map = getattr(handle_class, 'url_map', {})
if _url_map:
for _url, _handler in _url_map.items():
url_map.append((_url, getattr(handle_class, _handler)))
log.info('url map:\n'+'\n'.join([ '%20s\t%s' % (_url_map[0], _url_map[1])\
for _url_map in url_map]))
return url_map
def get_model_and_optimizer(result_dir, modelfn, opt, opt_kwargs, net_kwargs, gpu):
model_fn = os.path.basename(modelfn)
model_name = model_fn.split('.')[0]
module = imp.load_source(model_name, modelfn)
net = getattr(module, model_name)
# Copy model definition and this train script to the result dir
dst = '%s/%s' % (result_dir, model_fn)
if not os.path.exists(dst):
shutil.copy(modelfn, dst)
dst = '%s/%s' % (result_dir, os.path.basename(__file__))
if not os.path.exists(dst):
shutil.copy(__file__, dst)
# Create model
model = net(**net_kwargs)
if gpu >= 0:
model.to_gpu(gpu)
# Create optimizer
optimizer = optimizers.__dict__[opt](**opt_kwargs)
optimizer.setup(model)
return model, optimizer
def get_model_and_optimizer(result_dir, modelfn, opt, opt_kwargs, net_kwargs, gpu):
model_fn = os.path.basename(modelfn)
model_name = model_fn.split('.')[0]
module = imp.load_source(model_name, modelfn)
Net = getattr(module, model_name)
dst = '%s/%s' % (result_dir, model_fn)
if not os.path.exists(dst):
shutil.copy(modelfn, dst)
dst = '%s/%s' % (result_dir, os.path.basename(__file__))
if not os.path.exists(dst):
shutil.copy(__file__, dst)
# prepare model
model = Net(**net_kwargs)
if gpu >= 0:
model.to_gpu()
optimizer = optimizers.__dict__[opt](**opt_kwargs)
optimizer.setup(model)
return model, optimizer
def _load_module(self, dirpath, filename):
mod_name = filename.split('.')[0]
mod_dispname = '/'.join(re.split('/modules/', dirpath)[-1].split('/') + [mod_name])
mod_loadname = mod_dispname.replace('/', '_')
mod_loadpath = os.path.join(dirpath, filename)
mod_file = open(mod_loadpath)
try:
# import the module into memory
mod = imp.load_source(mod_loadname, mod_loadpath, mod_file)
__import__(mod_loadname)
# add the module to the framework's loaded modules
self._loaded_modules[mod_dispname] = sys.modules[mod_loadname].Module(mod_dispname)
return True
except ImportError as e:
# notify the user of missing dependencies
self.error('Module \'%s\' disabled. Dependency required: \'%s\'' % (mod_dispname, e.message[16:]))
except:
# notify the user of errors
self.print_exception()
self.error('Module \'%s\' disabled.' % (mod_dispname))
# remove the module from the framework's loaded modules
self._loaded_modules.pop(mod_dispname, None)
return False