def __init__(self, msShowTimeBetweenSlides=1500):
    """Initialise the Tk window and prepare an endless iterator of slides.

    msShowTimeBetweenSlides -- value stored in ``self.showTime``
                               (named as milliseconds per slide).
    """
    Tk.__init__(self)
    self.showTime = msShowTimeBetweenSlides

    # slides are read from the 'Resources' folder next to this module
    module_dir = path.realpath(path.dirname(__file__))
    slides_dir = path.join(module_dir, 'Resources')
    slide_names = [name for name in listdir(slides_dir)
                   if name.endswith(('gif', 'jpg'))]

    # the images are opened by bare file name, so make the folder current
    chdir(slides_dir)
    # cycle() pulls (image, file name) pairs from the generator and then
    # replays them endlessly
    self.iterableCycle = cycle(
        (ImageTk.PhotoImage(file=name), name) for name in slide_names)

    # Label widget on which the images are displayed
    self.slidesLabel = Label(self)
    self.slidesLabel.pack()
python类cycle()的实例源码
SlideShow_Pillow.py 文件源码
项目:Python-GUI-Programming-Cookbook-Second-Edition
作者: PacktPublishing
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def pieces_under_over(path, segs_to_points, xings):
    """Yield ``(piece, flag)`` for every piece of `path`.

    The boolean flag alternates from piece to piece.  Its phase is taken
    from the first piece whose last point has an entry in `xings`; when no
    piece has one, the sequence starts with True.
    """
    pieces = list(path_pieces(path, segs_to_points))
    # default pattern, used when no piece ends on a known crossing
    pattern = [True, False]
    for index, piece in enumerate(pieces):
        crossing = xings.get(piece[-1])
        if crossing is None:
            continue
        if crossing.under is None:
            assert crossing.over is not None
            leads_over = (crossing.over == path)
        else:
            leads_over = (crossing.under != path)
        pattern = [leads_over, not leads_over]
        # an odd index means the alternation must be shifted by one piece
        if index % 2:
            pattern = pattern[::-1]
        break
    yield from zip(pieces, itertools.cycle(pattern))
def test_jitter(self):
    """Check that PhysicsJitter moves ``y`` by the jitter value times the scale."""
    # Jitter cycles between +1 & -1 times time delta
    jitter_iter = itertools.cycle((1, -1))

    def jitter(dt):
        return dt * next(jitter_iter)

    physics = particle.PhysicsJitter(y=2, jitter=jitter)
    p = particle.Particle(0, 0)
    # assertEqual is used throughout: the assertEquals alias is deprecated
    # and no longer exists on Python 3.12+
    self.assertEqual(p.y, 0)
    # Jitter one second; uses jitter value 1 * 1 second * scale 2, +2 to y
    physics(1, p)
    self.assertEqual(p.y, 2)
    # Jitter 2s; uses jitter value -1 * 2 second * scale 2, -4 to y
    physics(2, p)
    self.assertEqual(p.y, -2)
    # Jitter 5s; uses jitter value 1 * 5 second * scale 2, +10 to y
    physics(5, p)
    self.assertEqual(p.y, 8)
def __init__(self, console_or_stream=None, **kwargs):
    """Set up the console and the display options.

    console_or_stream -- a ``Console`` instance is used as is; anything
                         else is wrapped in a new ``Console``.
    kwargs            -- optional overrides; every option falls back to the
                         upper-case attribute of the same name on ``self``.
    """
    if isinstance(console_or_stream, Console):
        console = console_or_stream
    else:
        console = Console(console_or_stream)
    self.console = console

    # option name -> default is the attribute with the upper-cased name
    option_names = (
        'template',
        'progress_template',
        'progress_brick',
        'progress_num_bricks',
        'percent_template',
        'current_template',
        'total_template',
        'eta_template',
        'screw_template',
        'refresh_every',
    )
    for option in option_names:
        default = getattr(self, option.upper())
        setattr(self, option, kwargs.get(option, default))

    self.line_template = self.build_line_template()
    self.start_datetime = None
    # endless sequence of characters for the "screw" animation
    self.screw_cycle = itertools.cycle(('|', '/', '-', '\\'))
