def _spawn_transform_service(self):
"""Launch transform service and get pid."""
status = 0
pid = os.fork()
if pid == 0:
try:
os.setsid()
# start transform service
launcher = oslo_service.service.launch(
self.conf,
transform_service.Transform(),
workers=1)
status = launcher.wait()
except SystemExit as exc:
traceback.print_exc()
status = exc.code
except BaseException:
try:
traceback.print_exc()
except BaseException:
print("Could not print traceback")
status = 2
os._exit(status or 0)
return pid
python类print_exc()的实例源码
def _config(props, category=None, disable_existing_loggers=1):
logging.shutdown()
# critical section
# patterned after from logging.config...
logging._acquireLock()
try:
logging._handlers.clear()
del logging._handlerList[:]
# Handlers add themselves to logging._handlers
handlers = _install_handlers(props)
_install_loggers(props, handlers, category, disable_existing_loggers)
except Exception as e:
traceback.print_exc()
raise e
finally:
logging._releaseLock()
def unregister():
for pcoll in preview_collections.values():
bpy.utils.previews.remove(pcoll)
preview_collections.clear()
try: bpy.utils.unregister_module(__name__)
except: traceback.print_exc()
print("Unregistered {}".format(bl_info["name"]))
bpy.context.window_manager.coa_running_modal = False
bpy.app.handlers.frame_change_post.remove(update_sprites)
bpy.app.handlers.scene_update_pre.remove(scene_update)
bpy.app.handlers.load_post.remove(coa_startup)
unregister_keymaps()
def ftp_upload(ftp, remotefile, localfile):
#f = open(localpath, "rb")
#filename = os.path.split(localpath)[-1]
try:
#bufsize = 1024
#localpath_file = os.listdir(localpath)
#for filename in localpath_file:
#fp = open(filename, 'rb')
fp = open(localfile,'rb')
ftp.storbinary('STOR ' + remotefile, fp) # ????
ftp.set_debuglevel(0)
fp.close() # ????
#ftp.quit()
print('????')
except Exception as e:
traceback.print_exc()
def _cleanup_domain():
try:
if globals().has_key('currentdomain'):
__terminate_process( globals()['currentdomain'].process)
x = globals().pop('currentdomain')
if x : del x
except:
traceback.print_exc()
pass
if globals().has_key('currentdevmgrs'):
for x in globals()['currentdevmgrs']:
try:
__terminate_process(x.process)
except:
traceback.print_exc()
pass
x = globals().pop('currentdevmgrs')
if x : del x
def action_label_counts(directory, data_loader, n_actions=18, n=None):
episode_paths = frame.episode_paths(directory)
label_counts = [0, 0]
action_label_counts = [[0, 0] for i in range(n_actions)]
if n is not None:
np.random.shuffle(episode_paths)
episode_paths = episode_paths[:n]
for episode_path in tqdm.tqdm(episode_paths):
try:
features, labels = data_loader.load_features_and_labels([episode_path])
except:
traceback.print_exc()
else:
for label in range(len(label_counts)):
label_counts[label] += np.count_nonzero(labels == label)
for action in range(n_actions):
actions = np.reshape(np.array(features["action"]), [-1])
action_label_counts[action][label] += np.count_nonzero(
np.logical_and(labels == label, actions == action))
return label_counts, action_label_counts
def _install_modules(command_table):
for cmd in command_table:
command_table[cmd].load_arguments()
try:
mods_ns_pkg = import_module('azure.cli.command_modules')
installed_command_modules = [modname for _, modname, _ in
pkgutil.iter_modules(mods_ns_pkg.__path__)
if modname not in BLACKLISTED_MODS]
except ImportError:
pass
for mod in installed_command_modules:
try:
mod = import_module('azure.cli.command_modules.' + mod)
mod.load_params(mod)
mod.load_commands()
except Exception: # pylint: disable=broad-except
print("Error loading: {}".format(mod), file=stderr)
traceback.print_exc(file=stderr)
_update_command_definitions(command_table)
def get_info(self):
'''
????????????url???url
Get informations of the comic
return:
comic title,description,cover url,chapters' urls
'''
headers={'use-agent':"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36",'Referer':'http://manhua.dmzj.com/tags/s.shtml'}
root='http://manhua.dmzj.com'
r_title=r'<span class="anim_title_text"><a href=".*?"><h1>(.*?)</h1></a></span>'
r_des=r'<meta name=\'description\' content=".*?(??.*?)"/>'#????
r_cover=r'src="(.*?)" id="cover_pic"/></a>'#??url??
r_cb=r'<div class="cartoon_online_border" >([\s\S]*?)<div class="clearfix"></div>'#??border
r_cs=r'<li><a title="(.*?)" href="(.*?)" .*?>.*?</a>'#??????
try:
text=requests.get(self.comic_url,headers=headers).text
except ConnectionError:
traceback.print_exc()
raise ConnectionError
title=re.findall(r_title,text)[0]
cb=re.findall(r_cb,text)[0]
chapter_urls=[(c[0],root+c[1]+'#@page=1') for c in re.findall(r_cs,cb)]
cover_url=re.findall(r_cover,text)[0]
des=re.findall(r_des,text)
return title,des,cover_url,chapter_urls
def print_chapters(self,show=False):
'''
??????
Display infos of chapters.
'''
headers={'use-agent':"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36",'Referer':'http://manhua.dmzj.com/tags/s.shtml'}
text='There are {n} chapters in comic {c}:\n{chs}'.format(n=self.chapter_num,c=self.comic_title,chs='\n'.join([info[0] for info in self.chapter_urls]))
print(text)
if show:
try:
res=requests.get(self.cover,headers=headers)
if b'403' in res.content:
raise ValueError('Got cover img failed')
out=BytesIO(res.content)
out.seek(0)
Image.open(out).show()
except (ConnectionError,ValueError):
traceback.print_exc()
return text
def _run_exitfuncs():
"""run any registered exit functions
_exithandlers is traversed in reverse order so functions are executed
last in, first out.
"""
exc_info = None
while _exithandlers:
func, targs, kargs = _exithandlers.pop()
try:
func(*targs, **kargs)
except SystemExit:
exc_info = sys.exc_info()
except:
import traceback
print >> sys.stderr, "Error in atexit._run_exitfuncs:"
traceback.print_exc()
exc_info = sys.exc_info()
if exc_info is not None:
raise exc_info[0], exc_info[1], exc_info[2]
def get_file_histogram(commit, path):
h = {}
try:
for old_commit, lines in repo.blame(commit, path):
cohort = commit2cohort.get(old_commit.hexsha, "MISSING")
h[cohort] = h.get(cohort, 0) + len(lines)
if old_commit.hexsha in commit2timestamp:
h[old_commit.hexsha] = h.get(old_commit.hexsha, 0) + len(lines)
_, ext = os.path.splitext(path)
h[ext] = h.get(ext, 0) + len(lines)
except KeyboardInterrupt:
raise
except:
traceback.print_exc()
return h
def test_add_exception(self):
stackimpact._agent = None
agent = stackimpact.start(
dashboard_address = 'http://localhost:5001',
agent_key = 'key1',
app_name = 'TestPythonApp',
debug = True
)
agent.error_reporter.start()
try:
raise ValueError('test_exc_1')
except:
traceback.print_exc()
time.sleep(1.1)
profile_handled_exc = agent.error_reporter.profile
#print(profile_handled_exc)
self.assertTrue('ValueError: test_exc_1' in str(profile_handled_exc))
self.assertTrue('test_add_exception' in str(profile_handled_exc))
agent.destroy()
def execute(self):
try:
self.system.run()
except (ReadTimeout, ConnectionError, JSONDecodeError):
pass
except exceptions.TradingSystemException as e:
curr = datetime.now()
print('{time} - {text}'.format(time=curr.strftime('%Y-%m-%d %H:%M:%S'), text=str(e)))
except Exception as e:
curr = datetime.now()
print('{time} - {text} - {args}'.format(time=curr.strftime('%Y-%m-%d %H:%M:%S'), text=str(e), args=e.args))
traceback.print_exc()
if self.interval:
threading.Timer(self.interval, self.execute).start()
def call_script(model_path, config, proto, eval_script,
num_process=10, normalize=True):
'''
open pipe and call the script
'''
try:
subprocess.call(
" python {} {} --config={} "
" --proto={} "
" -p {} {}".format(
eval_script, model_path, config,
proto, num_process,
' -n ' if normalize else ''),
shell=True)
except:
traceback.print_exc(file=sys.stdout)
print 'error in call_bleu_script()'
print model_path
def run_handlers(self, event):
assert event in self.observers
handlers = []
instance_handlers = {
'instance_canceled': self._on_cancel,
'instance_failed': self._on_failed,
'instance_finished': self._on_finish,
}
handlers += self.observers[event]
handlers += instance_handlers.get(event, [])
failures = 0
for handler in handlers:
try:
handler(self)
except: # pylint: disable=bare-except
failures += 1
idc.Message("BAP> {0} failed because {1}\n".
format(self.action, str(sys.exc_info()[1])))
traceback.print_exc()
if failures != 0:
idc.Warning("Some BAP handlers failed")
def get_info(self):
"""Get balance"""
try:
accounts = exchange_api.exchange_get_account()
except Exception as e:
traceback.print_exc()
exchange_api.init_broker()
return
if accounts:
self.cny_balance = 0
self.btc_balance = 0
self.cny_frozen = 0
self.btc_frozen = 0
for account in accounts:
self.btc_balance += account.available_btc
self.cny_balance += account.available_cny
self.btc_frozen += account.frozen_cny
self.cny_frozen += account.frozen_btc
def get_info(self):
"""Get balance"""
try:
response = self.market.accountInfo()
if response and "code" in response:
logging.warn(response)
return False
raise TradeException(response["message"])
if response:
self.btc_balance = float(response["available_btc_display"])
self.cny_balance = float(response["available_cny_display"])
self.btc_frozen = float(response["frozen_btc_display"])
self.cny_frozen = float(response["frozen_cny_display"])
except Exception as ex:
logging.warn("get_info failed :%s" % ex)
traceback.print_exc()
return False
def httpPost(url,resource,params):
headers = {
"Content-type" : "application/x-www-form-urlencoded",
}
try :
conn = httplib.HTTPSConnection(url, timeout=10)
temp_params = urllib.parse.urlencode(params)
conn.request("POST", resource, temp_params, headers)
response = conn.getresponse()
data = response.read().decode('utf-8')
params.clear()
conn.close()
return data
except:
# except Exception,e:
# print(Exception,":",e)
traceback.print_exc()
return False
def send_email(subject, msg):
import smtplib
message = "From: %s\r\nTo: %s\r\nSubject: %s\r\n\r\n%s\r\n" % (config.EMAIL_HOST_USER, ", ".join(config.EMAIL_RECEIVER), subject, msg)
try:
smtpserver = smtplib.SMTP(config.EMAIL_HOST)
smtpserver.set_debuglevel(0)
smtpserver.ehlo()
smtpserver.starttls()
smtpserver.ehlo
smtpserver.login(config.EMAIL_HOST_USER, config.EMAIL_HOST_PASSWORD)
smtpserver.sendmail(config.EMAIL_HOST_USER, config.EMAIL_RECEIVER, message)
smtpserver.quit()
smtpserver.close()
logging.info("send mail success")
except:
logging.error("send mail failed")
traceback.print_exc()
def run(self):
"""Runs the handler, flushes the streams, and ends the request."""
try:
protocolStatus, appStatus = self.server.handler(self)
except:
traceback.print_exc(file=self.stderr)
self.stderr.flush()
if not self.stdout.dataWritten:
self.server.error(self)
protocolStatus, appStatus = FCGI_REQUEST_COMPLETE, 0
if __debug__: _debug(1, 'protocolStatus = %d, appStatus = %d' %
(protocolStatus, appStatus))
try:
self._flush()
self._end(appStatus, protocolStatus)
except socket.error, e:
if e[0] != errno.EPIPE:
raise
def close(self):
"""Close the pipe and calls the _obs_notify() method."""
if self.__filehandle:
try:
try:
file_dispatcher.close(self)
except OSError, oe:
if oe.errno not in self.__ignore_errno:
if self.__logger: self.__logger.exception("Unusual error closing pipe dispatcher")
else: print_exc(file=stderr)
try:
self.__filehandle.close()
except OSError, oe:
if oe.errno not in self.__ignore_errno:
if self.__logger: self.__logger.exception("Unusual error closing pipe filehandle")
else: print_exc(file=stderr)
finally:
self.__filehandle = None
self._obs_notify(event=self.PIPE_CLOSED)
def run ( self ): #overridden from threading library
try:
if self.proxy and self.server:
ins=[self.server,self.proxy]
ous=[];data={};adrs={}
new_socket=0
while not thread_it.done:
if not new_socket:
new_socket,address=self.server.accept()
else:
self.proxy.sendall(
recv_all(new_socket,timeout=self.timeout))
new_socket.sendall(
recv_all(self.proxy,timeout=self.timeout))
elif self.tunnel_client:
self.tunnel_client(self.ip,self.port)
thread_it.done=1
except Exception,error:
print traceback.print_exc(sys.exc_info()),error
thread_it.done=1
def update(self, callback):
# Find out if the file has been modified.
modified = os.path.getmtime(self.__path)
if modified != self.__modified:
# Remember the present time (we are getting an update).
self.__modified = modified
with open(self.__path, 'r') as file:
# Go to present location, read to EOF, and remember position.
file.seek(self.__position)
try:
text = file.read()
except UnicodeError:
print('Please report problem with:', repr(self.__path))
traceback.print_exc()
print('-' * 80)
self.__position = file.tell()
# Execute callback with file ID and new text update.
callback(self.__path, text)
################################################################################
def run(self):
self.success = False
try:
self.establish_session()
self.logger.info("Gathering running services")
self.running_services = self.create_process("c:\\windows\\system32\\net.exe start")
self.logger.info("Gathering running processes")
self.running_processes = self.get_processes()
# get the current user
users = set([proc['username'].split('\\')[-1]
for proc in self.running_processes if proc['path'].find('explorer.exe') != -1])
for user in users:
self.logger.info("Gathering Chrome browser history for %s" % user)
self.browser_history = \
self.get_file("c:\\users\\%s\\appdata\\local\\google\\chrome\\user data\\default\\history" % user)
self.logger.info("LR done")
except Exception as e:
import traceback
traceback.print_exc()
else:
self.success = True
def on_message(message):
"""
Event handler, fires when a message is received in the server.
:param message: discord.Message object containing the received message
"""
try:
if message.content.startswith(pm.botPreferences.commandPrefix) and client.user.id != message.author.id:
# Send the received message off to the Plugin Manager to handle the command
words = message.content.partition(' ')
await pm.handle_command(message, words[0][len(pm.botPreferences.commandPrefix):], words[1:])
elif message.server is not None:
await pm.handle_message(message)
except Exception as e:
await client.send_message(message.channel, "Error: " + str(e))
if pm.botPreferences.get_config_value("client", "debug") == "1":
traceback.print_exc()
def main():
shell.check_python()
if False:
db_transfer.DbTransfer.thread_db()
else:
if get_config().API_INTERFACE == 'mudbjson':
thread = MainThread(db_transfer.MuJsonTransfer)
elif get_config().API_INTERFACE == 'sspanelv2':
thread = MainThread(db_transfer.DbTransfer)
else:
thread = MainThread(db_transfer.Dbv3Transfer)
thread.start()
try:
while thread.is_alive():
thread.join(10.0)
except (KeyboardInterrupt, IOError, OSError) as e:
import traceback
traceback.print_exc()
thread.stop()
def on_query_context(view_id, key, operator, operand, match_all):
v = sublime.View(view_id)
for callback in all_callbacks['on_query_context']:
try:
val = callback.on_query_context(v, key, operator, operand, match_all)
if val:
return True
except:
traceback.print_exc()
for vel in event_listeners_for_view(v):
if 'on_query_context' in vel.__class__.__dict__:
try:
val = vel.on_query_context(key, operator, operand, match_all)
if val:
return True
except:
traceback.print_exc()
return False
def _saveConfig (self):
try:
# create options dictionary
dOpt = {}
for key, lWidget in self.dCheckboxWidgets.items():
w = getattr(self, key)
dOpt[w.Name] = w.State
for w in lWidget:
dOpt[w.Name] = w.State
# get extension path
xDefaultContext = self.ctx.ServiceManager.DefaultContext
xPackageInfoProvider = xDefaultContext.getValueByName("/singletons/com.sun.star.deployment.PackageInformationProvider")
sExtPath = xPackageInfoProvider.getPackageLocation("French.linguistic.resources.from.Dicollecte.by.OlivierR")
sExtPath = sExtPath[8:] + "/pythonpath/tf_options.py" # remove "file:///"
sExtPath = os.path.abspath(sExtPath)
# write file
if os.path.isfile(sExtPath):
hOpt = open(sExtPath, "w")
hOpt.write("dDefaultOpt = " + str(tf_options.dDefaultOpt) + "\n")
hOpt.write("dOpt = " + str(dOpt))
hOpt.close()
except:
traceback.print_exc()
def _mergeContiguousParagraphs (self, xDoc):
self._replaceText(xDoc, u"^[ ]+$", u"", True) # clear empty paragraphs
xCursor = xDoc.Text.createTextCursor()
xCursor.gotoStart(False)
n = 0
try:
while xCursor.gotoNextParagraph(False):
xCursor.gotoEndOfParagraph(True)
if xCursor.String != "":
xCursor.gotoStartOfParagraph(False)
xCursor.goLeft(1, True)
xCursor.setString(" ")
n += 1
except:
traceback.print_exc()
self._replaceText(xDoc, u" +", u" ", True)
return n
def _replaceBulletsByEmDash (self, xDoc):
xCursor = xDoc.Text.createTextCursor()
xCursor.gotoStart(False)
sParaStyleName = ""
if not self.delete2c.State:
sParaStyleName = u"Standard" if self.delete2a.State else u"Text body"
n = 0
try:
if xCursor.NumberingStyleName != "":
xCursor.NumberingStyleName = ""
if sParaStyleName:
xCursor.ParaStyleName = sParaStyleName
xDoc.Text.insertString(xCursor, u"— ", False)
n += 1
while xCursor.gotoNextParagraph(False):
if xCursor.NumberingStyleName != "":
xCursor.NumberingStyleName = ""
if sParaStyleName:
xCursor.ParaStyleName = sParaStyleName
xDoc.Text.insertString(xCursor, u"— ", False)
n += 1
except:
traceback.print_exc()
return n