python类exc_info()的实例源码

__init__.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def log(self, level, msg, *args, **kwargs):
        """
        Log 'msg % args' with the integer severity 'level'.

        To pass exception information, use the keyword argument exc_info with
        a true value, e.g.

        logger.log(level, "We have a %s", "mysterious problem", exc_info=1)
        """
        if not isinstance(level, int):
            if raiseExceptions:
                raise TypeError("level must be an integer")
            else:
                return
        if self.isEnabledFor(level):
            self._log(level, msg, args, **kwargs)
__init__.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _log(self, level, msg, args, exc_info=None, extra=None):
        """
        Low-level logging routine which creates a LogRecord and then calls
        all the handlers of this logger to handle the record.
        """
        if _srcfile:
            #IronPython doesn't track Python frames, so findCaller throws an
            #exception on some versions of IronPython. We trap it here so that
            #IronPython can use logging.
            try:
                fn, lno, func = self.findCaller()
            except ValueError:
                fn, lno, func = "(unknown file)", 0, "(unknown function)"
        else:
            fn, lno, func = "(unknown file)", 0, "(unknown function)"
        if exc_info:
            if not isinstance(exc_info, tuple):
                exc_info = sys.exc_info()
        record = self.makeRecord(self.name, level, fn, lno, msg, args, exc_info, func, extra)
        self.handle(record)
threading.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        assert group is None, "group argument must be None for now"
        _Verbose.__init__(self, verbose)
        if kwargs is None:
            kwargs = {}
        self.__target = target
        self.__name = str(name or _newname())
        self.__args = args
        self.__kwargs = kwargs
        self.__daemonic = self._set_daemon()
        self.__ident = None
        self.__started = Event()
        self.__stopped = False
        self.__block = Condition(Lock())
        self.__initialized = True
        # sys.stderr is not stored in the class like
        # sys.exc_info since it can be changed between instances
        self.__stderr = _sys.stderr
pydoc.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def importfile(path):
    """Import a Python source file or compiled file given its path."""
    magic = imp.get_magic()
    file = open(path, 'r')
    if file.read(len(magic)) == magic:
        kind = imp.PY_COMPILED
    else:
        kind = imp.PY_SOURCE
    file.close()
    filename = os.path.basename(path)
    name, ext = os.path.splitext(filename)
    file = open(path, 'r')
    try:
        module = imp.load_module(name, file, path, (ext, 'r', kind))
    except:
        raise ErrorDuringImport(path, sys.exc_info())
    file.close()
    return module
asyncore.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def compact_traceback():
    t, v, tb = sys.exc_info()
    tbinfo = []
    if not tb: # Must have a traceback
        raise AssertionError("traceback does not exist")
    while tb:
        tbinfo.append((
            tb.tb_frame.f_code.co_filename,
            tb.tb_frame.f_code.co_name,
            str(tb.tb_lineno)
            ))
        tb = tb.tb_next

    # just to be safe
    del tb

    file, function, line = tbinfo[-1]
    info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
    return (file, function, line), t, v, info
atexit.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _run_exitfuncs():
    """run any registered exit functions

    _exithandlers is traversed in reverse order so functions are executed
    last in, first out.
    """

    exc_info = None
    while _exithandlers:
        func, targs, kargs = _exithandlers.pop()
        try:
            func(*targs, **kargs)
        except SystemExit:
            exc_info = sys.exc_info()
        except:
            import traceback
            print >> sys.stderr, "Error in atexit._run_exitfuncs:"
            traceback.print_exc()
            exc_info = sys.exc_info()

    if exc_info is not None:
        raise exc_info[0], exc_info[1], exc_info[2]
