python类currentframe()的实例源码

testing.py 文件源码 项目:radar 作者: amoose136 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def run_tests_if_main(show_coverage=False):
    """ Run tests in the calling file if it is being run as a script

    The caller's namespace is looked up through the call stack. When its
    ``__name__`` is not ``'__main__'`` nothing happens. Otherwise pytest is
    run on the caller's ``__file__`` from ``ROOT_DIR``, with coverage
    collected for ``imageio`` and written as an HTML report.

    Parameters
    ----------
    show_coverage : bool
        When true, open ``htmlcov/index.html`` (below ``ROOT_DIR``) in a new
        browser tab once the run has finished.
    """
    local_vars = inspect.currentframe().f_back.f_locals
    if local_vars.get('__name__', '') != '__main__':
        return
    # we are in a "__main__"
    # Resolve the test file *before* changing directory, so that a relative
    # __file__ still points at the right file afterwards.
    fname = os.path.abspath(str(local_vars['__file__']))
    os.chdir(ROOT_DIR)
    _clear_imageio()
    _enable_faulthandler()
    # pytest.main() takes a list of arguments; a single command-line string
    # is not accepted by current pytest releases. With a list the file name
    # needs no quoting either.
    pytest.main(['-v', '-x', '--color=yes', '--cov', 'imageio',
                 '--cov-config', '.coveragerc', '--cov-report', 'html',
                 fname])
    if show_coverage:
        import webbrowser
        report = os.path.join(ROOT_DIR, 'htmlcov', 'index.html')
        webbrowser.open_new_tab(report)
config.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __ImportFrom(module_xname, xname):
    """
    Import a specified name from a python module into the global namespace. This can be accomplished
    programmatically and inside a method or class.

    The imported item is stored under ``xname`` in the globals of every frame
    currently on the call stack, starting with this function's own frame.

    @param module_xname : name of the module to load x.y.z
    @param xname :  symbol to import from loaded module
    @return method/class : imported item from loaded module, or None when the
                           module cannot be imported or does not define xname
    """
    try:
        module = __import__(module_xname, globals(), locals(), [xname])
    except ImportError:
        return None

    try:
        item = getattr(module, xname)
    except AttributeError:
        return None

    if xname:
        # Walk the stack through f_back: inspect.currentframe() accepts no
        # depth argument on Python 3, and this avoids building the costly
        # inspect.stack() list just to count frames.
        frame = inspect.currentframe()
        try:
            while frame is not None:
                frame.f_globals[xname] = item
                frame = frame.f_back
        finally:
            # Do not keep a reference to a frame alive longer than needed.
            del frame

    return item
pipeline.py 文件源码 项目:gransk 作者: pcbje 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def produce(self, topic, doc, payload):
    """
    Produce a new event.

    Every callback registered in ``self.listeners`` for ``topic`` is invoked
    with ``doc`` and ``payload``. Each dispatch (or the absence of listeners)
    is logged at debug level together with the caller's module name.

    :param topic: The topic of the produced event.
    :param doc: The document to which the event belongs.
    :param payload: The file pointer belonging to the document.
    :type topic: ``str``
    :type doc: ``gransk.core.Document``
    :type payload: ``file``
    """
    caller_frame = inspect.currentframe().f_back
    caller_self = caller_frame.f_locals.get('self')
    if caller_self is not None:
      caller = caller_self.__module__
    else:
      # The caller is not a method (no local 'self'). The name is only used
      # for debug logging, so fall back to the caller's module name instead
      # of failing with a KeyError.
      caller = caller_frame.f_globals.get('__name__', '<unknown>')
    listeners = self.listeners.get(topic, [])
    filename = os.path.basename(doc.path)

    for listener, callback in listeners:
      LOGGER.debug('[%s] %s -> %s (%s)', topic, caller, listener, filename)
      callback(doc, payload)

    if not listeners:
      LOGGER.debug('[%s] %s -> no listeners (%s)', topic, caller, filename)
ututil.py 文件源码 项目:pykit 作者: baishancloud 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _find_frame_by_self(clz):
    """
    Walk the call stack outwards, starting at this function's own frame, and
    return the first frame whose local variable 'self' is an instance of clz.

    Returns None when no frame on the stack qualifies.
    """

    current = inspect.currentframe()

    while current is not None:
        if isinstance(current.f_locals.get('self'), clz):
            return current
        current = current.f_back

    return None
