python类cycle()的实例源码

SlideShow_Pillow.py 文件源码 项目:Python-GUI-Programming-Cookbook-Second-Edition 作者: PacktPublishing 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, msShowTimeBetweenSlides=1500):
        # initialize tkinter super class
        Tk.__init__(self)

        # time each slide will be shown
        self.showTime = msShowTimeBetweenSlides

        # look for images in current working directory where this module lives
        chapter_folder = path.realpath(path.dirname(__file__))
        resources_folder = path.join(chapter_folder, 'Resources')
        listOfSlides = [slide for slide in listdir(resources_folder) if slide.endswith('gif') or slide.endswith('jpg')]

        # endlessly read in the slides so we can show them on the tkinter Label 
        chdir(resources_folder)
        self.iterableCycle = cycle((ImageTk.PhotoImage(file=slide), slide) for slide in listOfSlides)

        # create tkinter Label widget which can also display images
        self.slidesLabel = Label(self)

        # create the Frame widget
        self.slidesLabel.pack()
strap.py 文件源码 项目:zellij 作者: nedbat 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def pieces_under_over(path, segs_to_points, xings):
    """Produce all the pieces of the path, with a bool indicating if each leads to under or over."""
    pieces = list(path_pieces(path, segs_to_points))
    for i, piece in enumerate(pieces):
        xing = xings.get(piece[-1])
        if xing is None:
            continue
        if xing.under is not None:
            over = (xing.under != path)
        else:
            assert xing.over is not None
            over = (xing.over == path)
        ou = [over, not over]
        if i % 2:
            ou = ou[::-1]
        break
    else:
        ou = [True, False]

    yield from zip(pieces, itertools.cycle(ou))
test_particle.py 文件源码 项目:sappho 作者: lily-mayfield 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_jitter(self):
        # Jitter cycles between +1 & -1 times time delta
        jitter_iter = itertools.cycle((1, -1))

        def jitter(dt):
            return dt * next(jitter_iter)

        physics = particle.PhysicsJitter(y=2, jitter=jitter)
        p = particle.Particle(0, 0)
        self.assertEquals(p.y, 0)

        # Jitter one second; uses jitter value 1 * 1 second * scale 2, +2 to y
        physics(1, p)
        self.assertEquals(p.y, 2)

        # Jitter 2s; uses jitter value -1 * 2 second * scale 2, -4 to y
        physics(2, p)
        self.assertEquals(p.y, -2)

        # Jitter 5s; uses jitter value 1 * 5 second * scale 2, +10 to y
        physics(5, p)
        self.assertEquals(p.y, 8)
console.py 文件源码 项目:django-powerpages 作者: Open-E-WEB 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, console_or_stream=None, **kwargs):
        if isinstance(console_or_stream, Console):
            self.console = console_or_stream
        else:
            self.console = Console(console_or_stream)
        self.template = kwargs.get('template', self.TEMPLATE)
        self.progress_template = \
            kwargs.get('progress_template', self.PROGRESS_TEMPLATE)
        self.progress_brick = kwargs.get('progress_brick', self.PROGRESS_BRICK)
        self.progress_num_bricks = \
            kwargs.get('progress_num_bricks', self.PROGRESS_NUM_BRICKS)
        self.percent_template = \
            kwargs.get('percent_template', self.PERCENT_TEMPLATE)
        self.current_template = \
            kwargs.get('current_template', self.CURRENT_TEMPLATE)
        self.total_template = \
            kwargs.get('total_template', self.TOTAL_TEMPLATE)
        self.eta_template = kwargs.get('eta_template', self.ETA_TEMPLATE)
        self.screw_template = kwargs.get('screw_template', self.SCREW_TEMPLATE)
        self.refresh_every = kwargs.get('refresh_every', self.REFRESH_EVERY)
        self.line_template = self.build_line_template()
        self.start_datetime = None
        self.screw_cycle = itertools.cycle(('|', '/', '-', '\\'))
