python类sleep()的实例源码

pools_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_exhaustion(self):
        """A getter that waits on a drained pool is woken when an item is put back."""
        handoff = Queue(0)

        def consumer():
            # Always report back through the hand-off queue, even when the
            # get() itself fails, so the main greenthread is never left waiting.
            item = None
            try:
                item = self.pool.get()
            finally:
                handoff.put(item)

        eventlet.spawn(consumer)

        # Check out four items; the assertion confirms the pool is then empty.
        held = [self.pool.get() for _ in range(4)]
        self.assertEqual(self.pool.free(), 0)

        # Yield so the consumer runs; with nothing in the pool it has to wait.
        eventlet.sleep(0)

        # Returning one item wakes the consumer.
        first = held[0]
        self.pool.put(first)

        # The consumer hands back exactly the item that was returned.
        self.assertEqual(handoff.get(), first)
pools_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_create_contention(self):
        """``create`` runs only four times for a max_size=4 pool under 100 contending getters."""
        # One-element list so the nested function can mutate the counter.
        create_calls = [0]

        def sleep_create():
            # Yield in the middle of creation so other getters get a chance
            # to run while this item is still being built.
            create_calls[0] += 1
            eventlet.sleep()
            return "slept"

        pool = pools.Pool(max_size=4, create=sleep_create)

        def borrow_and_return():
            item = pool.get()
            self.assertEqual(item, "slept")
            pool.put(item)

        workers = eventlet.GreenPool()
        for _ in six.moves.range(100):
            workers.spawn_n(borrow_and_return)
        workers.waitall()
        self.assertEqual(create_calls[0], 4)
thread_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_no_leaking(self):
        """Objects stored on a ``corolocal.local`` by pool greenthreads get released.

        Each of 100 greenthreads stores a fresh object on the local and
        registers it in a weak-key dictionary; after the pool is dropped and
        garbage is collected, exactly one entry is expected to remain.
        """
        tracked = weakref.WeakKeyDictionary()
        local_store = corolocal.local()

        class Marker(object):
            pass

        def do_something(i):
            obj = Marker()
            tracked[obj] = True
            local_store.foo = obj

        pool = eventlet.GreenPool()
        for index in six.moves.range(100):
            pool.spawn(do_something, index)
        pool.waitall()
        del pool

        # Collect, yield once to the hub, then collect again.
        gc.collect()
        eventlet.sleep(0)
        gc.collect()

        # at this point all our coros have terminated
        self.assertEqual(len(tracked), 1)
socket_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_recv_into_type():
    """Check that ``recv_into`` on an eventlet socket returns an ``int``."""
    # make sure `_recv_loop` returns the correct value when `recv_meth` is of
    # foo_into type (fills a buffer and returns number of bytes, not the data)
    # Using threads like `test_recv_type` above.
    # NOTE(review): presumably this yields the unpatched stdlib `threading`
    # module -- confirm against the eventlet.patcher documentation.
    threading = eventlet.patcher.original('threading')
    # Filled in by the server thread with the listening socket's address.
    addr = []

    def server():
        # Listen on an OS-assigned port, publish the address, then linger for
        # 0.2s.  The incoming connection is never explicitly accepted here.
        sock = eventlet.listen(('127.0.0.1', 0))
        addr[:] = sock.getsockname()
        eventlet.sleep(0.2)

    server_thread = threading.Thread(target=server)
    server_thread.start()
    # NOTE(review): timing-based -- assumes the server thread has populated
    # `addr` within 0.1s; nothing synchronises on it explicitly.
    eventlet.sleep(0.1)
    sock = eventlet.connect(tuple(addr))
    # One-element unsigned-byte array used as the receive buffer.
    buf = array.array('B', b' ')
    res = sock.recv_into(buf, 1)
    # Only the type of the return value is checked, not its value.
    assert isinstance(res, int)
websocket_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def handle(ws):
    """Serve one websocket connection, dispatching on ``ws.path``.

    ``/echo`` sends back every received message until ``ws.wait()`` returns
    ``None``; ``/range`` sends ten ``"msg N"`` messages, sleeping 0.01s after
    each; ``/error`` raises ``socket.error(errno.ENOTSOCK)``; any other path
    closes the websocket.
    """
    if ws.path == '/echo':
        message = ws.wait()
        while message is not None:
            ws.send(message)
            message = ws.wait()
        return

    if ws.path == '/range':
        for number in range(10):
            ws.send("msg %d" % number)
            eventlet.sleep(0.01)
        return

    if ws.path == '/error':
        # some random socket error that we shouldn't normally get
        raise socket.error(errno.ENOTSOCK)

    ws.close()