def _match_label_with_color(label, colors, bg_label, bg_color):
    """Return `unique_labels` and `color_cycle` for label array and color list.

    `colors` repeat endlessly for the ordinary labels; the background color
    is emitted first and only once, and `bg_label` is placed first in the
    returned label list so the two line up.
    """
    # A placeholder background color is used when none is given.
    background = (0, 0, 0) if bg_color is None else bg_color
    background = _rgb_vector([background])

    unique_labels = list(set(label.flat))
    # Move (or add) the background label to the front of the list.
    try:
        unique_labels.remove(bg_label)
    except ValueError:
        pass
    unique_labels.insert(0, bg_label)

    # Background color first, then the endlessly repeating label colors.
    color_cycle = itertools.chain(background, itertools.cycle(colors))
    return unique_labels, color_cycle
def __init__(self, pid: int, name: str, parser, fields: [list, str]):
    """
    A class representing an FMS PID and its conversion

    :param pid: The PID
    :param name: A friendly name of this PID
    :param parser: a function that turns a byte array of at most length 8 into one or more readable values
    :param fields: the friendly name of the outputs of this PID eg. 'RPM';
        a single string is wrapped into a one-element list
    """
    self.pid = pid
    self.name = name
    self.parser = parser
    # isinstance (instead of an exact type comparison) also accepts str subclasses
    self.fields = [fields] if isinstance(fields, str) else fields
    # one "<name> (<field>)" label per field; the name is the same for all
    # of them, so no cycle()/zip() pairing is needed
    self.fieldnames = ['{} ({})'.format(name, unit) for unit in self.fields]
def insert_and_validate_list_results(self, reverse, slowdown):
    """
    This utility method will submit various statements for execution using the ConcurrentExecutorListResults,
    then invoke a separate thread to execute the callback associated with the futures registered
    for those statements. The parameters will toggle various timing, and ordering changes.
    Finally it will validate that the results were returned in the order they were submitted
    :param reverse: Execute the callbacks in the opposite order that they were submitted
    :param slowdown: Cause intermittent queries to perform slowly
    """
    our_handler = MockResponseResponseFuture(reverse=reverse)
    mock_session = Mock()
    statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]),
                                [(i, ) for i in range(100)])
    mock_session.execute_async.return_value = our_handler

    t = TimedCallableInvoker(our_handler, slowdown=slowdown)
    t.start()
    try:
        results = execute_concurrent(mock_session, statements_and_params)
        # wait until every pending callback has been taken off the queue
        while not our_handler.pending_callbacks.empty():
            time.sleep(.01)
    finally:
        # always stop the invoker thread, even if execution raised
        t.stop()
    self.validate_result_ordering(results)
def insert_and_validate_list_generator(self, reverse, slowdown):
    """
    Submit a batch of statements through ``execute_concurrent`` with
    ``results_generator=True`` while a separate invoker thread runs the
    callbacks of the registered futures, then validate that the results
    come back in submission order.
    :param reverse: Execute the callbacks in the opposite order that they were submitted
    :param slowdown: Cause intermittent queries to perform slowly
    """
    handler = MockResponseResponseFuture(reverse=reverse)
    session = Mock()
    session.execute_async.return_value = handler

    # the same statement paired with 100 different parameter tuples
    statement_source = cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"])
    parameter_list = [(i, ) for i in range(100)]
    statements_and_params = zip(statement_source, parameter_list)

    invoker = TimedCallableInvoker(handler, slowdown=slowdown)
    invoker.start()
    try:
        results = execute_concurrent(session, statements_and_params, results_generator=True)
        self.validate_result_ordering(results)
    finally:
        invoker.stop()
