def __call__(self, *args, **kw):
"""Profile a singe call to the function."""
self.ncalls += 1
if self.skip > 0:
self.skip -= 1
self.skipped += 1
return self.fn(*args, **kw)
if FuncProfile.in_profiler:
# handle recursive calls
return self.fn(*args, **kw)
# You cannot reuse the same profiler for many calls and accumulate
# stats that way. :-/
profiler = cProfile.Profile()
try:
FuncProfile.in_profiler = True
return profiler.runcall(self.fn, *args, **kw)
finally:
FuncProfile.in_profiler = False
self.stats.add(profiler)
if self.immediate:
self.print_stats()
self.reset_stats()
python类Profile()的实例源码
def __call__(self, *args, **kw):
"""Profile a singe call to the function."""
self.ncalls += 1
if self.skip > 0:
self.skip -= 1
self.skipped += 1
return self.fn(*args, **kw)
if FuncProfile.in_profiler:
# handle recursive calls
return self.fn(*args, **kw)
# You cannot reuse the same profiler for many calls and accumulate
# stats that way. :-/
profiler = cProfile.Profile()
try:
FuncProfile.in_profiler = True
return profiler.runcall(self.fn, *args, **kw)
finally:
FuncProfile.in_profiler = False
self.stats.add(profiler)
if self.immediate:
self.print_stats()
self.reset_stats()
def profiled(f, outputFile):
    """Return a wrapper around ``f`` that profiles every call.

    On every interpreter except Python 2.4 the ``profile`` module is used:
    the raw stats are dumped to ``outputFile`` after a successful call and a
    report is printed.  On Python 2.4 ``hotshot`` is used instead and the
    top 100 entries sorted by cumulative time are printed.

    The wrapper returns whatever ``f`` returns.  If ``f`` exits through
    ``SystemExit`` (non-2.4 path) the exit is swallowed, the report is still
    printed, and ``None`` is returned.
    """
    def _(*args, **kwargs):
        if sys.version_info[0:2] != (2, 4):
            import profile
            prof = profile.Profile()
            # Pre-bind the result: if f() raises SystemExit the assignment
            # inside the try never happens, and ``return result`` would
            # otherwise fail with UnboundLocalError.
            result = None
            try:
                result = prof.runcall(f, *args, **kwargs)
                prof.dump_stats(outputFile)
            except SystemExit:
                # Deliberately swallowed so the report below is still shown.
                pass
            prof.print_stats()
            return result
        else:  # use hotshot, profile is broken in 2.4
            import hotshot.stats
            prof = hotshot.Profile(outputFile)
            try:
                return prof.runcall(f, *args, **kwargs)
            finally:
                stats = hotshot.stats.load(outputFile)
                stats.strip_dirs()
                stats.sort_stats('cum')  # 'time'
                stats.print_stats(100)
    return _
def server_main(cooker, func, *args):
    """Run ``func(*args)`` between the cooker's pre- and post-serve hooks.

    When ``cooker.configuration.profile`` is true the call runs under a
    profiler (``cProfile`` if importable, otherwise ``profile``); the raw
    stats are written to ``profile.log`` and handed to
    ``bb.utils.process_profilelog``.

    Returns the value returned by ``func``.
    """
    cooker.pre_serve()

    if cooker.configuration.profile:
        try:
            import cProfile as profile
        except ImportError:
            # Only a missing C profiler should trigger the fallback; a bare
            # except would also hide KeyboardInterrupt and real errors.
            import profile
        prof = profile.Profile()

        ret = prof.runcall(func, *args)

        prof.dump_stats("profile.log")
        bb.utils.process_profilelog("profile.log")
        print("Raw profiling information saved to profile.log and processed statistics to profile.log.processed")
    else:
        ret = func(*args)

    cooker.post_serve()

    return ret
def run(self):
    """Execute ``self.realrun()``, under a profiler when ``self.profile`` is set.

    With profiling enabled, the stats are always dumped (even if
    ``realrun`` raises) to ``profile-parse-<process name>.log``, where the
    process name comes from ``multiprocessing.current_process()``.
    """
    if not self.profile:
        self.realrun()
        return

    try:
        import cProfile as profile
    except ImportError:
        # Fall back to the pure-Python profiler only when cProfile is
        # genuinely unavailable; a bare except would mask other failures.
        import profile
    prof = profile.Profile()
    try:
        prof.runcall(self.realrun)
    finally:
        logfile = "profile-parse-%s.log" % multiprocessing.current_process().name
        prof.dump_stats(logfile)
