def test_flow():
from io import BytesIO
# info + flush
inbound = b'\x01\x02\x1a\x00\x01\x02\x12\x00'
data = BytesIO(inbound)
req_type,_ = read_message(data, types.Request)
assert 'info' == req_type.WhichOneof("value")
req_type2, _ = read_message(data, types.Request)
assert 'flush' == req_type2.WhichOneof("value")
assert data.read() == b''
data.close()
data2 = BytesIO(b'')
req_type, fail = read_message(data2, types.Request)
assert fail == 0
assert req_type == None
data3 = BytesIO(b'\x01')
req_type, fail = read_message(data3, types.Request)
assert fail == 0
评论列表
文章目录