def test_wrap_round_robin(self):
    """TokenAwarePolicy around RoundRobinPolicy: replicas come first in the
    plan, and a call without a query still covers every host."""
    cluster = Mock(spec=Cluster)
    cluster.metadata = Mock(spec=Metadata)
    hosts = []
    for number in range(4):
        host = Host(str(number), SimpleConvictionPolicy)
        host.set_up()
        hosts.append(host)

    def get_replicas(keyspace, packed_key):
        # two consecutive hosts (wrapping around) starting at the packed index
        start = struct.unpack('>i', packed_key)[0]
        return list(islice(cycle(hosts), start, start + 2))

    cluster.metadata.get_replicas.side_effect = get_replicas

    policy = TokenAwarePolicy(RoundRobinPolicy())
    policy.populate(cluster, hosts)

    for i in range(4):
        routing_key = struct.pack('>i', i)
        query = Statement(routing_key=routing_key, keyspace='keyspace_name')
        qplan = list(policy.make_query_plan(None, query))

        replicas = get_replicas(None, struct.pack('>i', i))
        non_replicas = {h for h in hosts if h not in replicas}
        self.assertEqual(replicas, qplan[:2])
        self.assertEqual(non_replicas, set(qplan[2:]))

    # Should use the secondary policy
    for _ in range(4):
        plan_hosts = set(policy.make_query_plan())
        self.assertEqual(plan_hosts, set(hosts))
def test_execute_concurrent(self):
    """Write and then read back batches of various sizes concurrently."""
    for num_statements in (0, 1, 2, 7, 10, 99, 100, 101, 199, 200, 201):
        # write phase: one insert statement repeated for every parameter pair
        insert = SimpleStatement(
            "INSERT INTO test3rf.test (k, v) VALUES (%s, %s)",
            consistency_level=ConsistencyLevel.QUORUM)
        write_params = [(i, i) for i in range(num_statements)]
        write_batch = list(zip(cycle((insert, )), write_params))

        results = self.execute_concurrent_helper(self.session, write_batch)
        self.assertEqual(num_statements, len(results))
        for success, result in results:
            self.assertTrue(success)
            self.assertFalse(result)

        # read phase: one select statement repeated for every key
        select = SimpleStatement(
            "SELECT v FROM test3rf.test WHERE k=%s",
            consistency_level=ConsistencyLevel.QUORUM)
        read_params = [(i, ) for i in range(num_statements)]
        read_batch = list(zip(cycle((select, )), read_params))

        results = self.execute_concurrent_helper(self.session, read_batch)
        self.assertEqual(num_statements, len(results))
        expected = [(True, [(i,)]) for i in range(num_statements)]
        self.assertEqual(expected, results)
def test_no_raise_on_first_failure(self):
    """With raise_on_first_error=False a failing statement is reported in
    its result slot as an InvalidRequest while all others succeed."""
    bad_index = 57
    statement = SimpleStatement(
        "INSERT INTO test3rf.test (k, v) VALUES (%s, %s)",
        consistency_level=ConsistencyLevel.QUORUM)
    parameters = [(i, i) for i in range(100)]

    # we'll get an error back from the server
    parameters[bad_index] = ('efefef', 'awefawefawef')

    batch = list(zip(cycle((statement, )), parameters))
    results = execute_concurrent(self.session, batch, raise_on_first_error=False)
    for i, (success, result) in enumerate(results):
        if i != bad_index:
            self.assertTrue(success)
            self.assertFalse(result)
            continue
        self.assertFalse(success)
        self.assertIsInstance(result, InvalidRequest)
def test_no_raise_on_first_failure_client_side(self):
    """With raise_on_first_error=False a parameter-binding failure is
    reported in its result slot as a TypeError while all others succeed."""
    bad_index = 57
    statement = SimpleStatement(
        "INSERT INTO test3rf.test (k, v) VALUES (%s, %s)",
        consistency_level=ConsistencyLevel.QUORUM)
    parameters = [(i, i) for i in range(100)]

    # the driver will raise an error when binding the params
    parameters[bad_index] = 1

    batch = list(zip(cycle((statement, )), parameters))
    results = execute_concurrent(self.session, batch, raise_on_first_error=False)
    for i, (success, result) in enumerate(results):
        if i != bad_index:
            self.assertTrue(success)
            self.assertFalse(result)
            continue
        self.assertFalse(success)
        self.assertIsInstance(result, TypeError)