control.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def step_listener(self, connection, event, fd):
        """Handle one poll event reported for the listening connection.

        An error event raises; an input event accepts a new connection,
        registers it for OUTMASK events and queues it in ``self.accepting``.
        Any other event is ignored. ``fd`` is not used here.
        """
        # debugging aid:
        #print('%s(%s, %s)' %
        #   (inspect.currentframe().f_code.co_name,connection,event_str(event)))

        if event & ERRMASK:
            raise Exception('INTERNAL ERROR. LISTENER NEVER DIES')

        if not (event & INMASK):
            return

        accepted = self.accept(connection)
        if not accepted:
            # the peer hung up during accept, or the control is rejecting
            # new connections
            return

        self.pollable(accepted.fileno(), accepted, OUTMASK)
        self.accepting.append(accepted)
        # Skip every remaining event for this file descriptor in the current
        # step of the main loop. The OS may reuse descriptors aggressively,
        # so the events list can contain a POLLNVAL for the same descriptor.
        # That event needs no handling: the old connection was replaced (and
        # GCed) by the self.pollable() call above.
        self.unpend.append(accepted.fileno())
control.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def step_accepting(self, connection, event, fd):
        """Handle one poll event for a connection that is being accepted.

        On an error event the connection is dropped from ``self.accepting``,
        closed and unregistered. On an output event a freshly salted
        challenge is sent and the connection moves to
        ``self.authenticating`` while waiting for INMASK events; if the peer
        has already closed, the connection is closed and unregistered.
        """
        # debugging aid:
        #print('%s(%s, %s)' %
        #   (inspect.currentframe().f_code.co_name,connection,event_str(event)))

        if event & ERRMASK:
            self.accepting.remove(connection)
            connection.close()
            self.unpollable(fd)
            return

        if not (event & OUTMASK):
            return

        self.accepting.remove(connection)
        challenge_salt = make_salt()
        try:
            connection.put(CHALLENGE + challenge_salt)
            self.authenticating[connection] = challenge_salt
            self.pollable(fd, connection, INMASK)
        except ConnectionClosed:
            # the peer went away before the challenge could be delivered
            connection.close()
            self.unpollable(fd)
control.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def step_listener(self, connection, event, fd):
        """Handle one poll event reported for the listening connection.

        An error event raises; an input event accepts a new connection,
        registers it for OUTMASK events and queues it in ``self.accepting``.
        Any other event is ignored. ``fd`` is not used here.
        """
        # debugging aid:
        #print('%s(%s, %s)' %
        #   (inspect.currentframe().f_code.co_name,connection,event_str(event)))

        if event & ERRMASK:
            raise Exception('INTERNAL ERROR. LISTENER NEVER DIES')

        if not (event & INMASK):
            return

        accepted = self.accept(connection)
        if not accepted:
            # the peer hung up during accept, or the control is rejecting
            # new connections
            return

        self.pollable(accepted.fileno(), accepted, OUTMASK)
        self.accepting.append(accepted)
        # Skip every remaining event for this file descriptor in the current
        # step of the main loop. The OS may reuse descriptors aggressively,
        # so the events list can contain a POLLNVAL for the same descriptor.
        # That event needs no handling: the old connection was replaced (and
        # GCed) by the self.pollable() call above.
        self.unpend.append(accepted.fileno())
control.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def step_accepting(self, connection, event, fd):
        """Handle one poll event for a connection that is being accepted.

        On an error event the connection is dropped from ``self.accepting``,
        closed and unregistered. On an output event a freshly salted
        challenge is sent and the connection moves to
        ``self.authenticating`` while waiting for INMASK events; if the peer
        has already closed, the connection is closed and unregistered.
        """
        # debugging aid:
        #print('%s(%s, %s)' %
        #   (inspect.currentframe().f_code.co_name,connection,event_str(event)))

        if event & ERRMASK:
            self.accepting.remove(connection)
            connection.close()
            self.unpollable(fd)
            return

        if not (event & OUTMASK):
            return

        self.accepting.remove(connection)
        challenge_salt = make_salt()
        try:
            connection.put(CHALLENGE + challenge_salt)
            self.authenticating[connection] = challenge_salt
            self.pollable(fd, connection, INMASK)
        except ConnectionClosed:
            # the peer went away before the challenge could be delivered
            connection.close()
            self.unpollable(fd)
