def load_label_configuration(self, filename):
    """Load a user-provided label specification and sanity-check it.

    The file is loaded with ``imp.load_source`` under the module name
    ``'label_config'`` and the resulting module is stored in
    ``self.configuration``.

    Any failure while loading is logged through ``self.logger.critical``
    and re-raised. A configuration that does not define ``labels`` (or
    defines it as an empty/false value) is reported through
    ``self.logger.critical``; no exception is raised for it here.
    """
    try:
        self.configuration = imp.load_source('label_config', filename)
    except IOError:
        self.logger.critical('failed to open label configuration file %s' % filename)
        raise
    except:
        # deliberately broad: whatever went wrong is logged, then re-raised
        self.logger.critical('error loading label configuration from %s' % filename)
        raise

    # perform some sanity checks on it
    #
    # make sure 'labels' is defined. An explicit check is used rather than
    # ``assert`` so it still runs under ``python -O``, and getattr() makes a
    # missing attribute reach the log message instead of an AttributeError.
    if not getattr(self.configuration, 'labels', None):
        self.logger.critical('loaded label configuration file %s, but it did not define "labels" !' % filename)
python类load_source()的实例源码
def test_bake_with_console_script_cli(cookies):
    """Bake a project with the click CLI and check the generated command.

    The generated ``cli.py`` is loaded straight from its file path, then
    ``cli.main`` is invoked once without arguments and once with ``--help``.
    """
    baked = cookies.bake(extra_context={'command_line_interface': 'click'})
    project_path, project_slug, project_dir = project_info(baked)

    cli_path = os.path.join(project_dir, 'cli.py')
    cli_modname = '.'.join((project_slug, 'cli'))

    # Pick the loading mechanism that matches the running interpreter.
    if sys.version_info < (3, 3):
        cli = imp.load_source(cli_modname, cli_path)
    elif sys.version_info < (3, 5):
        loader_cls = importlib.machinery.SourceFileLoader
        cli = loader_cls(cli_modname, cli_path).load_module()
    else:
        spec = importlib.util.spec_from_file_location(cli_modname, cli_path)
        cli = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(cli)

    runner = CliRunner()

    # Without arguments the command prints the placeholder message.
    bare_run = runner.invoke(cli.main)
    assert bare_run.exit_code == 0
    expected_banner = ' '.join(
        ('Replace this message by putting your code into', project_slug))
    assert expected_banner in bare_run.output

    # --help must be accepted and show the standard help text.
    help_run = runner.invoke(cli.main, ['--help'])
    assert help_run.exit_code == 0
    assert 'Show this message' in help_run.output
def cdnImport(uri, name):
    """Download Python source from ``uri`` and load it as module ``name``.

    The source is written to ``<dataPath>/py<name>/<name>.py``, loaded with
    ``imp.load_source`` and the scratch directory is removed again, also
    when downloading, writing or loading fails.

    NOTE(review): this executes whatever code ``uri`` serves; it is only
    safe when the remote source is fully trusted.

    Returns the loaded module.
    """
    import imp
    from resources.lib.modules import client

    path = os.path.join(dataPath, 'py' + name)
    path = path.decode('utf-8')

    # start from a clean scratch directory
    deleteDir(os.path.join(path, ''), force=True)
    makeFile(dataPath)
    makeFile(path)

    try:
        r = client.request(uri)
        p = os.path.join(path, name + '.py')
        f = openFile(p, 'w')
        try:
            f.write(r)
        finally:
            # close the handle even if the write fails
            f.close()
        m = imp.load_source(name, p)
    finally:
        # never leave the downloaded source behind
        deleteDir(os.path.join(path, ''), force=True)
    return m
