def log(self, level, msg, *args, **kwargs):
    """
    Log 'msg % args' with the integer severity 'level'.

    To pass exception information, use the keyword argument exc_info with
    a true value, e.g.

    logger.log(level, "We have a %s", "mysterious problem", exc_info=1)
    """
    if isinstance(level, int):
        # Only build a record when this severity is enabled for the logger.
        if self.isEnabledFor(level):
            self._log(level, msg, args, **kwargs)
    elif raiseExceptions:
        # A non-integer level is a caller bug; surface it only when the
        # module is configured to raise (otherwise drop the call silently).
        raise TypeError("level must be an integer")
# Example usages of sys.exc_info() collected from various projects.
def _log(self, level, msg, args, exc_info=None, extra=None):
    """
    Low-level logging routine which creates a LogRecord and then calls
    all the handlers of this logger to handle the record.
    """
    if _srcfile:
        #IronPython doesn't track Python frames, so findCaller throws an
        #exception on some versions of IronPython. We trap it here so that
        #IronPython can use logging.
        try:
            fn, lno, func = self.findCaller()
        except ValueError:
            fn, lno, func = "(unknown file)", 0, "(unknown function)"
    else:
        # Caller introspection disabled; use placeholder location info.
        fn, lno, func = "(unknown file)", 0, "(unknown function)"
    if exc_info:
        # A truthy non-tuple (e.g. exc_info=1/True) means "use the exception
        # currently being handled"; a tuple is assumed to already be a
        # (type, value, traceback) triple.
        if not isinstance(exc_info, tuple):
            exc_info = sys.exc_info()
    record = self.makeRecord(self.name, level, fn, lno, msg, args, exc_info, func, extra)
    self.handle(record)
def __init__(self, group=None, target=None, name=None,
             args=(), kwargs=None, verbose=None):
    """Initialize the thread's bookkeeping state (Python 2 threading style).

    group must be None (reserved for a future ThreadGroup extension);
    target is the callable run() will invoke with args/kwargs.
    """
    assert group is None, "group argument must be None for now"
    _Verbose.__init__(self, verbose)
    if kwargs is None:
        # Fresh dict per instance -- never share a mutable default.
        kwargs = {}
    self.__target = target
    # Auto-generate a name when none was supplied.
    self.__name = str(name or _newname())
    self.__args = args
    self.__kwargs = kwargs
    # Daemon status is inherited from the creating thread.
    self.__daemonic = self._set_daemon()
    self.__ident = None
    self.__started = Event()
    self.__stopped = False
    self.__block = Condition(Lock())
    self.__initialized = True
    # sys.stderr is not stored in the class like
    # sys.exc_info since it can be changed between instances
    self.__stderr = _sys.stderr
def importfile(path):
    """Import a Python source file or compiled file given its path.

    Raises ErrorDuringImport (wrapping sys.exc_info()) if the import fails.
    """
    # Compare the file's first bytes against the bytecode magic number to
    # decide whether it is compiled (.pyc) or source.  Read in binary mode:
    # imp.get_magic() returns bytes, and a text-mode read of a .pyc could
    # fail to decode entirely (and would never equal the bytes magic on
    # Python 3).
    magic = imp.get_magic()
    with open(path, 'rb') as probe:
        is_compiled = probe.read(len(magic)) == magic
    kind = imp.PY_COMPILED if is_compiled else imp.PY_SOURCE
    filename = os.path.basename(path)
    name, ext = os.path.splitext(filename)
    # Compiled files must be handed to load_module in binary mode.
    mode = 'rb' if is_compiled else 'r'
    file = open(path, mode)
    try:
        return imp.load_module(name, file, path, (ext, mode, kind))
    except:
        raise ErrorDuringImport(path, sys.exc_info())
    finally:
        # Previously the handle leaked whenever load_module raised;
        # always close it.
        file.close()
