def timeit(self, number=default_number):
    """Time 'number' executions of the main statement.
    To be precise, this executes the setup statement once, and
    then returns the time it takes to execute the main statement
    a number of times, as a float measured in seconds. The
    argument is the number of times through the loop, defaulting
    to one million. The main statement, the setup statement and
    the timer function to be used are passed to the constructor.
    """
    if itertools:
        it = itertools.repeat(None, number)
    else:
        it = [None] * number
    gcold = gc.isenabled()
    gc.disable()
    try:
        # Run inside try/finally so the collector is restored even if the
        # timed statement raises (matches the stdlib timeit module).
        timing = self.inner(it, self.timer)
    finally:
        if gcold:
            gc.enable()
    return timing
Python code examples of enable() usage
def test_cleanup(self):
    # Verify that destroying a pyngus Container does not leave behind
    # uncollectable objects (gc.garbage must stay empty).
    gc.enable()
    gc.collect()
    assert not gc.garbage, "Object leak: %s" % str(gc.garbage)
    container = pyngus.Container("abc")
    c1 = container.create_connection("c1")
    c2 = container.create_connection("c2")
    assert c2
    # Dropping our local reference must not destroy the connection --
    # presumably the container keeps its own reference; verified by the
    # successful lookup below.
    del c2
    gc.collect()
    c2 = container.get_connection("c2")
    assert c2
    c1 = container.get_connection("c1")
    assert c1
    # Links hang off the connection; created here so destroy() must also
    # tear them down without leaking.
    c1.create_receiver("r1")
    c1.create_sender("s1")
    del c1
    del c2
    container.destroy()
    del container
    # After an explicit destroy() and a full collection, nothing from the
    # container graph may end up in gc.garbage.
    gc.collect()
    assert not gc.garbage, "Object leak: %s" % str(gc.garbage)
def test_init():
    # Start from scratch: drop any leftover database file first.
    if os.path.exists(MOCK_DB_FILE):
        os.remove(MOCK_DB_FILE)
    store = SqliteStore(MockConfig())
    assert store
    # A freshly initialized store holds exactly the Global CIDR entry.
    assert len(store.query_all()) == 1
    # Drop the store object so its connection gets closed (GC is paused
    # around the del, as in the original, so finalization is driven by
    # the refcount drop alone).
    gc.disable()
    del store
    gc.enable()
    # Re-open against the now-existing database file.
    store = SqliteStore(MockConfig())
    # Still only the Global CIDR entry present.
    assert len(store.query_all()) == 1
    # Remove the file we created.
    os.remove(MOCK_DB_FILE)
def nogc(fun):
    """
    Decorator: let a function disable the garbage collector during its execution.
    It is used in the build context when storing/loading the build cache file (pickle)
    :param fun: function to execute
    :type fun: function
    :return: the return value of the function executed
    """
    def f(*k, **kw):
        # Remember the collector's prior state so we do not force-enable
        # it for a caller that had deliberately disabled it.
        gcold = gc.isenabled()
        try:
            gc.disable()
            ret = fun(*k, **kw)
        finally:
            if gcold:
                gc.enable()
        return ret
    f.__doc__ = fun.__doc__
    return f
def nogc(fun):
    """
    Decorator: let a function disable the garbage collector during its execution.
    It is used in the build context when storing/loading the build cache file (pickle)
    :param fun: function to execute
    :type fun: function
    :return: the return value of the function executed
    """
    def f(*k, **kw):
        # Remember the collector's prior state so we do not force-enable
        # it for a caller that had deliberately disabled it.
        gcold = gc.isenabled()
        try:
            gc.disable()
            ret = fun(*k, **kw)
        finally:
            if gcold:
                gc.enable()
        return ret
    f.__doc__ = fun.__doc__
    return f
def nogc(fun):
    """
    Decorator: let a function disable the garbage collector during its execution.
    It is used in the build context when storing/loading the build cache file (pickle)
    :param fun: function to execute
    :type fun: function
    :return: the return value of the function executed
    """
    def f(*k, **kw):
        # Remember the collector's prior state so we do not force-enable
        # it for a caller that had deliberately disabled it.
        gcold = gc.isenabled()
        try:
            gc.disable()
            ret = fun(*k, **kw)
        finally:
            if gcold:
                gc.enable()
        return ret
    f.__doc__ = fun.__doc__
    return f
