test_base.py 文件源码

python
阅读 30 收藏 0 点赞 0 评论 0

项目:rteeg 作者: kaczmarj 项目源码 文件源码
def test_BaseStream_connect():
    event = threading.Event()
    def dummy_func():
        while not event.is_set():
            time.sleep(1.)
    base = BaseStream()
    n_threads_0 = threading.active_count()
    base.connect(dummy_func, "TEST")
    n_threads_1 = threading.active_count()
    # Check that a thread was started.
    assert n_threads_1 - n_threads_0 == 1, "Thread not started."

    # Check that the thread was created and named properly.
    name = [t.getName() for t in threading.enumerate() if t.getName() == "TEST"]
    assert name[0] == "TEST", "Thread not named properly."

    # Check that connect method only allows one connection.
    with pytest.raises(RuntimeError):
        base.connect(dummy_func, "SECOND_TEST")

    # Clean up.
    event.set()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号