def write(self, revision, sourcefile):
    """
    Finalize the __version__ = major.minor.micro.{revision} tag.
    Overwrite C{sourcefile} in place by substituting the {revision} macro.

    @param revision: revision number to write to the source file.
    @type revision: int
    @param sourcefile: python source file with a __version__ tag, typically
                       "csb/__init__.py"
    @type sourcefile: str

    @return: sourcefile.__version__
    """
    # read everything first and close the handle before reopening for writing
    with open(sourcefile) as src:
        content = src.readlines()

    with open(sourcefile, 'w') as src:
        for line in content:
            if line.startswith('__version__'):
                src.write(line.format(revision=revision))
            else:
                src.write(line)

    # the file is closed (flushed) at this point, so the reload below
    # sees the substituted tag
    self._delcache(sourcefile)
    return imp.load_source('____source', sourcefile).__version__
def check_config(use_config=None):
    """Load ``config.py`` from ``codedir`` and compare it with the example.

    When ``use_config`` is given, ``config.<use_config>.py`` is first copied
    over ``config.py`` inside ``codedir``. Problems (missing files, a
    ``signing_key`` or ``s3_bucket`` equal to the example's value) are
    reported through ``error()``.

    Returns the loaded ``config`` module.
    """
    if use_config:
        copy_command = 'cp config.{}.py config.py'.format(use_config)
        with lcd(codedir):
            local(copy_command)

    config_path = os.path.join(codedir, 'config.py')
    example_path = os.path.join(codedir, 'config.example.py')

    try:
        user_cfg = imp.load_source('config', config_path)
    except IOError:
        error('config.py not found. Did you create it by copying config.example.py?')

    try:
        sample_cfg = imp.load_source('config_example', example_path)
    except IOError:
        error('config.example.py not found. Did you remove it?')

    # values left identical to the example have not been customised yet
    if user_cfg.signing_key == sample_cfg.signing_key:
        error('You need to change the signing key to your own unique text.')
    if user_cfg.s3_bucket == sample_cfg.s3_bucket:
        error('You need to change the s3 bucket name to a bucket you control.')

    puts('Your config.py appears to be set up.')
    return user_cfg
def caffe_to_tensorflow_session(caffe_def_path, caffemodel_path, inputs, graph_name='Graph',
                                conversion_out_dir_path=None, use_padding_same=False):
    """Create a TensorFlow Session from a Caffe model.

    The model is converted with ``caffeflow`` into a ``network.py`` /
    ``params_values.npy`` pair, the generated class ``graph_name`` is
    instantiated with ``inputs`` and its parameters are loaded into a new
    ``tf.Session``, which is returned.

    Raises ``Exception`` when the ``caffeflow`` package cannot be imported.
    """
    try:
        # noinspection PyUnresolvedReferences
        from caffeflow import convert
    except ImportError:
        raise Exception("caffeflow package needs to be installed to freeze Caffe models. Check out the README file.")

    # Use the caller's output directory when the helper yields a context
    # manager for it, otherwise a temporary one.
    out_dir_ctx = dummy_context_mgr(conversion_out_dir_path) or util.TemporaryDirectory()
    with out_dir_ctx as out_dir:
        weights_path = os.path.join(out_dir, 'params_values.npy')
        generated_source_path = os.path.join(out_dir, 'network.py')
        convert.convert(caffe_def_path, caffemodel_path, weights_path, generated_source_path, False,
                        use_padding_same=use_padding_same)

        # the converter wrote a Python module; load it and build the graph
        generated = imp.load_source('module.name', generated_source_path)
        net = getattr(generated, graph_name)(inputs)

        sess = tf.Session()
        net.load(weights_path, sess)
        return sess
def load_schemas(self):
    """Collect every ``Schema`` instance found in ``.py`` files under ``self.dir``.

    Each ``.py`` file below ``self.dir`` is loaded with ``imp.load_source``
    and its attributes are scanned. A matching attribute is stored under
    the dotted name ``<relative path parts>.<attribute name>``, e.g.
    ``pkg.mod.my_schema`` for ``<self.dir>/pkg/mod.py``.

    The resulting mapping is assigned to ``self.schemas``.
    """
    schemas = {}
    for root, _dirnames, filenames in os.walk(self.dir):
        for filename in filenames:
            if not filename.endswith('.py'):
                continue
            path = os.path.join(root, filename)
            stem = os.path.splitext(path)[0]
            # One module name per file: with the directory as the name,
            # imp.load_source re-executed every file of a directory into the
            # same module object, so schemas leaked from one file to the next.
            module = imp.load_source(stem, path)
            # relpath copes with a trailing separator on self.dir and with
            # platforms whose separator is not '/'
            prefix = os.path.relpath(stem, self.dir).split(os.sep)
            for attr_name in dir(module):
                attr = getattr(module, attr_name)
                if isinstance(attr, Schema):
                    schemas['.'.join(prefix + [attr_name])] = attr
    self.schemas = schemas