def timer(func, repetitions=100000):
    """Decorator: time `repetitions` calls of the wrapped function.

    Prints the elapsed wall-clock time and returns the last call's result.
    GC is disabled during timing so collector pauses do not skew the
    measurement, and is re-enabled even if the wrapped function raises.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # disable garbage collection
        gc.collect()
        gc.disable()
        result = None  # defined even when repetitions == 0
        try:
            start = time.time()
            for x in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            gc.enable()  # re-enable garbage collection, even on error
            gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
#------------------------------------------------------------------------------
# [ timer_X function decorators ]
# replicate the above decorator with different number of repetitions
#------------------------------------------------------------------------------
def timer_10(func, repetitions=10):
    """Decorator: time 10 (by default) calls of the wrapped function.

    Prints the elapsed wall-clock time and returns the last call's result.
    GC is disabled during timing so collector pauses do not skew the
    measurement, and is re-enabled even if the wrapped function raises.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # disable garbage collection
        gc.collect()
        gc.disable()
        result = None  # defined even when repetitions == 0
        try:
            start = time.time()
            for x in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            gc.enable()  # re-enable garbage collection, even on error
            gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
def timer_100(func, repetitions=100):
    """Decorator: time 100 (by default) calls of the wrapped function.

    Prints the elapsed wall-clock time and returns the last call's result.
    GC is disabled during timing so collector pauses do not skew the
    measurement, and is re-enabled even if the wrapped function raises.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # disable garbage collection
        gc.collect()
        gc.disable()
        result = None  # defined even when repetitions == 0
        try:
            start = time.time()
            for x in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            gc.enable()  # re-enable garbage collection, even on error
            gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
def timer_1k(func, repetitions=1000):
    """Decorator: time 1000 (by default) calls of the wrapped function.

    Prints the elapsed wall-clock time and returns the last call's result.
    GC is disabled during timing so collector pauses do not skew the
    measurement, and is re-enabled even if the wrapped function raises.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # disable garbage collection
        gc.collect()
        gc.disable()
        result = None  # defined even when repetitions == 0
        try:
            start = time.time()
            for x in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            gc.enable()  # re-enable garbage collection, even on error
            gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
def timer_10k(func, repetitions=10000):
    """Decorator: time 10000 (by default) calls of the wrapped function.

    Prints the elapsed wall-clock time and returns the last call's result.
    GC is disabled during timing so collector pauses do not skew the
    measurement, and is re-enabled even if the wrapped function raises.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # disable garbage collection
        gc.collect()
        gc.disable()
        result = None  # defined even when repetitions == 0
        try:
            start = time.time()
            for x in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            gc.enable()  # re-enable garbage collection, even on error
            gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
def timer(func, repetitions=100000):
    """Decorator: time `repetitions` calls of the wrapped function.

    Prints the elapsed wall-clock time and returns the last call's result.
    GC is disabled during timing so collector pauses do not skew the
    measurement, and is re-enabled even if the wrapped function raises.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # disable garbage collection
        gc.collect()
        gc.disable()
        result = None  # defined even when repetitions == 0
        try:
            start = time.time()
            for x in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            gc.enable()  # re-enable garbage collection, even on error
            gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
#------------------------------------------------------------------------------
# [ timer_X function decorators ]
# replicate the above decorator with different number of repetitions
#------------------------------------------------------------------------------
def timer_10(func, repetitions=10):
    """Decorator: time 10 (by default) calls of the wrapped function.

    Prints the elapsed wall-clock time and returns the last call's result.
    GC is disabled during timing so collector pauses do not skew the
    measurement, and is re-enabled even if the wrapped function raises.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # disable garbage collection
        gc.collect()
        gc.disable()
        result = None  # defined even when repetitions == 0
        try:
            start = time.time()
            for x in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            gc.enable()  # re-enable garbage collection, even on error
            gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
def timer_100(func, repetitions=100):
    """Decorator: time 100 (by default) calls of the wrapped function.

    Prints the elapsed wall-clock time and returns the last call's result.
    GC is disabled during timing so collector pauses do not skew the
    measurement, and is re-enabled even if the wrapped function raises.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # disable garbage collection
        gc.collect()
        gc.disable()
        result = None  # defined even when repetitions == 0
        try:
            start = time.time()
            for x in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            gc.enable()  # re-enable garbage collection, even on error
            gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