__init__.py 文件源码 项目:py-secretcrypt 作者: Zemanta 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def decrypt(self):
        """Decrypt decrypts the secret and returns the plaintext.

        Calling decrypt() may incur side effects such as a call to a remote service for decryption.
        """
        if not self._crypter:
            return b''
        try:
            plaintext = self._crypter.decrypt(self._ciphertext, **self._decrypt_params)
            return plaintext
        except Exception as e:
            exc_info = sys.exc_info()
            six.reraise(
                ValueError('Invalid ciphertext "%s", error: %s' % (self._ciphertext, e)),
                None,
                exc_info[2]
            )
indexer.py 文件源码 项目:ipwb 作者: oduwsdl 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def pushToIPFS(hstr, payload):
    ipfsRetryCount = 5  # WARC->IPFS attempts before giving up
    retryCount = 0
    while retryCount < ipfsRetryCount:
        try:
            httpHeaderIPFSHash = pushBytesToIPFS(bytes(hstr))
            payloadIPFSHash = pushBytesToIPFS(bytes(payload))
            if retryCount > 0:
                m = 'Retrying succeeded after {0} attempts'.format(retryCount)
                print(m)
            return [httpHeaderIPFSHash, payloadIPFSHash]
        except NewConnectionError as e:
            print('IPFS daemon is likely not running.')
            print('Run "ipfs daemon" in another terminal session.')
            sys.exit()
        except:
            attemptCount = '{0}/{1}'.format(retryCount + 1, ipfsRetryCount)
            logError('IPFS failed to add, ' +
                     'retrying attempt {0}'.format(attemptCount))
            # print(sys.exc_info())
            retryCount += 1

    return None  # Process of adding to IPFS failed
util.py 文件源码 项目:ipwb 作者: oduwsdl 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def isDaemonAlive(hostAndPort="{0}:{1}".format(IPFSAPI_IP, IPFSAPI_PORT)):
    """Ensure that the IPFS daemon is running via HTTP before proceeding"""
    client = ipfsapi.Client(IPFSAPI_IP, IPFSAPI_PORT)

    try:
        # OSError if ipfs not installed, redundant of below
        # subprocess.call(['ipfs', '--version'], stdout=open(devnull, 'wb'))

        # ConnectionError/AttributeError if IPFS daemon not running
        client.id()
        return True
    except (ConnectionError, exceptions.AttributeError):
        logError("Daemon is not running at http://" + hostAndPort)
        return False
    except OSError:
        logError("IPFS is likely not installed. "
                 "See https://ipfs.io/docs/install/")
        sys.exit()
    except:
        logError('Unknown error in retrieving daemon status')
        logError(sys.exc_info()[0])
run.py 文件源码 项目:bap-ida-python 作者: BinaryAnalysisPlatform 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def run_handlers(self, event):
        assert event in self.observers
        handlers = []
        instance_handlers = {
            'instance_canceled': self._on_cancel,
            'instance_failed':   self._on_failed,
            'instance_finished': self._on_finish,
        }

        handlers += self.observers[event]
        handlers += instance_handlers.get(event, [])

        failures = 0
        for handler in handlers:
            try:
                handler(self)
            except:  # pylint: disable=bare-except
                failures += 1
                idc.Message("BAP> {0} failed because {1}\n".
                            format(self.action, str(sys.exc_info()[1])))
                traceback.print_exc()
        if failures != 0:
            idc.Warning("Some BAP handlers failed")
powstream_utils.py 文件源码 项目:pscheduler 作者: perfsonar 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def cleanup_dir(tmpdir, keep_data_files=False, ignore_errors=False):
    if keep_data_files: return
    #Remove our tmpdir, but don't fail the test if it doesn't remove
    try:
        shutil.rmtree(tmpdir, ignore_errors=ignore_errors)
    except OSError as oe:
        error = ""
        if oe.errno: error = "%s: " % oe.errno
        if oe.strerror: error += oe.strerror
        if oe.filename: error += " (filename: %s)" % oe.filename
        log.warning("Unable to remove powstream temporary directory %s due to error reported by OS: %s" % (tmpdir, error))
    except:
        log.warning("Unable to remove powstream temporary directory %s: %s" % (tmpdir, sys.exc_info()[0]))

