def load_text(self, context=None):
if self.text_file in bpy.data.texts:
if self.text_file == 'Text':
self.text_file = ''
return
text = text_remap(self.text_file)
try:
mod = importlib.import_module("svrx.nodes.script.{}".format(text))
importlib.reload(mod)
except Exception as err:
error.show(self, err, script=True)
else:
self.adjust_sockets()
self.color = READY_COLOR
self.use_custom_color = True
else:
pass # fail
python类reload()的实例源码
test_rosmsg_importlib_importmodule.py 文件源码
项目:rosimport
作者: pyros-dev
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def test_importlib_importmodule_relative_srv(self):
assert __package__
# Verify that files exists and are dynamically importable
test_srvs = importlib.import_module('.srv', package=__package__)
test_msgs = importlib.import_module('.msg', package=__package__)
self.assert_test_service_classes(test_srvs.TestSrv, test_srvs.TestSrvRequest, test_srvs.TestSrvResponse,
test_srvs.TestSrvDeps, test_srvs.TestSrvDepsRequest, test_srvs.TestSrvDepsResponse,
test_msgs.TestRosMsgDeps, test_msgs.TestRosMsg)
if hasattr(importlib, 'reload'): # recent version of importlib
# attempting to reload
importlib.reload(test_srvs)
else:
pass
assert test_srvs is not None
def test_importlib_importmodule_relative_msg(self):
assert __package__
# Verify that files exists and are dynamically importable
subtest_msgs = importlib.import_module('.msg', package=__package__)
test_msgs = importlib.import_module('test_rosimport.msg', package=__package__)
self.assert_test_message_classes(subtest_msgs.SubTestMsg, subtest_msgs.SubTestMsgDeps, test_msgs.TestRosMsgDeps, test_msgs.TestRosMsg)
if hasattr(importlib, 'reload'): # recent version of importlib
# attempting to reload
importlib.reload(test_msgs)
else:
pass
assert test_msgs is not None
def test_importlib_importmodule_relative_msg_from_absolute(self):
assert __package__
# Verify that files exists and are dynamically importable
subtest_msgs = importlib.import_module('test_rosimport.subtests.msg')
test_msgs = importlib.import_module('test_rosimport.msg')
self.assert_test_message_classes(subtest_msgs.SubTestMsg, subtest_msgs.SubTestMsgDeps, test_msgs.TestRosMsgDeps, test_msgs.TestRosMsg)
if hasattr(importlib, 'reload'): # recent version of importlib
# attempting to reload
importlib.reload(test_msgs)
else:
pass
assert test_msgs is not None
def test_importlib_importmodule_relative_srv(self):
assert __package__
# Verify that files exists and are dynamically importable
subtest_srvs = importlib.import_module('.srv', package=__package__)
test_msgs = importlib.import_module('test_rosimport.msg', package=__package__)
self.assert_test_service_classes(subtest_srvs.SubTestSrv, subtest_srvs.SubTestSrvRequest, subtest_srvs.SubTestSrvResponse,
subtest_srvs.SubTestSrvDeps, subtest_srvs.SubTestSrvDepsRequest, subtest_srvs.SubTestSrvDepsResponse,
test_msgs.TestRosMsgDeps, test_msgs.TestRosMsg)
if hasattr(importlib, 'reload'): # recent version of importlib
# attempting to reload
importlib.reload(subtest_srvs)
else:
pass
assert subtest_srvs is not None
def load_module(module_path):
"""(re) load an external module
Args:
module_path: string, the path to the module relative to the main script
Returns:
boolean, True if no Exception was raised on (re)load, otherwise False
"""
message = "search for plugin in sys.modules"
try:
if module_path in sys.modules:
message = "reload"
importlib.reload(sys.modules[module_path])
else:
message = "import"
importlib.import_module(module_path)
return True
except: # capture all Exceptions # pylint: disable=bare-except
logger.exception("load_module %s: %s", module_path, message)
return False
def reload_plugin(bot, module_path):
"""reload a plugin and keep track of multiple reloads
Note: the plugin may reset the sentinal on a successfull internal load
Args:
module_path: string, plugin path on disk relative to the main script
Returns:
boolean, False if the plugin may not be reloaded again, otherwise True
"""
repeat = SENTINALS.setdefault(module_path, 0)
if repeat >= 3:
logger.critical('too many reloads of %s, enter failstate', module_path)
return False
SENTINALS[module_path] += 1
await unload(bot, module_path)
await load(bot, module_path)
return True
def reload(ctx, *modules):
for mod in modules:
if mod == 'config':
with open('config.json', encoding='utf-8') as f:
ctx.bot.config = json.load(f)
else:
module_name = 'endrebot0.commands.{}'.format(mod)
module = sys.modules.get(module_name)
if module is not None:
if hasattr(module, 'listeners'):
del module.listeners
if hasattr(module, 'commands'):
del module.commands
importlib.reload(module)
ctx.bot.add_module(module)
if hasattr(module, 'listeners'):
for listener in module.listeners.get('on_ready', ()):
await listener(ctx.bot)
return 'Reloaded %d module(s)' % len(modules)
def load_configure_json(self, configure_file_path: str) -> None:
"""method for reading and applying json configuration.
:param configure_file_path: json configure file path
:return: None
"""
print(f"try load configure from json file ({configure_file_path})")
try:
with open(f"{configure_file_path}") as json_file:
json_data = json.load(json_file)
for configure_key, configure_value in json_data.items():
try:
configure_type, configure_value = self.__check_value_type(configure_key, configure_value)
self.__set_configure(configure_key, configure_type, configure_value)
except Exception as e:
# no configure value
print(f"this is not configure key({configure_key}): {e}")
except Exception as e:
exit(f"cannot open json file in ({configure_file_path}): {e}")
importlib.reload(loopchain.utils)
def reload(name: str):
""" Reload a plugin.
This must be called from an on_reload function or coroutine.
"""
if name in plugins:
# Remove all registered commands
if hasattr(plugins[name], "__commands"):
delattr(plugins[name], "__commands")
# Remove all registered events from the given plugin
for event_name, funcs in events.items():
for func in funcs:
if func.__module__.endswith(name):
events[event_name].remove(func)
plugins[name] = importlib.reload(plugins[name])
logging.debug("Reloaded plugin {}".format(name))
def page_payme(self):
global sitelocker_ver
global riddleme_mode
global hackable
hackable = False
sitelocker_ver = False
riddleme_mode = False
tkinter_ui.set_url('https://www.payme.net')
import data.sites.payme.payme_site as payme_site
importlib.reload(payme_site)
payme_site.web_site(tkinter_ui.web_page, tkinter_ui.go_from)
return
'''
hackable = [email_site.email_user, email_site.hack_in]
if not email_site.sitelocker == None:
sitelocker_ver = email_site.sitelocker
if not email_site.riddleme == None:
if not email_site.riddleme.lower() == 'false':
riddleme_mode = True'''
def page_easybits(self):
global sitelocker_ver
global riddleme_mode
global hackable
hackable = False
sitelocker_ver = False
riddleme_mode = False
tkinter_ui.set_url('https://www.easybits.com')
import data.sites.easybits.easybits_site as easybits_site
importlib.reload(easybits_site)
easybits_site.web_site(tkinter_ui.web_page, tkinter_ui.go_from)
return
'''
hackable = [email_site.email_user, email_site.hack_in]
if not email_site.sitelocker == None:
sitelocker_ver = email_site.sitelocker
if not email_site.riddleme == None:
if not email_site.riddleme.lower() == 'false':
riddleme_mode = True'''
def page_shmail(self):
global sitelocker_ver
global riddleme_mode
global hackable
hackable = False
sitelocker_ver = False
riddleme_mode = False
tkinter_ui.set_url('http://www.shhmail.net')
import data.sites.email.email_site as email_site
importlib.reload(email_site)
email_site.web_site(tkinter_ui.web_page, tkinter_ui.go_from)
hackable = [email_site.email_user, email_site.hack_in]
if email_site.sitelocker is not None:
sitelocker_ver = email_site.sitelocker
if email_site.riddleme is not None:
if not email_site.riddleme.lower() == 'false':
riddleme_mode = True
def _unload(self, modname, force = False):
bot = self._get(modname)
if not bot:
logger.warn('"%s" is not loaded', modname)
return False
if not bot.reload and not force:
logger.warn('"%s" cen not be reloaded', modname)
return False
try:
bot.finalize()
except Exception as err:
logger.error('Bot "%s" failed to finalize: %s', bot._name, err)
self.bots.remove(bot)
for t in bot.targets:
self.part_handler(t)
logger.info('Bot "%s" is unloaded', bot._name)
return True
def load_nodes_from(abs_dir):
print("loading project nodes and cells from {}".format(abs_dir))
dir_file_names = os.listdir(abs_dir)
py_file_names = [x for x in dir_file_names if x.endswith(".py")]
for fname in py_file_names:
mod_name = fname[:-3]
full_path = os.path.join(abs_dir, fname)
source = None
with open(full_path, "r") as f:
source = f.read()
if source:
bge_netlogic = _get_this_module()
locals = {
"bge_netlogic": _get_this_module(),
"__name__": mod_name,
"bpy": bpy}
globals = locals
print("loading... {}".format(mod_name))
exec(source, locals, globals)
#TODO: reload source to refresh intermediate compilation?
pass
def test_11_run_migration_bad_command():
"""Verify that run_migration detects a bad command"""
try:
os.unlink(TEST_DB_FILE)
except:
pass
rc = pydbvolve.run_migration(TEST_CONFIG_FILE, 'BAD_COMMAND_STRING', 'r1.0.0', True, False)
assert(rc != 0)
# test second path (which should not be hit, but you never know...)
pydbvolve.VALID_ACTIONS.add('BAD_COMMAND_STRING')
rc = pydbvolve.run_migration(TEST_CONFIG_FILE, 'BAD_COMMAND_STRING', 'r1.0.0', True, False)
assert(rc != 0)
importlib.reload(pydbvolve)
os.unlink(TEST_DB_FILE)
# End test_11_run_migration_bad_command
def test_12_run_migration_bad_config():
"""Verify that run_migration detects a bad config"""
def bad_init_1(*args, **kwargs):
return None
def bad_init_2(*args, **kwargs):
return {'a': 1}
rc = pydbvolve.run_migration(TEST_CONFIG_FILE + 'bad', 'BAD_COMMAND_STRING', 'r1.0.0', True, False)
assert(rc != 0)
pydbvolve.initialize = bad_init_1
rc = pydbvolve.run_migration(TEST_CONFIG_FILE, 'BAD_COMMAND_STRING', 'r1.0.0', True, False)
assert(rc != 0)
pydbvolve.initialize = bad_init_2
rc = pydbvolve.run_migration(TEST_CONFIG_FILE, 'BAD_COMMAND_STRING', 'r1.0.0', True, False)
assert(rc != 0)
importlib.reload(pydbvolve)
# End test_12_run_migration_bad_config
def test_15_pre_exec_trap_exception():
def raise_it(*args, **kwargs):
raise Exception("Something")
try:
os.unlink(TEST_DB_FILE)
except:
pass
pydbvolve.pre_execution = raise_it
rc = pydbvolve.run_migration(TEST_CONFIG_FILE, 'baseline', 'r1.0.0', True, False)
assert(rc != 0)
rc = pydbvolve.run_migration(TEST_CONFIG_FILE, 'baseline', 'r1.0.0', True, True)
assert(rc != 0)
importlib.reload(pydbvolve)
os.unlink(TEST_DB_FILE)
# End test_15_pre_exec_trap_exception
def patch_reload():
try:
import __builtin__ as builtins
except ImportError:
import builtins
if hasattr(builtins, "reload"):
sys.builtin_orig_reload = builtins.reload
builtins.reload = patched_reload(sys.builtin_orig_reload) # @UndefinedVariable
try:
import imp
sys.imp_orig_reload = imp.reload
imp.reload = patched_reload(sys.imp_orig_reload) # @UndefinedVariable
except:
pass
else:
try:
import importlib
sys.importlib_orig_reload = importlib.reload # @UndefinedVariable
importlib.reload = patched_reload(sys.importlib_orig_reload) # @UndefinedVariable
except:
pass
del builtins
def cancel_patches_in_sys_module():
sys.exc_info = sys.system_exc_info # @UndefinedVariable
try:
import __builtin__ as builtins
except ImportError:
import builtins
if hasattr(sys, "builtin_orig_reload"):
builtins.reload = sys.builtin_orig_reload
if hasattr(sys, "imp_orig_reload"):
import imp
imp.reload = sys.imp_orig_reload
if hasattr(sys, "importlib_orig_reload"):
import importlib
importlib.reload = sys.importlib_orig_reload
del builtins
def load_work_map(year):
"""Load all work from a given year file
It generates tuples with variable name and Work object
Doctest:
.. doctest::
>>> reload()
>>> sorted([(work.year, key) for key, work in load_work_map(2015)])
[(2014, 'murta2014a'), (2015, 'pimentel2015a')]
(2014, 'murta2014a') appears because it has an alias in 2015
"""
module = "y{}.py".format(year) if isinstance(year, int) else year
if module not in WORK_CACHE:
module = "y9999.py"
worklist = WORK_CACHE[module]
for key, work in worklist.__dict__.items():
if isinstance(work, worklist.Work):
yield key, work
def work_by_varname(varname, year=None):
"""Load work by varname
Doctest:
.. doctest::
>>> reload()
>>> work = work_by_varname('murta2014a')
>>> work.year
2014
"""
if year is None:
year = int(parse_varname(varname, 2) or -1)
module = "y{}.py".format(year) if isinstance(year, int) else year
if module not in WORK_CACHE:
return
worklist = WORK_CACHE[module]
return getattr(worklist, varname, None)
def load_work_map_all_years():
"""Load all work from all years
Doctest:
.. doctest::
>>> reload()
>>> sorted([(work.year, key) for key, work in load_work_map_all_years()])
[(2008, 'freire2008a'), (2014, 'murta2014a'), (2014, 'murta2014a'), (2015, 'pimentel2015a')]
(2014, 'murta2014a') appears twice because it has an alias in 2015
"""
years = reversed(sorted(WORK_CACHE.keys()))
for year in years:
yield from load_work_map(year)
def import_submodules(package, recursive=True):
"""Import all submodules of a module, recursively, including subpackages
Arguments:
* `package` -- package (name or actual module)
Keyword arguments:
* `recursive` -- import modules recursively
"""
if package is None:
return {}
if isinstance(package, str):
package = importlib.import_module(package)
importlib.reload(package)
results = {}
for loader, name, is_pkg in pkgutil.walk_packages(package.__path__):
full_name = package.__name__ + '.' + name
results[full_name] = importlib.import_module(full_name)
importlib.reload(results[full_name])
if recursive and is_pkg:
results.update(import_submodules(full_name))
return results
def process_unimported(self):
msg = 'Brak modu?u pip!\nNie mo?na zainstalowa?:\n%s'
msg_dec = 'Wymagane modu?y:\n{}\nChcesz zainstalowa? (T/n)? '
self.to_import = list(self.unimported)
modules_text = '\n'.join([' {} - {}'.format(ins, ver) for _, ins, ver in self.unimported])
if self.try_import('pip', add=False):
msg_dec = msg_dec.format(modules_text)
decision = Decision(msg_dec, {'T': {self.install_unimported: (self.unimported,)},
'n': sys.exit})
decision.run()
if all(self.import_all(reload=True)):
logging.info('...ponowne uruchamianie')
os.execv(sys.executable, ['python'] + sys.argv)
else:
logging.critical('Nie uda?o si? zainstalowa? modu?ów.')
raise SystemExit
else:
logging.critical(msg, modules_text)
raise SystemExit
def loadConfig(self, filename, restart=False):
""" Load configuration from file.
@param str filename: path of file to be loaded
"""
maindir = self.getMainDir()
configdir = os.path.join(maindir, 'config')
loadFile = os.path.join(configdir, 'load.cfg')
if filename.startswith(configdir):
filename = re.sub(
'^' + re.escape('/'),
'',
re.sub(
'^' + re.escape(configdir),
'',
filename)
)
loadData = {'configfile': filename}
config.save(loadFile, loadData)
logger.info('Set loaded configuration to {0}'.format(filename))
if restart:
logger.info('Restarting Qudi after configuration reload.')
self.restart()
def load_entry_points():
importlib.reload(pkg_resources)
for ep in pkg_resources.iter_entry_points(group=defaults.PACKAGES_ENTRY_POINT):
logging.logger.info('loading entry_point: '+str(ep))
try:
f = ep.load()
if asyncio.iscoroutinefunction(f):
await f()
elif isinstance(f, types.FunctionType):
f()
except (ModuleNotFoundError,SyntaxError):
logging.logger.error('Error loading package entry point.')
ex_info=traceback.format_exc().splitlines()
for line in ex_info:
logging.logger.error(line)
return False
else:
logging.logger.info('entry_point loaded successfully: '+str(ep))
return True
def load_attribute(attribute):
''' Loads or reloads the given attribute. For example:
1) load_attribute('module.submodule.class_name')
will return the type class_name (but not instantiate it)
from the module 'module.submodule'
2) load_attribute('module.submodule.func_name')
will return the function from that same module. '''
attribute_name = attribute.split('.')[-1]
pymodule = '.'.join(attribute.split('.')[:-1])
if pymodule in sys.modules:
pymodule_instance = importlib.reload(sys.modules[pymodule])
else:
pymodule_instance = importlib.import_module(pymodule)
# Get the attribute and return it
return getattr(pymodule_instance, attribute_name)
def set_backend(backend_name):
"""Sets the backend for TensorLy
The backend will be set as specified and operations will used that backend
Parameters
----------
backend_name : {'mxnet', 'numpy', 'pytorch'}, default is 'numpy'
"""
global _BACKEND
_BACKEND = backend_name
# reloads tensorly.backend
importlib.reload(backend)
# reload from .backend import * (e.g. tensorly.tensor)
globals().update(
{fun: getattr(backend, fun) for n in backend.__all__} if hasattr(backend, '__all__')
else
{k: v for (k, v) in backend.__dict__.items() if not k.startswith('_')
})
def test_python_importlib_reload():
"""Test the python module reload function.
"""
import statestream.visualization.visualization
reloaded = False
try:
importlib.reload(statestream.visualization.visualization)
reloaded = True
except:
try:
reload(statestream.visualization.visualization)
reloaded = True
except:
pass
assert(reloaded), "Error: Unable to reload modules."