colorlabel.py 文件源码 项目:FCN_train 作者: 315386775 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _match_label_with_color(label, colors, bg_label, bg_color):
    """Return `unique_labels` and `color_cycle` for label array and color list.

    Colors are cycled for normal labels, but the background color should only
    be used for the background.
    """
    # Temporarily set background color; it will be removed later.
    if bg_color is None:
        bg_color = (0, 0, 0)
    bg_color = _rgb_vector([bg_color])

    unique_labels = list(set(label.flat))
    # Ensure that the background label is in front to match call to `chain`.
    if bg_label in unique_labels:
        unique_labels.remove(bg_label)
    unique_labels.insert(0, bg_label)

    # Modify labels and color cycle so background color is used only once.
    color_cycle = itertools.cycle(colors)
    color_cycle = itertools.chain(bg_color, color_cycle)

    return unique_labels, color_cycle
fms_pids.py 文件源码 项目:rpi-can-logger 作者: JonnoFTW 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, pid: int, name: str, parser, fields: [list, str]):
        """
        A class representing an FMSPID and its conversion
        :param pid: The PID
        :param name:  A friendly name of this PID
        :param parser: a function that turns a byte array of at most length 8 into one or more readable values
        :param fields: the friendly name of the outputs of this PID eg. 'RPM'
        """
        self.pid = pid
        self.name = name
        self.parser = parser
        if type(fields) is str:
            self.fields = [fields]
        else:
            self.fields = fields
        self.fieldnames = ['{} ({})'.format(name, unit) for pid_name, unit in
                           zip(cycle([self.name]), self.fields)]
test_concurrent.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def insert_and_validate_list_results(self, reverse, slowdown):
        """
        This utility method will execute submit various statements for execution using the ConcurrentExecutorListResults,
        then invoke a separate thread to execute the callback associated with the futures registered
        for those statements. The parameters will toggle various timing, and ordering changes.
        Finally it will validate that the results were returned in the order they were submitted
        :param reverse: Execute the callbacks in the opposite order that they were submitted
        :param slowdown: Cause intermittent queries to perform slowly
        """
        our_handler = MockResponseResponseFuture(reverse=reverse)
        mock_session = Mock()
        statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]),
                                    [(i, ) for i in range(100)])
        mock_session.execute_async.return_value = our_handler

        t = TimedCallableInvoker(our_handler, slowdown=slowdown)
        t.start()
        results = execute_concurrent(mock_session, statements_and_params)

        while(not our_handler.pending_callbacks.empty()):
            time.sleep(.01)
        t.stop()
        self.validate_result_ordering(results)
test_concurrent.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def insert_and_validate_list_generator(self, reverse, slowdown):
        """
        This utility method will execute submit various statements for execution using the ConcurrentExecutorGenResults,
        then invoke a separate thread to execute the callback associated with the futures registered
        for those statements. The parameters will toggle various timing, and ordering changes.
        Finally it will validate that the results were returned in the order they were submitted
        :param reverse: Execute the callbacks in the opposite order that they were submitted
        :param slowdown: Cause intermittent queries to perform slowly
        """
        our_handler = MockResponseResponseFuture(reverse=reverse)
        mock_session = Mock()
        statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]),
                                    [(i, ) for i in range(100)])
        mock_session.execute_async.return_value = our_handler

        t = TimedCallableInvoker(our_handler, slowdown=slowdown)
        t.start()
        try:
            results = execute_concurrent(mock_session, statements_and_params, results_generator=True)
            self.validate_result_ordering(results)
        finally:
            t.stop()
test_policies.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_wrap_round_robin(self):
        cluster = Mock(spec=Cluster)
        cluster.metadata = Mock(spec=Metadata)
        hosts = [Host(str(i), SimpleConvictionPolicy) for i in range(4)]
        for host in hosts:
            host.set_up()

        def get_replicas(keyspace, packed_key):
            index = struct.unpack('>i', packed_key)[0]
            return list(islice(cycle(hosts), index, index + 2))

        cluster.metadata.get_replicas.side_effect = get_replicas

        policy = TokenAwarePolicy(RoundRobinPolicy())
        policy.populate(cluster, hosts)

        for i in range(4):
            query = Statement(routing_key=struct.pack('>i', i), keyspace='keyspace_name')
            qplan = list(policy.make_query_plan(None, query))

            replicas = get_replicas(None, struct.pack('>i', i))
            other = set(h for h in hosts if h not in replicas)
            self.assertEqual(replicas, qplan[:2])
            self.assertEqual(other, set(qplan[2:]))

        # Should use the secondary policy
        for i in range(4):
            qplan = list(policy.make_query_plan())

            self.assertEqual(set(qplan), set(hosts))
