python类format_stack()的实例源码

instance.py 文件源码 项目:PyPlanet 作者: PyPlanet 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def signal_manager(self):
        """
        Deprecated!

        .. deprecated:: 0.5.0
            Use :func:`self.context.signals` instead in your apps.

        :return: Signal manager (global).
        :rtype: pyplanet.core.events.manager._SignalManager
        """
        logger.warning(DeprecationWarning(
            'DEPRECATED: The usage of \'self.instance.signal_manager\' in apps is deprecated, '
            'use \'self.context.signals\' instead!'
        ))
        logger.warning('\n'.join(traceback.format_stack(limit=7)))

        return self.signals
mutex.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def tryLock(self, timeout=None, id=None):
        if timeout is None:
            locked = QtCore.QMutex.tryLock(self)
        else:
            locked = QtCore.QMutex.tryLock(self, timeout)

        if self.debug and locked:
            self.l.lock()
            try:
                if id is None:
                    self.tb.append(''.join(traceback.format_stack()[:-1]))
                else:
                    self.tb.append("  " + str(id))
                #print 'trylock', self, len(self.tb)
            finally:
                self.l.unlock()
        return locked
mutex.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def tryLock(self, timeout=None, id=None):
        if timeout is None:
            locked = QtCore.QMutex.tryLock(self)
        else:
            locked = QtCore.QMutex.tryLock(self, timeout)

        if self.debug and locked:
            self.l.lock()
            try:
                if id is None:
                    self.tb.append(''.join(traceback.format_stack()[:-1]))
                else:
                    self.tb.append("  " + str(id))
                #print 'trylock', self, len(self.tb)
            finally:
                self.l.unlock()
        return locked
pool.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
recipe-576781.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, *args):
                # build the message from the user
                user_msg = args[0] if args else "An error has occurred"
                # get the traceback from the last error if it exists
                try: tb = traceback.format_exc()
                # otherwise, get the tb prior to this frame and pretend its us
                except: 
                        tb = "Traceback (most recent call last):\n"
                        tb += ''.join(traceback.format_stack()[:-1])
                        tb += self.__class__.__name__ + ": " + user_msg
                        tb += '\n'
                # build the complete log message
                log_msg = user_msg + "\n" + tb
                # and log it
                self.log.error(log_msg)
                # store the args
                self.args = args
control.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 86 收藏 0 点赞 0 评论 0
def handle_SIGUSR1(self, signum, frame):
        # propagate the signal to children, but only if they are AVE processes
        for pid in self.get_children():
            name = get_proc_name(pid)
            if name.startswith('ave-'):
                os.kill(pid, signal.SIGUSR1)
        # make the dump directory if it doesn't exist
        hickup_dir = os.path.join(self.home, '.ave', 'hickup')
        try:
            os.makedirs(hickup_dir)
        except OSError, e:
            if e.errno != errno.EEXIST:
                self.log('ERROR: could not create %s: %s' % (hickup_dir, e))
                return
        # create the trace file
        date = time.strftime('%Y%m%d-%H%M%S')
        name = '%s-%s-%d' % (date, self.proc_name, os.getpid())
        path = os.path.join(hickup_dir, name)
        with open(path, 'w') as f:
            f.write('stack:\n%s' % ''.join(traceback.format_stack(frame)))
            f.write('locals:\n%s\n' % frame.f_locals)
            f.write('globals:\n%s' % frame.f_globals)
control.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def handle_SIGUSR1(self, signum, frame):
        # propagate the signal to children, but only if they are AVE processes
        for pid in self.get_children():
            name = get_proc_name(pid)
            if name.startswith('ave-'):
                os.kill(pid, signal.SIGUSR1)
        # make the dump directory if it doesn't exist
        hickup_dir = os.path.join(self.home, '.ave', 'hickup')
        try:
            os.makedirs(hickup_dir)
        except OSError, e:
            if e.errno != errno.EEXIST:
                self.log('ERROR: could not create %s: %s' % (hickup_dir, e))
                return
        # create the trace file
        date = time.strftime('%Y%m%d-%H%M%S')
        name = '%s-%s-%d' % (date, self.proc_name, os.getpid())
        path = os.path.join(hickup_dir, name)
        with open(path, 'w') as f:
            f.write('stack:\n%s' % ''.join(traceback.format_stack(frame)))
            f.write('locals:\n%s\n' % frame.f_locals)
            f.write('globals:\n%s' % frame.f_globals)
base.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, time, func, args, kw, cancel, reset, seconds=None):
        """
        @param time: Seconds from the epoch at which to call C{func}.
        @param func: The callable to call.
        @param args: The positional arguments to pass to the callable.
        @param kw: The keyword arguments to pass to the callable.
        @param cancel: A callable which will be called with this
            DelayedCall before cancellation.
        @param reset: A callable which will be called with this
            DelayedCall after changing this DelayedCall's scheduled
            execution time. The callable should adjust any necessary
            scheduling details to ensure this DelayedCall is invoked
            at the new appropriate time.
        @param seconds: If provided, a no-argument callable which will be
            used to determine the current time any time that information is
            needed.
        """
        self.time, self.func, self.args, self.kw = time, func, args, kw
        self.resetter = reset
        self.canceller = cancel
        self.seconds = seconds
        self.cancelled = self.called = 0
        self.delayed_time = 0
        if self.debug:
            self.creator = traceback.format_stack()[:-2]
