python类spawn()的实例源码

utils.py 文件源码 项目:masakari 作者: openstack 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def spawn(func, *args, **kwargs):
    """Passthrough method for eventlet.spawn.

    This utility exists so that it can be stubbed for testing without
    interfering with the service spawns.

    It will also grab the context from the threadlocal store and add it to
    the store on the new thread.  This allows for continuity in logging the
    context when using this method to spawn a new thread.
    """
    _context = common_context.get_current()

    @functools.wraps(func)
    def context_wrapper(*args, **kwargs):
        # NOTE: If update_store is not called after spawn it won't be
        # available for the logger to pull from threadlocal storage.
        if _context is not None:
            _context.update_store()
        return func(*args, **kwargs)

    return eventlet.spawn(context_wrapper, *args, **kwargs)
utils.py 文件源码 项目:masakari 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def spawn_n(func, *args, **kwargs):
    """Passthrough method for eventlet.spawn_n.

    This utility exists so that it can be stubbed for testing without
    interfering with the service spawns.

    It will also grab the context from the threadlocal store and add it to
    the store on the new thread.  This allows for continuity in logging the
    context when using this method to spawn a new thread.
    """
    _context = common_context.get_current()

    @functools.wraps(func)
    def context_wrapper(*args, **kwargs):
        # NOTE: If update_store is not called after spawn_n it won't be
        # available for the logger to pull from threadlocal storage.
        if _context is not None:
            _context.update_store()
        func(*args, **kwargs)

    eventlet.spawn_n(context_wrapper, *args, **kwargs)
test_eventlet.py 文件源码 项目:deb-python-aioeventlet 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def coro_wrap_greenthread():
        result = []

        gt = eventlet.spawn(eventlet_slow_append, result, 1, 0.020)
        value = yield From(aioeventlet.wrap_greenthread(gt))
        result.append(value)

        gt = eventlet.spawn(eventlet_slow_append, result, 2, 0.010)
        value = yield From(aioeventlet.wrap_greenthread(gt))
        result.append(value)

        gt = eventlet.spawn(eventlet_slow_error)
        try:
            yield From(aioeventlet.wrap_greenthread(gt))
        except ValueError as exc:
            result.append(str(exc))

        result.append(4)
        raise Return(result)
test_eventlet.py 文件源码 项目:deb-python-aioeventlet 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_soon_spawn(self):
        result = []

        def func1():
            result.append("spawn")

        def func2():
            result.append("spawn_after")
            self.loop.stop()

        def schedule_greenthread():
            eventlet.spawn(func1)
            eventlet.spawn_after(0.010, func2)

        self.loop.call_soon(schedule_greenthread)
        self.loop.run_forever()
        self.assertEqual(result, ["spawn", "spawn_after"])
test_eventlet.py 文件源码 项目:deb-python-aioeventlet 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_yield_future_not_running(self):
        result = []

        def func(event, fut):
            event.send('link')
            value = aioeventlet.yield_future(fut)
            result.append(value)
            self.loop.stop()

        event = eventlet.event.Event()
        fut = asyncio.Future(loop=self.loop)
        eventlet.spawn(func, event, fut)
        event.wait()

        self.loop.call_soon(fut.set_result, 21)
        self.loop.run_forever()
        self.assertEqual(result, [21])
test_eventlet.py 文件源码 项目:deb-python-aioeventlet 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_yield_future_invalid_type(self):
        def func(obj):
            return aioeventlet.yield_future(obj)

        @asyncio.coroutine
        def coro_func():
            print("do something")

        def regular_func():
            return 3

        for obj in (coro_func, regular_func):
            gt = eventlet.spawn(func, coro_func)
            # ignore logged traceback
            with tests.mock.patch('traceback.print_exception') as m_print:
                self.assertRaises(TypeError, gt.wait)
debug_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_hub_exceptions(self):
        debug.hub_exceptions(True)
        server = eventlet.listen(('0.0.0.0', 0))
        client = eventlet.connect(('127.0.0.1', server.getsockname()[1]))
        client_2, addr = server.accept()

        def hurl(s):
            s.recv(1)
            {}[1]  # keyerror

        with capture_stderr() as fake:
            gt = eventlet.spawn(hurl, client_2)
            eventlet.sleep(0)
            client.send(b' ')
            eventlet.sleep(0)
            # allow the "hurl" greenlet to trigger the KeyError
            # not sure why the extra context switch is needed
            eventlet.sleep(0)
        self.assertRaises(KeyError, gt.wait)
        debug.hub_exceptions(False)
        # look for the KeyError exception in the traceback
        assert 'KeyError: 1' in fake.getvalue(), "Traceback not in:\n" + fake.getvalue()