def compact_traceback():
    """Summarize the exception currently being handled.

    Returns ((file, function, line), exc_type, exc_value, info) where
    info is a compact one-line '[file|func|line]' rendering of every
    traceback frame and the first element describes the deepest frame.
    """
    exc_type, exc_value, tb = sys.exc_info()
    if not tb:  # Must have a traceback
        raise AssertionError("traceback does not exist")
    frames = []
    while tb is not None:
        code = tb.tb_frame.f_code
        frames.append((code.co_filename, code.co_name, str(tb.tb_lineno)))
        tb = tb.tb_next
    # just to be safe: drop the local traceback reference to avoid a cycle
    del tb
    info = ' '.join(['[%s|%s|%s]' % frame for frame in frames])
    return frames[-1], exc_type, exc_value, info
def _run_exitfuncs():
    """run any registered exit functions

    _exithandlers is traversed in reverse order so functions are executed
    last in, first out.
    """
    # NOTE: Python 2 only -- the "print >>" statement and the three-argument
    # raise below are not valid Python 3 syntax.
    exc_info = None
    while _exithandlers:
        func, targs, kargs = _exithandlers.pop()
        try:
            func(*targs, **kargs)
        except SystemExit:
            # Remember the SystemExit so it is re-raised after all handlers
            # have had a chance to run.
            exc_info = sys.exc_info()
        except:
            # Any other failure is reported but does not stop the remaining
            # handlers; the last failure wins for the final re-raise.
            import traceback
            print >> sys.stderr, "Error in atexit._run_exitfuncs:"
            traceback.print_exc()
            exc_info = sys.exc_info()
    if exc_info is not None:
        # Re-raise the saved exception with its original traceback.
        raise exc_info[0], exc_info[1], exc_info[2]
def decrypt(self):
    """Decrypt decrypts the secret and returns the plaintext.

    Calling decrypt() may incur side effects such as a call to a remote
    service for decryption.

    Returns b'' when no crypter is configured.  Any crypter failure is
    re-raised as a ValueError carrying the original traceback.
    """
    if not self._crypter:
        return b''
    try:
        plaintext = self._crypter.decrypt(self._ciphertext, **self._decrypt_params)
        return plaintext
    except Exception as e:
        exc_info = sys.exc_info()
        # six.reraise(tp, value, tb): the exception *instance* must be the
        # second argument.  The previous code passed the instance as `tp`
        # with value=None, which makes six call the instance -- a
        # TypeError ('ValueError' object is not callable) on Python 3.
        six.reraise(
            ValueError,
            ValueError('Invalid ciphertext "%s", error: %s' % (self._ciphertext, e)),
            exc_info[2]
        )
def pushToIPFS(hstr, payload):
    """Push a WARC record's HTTP header and payload to IPFS with retries.

    Returns [headerHash, payloadHash] on success, or None once all
    attempts are exhausted.  Exits the process if the IPFS daemon is
    unreachable (retrying would be pointless).
    """
    ipfsRetryCount = 5  # WARC->IPFS attempts before giving up
    retryCount = 0
    while retryCount < ipfsRetryCount:
        try:
            httpHeaderIPFSHash = pushBytesToIPFS(bytes(hstr))
            payloadIPFSHash = pushBytesToIPFS(bytes(payload))
            if retryCount > 0:
                m = 'Retrying succeeded after {0} attempts'.format(retryCount)
                print(m)
            return [httpHeaderIPFSHash, payloadIPFSHash]
        except NewConnectionError as e:
            # No daemon to talk to: report and abort the whole process.
            print('IPFS daemon is likely not running.')
            print('Run "ipfs daemon" in another terminal session.')
            sys.exit()
        except:
            # Any other failure: log and retry up to ipfsRetryCount times.
            attemptCount = '{0}/{1}'.format(retryCount + 1, ipfsRetryCount)
            logError('IPFS failed to add, ' +
                     'retrying attempt {0}'.format(attemptCount))
            # print(sys.exc_info())
            retryCount += 1
    return None  # Process of adding to IPFS failed
