def run(self):
    """Run the IP tester on the asyncio event loop under the profiler.

    Builds an ``ipFactory`` from ``self.q`` and ``self.iprange``, schedules
    ``Test_Ip.Server()`` on the event loop and then drives the loop, wrapped
    in ``profile.runctx``, until ``testip.SuccessStop()`` completes.

    On ``KeyboardInterrupt`` / ``SystemExit`` the tester is asked to stop
    and the loop is driven once more until ``SuccessStop()`` completes.
    Once the loop has been obtained it is always closed before returning.
    """
    # Obtain the loop before entering ``try`` so that ``finally`` can always
    # close it, and pre-bind ``testip`` so the interrupt handler never
    # touches an unbound name when the interrupt arrives during setup.
    loop = asyncio.get_event_loop()
    testip = None
    try:
        ipfactory = get_ip.ipFactory(self.q, self.iprange)
        testip = Test_Ip(loop, ipfactory)
        loop.create_task(testip.Server())
        # The profiled statement looks up ``loop`` and ``testip`` through
        # the locals() mapping, so those two local names must keep their
        # spelling.
        profile.runctx(
            "loop.run_until_complete(testip.SuccessStop())",
            globals(),
            locals())
    except (KeyboardInterrupt, SystemExit):
        # Nothing to shut down if the tester was never created.
        if testip is not None:
            loop.create_task(testip.stop())
            loop.run_until_complete(testip.SuccessStop())
    finally:
        loop.close()
        # NOTE(review): the source's indentation was lost; this print is
        # assumed to belong to the ``finally`` clause -- confirm.
        print("Task exit")
# Example source code using Python's runctx() (python类runctx()的实例源码)
def main():
    """Entry point used when the tool is run standalone.

    Parses the command line, configures logging, disables threaded
    building, runs ``runner`` (optionally under ``profile.runctx``) and
    logs how long the whole process took.
    """
    started_at = time.time()
    cli_args = parseArguments()

    log_level = cli_args.log_level
    setupLogging(sys.stdout, log_level)
    logging.root.setLevel(log_level)

    # Per-logger overrides. Overrides for 'hdlcc.source_file' and
    # 'hdlcc.builders' are currently disabled, so those loggers are not
    # touched here.
    for logger_name, level in (('hdlcc.config_parser', logging.WARNING),
                               ('vunit.project', logging.ERROR)):
        logging.getLogger(logger_name).setLevel(level)

    # Running hdlcc with threads has two major drawbacks:
    #  1) It currently cannot be interrupted, because each source file is
    #     parsed on its own thread. Since there can be lots of sources,
    #     interrupting a single thread is not enough. This is discussed at
    #     https://github.com/suoto/hdlcc/issues/19
    #  2) When profiling, the result of interest is the inner hdlcc calls,
    #     and with threads that information is not available, which gives
    #     poor results. This is discussed at
    #     https://github.com/suoto/hdlcc/issues/16
    # To circumvent this, threads are disabled entirely when running
    # standalone (it's ugly, I know).
    # pylint: disable=protected-access
    StandaloneProjectBuilder._USE_THREADS = False
    # pylint: enable=protected-access

    profile_output = cli_args.debug_profiling
    if not profile_output:
        runner(cli_args)
    else:
        # The profiled statement refers to ``runner_args``; that name is
        # supplied explicitly through the ``locals`` mapping.
        profile.runctx(
            'runner(runner_args)',
            globals=globals(),
            locals={'runner_args': cli_args},
            filename=profile_output,
            sort=-1)

    elapsed = time.time() - started_at
    _logger.info("Process took %.2fs", elapsed)
def profile(name, env, filename=None, verbose=False):
    """Profile 10000 calls of the benchmark created for ``name``.

    Args:
        name: Benchmark name. Passed to ``create_bench`` and used for the
            report title or as the prefix of the output file name.
        env: Passed through unchanged to ``create_bench``.
        filename: When truthy, ``name + '-' + filename`` is handed to the
            profiler as its ``filename`` argument. Otherwise ``None`` is
            passed and a title banner is printed first.
        verbose: When true, ``pprofile`` is used instead of ``cProfile``.
            If ``pprofile`` is ``None`` a hint is printed and the function
            returns without profiling.
    """
    if filename:
        filename = name + '-' + filename
        print('Profiling %s ==> %s' % (name, filename))
    else:
        filename = None

        title = name + ' profile'
        print()
        print('=' * len(title))
        print(title)
        print('=' * len(title))

    func = create_bench(name, env)
    # Collect garbage up front so leftovers from earlier work are not
    # attributed to the profiled loop.
    gc.collect()
    code = 'for x in range(10000): func()'

    # runctx() takes (command, globals, locals) in that order. ``func`` is
    # resolved through the locals mapping, and the loop variable ``x`` is
    # stored there too; with the two mappings swapped, ``x`` would be
    # written into this module's global namespace.
    if verbose:
        if pprofile is None:
            print('pprofile not found. Please install pprofile and try again.')
            return

        pprofile.runctx(code, globals(), locals(), filename=filename)
    else:
        cProfile.runctx(code, globals(), locals(),
                        sort='tottime', filename=filename)