hub_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_exceptionleaks(self):
        # tests expected behaviour with all versions of greenlet
        def test_gt(sem):
            try:
                raise KeyError()
            except KeyError:
                sem.release()
                hubs.get_hub().switch()

        # semaphores for controlling execution order
        sem = eventlet.Semaphore()
        sem.acquire()
        g = eventlet.spawn(test_gt, sem)
        try:
            sem.acquire()
            assert sys.exc_info()[0] is None
        finally:
            g.kill()
hub_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_kill(self):
        """ Checks that killing a process after the hub runloop dies does
        not immediately return to hub greenlet's parent and schedule a
        redundant timer. """
        hub = hubs.get_hub()

        def dummyproc():
            hub.switch()

        g = eventlet.spawn(dummyproc)
        eventlet.sleep(0)  # let dummyproc run
        assert hub.greenlet.parent == eventlet.greenthread.getcurrent()
        self.assertRaises(KeyboardInterrupt, hub.greenlet.throw,
                          KeyboardInterrupt())

        # kill dummyproc, this schedules a timer to return execution to
        # this greenlet before throwing an exception in dummyproc.
        # it is from this timer that execution should be returned to this
        # greenlet, and not by propogating of the terminating greenlet.
        g.kill()
        with eventlet.Timeout(0.5, self.CustomException()):
            # we now switch to the hub, there should be no existing timers
            # that switch back to this greenlet and so this hub.switch()
            # call should block indefinitely.
            self.assertRaises(self.CustomException, hub.switch)
hub_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_parent(self):
        """ Checks that a terminating greenthread whose parent
        was a previous, now-defunct hub greenlet returns execution to
        the hub runloop and not the hub greenlet's parent. """
        hub = hubs.get_hub()

        def dummyproc():
            pass

        g = eventlet.spawn(dummyproc)
        assert hub.greenlet.parent == eventlet.greenthread.getcurrent()
        self.assertRaises(KeyboardInterrupt, hub.greenlet.throw,
                          KeyboardInterrupt())

        assert not g.dead  # check dummyproc hasn't completed
        with eventlet.Timeout(0.5, self.CustomException()):
            # we now switch to the hub which will allow
            # completion of dummyproc.
            # this should return execution back to the runloop and not
            # this greenlet so that hub.switch() would block indefinitely.
            self.assertRaises(self.CustomException, hub.switch)
        assert g.dead  # sanity check that dummyproc has completed
greenio_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_raised_multiple_readers(self):
        debug.hub_prevent_multiple_readers(True)

        def handle(sock, addr):
            sock.recv(1)
            sock.sendall(b"a")
            raise eventlet.StopServe()

        listener = eventlet.listen(('127.0.0.1', 0))
        eventlet.spawn(eventlet.serve, listener, handle)

        def reader(s):
            s.recv(1)

        s = eventlet.connect(('127.0.0.1', listener.getsockname()[1]))
        a = eventlet.spawn(reader, s)
        eventlet.sleep(0)
        self.assertRaises(RuntimeError, s.recv, 1)
        s.sendall(b'b')
        a.wait()
greenio_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_zero_timeout_and_back(self):
        listen = eventlet.listen(('', 0))
        # Keep reference to server side of socket
        server = eventlet.spawn(listen.accept)
        client = eventlet.connect(listen.getsockname())

        client.settimeout(0.05)
        # Now must raise socket.timeout
        self.assertRaises(socket.timeout, client.recv, 1)

        client.settimeout(0)
        # Now must raise socket.error with EAGAIN
        try:
            client.recv(1)
            assert False
        except socket.error as e:
            assert get_errno(e) == errno.EAGAIN

        client.settimeout(0.05)
        # Now socket.timeout again
        self.assertRaises(socket.timeout, client.recv, 1)
        server.wait()
greenio_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_pipe_writes_large_messages(self):
        r, w = os.pipe()

        r = greenio.GreenPipe(r, 'rb')
        w = greenio.GreenPipe(w, 'wb')

        large_message = b"".join([1024 * six.int2byte(i) for i in range(65)])

        def writer():
            w.write(large_message)
            w.close()

        gt = eventlet.spawn(writer)

        for i in range(65):
            buf = r.read(1024)
            expected = 1024 * six.int2byte(i)
            self.assertEqual(
                buf, expected,
                "expected=%r..%r, found=%r..%r iter=%d"
                % (expected[:4], expected[-4:], buf[:4], buf[-4:], i))
        gt.wait()