websocket_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_sending_messages_to_websocket_75(self):
        """Frames sent to ``/echo`` over a raw socket come back unchanged.

        Covers a frame sent in one piece and a frame split across two
        ``sendall`` calls with a short sleep in between.
        """
        handshake = [
            "GET /echo HTTP/1.1",
            "Upgrade: WebSocket",
            "Connection: Upgrade",
            "Host: %s:%s" % self.server_addr,
            "Origin: http://%s:%s" % self.server_addr,
            "WebSocket-Protocol: ws",
        ]
        client = eventlet.connect(self.server_addr)

        request = '\r\n'.join(handshake) + '\r\n\r\n'
        client.sendall(six.b(request))
        # The reply to the upgrade request is read and ignored.
        client.recv(1024)

        # A complete frame in a single send.
        client.sendall(b'\x00hello\xFF')
        echoed = client.recv(1024)
        self.assertEqual(echoed, b'\x00hello\xff')

        # One frame delivered in two pieces.
        client.sendall(b'\x00start')
        eventlet.sleep(0.001)
        client.sendall(b' end\xff')
        echoed = client.recv(1024)
        self.assertEqual(echoed, b'\x00start end\xff')

        client.shutdown(socket.SHUT_RDWR)
        client.close()
        eventlet.sleep(0.01)
event_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _test_multiple_waiters(self, exception):
        """Check that one ``send()`` releases every greenthread waiting on an event.

        :param exception: when truthy, each waiter raises ``Exception`` after
            it has recorded its result and signalled completion.
        """
        gate = event.Event()
        woken = []

        def wait_on_event(i_am_done):
            gate.wait()
            woken.append(True)
            i_am_done.send()
            if exception:
                raise Exception()

        count = 5
        done_events = []
        for _ in range(count):
            done = event.Event()
            done_events.append(done)
            eventlet.spawn_n(wait_on_event, done)

        eventlet.sleep()  # allow spawns to start executing
        gate.send()

        # Block until each waiter has signalled its own completion event.
        for done in done_events:
            done.wait()

        self.assertEqual(len(woken), count)
semaphore_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_semaphore_contention():
    """Two workers contending for one semaphore should acquire it about equally often.

    The difference between the two counters must be smaller than 10% of the
    smaller counter (truncated to an int).
    """
    mutex = eventlet.Semaphore()
    tallies = [0, 0]

    def worker(no):
        slot = no - 1
        while min(tallies) < 200:
            with mutex:
                tallies[slot] += 1
                # Sleep while holding the semaphore so the other worker
                # has to wait for it.
                eventlet.sleep(0.001)

    workers = [eventlet.spawn(worker, no=number) for number in (1, 2)]
    eventlet.sleep(0.5)
    for green in workers:
        green.kill()

    spread = abs(tallies[0] - tallies[1])
    tolerance = int(min(tallies) * 0.1)
    assert spread < tolerance, tallies
wsgi_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def chunked_fail_app(environ, start_response):
    """WSGI app that streams two chunks and then fails part-way through.

    After the first two chunks a ``ZeroDivisionError`` is triggered; the app
    reports it by calling ``start_response`` again with ``'500 Error'`` and
    the current ``sys.exc_info()``, then stops producing output.

    Scenario taken from http://rhodesmill.org/brandon/2013/chunked-wsgi/
    """
    response_headers = [('Content-Type', 'text/plain')]
    start_response('200 OK', response_headers)

    # We start streaming data just fine.
    opening_lines = (
        b"The dwarves of yore made mighty spells,",
        b"While hammers fell like ringing bells",
    )
    for line in opening_lines:
        yield line

    # Then the back-end fails!
    try:
        1 / 0
    except Exception:
        failure = sys.exc_info()
        start_response('500 Error', response_headers, failure)
        return

    # So rest of the response data is not available.
    yield b"In places deep, where dark things sleep,"
    yield b"In hollow halls beneath the fells."
wsgi_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_025_accept_errors(self):
        """The server logs an error when given a socket that was bound but never listened on.

        A client connection attempt must fail with ``ECONNREFUSED`` and the
        output captured by ``capture_stderr()`` must contain
        ``'Invalid argument'``.

        Hub exception reporting is switched on for the duration of the test
        and switched off again in a ``finally`` clause, so a failing
        assertion cannot leave it enabled for later tests.
        """
        debug.hub_exceptions(True)
        try:
            listener = greensocket.socket()
            listener.bind(('localhost', 0))
            # NOT calling listen, to trigger the error
            with capture_stderr() as log:
                self.spawn_server(sock=listener)
                eventlet.sleep(0)  # need to enter server loop
                try:
                    eventlet.connect(self.server_addr)
                    self.fail("Didn't expect to connect")
                except socket.error as exc:
                    self.assertEqual(support.get_errno(exc), errno.ECONNREFUSED)

            log_content = log.getvalue()
            assert 'Invalid argument' in log_content, log_content
        finally:
            debug.hub_exceptions(False)