def __init__(self):
    """Set up the analyzer: packet filter callback, cell state and state machine."""
    self.log_info("Initialing WcdmaRrcAnalyzer..")

    # Base-class initialisation runs after the log message above.
    ProtocolAnalyzer.__init__(self)

    #init packet filters
    self.add_source_callback(self.__rrc_filter)

    #init internal states
    self.__status=WcdmaRrcStatus() # current cell status
    self.__history={} # cell history: timestamp -> WcdmaRrcStatus()
    self.__config={} # cell_id -> WcdmaRrcConfig()

    self.state_machine = self.create_state_machine()
    # print type(self.state_machine)

    #FIXME: change the timestamp
    # NOTE(review): this stores the config dict (not a WcdmaRrcStatus) under
    # timestamp 0, which differs from the mapping described for __history
    # above -- confirm this is intended.
    self.__history[0]=self.__config

    #Temporary structure for holding the config
    self.__config_tmp=WcdmaRrcConfig()

    # self.__profile = Profile(WcdmaRrcProfileHierarchy())
def __init__(self):
    """
    Initialization.

    Each protocol should provide three initialization methods

    - Initialize ProfileHierarchy
    - Initialize StateMachine
    - Initialize Failure flags
    """
    Analyzer.__init__(self)

    # The profile is built from the hierarchy returned by
    # create_profile_hierarchy().
    self.profile = Profile(self.create_profile_hierarchy())

    # The state machine receives the result of create_state_machine() and
    # the init_protocol_state callable.
    self.state_machine = StateMachine(
        self.create_state_machine(),
        self.init_protocol_state)

    # update state dynamics
    self.add_source_callback(self.__update_state)
def profiled(f, outputFile):
    """Return a wrapper around ``f`` that profiles every call.

    On every interpreter except Python 2.4 the ``profile`` module is used:
    the raw stats are dumped to ``outputFile`` after a successful call and a
    report is printed.  On Python 2.4 ``hotshot`` is used instead and the
    top 100 entries sorted by cumulative time are printed.

    The wrapper returns whatever ``f`` returns.  If ``f`` exits through
    ``SystemExit`` (non-2.4 path) the exit is swallowed, the report is still
    printed, and ``None`` is returned.
    """
    def _(*args, **kwargs):
        if sys.version_info[0:2] != (2, 4):
            import profile
            prof = profile.Profile()
            # Pre-bind the result: if f() raises SystemExit the assignment
            # inside the try never happens, and ``return result`` would
            # otherwise fail with UnboundLocalError.
            result = None
            try:
                result = prof.runcall(f, *args, **kwargs)
                prof.dump_stats(outputFile)
            except SystemExit:
                # Deliberately swallowed so the report below is still shown.
                pass
            prof.print_stats()
            return result
        else:  # use hotshot, profile is broken in 2.4
            import hotshot.stats
            prof = hotshot.Profile(outputFile)
            try:
                return prof.runcall(f, *args, **kwargs)
            finally:
                stats = hotshot.stats.load(outputFile)
                stats.strip_dirs()
                stats.sort_stats('cum')  # 'time'
                stats.print_stats(100)
    return _
def _exec_main(parser, values):
    """Parse the command line and run ``_main`` plainly, under pdb, or profiled.

    Flags from the ``SCONSFLAGS`` environment variable are placed in front
    of the real command-line arguments before parsing.
    """
    env_flags = os.environ.get('SCONSFLAGS', '').split()
    options, _ = parser.parse_args(env_flags + sys.argv[1:], values)

    if isinstance(options.debug, list) and "pdb" in options.debug:
        import pdb
        pdb.Pdb().runcall(_main, parser)
        return

    if not options.profile_file:
        _main(parser)
        return

    # compat layer imports "cProfile" for us if it's available.
    from profile import Profile
    prof = Profile()
    try:
        prof.runcall(_main, parser)
    finally:
        # Stats are written even when _main raises.
        prof.dump_stats(options.profile_file)
