Python traceback.format_stack() 函数的实例源码

config.py 文件源码 项目:cb-defense-splunk-app 作者: carbonblack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def log(msg, msgx='', level=logging.INFO, need_tb=False):
    """
    Emit a log record for the UCC Config Module.

    Nothing is logged once the module-level ``LOGGING_STOPPED`` flag is set.
    The record is always emitted with ``exc_info=1``.

    :param msg: message content
    :param msgx: optional detail text, appended after ``' - '`` when non-empty
    :param level: logging level
    :param need_tb: if true, append the current call stack to the message
    :return: None
    """
    if LOGGING_STOPPED:
        return

    detail = (' - ' + msgx) if msgx else ''
    content = 'UCC Config Module: %s%s' % (msg, detail)
    if need_tb:
        # Call stack of the caller chain, not an exception traceback.
        call_stack = ''.join(traceback.format_stack())
        content = '%s\r\n%s' % (content, call_stack)
    stulog.logger.log(level, content, exc_info=1)
app.py 文件源码 项目:data-store 作者: HumanCellAtlas 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def timeout_response() -> chalice.Response:
    """
    Build a chalice Response that reports a request timeout.

    The body is a JSON problem document whose ``traces`` entry maps each
    running thread's id to its formatted stack.  The calling thread is left
    out of that mapping.
    """
    all_frames = sys._current_frames()
    own_thread_id = threading.get_ident()

    traces = {}
    for thread_id, frame in all_frames.items():
        if thread_id == own_thread_id:
            continue
        traces[thread_id] = traceback.format_stack(frame)

    problem = {
        'status': requests.codes.gateway_timeout,
        'code': "timed_out",
        'title': "Timed out processing request.",
        'traces': traces,
    }
    response_headers = {"Content-Type": "application/problem+json"}
    return chalice.Response(
        status_code=problem['status'],
        headers=response_headers,
        body=json.dumps(problem),
    )
mutex.py 文件源码 项目:qudi 作者: Ulm-IQO 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def tryLock(self, timeout=None, id=None):
        """ Attempt to acquire the mutex without blocking indefinitely.

            In debug mode a successful acquisition is recorded in self.tb:
            either the given debug id or, when no id is given, the current
            call stack without its last entry.

            @param timeout int: give up after this many milliseconds
            @param id: debug id

            @return bool: whether locking succeeded
        """
        call_args = (self,) if timeout is None else (self, timeout)
        locked = QtCore.QMutex.tryLock(*call_args)

        if not (self.debug and locked):
            return locked

        # self.l guards the bookkeeping list self.tb.
        self.l.lock()
        try:
            if id is not None:
                record = "  " + str(id)
            else:
                record = ''.join(traceback.format_stack()[:-1])
            self.tb.append(record)
        finally:
            self.l.unlock()
        return locked
mutex.py 文件源码 项目:qudi 作者: Ulm-IQO 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def lock(self, id=None):
        """ Lock mutex. Will try again every 5 seconds.

            Blocks until self.tryLock succeeds. In debug mode, every failed
            attempt logs the accumulated waiting time, the current call stack
            and the most recent entry of self.tb (the last recorded locker),
            if there is one.

            @param id: debug id, passed through to tryLock
        """
        waitTime = 5000  # in ms
        attempts = 0
        while not self.tryLock(waitTime, id):
            attempts += 1
            if not self.debug:
                continue
            self.l.lock()
            try:
                waited = attempts * waitTime / 1000.
                # '.1f' is required: a bare '.1' renders 5.0 as '5e+00'.
                logger.debug('Waiting for mutex lock ({:.1f} sec). '
                             'Traceback follows:'.format(waited))
                logger.debug(''.join(traceback.format_stack()))
                if self.tb:
                    logger.debug('Mutex is currently locked from: {0}\n'
                                 ''.format(self.tb[-1]))
                else:
                    logger.debug('Mutex is currently locked from [???]')
            finally:
                self.l.unlock()
pool.py 文件源码 项目:webapp 作者: superchilli 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _do_get(self):
        """Return the connection in ``self._conn``, refusing a double checkout.

        The connection is created on first use.  When ``_store_traceback``
        is set, the current call stack is saved so that a later conflicting
        checkout can report where this one happened.

        :return: the connection held in ``self._conn``.
        :raises AssertionError: if the connection is already checked out.
        """
        if self._checked_out:
            message = "connection is already checked out"
            if self._checkout_traceback:
                # Point at the place that performed the outstanding checkout.
                frames = chop_traceback(self._checkout_traceback)
                message += ' at:\n%s' % ''.join(frames)
            raise AssertionError(message)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
