async def _send_code(self, code, wire_dir=None):
    """Send ``code`` over the wire, one ``_send_text`` call per piece.

    ``code`` is either a single item, which is passed to
    ``self._send_text`` unchanged, or a generator object. A generator is
    walked depth-first: any item that is itself a generator is expanded
    in place, and every other item is sent in the order it is produced.
    Only generator objects are expanded; other containers such as lists
    or tuples are handed to ``_send_text`` as-is.

    Args:
        code: A single code item or a (possibly nested) generator of them.
        wire_dir: Optional directive sent along with every piece. ``None``
            becomes ``b''``; ``bytes``/``bytearray`` values are used
            unchanged; anything else is converted with ``str()`` and
            encoded as UTF-8.

    Raises:
        RuntimeError: If ``self.connected`` is falsy.
    """
    if not self.connected:
        raise RuntimeError('HBI wire not connected')

    # Normalise wire_dir once up front so a deeply nested code structure
    # does not repeat the conversion for every piece.
    if wire_dir is None:
        wire_dir = b''
    elif not isinstance(wire_dir, (bytes, bytearray)):
        wire_dir = str(wire_dir).encode('utf-8')

    def pull_code(container):
        # Lazily flatten nested generators into a single stream of leaves.
        for mc in container:
            if inspect.isgenerator(mc):
                yield from pull_code(mc)
            else:
                yield mc

    if inspect.isgenerator(code):
        for c in pull_code(code):
            await self._send_text(c, wire_dir)
    else:
        await self._send_text(code, wire_dir)
# [web page residue] "Comment list"
# [web page residue] "Table of contents"