def _exec_main(parser, values):
    """Parse the command line and run ``_main`` plainly, under pdb, or profiled.

    Flags from the ``SCONSFLAGS`` environment variable are placed in front
    of the real command-line arguments before parsing.
    """
    env_flags = os.environ.get('SCONSFLAGS', '').split()
    options, _ = parser.parse_args(env_flags + sys.argv[1:], values)

    if isinstance(options.debug, list) and "pdb" in options.debug:
        import pdb
        pdb.Pdb().runcall(_main, parser)
        return

    if not options.profile_file:
        _main(parser)
        return

    # compat layer imports "cProfile" for us if it's available.
    from profile import Profile
    prof = Profile()
    try:
        prof.runcall(_main, parser)
    finally:
        # Stats are written even when _main raises.
        prof.dump_stats(options.profile_file)
def run(self, reactor):
    """
    Run C{reactor.run} under the pure-Python C{profile} profiler.

    With C{self.saveStats} set, the raw stats are dumped to
    C{self.profileOutput}; otherwise the printed report is appended to that
    file by temporarily pointing C{sys.stdout} at it.
    """
    try:
        import profile
    except ImportError as e:
        self._reportImportError("profile", e)

    prof = profile.Profile()
    prof.runcall(reactor.run)

    if self.saveStats:
        prof.dump_stats(self.profileOutput)
        return

    realStdout = sys.stdout
    sink = open(self.profileOutput, 'a')
    sys.stdout = sink
    try:
        prof.print_stats()
    finally:
        # Put the real stdout back even if printing failed.
        sys.stdout = realStdout
        sink.close()
def run(self, reactor):
    """
    Run C{reactor.run} under the C{cProfile} profiler.

    With C{self.saveStats} set, the raw stats are dumped to
    C{self.profileOutput}; otherwise a report with directory names stripped
    is written to that file, replacing its previous contents.
    """
    try:
        import cProfile
        import pstats
    except ImportError as e:
        self._reportImportError("cProfile", e)

    prof = cProfile.Profile()
    prof.runcall(reactor.run)

    if self.saveStats:
        prof.dump_stats(self.profileOutput)
        return

    with open(self.profileOutput, 'w') as out:
        report = pstats.Stats(prof, stream=out)
        report.strip_dirs()
        report.sort_stats(-1)
        report.print_stats()
def test_profilePrintStatsError(self):
    """
    If printing the stats fails, C{sys.stdout} must end up pointing at the
    same object it referred to before the profiler ran.
    """
    class ExplodingProfile(profile.Profile):
        # Stand-in profiler whose report step always fails.
        def print_stats(self):
            raise RuntimeError("Boom")

    self.patch(profile, "Profile", ExplodingProfile)

    options = twistd.ServerOptions()
    options["profile"] = self.mktemp()
    options["profiler"] = "profile"
    appProfiler = app.AppProfiler(options)
    dummyReactor = DummyReactor()

    stdoutBefore = sys.stdout
    self.assertRaises(RuntimeError, appProfiler.run, dummyReactor)
    self.assertIs(sys.stdout, stdoutBefore)
