test_gevent.py 文件源码

python
阅读 34 收藏 0 点赞 0 评论 0

项目:core-python 作者: yidao620c 项目源码 文件源码
def test_queue(self):
        """???????????Queue"""
        task_queue = Queue()

        def worker(name):
            while not task_queue.empty():
                task = task_queue.get()
                _log.info('Worker %s got task %s' % (name, task))
                gevent.sleep(0)

            _log.info('Quitting time!')

        def boss():
            for i in xrange(1,25):
                task_queue.put_nowait(i)

        gevent.spawn(boss).join()

        gevent.joinall([
            gevent.spawn(worker, 'steve'),
            gevent.spawn(worker, 'john'),
            gevent.spawn(worker, 'nancy'),
        ])
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号