DartQCException.py 文件源码 项目:dartqc 作者: esteinig 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, *msg):
        """Build the error text, store it in ``self.args`` and exit.

        All positional arguments are converted with ``str`` and concatenated.
        The reported location is taken from the exception currently being
        handled when there is one, otherwise from the frame that created
        this object. Only the last path component of the file name is kept.

        The constructor never returns normally: it ends with
        ``sys.exit(self)``, which raises ``SystemExit``.
        """
        message = "".join(str(arg) for arg in msg)

        try:
            active_tb = sys.exc_info()[-1]
            # active_tb is None when no exception is being handled; the
            # attribute access below then raises AttributeError on purpose.
            ln = active_tb.tb_lineno
            source_file = traceback.extract_tb(active_tb)
            if isinstance(source_file, list):
                source_file = source_file[0].filename
        except AttributeError:
            creator = inspect.currentframe().f_back
            ln = creator.f_lineno
            source_file = inspect.getframeinfo(creator).filename

        if source_file is not None:
            # Raw string: the former "[\\\/]+" literal relied on the invalid
            # escape sequence "\/". Splits on runs of '/' or '\'.
            source_file = re.split(r"[\\/]+", source_file)[-1]

        self.args = "Error ({3}:{1}): {2}".format(type(self), ln, message, source_file),
        sys.exit(self)

    # def __init__(self, *args, **kwargs):
    #     RuntimeError.__init__(self, args, kwargs)
handlers.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def guess_app_stacklevel(start=1):
    """
    Try to guess the stacklevel to use for an application warning.

    Walks outwards from this function's own frame and stops at the first
    frame whose module ``__name__`` is outside ``passlib.`` (modules under
    ``passlib.tests.`` count as outside). Returns the number of frames
    skipped minus ``start``, but never less than 1. When every frame belongs
    to passlib, ``start`` is returned.
    """
    depth = -start
    current = inspect.currentframe()
    try:
        while current:
            module_name = current.f_globals.get('__name__', "")
            inside_passlib = (module_name.startswith("passlib.") and
                              not module_name.startswith("passlib.tests."))
            if not inside_passlib:
                return max(1, depth)
            depth += 1
            current = current.f_back
        return start
    finally:
        # drop the frame reference so no reference cycle is left behind
        del current
run_migration.py 文件源码 项目:Citation-Fetcher 作者: andrewhead 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def configure_parser(parser):
    """Set up ``parser`` for running one migration script.

    The directory containing this file is scanned for files named with four
    leading digits and a ``.py`` extension. Their names, without the
    extension, become the allowed choices of the positional
    ``migration_name`` argument.
    """

    parser.description = "Run a migration script. These can only be run forward, not in reverse."

    # Inspect the local directory for which migration modules the user can run.
    # abspath() guards against a relative code filename, whose dirname would
    # be '' and make os.listdir() fail.
    local_dir = os.path.dirname(
        os.path.abspath(
            inspect.getfile(
                inspect.currentframe()
            )))
    # splitext() removes exactly the extension. str.rstrip('.py') strips any
    # trailing '.', 'p' or 'y' characters and mangles names such as
    # '0001_copy.py'.
    migration_names = [
        os.path.splitext(f)[0]
        for f in os.listdir(local_dir)
        if re.match(r'^\d{4}.*\.py$', f)
    ]

    parser.add_argument(
        'migration_name',
        choices=migration_names,
        help="The name of the migration script you want to run."
    )