def _profileWithoutGarbageLeak(cmd, filename):
# The profile module isn't necessarily installed on every Python
# installation, so we import it here, instead of in the module
# scope.
import profile
# this is necessary because the profile module creates a memory leak
Profile = profile.Profile
statement = cmd
sort = -1
retVal = None
#### COPIED FROM profile.run ####
prof = Profile()
try:
prof = prof.run(statement)
except SystemExit:
pass
if filename is not None:
prof.dump_stats(filename)
else:
#return prof.print_stats(sort) #DCR
retVal = prof.print_stats(sort) #DCR
#################################
# eliminate the garbage leak
del prof.dispatcher
return retVal
functionprofiler.py 文件源码
项目:BigBrotherBot-For-UrT43
作者: ptitbigorneau
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def runprofile(mainfunction, output, timeout = 60):
    """Profile ``mainfunction`` in a killable thread for ``timeout`` seconds.

    ``mainfunction`` and ``output`` are passed straight to ``profile.run``.
    Returns False when the profiler libraries are flagged as missing,
    True once the run has been started and then killed.
    """
    if noprofiler == True:
        print('ERROR: profiler and/or pstats library missing ! Please install it (probably package named python-profile) before running a profiling !')
        return False

    def profiled_target():
        # Body of the worker thread: run the target under the profiler.
        profile.run(mainfunction, output)

    print('=> SAVING MODE\n\n')
    print('Calibrating the profiler...')
    calibrateprofile()
    print('Initializing the profiler...')
    # The target runs in a special killable thread so that it can be stopped
    # after the timeout, which lets the profiler write out its results.
    worker = KThread(target=profiled_target)
    print('Will now run the profiling and terminate it in %s seconds. Results will be saved in %s' % (str(timeout), str(output)))
    print('\nCountdown:')
    for remaining in range(5, 0, -1):
        print(str(remaining))
        time.sleep(1)
    print('0\nStarting to profile...')
    worker.start()
    # Let the target run for the requested duration before killing it.
    time.sleep(float(timeout))
    print('\n\nFinishing the profile and saving to the file %s' % str(output))
    worker.kill()
    print('=> Profile done ! Exiting...')
    return True
def _exec_main(parser, values):
    """Parse the command line and run ``_main`` plainly, under pdb, or profiled.

    Flags from the ``SCONSFLAGS`` environment variable are placed in front
    of the real command-line arguments before parsing.
    """
    env_flags = os.environ.get('SCONSFLAGS', '').split()
    options, _ = parser.parse_args(env_flags + sys.argv[1:], values)

    if isinstance(options.debug, list) and "pdb" in options.debug:
        import pdb
        pdb.Pdb().runcall(_main, parser)
        return

    if not options.profile_file:
        _main(parser)
        return

    # compat layer imports "cProfile" for us if it's available.
    from profile import Profile
    prof = Profile()
    try:
        prof.runcall(_main, parser)
    finally:
        # Stats are written even when _main raises.
        prof.dump_stats(options.profile_file)
def main(self):
    """Run ``self._main()``, optionally profiled, and print the elapsed time.

    Profiling is switched on when the first command-line argument starts
    with ``prof`` (case-insensitive) or ``self._shouldProfile`` is already
    set; the stats are then written to ``<ClassName>.pstats``.  When
    ``self._shouldRunTestSuite`` is set, the ``TestDataTable`` suite runs
    first.
    """
    if len(sys.argv) > 1 and sys.argv[1].lower().startswith('prof'):
        self._shouldProfile = True
    if self._shouldRunTestSuite:
        from TestDataTable import main
        main()
    start = time.time()
    if self._shouldProfile:
        prof = Profile()
        prof.runcall(self._main)
        filename = '%s.pstats' % self.__class__.__name__
        prof.dump_stats(filename)
        # Single-argument print(...) emits the same text on Python 2 and 3,
        # unlike the Python-2-only print statement.
        print('Wrote %s' % filename)
    else:
        self._main()
    duration = time.time() - start
    print('%.1f secs' % duration)
def main(self):
    """Run ``self._main()``, optionally profiled, and print the elapsed time.

    Profiling is switched on when the first command-line argument starts
    with ``prof`` (case-insensitive) or ``self._shouldProfile`` is already
    set; the stats are then written to ``<ClassName>.pstats``.  When
    ``self._shouldRunTestSuite`` is set, the ``TestCSVParser`` suite runs
    first.
    """
    if len(sys.argv) > 1 and sys.argv[1].lower().startswith('prof'):
        self._shouldProfile = True
    if self._shouldRunTestSuite:
        from TestCSVParser import main
        main()
    start = time.time()
    if self._shouldProfile:
        prof = Profile()
        prof.runcall(self._main)
        filename = '%s.pstats' % self.__class__.__name__
        prof.dump_stats(filename)
        # Single-argument print(...) emits the same text on Python 2 and 3,
        # unlike the Python-2-only print statement.
        print('Wrote %s' % filename)
    else:
        self._main()
    duration = time.time() - start
    print('%.1f secs' % duration)
