def test_IOBase_finalize(self):
    """Check that an IOBase subclass caught in a reference cycle is collected.

    Issue #12149: segmentation fault on _PyIOBase_finalize when both a
    class which inherits IOBase and an object of this class are caught
    in a reference cycle and close() is already in the method cache.
    """
    class MyIO(self.IOBase):
        def close(self):
            pass

    # create an instance to populate the method cache
    MyIO()
    obj = MyIO()
    # Self-reference: the instance can now only be freed by the cycle collector.
    obj.obj = obj
    wr = weakref.ref(obj)
    # Drop the last strong references to both the class and the instance.
    del MyIO
    del obj
    support.gc_collect()
    # The weak reference is dead once the cycle has been collected.
    self.assertIsNone(wr(), wr)
python类cycle()的实例源码
def next_phase(self):
    """Return the next entry of ``self.phases``, wrapping around endlessly.

    The cycling iterator is created on first use and cached on the
    instance as ``_phaser``, so later changes to ``self.phases`` are not
    picked up.  Raises ``StopIteration`` if ``self.phases`` is empty.
    """
    try:
        phaser = self._phaser
    except AttributeError:
        # First call: build the endless iterator lazily.
        phaser = self._phaser = itertools.cycle(self.phases)
    return next(phaser)
def __init__(self, message, file=None, spin_chars="-\\|/",
             # Empirically, 8 updates/second looks nice
             min_update_interval_seconds=0.125):
    """Set up the spinner state and print the leading message.

    :param message: text written once, followed by " ... ".
    :param file: writable stream; defaults to ``sys.stdout`` when None.
    :param spin_chars: characters cycled through endlessly.
    :param min_update_interval_seconds: value handed to ``RateLimiter``
        (presumably the minimum delay between redraws -- confirm against
        ``RateLimiter``).
    """
    self._message = message
    if file is None:
        file = sys.stdout
    self._file = file
    self._rate_limiter = RateLimiter(min_update_interval_seconds)
    self._finished = False

    self._spin_cycle = itertools.cycle(spin_chars)

    # The message is emitted immediately, prefixed by the current indentation.
    self._file.write(" " * get_indentation() + self._message + " ... ")
    self._width = 0
def iterator(self, data_type="train"):
    """Return an endless iterator of ``[onehot(sample), sample]`` pairs.

    :param data_type: selector forwarded to ``self.get_data_from_type``.
    :returns: an ``itertools.cycle`` over the encoded samples; samples
        equal to ``[]`` are skipped.  The pairs are produced lazily on the
        first pass and replayed from ``cycle``'s cache afterwards.
    """
    raw_data = self.get_data_from_type(data_type)
    encoded = (
        [self.onehot(data), data]
        for data in raw_data
        # Explicit comparison (not truthiness) so only empty lists are dropped.
        if data != []
    )
    return itertools.cycle(encoded)
def next_phase(self):
    """Return the next entry of ``self.phases``, wrapping around endlessly.

    The cycling iterator is created on first use and cached on the
    instance as ``_phaser``, so later changes to ``self.phases`` are not
    picked up.  Raises ``StopIteration`` if ``self.phases`` is empty.
    """
    try:
        phaser = self._phaser
    except AttributeError:
        # First call: build the endless iterator lazily.
        phaser = self._phaser = itertools.cycle(self.phases)
    return next(phaser)
def __init__(self, message, file=None, spin_chars="-\\|/",
             # Empirically, 8 updates/second looks nice
             min_update_interval_seconds=0.125):
    """Set up the spinner state and print the leading message.

    :param message: text written once, followed by " ... ".
    :param file: writable stream; defaults to ``sys.stdout`` when None.
    :param spin_chars: characters cycled through endlessly.
    :param min_update_interval_seconds: value handed to ``RateLimiter``
        (presumably the minimum delay between redraws -- confirm against
        ``RateLimiter``).
    """
    self._message = message
    if file is None:
        file = sys.stdout
    self._file = file
    self._rate_limiter = RateLimiter(min_update_interval_seconds)
    self._finished = False

    self._spin_cycle = itertools.cycle(spin_chars)

    # The message is emitted immediately, prefixed by the current indentation.
    self._file.write(" " * get_indentation() + self._message + " ... ")
    self._width = 0
