python类cycle()的实例源码

test_io.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        obj.obj = obj
        wr = weakref.ref(obj)
        del MyIO
        del obj
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
ui.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def next_phase(self):
        if not hasattr(self, "_phaser"):
            self._phaser = itertools.cycle(self.phases)
        return next(self._phaser)
ui.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, message, file=None, spin_chars="-\\|/",
                 # Empirically, 8 updates/second looks nice
                 min_update_interval_seconds=0.125):
        self._message = message
        if file is None:
            file = sys.stdout
        self._file = file
        self._rate_limiter = RateLimiter(min_update_interval_seconds)
        self._finished = False

        self._spin_cycle = itertools.cycle(spin_chars)

        self._file.write(" " * get_indentation() + self._message + " ... ")
        self._width = 0
reader.py 文件源码 项目:variational-text-tensorflow 作者: carpedm20 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def iterator(self, data_type="train"):
    raw_data = self.get_data_from_type(data_type)
    return itertools.cycle(([self.onehot(data), data] for data in raw_data if data != []))
ui.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def next_phase(self):
        if not hasattr(self, "_phaser"):
            self._phaser = itertools.cycle(self.phases)
        return next(self._phaser)
ui.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, message, file=None, spin_chars="-\\|/",
                 # Empirically, 8 updates/second looks nice
                 min_update_interval_seconds=0.125):
        self._message = message
        if file is None:
            file = sys.stdout
        self._file = file
        self._rate_limiter = RateLimiter(min_update_interval_seconds)
        self._finished = False

        self._spin_cycle = itertools.cycle(spin_chars)

        self._file.write(" " * get_indentation() + self._message + " ... ")
        self._width = 0
numa.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, distance=None):
        self.nodes = itertools.cycle(get_nodes())
        self.distance = distance
test_euclid.py 文件源码 项目:zellij 作者: nedbat 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_segment_sort_along(p1, p2, tvals):
    # Get rid of pathological cases.
    assume(p1.distance(p2) > 0.001)

    tvals = [t / 100 for t in tvals]
    fuzz = [1e-10, -1e-10]
    points = [along_the_way(p1, p2, t) for t in tvals]
    points = [Point(x+f, y+f) for (x, y), f in zip(points, itertools.cycle(fuzz))]

    # Calculate the smallest distance between any pair of points.  If we get
    # the wrong answer from sort_along, then the total distance will be off by
    # at least twice this.
    min_gap = min(q1.distance(q2) for q1, q2 in all_pairs(points + [p1, p2]))

    seg = Segment(p1, p2)
    spoints = seg.sort_along(points)

    assert len(spoints) == len(points)
    assert all(pt in points for pt in spoints)

    original = Point(*p1).distance(Point(*p2))
    total = (
        Point(*p1).distance(Point(*spoints[0])) +
        sum(Point(*p).distance(Point(*q)) for p, q in adjacent_pairs(spoints)) +
        Point(*spoints[-1]).distance(Point(*p2))
    )
    # The total distance will be wrong by at least 2*min_gap if it is wrong.
    assert total - original < 2 * min_gap


# Bounds
test_colorlabel.py 文件源码 项目:FCN_train 作者: 315386775 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_bg_and_color_cycle():
    image = np.zeros((1, 10))  # dummy image
    label = np.arange(10).reshape(1, -1)
    colors = [(1, 0, 0), (0, 0, 1)]
    bg_color = (0, 0, 0)
    rgb = label2rgb(label, image=image, bg_label=0, bg_color=bg_color,
                    colors=colors, alpha=1)
    assert_close(rgb[0, 0], bg_color)
    for pixel, color in zip(rgb[0, 1:], itertools.cycle(colors)):
        assert_close(pixel, color)
obd_pids.py 文件源码 项目:rpi-can-logger 作者: JonnoFTW 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __call__(self, bs):
        vals = self.dtype(bs)
        if type(vals) not in (tuple, list):
            vals = (vals,)
        return {'{} ({})'.format(pid_name, unit): val for pid_name, val, unit in
                zip(cycle([self.name]), vals, self.dtype.units)}
fms_pids.py 文件源码 项目:rpi-can-logger 作者: JonnoFTW 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __call__(self, msg):
        parsed = self.parser(msg)
        # if parsed is None:
        #     return None
        if type(parsed) not in [tuple, list]:
            parsed = (parsed,)
        return {'{} ({})'.format(pid_name, unit): val for pid_name, val, unit in
                zip(cycle([self.name]), parsed, self.fields)}
tesla_pids.py 文件源码 项目:rpi-can-logger 作者: JonnoFTW 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __call__(self, msg):
        parsed = self.parser(msg)
        if type(parsed) not in [tuple, list]:
            parsed = (parsed,)
        return {'{} ({})'.format(pid_name, unit): val for pid_name, val, unit in
                zip(cycle([self.name]), parsed, self.fields)}
