def receive_data(self, channel, oc):
    """Read one length-prefixed message from a channel's socket and push it to ``oc``.

    Wire format: a native ``size_t`` byte count followed by that many payload
    bytes. The payload is reinterpreted as float32 values and handed to
    ``oc.push``, which is scheduled on the event loop (not awaited here).

    Args:
        channel: Key into ``self._chan_to_rsocket``; its ``channel`` attribute
            is only used in log messages.
        oc: Output connector whose ``push(data)`` returns an awaitable.

    If either the header or the payload arrives short (for example because
    the peer closed the connection), the problem is logged, the socket is
    removed from the event loop's readers, and nothing is pushed.
    """
    # Bookkeeping is updated on every call, even if the read below fails.
    self.last_timestamp = datetime.datetime.now()
    self.fetch_count += 1

    sock = self._chan_to_rsocket[channel]

    # The header is a native size_t: ask struct for its width (4 or 8 bytes)
    # instead of hard-coding 8, and wait for the whole header so a partial
    # read cannot reach struct.unpack.
    header_size = struct.calcsize("N")
    header = sock.recv(header_size, socket.MSG_WAITALL)
    if len(header) != header_size:
        logger.error("Channel %s socket header shorter than expected", channel.channel)
        logger.error("Expected %s bytes, received %s bytes", header_size, len(header))
        # Assume that we cannot recover, so stop listening.
        asyncio.get_event_loop().remove_reader(sock)
        return
    (msg_size,) = struct.unpack("N", header)

    buf = sock.recv(msg_size, socket.MSG_WAITALL)
    if len(buf) != msg_size:
        logger.error("Channel %s socket msg shorter than expected", channel.channel)
        logger.error("Expected %s bytes, received %s bytes", msg_size, len(buf))
        # Assume that we cannot recover, so stop listening.
        asyncio.get_event_loop().remove_reader(sock)
        return

    # NOTE(review): np.frombuffer raises ValueError when the payload length is
    # not a multiple of 4 bytes; presumably senders always write whole float32
    # values -- confirm against the producer.
    data = np.frombuffer(buf, dtype=np.float32)
    asyncio.ensure_future(oc.push(data))
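# [web-page residue, not part of the code] Comment list
# [web-page residue, not part of the code] Article table of contents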