##
# Called by signal handlers to clean-up then exit
powstream_utils.py 文件源码 项目:pscheduler 作者: perfsonar 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def cleanup_file(tmpfile, keep_data_files=False):
    if keep_data_files: return
    #Remove our tmpfile, but don't fail the test if it doesn't remove
    try:
        os.remove(tmpfile)
    except OSError as oe:
        error = ""
        if oe.errno: error = "%s: " % oe.errno
        if oe.strerror: error += oe.strerror
        if oe.filename: error += " (filename: %s)" % oe.filename
        log.warning("Unable to remove powstream temporary file %s due to error reported by OS: %s" % (tmpfile, error))
    except:
        log.warning("Unable to remove powstream temporary file %s: %s" % (tmpfile, sys.exc_info()[0]))

##
# Handles reporting errors in pscheduler format
univ.py 文件源码 项目:oscars2016 作者: 0x0ece 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def prettyIn(self, value):
        if not isinstance(value, str):
            try:
                return int(value)
            except:
                raise error.PyAsn1Error(
                    'Can\'t coerce %r into integer: %s' % (value, sys.exc_info()[1])
                    )
        r = self.__namedValues.getValue(value)
        if r is not None:
            return r
        try:
            return int(value)
        except:
            raise error.PyAsn1Error(
                'Can\'t coerce %r into integer: %s' % (value, sys.exc_info()[1])
                )
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _async_recv(self, bufsize, *args):
        """Internal use only; use 'recv' with 'yield' instead.

        Asynchronous version of socket recv method.
        """
        def _recv():
            try:
                buf = self._rsock.recv(bufsize, *args)
            except:
                self._read_fn = None
                self._notifier.clear(self, _AsyncPoller._Read)
                self._read_task.throw(*sys.exc_info())
            else:
                self._read_fn = None
                self._notifier.clear(self, _AsyncPoller._Read)
                self._read_task._proceed_(buf)

        if not self._scheduler:
            self._scheduler = Pycos.scheduler()
            self._notifier = self._scheduler._notifier
            self._register()
        self._read_task = Pycos.cur_task(self._scheduler)
        self._read_task._await_()
        self._read_fn = _recv
        self._notifier.add(self, _AsyncPoller._Read)
        if self._certfile and self._rsock.pending():
            try:
                buf = self._rsock.recv(bufsize, *args)
            except:
                self._read_fn = None
                self._notifier.clear(self, _AsyncPoller._Read)
                self._read_task.throw(*sys.exc_info())
            else:
                if buf:
                    self._read_fn = None
                    self._notifier.clear(self, _AsyncPoller._Read)
                    self._read_task._proceed_(buf)
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _tasklet(self):
        while 1:
            item = self._task_queue.get(block=True)
            if item is None:
                self._task_queue.task_done()
                break
            task, target, args, kwargs = item
            try:
                val = target(*args, **kwargs)
                task._proceed_(val)
            except:
                task.throw(*sys.exc_info())
            finally:
                self._task_queue.task_done()
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _async_recv(self, bufsize, *args):
        """Internal use only; use 'recv' with 'yield' instead.

        Asynchronous version of socket recv method.
        """
        def _recv():
            try:
                buf = self._rsock.recv(bufsize, *args)
            except:
                self._read_fn = None
                self._notifier.clear(self, _AsyncPoller._Read)
                self._read_task.throw(*sys.exc_info())
            else:
                self._read_fn = None
                self._notifier.clear(self, _AsyncPoller._Read)
                self._read_task._proceed_(buf)

        if not self._scheduler:
            self._scheduler = Pycos.scheduler()
            self._notifier = self._scheduler._notifier
            self._register()
        self._read_task = Pycos.cur_task(self._scheduler)
        self._read_task._await_()
        self._read_fn = _recv
        self._notifier.add(self, _AsyncPoller._Read)
        if self._certfile and self._rsock.pending():
            try:
                buf = self._rsock.recv(bufsize, *args)
            except:
                self._read_fn = None
                self._notifier.clear(self, _AsyncPoller._Read)
                self._read_task.throw(*sys.exc_info())
            else:
                if buf:
                    self._read_fn = None
                    self._notifier.clear(self, _AsyncPoller._Read)
                    self._read_task._proceed_(buf)
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 61 收藏 0 点赞 0 评论 0
def _tasklet(self):
        while 1:
            item = self._task_queue.get(block=True)
            if item is None:
                self._task_queue.task_done()
                break
            task, target, args, kwargs = item
            try:
                val = target(*args, **kwargs)
                task._proceed_(val)
            except:
                task.throw(*sys.exc_info())
            finally:
                self._task_queue.task_done()