def isDaemonAlive(hostAndPort="{0}:{1}".format(IPFSAPI_IP, IPFSAPI_PORT)):
    """Ensure that the IPFS daemon is running via HTTP before proceeding"""
    client = ipfsapi.Client(IPFSAPI_IP, IPFSAPI_PORT)
    try:
        # OSError if ipfs not installed, redundant of below
        # subprocess.call(['ipfs', '--version'], stdout=open(devnull, 'wb'))
        # ConnectionError/AttributeError if IPFS daemon not running
        client.id()
        return True
    except (ConnectionError, exceptions.AttributeError):
        logError("Daemon is not running at http://" + hostAndPort)
        return False
    except OSError:
        # IPFS binary missing entirely: nothing to retry, exit the process.
        logError("IPFS is likely not installed. "
                 "See https://ipfs.io/docs/install/")
        sys.exit()
    except:
        # NOTE(review): this branch falls off the end and implicitly
        # returns None (falsy) rather than an explicit False -- confirm
        # callers only test truthiness.
        logError('Unknown error in retrieving daemon status')
        logError(sys.exc_info()[0])
def run_handlers(self, event):
    """Run every observer registered for *event* plus the matching built-in
    instance handler(s); handler failures are reported, not propagated."""
    assert event in self.observers
    handlers = []
    instance_handlers = {
        'instance_canceled': self._on_cancel,
        'instance_failed': self._on_failed,
        'instance_finished': self._on_finish,
    }
    handlers += self.observers[event]
    # NOTE(review): `+=` extends the list, so self._on_cancel etc. are
    # presumably *iterables* of callables rather than bare methods --
    # a bare method here would raise TypeError.  Confirm against the
    # class definition.
    handlers += instance_handlers.get(event, [])
    failures = 0
    for handler in handlers:
        try:
            # Each handler receives this instance as its only argument.
            handler(self)
        except:  # pylint: disable=bare-except
            # Keep running the remaining handlers; count and report the
            # failure in the IDA message window.
            failures += 1
            idc.Message("BAP> {0} failed because {1}\n".
                        format(self.action, str(sys.exc_info()[1])))
            traceback.print_exc()
    if failures != 0:
        idc.Warning("Some BAP handlers failed")
def cleanup_dir(tmpdir, keep_data_files=False, ignore_errors=False):
    """Best-effort removal of a powstream temporary directory.

    Does nothing when keep_data_files is set.  Removal failures are
    logged as warnings instead of being raised.
    """
    if keep_data_files:
        return
    #Remove our tmpdir, but don't fail the test if it doesn't remove
    try:
        shutil.rmtree(tmpdir, ignore_errors=ignore_errors)
    except OSError as os_err:
        detail = ""
        if os_err.errno:
            detail = "%s: " % os_err.errno
        if os_err.strerror:
            detail += os_err.strerror
        if os_err.filename:
            detail += " (filename: %s)" % os_err.filename
        log.warning("Unable to remove powstream temporary directory %s due to error reported by OS: %s" % (tmpdir, detail))
    except:
        log.warning("Unable to remove powstream temporary directory %s: %s" % (tmpdir, sys.exc_info()[0]))
##
# Called by signal handlers to clean-up then exit
def cleanup_file(tmpfile, keep_data_files=False):
    """Best-effort removal of a powstream temporary file.

    Does nothing when keep_data_files is set.  Removal failures are
    logged as warnings instead of being raised.
    """
    if keep_data_files:
        return
    #Remove our tmpfile, but don't fail the test if it doesn't remove
    try:
        os.remove(tmpfile)
    except OSError as os_err:
        detail = ""
        if os_err.errno:
            detail = "%s: " % os_err.errno
        if os_err.strerror:
            detail += os_err.strerror
        if os_err.filename:
            detail += " (filename: %s)" % os_err.filename
        log.warning("Unable to remove powstream temporary file %s due to error reported by OS: %s" % (tmpfile, detail))
    except:
        log.warning("Unable to remove powstream temporary file %s: %s" % (tmpfile, sys.exc_info()[0]))
##
# Handles reporting errors in pscheduler format
def prettyIn(self, value):
    """Coerce *value* to an int, resolving named values for strings.

    Non-strings are converted directly with int(); strings are first
    looked up in the named-values map and fall back to int() conversion.
    Raises error.PyAsn1Error when the value cannot be coerced.
    """
    def _coerce(v):
        # Shared fallback: int() conversion with failures wrapped in
        # PyAsn1Error (previously duplicated in two branches).
        try:
            return int(v)
        except:
            raise error.PyAsn1Error(
                'Can\'t coerce %r into integer: %s' % (v, sys.exc_info()[1])
            )
    if not isinstance(value, str):
        return _coerce(value)
    named = self.__namedValues.getValue(value)
    if named is not None:
        return named
    return _coerce(value)
