async def call_rpc(self, rpc_message: RpcMessage):
    """Enqueue an RPC message onto the Redis stream for its API.

    The stream name is ``'<api_name>:stream'`` and the stream entry's fields
    are ``rpc_message.to_dict()``. This is a coroutine: it awaits the pool
    lookup, the connection acquisition and the ``xadd`` call.

    Args:
        rpc_message: The message to enqueue. Its ``api_name`` selects the
            target stream and its ``to_dict()`` output is the payload.
    """
    stream = '{}:stream'.format(rpc_message.api_name)
    logger.debug(
        LBullets(
            L("Enqueuing message {} in Redis stream {}", Bold(rpc_message), Bold(stream)),
            items=rpc_message.to_dict()
        )
    )

    pool = await self.get_redis_pool()
    with await pool as redis:
        # Monotonic clock: the elapsed time must not be skewed by wall-clock
        # adjustments happening while the command is in flight.
        start_time = time.monotonic()
        # TODO: MAXLEN (xadd is currently called without any length cap)
        await redis.xadd(stream=stream, fields=rpc_message.to_dict())

    logger.info(L(
        "Enqueued message {} in Redis in {} stream {}",
        Bold(rpc_message), human_time(time.monotonic() - start_time), Bold(stream)
    ))
评论列表
文章目录