def _runscript(self, filename):
    """Run the script ``filename`` in a freshly reset ``__main__`` namespace.

    The ``__main__`` module dictionary is cleared and re-seeded, a
    ``'startup'`` message is sent over ``self.pipe``, pending actions are
    pulled until ``pull_actions()`` returns ``None``, and finally the
    script is executed through ``self.run``.
    """
    # The script has to run in __main__ namespace (clear it)
    import __main__
    import imp
    __main__.__dict__.clear()
    __main__.__dict__.update({"__name__": "__main__",
                              "__file__": filename,
                              "__builtins__": __builtins__,
                              "imp": imp,  # need for run
                              })

    # avoid stopping before we reach the main script
    self._wait_for_mainpyfile = 1
    self.mainpyfile = self.canonic(filename)
    self._user_requested_quit = 0
    # %r produces a correctly escaped string literal, so paths containing
    # backslashes or quotes survive being embedded in the statement
    statement = 'imp.load_source("__main__", %r)' % (filename,)
    # notify and wait frontend to set initial params and breakpoints
    self.pipe.send({'method': 'startup', 'args': (__version__, )})
    while self.pull_actions() is not None:
        pass
    self.run(statement)
# General interaction function
def get_model(
        model_file, model_name, loss_file, loss_name, class_weight, n_encdec,
        n_classes, in_channel, n_mid, train_depth=None, result_dir=None):
    """Build the model defined in ``model_file``, optionally wrapped in its loss.

    ``model_name`` / ``loss_name`` serve both as the module name passed to
    ``imp.load_source`` and as the attribute looked up in the loaded
    module. When ``train_depth`` is truthy the model is wrapped as
    ``loss(model, class_weight, train_depth)``. When ``result_dir`` is
    given, both source files are copied there unless a file of the same
    name already exists.

    Returns the (possibly wrapped) model object.
    """
    model_factory = getattr(imp.load_source(model_name, model_file), model_name)
    loss_factory = getattr(imp.load_source(loss_name, loss_file), loss_name)

    # Initialize
    net = model_factory(n_encdec, n_classes, in_channel, n_mid)
    if train_depth:
        net = loss_factory(net, class_weight, train_depth)

    # Copy files
    if result_dir is not None:
        for source_path in (model_file, loss_file):
            dst = '{}/{}'.format(result_dir, os.path.basename(source_path))
            if not os.path.exists(dst):
                shutil.copy(source_path, dst)

    return net
def IncrementalInstall(device, apk_helper, installer_script):
    """Performs an incremental install.

    The installer script is loaded as a module and its
    GetInstallParameters() result supplies the splits, native libs and dex
    files handed to incremental_install.installer.Install().

    Args:
      device: Device to install on.
      apk_helper: ApkHelper instance for the _incremental.apk.
      installer_script: Path to the installer script for the incremental apk.

    Raises:
      Exception: if loading the installer script raises IOError.
    """
    try:
        wrapper_module = imp.load_source('install_wrapper', installer_script)
    except IOError:
        raise Exception('Incremental install script not found: %s\n' %
                        installer_script)
    install_params = wrapper_module.GetInstallParameters()

    from incremental_install import installer
    installer.Install(
        device,
        apk_helper,
        split_globs=install_params['splits'],
        native_libs=install_params['native_libs'],
        dex_files=install_params['dex_files'],
        permissions=None)  # Auto-grant permissions from manifest.