defer.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _startRunCallbacks(self, result):
        if self.called:
            if self.debug:
                if self._debugInfo is None:
                    self._debugInfo = DebugInfo()
                extra = "\n" + self._debugInfo._getDebugTracebacks()
                raise AlreadyCalledError(extra)
            raise AlreadyCalledError
        if self.debug:
            if self._debugInfo is None:
                self._debugInfo = DebugInfo()
            self._debugInfo.invoker = traceback.format_stack()[:-2]
        self.called = True
        self.result = result
        if self.timeoutCall:
            try:
                self.timeoutCall.cancel()
            except:
                pass

            del self.timeoutCall
        self._runCallbacks()
pool.py 文件源码 项目:QXSConsolas 作者: qxsch 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
pool.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
__init__.py 文件源码 项目:universe 作者: openai 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def format_error(e):
    # errback automatically wraps everything in a Twisted Failure
    if isinstance(e, failure.Failure):
        e = e.value

    if isinstance(e, str):
        err_string = e
    elif six.PY2:
        err_string = traceback.format_exc(e).rstrip()
    else:
        err_string = ''.join(traceback.format_exception(type(e), e, e.__traceback__)).rstrip()

    if err_string == 'None':
        # Reasonable heuristic for exceptions that were created by hand
        last = traceback.format_stack()[-2]
        err_string = '{}\n  {}'.format(e, last)
    # Quick and dirty hack for now.
    err_string = err_string.replace('Connection to the other side was lost in a non-clean fashion', 'Connection to the other side was lost in a non-clean fashion (HINT: this generally actually means we got a connection refused error. Check that the remote is actually running.)')
    return error.Error(err_string)
pool.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
__init__.py 文件源码 项目:mdToRst 作者: kata198 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def debugmsg(msg, msgTupleLambda=None):
        '''
            debugmsg - If #IS_DEVELOPER_DEBUG is changed to True (in the code, changing it after import has no affect ) this will print a debug message

                        to stderr prefixed with "DEBUG: " and suffixed with a newline.

                       Don't create your message like:

                           debugmsg( "Order(id=%s) has %d pepperoni pizzas" %( myOrder.id, len( [ topping for pizza in getPizzas() for topping in pizza.toppings() if 'pepperoni' in topping ] ) ) )

                       Which would evaluate that comprehension whether or not debug mode is on,

                         Instead, use a lambda to return that same tuple, so that code is only evaluated in the #debugmsg function calls your lambda:

                           debugmsg( "Order(id=%s) has %d pepperoni pizzas", lambda : ( myOrder.id, len( [ topping for pizza in getPizzas() for topping in pizza.toppings() if 'pepperoni' in topping ] ) ) )
        '''

        if issubclass(msgTupleLambda.__class__, (tuple, list)):
            sys.stderr.write('''WARNING: debugmsg called with a tuple/list for msgTupleLambda.\n\tShould be a lambda that returns the tuple, so when IS_DEVELOPER_DEBUG=False it is not evaluated. \n\n''' + ''.join(traceback.format_stack()[:-1]) + "\n\n" )

            msgTuple = msgTupleLambda
        else:
            msgTuple = msgTupleLambda()

        sys.stderr.write('DEBUG: %s\n' % msgTuple )
worker.py 文件源码 项目:tasker 作者: wavenator 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def requeue(
        self,
        exception=None,
    ):
        task = self.current_task
        exception_traceback = ''.join(traceback.format_stack())

        if not exception:
            exception = WorkerRequeue()

        self._on_requeue(
            task=task,
            exception=exception,
            exception_traceback=exception_traceback,
            args=task['args'],
            kwargs=task['kwargs'],
        )
Global.py 文件源码 项目:ANNarchy 作者: vitay 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _warning(*var_text):
    """
    Prints a warning message to standard out.
    """
    text = 'WARNING: '
    for var in var_text:
        text += str(var) + ' '
    if not config['suppress_warnings']:
        print(text)
    #     # Print the trace
    #     tb = traceback.format_stack()
    #     for line in tb:
    #         if not '/ANNarchy/core/' in line and \
    #            not '/ANNarchy/parser/' in line and \
    #            not '/ANNarchy/generator/' in line :
    #             print(line)