def timer_1k(func, repetitions=1000):
    """Decorator: time 1000 (by default) calls of the wrapped function.

    Prints the elapsed wall-clock time and returns the last call's result.
    GC is disabled during timing so collector pauses do not skew the
    measurement, and is re-enabled even if the wrapped function raises.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # disable garbage collection
        gc.collect()
        gc.disable()
        result = None  # defined even when repetitions == 0
        try:
            start = time.time()
            for x in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            gc.enable()  # re-enable garbage collection, even on error
            gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
def timer_10k(func, repetitions=10000):
    """Decorator: time 10000 (by default) calls of the wrapped function.

    Prints the elapsed wall-clock time and returns the last call's result.
    GC is disabled during timing so collector pauses do not skew the
    measurement, and is re-enabled even if the wrapped function raises.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # disable garbage collection
        gc.collect()
        gc.disable()
        result = None  # defined even when repetitions == 0
        try:
            start = time.time()
            for x in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            gc.enable()  # re-enable garbage collection, even on error
            gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
def timeit(self, number=default_number):
    """Run the main statement 'number' times and return the elapsed time.

    The setup statement is executed once first; the return value is the
    wall-clock duration, in seconds (a float), of 'number' trips through
    the loop. 'number' defaults to one million. The statement, setup and
    timer function were all supplied to the constructor.
    """
    it = itertools.repeat(None, number) if itertools else [None] * number
    was_enabled = gc.isenabled()
    gc.disable()
    try:
        elapsed = self.inner(it, self.timer)
    finally:
        if was_enabled:
            gc.enable()
    return elapsed
def load_parser(self, path, original_changed_time):
    """Return the cached parser for *path*, or None if absent/outdated.

    :param path: source file path used as the cache key
    :param original_changed_time: mtime of the source file, or None to
        skip the staleness check
    """
    try:
        pickle_changed_time = self._index[path]
    except KeyError:
        return None
    if original_changed_time is not None \
            and pickle_changed_time < original_changed_time:
        # the pickle file is outdated
        return None
    with open(self._get_hashed_path(path), 'rb') as f:
        # Disabling the collector speeds up unpickling of large object
        # graphs; restore only the caller's previous GC state afterwards.
        gcold = gc.isenabled()
        try:
            gc.disable()
            # NOTE: pickle.load on a cache file we wrote ourselves; do not
            # point this at untrusted data.
            parser_cache_item = pickle.load(f)
        finally:
            if gcold:
                gc.enable()
    debug.dbg('pickle loaded: %s', path)
    parser_cache[path] = parser_cache_item
    return parser_cache_item.parser
def load_parser(self, path, original_changed_time):
    """Return the cached parser for *path*, or None if absent/outdated.

    :param path: source file path used as the cache key
    :param original_changed_time: mtime of the source file, or None to
        skip the staleness check
    """
    try:
        pickle_changed_time = self._index[path]
    except KeyError:
        return None
    if original_changed_time is not None \
            and pickle_changed_time < original_changed_time:
        # the pickle file is outdated
        return None
    with open(self._get_hashed_path(path), 'rb') as f:
        # Disabling the collector speeds up unpickling of large object
        # graphs; restore only the caller's previous GC state afterwards.
        gcold = gc.isenabled()
        try:
            gc.disable()
            # NOTE: pickle.load on a cache file we wrote ourselves; do not
            # point this at untrusted data.
            parser_cache_item = pickle.load(f)
        finally:
            if gcold:
                gc.enable()
    debug.dbg('pickle loaded: %s', path)
    parser_cache[path] = parser_cache_item
    return parser_cache_item.parser
