def test_BufferedStream(self, benchmark, server):
# this is like BufferedSocket, but with the overhead that _readchunk
# is written in Python instead of Cython; this simulates what happens
# if an user of capnpy wants to wrap its own stream reader
class MyStream(BufferedStream):
def __init__(self, host, port):
super(MyStream, self).__init__()
self.sock = socket.create_connection((host, port))
def _readchunk(self):
return self.sock.recv(8192)
def open_connection():
host, port = server.host, server.port
return MyStream(host, port)
self.do_benchmark(benchmark, open_connection)
评论列表
文章目录