def __init__(self, distance=None):
    """Store *distance* and start an endless rotation over ``get_nodes()``.

    :param distance: kept unchanged on ``self.distance``; defaults to None.
    """
    # ``get_nodes()`` is called once; its items are then replayed forever.
    self.nodes = itertools.cycle(get_nodes())
    self.distance = distance
def test_segment_sort_along(p1, p2, tvals):
    """Check that ``Segment.sort_along`` orders points along the segment.

    Points are generated along p1 -> p2 from *tvals*, nudged by a tiny
    alternating offset, sorted, and then the path p1 -> sorted points -> p2
    is compared in length with the direct distance p1 -> p2.
    """
    # Get rid of pathological cases.
    assume(p1.distance(p2) > 0.001)

    tvals = [t / 100 for t in tvals]
    fuzz = [1e-10, -1e-10]
    points = [along_the_way(p1, p2, t) for t in tvals]
    # Alternate +/- fuzz so the points are not exactly on the segment.
    points = [Point(x+f, y+f) for (x, y), f in zip(points, itertools.cycle(fuzz))]

    # Calculate the smallest distance between any pair of points.  If we get
    # the wrong answer from sort_along, then the total distance will be off by
    # at least twice this.
    min_gap = min(q1.distance(q2) for q1, q2 in all_pairs(points + [p1, p2]))

    seg = Segment(p1, p2)
    spoints = seg.sort_along(points)

    # Sorting must neither drop nor invent points.
    assert len(spoints) == len(points)
    assert all(pt in points for pt in spoints)

    original = Point(*p1).distance(Point(*p2))
    total = (
        Point(*p1).distance(Point(*spoints[0])) +
        sum(Point(*p).distance(Point(*q)) for p, q in adjacent_pairs(spoints)) +
        Point(*spoints[-1]).distance(Point(*p2))
    )
    # The total distance will be wrong by at least 2*min_gap if it is wrong.
    assert total - original < 2 * min_gap
# Bounds
def test_bg_and_color_cycle():
    """Check background colouring and colour cycling of ``label2rgb``.

    Label 0 must be drawn in ``bg_color``; the remaining labels must
    alternate through ``colors`` in order.
    """
    image = np.zeros((1, 10))  # dummy image
    label = np.arange(10).reshape(1, -1)
    colors = [(1, 0, 0), (0, 0, 1)]
    bg_color = (0, 0, 0)
    rgb = label2rgb(label, image=image, bg_label=0, bg_color=bg_color,
                    colors=colors, alpha=1)
    # First pixel carries the background label.
    assert_close(rgb[0, 0], bg_color)
    # With nine labelled pixels and two colours, the palette must wrap around.
    for pixel, color in zip(rgb[0, 1:], itertools.cycle(colors)):
        assert_close(pixel, color)
def __call__(self, bs):
    """Decode *bs* with ``self.dtype`` and label every value with its unit.

    :param bs: raw input passed straight to ``self.dtype``.
    :returns: dict mapping ``"<self.name> (<unit>)"`` to the decoded
        value, pairing values with ``self.dtype.units`` positionally and
        stopping at the shorter of the two.
    """
    vals = self.dtype(bs)
    # Exact type check kept deliberately: anything that is not precisely a
    # tuple or list (including subclasses) is treated as one single value.
    if type(vals) not in (tuple, list):
        vals = (vals,)
    name = self.name
    return {'{} ({})'.format(name, unit): val
            for val, unit in zip(vals, self.dtype.units)}
def __call__(self, msg):
    """Parse *msg* with ``self.parser`` and label every value with its field.

    :param msg: raw message passed straight to ``self.parser``.
    :returns: dict mapping ``"<self.name> (<field>)"`` to the parsed
        value, pairing values with ``self.fields`` positionally and
        stopping at the shorter of the two.
    """
    parsed = self.parser(msg)
    # NOTE: a None result is not special-cased; like any other non-sequence
    # it is wrapped below and reported as the value of the first field.
    # Exact type check kept deliberately: subclasses of tuple/list are
    # treated as one single value.
    if type(parsed) not in [tuple, list]:
        parsed = (parsed,)
    name = self.name
    return {'{} ({})'.format(name, unit): val
            for val, unit in zip(parsed, self.fields)}