def test_load_refcount():
    # Objects returned by np.load must be freed by refcounting alone,
    # without needing the cyclic garbage collector.
    buf = BytesIO()
    np.savez(buf, [1, 2, 3])
    buf.seek(0)
    assert_(gc.isenabled())
    gc.disable()
    try:
        gc.collect()
        np.load(buf)
        # gc.collect reports the number of unreachable objects found in
        # reference cycles -- verify np.load created none.
        cycle_count = gc.collect()
    finally:
        gc.enable()
    assert_equal(cycle_count, 0)
def bench(func, iterations, stat_memory):
    """Time *func* over *iterations* calls; optionally diff the heap.

    Returns (seconds-per-call as Decimal, heapy heap diff or None).
    Prints a progress dot to stdout.
    """
    gc.collect()
    delta = None
    before = None
    if heapy and stat_memory:
        before = heapy.heap()
    elapsed = timeit.timeit(func, setup=gc.enable, number=iterations)
    if heapy and stat_memory:
        delta = heapy.heap() - before
    per_call = Decimal(str(elapsed)) / Decimal(str(iterations))
    sys.stdout.write('.')
    sys.stdout.flush()
    return (per_call, delta)
def determine_iterations(func):
    # NOTE(kgriffs): Algorithm adapted from IPython's magic timeit
    # function to determine iterations so that 0.2 <= total time < 2.0
    iterations = ITER_DETECTION_MULTIPLIER
    for __ in range(1, ITER_DETECTION_MAX_ATTEMPTS):
        gc.collect()
        elapsed = timeit.timeit(func, setup=gc.enable, number=int(iterations))
        if elapsed >= ITER_DETECTION_DURATION_MIN:
            assert elapsed < ITER_DETECTION_DURATION_MAX
            break
        iterations *= ITER_DETECTION_MULTIPLIER
    return int(iterations)
def call_unrar(params):
    "Calls rar/unrar command line executable, returns stdout pipe"
    global rar_executable_cached
    if rar_executable_cached is None:
        for command in ('unrar', 'rar'):
            try:
                probe = subprocess.Popen([command], stdout=subprocess.PIPE)
                # Reap the probe process so it does not linger as a zombie
                # and its pipe handle is closed.
                probe.communicate()
                rar_executable_cached = command
                break
            except OSError:
                pass
        if rar_executable_cached is None:
            raise UnpackerNotInstalled("No suitable RAR unpacker installed")
    assert type(params) == list, "params must be list"
    args = [rar_executable_cached] + params
    # Preserve the caller's GC state instead of unconditionally enabling.
    gcold = gc.isenabled()
    try:
        gc.disable() # See http://bugs.python.org/issue1336
        return subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    finally:
        if gcold:
            gc.enable()
def _run(self, n):
    """Time self._compute over the configured number of runs.

    Builds the arguments for problem size *n*, then returns a list of
    per-run wall-clock durations in seconds. GC is disabled during the
    measurement and restored afterwards even if a run raises.
    """
    self._make_args(n)
    gcold = gc.isenabled()
    gc.disable()
    try:
        times = []
        for i in range(self._cmd.args.runs):
            t_start = time.time()
            self._compute()
            elapsed = time.time() - t_start
            times.append(elapsed)
    finally:
        # Restore the collector even if _compute raised mid-loop.
        if gcold:
            gc.enable()
    return times
def timeit(self, number=timeit.default_number):
    """Measure 'number' executions of the main statement.

    Runs the setup statement once, then returns the wall-clock seconds
    (float) taken by 'number' passes through the loop. 'number' defaults
    to one million; statement, setup and timer come from the constructor.
    """
    iterations = itertools.repeat(None, number)
    previously_enabled = gc.isenabled()
    gc.disable()
    try:
        result = self.inner(iterations, self.timer)
    finally:
        if previously_enabled:
            gc.enable()
    return result
def timeit(self, number=default_number):
    """Measure how long 'number' executions of the main statement take.

    The setup statement runs once; the result is the float number of
    wall-clock seconds spent on 'number' loop iterations (default: one
    million). Statement, setup and timer were given to the constructor.
    """
    if itertools:
        loop_iter = itertools.repeat(None, number)
    else:
        loop_iter = [None] * number
    gc_was_on = gc.isenabled()
    gc.disable()
    try:
        duration = self.inner(loop_iter, self.timer)
    finally:
        if gc_was_on:
            gc.enable()
    return duration