pdfid.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def Scan(directory, options, plugins):
    try:
        if os.path.isdir(directory):
            for entry in os.listdir(directory):
                Scan(os.path.join(directory, entry), options, plugins)
        else:
            ProcessFile(directory, options, plugins)
    except Exception as e:
#        print directory
        print(e)
#        print(sys.exc_info()[2])
#        print traceback.format_exc()

#function derived from: http://blog.9bplus.com/pdfidpy-output-to-json
render_script.py 文件源码 项目:xr-telemetry-m2m-web 作者: cisco 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def render_POST(self, request):
        """
        Handle a request from the client.
        """
        script_env = {
            method: api_method(request, method)
            for method in request.sdata.api.fns
        }

        # Make get do auto-formatting for convenience, even though this
        # breaks if you try to use literal '{}' named arguments
        # @@@ reconsider whether this is at all a good idea
        def get_with_formatting(path, *args):
            return api_method(request, 'get')(path.format(*args))
        script_env['get'] = get_with_formatting

        script_env['re'] = re
        script_env['dumps'] = dumps
        script_env['defaultdict'] = defaultdict
        script_env['OrderedDict'] = OrderedDict

        buf = []
        def dummy_print(*args):
            if len(args) == 1 and (isinstance(args[0], list) or isinstance(args[0], dict)):
                buf.append(dumps(args[0], indent=4))
            else:
                buf.append(' '.join(map(str, args)))
        script_env['print'] = dummy_print

        def run_script(script):
            try:
                exec script in script_env
            except:
                exception_info = sys.exc_info()
                buf.extend(traceback.format_exception(*exception_info))
            request.sdata.log('got reply {}'.format(buf))
            request.sdata.add_to_push_queue('script', text=dumps(buf))

        script = request.args['script'][0]
        reactor.callInThread(run_script, script)
piksi_driver.py 文件源码 项目:piksi_ros 作者: uscresl 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def spin(self):
        reconnect_delay = 1.0
        while not rospy.is_shutdown():
            try:
                rospy.loginfo("Connecting to SwiftNav Piksi on port %s" % self.piksi_port)
                self.connect_piksi()

                while not rospy.is_shutdown():
                    rospy.sleep(0.05)
                    if not self.piksi.is_alive():
                        raise IOError
                    self.diag_updater.update()
                    self.check_timeouts()

                break # should only happen if rospy is trying to shut down
            except IOError as e:
                rospy.logerr("IOError")
                self.disconnect_piksi()
            except SystemExit as e:
                rospy.logerr("Unable to connect to Piksi on port %s" % self.piksi_port)
                self.disconnect_piksi()
            except: # catch *all* exceptions
                e = sys.exc_info()[0]
                rospy.logerr("Uncaught error: %s" % repr(e))
                self.disconnect_piksi()
            rospy.loginfo("Attempting to reconnect in %fs" % reconnect_delay)
            rospy.sleep(reconnect_delay)


问题


面经


文章

微信
公众号

扫码关注公众号