def make_query_plan(self, working_keyspace=None, query=None):
    """Yield the live hosts of the local DC, rotated by an advancing start
    position, followed by up to ``used_hosts_per_remote_dc`` live hosts of
    every other DC.

    `working_keyspace` and `query` are accepted but not used here.
    """
    # not thread-safe, but we don't care much about lost increments
    # for the purposes of load balancing
    start = self._position
    self._position += 1

    local_hosts = self._dc_live_hosts.get(self.local_dc, ())
    host_count = len(local_hosts)
    if local_hosts:
        start = start % host_count
    else:
        start = 0
    # one full rotation over the local hosts, beginning at `start`
    local_rotation = islice(cycle(local_hosts), start, start + host_count)
    for host in local_rotation:
        yield host

    # the dict can change, so get candidate DCs iterating over keys of a copy
    remote_dcs = [dc for dc in self._dc_live_hosts.copy().keys()
                  if dc != self.local_dc]
    for dc in remote_dcs:
        remote_hosts = self._dc_live_hosts.get(dc, ())
        for host in remote_hosts[:self.used_hosts_per_remote_dc]:
            yield host
def add_plot_args(parser):
    """Register the plotting options on `parser`.

    Adds ``-d/--data`` (required, takes PATH and LABEL, handled by
    ``PlotDataAction`` into ``experiments``), ``-p/--prefix`` and
    ``-f/--format`` (defaulting to ``LINEFORMATS``).
    """
    # `required` is a boolean flag; the original passed the string "True",
    # which only worked because a non-empty string is truthy
    parser.add_argument('-d', '--data',
                        help="""Append a PATH to a privcount tallies.json file,
and the LABEL we should use for the graph legend for this
set of experimental results""",
                        metavar=("PATH", "LABEL"),
                        nargs=2,
                        required=True,
                        action=PlotDataAction, dest="experiments")
    parser.add_argument('-p', '--prefix',
                        help="a STRING filename prefix for graphs we generate",
                        metavar="STRING",
                        action="store", dest="prefix",
                        default=None)
    parser.add_argument('-f', '--format',
                        help="""A comma-separated LIST of color/line format strings to cycle to
matplotlib's plot command (see matplotlib.pyplot.plot)""",
                        metavar="LIST",
                        action="store", dest="lineformats",
                        default=LINEFORMATS)
def _copy_from(self, curs, nrecs, srec, copykw):
    """Copy `nrecs` generated rows into table ``tcopy`` and verify them.

    Row ``i`` is ``"<i>\\t<letter * srec>"`` with the letter taken cyclically
    from ``string.ascii_letters``; `copykw` is passed on to ``copy_from``.
    """
    letters = string.ascii_letters
    buf = StringIO()
    for rec_id, letter in zip(range(nrecs), cycle(letters)):
        payload = letter * srec
        buf.write("%s\t%s\n" % (rec_id, payload))
    buf.seek(0)

    curs.copy_from(MinimalRead(buf), "tcopy", **copykw)

    # every record must have arrived
    curs.execute("select count(*) from tcopy")
    self.assertEqual(nrecs, curs.fetchone()[0])

    # check the content of the first rows, one per distinct letter
    curs.execute("select data from tcopy where id < %s order by id",
                 (len(letters),))
    for row_no, (data,) in enumerate(curs):
        self.assertEqual(data, letters[row_no] * srec)
def _copy_from(self, curs, nrecs, srec, copykw):
    """Copy `nrecs` generated rows into table ``tcopy`` and verify them.

    Row ``i`` is ``"<i>\\t<letter * srec>"`` with the letter taken cyclically
    from ``string.ascii_letters``; `copykw` is passed on to ``copy_from``.
    """
    letters = string.ascii_letters
    buf = StringIO()
    for rec_id, letter in izip(xrange(nrecs), cycle(letters)):
        payload = letter * srec
        buf.write("%s\t%s\n" % (rec_id, payload))
    buf.seek(0)

    curs.copy_from(MinimalRead(buf), "tcopy", **copykw)

    # every record must have arrived
    curs.execute("select count(*) from tcopy")
    self.assertEqual(nrecs, curs.fetchone()[0])

    # check the content of the first rows, one per distinct letter
    curs.execute("select data from tcopy where id < %s order by id",
                 (len(letters),))
    for row_no, (data,) in enumerate(curs):
        self.assertEqual(data, letters[row_no] * srec)