bluetooth_test.py 文件源码 项目:rpi-can-logger 作者: JonnoFTW 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_send():
    btl = BluetoothLogger(fields=["speed", "rpm", "soc"], password=password)
    btl.start()

    # generate some data and send it
    x = range(1000)
    y = map(lambda v: sin(v * pi / 45) * 5000 + 5000, x)
    speeds = cycle(y)
    print("Sending dummy data")
    while 1:
        try:
            row = map(str, [round(next(speeds), 2), 5000, 50])
            btl.send(",".join(row))
            time.sleep(1)
        except KeyboardInterrupt:
            print("Terminating")
            break
    btl.join()
consumer.py 文件源码 项目:django-logpipe 作者: thelabnyc 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def run(self):
        for consumer in itertools.cycle(self.consumers):
            consumer.run(iter_limit=1)
test_concurrent.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_first_failure(self):
        statements = cycle(("INSERT INTO test3rf.test (k, v) VALUES (%s, %s)", ))
        parameters = [(i, i) for i in range(100)]

        # we'll get an error back from the server
        parameters[57] = ('efefef', 'awefawefawef')

        self.assertRaises(
            InvalidRequest,
            execute_concurrent, self.session, list(zip(statements, parameters)), raise_on_first_error=True)
test_query_paging.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_paging(self):
        statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]),
                                    [(i, ) for i in range(100)])
        execute_concurrent(self.session, list(statements_and_params))

        prepared = self.session.prepare("SELECT * FROM test3rf.test")

        for fetch_size in (2, 3, 7, 10, 99, 100, 101, 10000):
            self.session.default_fetch_size = fetch_size
            self.assertEqual(100, len(list(self.session.execute("SELECT * FROM test3rf.test"))))

            statement = SimpleStatement("SELECT * FROM test3rf.test")
            self.assertEqual(100, len(list(self.session.execute(statement))))

            self.assertEqual(100, len(list(self.session.execute(prepared))))
test_query_paging.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_paging_state(self):
        """
        Test to validate paging state api
        @since 3.7.0
        @jira_ticket PYTHON-200
        @expected_result paging state should returned should be accurate, and allow for queries to be resumed.

        @test_category queries
        """
        statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]),
                                    [(i, ) for i in range(100)])
        execute_concurrent(self.session, list(statements_and_params))

        list_all_results = []
        self.session.default_fetch_size = 3

        result_set = self.session.execute("SELECT * FROM test3rf.test")
        while(result_set.has_more_pages):
            for row in result_set.current_rows:
                self.assertNotIn(row, list_all_results)
            list_all_results.extend(result_set.current_rows)
            page_state = result_set.paging_state
            result_set = self.session.execute("SELECT * FROM test3rf.test", paging_state=page_state)

        if(len(result_set.current_rows) > 0):
            list_all_results.append(result_set.current_rows)
        self.assertEqual(len(list_all_results), 100)
test_query_paging.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_paging_verify_writes(self):
        statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]),
                                    [(i, ) for i in range(100)])
        execute_concurrent(self.session, statements_and_params)

        prepared = self.session.prepare("SELECT * FROM test3rf.test")

        for fetch_size in (2, 3, 7, 10, 99, 100, 101, 10000):
            self.session.default_fetch_size = fetch_size
            results = self.session.execute("SELECT * FROM test3rf.test")
            result_array = set()
            result_set = set()
            for result in results:
                result_array.add(result.k)
                result_set.add(result.v)

            self.assertEqual(set(range(100)), result_array)
            self.assertEqual(set([0]), result_set)

            statement = SimpleStatement("SELECT * FROM test3rf.test")
            results = self.session.execute(statement)
            result_array = set()
            result_set = set()
            for result in results:
                result_array.add(result.k)
                result_set.add(result.v)

            self.assertEqual(set(range(100)), result_array)
            self.assertEqual(set([0]), result_set)

            results = self.session.execute(prepared)
            result_array = set()
            result_set = set()
            for result in results:
                result_array.add(result.k)
                result_set.add(result.v)

            self.assertEqual(set(range(100)), result_array)
            self.assertEqual(set([0]), result_set)
test_query_paging.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_async_paging(self):
        statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]),
                                    [(i, ) for i in range(100)])
        execute_concurrent(self.session, list(statements_and_params))

        prepared = self.session.prepare("SELECT * FROM test3rf.test")

        for fetch_size in (2, 3, 7, 10, 99, 100, 101, 10000):
            self.session.default_fetch_size = fetch_size
            self.assertEqual(100, len(list(self.session.execute_async("SELECT * FROM test3rf.test").result())))

            statement = SimpleStatement("SELECT * FROM test3rf.test")
            self.assertEqual(100, len(list(self.session.execute_async(statement).result())))

            self.assertEqual(100, len(list(self.session.execute_async(prepared).result())))
test_query_paging.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_concurrent_with_paging(self):
        statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]),
                                    [(i, ) for i in range(100)])
        execute_concurrent(self.session, list(statements_and_params))

        prepared = self.session.prepare("SELECT * FROM test3rf.test")

        for fetch_size in (2, 3, 7, 10, 99, 100, 101, 10000):
            self.session.default_fetch_size = fetch_size
            results = execute_concurrent_with_args(self.session, prepared, [None] * 10)
            self.assertEqual(10, len(results))
            for (success, result) in results:
                self.assertTrue(success)
                self.assertEqual(100, len(list(result)))


问题


面经


文章

微信
公众号

扫码关注公众号