def __call__(self, *args, **kw):
"""Profile a singe call to the function."""
self.ncalls += 1
if self.skip > 0:
self.skip -= 1
self.skipped += 1
return self.fn(*args, **kw)
if FuncProfile.in_profiler:
# handle recursive calls
return self.fn(*args, **kw)
# You cannot reuse the same profiler for many calls and accumulate
# stats that way. :-/
profiler = cProfile.Profile()
try:
FuncProfile.in_profiler = True
return profiler.runcall(self.fn, *args, **kw)
finally:
FuncProfile.in_profiler = False
self.stats.add(profiler)
if self.immediate:
self.print_stats()
self.reset_stats()
python类Profile()的实例源码
def __call__(self, *args, **kw):
"""Profile a singe call to the function."""
self.ncalls += 1
if self.skip > 0:
self.skip -= 1
self.skipped += 1
return self.fn(*args, **kw)
if FuncProfile.in_profiler:
# handle recursive calls
return self.fn(*args, **kw)
# You cannot reuse the same profiler for many calls and accumulate
# stats that way. :-/
profiler = cProfile.Profile()
try:
FuncProfile.in_profiler = True
return profiler.runcall(self.fn, *args, **kw)
finally:
FuncProfile.in_profiler = False
self.stats.add(profiler)
if self.immediate:
self.print_stats()
self.reset_stats()
def profileit(name):
"""cProfile decorator to profile said function, must pass in a filename to write the information out to
use RunSnakeRun to run the output
:param name: The output file path
:type name: str
:return: Function
"""
def inner(func):
@wraps(func)
def wrapper(*args, **kwargs):
prof = cProfile.Profile()
retval = prof.runcall(func, *args, **kwargs)
# Note use of name from outer scope
prof.dump_stats(name)
return retval
return wrapper
return inner
def handle(self, *args, **kwargs): # pylint: disable=unused-argument
"""
Recreates the index
"""
log = logging.getLogger(indexing_api_name)
console = logging.StreamHandler(self.stderr)
console.setLevel(logging.DEBUG)
log.addHandler(console)
log.level = logging.INFO
if kwargs['profile']:
import cProfile
import uuid
profile = cProfile.Profile()
profile.enable()
recreate_index()
profile.disable()
filename = 'recreate_index_{}.profile'.format(uuid.uuid4())
profile.dump_stats(filename)
self.stdout.write('Output profiling data to: {}'.format(filename))
else:
recreate_index()
def load_with_profiler(
context,
filepath,
*,
global_matrix=None
):
import cProfile
import pstats
pro = cProfile.Profile()
pro.runctx("load_web3d(context.scene, filepath, PREF_FLAT=True, "
"PREF_CIRCLE_DIV=16, global_matrix=global_matrix)",
globals(), locals())
st = pstats.Stats(pro)
st.sort_stats("time")
st.print_stats(0.1)
# st.print_callers(0.1)
def enable_profiling():
global _profile_hook
import cProfile, pstats
def _profile_hook(name, func, *args):
profiler = cProfile.Profile()
profiler.enable()
try:
return func(*args)
finally:
profiler.create_stats()
fp = open('/tmp/mitogen.stats.%d.%s.log' % (os.getpid(), name), 'w')
try:
stats = pstats.Stats(profiler, stream=fp)
stats.sort_stats('cumulative')
stats.print_stats()
finally:
fp.close()
def process_fp(fp, p, sparse):
if _PROFILE:
pr = cProfile.Profile()
pr.enable()
beg = time.time()
try:
p.ParseFile(fp)
except xml.parsers.expat.ExpatError as err:
app.logger.error("Bad XML: %r" % err)
sparse.exceptions += 1
# Bulk upload the remainder
sparse._bulk_upload()
end = time.time()
if _PROFILE:
pr.disable()
s = StringIO.StringIO()
sortby = 'cumulative'
ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
ps.print_stats()
app.logger.info(s.getvalue())
return beg, end
def run(self):
self.taskStarted.emit()
self.exiting = False
self.quitted = False
result = None
try:
if options.cfg.profile:
import cProfile
profiler = cProfile.Profile()
try:
result = profiler.runcall(self.FUN, *self.args, **self.kwargs)
finally:
profiler.dump_stats(os.path.join(
general.get_home_dir(),
"thread{}.profile".format(hex(id(self)))))
else:
result = self.FUN(*self.args, **self.kwargs)
except Exception as e:
if self.parent:
self.parent().exc_info = sys.exc_info()
self.parent().exception = e
self.taskException.emit(e)
print("CoqThread.run():", e)
self.taskFinished.emit()
return result
def do_cprofile(func):
"""Decorator for profiling a function
"""
def profiled_func(*args, **kwargs):
"""Wrapper
"""
profile = cProfile.Profile()
try:
profile.enable()
result = func(*args, **kwargs)
profile.disable()
return result
finally:
stats = pstats.Stats(profile)
stats.sort_stats("time").print_stats(20)
return profiled_func
def __init__(self):
idaapi.processor_t.__init__(self)
# TODO: logging not working.
# self.work_folder = ""
# self.log_fn = self.work_folder + 'work.log'
# logging.basicConfig(filename=self.log_fn, level=logging.DEBUG, filemode='w')
# self.logger = open(self.log_fn, 'w')
self.relocatable_file = re.search(r'\.o$', GetInputFile()) != None
self.init_instructions()
self.prev_addr_analyzed = -1
self.current_hex_packet = None
self.hd = HexagonDisassembler()
# TODO: this should be instatiated on demand, because I think the init is called every time IDA starts
self.disasm_cache = {}
# TODO: use orderdict to remove old entries
self.profiler = cProfile.Profile()
hexagondisasm.profiler = self.profiler
# TODO: I don't know how to access this class from the IDA Python
# console to get the profiler, I do it through the module
def get_parser(self):
parser = argparse.ArgumentParser(description=self.description)
parser.add_argument('-c', '--configuration',
default='config.yaml', help='Configuration file')
parser.add_argument('--debug', dest='debug', action='store_true',
help='Log verbose')
parser.add_argument('-m', '--monitor', action='store_true',
dest='monitor', help='Monitor', default=False)
parser.add_argument('--profile', action='store_true',
dest='profile', help='Profile execution',
default=False)
parser.add_argument('--profile-output',
help='Where to store the output of the profile data',
default=None)
parser.add_argument('--line-profiler', action='store_true',
dest='line_profiler', help='Line profiler execution',
default=False)
parser.add_argument('--line-profiler-matcher',
help='Line profiler execution', default=None)
parser.add_argument('--line-profiler-output',
help='Where to store the output of the line profiler data',
default=None)
parser.set_defaults(debug=False)
return parser
def server_main(cooker, func, *args):
cooker.pre_serve()
if cooker.configuration.profile:
try:
import cProfile as profile
except:
import profile
prof = profile.Profile()
ret = profile.Profile.runcall(prof, func, *args)
prof.dump_stats("profile.log")
bb.utils.process_profilelog("profile.log")
print("Raw profiling information saved to profile.log and processed statistics to profile.log.processed")
else:
ret = func(*args)
cooker.post_serve()
return ret
def run(self):
if not self.profile:
self.realrun()
return
try:
import cProfile as profile
except:
import profile
prof = profile.Profile()
try:
profile.Profile.runcall(prof, self.realrun)
finally:
logfile = "profile-parse-%s.log" % multiprocessing.current_process().name
prof.dump_stats(logfile)
def run_profile(func, sort_order="cumtime", count=1, strip_dir=True, name_filter=""):
"""sort_order : keywords 'ncalls', 'tottime', 'cumtime', 'filename' """
@wraps(func)
def wrapper(*args, **kwargs):
def cmd():
for i in range(count):
func(*args, **kwargs)
prof = cProfile.Profile()
_profile = prof.runctx("cmd()", globals(), locals())
stream = StringIO.StringIO()
stats = pstats.Stats(_profile, stream=stream)
if strip_dir:
stats.strip_dirs()
stats.sort_stats(sort_order)
stats.print_stats(name_filter)
return stream.getvalue()
return wrapper
def tween_request_profiling(handler, registry):
from cProfile import Profile
req = [0]
num_profile = registry["xom"].config.args.profile_requests
# we need to use a list, so we can create a new Profile instance without
# getting variable scope issues
profile = [Profile()]
def request_profiling_handler(request):
profile[0].enable()
try:
return handler(request)
finally:
profile[0].disable()
req[0] += 1
if req[0] >= num_profile:
profile[0].print_stats("cumulative")
req[0] = 0
profile[:] = [Profile()]
return request_profiling_handler
def c_profile(sort_by='tottime'):
def decorator(func):
@wraps(func)
def wrapped(*args, **kwargs):
profiler = cProfile.Profile()
profiler.enable()
result = func(*args, **kwargs)
profiler.disable()
stream = StringIO()
stats = pstats.Stats(profiler, stream=stream).sort_stats(sort_by)
stats.print_stats(25)
print(stream.getvalue())
return result
return wrapped
return decorator
# https://zapier.com/engineering/profiling-python-boss/
def load_with_profiler(
context,
filepath,
*,
global_matrix=None
):
import cProfile
import pstats
pro = cProfile.Profile()
pro.runctx("load_web3d(context.scene, filepath, PREF_FLAT=True, "
"PREF_CIRCLE_DIV=16, global_matrix=global_matrix)",
globals(), locals())
st = pstats.Stats(pro)
st.sort_stats("time")
st.print_stats(0.1)
# st.print_callers(0.1)
def profiler(request):
class CustomProfiler(object):
def __init__(self, request):
if request.config.getoption("--profile"):
import cProfile
self.profile = cProfile.Profile()
else:
self.profile = None
def __enter__(self):
if self.profile is None:
return False
self.profile.enable()
def __exit__(self, *args, **kwargs):
if self.profile is None:
return False
self.profile.disable()
self.profile.dump_stats(request.function.__name__)
return False
return CustomProfiler(request)
def profileit(func):
"""
Decorator straight up stolen from stackoverflow
"""
def wrapper(*args, **kwargs):
datafn = func.__name__ + ".profile" # Name the data file sensibly
prof = cProfile.Profile()
prof.enable()
retval = prof.runcall(func, *args, **kwargs)
prof.disable()
stats = pstats.Stats(prof)
stats.sort_stats('tottime').print_stats(20)
print()
print()
stats.sort_stats('cumtime').print_stats(20)
return retval
return wrapper
def do_cprofile(func):
"""
?????cProfile???????????
???
@do_cprofile
def expensive_function():
for x in range(10000):
_ = x ** x
return 'OK!'
result = expensive_function()
"""
def profiled_func(*args, **kwargs):
profile = cProfile.Profile()
try:
profile.enable()
result = func(*args, **kwargs)
profile.disable()
return result
finally:
profile.print_stats()
return profiled_func
def profiled_thread(func):
"""decorator to profile a thread or function. Profiling output will be written to
'agent_profile_<process_id>.<thread_id_>.<thread_name>.log'"""
def wrapper(*args, **kwargs):
profile = Profile()
profile.enable()
try:
func(*args, **kwargs)
finally:
profile.disable()
try:
thread = current_thread()
profile.dump_stats('profile_%s.%s.%s.log' % (getpid(), thread.name, thread.ident))
except:
logger.exception('Failed to dump stats')
return wrapper
def list_logs():
prof = cProfile.Profile()
prof.enable()
headers = {"Content-Type": "application/json"}
conn = sqlite3.connect(app.config['DBFILE'])
cur = conn.cursor()
cur.execute("SELECT * FROM logs")
logs = [{"id": log[0], "datetime": log[1], "level": log[2], "message": log[3]} for log in cur.fetchall()]
conn.commit()
conn.close()
filter_query = request.args.get('filter', None)
if filter_query:
pattern = re.compile(filter_query)
filtered_logs = []
for log in logs:
if re.match(pattern, log['message']):
filtered_logs.append(log)
logs = filtered_logs
response = json.dumps(logs), 200, headers
prof.disable()
prof.print_stats()
return response
def __init__(self, file_path, sort_by='time', builtins=False):
self._profiler = cProfile.Profile(builtins=builtins)
self.file_path = file_path
self.sort_by = sort_by
def __init__(self, thread_num, session, query, values, num_queries, protocol_version, profile):
Thread.__init__(self)
self.thread_num = thread_num
self.session = session
self.query = query
self.values = values
self.num_queries = num_queries
self.protocol_version = protocol_version
self.profiler = Profile() if profile else None
def __call__(self, environ, start_response):
response_body = []
def catching_start_response(status, headers, exc_info=None):
start_response(status, headers, exc_info)
return response_body.append
def runapp():
appiter = self._app(environ, catching_start_response)
response_body.extend(appiter)
if hasattr(appiter, 'close'):
appiter.close()
p = Profile()
start = time.time()
p.runcall(runapp)
body = b''.join(response_body)
elapsed = time.time() - start
if self._profile_dir is not None:
prof_filename = os.path.join(self._profile_dir,
'%s.%s.%06dms.%d.prof' % (
environ['REQUEST_METHOD'],
environ.get('PATH_INFO').strip(
'/').replace('/', '.') or 'root',
elapsed * 1000.0,
time.time()
))
p.dump_stats(prof_filename)
else:
stats = Stats(p, stream=self._stream)
stats.sort_stats(*self._sort_by)
self._stream.write('-' * 80)
self._stream.write('\nPATH: %r\n' % environ.get('PATH_INFO'))
stats.print_stats(*self._restrictions)
self._stream.write('-' * 80 + '\n\n')
return [body]
def setUp(self):
random.seed(20)
if PROFILE:
self.profile = cProfile.Profile()
self.profile.enable()
def setUp(self):
random.seed(20)
self.level_stats = [LevelStats()] * (CellId.MAX_LEVEL + 1)
if PROFILE:
self.profile = cProfile.Profile()
self.profile.enable()
def __call__(self, environ, start_response):
response_body = []
def catching_start_response(status, headers, exc_info=None):
start_response(status, headers, exc_info)
return response_body.append
def runapp():
appiter = self._app(environ, catching_start_response)
response_body.extend(appiter)
if hasattr(appiter, 'close'):
appiter.close()
p = Profile()
start = time.time()
p.runcall(runapp)
body = b''.join(response_body)
elapsed = time.time() - start
if self._profile_dir is not None:
prof_filename = os.path.join(self._profile_dir,
'%s.%s.%06dms.%d.prof' % (
environ['REQUEST_METHOD'],
environ.get('PATH_INFO').strip(
'/').replace('/', '.') or 'root',
elapsed * 1000.0,
time.time()
))
p.dump_stats(prof_filename)
else:
stats = Stats(p, stream=self._stream)
stats.sort_stats(*self._sort_by)
self._stream.write('-' * 80)
self._stream.write('\nPATH: %r\n' % environ.get('PATH_INFO'))
stats.print_stats(*self._restrictions)
self._stream.write('-' * 80 + '\n\n')
return [body]
def reset_stats(self):
"""Reset accumulated profiler statistics."""
# Note: not using self.Profile, since pstats.Stats() fails then
self.stats = pstats.Stats(profile.Profile())
self.ncalls = 0
self.skipped = 0
def reset_stats(self):
"""Reset accumulated profiler statistics."""
# Note: not using self.Profile, since pstats.Stats() fails then
self.stats = pstats.Stats(profile.Profile())
self.ncalls = 0
self.skipped = 0