wsgi_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_ipv6(self):
        """Check that the WSGI server can start on an IPv6 listening socket.

        The server is started through ``self.spawn_thread`` and writes to a
        shared in-memory log.  The test polls that log until something
        appears, then fails if it contains the marker written by the
        ``ValueError`` handler.  If an IPv6 socket cannot be created the test
        returns early without checking anything.
        """
        try:
            sock = eventlet.listen(('::1', 0), family=socket.AF_INET6)
        except (socket.gaierror, socket.error):  # probably no ipv6
            return
        log = six.StringIO()
        # first thing the server does is try to log the IP it's bound to

        def run_server():
            try:
                wsgi.server(sock=sock, log=log, site=Site())
            except ValueError:
                # Use a native str: a text StringIO rejects bytes on
                # Python 3, and the marker has to match the check below.
                log.write('broken')

        self.spawn_thread(run_server)

        # Poll, yielding to the hub each time, until the log has content.
        logval = log.getvalue()
        while not logval:
            eventlet.sleep(0.0)
            logval = log.getvalue()
        if 'broken' in logval:
            self.fail('WSGI server raised exception with ipv6 socket')
api_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_001_trampoline_timeout(self):
        """``hubs.trampoline`` raises ``eventlet.Timeout`` when nothing becomes readable in time."""
        listener = eventlet.listen(('127.0.0.1', 0))
        port = listener.getsockname()[1]

        def server(sock):
            # Accept, then hold the connection for a while without sending.
            conn, peer = sock.accept()
            eventlet.sleep(0.1)

        server_gt = eventlet.spawn(server, listener)
        eventlet.sleep(0)

        timed_out = False
        try:
            client = eventlet.connect(('127.0.0.1', port))
            hubs.trampoline(client, read=True, write=False, timeout=0.001)
        except eventlet.Timeout:
            timed_out = True  # test passed
        assert timed_out, "Didn't timeout"

        server_gt.wait()
        check_hub()
queue_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_max_size(self):
        """A producer on a size-2 queue blocks on its third put until an item is taken."""
        bounded = eventlet.Queue(2)
        delivered = []

        def putter(q):
            # Record each item only after its put() has returned.
            for item in ('a', 'b', 'c'):
                q.put(item)
                delivered.append(item)

        producer = eventlet.spawn(putter, bounded)

        eventlet.sleep(0)
        self.assertEqual(delivered, ['a', 'b'])

        # Taking one item lets the third put complete.
        self.assertEqual(bounded.get(), 'a')
        eventlet.sleep(0)
        self.assertEqual(delivered, ['a', 'b', 'c'])

        self.assertEqual(bounded.get(), 'b')
        self.assertEqual(bounded.get(), 'c')
        producer.wait()
queue_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_zero_max_size(self):
        """On a zero-size queue a ``put`` does not complete until a getter arrives."""
        channel = eventlet.Queue(0)

        def sender(evt, q):
            q.put('hi')
            evt.send('done')

        def receiver(q):
            return q.get()

        sent = event.Event()
        sender_gt = eventlet.spawn(sender, sent, channel)
        eventlet.sleep(0)
        # The sender has run but its put() has not returned yet.
        assert not sent.ready()

        receiver_gt = eventlet.spawn(receiver, channel)
        self.assertEqual(receiver_gt.wait(), 'hi')
        self.assertEqual(sent.wait(), 'done')
        sender_gt.wait()
queue_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_multiple_waiters(self):
        """Every greenthread waiting on ``get`` receives one of the items put afterwards."""
        # tests that multiple waiters get their results back
        queue = eventlet.Queue()

        sendings = ['1', '2', '3', '4']
        getters = [eventlet.spawn(queue.get) for _ in sendings]

        eventlet.sleep(0.01)  # get 'em all waiting

        for item in sendings:
            queue.put(item)

        # Delivery order is not checked, only that each item arrived once.
        received = set(getter.wait() for getter in getters)
        self.assertEqual(received, set(sendings))
queue_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_channel_waiters(self):
        """Three getters waiting on a zero-size queue are each served by one of three putters."""
        channel = eventlet.Queue(0)
        getters = [eventlet.spawn(channel.get) for _ in range(3)]
        eventlet.sleep(0)
        self.assertEqual(channel.getting(), 3)

        putters = [eventlet.spawn(channel.put, value) for value in (1, 2, 3)]
        for putter in putters:
            putter.wait()
        self.assertEqual(channel.getting(), 0)

        # NOTE: we don't guarantee that waiters are served in order
        received = [getter.wait() for getter in getters]
        received.sort()
        self.assertEqual(received, [1, 2, 3])