pool.py 文件源码 项目:QualquerMerdaAPI 作者: tiagovizoto 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _do_get(self):
        """Return the connection in ``self._conn``, refusing a double checkout.

        The connection is created on first use.  When ``_store_traceback``
        is set, the current call stack is saved so that a later conflicting
        checkout can report where this one happened.

        :return: the connection held in ``self._conn``.
        :raises AssertionError: if the connection is already checked out.
        """
        if self._checked_out:
            message = "connection is already checked out"
            if self._checkout_traceback:
                # Point at the place that performed the outstanding checkout.
                frames = chop_traceback(self._checkout_traceback)
                message += ' at:\n%s' % ''.join(frames)
            raise AssertionError(message)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
pool.py 文件源码 项目:gardenbot 作者: GoestaO 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _do_get(self):
        """Return the connection in ``self._conn``, refusing a double checkout.

        The connection is created on first use.  When ``_store_traceback``
        is set, the current call stack is saved so that a later conflicting
        checkout can report where this one happened.

        :return: the connection held in ``self._conn``.
        :raises AssertionError: if the connection is already checked out.
        """
        if self._checked_out:
            message = "connection is already checked out"
            if self._checkout_traceback:
                # Point at the place that performed the outstanding checkout.
                frames = chop_traceback(self._checkout_traceback)
                message += ' at:\n%s' % ''.join(frames)
            raise AssertionError(message)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
pool.py 文件源码 项目:flask-zhenai-mongo-echarts 作者: Fretice 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _do_get(self):
        """Return the connection in ``self._conn``, refusing a double checkout.

        The connection is created on first use.  When ``_store_traceback``
        is set, the current call stack is saved so that a later conflicting
        checkout can report where this one happened.

        :return: the connection held in ``self._conn``.
        :raises AssertionError: if the connection is already checked out.
        """
        if self._checked_out:
            message = "connection is already checked out"
            if self._checkout_traceback:
                # Point at the place that performed the outstanding checkout.
                frames = chop_traceback(self._checkout_traceback)
                message += ' at:\n%s' % ''.join(frames)
            raise AssertionError(message)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
pool.py 文件源码 项目:Data-visualization 作者: insta-code1 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _do_get(self):
        """Return the connection in ``self._conn``, refusing a double checkout.

        The connection is created on first use.  When ``_store_traceback``
        is set, the current call stack is saved so that a later conflicting
        checkout can report where this one happened.

        :return: the connection held in ``self._conn``.
        :raises AssertionError: if the connection is already checked out.
        """
        if self._checked_out:
            message = "connection is already checked out"
            if self._checkout_traceback:
                # Point at the place that performed the outstanding checkout.
                frames = chop_traceback(self._checkout_traceback)
                message += ' at:\n%s' % ''.join(frames)
            raise AssertionError(message)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
strategy.py 文件源码 项目:frontera-docs-zh_CN 作者: xsren 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def run(self):
        """Start the periodic tasks, install the handlers and run the reactor.

        ``self.task`` and ``self._flush_states_task`` are restarted from
        their own errbacks after the failure has been logged.  A SIGUSR1
        handler logs the stack of the interrupted frame, and ``self.stop``
        is registered to run before reactor shutdown.  The call returns when
        ``reactor.run()`` returns.
        """
        def report(failure):
            # Log the failure value, plus its traceback when frames exist.
            logger.exception(failure.value)
            if failure.frames:
                tb_lines = format_tb(failure.getTracebackObject())
                logger.critical(str("").join(tb_lines))

        def restart_main(failure):
            report(failure)
            restarted = self.task.start(interval=0)
            restarted.addErrback(restart_main)

        def restart_flush_states(failure):
            report(failure)
            restarted = self._flush_states_task.start(interval=300)
            restarted.addErrback(restart_flush_states)

        def dump_stack(sig, frame):
            logger.critical("Signal received: printing stack trace")
            stack_lines = format_stack(frame)
            logger.critical(str("").join(stack_lines))

        main_deferred = self.task.start(interval=0)
        main_deferred.addErrback(restart_main)
        self._logging_task.start(interval=30)
        flush_deferred = self._flush_states_task.start(interval=300)
        flush_deferred.addErrback(restart_flush_states)
        signal(SIGUSR1, dump_stack)
        reactor.addSystemEventTrigger('before', 'shutdown', self.stop)
        reactor.run()