def testLineBuffering(self):
    """
    Test creating a LineBuffer and feeding it some lines.  The lines should
    build up in its internal buffer for a while and then get spat out to
    the writer.
    """
    output = []
    # cycle() is already an iterator; named `lines` to avoid shadowing the
    # builtin `input`
    lines = itertools.cycle(['012', '345', '6', '7', '8', '9'])
    c = pop3._IteratorBuffer(output.extend, lines, 6)
    i = iter(c)
    # next(i) instead of i.next(): works on Python 2.6+ and Python 3
    self.assertEqual(output, [])  # nothing is buffered
    next(i)
    self.assertEqual(output, [])  # '012' is buffered
    next(i)
    self.assertEqual(output, [])  # '012345' is buffered
    next(i)
    self.assertEqual(output, ['012', '345', '6'])  # nothing is buffered
    for n in range(5):
        next(i)
    self.assertEqual(output, ['012', '345', '6', '7', '8', '9', '012', '345'])
SlideShow_Pillow.py 文件源码
项目:Python-GUI-Programming-Cookbook-Second-Edition
作者: PacktPublishing
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def slidesCallback(self):
    """Show the next slide and schedule this method to run again."""
    # take the next (image, file name) pair from the endless iterator
    image, slide_name = next(self.iterableCycle)
    # display the image on the Label and its file name in the window title
    self.slidesLabel.config(image=image)
    self.title(slide_name)
    # re-arm the timer so the show keeps running
    self.after(self.showTime, self.slidesCallback)
#=================================
# Start GUI
#=================================
SlideShow.py 文件源码
项目:Python-GUI-Programming-Cookbook-Second-Edition
作者: PacktPublishing
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def __init__(self, msShowTimeBetweenSlides=1500):
    """Initialise the Tk window and prepare an endless iterator of GIF slides.

    msShowTimeBetweenSlides -- value stored in ``self.showTime``
                               (named as milliseconds per slide).
    """
    Tk.__init__(self)
    self.showTime = msShowTimeBetweenSlides

    # slides are read from the 'Resources' folder next to this module
    module_dir = path.realpath(path.dirname(__file__))
    slides_dir = path.join(module_dir, 'Resources')
    slide_names = [name for name in listdir(slides_dir)
                   if name.endswith('gif')]

    # the images are opened by bare file name, so make the folder current
    chdir(slides_dir)
    # cycle() pulls (image, file name) pairs from the generator and then
    # replays them endlessly
    self.iterableCycle = cycle(
        (PhotoImage(file=name), name) for name in slide_names)

    # Label widget on which the images are displayed
    self.slidesLabel = Label(self)
    self.slidesLabel.pack()
SlideShow.py 文件源码
项目:Python-GUI-Programming-Cookbook-Second-Edition
作者: PacktPublishing
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def slidesCallback(self):
    """Show the next slide and schedule this method to run again."""
    # take the next (image, file name) pair from the endless iterator
    image, slide_name = next(self.iterableCycle)
    # display the image on the Label and its file name in the window title
    self.slidesLabel.config(image=image)
    self.title(slide_name)
    # re-arm the timer so the show keeps running
    self.after(self.showTime, self.slidesCallback)
#=================================
# Start GUI
#=================================
SlideShow_try_jpg.py 文件源码
项目:Python-GUI-Programming-Cookbook-Second-Edition
作者: PacktPublishing
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def __init__(self, msShowTimeBetweenSlides=1500):
    """Initialise the Tk window and prepare an endless iterator of slides.

    File names ending in 'gif' or 'jpg' are picked up and loaded with
    ``PhotoImage``.

    msShowTimeBetweenSlides -- value stored in ``self.showTime``
                               (named as milliseconds per slide).
    """
    Tk.__init__(self)
    self.showTime = msShowTimeBetweenSlides

    # slides are read from the 'Resources' folder next to this module
    # try: .jpeg
    module_dir = path.realpath(path.dirname(__file__))
    slides_dir = path.join(module_dir, 'Resources')
    slide_names = [name for name in listdir(slides_dir)
                   if name.endswith(('gif', 'jpg'))]

    # the images are opened by bare file name, so make the folder current
    chdir(slides_dir)
    # cycle() pulls (image, file name) pairs from the generator and then
    # replays them endlessly
    self.iterableCycle = cycle(
        (PhotoImage(file=name), name) for name in slide_names)

    # Label widget on which the images are displayed
    self.slidesLabel = Label(self)
    self.slidesLabel.pack()