SslPatch.py 文件源码 项目:zeronet-debian 作者: bashrc 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def new_sslwrap(
        sock, server_side=False, keyfile=None, certfile=None,
        cert_reqs=__ssl__.CERT_NONE, ssl_version=__ssl__.PROTOCOL_SSLv23,
        ca_certs=None, ciphers=None
):
    """Wrap ``sock`` using an SSLContext built from the given options.

    CA locations, the certificate chain and the cipher list are only loaded
    when the corresponding argument is truthy. The object passed as
    ``ssl_sock`` is the local variable ``self`` of the calling frame, so the
    caller must be a method; otherwise the lookup raises KeyError.
    """
    ctx = __ssl__.SSLContext(ssl_version)
    ctx.verify_mode = cert_reqs or __ssl__.CERT_NONE

    if ca_certs:
        ctx.load_verify_locations(ca_certs)
    if certfile:
        ctx.load_cert_chain(certfile, keyfile)
    if ciphers:
        ctx.set_ciphers(ciphers)

    calling_frame = inspect.currentframe().f_back
    owner = calling_frame.f_locals['self']
    return ctx._wrap_socket(sock, server_side=server_side, ssl_sock=owner)


# Re-add sslwrap to Python 2.7.9+
utils.py 文件源码 项目:skybeard-2 作者: LanceMaverick 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def get_beard_config(config_file="../../config.yml"):
    """Attempts to load a yaml file in the beard directory.

    NOTE: The file location should be relative from where this function is
    called. It is resolved against the directory of the calling file.

    Returns the object produced by ``yaml.safe_load``. Raises AssertionError
    when the effective level of ``logger`` differs before and after loading.

    """

    # Sometimes external libraries change the logging level. This is not
    # acceptable, so we assert after importing a beard that it has not changed.
    logger_level_before = logger.getEffectiveLevel()
    # NOTE(review): this line and its twin below log through the ``logging``
    # module rather than ``logger`` -- confirm that this is intended.
    logging.debug("logging level: {}".format(logger.getEffectiveLevel()))

    callers_frame = inspect.currentframe().f_back
    logger.debug("This function was called from the file: " +
                 callers_frame.f_code.co_filename)
    base_path = os.path.dirname(callers_frame.f_code.co_filename)
    # Use a context manager so the file handle is closed deterministically
    # instead of being left to the garbage collector.
    with open(os.path.join(base_path, config_file)) as config_stream:
        config = yaml.safe_load(config_stream)

    logging.debug("logging level: {}".format(logger.getEffectiveLevel()))
    logger_level_after = logger.getEffectiveLevel()
    assert logger_level_before == logger_level_after, \
        "Something has changed the logger level!"

    return config
transactions.py 文件源码 项目:plone.server 作者: plone 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_current_request():
    """Return the current request by heuristically looking it up from stack

    Frames are visited from this function outwards. For each one, the
    ``request`` attribute of its local ``self`` is returned when it is not
    None; failing that, if ``self`` is a RequestHandler, the frame's local
    ``request`` is returned. Raises RequestNotFound when the stack is
    exhausted.
    """
    current = inspect.currentframe()
    while current is not None:
        owner = current.f_locals.get('self')
        candidate = getattr(owner, 'request', None)
        if candidate is not None:
            return candidate
        if isinstance(owner, RequestHandler):
            return current.f_locals['request']
        current = current.f_back
    raise RequestNotFound(RequestNotFound.__doc__)
op_graph.py 文件源码 项目:ngraph 作者: NervanaSystems 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, **kwargs):
        """Record where this object was created.

        Stores ``filename``, ``lineno`` and ``code_context`` of the first
        frame outside this object's own methods whose file name does not
        contain 'ngraph/op_graph' (or of the outermost frame when every
        frame matches).
        """
        # TODO This is a good first cut for debugging info, but it would be nice to
        # TODO be able to reliably walk the stack back to user code rather than just
        # TODO back past this constructor
        super(DebugInfo, self).__init__(**kwargs)
        frame = None
        try:
            frame = inspect.currentframe()
            # skip the frames whose local 'self' is this very object
            while frame.f_locals.get('self', None) is self:
                frame = frame.f_back
            # then skip the frames that live inside ngraph/op_graph
            while frame:
                info = inspect.getframeinfo(frame)
                if 'ngraph/op_graph' not in info.filename:
                    break
                frame = frame.f_back

            self.filename = info.filename
            self.lineno = info.lineno
            self.code_context = info.code_context
        finally:
            # release the frame reference whatever happened above
            del frame