test_concurrent.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_execute_concurrent(self):
        for num_statements in (0, 1, 2, 7, 10, 99, 100, 101, 199, 200, 201):
            # write
            statement = SimpleStatement(
                "INSERT INTO test3rf.test (k, v) VALUES (%s, %s)",
                consistency_level=ConsistencyLevel.QUORUM)
            statements = cycle((statement, ))
            parameters = [(i, i) for i in range(num_statements)]

            results = self.execute_concurrent_helper(self.session, list(zip(statements, parameters)))
            self.assertEqual(num_statements, len(results))
            for success, result in results:
                self.assertTrue(success)
                self.assertFalse(result)

            # read
            statement = SimpleStatement(
                "SELECT v FROM test3rf.test WHERE k=%s",
                consistency_level=ConsistencyLevel.QUORUM)
            statements = cycle((statement, ))
            parameters = [(i, ) for i in range(num_statements)]

            results = self.execute_concurrent_helper(self.session, list(zip(statements, parameters)))
            self.assertEqual(num_statements, len(results))
            self.assertEqual([(True, [(i,)]) for i in range(num_statements)], results)
test_concurrent.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_no_raise_on_first_failure(self):
        statement = SimpleStatement(
            "INSERT INTO test3rf.test (k, v) VALUES (%s, %s)",
            consistency_level=ConsistencyLevel.QUORUM)
        statements = cycle((statement, ))
        parameters = [(i, i) for i in range(100)]

        # we'll get an error back from the server
        parameters[57] = ('efefef', 'awefawefawef')

        results = execute_concurrent(self.session, list(zip(statements, parameters)), raise_on_first_error=False)
        for i, (success, result) in enumerate(results):
            if i == 57:
                self.assertFalse(success)
                self.assertIsInstance(result, InvalidRequest)
            else:
                self.assertTrue(success)
                self.assertFalse(result)
test_concurrent.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_no_raise_on_first_failure_client_side(self):
        statement = SimpleStatement(
            "INSERT INTO test3rf.test (k, v) VALUES (%s, %s)",
            consistency_level=ConsistencyLevel.QUORUM)
        statements = cycle((statement, ))
        parameters = [(i, i) for i in range(100)]

        # the driver will raise an error when binding the params
        parameters[57] = 1

        results = execute_concurrent(self.session, list(zip(statements, parameters)), raise_on_first_error=False)
        for i, (success, result) in enumerate(results):
            if i == 57:
                self.assertFalse(success)
                self.assertIsInstance(result, TypeError)
            else:
                self.assertTrue(success)
                self.assertFalse(result)
policies.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def make_query_plan(self, working_keyspace=None, query=None):
        # not thread-safe, but we don't care much about lost increments
        # for the purposes of load balancing
        pos = self._position
        self._position += 1

        local_live = self._dc_live_hosts.get(self.local_dc, ())
        pos = (pos % len(local_live)) if local_live else 0
        for host in islice(cycle(local_live), pos, pos + len(local_live)):
            yield host

        # the dict can change, so get candidate DCs iterating over keys of a copy
        other_dcs = [dc for dc in self._dc_live_hosts.copy().keys() if dc != self.local_dc]
        for dc in other_dcs:
            remote_live = self._dc_live_hosts.get(dc, ())
            for host in remote_live[:self.used_hosts_per_remote_dc]:
                yield host