def run(frameworks, number, do_profile):
    """Benchmark the ``app.main`` WSGI callable found in each framework directory.

    For every name in ``frameworks`` the process changes into that
    sub-directory of the starting working directory, imports ``app``, times
    ``number`` calls and prints one result row; with ``do_profile`` set, the
    ten most time-consuming entries of a profiled timing run are printed as
    well.  Frameworks whose import fails are reported as not installed.
    """
    print("Benchmarking frameworks:", ', '.join(frameworks))
    sys.path[0] = '.'
    base_dir = os.getcwd()
    print(" ms rps tcalls funcs")
    for framework in frameworks:
        os.chdir(os.path.join(base_dir, framework))
        try:
            app_main = __import__('app', None, None, ['main']).main
            # NOTE: ``f`` and ``number`` must keep these exact names; the
            # statements handed to runctx() look them up via locals().
            f = lambda: list(app_main(environ.copy(), start_response))
            elapsed = timeit(f, number=number)
            single_call = profile.Profile().runctx('f()', globals(), locals())
            stats = Stats(single_call)
            row = (framework, 1000 * elapsed, number / elapsed,
                   stats.total_calls, len(stats.stats))
            print("%-11s %6.0f %7.0f %7d %6d" % row)
            if do_profile:
                timed_run = profile.Profile().runctx(
                    'timeit(f, number=number)', globals(), locals())
                stats = Stats(timed_run)
                stats.strip_dirs().sort_stats('time').print_stats(10)
            # Forget the module so the next framework's ``app`` is imported afresh.
            del sys.modules['app']
        except ImportError:
            print("%-15s not installed" % framework)
def __call__(self, environ, start_response):
response_body = []
def catching_start_response(status, headers, exc_info=None):
start_response(status, headers, exc_info)
return response_body.append
def runapp():
appiter = self._app(environ, catching_start_response)
response_body.extend(appiter)
if hasattr(appiter, 'close'):
appiter.close()
p = Profile()
start = time.time()
p.runcall(runapp)
body = b''.join(response_body)
elapsed = time.time() - start
if self._profile_dir is not None:
prof_filename = os.path.join(self._profile_dir,
'%s.%s.%06dms.%d.prof' % (
environ['REQUEST_METHOD'],
environ.get('PATH_INFO').strip(
'/').replace('/', '.') or 'root',
elapsed * 1000.0,
time.time()
))
p.dump_stats(prof_filename)
else:
stats = Stats(p, stream=self._stream)
stats.sort_stats(*self._sort_by)
self._stream.write('-' * 80)
self._stream.write('\nPATH: %r\n' % environ.get('PATH_INFO'))
stats.print_stats(*self._restrictions)
self._stream.write('-' * 80 + '\n\n')
return [body]
def __call__(self, environ, start_response):
response_body = []
def catching_start_response(status, headers, exc_info=None):
start_response(status, headers, exc_info)
return response_body.append
def runapp():
appiter = self._app(environ, catching_start_response)
response_body.extend(appiter)
if hasattr(appiter, 'close'):
appiter.close()
p = Profile()
start = time.time()
p.runcall(runapp)
body = b''.join(response_body)
elapsed = time.time() - start
if self._profile_dir is not None:
prof_filename = os.path.join(self._profile_dir,
'%s.%s.%06dms.%d.prof' % (
environ['REQUEST_METHOD'],
environ.get('PATH_INFO').strip(
'/').replace('/', '.') or 'root',
elapsed * 1000.0,
time.time()
))
p.dump_stats(prof_filename)
else:
stats = Stats(p, stream=self._stream)
stats.sort_stats(*self._sort_by)
self._stream.write('-' * 80)
self._stream.write('\nPATH: %r\n' % environ.get('PATH_INFO'))
stats.print_stats(*self._restrictions)
self._stream.write('-' * 80 + '\n\n')
return [body]
def reset_stats(self):
    """Discard the statistics gathered so far and zero the call counters."""
    # A plain profile.Profile is used on purpose: per the original note,
    # pstats.Stats() fails when given self.Profile.
    self.stats = pstats.Stats(profile.Profile())
    self.ncalls = self.skipped = 0