def _runscript(self, filename):
    """Run the script ``filename`` in a freshly reset ``__main__`` namespace.

    The ``__main__`` module dictionary is cleared and re-seeded, a
    ``'startup'`` message is sent over ``self.pipe``, pending actions are
    pulled until ``pull_actions()`` returns ``None``, and finally the
    script is executed through ``self.run``.
    """
    # The script has to run in __main__ namespace (clear it)
    import __main__
    import imp
    __main__.__dict__.clear()
    __main__.__dict__.update({"__name__": "__main__",
                              "__file__": filename,
                              "__builtins__": __builtins__,
                              "imp": imp,  # need for run
                              })

    # avoid stopping before we reach the main script
    self._wait_for_mainpyfile = 1
    self.mainpyfile = self.canonic(filename)
    self._user_requested_quit = 0
    # %r produces a correctly escaped string literal, so paths containing
    # backslashes or quotes survive being embedded in the statement
    statement = 'imp.load_source("__main__", %r)' % (filename,)
    # notify and wait frontend to set initial params and breakpoints
    self.pipe.send({'method': 'startup', 'args': (__version__, )})
    while self.pull_actions() is not None:
        pass
    self.run(statement)
# General interaction function
def load_module(self, code_path):
    """Load the Python source file ``code_path`` and return the module.

    The module is registered under the MD5 hex digest of ``code_path``.
    Any error is printed as a traceback to ``sys.stderr`` and re-raised.
    """
    try:
        fin = open(code_path, 'rb')
        try:
            return imp.load_source(md5.new(code_path).hexdigest(), code_path, fin)
        finally:
            # fin is always bound here, so no guard around close() is needed
            fin.close()
    except:
        # deliberately broad: report every failure, then let it propagate
        traceback.print_exc(file=sys.stderr)
        raise
def cdnImport(uri, name):
    """Download Python source from ``uri`` and load it as module ``name``.

    The source is written to ``<dataPath>/py<name>/<name>.py``, loaded with
    ``imp.load_source`` and the scratch directory is removed again, also
    when downloading, writing or loading fails.

    NOTE(review): this executes whatever code ``uri`` serves; it is only
    safe when the remote source is fully trusted.

    Returns the loaded module.
    """
    import imp
    from resources.lib.modules import client

    path = os.path.join(dataPath, 'py' + name)
    path = path.decode('utf-8')

    # start from a clean scratch directory
    deleteDir(os.path.join(path, ''), force=True)
    makeFile(dataPath)
    makeFile(path)

    try:
        r = client.request(uri)
        p = os.path.join(path, name + '.py')
        f = openFile(p, 'w')
        try:
            f.write(r)
        finally:
            # close the handle even if the write fails
            f.close()
        m = imp.load_source(name, p)
    finally:
        # never leave the downloaded source behind
        deleteDir(os.path.join(path, ''), force=True)
    return m
def cdnImport(uri, name):
    """Download Python source from ``uri`` and load it as module ``name``.

    The source is written to ``<dataPath>/py<name>/<name>.py``, loaded with
    ``imp.load_source`` and the scratch directory is removed again, also
    when downloading, writing or loading fails.

    NOTE(review): this executes whatever code ``uri`` serves; it is only
    safe when the remote source is fully trusted.

    Returns the loaded module.
    """
    import imp
    from resources.lib.modules import client

    path = os.path.join(dataPath, 'py' + name)
    path = path.decode('utf-8')

    # start from a clean scratch directory
    deleteDir(os.path.join(path, ''), force=True)
    makeFile(dataPath)
    makeFile(path)

    try:
        r = client.request(uri)
        p = os.path.join(path, name + '.py')
        f = openFile(p, 'w')
        try:
            f.write(r)
        finally:
            # close the handle even if the write fails
            f.close()
        m = imp.load_source(name, p)
    finally:
        # never leave the downloaded source behind
        deleteDir(os.path.join(path, ''), force=True)
    return m