plot.py 文件源码 项目:privcount 作者: privcount 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def add_plot_args(parser):
    parser.add_argument('-d', '--data',
        help="""Append a PATH to a privcount tallies.json file,
                and the LABEL we should use for the graph legend for this
                set of experimental results""",
        metavar=("PATH", "LABEL"),
        nargs=2,
        required="True",
        action=PlotDataAction, dest="experiments")

    parser.add_argument('-p', '--prefix',
        help="a STRING filename prefix for graphs we generate",
        metavar="STRING",
        action="store", dest="prefix",
        default=None)

    parser.add_argument('-f', '--format',
        help="""A comma-separated LIST of color/line format strings to cycle to
                matplotlib's plot command (see matplotlib.pyplot.plot)""",
        metavar="LIST",
        action="store", dest="lineformats",
        default=LINEFORMATS)
test_copy.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _copy_from(self, curs, nrecs, srec, copykw):
        f = StringIO()
        for i, c in zip(range(nrecs), cycle(string.ascii_letters)):
            l = c * srec
            f.write("%s\t%s\n" % (i, l))

        f.seek(0)
        curs.copy_from(MinimalRead(f), "tcopy", **copykw)

        curs.execute("select count(*) from tcopy")
        self.assertEqual(nrecs, curs.fetchone()[0])

        curs.execute("select data from tcopy where id < %s order by id",
                (len(string.ascii_letters),))
        for i, (l,) in enumerate(curs):
            self.assertEqual(l, string.ascii_letters[i] * srec)
test_copy.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _copy_from(self, curs, nrecs, srec, copykw):
        f = StringIO()
        for i, c in izip(xrange(nrecs), cycle(string.ascii_letters)):
            l = c * srec
            f.write("%s\t%s\n" % (i, l))

        f.seek(0)
        curs.copy_from(MinimalRead(f), "tcopy", **copykw)

        curs.execute("select count(*) from tcopy")
        self.assertEqual(nrecs, curs.fetchone()[0])

        curs.execute("select data from tcopy where id < %s order by id",
                (len(string.ascii_letters),))
        for i, (l,) in enumerate(curs):
            self.assertEqual(l, string.ascii_letters[i] * srec)
test_pop3.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testLineBuffering(self):
        """
        Test creating a LineBuffer and feeding it some lines.  The lines should
        build up in its internal buffer for a while and then get spat out to
        the writer.
        """
        output = []
        input = iter(itertools.cycle(['012', '345', '6', '7', '8', '9']))
        c = pop3._IteratorBuffer(output.extend, input, 6)
        i = iter(c)
        self.assertEquals(output, []) # nothing is buffer
        i.next()
        self.assertEquals(output, []) # '012' is buffered
        i.next()
        self.assertEquals(output, []) # '012345' is buffered
        i.next()
        self.assertEquals(output, ['012', '345', '6']) # nothing is buffered
        for n in range(5):
            i.next()
        self.assertEquals(output, ['012', '345', '6', '7', '8', '9', '012', '345'])
SlideShow_Pillow.py 文件源码 项目:Python-GUI-Programming-Cookbook-Second-Edition 作者: PacktPublishing 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def slidesCallback(self):
        # get next slide from iterable cycle
        currentInstance, nameOfSlide = next(self.iterableCycle)

        # assign next slide to Label widget
        self.slidesLabel.config(image=currentInstance)

        # update Window title with current slide
        self.title(nameOfSlide)

        # recursively repeat the Show
        self.after(self.showTime, self.slidesCallback)


#=================================
# Start GUI
#=================================
SlideShow.py 文件源码 项目:Python-GUI-Programming-Cookbook-Second-Edition 作者: PacktPublishing 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, msShowTimeBetweenSlides=1500):
        # initialize tkinter super class
        Tk.__init__(self)

        # time each slide will be shown
        self.showTime = msShowTimeBetweenSlides

        # look for images in current working directory where this module lives
        chapter_folder = path.realpath(path.dirname(__file__))
        resources_folder = path.join(chapter_folder, 'Resources')
        listOfSlides = [slide for slide in listdir(resources_folder) if slide.endswith('gif')]

        # endlessly read in the slides so we can show them on the tkinter Label 
        chdir(resources_folder)
        self.iterableCycle = cycle((PhotoImage(file=slide), slide) for slide in listOfSlides)

        # create tkinter Label widget which can also display images
        self.slidesLabel = Label(self)

        # create the Frame widget
        self.slidesLabel.pack()
