def mqtt_connect(mqtt_client, core_info):
connected = False
# try connecting to all connectivity info objects in the list
for connectivity_info in core_info.connectivityInfoList:
core_host = connectivity_info.host
core_port = connectivity_info.port
logging.info("Connecting to Core at {0}:{1}".format(
core_host, core_port))
mqtt_client.configureEndpoint(core_host, core_port)
try:
mqtt_client.connect()
connected = True
break
except socket.error as se:
print("SE:{0}".format(se))
except operationTimeoutException as te:
print("operationTimeoutException:{0}".format(te.message))
traceback.print_tb(te, limit=25)
except Exception as e:
print("Exception caught:{0}".format(e.message))
return connected
python类print_tb()的实例源码
def mqtt_connect(mqtt_client, core_info):
connected = False
# try connecting to all connectivity info objects in the list
for connectivity_info in core_info.connectivityInfoList:
core_host = connectivity_info.host
core_port = connectivity_info.port
logging.info("Connecting to Core at {0}:{1}".format(
core_host, core_port))
mqtt_client.configureEndpoint(core_host, core_port)
try:
mqtt_client.connect()
connected = True
break
except socket.error as se:
print("SE:{0}".format(se))
except operationTimeoutException as te:
print("operationTimeoutException:{0}".format(te.message))
traceback.print_tb(te, limit=25)
except Exception as e:
print("Exception caught:{0}".format(e.message))
return connected
def handle_error(self, wrapper, exception, traceback_):
print >> sys.stderr
print >> sys.stderr, "---- location:"
traceback.print_stack()
print >> sys.stderr, "---- error:"
traceback.print_tb(traceback_)
try:
stack = wrapper.stack_where_defined
except AttributeError:
print >> sys.stderr, "??:??: %s / %r" % (wrapper, exception)
else:
stack = list(stack)
stack.reverse()
for (filename, line_number, function_name, text) in stack:
file_dir = os.path.dirname(os.path.abspath(filename))
if file_dir.startswith(this_script_dir):
print >> sys.stderr, "%s:%i: %r" % (os.path.join("..", "bindings", "python", os.path.basename(filename)),
line_number, exception)
break
return True
def on_command_error(error, ctx):
if isinstance(error, commands.NoPrivateMessage):
await bot.send_typing(ctx.message.author)
await asyncio.sleep(1)
await bot.send_message(ctx.message.author, "Um... this command can't be used in private messages.")
elif isinstance(error, commands.DisabledCommand):
await bot.send_typing(ctx.message.author)
await asyncio.sleep(1)
await bot.send_message(ctx.message.author, "I'm Sorry. This command is disabled and it can't be used.")
elif isinstance(error, commands.CommandInvokeError):
print('In {0.command.qualified_name}:'.format(ctx), file=sys.stderr)
traceback.print_tb(error.original.__traceback__)
print('{0.__class__.__name__}: {0}'.format(error.original), file=sys.stderr)
elif isinstance(error, commands.CommandNotFound):
log.info("'{0.message.author}' in {0.message.server} used a command thats not in Inkxbot, the content is resent here: '{0.message.content}'".format(ctx))
elif isinstance(error, commands.MissingRequiredArgument):
log.info("'{0.message.author}' in {0.message.server} was missing some arguments in a command, message is resent here: '{0.message.content}'".format(ctx))
await bot.send_typing(ctx.message.channel)
await asyncio.sleep(1)
await bot.send_message(ctx.message.channel, "It seems you are missing required argument(s). Try again if you have all the arguments needed.")
def get_dispatch(distro, arg):
"""
Dispatch to the appropriate scraping function for the given distro.
"""
defin = DISTROS[distro]
try:
if isinstance(defin, type(lambda:None)):
return defin(arg)
elif isinstance(defin, type("")):
return get_dispatch(defin, arg)
else:
return get_scrape(distro, defin, arg)
except:
ex_ty, ex_ob, ex_tb = sys.exc_info()
trace('get_dispatch(%r, %r) failed:' % (distro, arg))
if VERBOSE:
import traceback
traceback.print_tb(ex_tb)
trace(' %s: %s' % (ex_ty.__name__, ex_ob))
return parse_semver('0.0.0')
def boto_except_hook(debugger_flag, debug_flag):
def excepthook(typ, value, tb):
if typ is bdb.BdbQuit:
sys.exit(1)
sys.excepthook = sys.__excepthook__
if debugger_flag and sys.stdout.isatty() and sys.stdin.isatty():
if debugger.__name__ == 'epdb':
debugger.post_mortem(tb, typ, value)
else:
debugger.post_mortem(tb)
elif debug_flag:
print(traceback.print_tb(tb))
sys.exit(1)
else:
print(value)
sys.exit(1)
return excepthook
def on_command_error(ctx, error):
if isinstance(error, commands.NoPrivateMessage):
await ctx.send(content='This command cannot be used in private messages.')
elif isinstance(error, commands.DisabledCommand):
await ctx.send(content='This command is disabled and cannot be used.')
elif isinstance(error, commands.MissingRequiredArgument):
await bot.formatter.format_help_for(ctx, ctx.command, "You are missing required arguments.")
elif isinstance(error, commands.CommandNotFound):
# await ctx.send('Command not found. (I\'m working on fixing cmd_not_found forf help module')
await bot.formatter.format_help_for(ctx, [c for c in bot.commands if 'help' == c.name][0],
"Command not found.")
elif isinstance(error, commands.CommandInvokeError):
print('In {0.command.qualified_name}:'.format(ctx), file=sys.stderr)
print('{0.__class__.__name__}: {0}'.format(error.original), file=sys.stderr)
traceback.print_tb(error.__traceback__, file=sys.stderr)
log.error('In {0.command.qualified_name}:'.format(ctx))
log.error('{0.__class__.__name__}: {0}'.format(error.original))
else:
traceback.print_tb(error.__traceback__, file=sys.stderr)
def mock_input(testfunc):
"""Decorator for tests of the main interact loop.
Write the test as a generator, yield-ing the input strings, which IPython
will see as if they were typed in at the prompt.
"""
def test_method(self):
testgen = testfunc(self)
with mock_input_helper(testgen) as mih:
mih.ip.interact()
if mih.exception is not None:
# Re-raise captured exception
etype, value, tb = mih.exception
import traceback
traceback.print_tb(tb, file=sys.stdout)
del tb # Avoid reference loop
raise value
return test_method
# Test classes -----------------------------------------------------------------
def load_wsgi(self):
try:
self.wsgi = self.app.wsgi()
except SyntaxError as e:
if self.cfg.reload == 'off':
raise
self.log.exception(e)
# fix from PR #1228
# storing the traceback into exc_tb will create a circular reference.
# per https://docs.python.org/2/library/sys.html#sys.exc_info warning,
# delete the traceback after use.
try:
exc_type, exc_val, exc_tb = sys.exc_info()
self.reloader.add_extra_file(exc_val.filename)
tb_string = six.StringIO()
traceback.print_tb(exc_tb, file=tb_string)
self.wsgi = util.make_fail_app(tb_string.getvalue())
finally:
del exc_tb
def _run_batches_from_queue(self):
skip_batch = False
while not self._stop_flag:
future = self._prefetch_queue.get(block=True)
if future is None:
self._prefetch_queue.task_done()
self._batch_queue.put(None)
break
else:
try:
batch = future.result()
except SkipBatchException:
skip_batch = True
except Exception: # pylint: disable=broad-except
exc = future.exception()
print("Exception in a thread:", exc)
traceback.print_tb(exc.__traceback__)
finally:
if not skip_batch:
self._batch_queue.put(batch, block=True)
skip_batch = False
self._prefetch_queue.task_done()
return None
def assemble(self, all_res, *args, **kwargs):
""" Assemble the batch after a parallel action """
_ = args, kwargs
if any_action_failed(all_res):
all_errors = self.get_errors(all_res)
print(all_errors)
traceback.print_tb(all_errors[0].__traceback__)
raise RuntimeError("Could not assemble the batch")
components = kwargs.get('components', 'images')
if isinstance(components, (list, tuple)):
all_res = list(zip(*all_res))
else:
components = [components]
all_res = [all_res]
for component, res in zip(components, all_res):
self.assemble_component(res, component)
return self
def assemble(self, all_res, *args, **kwargs):
""" Assemble the batch after a parallel action """
_ = args, kwargs
if any_action_failed(all_res):
all_errors = self.get_errors(all_res)
print(all_errors)
traceback.print_tb(all_errors[0].__traceback__)
raise RuntimeError("Could not assemble the batch")
components = kwargs.get('components', 'images')
if isinstance(components, (list, tuple)):
all_res = list(zip(*all_res))
else:
components = [components]
all_res = [all_res]
for component, res in zip(components, all_res):
new_data = np.array(res, dtype='object')
setattr(self, component, new_data)
return self
def pyerr_write_unraisable(exc, obj):
f = sys.stderr
if obj:
f.write("Exception ignored in: ")
f.write(repr(obj))
f.write("\n")
t, v, tb = sys.exc_info()
traceback.print_tb(tb, file=f)
assert isinstance(v, Exception)
f.write(t.__module__ + "." + t.__name__)
f.write(": ")
f.write(str(v))
f.write("\n")
def step_impl(context):
"""
:type context behave.runner.Context
"""
_json_data = json_load_file(os.path.join(script_location, "../data/bad_json.json"))
try:
context.schema_tools.apply(_json_data)
except Exception as e:
if str(e)[:32] == "'canRead' is a required property":
ok_(True)
else:
if hasattr(sys, "last_traceback"):
_traceback = traceback.print_tb(sys.last_traceback)
else:
_traceback = "Not available"
ok_(False, "Error: " + str(e) + "\nTraceback:" + _traceback)
def _in_thread_run_script(self):
""" Run the script. """
try:
self.run_script()
except Exception as e:
# handle any exception thrown
exc_type,exc_value,exc_traceback = sys.exc_info()
print "Exception Error: ", exc_value
print e
# print traceback, excluding this file
traceback.print_tb(exc_traceback)
# exc_strings = traceback.format_list(traceback.extract_tb(exc_traceback))
# exc_strings = [s for s in exc_strings if s.find("parlay_script.py")< 0 ]
# for s in exc_strings:
# print s
def boto_except_hook(debugger_flag, debug_flag):
def excepthook(typ, value, tb):
if typ is bdb.BdbQuit:
sys.exit(1)
sys.excepthook = sys.__excepthook__
if debugger_flag and sys.stdout.isatty() and sys.stdin.isatty():
if debugger.__name__ == 'epdb':
debugger.post_mortem(tb, typ, value)
else:
debugger.post_mortem(tb)
elif debug_flag:
print(traceback.print_tb(tb))
sys.exit(1)
else:
print(value)
sys.exit(1)
return excepthook
def assertion_msg(message: str, *args):
"""This will:
* print the custom assertion message
* print the traceback (stack trace)
* raise the original AssertionError exception
:param message: a message that will be printed & logged when assertion fails
:param args: values that will replace % conversion specifications in message
like: %s, %d
"""
try:
yield
except AssertionError as e:
if args:
message = message % args
logging.error(message)
e.args += (message,)
_, _, tb = sys.exc_info()
traceback.print_tb(tb)
raise
def assertion_msg(message: str, *args):
"""This will:
* print the custom assertion message
* print the traceback (stack trace)
* raise the original AssertionError exception
:param message: message that will be printed & logged when assertion fails
:param args: values that will replace % conversion specifications in
message like: %s, %d
"""
try:
yield
except AssertionError as e:
if args:
message = message % args
logging.error(message)
e.args += (message,)
_, _, tb = sys.exc_info()
traceback.print_tb(tb)
raise
def run(self):
try:
open_luggage = luggage.Luggage(luggage_path=self.path, password=self.password,
pyluggage_config=self.pyluggage_config)
self.open_luggage = open_luggage
except Exception as ex:
if be_verbose:
print ">>>> Error opening Luggage"
import sys
import traceback
exc_type, exc_value, exc_traceback = sys.exc_info()
print "*** print_tb:"
traceback.print_tb(exc_traceback, limit=1, file=sys.stdout)
print "*** print_exception:"
traceback.print_exception(exc_type, exc_value, exc_traceback,
limit=2, file=sys.stdout)
self.error = ex
finally:
self.finished.emit()
def run(self):
try:
self.new_luggage = luggage.Luggage.create_new_luggage(
luggage_path=self.path,
password=self.password,
pyluggage_config=self.pyluggage_config)
except Exception as ex:
if be_verbose:
print ">>>> Error creating Luggage"
import sys
import traceback
exc_type, exc_value, exc_traceback = sys.exc_info()
print "*** print_tb:"
traceback.print_tb(exc_traceback, limit=1, file=sys.stdout)
print "*** print_exception:"
traceback.print_exception(exc_type, exc_value, exc_traceback, file=sys.stdout)
self.error = ex
finally:
self.finished.emit()
def play_music(self, ctx, *, song : str):
state = self.get_voice_state(ctx.message.server)
opts = {
"default_search": "auto",
"quiet": True
}
if state.voice is None:
success = await ctx.invoke(self.summon)
if not success:
return
try:
player = await state.voice.create_ytdl_player(song, ytdl_options=opts, after=state.toggle_next)
except Exception as e:
traceback.print_tb(e.__traceback__)
await self.bot.send_message(ctx.message.channel, "Could not play song: [{}]: {}".format(type(e).__name__, e))
else:
player.volume = 0.5
player_state = VoicePlayer(ctx.message, player, state.voice)
await self.bot.say("Queued **{}**".format(player.title))
await state.songs.put(player_state)
state.song_list.append(player_state)
def on_command_error(self, error, ctx):
if isinstance(error, commands.NoPrivateMessage):
await self.send_message(ctx.message.author, "\N{WARNING SIGN} Sorry, you can't use this command in a private message!")
elif isinstance(error, commands.DisabledCommand):
await self.send_message(ctx.message.author, "\N{WARNING SIGN} Sorry, this command is disabled!")
elif isinstance(error, commands.CommandOnCooldown):
await self.send_message(ctx.message.channel, f"{ctx.message.author.mention} slow down! Try again in {error.retry_after:.1f} seconds.")
elif isinstance(error, commands.MissingRequiredArgument) or isinstance(error, commands.BadArgument):
await self.send_message(ctx.message.channel, f"\N{WARNING SIGN} {error}")
elif isinstance(error, commands.CommandInvokeError):
original_name = error.original.__class__.__name__
print(f"In {paint(ctx.command.qualified_name, 'b_red')}:")
traceback.print_tb(error.original.__traceback__)
print(f"{paint(original_name, 'red')}: {error.original}")
else:
print(f"{paint(type(error).__name__, 'b_red')}: {error}")
def handle_indexing(app, mysql):
with app.app_context():
while(True):
try:
g.conn = mysql.connect()
g.cursor = g.conn.cursor()
g.conn.begin()
# run indexing servie every 300 seconds
time.sleep(300)
sqlpie.Indexer().index_documents()
g.conn.commit()
except Exception as e:
if sqlpie.Util.is_debug():
traceback.print_tb(sys.exc_info()[2])
try:
g.conn.rollback()
except:
pass
finally:
# if the MySQL Server is not running, this will fail.
try:
g.cursor.close()
g.conn.close()
except:
pass
def get(data, pattern, typ, windex, st=0):
x = None
for line in data.lines:
if pattern in line:
try:
x = line.split()[windex]
if st < 0:
x = x[0:st]
else:
x = x[st:]
x = typ(x)
except:
print line.split()
print x, st
print sys.exc_info()[1]
traceback.print_tb(sys.exc_info()[2])
quit()
return x
return x
def test_push_dirty_git(runner, monkeypatch):
with runner.isolated_filesystem():
os.mkdir('wandb')
# If the test was run from a directory containing .wandb, then __stage_dir__
# was '.wandb' when imported by api.py, reload to fix. UGH!
reload(wandb)
repo = git_repo()
open("foo.txt", "wb").close()
repo.repo.index.add(["foo.txt"])
monkeypatch.setattr(cli, 'api', wandb_api.Api({'project': 'test'}))
cli.api._settings['git_tag'] = True
result = runner.invoke(
cli.push, ["test", "foo.txt", "-p", "test", "-m", "Dirty"])
print(result.output)
print(result.exception)
print(traceback.print_tb(result.exc_info[2]))
assert result.exit_code == 1
assert "You have un-committed changes." in result.output
def test_push_dirty_force_git(runner, request_mocker, query_project, upload_url, upsert_run, monkeypatch):
query_project(request_mocker)
upload_url(request_mocker)
update_mock = upsert_run(request_mocker)
with runner.isolated_filesystem():
# So GitRepo is in this cwd
monkeypatch.setattr(cli, 'api', wandb_api.Api({'project': 'test'}))
repo = git_repo()
with open('weights.h5', 'wb') as f:
f.write(os.urandom(100))
repo.repo.index.add(["weights.h5"])
result = runner.invoke(
cli.push, ["test", "weights.h5", "-f", "-p", "test", "-m", "Dirty"])
print(result.output)
print(result.exception)
print(traceback.print_tb(result.exc_info[2]))
assert result.exit_code == 0
def test_restore(runner, request_mocker, query_run, monkeypatch):
mock = query_run(request_mocker)
with runner.isolated_filesystem():
os.mkdir("wandb")
repo = git_repo()
with open("patch.txt", "w") as f:
f.write("test")
repo.repo.index.add(["patch.txt"])
repo.repo.commit()
monkeypatch.setattr(cli, 'api', wandb_api.Api({'project': 'test'}))
result = runner.invoke(cli.restore, ["test/abcdef"])
print(result.output)
print(traceback.print_tb(result.exc_info[2]))
assert result.exit_code == 0
assert "Created branch wandb/abcdef" in result.output
assert "Applied patch" in result.output
assert "Restored config variables" in result.output
def test_init_new_login(runner, empty_netrc, local_netrc, request_mocker, query_projects, query_viewer):
query_viewer(request_mocker)
query_projects(request_mocker)
with runner.isolated_filesystem():
# If the test was run from a directory containing .wandb, then __stage_dir__
# was '.wandb' when imported by api.py, reload to fix. UGH!
reload(wandb)
result = runner.invoke(cli.init, input="%s\nvanpelt" % DUMMY_API_KEY)
print('Output: ', result.output)
print('Exception: ', result.exception)
print('Traceback: ', traceback.print_tb(result.exc_info[2]))
assert result.exit_code == 0
with open("netrc", "r") as f:
generatedNetrc = f.read()
with open("wandb/settings", "r") as f:
generatedWandb = f.read()
assert DUMMY_API_KEY in generatedNetrc
assert "test_model" in generatedWandb
def test_init_add_login(runner, empty_netrc, local_netrc, request_mocker, query_projects, query_viewer):
query_viewer(request_mocker)
query_projects(request_mocker)
with runner.isolated_filesystem():
with open("netrc", "w") as f:
f.write("previous config")
result = runner.invoke(cli.init, input="%s\nvanpelt\n" % DUMMY_API_KEY)
print(result.output)
print(result.exception)
print(traceback.print_tb(result.exc_info[2]))
assert result.exit_code == 0
with open("netrc", "r") as f:
generatedNetrc = f.read()
with open("wandb/settings", "r") as f:
generatedWandb = f.read()
assert DUMMY_API_KEY in generatedNetrc
assert "previous config" in generatedNetrc
def mock_input(testfunc):
"""Decorator for tests of the main interact loop.
Write the test as a generator, yield-ing the input strings, which IPython
will see as if they were typed in at the prompt.
"""
def test_method(self):
testgen = testfunc(self)
with mock_input_helper(testgen) as mih:
mih.ip.interact()
if mih.exception is not None:
# Re-raise captured exception
etype, value, tb = mih.exception
import traceback
traceback.print_tb(tb, file=sys.stdout)
del tb # Avoid reference loop
raise value
return test_method
# Test classes -----------------------------------------------------------------