def _iter_stream(self):
"""Stream parser.
:returns: Next item in the stream (may or may not be 'delimited').
:raises: TwitterConnectionError, StopIteration
"""
while True:
item = None
buf = bytearray()
stall_timer = None
try:
while True:
# read bytes until item boundary reached
buf += self.stream.read(1)
if not buf:
# check for stall (i.e. no data for 90 seconds)
if not stall_timer:
stall_timer = time.time()
elif time.time() - stall_timer > STREAMING_TIMEOUT:
raise TwitterConnectionError('Twitter stream stalled')
elif stall_timer:
stall_timer = None
if buf[-2:] == b'\r\n':
item = buf[0:-2]
if item.isdigit():
# use byte size to read next item
nbytes = int(item)
item = None
item = self.stream.read(nbytes)
break
yield item
except (ConnectionError, ProtocolError, ReadTimeout, ReadTimeoutError,
SSLError, ssl.SSLError, socket.error) as e:
raise TwitterConnectionError(e)
except AttributeError:
# inform iterator to exit when client closes connection
raise StopIteration
评论列表
文章目录