SlideShow.py 文件源码 项目:Python-GUI-Programming-Cookbook-Second-Edition 作者: PacktPublishing 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def slidesCallback(self):
        # get next slide from iterable cycle
        currentInstance, nameOfSlide = next(self.iterableCycle)

        # assign next slide to Label widget
        self.slidesLabel.config(image=currentInstance)

        # update Window title with current slide
        self.title(nameOfSlide)

        # recursively repeat the Show
        self.after(self.showTime, self.slidesCallback)


#=================================
# Start GUI
#=================================
SlideShow_try_jpg.py 文件源码 项目:Python-GUI-Programming-Cookbook-Second-Edition 作者: PacktPublishing 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, msShowTimeBetweenSlides=1500):
        # initialize tkinter super class
        Tk.__init__(self)

        # time each slide will be shown
        self.showTime = msShowTimeBetweenSlides

        # look for images in current working directory where this module lives
        # try: .jpeg
        chapter_folder = path.realpath(path.dirname(__file__))
        resources_folder = path.join(chapter_folder, 'Resources')
        listOfSlides = [slide for slide in listdir(resources_folder) if slide.endswith('gif') or slide.endswith('jpg')]

        # endlessly read in the slides so we can show them on the tkinter Label 
        chdir(resources_folder)
        self.iterableCycle = cycle((PhotoImage(file=slide), slide) for slide in listOfSlides)

        # create tkinter Label widget which can also display images
        self.slidesLabel = Label(self)

        # create the Frame widget
        self.slidesLabel.pack()
glance.py 文件源码 项目:novajoin 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_api_servers():
    """Return iterator of glance api_servers.

    Return iterator of glance api_servers to cycle through the
    list, looping around to the beginning if necessary.
    """

    api_servers = []

    ks = keystone_client.get_client()

    catalog = keystone_client.get_service_catalog(ks)

    image_service = catalog.url_for(service_type='image')
    if image_service:
        api_servers.append(image_service)

    if CONF.glance_api_servers:
        for api_server in CONF.glance_api_servers:
            api_servers.append(api_server)

    random.shuffle(api_servers)
    return itertools.cycle(api_servers)
example.py 文件源码 项目:DNGR-Keras 作者: MdAsifKhan 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def cluster(data,true_labels,n_clusters=3):

    km = KMeans(init='k-means++', n_clusters=n_clusters, n_init=10)
    km.fit(data)

    km_means_labels = km.labels_
    km_means_cluster_centers = km.cluster_centers_
    km_means_labels_unique = np.unique(km_means_labels)

    colors_ = cycle(colors.cnames.keys())

    initial_dim = np.shape(data)[1]
    data_2 = tsne(data,2,initial_dim,30)

    plt.figure(figsize=(12, 6))
    plt.scatter(data_2[:,0],data_2[:,1], c=true_labels)
    plt.title('True Labels')

    return km_means_labels
nugridse.py 文件源码 项目:NuGridPy 作者: NuGrid 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def plot_prof_2(self, mod, species, xlim1, xlim2):

        """
        Plot one species for cycle between xlim1 and xlim2

        Parameters
        ----------
        mod : string or integer
            Model to plot, same as cycle number.
        species : list
            Which species to plot.
        xlim1, xlim2 : float
            Mass coordinate range.

        """

        mass=self.se.get(mod,'mass')
        Xspecies=self.se.get(mod,'yps',species)
        pyl.plot(mass,Xspecies,'-',label=str(mod)+', '+species)
        pyl.xlim(xlim1,xlim2)
        pyl.legend()