def profile(name, env, filename=None, verbose=False):
    """Profile 10000 calls of the benchmark created for ``name``.

    Args:
        name: Benchmark name. Passed to ``create_bench`` and used for the
            report title or as the prefix of the output file name.
        env: Passed through unchanged to ``create_bench``.
        filename: When truthy, ``name + '-' + filename`` is handed to the
            profiler as its ``filename`` argument. Otherwise ``None`` is
            passed and a title banner is printed first.
        verbose: When true, ``pprofile`` is used instead of ``cProfile``.
            If ``pprofile`` is ``None`` a hint is printed and the function
            returns without profiling.
    """
    if filename:
        filename = name + '-' + filename
        print('Profiling %s ==> %s' % (name, filename))
    else:
        filename = None

        title = name + ' profile'
        print()
        print('=' * len(title))
        print(title)
        print('=' * len(title))

    func = create_bench(name, env)
    # Collect garbage up front so leftovers from earlier work are not
    # attributed to the profiled loop.
    gc.collect()
    code = 'for x in range(10000): func()'

    # runctx() takes (command, globals, locals) in that order. ``func`` is
    # resolved through the locals mapping, and the loop variable ``x`` is
    # stored there too; with the two mappings swapped, ``x`` would be
    # written into this module's global namespace.
    if verbose:
        if pprofile is None:
            print('pprofile not found. Please install pprofile and try again.')
            return

        pprofile.runctx(code, globals(), locals(), filename=filename)
    else:
        cProfile.runctx(code, globals(), locals(),
                        sort='tottime', filename=filename)
def quickProfile(name="unnamed"):
    """Return a decorator that times or profiles the decorated function.

    The behaviour is selected through ``config`` flags:

    * ``use-profiler`` false: the function is returned undecorated.
    * ``use-profiler`` true, ``profile-debug`` false: each call is timed
      with ``globalClock`` and the elapsed time is printed.
    * both true: each call runs under ``profile.runctx`` (dumped to
      ``t.prof``) and the resulting ``pstats.Stats`` object is appended to
      ``base.stats[name]``.

    In every case the wrapper returns whatever the wrapped function
    returns.

    Args:
        name: Key under which detailed profiles are collected in
            ``base.stats``.
    """
    def profileDecorator(f):
        if not config.GetBool("use-profiler", 0):
            return f

        def _profiled(*args, **kArgs):
            # must do this in here because we don't have base/simbase
            # at the time that PythonUtil is loaded
            if not config.GetBool("profile-debug", 0):
                # dumb timings
                st = globalClock.getRealTime()
                result = f(*args, **kArgs)
                s = globalClock.getRealTime() - st
                # Single parenthesised argument: prints the same text
                # whether ``print`` is a statement or a function.
                print("Function %s.%s took %s seconds" % (f.__module__, f.__name__, s))
                return result

            import profile as prof
            import pstats
            # detailed profile, stored in base.stats under ``name``
            if not hasattr(base, "stats"):
                base.stats = {}
            if not base.stats.get(name):
                base.stats[name] = []

            # With ``locals`` set to None the statement runs with this dict
            # as its only namespace, so ``result`` can be read back from it
            # afterwards.
            namespace = {'f': f, 'args': args, 'kArgs': kArgs}
            prof.runctx('result = f(*args, **kArgs)', namespace, None, "t.prof")
            s = pstats.Stats("t.prof")

            s.strip_dirs()
            s.sort_stats("cumulative")
            base.stats[name].append(s)
            return namespace.get('result')

        _profiled.__doc__ = f.__doc__
        return _profiled
    return profileDecorator
