def test_ThreadLineReader():
def sync_write(data):
reader.clear_processed()
os.write(wp, data)
reader.wait_processed()
rp, wp = os.pipe()
reader = ThreadLineReader(rp)
reader.start()
assert reader.readline() is None
sync_write('foo\n')
assert reader.readline() is None
reader.set_next_flag()
sync_write('bar\n')
assert reader.readline() == 'bar'
reader.terminate()
评论列表
文章目录