defer.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _startRunCallbacks(self, result):
        """Record ``result`` and start running the callbacks, once only.

        A repeated call raises ``AlreadyCalledError`` unless
        ``_suppressAlreadyCalled`` is set, in which case that flag is cleared
        and the call returns silently.  In debug mode the error carries the
        debug tracebacks, and the invoking stack (minus its last two entries)
        is stored on ``self._debugInfo.invoker``.
        """
        if self.called:
            if self._suppressAlreadyCalled:
                self._suppressAlreadyCalled = False
                return
            if not self.debug:
                raise AlreadyCalledError
            if self._debugInfo is None:
                self._debugInfo = DebugInfo()
            raise AlreadyCalledError(
                "\n" + self._debugInfo._getDebugTracebacks())

        if self.debug:
            if self._debugInfo is None:
                self._debugInfo = DebugInfo()
            invoking_stack = traceback.format_stack()
            self._debugInfo.invoker = invoking_stack[:-2]

        self.called = True
        self.result = result
        self._runCallbacks()
__init__.py 文件源码 项目:pillar 作者: armadillica 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_file(file_id, api=None):
    """Look up a file by id, returning None when it cannot be found.

    :param file_id: id of the file; ``None`` short-circuits to ``None``.
    :param api: API object passed to ``File.find``; defaults to
        ``system_util.pillar_api()``.
    :return: the result of ``File.find``, or ``None`` if ``file_id`` is
        ``None`` or ``ResourceNotFound`` was raised.
    """
    # TODO: remove this function and just use the Pillar SDK directly.
    if file_id is None:
        return None

    api = system_util.pillar_api() if api is None else api

    try:
        found = File.find(file_id, api=api)
    except ResourceNotFound:
        # tb_frame is this function's frame, so f_back is whoever called us.
        caller_frame = sys.exc_info()[2].tb_frame.f_back
        stack_lines = traceback.format_stack(f=caller_frame, limit=2)
        log.warning('File %s not found, but requested from %s\n%s',
                    file_id, request.url, ''.join(stack_lines))
        return None
    return found
core.py 文件源码 项目:deb-python-httpretty 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def debug(self, truesock_func, *a, **kw):
            """Forward a socket method call to the real socket, or fail loudly.

            When ``self.is_http`` is set, the call is treated as unexpected
            and a RuntimeError carrying the current stack is raised.
            Otherwise the method named ``truesock_func`` is invoked on
            ``self.truesock`` with the given arguments; a missing real socket
            raises ``UnmockedError``.
            """
            if not self.is_http:
                if not self.truesock:
                    raise UnmockedError()
                real_method = getattr(self.truesock, truesock_func)
                return real_method(*a, **kw)

            this_frame = inspect.stack()[0][0]
            lines = [utf8(entry)
                     for entry in traceback.format_stack(this_frame)]
            report = "\n".join([
                "HTTPretty intercepted and unexpected socket method call.",
                ("Please open an issue at "
                 "'https://github.com/gabrielfalcao/HTTPretty/issues'"),
                "And paste the following traceback:\n",
                "".join(decode_utf8(lines)),
            ])
            raise RuntimeError(report)
pool.py 文件源码 项目:micro-blog 作者: nickChenyx 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _do_get(self):
        """Return the connection in ``self._conn``, refusing a double checkout.

        The connection is created on first use.  When ``_store_traceback``
        is set, the current call stack is saved so that a later conflicting
        checkout can report where this one happened.

        :return: the connection held in ``self._conn``.
        :raises AssertionError: if the connection is already checked out.
        """
        if self._checked_out:
            message = "connection is already checked out"
            if self._checkout_traceback:
                # Point at the place that performed the outstanding checkout.
                frames = chop_traceback(self._checkout_traceback)
                message += ' at:\n%s' % ''.join(frames)
            raise AssertionError(message)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
pool.py 文件源码 项目:python-flask-security 作者: weinbergdavid 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _do_get(self):
        """Return the connection in ``self._conn``, refusing a double checkout.

        The connection is created on first use.  When ``_store_traceback``
        is set, the current call stack is saved so that a later conflicting
        checkout can report where this one happened.

        :return: the connection held in ``self._conn``.
        :raises AssertionError: if the connection is already checked out.
        """
        if self._checked_out:
            message = "connection is already checked out"
            if self._checkout_traceback:
                # Point at the place that performed the outstanding checkout.
                frames = chop_traceback(self._checkout_traceback)
                message += ' at:\n%s' % ''.join(frames)
            raise AssertionError(message)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