def __call__(self, msg):
    """Parse *msg* with ``self.parser`` and label every value with its field.

    :param msg: raw message passed straight to ``self.parser``.
    :returns: dict mapping ``"<self.name> (<field>)"`` to the parsed
        value, pairing values with ``self.fields`` positionally and
        stopping at the shorter of the two.
    """
    parsed = self.parser(msg)
    # Exact type check kept deliberately: anything that is not precisely a
    # tuple or list (including subclasses) is treated as one single value.
    if type(parsed) not in [tuple, list]:
        parsed = (parsed,)
    name = self.name
    return {'{} ({})'.format(name, unit): val
            for val, unit in zip(parsed, self.fields)}
def test_send():
    """Manually exercise ``BluetoothLogger`` by streaming dummy rows.

    Sends one comma-separated "speed,rpm,soc" row per second until
    interrupted with Ctrl-C, then joins the logger.  The speed follows a
    sine wave between 0 and 10000 that repeats every 90 samples; rpm and
    soc are the constants 5000 and 50.
    """
    btl = BluetoothLogger(fields=["speed", "rpm", "soc"], password=password)
    btl.start()

    # generate some data and send it
    speeds = cycle(sin(v * pi / 45) * 5000 + 5000 for v in range(1000))

    print("Sending dummy data")
    try:
        while True:
            row = (str(field) for field in (round(next(speeds), 2), 5000, 50))
            btl.send(",".join(row))
            time.sleep(1)
    except KeyboardInterrupt:
        # Ctrl-C is the only way out of the send loop.
        print("Terminating")
    btl.join()
def run(self):
    """Drive every consumer in turn, one iteration at a time, forever.

    Each consumer in ``self.consumers`` is called as ``run(iter_limit=1)``
    in round-robin order.  The loop only ends when ``self.consumers`` is
    empty or a consumer raises.
    """
    for consumer in itertools.cycle(self.consumers):
        consumer.run(iter_limit=1)
def test_first_failure(self):
    """Expect ``InvalidRequest`` when one concurrent insert has bad parameters.

    100 inserts are queued with ``raise_on_first_error=True``; the
    parameters at index 57 are replaced with strings so that request fails.
    """
    statements = cycle(("INSERT INTO test3rf.test (k, v) VALUES (%s, %s)", ))
    parameters = [(i, i) for i in range(100)]

    # we'll get an error back from the server
    parameters[57] = ('efefef', 'awefawefawef')

    self.assertRaises(
        InvalidRequest,
        execute_concurrent, self.session, list(zip(statements, parameters)), raise_on_first_error=True)
test_query_paging.py 文件源码
项目:deb-python-cassandra-driver
作者: openstack
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def test_paging(self):
    """Check that all 100 rows come back for a range of fetch sizes.

    The same SELECT is run as a plain string, a ``SimpleStatement`` and a
    prepared statement for every fetch size.
    """
    statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]),
                                [(i, ) for i in range(100)])
    execute_concurrent(self.session, list(statements_and_params))

    prepared = self.session.prepare("SELECT * FROM test3rf.test")

    # Fetch sizes below, at and above the row count.
    for fetch_size in (2, 3, 7, 10, 99, 100, 101, 10000):
        self.session.default_fetch_size = fetch_size
        self.assertEqual(100, len(list(self.session.execute("SELECT * FROM test3rf.test"))))

        statement = SimpleStatement("SELECT * FROM test3rf.test")
        self.assertEqual(100, len(list(self.session.execute(statement))))

        self.assertEqual(100, len(list(self.session.execute(prepared))))