queue_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_get_nowait_unlock(self):
        """``get_nowait`` scheduled on the hub retrieves the item of a pending ``put``.

        Uses a zero-size queue with one spawned ``put(5)`` and checks the
        queue's ``empty()``/``full()`` state at each step.
        """
        hub = hubs.get_hub()
        result = []
        q = eventlet.Queue(0)
        p = eventlet.spawn(q.put, 5)
        # Before the putter has had a chance to run, the zero-size queue is
        # expected to report both empty and full.
        assert q.empty(), q
        assert q.full(), q
        eventlet.sleep(0)
        # Same expectation after yielding once to the hub.
        assert q.empty(), q
        assert q.full(), q
        # `store_result` is defined elsewhere in the file; presumably it calls
        # q.get_nowait and appends the outcome to `result` -- confirm there.
        hub.schedule_call_global(0, store_result, result, q.get_nowait)
        eventlet.sleep(0)
        assert q.empty(), q
        assert q.full(), q
        # The scheduled call is expected to have recorded the value 5.
        assert result == [5], result
        # TODO add ready to greenthread
        # assert p.ready(), p
        # The putter greenthread is expected to have finished by now.
        assert p.dead, p
        assert q.empty(), q

    # put_nowait must work from the mainloop
queue_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_put_nowait_unlock(self):
        """``put_nowait`` scheduled on the hub succeeds when a getter is already waiting.

        Uses a zero-size queue with one spawned ``get`` and checks the
        queue's ``empty()``/``full()`` state at each step.
        """
        hub = hubs.get_hub()
        result = []
        q = eventlet.Queue(0)
        # The getter greenthread is spawned but its handle is not kept.
        eventlet.spawn(q.get)
        # Before the getter has had a chance to run, the zero-size queue is
        # expected to report both empty and full.
        assert q.empty(), q
        assert q.full(), q
        eventlet.sleep(0)
        # Same expectation after yielding once to the hub.
        assert q.empty(), q
        assert q.full(), q
        # `store_result` is defined elsewhere in the file; presumably it calls
        # q.put_nowait(10) and appends the outcome to `result` -- confirm there.
        hub.schedule_call_global(0, store_result, result, q.put_nowait, 10)
        # TODO ready method on greenthread
        # assert not p.ready(), p
        eventlet.sleep(0)
        # The scheduled call is expected to have recorded None.
        assert result == [None], result
        # TODO ready method
        # assert p.ready(), p
        assert q.full(), q
        assert q.empty(), q
websocket_new_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def handle(ws):
    """Serve one websocket connection, dispatching on ``ws.path``.

    ``/echo`` sends back every received message until ``ws.wait()`` returns
    ``None``; ``/range`` sends ten ``"msg N"`` messages, sleeping 0.01s after
    each; ``/error`` raises ``socket.error(errno.ENOTSOCK)``; any other path
    closes the websocket.
    """
    if ws.path == '/echo':
        incoming = ws.wait()
        while incoming is not None:
            ws.send(incoming)
            incoming = ws.wait()
        return

    if ws.path == '/range':
        for counter in range(10):
            ws.send("msg %d" % counter)
            eventlet.sleep(0.01)
        return

    if ws.path == '/error':
        # some random socket error that we shouldn't normally get
        raise socket.error(errno.ENOTSOCK)

    ws.close()
websocket_new_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_send_recv_13(self):
        """Bytes and text messages sent to ``/echo`` come back equal and in order."""
        handshake = [
            "GET /echo HTTP/1.1",
            "Upgrade: websocket",
            "Connection: Upgrade",
            "Host: %s:%s" % self.server_addr,
            "Origin: http://%s:%s" % self.server_addr,
            "Sec-WebSocket-Version: 13",
            "Sec-WebSocket-Key: d9MXuOzlVQ0h+qRllvSCIg==",
        ]
        raw = eventlet.connect(self.server_addr)
        request = '\r\n'.join(handshake) + '\r\n\r\n'
        raw.sendall(six.b(request))
        # The reply to the upgrade request is read and ignored.
        raw.recv(1024)

        # Wrap the already-upgraded socket as the client side of the connection.
        client_ws = websocket.RFC6455WebSocket(raw, {}, client=True)

        client_ws.send(b'hello')
        reply = client_ws.wait()
        assert reply == b'hello'

        # Two sends back to back, then read both echoes.
        client_ws.send(b'hello world!\x01')
        client_ws.send(u'hello world again!')
        reply = client_ws.wait()
        assert reply == b'hello world!\x01'
        reply = client_ws.wait()
        assert reply == u'hello world again!'

        client_ws.close()
        eventlet.sleep(0.01)


问题


面经


文章

微信
公众号

扫码关注公众号