def _async_recv(self, bufsize, *args):
    """Internal use only; use 'recv' with 'yield' instead.

    Asynchronous version of socket recv method.
    """
    def _recv():
        # Callback invoked by the notifier when the socket is readable.
        try:
            buf = self._rsock.recv(bufsize, *args)
        except:
            # Deregister and deliver the failure into the waiting task.
            self._read_fn = None
            self._notifier.clear(self, _AsyncPoller._Read)
            self._read_task.throw(*sys.exc_info())
        else:
            # Deregister and resume the waiting task with the data.
            self._read_fn = None
            self._notifier.clear(self, _AsyncPoller._Read)
            self._read_task._proceed_(buf)
    if not self._scheduler:
        # Late-bind the scheduler/notifier on first use and register
        # this socket with the poller.
        self._scheduler = Pycos.scheduler()
        self._notifier = self._scheduler._notifier
        self._register()
    # Suspend the current task; it is resumed by _recv above.
    self._read_task = Pycos.cur_task(self._scheduler)
    self._read_task._await_()
    self._read_fn = _recv
    self._notifier.add(self, _AsyncPoller._Read)
    if self._certfile and self._rsock.pending():
        # SSL socket reports already-buffered decrypted bytes; the poller
        # would never fire for those, so attempt an immediate read.
        try:
            buf = self._rsock.recv(bufsize, *args)
        except:
            self._read_fn = None
            self._notifier.clear(self, _AsyncPoller._Read)
            self._read_task.throw(*sys.exc_info())
        else:
            if buf:
                self._read_fn = None
                self._notifier.clear(self, _AsyncPoller._Read)
                self._read_task._proceed_(buf)
def _tasklet(self):
    """Worker loop: pull (task, target, args, kwargs) items off the queue,
    run the target, and resume the task with its result or exception."""
    while 1:
        item = self._task_queue.get(block=True)
        if item is None:
            # None is the sentinel telling this worker to exit.
            self._task_queue.task_done()
            break
        task, target, args, kwargs = item
        try:
            val = target(*args, **kwargs)
            task._proceed_(val)
        except:
            # Forward the failure into the task rather than killing the
            # worker thread.
            task.throw(*sys.exc_info())
        finally:
            self._task_queue.task_done()
def _async_recv(self, bufsize, *args):
    """Internal use only; use 'recv' with 'yield' instead.

    Asynchronous version of socket recv method.
    """
    def _recv():
        # Notifier callback fired when the socket becomes readable.
        try:
            buf = self._rsock.recv(bufsize, *args)
        except:
            # Deregister and raise the failure inside the waiting task.
            self._read_fn = None
            self._notifier.clear(self, _AsyncPoller._Read)
            self._read_task.throw(*sys.exc_info())
        else:
            # Deregister and hand the received bytes to the waiting task.
            self._read_fn = None
            self._notifier.clear(self, _AsyncPoller._Read)
            self._read_task._proceed_(buf)
    if not self._scheduler:
        # First use: attach to the scheduler's notifier and register
        # this socket for polling.
        self._scheduler = Pycos.scheduler()
        self._notifier = self._scheduler._notifier
        self._register()
    # Park the current task; _recv resumes it when data (or an error)
    # arrives.
    self._read_task = Pycos.cur_task(self._scheduler)
    self._read_task._await_()
    self._read_fn = _recv
    self._notifier.add(self, _AsyncPoller._Read)
    if self._certfile and self._rsock.pending():
        # An SSL socket may hold decrypted bytes the poller will never
        # report; read them immediately instead of waiting.
        try:
            buf = self._rsock.recv(bufsize, *args)
        except:
            self._read_fn = None
            self._notifier.clear(self, _AsyncPoller._Read)
            self._read_task.throw(*sys.exc_info())
        else:
            if buf:
                self._read_fn = None
                self._notifier.clear(self, _AsyncPoller._Read)
                self._read_task._proceed_(buf)