def reset_stats(self):
    """Discard the statistics gathered so far and zero the call counters."""
    # A plain profile.Profile is used on purpose: per the original note,
    # pstats.Stats() fails when given self.Profile.
    self.stats = pstats.Stats(profile.Profile())
    self.ncalls = self.skipped = 0
def __call__(self, environ, start_response):
response_body = []
def catching_start_response(status, headers, exc_info=None):
start_response(status, headers, exc_info)
return response_body.append
def runapp():
appiter = self._app(environ, catching_start_response)
response_body.extend(appiter)
if hasattr(appiter, 'close'):
appiter.close()
p = Profile()
start = time.time()
p.runcall(runapp)
body = b''.join(response_body)
elapsed = time.time() - start
if self._profile_dir is not None:
prof_filename = os.path.join(self._profile_dir,
'%s.%s.%06dms.%d.prof' % (
environ['REQUEST_METHOD'],
environ.get('PATH_INFO').strip(
'/').replace('/', '.') or 'root',
elapsed * 1000.0,
time.time()
))
p.dump_stats(prof_filename)
else:
stats = Stats(p, stream=self._stream)
stats.sort_stats(*self._sort_by)
self._stream.write('-' * 80)
self._stream.write('\nPATH: %r\n' % environ.get('PATH_INFO'))
stats.print_stats(*self._restrictions)
self._stream.write('-' * 80 + '\n\n')
return [body]
def runWithProfiler(reactor, config):
    """Run reactor under standard profiler.

    ``config['profile']`` names the output file.  With
    ``config['savestats']`` set, the raw stats are dumped there; otherwise
    the printed report is appended to it by temporarily redirecting
    ``sys.stdout``.  If the ``profile`` module cannot be imported, the
    failure is logged and the process exits.
    """
    try:
        import profile
    except ImportError as e:
        s = "Failed to import module profile: %s" % e
        s += """
This is most likely caused by your operating system not including
profile.py due to it being non-free. Either do not use the option
--profile, or install profile.py; your operating system vendor
may provide it in a separate package.
"""
        traceback.print_exc(file=log.logfile)
        log.msg(s)
        log.deferr()
        sys.exit('\n' + s + '\n')

    p = profile.Profile()
    p.runcall(reactor.run)
    if config['savestats']:
        p.dump_stats(config['profile'])
    else:
        # print_stats() only writes to sys.stdout, so swap it for the file.
        tmp, sys.stdout = sys.stdout, open(config['profile'], 'a')
        try:
            p.print_stats()
        finally:
            # Restore the real stdout even if printing the report fails.
            sys.stdout, tmp = tmp, sys.stdout
            tmp.close()
def runWithHotshot(reactor, config):
    """Run reactor under hotshot profiler.

    hotshot writes its raw data straight to ``config['profile']``.  With
    ``config['savestats']`` set nothing more is done; otherwise the data is
    loaded back and the file is overwritten with the printed report.  If
    ``hotshot`` cannot be imported, the failure is logged and the process
    exits.
    """
    try:
        import hotshot.stats
    except ImportError as e:
        s = "Failed to import module hotshot: %s" % e
        s += """
This is most likely caused by your operating system not including
profile.py due to it being non-free. Either do not use the option
--profile, or install profile.py; your operating system vendor
may provide it in a separate package.
"""
        traceback.print_exc(file=log.logfile)
        log.msg(s)
        log.deferr()
        sys.exit('\n' + s + '\n')

    # this writes stats straight out
    p = hotshot.Profile(config["profile"])
    p.runcall(reactor.run)
    if config["savestats"]:
        # stats are automatically written to file, nothing to do
        return
    else:
        s = hotshot.stats.load(config["profile"])
        s.strip_dirs()
        s.sort_stats(-1)
        tmp, sys.stdout = sys.stdout, open(config['profile'], 'w')
        try:
            s.print_stats()
        finally:
            # Restore the real stdout even if printing the report fails.
            sys.stdout, tmp = tmp, sys.stdout
            tmp.close()