query_tracker.py 文件源码 项目:django-snoopy 作者: Pradeek 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def execute_insert_sql(self, *args, **kwargs):
    """Run the wrapped insert while recording query details for Snoopy.

    The call stack, the rendered SQL statements, the model's dotted name and
    the start time are collected before delegating to
    ``self._snoopy_execute_insert_sql``.  The end time is added and the
    record is handed to ``SnoopyRequest.record_query_data`` whether or not
    the wrapped call raises.

    :return: whatever ``self._snoopy_execute_insert_sql`` returns.
    """
    call_stack = traceback.format_stack()
    model = self.query.model
    record = {
        'query': [],
        'query_type': QUERY_TYPE_WRITE,
        'traceback': call_stack,
        'model': "%s.%s" % (model.__module__, model.__name__),
        'start_time': datetime.datetime.now(),
    }
    # Interpolate the parameters so the stored text is the final statement.
    record['query'].extend(sql % params for sql, params in self.as_sql())

    try:
        return self._snoopy_execute_insert_sql(*args, **kwargs)
    finally:
        # Runs just before the `return` above hands control back.
        record['end_time'] = datetime.datetime.now()
        SnoopyRequest.record_query_data(record)
pool.py 文件源码 项目:watcher 作者: nosmokingbandit 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _do_get(self):
        """Return the connection in ``self._conn``, refusing a double checkout.

        The connection is created on first use.  When ``_store_traceback``
        is set, the current call stack is saved so that a later conflicting
        checkout can report where this one happened.

        :return: the connection held in ``self._conn``.
        :raises AssertionError: if the connection is already checked out.
        """
        if self._checked_out:
            message = "connection is already checked out"
            if self._checkout_traceback:
                # Point at the place that performed the outstanding checkout.
                frames = chop_traceback(self._checkout_traceback)
                message += ' at:\n%s' % ''.join(frames)
            raise AssertionError(message)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
pool.py 文件源码 项目:Lixiang_zhaoxin 作者: hejaxian 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _do_get(self):
        """Return the connection in ``self._conn``, refusing a double checkout.

        The connection is created on first use.  When ``_store_traceback``
        is set, the current call stack is saved so that a later conflicting
        checkout can report where this one happened.

        :return: the connection held in ``self._conn``.
        :raises AssertionError: if the connection is already checked out.
        """
        if self._checked_out:
            message = "connection is already checked out"
            if self._checkout_traceback:
                # Point at the place that performed the outstanding checkout.
                frames = chop_traceback(self._checkout_traceback)
                message += ' at:\n%s' % ''.join(frames)
            raise AssertionError(message)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
runtest.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _captureThreadStacks(self):
        """Snapshot the call stack of every thread that currently has a frame.

        :return: A list of ``(thread-description, stack)`` tuples. The
            description is the ``repr`` of the matching ``threading`` thread,
            or an ``<*Unknown* ident>`` placeholder when no thread object has
            that ident. See `traceback.format_stack` for the format of
            ``stack``.
        """
        known_threads = {}
        for thread in threading.enumerate():
            known_threads[thread.ident] = thread

        captured = []
        for ident, frame in sys._current_frames().items():
            if ident in known_threads:
                description = repr(known_threads[ident])
            else:
                description = "<*Unknown* %d>" % ident
            captured.append((description, traceback.format_stack(frame)))
        return captured
pool.py 文件源码 项目:flask 作者: bobohope 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _do_get(self):
        """Return the connection in ``self._conn``, refusing a double checkout.

        The connection is created on first use.  When ``_store_traceback``
        is set, the current call stack is saved so that a later conflicting
        checkout can report where this one happened.

        :return: the connection held in ``self._conn``.
        :raises AssertionError: if the connection is already checked out.
        """
        if self._checked_out:
            message = "connection is already checked out"
            if self._checkout_traceback:
                # Point at the place that performed the outstanding checkout.
                frames = chop_traceback(self._checkout_traceback)
                message += ' at:\n%s' % ''.join(frames)
            raise AssertionError(message)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn


问题


面经


文章

微信
公众号

扫码关注公众号