def main():
    """Run the IP scan, multi-process or in-process depending on ProcessSum.

    With ``ProcessSum > 1`` one ``CheckProcess`` is started per process
    slot. The main process then appends every IP received from ``ipList``
    to ``ip.txt`` until it is interrupted. Afterwards the ``find_log0.txt``
    file is merged with the dictionaries queued in ``GoodIpRange`` and
    rewritten.

    Otherwise a single tester is run on the asyncio event loop of the
    current process, wrapped in ``profile.runctx``.
    """
    global iprange, ipHasFind
    print("Process Sum:", ProcessSum)
    if ProcessSum > 1:
        q = Queue()
        q.put(ipHasFind)
        for i in range(ProcessSum):
            print("Start Process:", i)
            p = CheckProcess(q, iprange)
            p.start()
        try:
            with open("ip.txt", "w") as f:
                found = 0  # do not shadow the builtin ``sum``
                # Runs until interrupted; ``ipList.get()`` supplies the
                # next successful IP.
                while True:
                    ip = ipList.get()
                    f.write(ip + "|")
                    found += 1
                    print("All Sucess Ip:%4d" % found)
        except (KeyboardInterrupt, SystemExit):
            print("main exited")
        finally:
            file_name = "find_log" + "0" + ".txt"
            with open(file_name) as f:
                lines = f.readlines()
            # Each line is "<key>:<value>". Strip whitespace instead of
            # slicing off the last character, so a final line without a
            # trailing newline keeps its last digit; skip blank lines.
            d = {}
            for line in lines:
                line = line.strip()
                if not line:
                    continue
                fields = line.split(":")
                d[int(fields[0])] = int(fields[-1])
            # Fold in the dictionaries queued in GoodIpRange.
            while GoodIpRange.qsize() > 0:
                nowdict = GoodIpRange.get()
                d.update(nowdict)
            with open(file_name, "w") as f:
                for key, value in d.items():
                    print(key, ":", value)
                    f.write(str(key) + ":" + str(value) + "\n")
    else:
        # Obtain the loop before ``try`` and pre-bind ``testip`` so the
        # handlers below never reference an unbound name when the
        # interrupt arrives during setup.
        loop = asyncio.get_event_loop()
        testip = None
        try:
            q = Queue()
            q.put(ipHasFind)
            ipfactory = get_ip.ipFactory(q, iprange)
            testip = Test_Ip(loop, ipfactory)
            loop.create_task(testip.Server())
            # The profiled statement looks up ``loop`` and ``testip``
            # through locals(), so those names must keep their spelling.
            profile.runctx(
                "loop.run_until_complete(testip.SuccessStop())",
                globals(),
                locals())
        except (KeyboardInterrupt, SystemExit):
            if testip is not None:
                loop.create_task(testip.stop())
                loop.run_until_complete(testip.SuccessStop())
        finally:
            loop.close()
            # NOTE(review): the source's indentation was lost; this print
            # is assumed to belong to the ``finally`` clause -- confirm.
            print("Task exit")
def execute(self, argv):
    """Parse ``argv`` and run the parser interactively or on one input.

    In interactive mode, lines are read from a ``>>> `` prompt and each is
    parsed as a string stream until EOF or Ctrl-C. Otherwise a single input
    stream is built from ``options.input``, a file argument, or
    ``self.stdin``. It is then parsed directly, under ``cProfile`` /
    ``profile`` (stats dumped to ``profile.dat`` and printed), or under
    ``hotshot`` (``hotshot.dat``), depending on the options.
    """
    options, positional = self.parseOptions(argv)
    self.setUp(options)

    if options.interactive:
        while True:
            try:
                line = raw_input(">>> ")
            except (EOFError, KeyboardInterrupt):
                self.stdout.write("\nBye.\n")
                return
            self.parseStream(options, antlr3.ANTLRStringStream(line))

    # Non-interactive: pick the input source in priority order.
    if options.input is not None:
        inStream = antlr3.ANTLRStringStream(options.input)
    elif len(positional) == 1 and positional[0] != '-':
        inStream = antlr3.ANTLRFileStream(
            positional[0], encoding=options.encoding
        )
    else:
        inStream = antlr3.ANTLRInputStream(
            self.stdin, encoding=options.encoding
        )

    # Statement handed to the profilers; it is evaluated against locals(),
    # so ``self``, ``options`` and ``inStream`` must keep these names.
    statement = 'self.parseStream(options, inStream)'

    if options.profile:
        try:
            import cProfile as profile
        except ImportError:
            import profile

        profile.runctx(statement, globals(), locals(), 'profile.dat')

        import pstats
        report = pstats.Stats('profile.dat')
        report.strip_dirs()
        report.sort_stats('time')
        report.print_stats(100)
    elif options.hotshot:
        import hotshot

        hotshot_profiler = hotshot.Profile('hotshot.dat')
        hotshot_profiler.runctx(statement, globals(), locals())
    else:
        self.parseStream(options, inStream)