def _create(cls, email_message, commit=True):
"""A factory method creates model from a base django EmailMultiAlternatives instance."""
assert email_message.recipients()
instance = cls()
instance.encoding = email_message.encoding or settings.DEFAULT_CHARSET
instance.from_email = email_message.from_email
instance.to = EMAIL_ADDRESS_SEPARATOR.join(email_message.to)
instance.cc = EMAIL_ADDRESS_SEPARATOR.join(email_message.cc)
instance.bcc = EMAIL_ADDRESS_SEPARATOR.join(email_message.bcc)
instance.reply_to = EMAIL_ADDRESS_SEPARATOR.join(email_message.reply_to)
instance.subject = email_message.subject
instance.body_text = email_message.body
for content, mime_type in email_message.alternatives:
if mime_type != HTML_MIME_TYPE:
msg = "Only '{}' mime type is supported, can not send a message with {} alternative"
msg.format(HTML_MIME_TYPE, mime_type)
raise NotImplementedError(msg)
instance.body_html = content
if commit:
instance.save()
return instance
python类EmailMultiAlternatives()的实例源码
def _send(self, **kwargs):
"""
Actually send email in background
:return:
"""
context = self._build_context(**kwargs)
plain, html = self._render(context)
msg = EmailMultiAlternatives(
from_email=self.from_email,
to=self.to,
body=plain,
subject=self.format_subject()
)
msg.attach_alternative(html, 'text/html')
msg.send()
def send(self, connection=None, fail_silently=False):
if connection is None:
from django.core.mail import get_connection
connection = get_connection(
backend=settings.EMAIL_QUEUE_EMAIL_BACKEND,
fail_silently=fail_silently
)
to = self.to.split(EMAIL_ADDRESS_SEPARATOR) if self.to and self.to.strip() else None
cc = self.cc.split(EMAIL_ADDRESS_SEPARATOR) if self.cc and self.cc.strip() else None
bcc = self.bcc.split(EMAIL_ADDRESS_SEPARATOR) if self.bcc and self.bcc.strip() else None
mail = EmailMultiAlternatives(
subject=self.subject,
body=self.body_text,
from_email=self.from_email,
to=to,
cc=cc,
bcc=bcc,
connection=connection
)
if self.body_html:
mail.attach_alternative(self.body_html, HTML_MIME_TYPE)
mail.send(fail_silently=fail_silently)
self.posted = timezone.now()
self.status = QueuedEmailMessageStatus.posted
self.save()
logging.debug("Message posted: %s", self)
return mail
def bulk_email_view(self, request, object_id):
season = get_object_or_404(Season, pk=object_id)
if not request.user.has_perm('tournament.bulk_email', season.league):
raise PermissionDenied
if request.method == 'POST':
form = forms.BulkEmailForm(season.seasonplayer_set.count(), request.POST)
if form.is_valid() and form.cleaned_data['confirm_send']:
season_players = season.seasonplayer_set.all()
email_addresses = {sp.player.email for sp in season_players if sp.player.email != ''}
email_messages = []
for addr in email_addresses:
message = EmailMultiAlternatives(
form.cleaned_data['subject'],
form.cleaned_data['text_content'],
settings.DEFAULT_FROM_EMAIL,
[addr]
)
message.attach_alternative(form.cleaned_data['html_content'], 'text/html')
email_messages.append(message)
conn = mail.get_connection()
conn.open()
conn.send_messages(email_messages)
conn.close()
self.message_user(request, 'Emails sent to %d players.' % len(season_players), messages.INFO)
return redirect('admin:tournament_season_changelist')
else:
form = forms.BulkEmailForm(season.seasonplayer_set.count())
context = {
'has_permission': True,
'opts': self.model._meta,
'site_url': '/',
'original': season,
'title': 'Bulk email',
'form': form
}
return render(request, 'tournament/admin/bulk_email.html', context)
def dispatch(self, object, *args, **kwargs):
self.object = object
self.kwargs = kwargs
receivers = self.get_receivers()
context = self.get_context()
context.update(kwargs)
attachments = self.get_attachments()
template = self.template_name
mails = []
for receiver in receivers:
context['receiver'] = receiver
(subject, text, html) = self.render(template, context)
context.pop('receiver')
if hasattr(receiver, 'email'):
to_address = receiver.email
else:
to_address = receiver
subject_clean = re.sub(r'[\r\n]', '', subject).strip()
mail = EmailMultiAlternatives(
subject=subject_clean,
body=text,
from_email=settings.DEFAULT_FROM_EMAIL,
to=[to_address],
reply_to=self.get_reply_to(),
)
if len(attachments) > 0:
mail.mixed_subtype = 'related'
for attachment in attachments:
mail.attach(attachment)
mail.attach_alternative(html, 'text/html')
mail.send()
mails.append(mail)
return mails
def send(self, object_or_list):
"""
Given an object_or_list creates a EmailMultiAlternatives and
send it to the respective destination.
If Attachments exist, also adds them to the messsage.
"""
html_as_string = self.render()
text_part = strip_tags(html_as_string)
to = self.get_mail_to(object_or_list)
if to:
msg = EmailMultiAlternatives(
self.get_subject(),
text_part,
self.get_mail_from(),
to=to,
cc=self.get_mail_cc(),
bcc=self.get_mail_bcc(),
reply_to=self.get_mail_reply_to())
# Attach the html version of email
msg.attach_alternative(html_as_string, "text/html")
# If there is attachments attach them to the email
for attachment in self.get_attachments():
msg.attach(*attachment.get_triple())
return get_connection().send_messages([msg])
return 0
def render_mail(subject, template_prefix, to_emails, context, bcc=None, cc=None, **kwargs):
from_email = DEFAULT_FROM_EMAIL
if not re.match(r'^\[\s*Tunga', subject):
subject = '{} {}'.format(EMAIL_SUBJECT_PREFIX, subject)
bodies = {}
for ext in ['html', 'txt']:
try:
template_name = '{0}.{1}'.format(template_prefix, ext)
bodies[ext] = render_to_string(template_name,
context).strip()
except TemplateDoesNotExist:
if ext == 'txt':
if 'html' in bodies:
# Compose text body from html
bodies[ext] = convert_to_text(bodies['html'])
else:
# We need at least one body
raise
if bodies:
msg = EmailMultiAlternatives(subject, bodies['txt'], from_email, to_emails, bcc=bcc, cc=cc)
if 'html' in bodies:
try:
html_body = render_to_string(
'tunga/email/base.html', dict(email_content=bodies['html'])
).strip()
except TemplateDoesNotExist:
html_body = bodies['html']
msg.attach_alternative(premailer.transform(html_body), 'text/html')
else:
raise TemplateDoesNotExist
return msg
def copy_message(msg):
return EmailMultiAlternatives(
to=msg.to,
cc=msg.cc,
bcc=msg.bcc,
reply_to=msg.reply_to,
from_email=msg.from_email,
subject=msg.subject,
body=msg.body,
alternatives=getattr(msg, 'alternatives', []),
attachments=msg.attachments,
headers=msg.extra_headers,
connection=msg.connection)
def bulk_email_view(self, request, object_ids):
season_players = SeasonPlayer.objects.filter(id__in=[int(i) for i in object_ids.split(',')]).select_related('season', 'player').nocache()
seasons = {sp.season for sp in season_players}
for season in seasons:
if not request.user.has_perm('tournament.bulk_email', season.league):
raise PermissionDenied
if request.method == 'POST':
form = forms.BulkEmailForm(len(season_players), request.POST)
if form.is_valid() and form.cleaned_data['confirm_send']:
email_addresses = {sp.player.email for sp in season_players if sp.player.email != ''}
email_messages = []
for addr in email_addresses:
message = EmailMultiAlternatives(
form.cleaned_data['subject'],
form.cleaned_data['text_content'],
settings.DEFAULT_FROM_EMAIL,
[addr]
)
message.attach_alternative(form.cleaned_data['html_content'], 'text/html')
email_messages.append(message)
conn = mail.get_connection()
conn.open()
conn.send_messages(email_messages)
conn.close()
self.message_user(request, 'Emails sent to %d players.' % len(season_players), messages.INFO)
return redirect('admin:tournament_seasonplayer_changelist')
else:
form = forms.BulkEmailForm(len(season_players))
context = {
'has_permission': True,
'opts': self.model._meta,
'site_url': '/',
'original': 'Bulk email',
'title': 'Bulk email',
'form': form
}
return render(request, 'tournament/admin/bulk_email.html', context)
#-------------------------------------------------------------------------------