_physio.py 文件源码 项目:brainpipe 作者: EtienneCmb 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def loadatlas(r=5):
    """Load the Talairach atlas pickle located relative to this file.

    One extra row is appended to the label table; each of its text columns
    holds the message 'No Gray Matter found within +/-<r>mm' and its
    ``brodmann`` column holds 0. The table is then re-indexed from 0.

    Returns the tuple ``(hdr, mask, gray, label)`` taken from the pickle,
    with ``label`` being the extended table.
    """
    this_file = getfile(currentframe())
    B3Dpath = dirname(abspath(join(this_file, '..', '..', '..', 'atlas')))

    # Load talairach atlas :
    atlas_file = B3Dpath + '/atlas/labels/talairach_atlas.pickle'
    with open(atlas_file, "rb") as f:
        TAL = pickle.load(f)

    no_gm = ['No Gray Matter found within +/-'+str(r)+'mm']
    extra_row = DataFrame({
        'hemisphere': [no_gm],
        'lobe': [no_gm],
        'gyrus': [no_gm],
        'matter': [no_gm],
        'brodmann': [0],
    })
    label = concat([TAL['label'], extra_row])
    label = label.set_index([list(n.arange(label.shape[0]))])
    return TAL['hdr'], TAL['mask'], TAL['gray'], label
helpers_decorators.py 文件源码 项目:Yugioh-bot 作者: will7200 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def async_calling_function(callFrame):
    """Decorator factory that logs the calling functions on every call.

    ``callFrame`` is passed to ``inspect.getouterframes`` as its second
    argument (the number of source context lines per frame). The names of
    the outer-frame functions whose file name contains 'Yu-gi' are logged at
    debug level before the decorated function runs.
    """
    # Local import so this block does not depend on the file's import list.
    import functools

    def calling_functionn(__function):
        """
        View Calling Function to debug
        """

        # wraps() keeps the decorated function's name, docstring and
        # __wrapped__ reference on the wrapper.
        @functools.wraps(__function)
        def wrapper(*args, **kwargs):
            cur_frame = inspect.currentframe()
            cal_frame = inspect.getouterframes(cur_frame, callFrame)
            # t[1] is the frame's file name, t[3] its function name
            stack = [t[3] for t in cal_frame if 'Yu-gi' in t[1]]
            logger.debug("Called by: " + str(stack))
            return __function(*args, **kwargs)

        return wrapper
    return calling_functionn
toolchain.py 文件源码 项目:calmjs 作者: calmjs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __advice_stack_frame_protection(self, frame):
        """
        Overriding of this is only permitted if and only if your name is
        Megumin and you have a pet/familiar named Chomusuke.

        Raises RuntimeError when any caller of ``frame`` on the stack is
        executing the code of ``self.handle``. Does nothing besides logging
        when ``frame`` is None.
        """

        if frame is None:
            logger.debug(
                'currentframe() returned None; frame protection disabled')
            return

        caller = frame.f_back
        while caller:
            if caller.f_code is not self.handle.__code__:
                caller = caller.f_back
                continue
            raise RuntimeError(
                "indirect invocation of '%s' by 'handle' is forbidden" %
                frame.f_code.co_name,
            )
pyfrp_term_module.py 文件源码 项目:PyFRAP 作者: alexblaessle 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def getFunctionCall(idx=1):

    """Return the name of the function or method found ``idx`` frames outwards.

    .. note:: ``idx=0`` yields ``getFunctionCall`` itself.

    Args:
        idx (int): Number of frames to step outwards.

    Returns:
        str: Name of the function or method, or an empty string when the
        stack has no entry at ``idx``.
    """

    outer_frames = inspect.getouterframes(inspect.currentframe())
    try:
        record = outer_frames[idx]
    except IndexError:
        return ""
    # field 3 of a frame record is the function name
    return record[3]
