async def call(self, request):
    """Register a new email send group in redis and hand it to the sender.

    The request body is loaded via ``self.request_data(EmailSendModel)``.
    A per-uid counter key (``group:<uid>``) is incremented to detect
    duplicate submissions, the recipients are msgpack-encoded and pushed
    onto ``recipients:<uid>``, and ``self.sender.send_emails`` is then
    awaited with the remaining model fields as keyword arguments.

    Must be a coroutine function: the body awaits and uses ``async with``.

    :param request: the incoming request; not read directly in this method
    :return: a 201 ``Response`` with the text ``'201 job enqueued\\n'``
    :raises HTTPConflict: if the counter for ``m.uid`` had already been
        incremented, i.e. a group with this uid was submitted before
    """
    m = await self.request_data(EmailSendModel)
    # Both redis keys are given the same lifetime: 86400 seconds = 24 hours.
    expiry_seconds = 86400
    async with await self.sender.get_redis_conn() as redis:
        group_key = f'group:{m.uid}'
        # INCR returns the value after incrementing, so anything above 1
        # means this uid has been seen before.
        v = await redis.incr(group_key)
        if v > 1:
            raise HTTPConflict(text=f'Send group with id "{m.uid}" already exists\n')

        recipients_key = f'recipients:{m.uid}'
        # Recipients travel via the redis list; from_address is flattened
        # into two plain fields before being passed on as keyword arguments.
        data = m.values(exclude={'recipients', 'from_address'})
        data.update(
            from_email=m.from_address.email,
            from_name=m.from_address.name,
        )

        pipe = redis.pipeline()
        pipe.lpush(recipients_key, *[msgpack.packb(r.values(), use_bin_type=True) for r in m.recipients])
        pipe.expire(group_key, expiry_seconds)
        pipe.expire(recipients_key, expiry_seconds)
        await pipe.execute()

        # NOTE(review): presumably this enqueues a background job that reads
        # recipients_key (the response text says "job enqueued") - confirm.
        await self.sender.send_emails(recipients_key, **data)
        logger.info('%s sending %d emails', m.company_code, len(m.recipients))
    return Response(text='201 job enqueued\n', status=201)
评论列表
文章目录