def run_tests_if_main(show_coverage=False):
""" Run tests in a given file if it is run as a script
Coverage is reported for running this single test. Set show_coverage to
launch the report in the web browser.
"""
local_vars = inspect.currentframe().f_back.f_locals
if not local_vars.get('__name__', '') == '__main__':
return
# we are in a "__main__"
os.chdir(ROOT_DIR)
fname = str(local_vars['__file__'])
_clear_imageio()
_enable_faulthandler()
pytest.main('-v -x --color=yes --cov imageio '
'--cov-config .coveragerc --cov-report html %s' % repr(fname))
if show_coverage:
import webbrowser
fname = os.path.join(ROOT_DIR, 'htmlcov', 'index.html')
webbrowser.open_new_tab(fname)
python类currentframe()的实例源码
def __ImportFrom(module_xname, xname):
"""
Import a specified name from a python module into the global namespace. This can be accomplished
programatically and inside a method or class.
@param module_xname : name of the module to load x.y.z
@param xname : symbol to import from loaded module
@return method/class : imported item from loaded module
"""
try:
module = __import__(module_xname,globals(),locals(), [xname])
except ImportError:
return None
try:
module = getattr(module,xname)
if xname:
for x in range( len(inspect.stack()) ):
inspect.currentframe(x).f_globals[xname]=module
except AttributeError as e:
module=None
return module
def produce(self, topic, doc, payload):
"""
Produce a new event.
:param topic: The topic of the produced event.
:param doc: The document to which the event belongs.
:param payload: The file pointer beloning to the document.
:type topic: ``str``
:type doc: ``gransk.core.Document``
:type payload: ``file``
"""
caller = inspect.currentframe().f_back.f_locals['self'].__module__
listeners = self.listeners.get(topic, [])
filename = os.path.basename(doc.path)
for listener, callback in listeners:
LOGGER.debug('[%s] %s -> %s (%s)', topic, caller, listener, filename)
callback(doc, payload)
if len(listeners) == 0:
LOGGER.debug('[%s] %s -> no listeners (%s)', topic, caller, filename)
def _find_frame_by_self(clz):
"""
Find the first frame on stack in which there is local variable 'self' of
type clz.
"""
frame = inspect.currentframe()
while frame:
self = frame.f_locals.get('self')
if isinstance(self, clz):
return frame
frame = frame.f_back
return None
def step_listener(self, connection, event, fd):
#print('%s(%s, %s)' %
# (inspect.currentframe().f_code.co_name,connection,event_str(event)))
if event & ERRMASK:
raise Exception('INTERNAL ERROR. LISTENER NEVER DIES')
elif event & INMASK:
new = self.accept(connection)
if not new:
# happens if peer hangs up during accept or the control is
# rejecting new connections
return
self.pollable(new.fileno(), new, OUTMASK)
self.accepting.append(new)
# ignore all events for the same file descriptor in the current step
# of the main loop. the OS may reuse descriptors aggressively and so
# the events list may include POLLNVAL for the same descriptor. we
# don't need to handle such a POLLNVAL event because that connection
# is replaced (and GCed) by the call to self.pollable() above.
self.unpend.append(new.fileno())
def step_accepting(self, connection, event, fd):
#print('%s(%s, %s)' %
# (inspect.currentframe().f_code.co_name,connection,event_str(event)))
if event & ERRMASK:
#print('%s %d close accepting %d %d %s' % (
# self.proc_name,os.getpid(),fd,connection.port,event_str(event)))
self.accepting.remove(connection)
connection.close()
self.unpollable(fd)
elif event & OUTMASK:
self.accepting.remove(connection)
salt = make_salt()
try:
connection.put(CHALLENGE + salt)
self.authenticating[connection] = salt
self.pollable(fd, connection, INMASK)
except ConnectionClosed:
#print('%s %d peer closed accepting %d %d OUT' % (
# self.proc_name, os.getpid(), fd, connection.port))
connection.close()
self.unpollable(fd)
def step_listener(self, connection, event, fd):
#print('%s(%s, %s)' %
# (inspect.currentframe().f_code.co_name,connection,event_str(event)))
if event & ERRMASK:
raise Exception('INTERNAL ERROR. LISTENER NEVER DIES')
elif event & INMASK:
new = self.accept(connection)
if not new:
# happens if peer hangs up during accept or the control is
# rejecting new connections
return
self.pollable(new.fileno(), new, OUTMASK)
self.accepting.append(new)
# ignore all events for the same file descriptor in the current step
# of the main loop. the OS may reuse descriptors aggressively and so
# the events list may include POLLNVAL for the same descriptor. we
# don't need to handle such a POLLNVAL event because that connection
# is replaced (and GCed) by the call to self.pollable() above.
self.unpend.append(new.fileno())
def step_accepting(self, connection, event, fd):
#print('%s(%s, %s)' %
# (inspect.currentframe().f_code.co_name,connection,event_str(event)))
if event & ERRMASK:
#print('%s %d close accepting %d %d %s' % (
# self.proc_name,os.getpid(),fd,connection.port,event_str(event)))
self.accepting.remove(connection)
connection.close()
self.unpollable(fd)
elif event & OUTMASK:
self.accepting.remove(connection)
salt = make_salt()
try:
connection.put(CHALLENGE + salt)
self.authenticating[connection] = salt
self.pollable(fd, connection, INMASK)
except ConnectionClosed:
#print('%s %d peer closed accepting %d %d OUT' % (
# self.proc_name, os.getpid(), fd, connection.port))
connection.close()
self.unpollable(fd)
def __init__(self, *msg):
message = ""
for arg in msg:
message += str(arg)
try:
ln = sys.exc_info()[-1].tb_lineno
file = traceback.extract_tb(sys.exc_info()[-1])
if isinstance(file, list):
file = file[0].filename
except AttributeError:
ln = inspect.currentframe().f_back.f_lineno
file = inspect.getframeinfo(inspect.currentframe().f_back).filename
if file is not None:
file = re.compile("[\\\/]+").split(file)
file = file[-1]
self.args = "Error ({3}:{1}): {2}".format(type(self), ln, message, file),
sys.exit(self)
# def __init__(self, *args, **kwargs):
# RuntimeError.__init__(self, args, kwargs)
def guess_app_stacklevel(start=1):
"""
try to guess stacklevel for application warning.
looks for first frame not part of passlib.
"""
frame = inspect.currentframe()
count = -start
try:
while frame:
name = frame.f_globals.get('__name__', "")
if name.startswith("passlib.tests.") or not name.startswith("passlib."):
return max(1, count)
count += 1
frame = frame.f_back
return start
finally:
del frame
def configure_parser(parser):
parser.description = "Run a migration script. These can only be run forward, not in reverse."
# Inspect the local directory for which migration modules the user can run.
local_files = os.listdir(
os.path.dirname(
inspect.getfile(
inspect.currentframe()
)))
migration_files = [f for f in local_files if re.match('^\d{4}.*\.py$', f)]
migration_names = [m.rstrip('.py') for m in migration_files]
parser.add_argument(
'migration_name',
choices=migration_names,
help="The name of the migration script you want to run."
)
def new_sslwrap(
sock, server_side=False, keyfile=None, certfile=None,
cert_reqs=__ssl__.CERT_NONE, ssl_version=__ssl__.PROTOCOL_SSLv23,
ca_certs=None, ciphers=None
):
context = __ssl__.SSLContext(ssl_version)
context.verify_mode = cert_reqs or __ssl__.CERT_NONE
if ca_certs:
context.load_verify_locations(ca_certs)
if certfile:
context.load_cert_chain(certfile, keyfile)
if ciphers:
context.set_ciphers(ciphers)
caller_self = inspect.currentframe().f_back.f_locals['self']
return context._wrap_socket(sock, server_side=server_side, ssl_sock=caller_self)
# Re-add sslwrap to Python 2.7.9+
def get_beard_config(config_file="../../config.yml"):
"""Attempts to load a yaml file in the beard directory.
NOTE: The file location should be relative from where this function is
called.
"""
# Sometimes external libraries change the logging level. This is not
# acceptable, so we assert after importing a beard that it has not changed.
logger_level_before = logger.getEffectiveLevel()
logging.debug("logging level: {}".format(logger.getEffectiveLevel()))
callers_frame = inspect.currentframe().f_back
logger.debug("This function was called from the file: " +
callers_frame.f_code.co_filename)
base_path = os.path.dirname(callers_frame.f_code.co_filename)
config = yaml.safe_load(open(os.path.join(base_path, config_file)))
logging.debug("logging level: {}".format(logger.getEffectiveLevel()))
logger_level_after = logger.getEffectiveLevel()
assert logger_level_before == logger_level_after, \
"Something has changed the logger level!"
return config
def get_current_request():
"""Return the current request by heuristically looking it up from stack
"""
frame = inspect.currentframe()
while frame is not None:
request = getattr(frame.f_locals.get('self'), 'request', None)
if request is not None:
return request
elif isinstance(frame.f_locals.get('self'), RequestHandler):
return frame.f_locals['request']
# if isinstance(frame.f_locals.get('self'), RequestHandler):
# return frame.f_locals.get('self').request
# elif IView.providedBy(frame.f_locals.get('self')):
# return frame.f_locals['request']
frame = frame.f_back
raise RequestNotFound(RequestNotFound.__doc__)
def __init__(self, **kwargs):
# TODO This is a good first cut for debugging info, but it would be nice to
# TODO be able to reliably walk the stack back to user code rather than just
# TODO back past this constructor
super(DebugInfo, self).__init__(**kwargs)
frame = None
try:
frame = inspect.currentframe()
while frame.f_locals.get('self', None) is self:
frame = frame.f_back
while frame:
filename, lineno, function, code_context, index = inspect.getframeinfo(
frame)
if -1 == filename.find('ngraph/op_graph'):
break
frame = frame.f_back
self.filename = filename
self.lineno = lineno
self.code_context = code_context
finally:
del frame
def loadatlas(r=5):
"""Load the atlas from the brainpipe module
"""
B3Dpath = dirname(
abspath(join(getfile(currentframe()), '..', '..', '..', 'atlas')))
# Load talairach atlas :
with open(B3Dpath + '/atlas/labels/talairach_atlas.pickle', "rb") as f:
TAL = pickle.load(f)
label = TAL['label']
strGM = ['No Gray Matter found within +/-'+str(r)+'mm']
label = concat([label, DataFrame({'hemisphere': [strGM], 'lobe':[
strGM], 'gyrus':[strGM], 'matter':[strGM], 'brodmann':[
0]})])
label = label.set_index([list(n.arange(label.shape[0]))])
return TAL['hdr'], TAL['mask'], TAL['gray'], label
def async_calling_function(callFrame):
def calling_functionn(__function):
"""
View Calling Function to debug
"""
def wrapper(*args, **kwargs):
cur_frame = inspect.currentframe()
cal_frame = inspect.getouterframes(cur_frame, callFrame)
stack = []
for t in cal_frame:
if 'Yu-gi' in t[1]:
stack.append(t[3])
logger.debug("Called by: " + str(stack))
return __function(*args, **kwargs)
return wrapper
return calling_functionn
def __advice_stack_frame_protection(self, frame):
"""
Overriding of this is only permitted if and only if your name is
Megumin and you have a pet/familiar named Chomusuke.
"""
if frame is None:
logger.debug(
'currentframe() returned None; frame protection disabled')
return
f_back = frame.f_back
while f_back:
if f_back.f_code is self.handle.__code__:
raise RuntimeError(
"indirect invocation of '%s' by 'handle' is forbidden" %
frame.f_code.co_name,
)
f_back = f_back.f_back
def getFunctionCall(idx=1):
"""Returns the name of function or method that was called idx frames outwards.
.. note:: ``idx=0`` will of course return ``getFunctionCall``.
Args:
idx (int): Steps outwards.
Returns:
str: Name of function or method.
"""
frame = inspect.currentframe()
try:
return inspect.getouterframes(frame)[idx][3]
except IndexError:
return ""
def shader_string(body, glsl_version='450 core'):
"""
Call this method from a function that defines a literal shader string as the "body" argument.
Dresses up a shader string in three ways:
1) Insert #version at the top
2) Insert #line number declaration
3) un-indents
The line number information can help debug glsl compile errors.
The version string needs to be the very first characters in the shader,
which can be distracting, requiring backslashes or other tricks.
The unindenting allows you to type the shader code at a pleasing indent level
in your python method, while still creating an unindented GLSL string at the end.
"""
line_count = len(body.split('\n'))
line_number = inspect.currentframe().f_back.f_lineno + 1 - line_count
return """\
#version %s
%s
""" % (glsl_version, shader_substring(body, stack_frame=2))
def get_current_request() -> IRequest:
"""
Return the current request by heuristically looking it up from stack
"""
try:
task_context = aiotask_context.get('request')
if task_context is not None:
return task_context
except (ValueError, AttributeError, RuntimeError):
pass
# fallback
frame = inspect.currentframe()
while frame is not None:
request = getattr(frame.f_locals.get('self'), 'request', None)
if request is not None:
return request
elif isinstance(frame.f_locals.get('request'), Request):
return frame.f_locals['request']
frame = frame.f_back
raise RequestNotFound(RequestNotFound.__doc__)
def command(fn, name=None):
"""Decorator for functions that should be exposed as commands."""
module = sys.modules[fn.__module__]
if name is None:
name = fn.__name__
if asyncio.iscoroutinefunction(fn if inspect.isfunction(fn) else fn.__func__ if inspect.ismethod(fn) else fn.__call__): # Get the actual function for coroutine check
@functools.wraps(fn)
async def wrapper(*args, **kwargs):
try:
frame = inspect.currentframe()
ctx = frame.f_back.f_locals['ctx']
return await fn(ctx, *args, **kwargs)
finally:
del frame
else:
@functools.wraps(fn)
def wrapper(*args, **kwargs):
try:
frame = inspect.currentframe()
ctx = frame.f_back.f_locals['ctx']
return fn(ctx, *args, **kwargs)
finally:
del frame
vars(module).setdefault('commands', {})[fn.__name__] = wrapper
return wrapper
def is_inside_recursive_test_call():
"""Test if a function is running from a call to du.test()."""
frame = inspect.currentframe()
count_test = 0
while frame:
# Test breaking condition.
if count_test >= 1:
return True
# Find if there is a breaking condition and update the value.
test_function = frame.f_locals.get('test')
if (hasattr(test_function, '_testMethodName')) and (
test_function._testMethodName ==
'test_module_level_test_calls'):
count_test += 1
# Go to next frame.
frame = frame.f_back
return False
def is_inside_unittest():
"""Test if a function is running from unittest.
This function will help freezing the __setattr__ freezing when running
inside a unittest environment.
"""
frame = inspect.currentframe()
# Calls from unittest discover have the following properties:
# 1) Its stack is longer
# 2) It contains arguments ('argv', 'pkg_name', 'unittest', etc) for
# instance: a key value pair in the format:
# ('argv', ['python3 -m unittest', 'discover', '-vvv', '.'])
key = 'argv' # this may be error prone...
value = 'unittest'
while frame:
frame_argv = frame.f_locals.get(key)
if frame_argv and value in ''.join(frame_argv):
return True
frame = frame.f_back
return False
def REPIC(obToPrint):
'''REPIC stands for Read, Evaluate, Print In Comment. Call this function with an object obToPrint and it will rewrite the current file with the output in the comment in a line after this was called.'''
cf = inspect.currentframe()
callingFile = inspect.getfile(cf.f_back)
callingLine = cf.f_back.f_lineno
# print 'Line I am calling REPIC from:', callingLine
for line in fileinput.input(callingFile, inplace=1):
if callingLine == fileinput.filelineno():
# Make results, but get rid of newlines in output since that will break the comment:
resultString = '#OUTPUT: ' + str(obToPrint).replace('\n','\\n') +'\n'
writeIndex = line.rfind('\n')
# Watch out for last line without newlines, there the end is just the line length.
if '\n' not in line:
writeIndex = len(line)
# Replace old output and/or any comments:
if '#' in line:
writeIndex = line.rfind('#')
output = line[0:writeIndex] + resultString
else:
output = line # If no REPIC, then don't change the line.
sys.stdout.write(output)
def execute_only_once():
"""
Each called in the code to this function is guranteed to return True the
first time and False afterwards.
Returns:
bool: whether this is the first time this function gets called from
this line of code.
Example:
.. code-block:: python
if execute_only_once():
# do something only once
"""
f = inspect.currentframe().f_back
ident = (f.f_code.co_filename, f.f_lineno)
if ident in _EXECUTE_HISTORY:
return False
_EXECUTE_HISTORY.add(ident)
return True
def get_linenumber(self):
'''?????????'''
cf = currentframe()
return "run_line: file %s line %s " % (self.file_path, cf.f_back.f_back.f_lineno)
def get_linenumber(self):
'''?????????'''
cf = currentframe()
return "run_line: file %s line %s " % (self.file_path, cf.f_back.f_back.f_lineno)
def run(self):
# Explicit request for old-style install? Just do it
if self.old_and_unmanageable or self.single_version_externally_managed:
return orig.install.run(self)
if not self._called_from_setup(inspect.currentframe()):
# Run in backward-compatibility mode to support bdist_* commands.
orig.install.run(self)
else:
self.do_egg_install()
def run(self):
# Explicit request for old-style install? Just do it
if self.old_and_unmanageable or self.single_version_externally_managed:
return orig.install.run(self)
if not self._called_from_setup(inspect.currentframe()):
# Run in backward-compatibility mode to support bdist_* commands.
orig.install.run(self)
else:
self.do_egg_install()