__init__.py 文件源码 项目:pyopenvr 作者: cmbruns 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def shader_string(body, glsl_version='450 core'):
    """
    Call this method from a function that defines a literal shader string as the "body" argument.

    Returns a string made of a ``#version <glsl_version>`` line followed by
    the result of ``shader_substring(body, stack_frame=2)``.

    NOTE(review): the original description also promises a ``#line``
    declaration and un-indenting of the body; presumably both are done by
    ``shader_substring`` -- confirm against that helper.

    The version string needs to be the very first characters in the shader,
    which is why the returned template starts with an escaped newline.
    """
    # The line_count / line_number locals previously computed here were never
    # used; the stack inspection is left to shader_substring.
    return """\
#version %s
%s
""" % (glsl_version, shader_substring(body, stack_frame=2))
utils.py 文件源码 项目:guillotina 作者: plone 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_current_request() -> IRequest:
    """
    Return the current request by heuristically looking it up from stack

    The request stored in the aiotask context is preferred. When it is
    absent, or the lookup raises ValueError, AttributeError or RuntimeError,
    the call stack is searched instead: for each frame, the ``request``
    attribute of its local ``self`` is returned when it is not None, else
    its local ``request`` when that is a Request instance. Raises
    RequestNotFound when nothing is found.
    """
    try:
        from_task = aiotask_context.get('request')
        if from_task is not None:
            return from_task
    except (ValueError, AttributeError, RuntimeError):
        pass

    # fallback
    current = inspect.currentframe()
    while current is not None:
        scope = current.f_locals
        candidate = getattr(scope.get('self'), 'request', None)
        if candidate is not None:
            return candidate
        if isinstance(scope.get('request'), Request):
            return scope['request']
        current = current.f_back
    raise RequestNotFound(RequestNotFound.__doc__)
command.py 文件源码 项目:endrebot0 作者: endreman0 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def command(fn, name=None):
    """Decorator for functions that should be exposed as commands.

    The returned wrapper reads the local variable ``ctx`` of whichever frame
    calls it and passes it to ``fn`` as the first argument. The wrapper is
    registered in the ``commands`` dict of the module that defines ``fn``
    (the dict is created on first use).

    Args:
        fn: Function, method or callable object to expose.
        name: Key to register the command under; defaults to ``fn.__name__``.

    Returns:
        The wrapper, a coroutine function when ``fn`` is one.
    """
    module = sys.modules[fn.__module__]
    if name is None:
        name = fn.__name__

    # Get the actual function for coroutine check
    if inspect.isfunction(fn):
        target = fn
    elif inspect.ismethod(fn):
        target = fn.__func__
    else:
        target = fn.__call__

    if asyncio.iscoroutinefunction(target):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            frame = inspect.currentframe()
            try:
                ctx = frame.f_back.f_locals['ctx']
                return await fn(ctx, *args, **kwargs)
            finally:
                # avoid keeping the frame alive through a reference cycle
                del frame
    else:
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            frame = inspect.currentframe()
            try:
                ctx = frame.f_back.f_locals['ctx']
                return fn(ctx, *args, **kwargs)
            finally:
                # avoid keeping the frame alive through a reference cycle
                del frame
    # Register under the requested name. The key used to be fn.__name__
    # unconditionally, which silently ignored the ``name`` argument.
    vars(module).setdefault('commands', {})[name] = wrapper
    return wrapper
test_support.py 文件源码 项目:data_utilities 作者: fmv1992 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def is_inside_recursive_test_call():
    """Test if a function is running from a call to du.test().

    Searches the stack, from this function outwards, for a frame whose local
    ``test`` has ``_testMethodName == 'test_module_level_test_calls'``.
    Returns True when such a frame exists and is not the outermost frame of
    the stack, False otherwise.
    """
    frame = inspect.currentframe()
    while frame:
        candidate = frame.f_locals.get('test')
        method_name = getattr(candidate, '_testMethodName', None)
        if method_name == 'test_module_level_test_calls':
            # A match only counts when there is still a frame behind it.
            return frame.f_back is not None
        # Go to next frame.
        frame = frame.f_back
    return False
test_support.py 文件源码 项目:data_utilities 作者: fmv1992 项目源码 文件源码 阅读 54 收藏 0 点赞 0 评论 0
def is_inside_unittest():
    """Test if a function is running from unittest.

    This function helps to disable the __setattr__ freezing when running
    inside a unittest environment.

    Returns True when some frame on the stack has a truthy local ``argv``
    whose items, joined together, contain the text 'unittest'.

    """
    # Calls from unittest discover have the following properties:
    # 1) Its stack is longer
    # 2) It contains arguments ('argv', 'pkg_name', 'unittest', etc) for
    # instance: a key value pair in the format:
    # ('argv', ['python3 -m unittest', 'discover', '-vvv', '.'])
    frame = inspect.currentframe()
    while frame:
        # looking for a local called 'argv' may be error prone...
        candidate = frame.f_locals.get('argv')
        if candidate:
            if 'unittest' in ''.join(candidate):
                return True
        frame = frame.f_back
    return False