def get_api_servers():
    """Return an endless iterator over the glance api_servers.

    The candidates are the 'image' endpoint from the keystone service
    catalog (when one is returned) plus every entry of
    ``CONF.glance_api_servers``; they are shuffled once and then cycled.
    """
    ks = keystone_client.get_client()
    catalog = keystone_client.get_service_catalog(ks)
    image_service = catalog.url_for(service_type='image')

    api_servers = [image_service] if image_service else []
    if CONF.glance_api_servers:
        api_servers.extend(CONF.glance_api_servers)

    random.shuffle(api_servers)
    return itertools.cycle(api_servers)
def cluster(data,true_labels,n_clusters=3):
    """Fit k-means on `data`, plot a projection coloured by `true_labels`
    and return the k-means label of every sample.

    data        -- 2-D array-like; its second dimension is passed to ``tsne``
                   as the initial dimensionality.
    true_labels -- passed as the ``c`` (colour) argument of the scatter plot.
    n_clusters  -- number of k-means clusters.
    """
    km = KMeans(init='k-means++', n_clusters=n_clusters, n_init=10)
    km.fit(data)

    # the figure shows the true labels, not the k-means result
    initial_dim = np.shape(data)[1]
    # NOTE(review): presumably tsne(data, 2, dims, 30) projects to 2-D with
    # perplexity 30 - confirm against the tsne helper
    data_2 = tsne(data,2,initial_dim,30)
    plt.figure(figsize=(12, 6))
    plt.scatter(data_2[:,0],data_2[:,1], c=true_labels)
    plt.title('True Labels')

    return km.labels_
def plot_prof_2(self, mod, species, xlim1, xlim2):
    """
    Plot one species for a cycle between xlim1 and xlim2

    Parameters
    ----------
    mod : string or integer
        Model to plot, same as cycle number.
    species : string
        Which species to plot.  It is concatenated into the legend
        label, so it has to be a string.
    xlim1, xlim2 : float
        Mass coordinate range.

    """
    mass_coordinate = self.se.get(mod, 'mass')
    abundance = self.se.get(mod, 'yps', species)
    legend_label = str(mod) + ', ' + species
    pyl.plot(mass_coordinate, abundance, '-', label=legend_label)
    pyl.xlim(xlim1, xlim2)
    pyl.legend()
def chunks(it, n, k):
    """Deal the items of `it` round-robin into `n` lists of up to `k` items
    and yield each group of `n` lists once it is full.

    NOTE: a final, partially filled group is only yielded when every one of
    its `n` lists holds at least one item; otherwise those trailing items
    are not yielded at all.
    """
    def new_group():
        # a fresh set of n empty slots plus the round-robin iterator over them
        slots = [[] for _ in range(n)]
        return slots, itertools.cycle(slots)

    group, rotation = new_group()
    for item in it:
        slot = next(rotation)
        if len(slot) == k:
            # the slot whose turn it is is full, so the whole group is full
            yield group
            group, rotation = new_group()
            slot = next(rotation)
        slot.append(item)

    if all(group):
        yield group
def _on_graph(self):
    """Render one graph through ``rrdtool.graph`` per entry in ``self._cfg``."""
    palette = ("ff0000", "00ff00", "0000ff",
               "ff00ff", "ffff00", "00ffff")
    for cfg in self._cfg:
        size = list(map(str, cfg["size"]))
        limits = list(map(str, cfg["limits"]))

        args = [cfg["path"], "--imgformat", "PNG"]
        args += ["-w", size[0], "-h", size[1]]
        args += ["--vertical-label", cfg["vlabel"], "-t", cfg["title"]]
        # start time: now minus the configured length
        args += ["-s", str(round(time.time()-cfg["length"]))]
        args += ["-l", limits[0], "-u", limits[1]]

        # the colours start over for every graph and repeat when a graph
        # has more lines than the palette has colours
        for color, entry in zip(itertools.cycle(palette), cfg["lines"]):
            args.append(f"DEF:{entry['name']}={entry['path']}:value:AVERAGE")
            args.append(f"LINE2:{entry['name']}#{color}:{entry['name']}")

        rrdtool.graph(args)
