def _tryconvertpyarg(self, x):
"""Convert a dotted module name to path.
"""
import pkgutil
try:
loader = pkgutil.find_loader(x)
except ImportError:
return x
if loader is None:
return x
# This method is sometimes invoked when AssertionRewritingHook, which
# does not define a get_filename method, is already in place:
try:
path = loader.get_filename(x)
except AttributeError:
# Retrieve path from AssertionRewritingHook:
path = loader.modules[x][0].co_filename
if loader.is_package(x):
path = os.path.dirname(path)
return path
python类find_loader()的实例源码
def check_antipackage():
from sys import version_info
sys_version = version_info[:2]
found = True
if sys_version < (3, 0):
# 'python 2'
from pkgutil import find_loader
found = find_loader('antipackage') is not None
elif sys_version <= (3, 3):
# 'python <= 3.3'
from importlib import find_loader
found = find_loader('antipackage') is not None
else:
# 'python >= 3.4'
from importlib import util
found = util.find_spec('antipackage') is not None
if not found:
print('Install missing package "antipackage"')
print('Example: pip install git+https://github.com/ellisonbg/antipackage.git#egg=antipackage')
from sys import exit
exit(1)
def check_antipackage():
from sys import version_info
sys_version = version_info[:2]
found = True
if sys_version < (3, 0):
# 'python 2'
from pkgutil import find_loader
found = find_loader('antipackage') is not None
elif sys_version <= (3, 3):
# 'python <= 3.3'
from importlib import find_loader
found = find_loader('antipackage') is not None
else:
# 'python >= 3.4'
from importlib import util
found = util.find_spec('antipackage') is not None
if not found:
print('Install missing package "antipackage"')
print('Example: pip install git+https://github.com/ellisonbg/antipackage.git#egg=antipackage')
from sys import exit
exit(1)
def is_package(module_name):
"""
Check if a Python module is really a module or is a package containing
other modules.
:param module_name: Module name to check.
:return: True if module is a package else otherwise.
"""
# This way determines if module is a package without importing the module.
try:
loader = pkgutil.find_loader(module_name)
except Exception:
# When it fails to find a module loader then it points probably to a class
# or function and module is not a package. Just return False.
return False
else:
if loader:
# A package must have a __path__ attribute.
return loader.is_package(module_name)
else:
# In case of None - modules is probably not a package.
return False
def _tryconvertpyarg(self, x):
"""Convert a dotted module name to path.
"""
import pkgutil
try:
loader = pkgutil.find_loader(x)
except ImportError:
return x
if loader is None:
return x
# This method is sometimes invoked when AssertionRewritingHook, which
# does not define a get_filename method, is already in place:
try:
path = loader.get_filename(x)
except AttributeError:
# Retrieve path from AssertionRewritingHook:
path = loader.modules[x][0].co_filename
if loader.is_package(x):
path = os.path.dirname(path)
return path
def check_antipackage():
from sys import version_info
sys_version = version_info[:2]
found = True
if sys_version < (3, 0):
# 'python 2'
from pkgutil import find_loader
found = find_loader('antipackage') is not None
elif sys_version <= (3, 3):
# 'python <= 3.3'
from importlib import find_loader
found = find_loader('antipackage') is not None
else:
# 'python >= 3.4'
from importlib import util
found = util.find_spec('antipackage') is not None
if not found:
print('Install missing package "antipackage"')
print('Example: pip install git+https://github.com/ellisonbg/antipackage.git#egg=antipackage')
from sys import exit
exit(1)
def _tryconvertpyarg(self, x):
"""Convert a dotted module name to path.
"""
import pkgutil
try:
loader = pkgutil.find_loader(x)
except ImportError:
return x
if loader is None:
return x
# This method is sometimes invoked when AssertionRewritingHook, which
# does not define a get_filename method, is already in place:
try:
path = loader.get_filename(x)
except AttributeError:
# Retrieve path from AssertionRewritingHook:
path = loader.modules[x][0].co_filename
if loader.is_package(x):
path = os.path.dirname(path)
return path
def _tryconvertpyarg(self, x):
"""Convert a dotted module name to path.
"""
import pkgutil
try:
loader = pkgutil.find_loader(x)
except ImportError:
return x
if loader is None:
return x
# This method is sometimes invoked when AssertionRewritingHook, which
# does not define a get_filename method, is already in place:
try:
path = loader.get_filename(x)
except AttributeError:
# Retrieve path from AssertionRewritingHook:
path = loader.modules[x][0].co_filename
if loader.is_package(x):
path = os.path.dirname(path)
return path
def get_setup_util(ta_name, splunk_uri, session_key, logger):
lib_dir = re.sub("[^\w]+", "_", ta_name.lower())
loader = find_loader(lib_dir + "_setup_util")
if not loader:
logger.debug('module="%s" doesn\'t exists, no global setting available',
lib_dir + "_setup_util")
return None
try:
setup_util_module = import_module(lib_dir + "_setup_util")
except ImportError:
logger.error('Did not import module: "%s", reason="%s"',
lib_dir + "_setup_util", traceback.format_exc())
return None
return setup_util_module.Setup_Util(splunk_uri, session_key,
logger)
def is_package(module_name):
"""
Check if a Python module is really a module or is a package containing
other modules.
:param module_name: Module name to check.
:return: True if module is a package else otherwise.
"""
# This way determines if module is a package without importing the module.
try:
loader = pkgutil.find_loader(module_name)
except Exception:
# When it fails to find a module loader then it points probably to a class
# or function and module is not a package. Just return False.
return False
else:
if loader:
# A package must have a __path__ attribute.
return loader.is_package(module_name)
else:
# In case of None - modules is probably not a package.
return False
def main():
if len(sys.argv) >= 3:
where_to_run = sys.argv[2]
if where_to_run != "standalone":
if pkgutil.find_loader('lockfile') is None:
print "lockfile module not found. This python module is needed to run injection experiments in parallel."
sys.exit(-1)
sorted_apps = [app for app, value in sorted(sp.apps.items(), key=lambda e: e[1][2])] # sort apps according to expected runtimes
for app in sorted_apps:
print app
if not os.path.isdir(sp.app_log_dir[app]): os.system("mkdir -p " + sp.app_log_dir[app]) # create directory to store summary
if len(sys.argv) == 4:
if sys.argv[3] == "clean":
clear_results_file(app) # clean log files only if asked for
run_multiple_injections(app, sys.argv[1], where_to_run)
else:
print_usage()
def _get_module_via_pkgutil(self, fullname):
"""Attempt to fetch source code via pkgutil. In an ideal world, this
would be the only required implementation of get_module()."""
loader = pkgutil.find_loader(fullname)
LOG.debug('pkgutil._get_module_via_pkgutil(%r) -> %r', fullname, loader)
if not loader:
return
try:
path = self._py_filename(loader.get_filename(fullname))
source = loader.get_source(fullname)
if path is not None and source is not None:
return path, source, loader.is_package(fullname)
except AttributeError:
return
def test_request_call(self, request):
"""Test request call kwargs for auth."""
patcher = None
if pkgutil.find_loader('eureq'):
patcher = patch('eureq.request', check_request_auth_kwargs)
patcher.start()
request.side_effect = check_request_auth_kwargs
pygenie.utils.call('http://localhost',
auth_handler=pygenie.auth.AuthHandler(conf=self.conf))
if patcher:
patcher.stop()
def has_module(name):
"""
Check if the Python module 'name' is available.
Parameters
----------
name : str
The name of the Python module, as used in an import instruction.
This function uses ideas from this Stack Overflow question:
http://stackoverflow.com/questions/14050281/
Returns
-------
b : bool
True if the module exists, or False otherwise.
"""
if sys.version_info > (3, 3):
import importlib.util
return importlib.util.find_spec(name) is not None
elif sys.version_info > (2, 7, 99):
import importlib
return importlib.find_loader(name) is not None
else:
import pkgutil
return pkgutil.find_loader(name) is not None
def is_importable_path(path, with_ext = None):
"""
Does ``path`` point to a module which can be imported?
:param path: a python import path
"""
try:
module_spec = _find_module(path)
except (ImportError, AttributeError):
module_spec = None
if module_spec is None:
return False
if with_ext is not None:
try:
# py3
filepath = module_spec.origin
except AttributeError:
try:
# py2
filepath = module_spec.get_filename()
except AttributeError:
# py2, but with polyloader
filepath = module_spec.path
if not filepath.endswith('.' + with_ext):
return False
return True
def _import_qgis():
if pkgutil.find_loader("qgis.core") is not None:
return importlib.import_module("qgis.core")
return None
def get_module_file_attribute(package):
"""
Get the absolute path of the module with the passed name.
Since modules *cannot* be directly imported during analysis, this function
spawns a subprocess importing this module and returning the value of this
module's `__file__` attribute.
Parameters
----------
package : str
Fully-qualified name of this module.
Returns
----------
str
Absolute path of this module.
"""
# First try to use 'pkgutil'. - fastest but doesn't work on
# certain modules in pywin32, which replace all module attributes
# with those of the .dll
try:
loader = pkgutil.find_loader(package)
attr = loader.get_filename(package)
# Second try to import module in a subprocess. Might raise ImportError.
except (AttributeError, ImportError):
# Statement to return __file__ attribute of a package.
__file__statement = """
import %s as p
print(p.__file__)
"""
attr = exec_statement(__file__statement % package)
if not attr.strip():
raise ImportError
return attr
def install_package(_package_name, _arguments=None):
"""
Install the packages listed if not present
:param _package_name: The package to install
:param _argument: An optional argument
:return:
"""
_installed = []
import pip
_exists = None
if minor < 3:
import pkgutil
_exists = pkgutil.find_loader(_package_name)
elif minor == 3:
import importlib
_exists = importlib.find_loader(_package_name)
else:
import importlib
_exists = importlib.util.find_spec(_package_name)
if _exists is None:
print(_package_name + " not installed, installing...")
if _arguments is None:
pip.main(['install', _package_name])
else:
pip.main(['install', _package_name] + _arguments)
print(_package_name + " installed...")
return True
else:
print(_package_name + " already installed, skipping...")
return False
def test_find_loader_missing_module(self):
name = 'totally bogus'
loader = pkgutil.find_loader(name)
self.assertIsNone(loader)
def test_find_loader_avoids_emulation(self):
with check_warnings() as w:
self.assertIsNotNone(pkgutil.find_loader("sys"))
self.assertIsNotNone(pkgutil.find_loader("os"))
self.assertIsNotNone(pkgutil.find_loader("test.support"))
self.assertEqual(len(w.warnings), 0)
def test_find_loader_missing_module(self):
name = 'totally bogus'
loader = pkgutil.find_loader(name)
self.assertIsNone(loader)
def test_find_loader_avoids_emulation(self):
with check_warnings() as w:
self.assertIsNotNone(pkgutil.find_loader("sys"))
self.assertIsNotNone(pkgutil.find_loader("os"))
self.assertIsNotNone(pkgutil.find_loader("test.support"))
self.assertEqual(len(w.warnings), 0)
def get_module_file_attribute(package):
"""
Get the absolute path of the module with the passed name.
Since modules *cannot* be directly imported during analysis, this function
spawns a subprocess importing this module and returning the value of this
module's `__file__` attribute.
Parameters
----------
package : str
Fully-qualified name of this module.
Returns
----------
str
Absolute path of this module.
"""
# First try to use 'pkgutil'. - fastest but doesn't work on
# certain modules in pywin32, which replace all module attributes
# with those of the .dll
try:
loader = pkgutil.find_loader(package)
attr = loader.get_filename(package)
# Second try to import module in a subprocess. Might raise ImportError.
except (AttributeError, ImportError):
# Statement to return __file__ attribute of a package.
__file__statement = """
import %s as p
print(p.__file__)
"""
attr = exec_statement(__file__statement % package)
if not attr.strip():
raise ImportError
return attr
def is_maasserver_available():
"""Ensure that 'maasserver' module is available."""
return pkgutil.find_loader("maasserver") is not None
def main():
if len(sys.argv) != 2:
print_usage()
inj_type = sys.argv[1] # inst_value or inst_address or rf
parse_results_apps(inj_type) # parse sassifi results into local data structures
# populate instruction fractions
if inj_type == "inst_value" or inj_type == "inst_address":
populate_inst_fraction()
if pkgutil.find_loader('xlsxwriter') is not None:
import xlsxwriter
workbook_name = sp.logs_base_dir + "results/results_" + inj_type + "_" + str(sp.NUM_INJECTIONS) + ".xlsx"
os.system("rm -f " + workbook_name)
global workbook
workbook = xlsxwriter.Workbook(workbook_name)
if inj_type == "inst_value" or inj_type == "inst_address":
print_inst_fractions_worksheet()
print_detailed_sassifi_results_worksheet(sys.argv[1])
print_detailed_sassifi_kernel_results_worksheet(sys.argv[1])
print_stats_worksheet(sys.argv[1])
workbook.close()
print "Results: %s" %workbook_name
else:
global fname_prefix
fname_prefix = sp.logs_base_dir + "results/results_" + inj_type + "_" + str(sp.NUM_INJECTIONS) + "_"
if inj_type == "inst_value" or inj_type == "inst_address":
print_inst_fractions_txt()
print_detailed_sassifi_results_txt(sys.argv[1])
print_stats_txt(sys.argv[1])
print "Results: %s" %(sp.logs_base_dir + "results/")
def record_result(inj_mode, igid, bfm, app, kname, kcount, iid, opid, bid, cat, pc, inst_type, tid, injBID, runtime, dmesg, value_str):
res_fname = sp.app_log_dir[app] + "/results-mode" + inj_mode + "-igid" + str(igid) + ".bfm" + str(bfm) + "." + str(sp.NUM_INJECTIONS) + ".txt"
result_str = kname + "-" + kcount + "-" + iid + "-" + opid
result_str += "-" + bid + ":" + str(pc) + ":" + str(inst_type) + ":" + str(tid)
result_str += ":" + str(injBID) + ":" + str(runtime) + ":" + str(cat) + ":" + dmesg
result_str += ":" + value_str + "\n"
if cp.verbose:
print result_str
has_filelock = False
if pkgutil.find_loader('lockfile') is not None:
from lockfile import FileLock
has_filelock = True
if has_filelock:
lock = FileLock(res_fname)
lock.acquire() #acquire lock
rf = open(res_fname, "a")
rf.write(result_str)
rf.close()
if has_filelock:
lock.release() # release lock
# Record the outputs if
if cat == cp.OUT_DIFF or cat == cp.STDOUT_ONLY_DIFF or cat == cp.APP_SPECIFIC_CHECK_FAIL:
if not os.path.isdir(sp.app_log_dir[app] + "/sdcs"): os.system("mkdir -p " + sp.app_log_dir[app] + "/sdcs") # create directory to store sdcs
full_sdc_dir = sp.app_log_dir[app] + "/sdcs/sdc-" + app + "-igid" + igid + "-bfm" + bfm + "-" + kname + "-" + kcount + "-" + iid + "-" + opid + "-" + bid
os.system("mkdir -p " + full_sdc_dir) # create directory to store sdc
map((lambda x: shutil.copy(x, full_sdc_dir)), [stdout_fname, stderr_fname, injection_seeds_file, new_directory + "/" + sp.output_diff_log]) # copy stdout, stderr injection seeds, output diff
shutil.make_archive(full_sdc_dir, 'gztar', full_sdc_dir) # archieve the outputs
shutil.rmtree(full_sdc_dir, True) # remove the directory
###############################################################################
# Create params file. The contents of this file will be read by the injection run.
###############################################################################
def get_module(backend_type):
"""
:param backend_type: String
:return: module lib.ibm_<backend_type>_client by given backend_type
:raise: IBMDriverNoClientModuleFound if module not found
"""
module_name = 'ibm_storage_flocker_driver.lib.ibm_{}_client'.format(
str(backend_type).lower())
pkgutil_import_module = pkgutil.find_loader(module_name)
if not pkgutil_import_module:
raise IBMDriverNoClientModuleFound(module_name, backend_type)
import_module_object = pkgutil_import_module.load_module(
module_name)
return import_module_object
def custom_import(*names):
""" Imports SeisFlows module and extracts class of same name. For example,
custom_import('workflow', 'inversion')
imports 'seisflows.workflow.inversion' and, from this module, extracts
class 'inversion'.
"""
# parse input arguments
if len(names) == 0:
raise Exception(ImportError1)
if names[0] not in SeisflowsObjects.names:
raise Exception(ImportError2)
if len(names) == 1:
names += (_val(names[0]),)
if not names[1]:
return Null
# generate package list
packages = ['seisflows']
if os.getenv('SEISFLOWS_PACKAGES'):
for package in os.getenv('SEISFLOWS_PACKAGES').split(','):
if package in packages:
continue
if find_loader(package):
packages += [package]
# does module exist?
_exists = False
for package in packages:
full_dotted_name = package+'.'+names[0]+'.'+names[1]
if find_loader(full_dotted_name):
_exists = True
break
if not _exists:
raise Exception(ImportError3 %
(names[0], names[1], names[0].upper()))
# import module
module = import_module(full_dotted_name)
# extract class
if hasattr(module, names[1]):
return getattr(module, names[1])
else:
raise Exception(ImportError4 %
(names[0], names[1], names[1]))
# utility functions