pool.py 文件源码 项目:chihu 作者: yelongyu 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
pool.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
BaseConsumer.py 文件源码 项目:kafkatos3 作者: ConnectedHomes 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def exit_gracefully(self, signum, frame):
        '''exit the consumer gracefully'''
        if not self.message_processing:
            self.logger.info("Fast shutdown available ... exiting (signum %d)" \
                             % (signum))
            self.logger.info("Print stack trace. Don't panic!")
            self.logger.info("-----------------------------------------------")
            for chunk in traceback.format_stack(frame):
                for line in chunk.split("\n"):
                    self.logger.info(line)
            self.logger.info("-----------------------------------------------")
            for part in self.partitions:
                self.partitions[part].writer.close()
            self.consumer.commit()
            sys.exit(0)

        self.logger.info("Graceful shutdown of consumer " +
                         str(self.consumer_id) + " started....")
        self.shutting_down = True
pool.py 文件源码 项目:pyetje 作者: rorlika 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
pool.py 文件源码 项目:Price-Comparator 作者: Thejas-1 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
base.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __init__(self, time, func, args, kw, cancel, reset, seconds=None):
        """
        @param time: Seconds from the epoch at which to call C{func}.
        @param func: The callable to call.
        @param args: The positional arguments to pass to the callable.
        @param kw: The keyword arguments to pass to the callable.
        @param cancel: A callable which will be called with this
            DelayedCall before cancellation.
        @param reset: A callable which will be called with this
            DelayedCall after changing this DelayedCall's scheduled
            execution time. The callable should adjust any necessary
            scheduling details to ensure this DelayedCall is invoked
            at the new appropriate time.
        @param seconds: If provided, a no-argument callable which will be
            used to determine the current time any time that information is
            needed.
        """
        self.time, self.func, self.args, self.kw = time, func, args, kw
        self.resetter = reset
        self.canceller = cancel
        self.seconds = seconds
        self.cancelled = self.called = 0
        self.delayed_time = 0
        if self.debug:
            self.creator = traceback.format_stack()[:-2]
defer.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _startRunCallbacks(self, result):
        if self.called:
            if self.debug:
                if self._debugInfo is None:
                    self._debugInfo = DebugInfo()
                extra = "\n" + self._debugInfo._getDebugTracebacks()
                raise AlreadyCalledError(extra)
            raise AlreadyCalledError
        if self.debug:
            if self._debugInfo is None:
                self._debugInfo = DebugInfo()
            self._debugInfo.invoker = traceback.format_stack()[:-2]
        self.called = True
        self.result = result
        if self.timeoutCall:
            try:
                self.timeoutCall.cancel()
            except:
                pass

            del self.timeoutCall
        self._runCallbacks()
pool.py 文件源码 项目:Flask-NvRay-Blog 作者: rui7157 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
pool.py 文件源码 项目:Flask-NvRay-Blog 作者: rui7157 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
pool.py 文件源码 项目:Callandtext 作者: iaora 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
util.py 文件源码 项目:dcos 作者: dcos 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def setup_thread_debugger():
    """Setup a thread debbuger for pytest session

    This function, based on http://stackoverflow.com/a/133384, is meant to
    add debugging facility to pytest that will allow to debug deadlock that
    may sometimes occur.
    """
    def debug(signal, frame):
        """Interrupt running process and provide a python prompt for
        interactive debugging."""
        d = {'_frame': frame}  # Allow access to frame object.
        d.update(frame.f_globals)  # Unless shadowed by global
        d.update(frame.f_locals)

        i = code.InteractiveConsole(d)
        message = "Signal received : entering python shell.\nTraceback:\n"
        message += ''.join(traceback.format_stack(frame))
        i.interact(message)

    signal.signal(signal.SIGUSR1, debug)  # Register handler
config.py 文件源码 项目:splunk_ta_ps4_f1_2016 作者: jonathanvarley 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def log(msg, msgx='', level=logging.INFO, need_tb=False):
    """
    Logging in UCC Config Module.
    :param msg: message content
    :param msgx: detail info.
    :param level: logging level
    :param need_tb: if need logging traceback
    :return:
    """
    global LOGGING_STOPPED
    if LOGGING_STOPPED:
        return

    msgx = ' - ' + msgx if msgx else ''
    content = 'UCC Config Module: %s%s' % (msg, msgx)
    if need_tb:
        stack = ''.join(traceback.format_stack())
        content = '%s\r\n%s' % (content, stack)
    stulog.logger.log(level, content, exc_info=1)
pool.py 文件源码 项目:python_ddd_flask 作者: igorvinnicius 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
config.py 文件源码 项目:TA-SyncKVStore 作者: georgestarcher 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def log(msg, msgx='', level=logging.INFO, need_tb=False):
    """
    Logging in UCC Config Module.
    :param msg: message content
    :param msgx: detail info.
    :param level: logging level
    :param need_tb: if need logging traceback
    :return:
    """
    global LOGGING_STOPPED
    if LOGGING_STOPPED:
        return

    msgx = ' - ' + msgx if msgx else ''
    content = 'UCC Config Module: %s%s' % (msg, msgx)
    if need_tb:
        stack = ''.join(traceback.format_stack())
        content = '%s\r\n%s' % (content, stack)
    stulog.logger.log(level, content, exc_info=1)


问题


面经


文章

微信
公众号

扫码关注公众号