def loadFingerprint(self, type, dirpath, filename):
    """Load a fingerprint module from ``dirpath/filename`` and query it.

    The module is loaded with ``imp.load_source``; the class named like the
    file (without extension) is instantiated with ``self.config``,
    ``self.display`` and ``self.modulelock``.

    Returns a dict holding the instance's ``getFingerprint()`` result under
    ``'fingerprint'``, or ``None`` if loading or instantiating fails (the
    error is printed). Failing to open the file raises.
    """
    finger_dict = {}
    mod_name = filename.split('.')[0]
    mod_dispname = '/'.join(re.split('/modules/' + type, dirpath)[-1].split('/') + [mod_name])
    mod_loadname = mod_dispname.replace('/', '_')
    mod_loadpath = os.path.join(dirpath, filename)
    mod_file = open(mod_loadpath)
    try:
        # import the module into memory
        imp.load_source(mod_loadname, mod_loadpath, mod_file)
        # find the module and make an instance of it
        _module = __import__(mod_loadname)
        _class = getattr(_module, mod_name)
        _instance = _class(self.config, self.display, self.modulelock)
        # NOTE(review): the second entry maps mod_name -> 'name'; this looks
        # reversed ('name' -> mod_name?) but is kept as callers may rely on it
        finger_dict = {'fingerprint': _instance.getFingerprint(), mod_name: 'name'}
    except Exception as e:
        # notify the user of errors
        print(e)
        return None
    finally:
        # the handle was previously never closed
        mod_file.close()
    return finger_dict
def loadProtocol(self, type, dirpath, filename):
    """Load a protocol module from ``dirpath/filename`` and query it.

    The module is loaded with ``imp.load_source``; the class named like the
    file (without extension) is instantiated with ``self.config``,
    ``self.display`` and ``self.modulelock``.

    Returns a dict holding the instance's ``getProtocol()`` result under
    ``'protocol'``, or ``None`` if loading or instantiating fails (the
    error is printed). Failing to open the file raises.
    """
    protocol_dict = {}
    mod_name = filename.split('.')[0]
    mod_dispname = '/'.join(re.split('/modules/' + type, dirpath)[-1].split('/') + [mod_name])
    mod_loadname = mod_dispname.replace('/', '_')
    mod_loadpath = os.path.join(dirpath, filename)
    mod_file = open(mod_loadpath)
    try:
        # import the module into memory
        imp.load_source(mod_loadname, mod_loadpath, mod_file)
        # find the module and make an instance of it
        _module = __import__(mod_loadname)
        _class = getattr(_module, mod_name)
        _instance = _class(self.config, self.display, self.modulelock)
        # NOTE(review): the second entry maps mod_name -> 'name'; this looks
        # reversed ('name' -> mod_name?) but is kept as callers may rely on it
        protocol_dict = {'protocol': _instance.getProtocol(), mod_name: 'name'}
    except Exception as e:
        # notify the user of errors
        print(e)
        return None
    finally:
        # the handle was previously never closed
        mod_file.close()
    return protocol_dict
def main(argv):
    """Serve a web application class loaded from a Python source file.

    Command line: ``[-h host] [-p port] [-n name] module.class``. The first
    positional argument is loaded with ``imp.load_source``; the attribute
    ``name`` (default ``'WebApp'``) of that module becomes
    ``WebAppHandler.APP_CLASS`` and an ``HTTPServer`` is started on
    ``(host, port)`` (default ``('', 8080)``).

    Returns 100 after printing the usage line when the arguments are
    invalid (unknown option, non-numeric port, missing module path).
    """
    import getopt

    def usage():
        print('usage: %s [-h host] [-p port] [-n name] module.class' % argv[0])
        return 100

    try:
        (opts, args) = getopt.getopt(argv[1:], 'h:p:n:')
    except getopt.GetoptError:
        return usage()

    host = ''
    port = 8080
    name = 'WebApp'
    for (k, v) in opts:
        if k == '-h':
            host = v
        elif k == '-p':
            try:
                port = int(v)
            except ValueError:
                # a non-numeric port is a usage error, not a crash
                return usage()
        elif k == '-n':
            name = v
    if not args:
        return usage()

    path = args.pop(0)
    # imp is only needed once there is actually a module to load
    import imp
    module = imp.load_source('app', path)
    WebAppHandler.APP_CLASS = getattr(module, name)
    print('Listening %s:%d...' % (host, port))
    httpd = HTTPServer((host, port), WebAppHandler)
    httpd.serve_forever()
    return