REPIC.py 文件源码 项目:REPIC 作者: dpinney 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def REPIC(obToPrint):
    '''REPIC stands for Read, Evaluate, Print In Comment. Rewrites the calling file in place so that the line this function was called from ends with a comment holding str(obToPrint). Any comment already on that line is replaced.'''
    caller = inspect.currentframe().f_back
    callingFile = inspect.getfile(caller)
    callingLine = caller.f_lineno
    for line in fileinput.input(callingFile, inplace=1):
        if fileinput.filelineno() != callingLine:
            # Not the calling line: copy it through unchanged.
            sys.stdout.write(line)
            continue
        # Newlines in the output would break the comment, so escape them.
        resultString = '#OUTPUT: ' + str(obToPrint).replace('\n','\\n') +'\n'
        if '#' in line:
            # Replace old output and/or any comment already on the line.
            cut = line.rfind('#')
        elif '\n' in line:
            cut = line.rfind('\n')
        else:
            # Last line of a file without a trailing newline.
            cut = len(line)
        sys.stdout.write(line[0:cut] + resultString)
dataset.py 文件源码 项目:keras_experiments 作者: avolkov1 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def execute_only_once():
    """
    Each call site of this function is guaranteed to get True the first
    time it runs and False afterwards.

    A call site is identified by the file name and line number of the
    calling frame; seen call sites are remembered in ``_EXECUTE_HISTORY``.

    Returns:
        bool: whether this is the first time this function gets called from
            this line of code.

    Example:
        .. code-block:: python

            if execute_only_once():
                # do something only once
    """
    caller = inspect.currentframe().f_back
    call_site = (caller.f_code.co_filename, caller.f_lineno)
    first_time = call_site not in _EXECUTE_HISTORY
    if first_time:
        _EXECUTE_HISTORY.add(call_site)
    return first_time
nbNetBase.py 文件源码 项目:Starfish 作者: BillWang139967 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def get_linenumber(self):
        '''Return a "run_line" text with self.file_path and the line number currently executing two frames up the stack (the caller of this method's caller).'''
        origin = currentframe().f_back.f_back
        return "run_line: file %s line %s " % (self.file_path, origin.f_lineno)
nbNetBase.py 文件源码 项目:Starfish 作者: BillWang139967 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def get_linenumber(self):
        '''Return a "run_line" text with self.file_path and the line number currently executing two frames up the stack (the caller of this method's caller).'''
        origin = currentframe().f_back.f_back
        return "run_line: file %s line %s " % (self.file_path, origin.f_lineno)
install.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def run(self):
        """Run the install command, choosing between the original install
        and the egg install.

        The original ``install.run`` is used, and its result returned, when
        an old-style install was requested explicitly. Otherwise the egg
        install runs when ``_called_from_setup`` accepts the current frame,
        and the original install runs in every other case.
        """
        # Explicit request for old-style install?  Just do it
        if self.old_and_unmanageable or self.single_version_externally_managed:
            return orig.install.run(self)

        if self._called_from_setup(inspect.currentframe()):
            self.do_egg_install()
        else:
            # Run in backward-compatibility mode to support bdist_* commands.
            orig.install.run(self)
install.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 61 收藏 0 点赞 0 评论 0
def run(self):
        """Run the install command, choosing between the original install
        and the egg install.

        The original ``install.run`` is used, and its result returned, when
        an old-style install was requested explicitly. Otherwise the egg
        install runs when ``_called_from_setup`` accepts the current frame,
        and the original install runs in every other case.
        """
        # Explicit request for old-style install?  Just do it
        if self.old_and_unmanageable or self.single_version_externally_managed:
            return orig.install.run(self)

        if self._called_from_setup(inspect.currentframe()):
            self.do_egg_install()
        else:
            # Run in backward-compatibility mode to support bdist_* commands.
            orig.install.run(self)


问题


面经


文章

微信
公众号

扫码关注公众号