greenio_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_socket_file_read_non_int():
    listen_socket = eventlet.listen(('localhost', 0))

    def server():
        conn, _ = listen_socket.accept()
        conn.recv(1)
        conn.sendall(b'response')
        conn.close()

    eventlet.spawn(server)
    sock = eventlet.connect(listen_socket.getsockname())

    fd = sock.makefile('rwb')
    fd.write(b'?')
    fd.flush()
    with eventlet.Timeout(1):
        try:
            fd.read("This shouldn't work")
            assert False
        except TypeError:
            pass
pools_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_putting_to_queue(self):
        timer = eventlet.Timeout(0.1)
        try:
            size = 2
            self.pool = IntPool(min_size=0, max_size=size)
            queue = Queue()
            results = []

            def just_put(pool_item, index):
                self.pool.put(pool_item)
                queue.put(index)
            for index in six.moves.range(size + 1):
                pool_item = self.pool.get()
                eventlet.spawn(just_put, pool_item, index)

            for _ in six.moves.range(size + 1):
                x = queue.get()
                results.append(x)
            self.assertEqual(sorted(results), list(six.moves.range(size + 1)))
        finally:
            timer.cancel()
thread_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_calls_init(self):
        init_args = []

        class Init(corolocal.local):
            def __init__(self, *args):
                init_args.append((args, eventlet.getcurrent()))

        my_local = Init(1, 2, 3)
        self.assertEqual(init_args[0][0], (1, 2, 3))
        self.assertEqual(init_args[0][1], eventlet.getcurrent())

        def do_something():
            my_local.foo = 'bar'
            self.assertEqual(len(init_args), 2, init_args)
            self.assertEqual(init_args[1][0], (1, 2, 3))
            self.assertEqual(init_args[1][1], eventlet.getcurrent())

        eventlet.spawn(do_something).wait()
thread_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_calling_methods(self):
        class Caller(corolocal.local):
            def callme(self):
                return self.foo

        my_local = Caller()
        my_local.foo = "foo1"
        self.assertEqual("foo1", my_local.callme())

        def do_something():
            my_local.foo = "foo2"
            self.assertEqual("foo2", my_local.callme())

        eventlet.spawn(do_something).wait()

        my_local.foo = "foo3"
        self.assertEqual("foo3", my_local.callme())
green_select_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test_select_mark_file_as_reopened():
    # https://github.com/eventlet/eventlet/pull/294
    # Fix API inconsistency in select and Hub.
    # mark_as_closed takes one argument, but called without arguments.
    # on_error takes file descriptor, but called with an exception object.
    s = original_socket.socket()
    s.setblocking(0)
    s.bind(('127.0.0.1', 0))
    s.listen(5)

    gt = eventlet.spawn(select.select, [s], [s], [s])
    eventlet.sleep(0.01)

    with eventlet.Timeout(0.5) as t:
        with tests.assert_raises(hubs.IOClosed):
            hubs.get_hub().mark_as_reopened(s.fileno())
            gt.wait()
        t.cancel()
ssl_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_ssl_close(self):
        def serve(listener):
            sock, addr = listener.accept()
            sock.recv(8192)
            try:
                self.assertEqual(b'', sock.recv(8192))
            except greenio.SSL.ZeroReturnError:
                pass

        sock = listen_ssl_socket()

        server_coro = eventlet.spawn(serve, sock)

        raw_client = eventlet.connect(sock.getsockname())
        client = ssl.wrap_socket(raw_client)
        client.sendall(b'X')
        greenio.shutdown_safe(client)
        client.close()
        server_coro.wait()
ssl_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_ssl_unwrap(self):
        def serve():
            sock, addr = listener.accept()
            self.assertEqual(sock.recv(6), b'before')
            sock_ssl = ssl.wrap_socket(sock, tests.private_key_file, tests.certificate_file,
                                       server_side=True)
            sock_ssl.do_handshake()
            self.assertEqual(sock_ssl.recv(6), b'during')
            sock2 = sock_ssl.unwrap()
            self.assertEqual(sock2.recv(5), b'after')
            sock2.close()

        listener = eventlet.listen(('127.0.0.1', 0))
        server_coro = eventlet.spawn(serve)
        client = eventlet.connect(listener.getsockname())
        client.sendall(b'before')
        client_ssl = ssl.wrap_socket(client)
        client_ssl.do_handshake()
        client_ssl.sendall(b'during')
        client2 = client_ssl.unwrap()
        client2.sendall(b'after')
        server_coro.wait()


问题


面经


文章

微信
公众号

扫码关注公众号