def _runscript(self, filename):
    """Run the script ``filename`` in a freshly reset ``__main__`` namespace.

    The ``__main__`` module dictionary is cleared and re-seeded, a
    ``'startup'`` message is sent over ``self.pipe``, pending actions are
    pulled until ``pull_actions()`` returns ``None``, and finally the
    script is executed through ``self.run``.
    """
    # The script has to run in __main__ namespace (clear it)
    import __main__
    import imp
    __main__.__dict__.clear()
    __main__.__dict__.update({"__name__": "__main__",
                              "__file__": filename,
                              "__builtins__": __builtins__,
                              "imp": imp,  # need for run
                              })

    # avoid stopping before we reach the main script
    self._wait_for_mainpyfile = 1
    self.mainpyfile = self.canonic(filename)
    self._user_requested_quit = 0
    # %r produces a correctly escaped string literal, so paths containing
    # backslashes or quotes survive being embedded in the statement
    statement = 'imp.load_source("__main__", %r)' % (filename,)
    # notify and wait frontend to set initial params and breakpoints
    self.pipe.send({'method': 'startup', 'args': (__version__, )})
    while self.pull_actions() is not None:
        pass
    self.run(statement)
# General interaction function
def IncrementalInstall(device, apk_helper, installer_script):
    """Performs an incremental install.

    The installer script is loaded as a module and its
    GetInstallParameters() result supplies the splits, native libs and dex
    files handed to incremental_install.installer.Install().

    Args:
      device: Device to install on.
      apk_helper: ApkHelper instance for the _incremental.apk.
      installer_script: Path to the installer script for the incremental apk.

    Raises:
      Exception: if loading the installer script raises IOError.
    """
    try:
        wrapper_module = imp.load_source('install_wrapper', installer_script)
    except IOError:
        raise Exception('Incremental install script not found: %s\n' %
                        installer_script)
    install_params = wrapper_module.GetInstallParameters()

    from incremental_install import installer
    installer.Install(
        device,
        apk_helper,
        split_globs=install_params['splits'],
        native_libs=install_params['native_libs'],
        dex_files=install_params['dex_files'],
        permissions=None)  # Auto-grant permissions from manifest.
def load_module(self, name, stuff):
    """Load module ``name`` described by the ``(file, filename, info)`` triple.

    ``info`` is a ``(suffix, mode, type)`` triple. Built-in and frozen
    modules are initialised through ``self.hooks`` and returned directly.
    For the other known types the matching ``self.hooks.load_*`` method is
    called and the module's ``__file__`` is set to ``filename``. The open
    file, if any, is closed in every case.

    Raises ``ImportError`` for an unrecognised module type.
    """
    fp, filename, info = stuff
    (suffix, mode, kind) = info

    # module type -> name of the hooks method that loads it
    loaders = (
        (C_EXTENSION, 'load_dynamic'),
        (PY_SOURCE, 'load_source'),
        (PY_COMPILED, 'load_compiled'),
        (PKG_DIRECTORY, 'load_package'),
    )

    try:
        if kind == BUILTIN_MODULE:
            return self.hooks.init_builtin(name)
        if kind == FROZEN_MODULE:
            return self.hooks.init_frozen(name)

        for candidate, hook_name in loaders:
            if kind == candidate:
                m = getattr(self.hooks, hook_name)(name, filename, fp)
                break
        else:
            raise ImportError("Unrecognized module type (%r) for %s" %
                              (kind, name))
    finally:
        if fp:
            fp.close()

    m.__file__ = filename
    return m