wordembed.py 文件源码 项目:pytorch-skipthoughts 作者: kaniblu 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def chunks(it, n, k):
    buffer = [[] for _ in range(n)]
    buf_it = iter(itertools.cycle(buffer))

    for item in it:
        buf_item = next(buf_it)

        if len(buf_item) == k:
            yield buffer
            buffer = [[] for _ in range(n)]
            buf_it = iter(itertools.cycle(buffer))
            buf_item = next(buf_it)

        buf_item.append(item)

    if all(buffer):
        yield buffer
renderer.py 文件源码 项目:mauzr 作者: eqrx 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _on_graph(self):
        for cfg in self._cfg:
            size = [str(s) for s in cfg["size"]]
            limits = [str(s) for s in cfg["limits"]]
            g = [cfg["path"], "--imgformat", "PNG",
                 "-w", size[0], "-h", size[1],
                 "--vertical-label", cfg["vlabel"], "-t", cfg["title"],
                 "-s", str(round(time.time()-cfg["length"])),
                 "-l", limits[0], "-u", limits[1]]
            color_iterator = itertools.cycle(("ff0000", "00ff00", "0000ff",
                                              "ff00ff", "ffff00", "00ffff"))

            for entry in cfg["lines"]:
                color = next(color_iterator)
                g.append(f"DEF:{entry['name']}={entry['path']}:value:AVERAGE")
                g.append(f"LINE2:{entry['name']}#{color}:{entry['name']}")

            rrdtool.graph(g)
proxylib.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def __init__(self, key):
        self.__key_gen = itertools.cycle([ord(x) for x in key]).next
        self.__key_xor = lambda s: ''.join(chr(ord(x) ^ self.__key_gen()) for x in s)
        if len(key) == 1:
            try:
                from Crypto.Util.strxor import strxor_c
                c = ord(key)
                self.__key_xor = lambda s: strxor_c(s, c)
            except ImportError:
                #logging.debug('Load Crypto.Util.strxor Failed, Use Pure Python Instead.\n')
                pass
swap.py 文件源码 项目:ropf 作者: kevinkoo001 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_swap_subset(self, subset, other):
        # get a copy of subset!
        subset = subset.copy()
        # print "will check", self.name, other.name, "in", subset

        for lifetime in itertools.cycle((self, other)):
            old_size = subset.size
            for sg in lifetime.subsets:
                if sg.intersects(subset):
                    # print "\t", sg, "intersects", subset
                    if sg.no_swap:
                        # print "\tsg is no swap .. bail out"
                        return None
                    # print "\tmerging them!"
                    subset.merge(sg)
            if subset.size == old_size:
                # print "\tsize did not change, done!"
                break

        return subset
test_indexing.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_lengths():
    s = fields.Schema(f1=fields.KEYWORD(stored=True, scorable=True),
                      f2=fields.KEYWORD(stored=True, scorable=True))
    with TempIndex(s, "testlengths") as ix:
        w = ix.writer()
        items = u("ABCDEFG")
        from itertools import cycle, islice
        lengths = [10, 20, 2, 102, 45, 3, 420, 2]
        for length in lengths:
            w.add_document(f2=u(" ").join(islice(cycle(items), length)))
        w.commit()

        with ix.reader() as dr:
            ls1 = [dr.doc_field_length(i, "f1")
                   for i in xrange(0, len(lengths))]
            assert ls1 == [0] * len(lengths)
            ls2 = [dr.doc_field_length(i, "f2")
                   for i in xrange(0, len(lengths))]
            assert ls2 == [byte_to_length(length_to_byte(l)) for l in lengths]
threadring.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def main(n = int(sys.argv[1]), n_threads=503, cycle=itertools.cycle):

    def worker(worker_id):

        n = 1
        while True:
            print n
            if n > 0:
                n = (yield (n - 1))
            else:
                print worker_id
                raise StopIteration


    threadRing = [worker(w) for w in xrange(1, n_threads + 1)]
    for t in threadRing: foo = t.next()           # start exec. gen. funcs
    sendFuncRing = [t.send for t in threadRing]   # speed...
    for send in cycle(sendFuncRing):
        try:
            n = send(n)
        except StopIteration:
            break


问题


面经


文章

微信
公众号

扫码关注公众号