test_query_paging.py 文件源码
项目:deb-python-cassandra-driver
作者: openstack
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def test_paging_state(self):
    """
    Test to validate paging state api
    @since 3.7.0
    @jira_ticket PYTHON-200
    @expected_result the returned paging state should be accurate, and allow for queries to be resumed.

    @test_category queries
    """
    statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]),
                                [(i, ) for i in range(100)])
    execute_concurrent(self.session, list(statements_and_params))

    list_all_results = []
    self.session.default_fetch_size = 3

    result_set = self.session.execute("SELECT * FROM test3rf.test")
    while result_set.has_more_pages:
        # No row may be delivered twice across resumed pages.
        for row in result_set.current_rows:
            self.assertNotIn(row, list_all_results)
        list_all_results.extend(result_set.current_rows)
        # Resume the query from where the current page ended.
        page_state = result_set.paging_state
        result_set = self.session.execute("SELECT * FROM test3rf.test", paging_state=page_state)

    # Collect the rows of the final page.  ``extend`` (not ``append``) so
    # that each row is counted individually whatever the page holds.
    if len(result_set.current_rows) > 0:
        list_all_results.extend(result_set.current_rows)
    self.assertEqual(len(list_all_results), 100)
test_query_paging.py 文件源码
项目:deb-python-cassandra-driver
作者: openstack
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def test_paging_verify_writes(self):
    """Check the content of paged results for a range of fetch sizes.

    After inserting keys 0..99 with value 0, the same SELECT is run as a
    plain string, a ``SimpleStatement`` and a prepared statement; each
    must yield exactly the keys 0..99 and only the value 0.
    """
    statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]),
                                [(i, ) for i in range(100)])
    # Materialise the pairs, as the sibling paging tests do.
    execute_concurrent(self.session, list(statements_and_params))

    prepared = self.session.prepare("SELECT * FROM test3rf.test")

    for fetch_size in (2, 3, 7, 10, 99, 100, 101, 10000):
        self.session.default_fetch_size = fetch_size
        queries = (
            "SELECT * FROM test3rf.test",
            SimpleStatement("SELECT * FROM test3rf.test"),
            prepared,
        )
        for query in queries:
            keys = set()
            values = set()
            for row in self.session.execute(query):
                keys.add(row.k)
                values.add(row.v)

            self.assertEqual(set(range(100)), keys)
            self.assertEqual({0}, values)
test_query_paging.py 文件源码
项目:deb-python-cassandra-driver
作者: openstack
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def test_async_paging(self):
    """Check that ``execute_async`` results yield all 100 rows when paged.

    The same SELECT is run as a plain string, a ``SimpleStatement`` and a
    prepared statement for every fetch size.
    """
    statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]),
                                [(i, ) for i in range(100)])
    execute_concurrent(self.session, list(statements_and_params))

    prepared = self.session.prepare("SELECT * FROM test3rf.test")

    # Fetch sizes below, at and above the row count.
    for fetch_size in (2, 3, 7, 10, 99, 100, 101, 10000):
        self.session.default_fetch_size = fetch_size
        self.assertEqual(100, len(list(self.session.execute_async("SELECT * FROM test3rf.test").result())))

        statement = SimpleStatement("SELECT * FROM test3rf.test")
        self.assertEqual(100, len(list(self.session.execute_async(statement).result())))

        self.assertEqual(100, len(list(self.session.execute_async(prepared).result())))
test_query_paging.py 文件源码
项目:deb-python-cassandra-driver
作者: openstack
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def test_concurrent_with_paging(self):
    """Check paging of results returned by ``execute_concurrent_with_args``.

    For every fetch size the prepared SELECT is run ten times
    concurrently; each run must succeed and yield all 100 rows.
    """
    statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]),
                                [(i, ) for i in range(100)])
    execute_concurrent(self.session, list(statements_and_params))

    prepared = self.session.prepare("SELECT * FROM test3rf.test")

    for fetch_size in (2, 3, 7, 10, 99, 100, 101, 10000):
        self.session.default_fetch_size = fetch_size
        # Ten executions of the same statement, each with no parameters.
        results = execute_concurrent_with_args(self.session, prepared, [None] * 10)
        self.assertEqual(10, len(results))
        for (success, result) in results:
            self.assertTrue(success)
            self.assertEqual(100, len(list(result)))