def __init__(self, key):
    """Prepare a repeating-key XOR function for `key`.

    The characters of `key` are used cyclically; the position in the key
    carries over from one call of the XOR function to the next.  For a
    one-character key the ``strxor_c`` routine from ``Crypto.Util.strxor``
    is used instead when that package can be imported.
    """
    key_stream = itertools.cycle([ord(x) for x in key])
    # next(iterator) works on Python 2.6+ and Python 3, whereas the bound
    # ``.next`` method only exists on Python 2 iterators
    self.__key_gen = lambda: next(key_stream)
    self.__key_xor = lambda s: ''.join(chr(ord(x) ^ self.__key_gen()) for x in s)
    if len(key) == 1:
        try:
            from Crypto.Util.strxor import strxor_c
        except ImportError:
            # optional accelerator is missing: keep the pure Python version
            pass
        else:
            c = ord(key)
            self.__key_xor = lambda s: strxor_c(s, c)
def get_swap_subset(self, subset, other):
    """Grow a copy of `subset` by merging intersecting subsets of `self`
    and `other`, alternating between the two.

    Returns None as soon as an intersecting subset is flagged ``no_swap``;
    otherwise returns the grown copy once a pass over one of the two leaves
    its size unchanged.  `subset` itself is not modified.
    """
    # work on a copy so the caller's subset stays untouched
    merged = subset.copy()
    for lifetime in itertools.cycle((self, other)):
        size_before = merged.size
        for candidate in lifetime.subsets:
            if not candidate.intersects(merged):
                continue
            if candidate.no_swap:
                return None
            merged.merge(candidate)
        # a pass that added nothing ends the search
        if merged.size == size_before:
            return merged
def test_lengths():
    """Field lengths reported by the reader: 0 for the unused field "f1",
    and the byte-encoded approximation of the real length for "f2"."""
    from itertools import cycle, islice

    schema = fields.Schema(f1=fields.KEYWORD(stored=True, scorable=True),
                           f2=fields.KEYWORD(stored=True, scorable=True))
    lengths = [10, 20, 2, 102, 45, 3, 420, 2]
    with TempIndex(schema, "testlengths") as ix:
        w = ix.writer()
        items = u("ABCDEFG")
        # one document per length, filled with that many cycled letters
        for length in lengths:
            words = islice(cycle(items), length)
            w.add_document(f2=u(" ").join(words))
        w.commit()

        with ix.reader() as dr:
            doc_numbers = xrange(0, len(lengths))
            f1_lengths = [dr.doc_field_length(docnum, "f1")
                          for docnum in doc_numbers]
            assert f1_lengths == [0] * len(lengths)
            f2_lengths = [dr.doc_field_length(docnum, "f2")
                          for docnum in doc_numbers]
            expected = [byte_to_length(length_to_byte(n)) for n in lengths]
            assert f2_lengths == expected
def main(n = int(sys.argv[1]), n_threads=503, cycle=itertools.cycle):
    """Pass a countdown token around a ring of generator "threads".

    n         -- initial token value (the default is read from
                 ``sys.argv[1]`` when the function is defined).
    n_threads -- number of workers in the ring.
    cycle     -- callable used to iterate the ring endlessly.

    Every worker prints each value it holds.  A worker that receives a
    value that is not positive prints its own id and finishes, which ends
    the run.
    """
    def worker(worker_id):
        n = 1
        while True:
            print(n)
            if n > 0:
                # hand the decremented token back; resume with the sent value
                n = (yield (n - 1))
            else:
                print(worker_id)
                # plain return ends the generator; raising StopIteration
                # inside a generator is a RuntimeError under PEP 479
                return

    thread_ring = [worker(w) for w in range(1, n_threads + 1)]
    # advance every generator to its first yield so it can accept send()
    for t in thread_ring:
        next(t)
    send_ring = [t.send for t in thread_ring]  # bound methods, looked up once
    for send in cycle(send_ring):
        try:
            n = send(n)
        except StopIteration:
            break