def _tasklet(self):
    """Queue-driven worker: execute queued (task, target, args, kwargs)
    work items and resume each task with the outcome."""
    while 1:
        item = self._task_queue.get(block=True)
        if item is None:
            # Sentinel value: acknowledge it and stop the worker.
            self._task_queue.task_done()
            break
        task, target, args, kwargs = item
        try:
            val = target(*args, **kwargs)
            task._proceed_(val)
        except:
            # Route the exception into the task so the loop survives.
            task.throw(*sys.exc_info())
        finally:
            self._task_queue.task_done()
def Scan(directory, options, plugins):
    """Recursively walk *directory*, handing each regular file to
    ProcessFile.  Any error is printed and swallowed so one bad entry
    cannot abort the whole scan."""
    try:
        if not os.path.isdir(directory):
            ProcessFile(directory, options, plugins)
        else:
            for entry in os.listdir(directory):
                Scan(os.path.join(directory, entry), options, plugins)
    except Exception as e:
        # Report and continue with the rest of the scan.
        print(e)
#function derived from: http://blog.9bplus.com/pdfidpy-output-to-json
def render_POST(self, request):
    """
    Handle a request from the client.
    """
    # Build the execution environment for the user script: every API
    # function is exposed under its own name.
    script_env = {
        method: api_method(request, method)
        for method in request.sdata.api.fns
    }
    # Make get do auto-formatting for convenience, even though this
    # breaks if you try to use literal '{}' named arguments
    # @@@ reconsider whether this is at all a good idea
    def get_with_formatting(path, *args):
        return api_method(request, 'get')(path.format(*args))
    script_env['get'] = get_with_formatting
    script_env['re'] = re
    script_env['dumps'] = dumps
    script_env['defaultdict'] = defaultdict
    script_env['OrderedDict'] = OrderedDict
    buf = []
    # Capture the script's print output into buf; single list/dict
    # arguments are pretty-printed as JSON.
    def dummy_print(*args):
        if len(args) == 1 and (isinstance(args[0], list) or isinstance(args[0], dict)):
            buf.append(dumps(args[0], indent=4))
        else:
            buf.append(' '.join(map(str, args)))
    script_env['print'] = dummy_print
    def run_script(script):
        # NOTE: Python 2 exec-statement syntax; any script failure is
        # rendered into buf as a formatted traceback.
        try:
            exec script in script_env
        except:
            exception_info = sys.exc_info()
            buf.extend(traceback.format_exception(*exception_info))
        request.sdata.log('got reply {}'.format(buf))
        request.sdata.add_to_push_queue('script', text=dumps(buf))
    script = request.args['script'][0]
    # Execute off the reactor thread so the script cannot block the server.
    reactor.callInThread(run_script, script)
def spin(self):
    """Main loop: keep (re)connecting to the SwiftNav Piksi and servicing
    it until rospy shuts down, waiting reconnect_delay seconds between
    reconnection attempts."""
    reconnect_delay = 1.0
    while not rospy.is_shutdown():
        try:
            rospy.loginfo("Connecting to SwiftNav Piksi on port %s" % self.piksi_port)
            self.connect_piksi()
            # Inner service loop: poll liveness and diagnostics roughly
            # 20 times per second; a dead link raises IOError to trigger
            # the reconnect path below.
            while not rospy.is_shutdown():
                rospy.sleep(0.05)
                if not self.piksi.is_alive():
                    raise IOError
                self.diag_updater.update()
                self.check_timeouts()
            break  # should only happen if rospy is trying to shut down
        except IOError as e:
            rospy.logerr("IOError")
            self.disconnect_piksi()
        except SystemExit as e:
            rospy.logerr("Unable to connect to Piksi on port %s" % self.piksi_port)
            self.disconnect_piksi()
        except:  # catch *all* exceptions
            e = sys.exc_info()[0]
            rospy.logerr("Uncaught error: %s" % repr(e))
            self.disconnect_piksi()
        rospy.loginfo("Attempting to reconnect in %fs" % reconnect_delay)
        rospy.sleep(reconnect_delay)