def load(self):
    """Replay the hotshot log into a Profile object and return its pstats.Stats."""
    # The profiler's own timer must never be consulted during the replay,
    # so it is replaced with one that does not work.
    prof = Profile()
    prof.get_time = _brokentimer
    reader = hotshot.log.LogReader(self._logfn)
    pending = 0
    for kind, (filename, lineno, funcname), delta in reader:
        if delta > 0:
            pending += delta

        # The accumulated time is scaled from the microseconds in the log
        # to the seconds that profile/pstats work with, so the numbers have
        # some basis in reality (calibration issues are ignored for now).
        if kind == ENTER:
            frame = self.new_frame(filename, lineno, funcname)
            prof.trace_dispatch_call(frame, pending * .000001)
            pending = 0
        elif kind == EXIT:
            frame = self.pop_frame()
            prof.trace_dispatch_return(frame, pending * .000001)
            pending = 0
        # line events need no further work

    assert not self._stack
    return pstats.Stats(prof)
def __call__(self, environ, start_response):
response_body = []
def catching_start_response(status, headers, exc_info=None):
start_response(status, headers, exc_info)
return response_body.append
def runapp():
appiter = self._app(environ, catching_start_response)
response_body.extend(appiter)
if hasattr(appiter, 'close'):
appiter.close()
p = Profile()
start = time.time()
p.runcall(runapp)
body = b''.join(response_body)
elapsed = time.time() - start
if self._profile_dir is not None:
prof_filename = os.path.join(self._profile_dir,
'%s.%s.%06dms.%d.prof' % (
environ['REQUEST_METHOD'],
environ.get('PATH_INFO').strip(
'/').replace('/', '.') or 'root',
elapsed * 1000.0,
time.time()
))
p.dump_stats(prof_filename)
else:
stats = Stats(p, stream=self._stream)
stats.sort_stats(*self._sort_by)
self._stream.write('-' * 80)
self._stream.write('\nPATH: %r\n' % environ.get('PATH_INFO'))
stats.print_stats(*self._restrictions)
self._stream.write('-' * 80 + '\n\n')
return [body]
def __call__(self, environ, start_response):
response_body = []
def catching_start_response(status, headers, exc_info=None):
start_response(status, headers, exc_info)
return response_body.append
def runapp():
appiter = self._app(environ, catching_start_response)
response_body.extend(appiter)
if hasattr(appiter, 'close'):
appiter.close()
p = Profile()
start = time.time()
p.runcall(runapp)
body = b''.join(response_body)
elapsed = time.time() - start
if self._profile_dir is not None:
prof_filename = os.path.join(self._profile_dir,
'%s.%s.%06dms.%d.prof' % (
environ['REQUEST_METHOD'],
environ.get('PATH_INFO').strip(
'/').replace('/', '.') or 'root',
elapsed * 1000.0,
time.time()
))
p.dump_stats(prof_filename)
else:
stats = Stats(p, stream=self._stream)
stats.sort_stats(*self._sort_by)
self._stream.write('-' * 80)
self._stream.write('\nPATH: %r\n' % environ.get('PATH_INFO'))
stats.print_stats(*self._restrictions)
self._stream.write('-' * 80 + '\n\n')
return [body]
def __call__(self, environ, start_response):
response_body = []
def catching_start_response(status, headers, exc_info=None):
start_response(status, headers, exc_info)
return response_body.append
def runapp():
appiter = self._app(environ, catching_start_response)
response_body.extend(appiter)
if hasattr(appiter, 'close'):
appiter.close()
p = Profile()
start = time.time()
p.runcall(runapp)
body = b''.join(response_body)
elapsed = time.time() - start
if self._profile_dir is not None:
prof_filename = os.path.join(self._profile_dir,
'%s.%s.%06dms.%d.prof' % (
environ['REQUEST_METHOD'],
environ.get('PATH_INFO').strip(
'/').replace('/', '.') or 'root',
elapsed * 1000.0,
time.time()
))
p.dump_stats(prof_filename)
else:
stats = Stats(p, stream=self._stream)
stats.sort_stats(*self._sort_by)
self._stream.write('-' * 80)
self._stream.write('\nPATH: %r\n' % environ.get('PATH_INFO'))
stats.print_stats(*self._restrictions)
self._stream.write('-' * 80 + '\n\n')
return [body]