def test_free_from_gc(self):
# Check that freeing of blocks by the garbage collector doesn't deadlock
# (issue #12352).
# Make sure the GC is enabled, and set lower collection thresholds to
# make collections more frequent (and increase the probability of
# deadlock).
if not gc.isenabled():
gc.enable()
self.addCleanup(gc.disable)
thresholds = gc.get_threshold()
self.addCleanup(gc.set_threshold, *thresholds)
gc.set_threshold(10)
# perform numerous block allocations, with cyclic references to make
# sure objects are collected asynchronously by the gc
for i in range(5000):
a = multiprocessing.heap.BufferWrapper(1)
b = multiprocessing.heap.BufferWrapper(1)
# circular references
a.buddy = b
b.buddy = a
#
#
#
def test_del_newclass(self):
# __del__ methods can trigger collection, make this to happen
thresholds = gc.get_threshold()
gc.enable()
gc.set_threshold(1)
class A(object):
def __del__(self):
dir(self)
a = A()
del a
gc.disable()
gc.set_threshold(*thresholds)
# The following two tests are fragile:
# They precisely count the number of allocations,
# which is highly implementation-dependent.
# For example, disposed tuples are not freed, but reused.
# To minimize variations, though, we first store the get_count() results
# and check them at the end.
def install_and_load(self):
    """Load the pickled TIMIT 'train' and 'test' subsets into self.subset.

    Expects TIMIT/train_set.pkl and TIMIT/test_set.pkl next to this file;
    raises IOError with an explanatory message if either is missing. Each
    file contributes a list of three unpickled objects. The 'valid' key
    aliases 'test' since TIMIT is small.
    """
    # TODO automatically install if fails to find anything
    FILE_NOT_FOUND_MSG = (
        'Did not find TIMIT file "%s"'
        ', make sure you download and install the dataset')
    self.subset = {}
    path = os.path.join(os.path.dirname(__file__), 'TIMIT', '%s_set.pkl')
    for subset in ['train', 'test']:
        filepath = path % subset
        if not os.path.exists(filepath):
            raise IOError(
                FILE_NOT_FOUND_MSG % filepath)
        with open(filepath, 'rb') as f:
            # Disabling GC speeds up unpickling of large object graphs;
            # restore it even if pickle.load raises.
            gcold = gc.isenabled()
            try:
                gc.disable()
                all_data = [pickle.load(f) for _ in range(3)]
            finally:
                if gcold:
                    gc.enable()
        self.subset[subset] = all_data
    # use same subset for validation / test
    # as TIMIT is small
    self.subset['valid'] = self.subset['test']
def benchmark(self, block: RunProgramBlock, runs: int,
              cpuset: CPUSet = None, set_id: int = 0) -> BenchmarkingResultBlock:
    """
    Benchmark the given program block for ``runs`` runs.

    GC is disabled around the measured runs so collector pauses do not
    skew timings. Errors at any stage are captured and returned inside
    the result block rather than raised to the caller.

    :param block: program block to benchmark (a copy is used; the
        caller's block is left untouched)
    :param runs: number of benchmarking runs
    :param cpuset: CPU set to run on, or None
    :param set_id: id of the sub-set inside the cpuset
    :return: the benchmarking results, or a result block carrying the error
    """
    t = time.time()
    block = block.copy()
    try:
        self._setup_block(block)
        gc.collect()
        gc.disable()
    except IOError as err:
        # Setup failed before measurement started; report it as a result.
        return BenchmarkingResultBlock(error=err)
    try:
        res = self._benchmark(block, runs, cpuset, set_id)
    except BaseException as ex:
        # BaseException so even KeyboardInterrupt/SystemExit during the
        # measurement is reported rather than propagated.
        return BenchmarkingResultBlock(error=ex)
    finally:
        gc.enable()
    try:
        self._teardown_block(block)
    except BaseException as err:
        return BenchmarkingResultBlock(error=err)
    t = time.time() - t
    assert isinstance(res, BenchmarkingResultBlock)
    # Record the average per-run overhead (setup + runs + teardown),
    # replicated once per run.
    res.data["__ov-time"] = [t / runs] * runs
    # print(res.data)
    return res