def send_mail(subject, message, from_email, recipient_list,
fail_silently=False, auth_user=None, auth_password=None,
connection=None, html_message=None, headers=None,
cc=None, bcc=None):
"""Override django send_mail function to allow use of custom email headers.
"""
connection = connection or get_connection(username=auth_user,
password=auth_password,
fail_silently=fail_silently)
mail = EmailMultiAlternatives(subject, message,
from_email, recipient_list,
connection=connection, headers=headers,
cc=cc, bcc=bcc)
if html_message:
mail.attach_alternative(html_message, 'text/html')
return mail.send()
python类get_connection()的实例源码
def check_email_server_is_alive(app_configs=None, **kwargs):
from django.conf import settings
errors = []
if settings.POOTLE_SIGNUP_ENABLED or settings.POOTLE_CONTACT_ENABLED:
from django.core.mail import get_connection
connection = get_connection()
try:
connection.open()
except Exception:
errors.append(checks.Warning(
_("Email server is not available."),
hint=_("Review your email settings and make sure your email "
"server is working."),
id="pootle.W004",
))
else:
connection.close()
return errors
def send_mass_html_mail(datatuple, fail_silently=False, user=None, password=None,
connection=None):
"""
Given a datatuple of (subject, text_content, html_content, from_email,
recipient_list), sends each message to each recipient list. Returns the
number of emails sent.
If from_email is None, the DEFAULT_FROM_EMAIL setting is used.
If auth_user and auth_password are set, they're used to log in.
If auth_user is None, the EMAIL_HOST_USER setting is used.
If auth_password is None, the EMAIL_HOST_PASSWORD setting is used.
"""
connection = connection or get_connection(
username=user, password=password, fail_silently=fail_silently)
messages = []
for subject, text, html, from_email, recipient in datatuple:
message = EmailMultiAlternatives(subject, text, from_email, recipient)
message.attach_alternative(html, 'text/html')
messages.append(message)
return connection.send_messages(messages)
def get_mail(**kwargs):
def check():
from django.core.mail import get_connection
try:
conn = get_connection(fail_silently=False)
conn.open()
ret = "OK"
conn.close()
except Exception as e:
ret = str(e)
return ret
p = OrderedDict()
p["backend"] = settings.EMAIL_BACKEND
p["host"] = "{0}:{1}".format(settings.EMAIL_HOST, settings.EMAIL_PORT)
p["tls"] = getattr(settings, "USE_TLS", False)
p["ssl"] = getattr(settings, "USE_SSL", False)
p["status"] = check()
return p
def mail_send_task(to: str, subject: str, body: str, html: str, sender: str,
event: int=None, cc: list=None, bcc: list=None, headers: dict=None):
headers = headers or dict()
if event:
event = Event.objects.get(id=event)
sender = sender or event.settings.get('mail_from')
headers['reply-to'] = headers.get('reply-to', event.settings.get('mail_from'))
backend = event.get_mail_backend()
else:
backend = get_connection(fail_silently=False)
email = EmailMultiAlternatives(subject, body, sender, to=to, cc=cc, bcc=bcc, headers=headers)
if html is not None:
email.attach_alternative(inline_css(html), 'text/html')
try:
backend.send_messages([email])
except Exception:
logger.exception('Error sending email')
raise SendMailException('Failed to send an email to {}.'.format(to))
def send_license_deactivated_email(payment, last_4):
connection = get_connection(
username=settings.EMAIL_HOST_USER_BILLING,
password=settings.EMAIL_HOST_PASSWORD_BILLING
)
context = {
'payment': payment,
'last_4': last_4
}
recipients = payment.account.billing_admins.values_list('email', flat=True)
to_emails = [c for c in recipients]
send_mail(
subject_template_name='emails/billing/license_deactivated_subject.txt',
email_template_name='emails/billing/license_deactivated.txt',
to_email=to_emails,
bcc=[settings.BILLING_BCC, ],
context=context,
from_email=settings.BILLING_EMAIL,
html_email_template_name='emails/billing/license_deactivated.html',
connection=connection
)
def send_mail_with_bcc(subject, html_message, recipient_list, fail_silently=False):
def divide_group(lst, k):
return [lst[i:i+k] for i in range(0, len(lst), k)]
for grp in divide_group(recipient_list, 100):
try:
connection = get_connection(
username=None,
password=None,
fail_silently=fail_silently,
)
mail = EmailMultiAlternatives(subject, bcc=grp, connection=connection)
mail.attach_alternative(html_message, 'text/html')
mail.send()
except:
traceback.print_exc()
time.sleep(30)
def send_mail_with_backend(
subject, body, from_email, recipient_list, html_message=None,
fail_silently=False, auth_user=None, auth_password=None,
attachments=None, alternatives=None, connection=None, headers=None,
do_not_encrypt_this_message=False):
connection = connection or mail.get_connection(
username=auth_user, password=auth_password,
fail_silently=fail_silently,
)
message = mail.EmailMultiAlternatives(
subject, body, from_email, recipient_list, attachments=attachments,
connection=connection, headers=headers)
if html_message:
message.attach_alternative(html_message, 'text/html')
for alternative, mimetype in alternatives or []:
message.attach_alternative(alternative, mimetype)
if do_not_encrypt_this_message:
message.do_not_encrypt_this_message = True
return message.send()
def mail_send_task(to: str, subject: str, body: str, sender: str,
cc: list=None, bcc: list=None, headers: dict=None):
email = EmailMultiAlternatives(subject, body, sender, to=to, cc=cc, bcc=bcc, headers=headers)
backend = get_connection(fail_silently=False)
try:
backend.send_messages([email])
except Exception:
logger.exception('Error sending email')
raise SendMailException('Failed to send an email to {}.'.format(to))
def test_silent_exception(self):
with mail.get_connection(fail_silently=True) as connection:
send_with_connection(connection)
def test_loud_exception(self):
with mail.get_connection() as connection:
with pytest.raises(ValueError):
send_with_connection(connection)
def test_close_closed_connection():
with mail.get_connection() as connection:
connection.close()
def send(self, connection=None, fail_silently=False):
if connection is None:
from django.core.mail import get_connection
connection = get_connection(
backend=settings.EMAIL_QUEUE_EMAIL_BACKEND,
fail_silently=fail_silently
)
to = self.to.split(EMAIL_ADDRESS_SEPARATOR) if self.to and self.to.strip() else None
cc = self.cc.split(EMAIL_ADDRESS_SEPARATOR) if self.cc and self.cc.strip() else None
bcc = self.bcc.split(EMAIL_ADDRESS_SEPARATOR) if self.bcc and self.bcc.strip() else None
mail = EmailMultiAlternatives(
subject=self.subject,
body=self.body_text,
from_email=self.from_email,
to=to,
cc=cc,
bcc=bcc,
connection=connection
)
if self.body_html:
mail.attach_alternative(self.body_html, HTML_MIME_TYPE)
mail.send(fail_silently=fail_silently)
self.posted = timezone.now()
self.status = QueuedEmailMessageStatus.posted
self.save()
logging.debug("Message posted: %s", self)
return mail
def get_connection(self, fail_silently=False):
from django.core.mail import get_connection
if not self.connection:
self.connection = get_connection(fail_silently=fail_silently)
return self.connection
def send(self, fail_silently=False):
"""Sends the email message."""
if not self.recipients():
# Don't bother creating the network connection if there's nobody to
# send to.
return 0
return self.get_connection(fail_silently).send_messages([self])
def connection(self):
return get_connection(backend=self.email_backend, fail_silently=True)
def get_connection(self, fail_silently=False):
from django.core.mail import get_connection
if not self.connection:
self.connection = get_connection(fail_silently=fail_silently)
return self.connection
def send(self, fail_silently=False):
"""Sends the email message."""
if not self.recipients():
# Don't bother creating the network connection if there's nobody to
# send to.
return 0
return self.get_connection(fail_silently).send_messages([self])
def mail_connection():
mail.outbox = []
with mail.get_connection() as connection:
yield connection
def get_connection(self, fail_silently=False):
from django.core.mail import get_connection
if not self.connection:
self.connection = get_connection(fail_silently=fail_silently)
return self.connection
def send(self, fail_silently=False):
"""Sends the email message."""
if not self.recipients():
# Don't bother creating the network connection if there's nobody to
# send to.
return 0
return self.get_connection(fail_silently).send_messages([self])
def send(self, event, users, instance, paths):
from_address = settings.ASYNC_EMAIL_FROM
reply_to = {}
if hasattr(settings, 'ASYNC_EMAIL_REPLY_TO'):
reply_to = {'Reply-To': settings.ASYNC_EMAIL_REPLY_TO, 'Return-Path': settings.ASYNC_EMAIL_REPLY_TO}
messages = []
for user, templates in users.items():
if not self.enabled(event, user, paths) or not user.email:
continue
template = self._get_template(templates)
if template is None:
continue
params = self.prepare(template, instance)
e = mail.EmailMultiAlternatives(
subject=params['subject'],
body=params['description'],
from_email=from_address,
to=[user.email, ],
headers=reply_to
)
e.attach_alternative(markdown2.markdown(params['description'], extras=["link-patterns"],
link_patterns=link_registry.link_patterns(request), safe_mode=True),
'text/html')
messages.append(e)
if len(messages):
connection = mail.get_connection()
connection.send_messages(messages)
def send(self, request, queryset):
msgs = []
sent = 0
errors = 0
for reimb in queryset:
try:
reimb.send(request.user)
msgs.append(emails.create_reimbursement_email(reimb, request))
sent += 1
except ValidationError as e:
errors += 1
logging.error(e.message)
if msgs:
connection = mail.get_connection()
connection.send_messages(msgs)
if sent > 0 and errors > 0:
self.message_user(request, (
"%s reimbursements sent, %s reimbursements not sent. Did you "
"check that they were invited before and with money assigned?"
% (sent, errors)), level=messages.WARNING)
elif sent > 0:
self.message_user(request, '%s reimbursement sent' % sent,
level=messages.SUCCESS)
else:
self.message_user(request,
'Reimbursement couldn\'t be sent! Did you check '
'that app was invited before and with money '
'assigned?',
level=messages.ERROR)
def send_batch_emails(emails):
connection = mail.get_connection()
connection.send_messages(emails)
def handle(self, *args, **options):
fourdaysago = datetime.today() - timedelta(days=4)
self.stdout.write('Checking reminders...')
reminders = models.Application.objects.filter(
status_update_date__lte=fourdaysago, status=models.APP_INVITED)
self.stdout.write('Checking reminders...%s found' % reminders.count())
self.stdout.write('Sending reminders...')
msgs = []
for app in reminders:
app.last_reminder()
msgs.append(emails.create_lastreminder_email(app))
connection = mail.get_connection()
connection.send_messages(msgs)
self.stdout.write(self.style.SUCCESS(
'Sending reminders... Successfully sent %s reminders' % len(msgs)))
onedayago = datetime.today() - timedelta(days=1)
self.stdout.write('Checking expired...')
expired = models.Application.objects.filter(
status_update_date__lte=onedayago, status=models.APP_LAST_REMIDER)
self.stdout.write('Checking expired...%s found' % expired.count())
self.stdout.write('Setting expired...')
count = len([app.expire() for app in expired])
self.stdout.write(self.style.SUCCESS(
'Setting expired... Successfully expired %s applications' % count))
def get_connection(self, fail_silently=False):
from django.core.mail import get_connection
if not self.connection:
self.connection = get_connection(fail_silently=fail_silently)
return self.connection
def send(self, fail_silently=False):
"""Sends the email message."""
if not self.recipients():
# Don't bother creating the network connection if there's nobody to
# send to.
return 0
return self.get_connection(fail_silently).send_messages([self])
def send_notification(self, sender, dests, reply_to=None, message_id=None, reference=None, footer=None, subject=None):
messages = []
for dest, dest_name, dest_email in dests:
dest_type = ContentType.objects.get_for_model(dest)
dest, _ = MessageAuthor.objects.get_or_create(author_type=dest_type, author_id=dest.pk)
token = self.token + dest.token + hexdigest_sha256(settings.SECRET_KEY, self.token, dest.token)[:16]
if reply_to:
reply_to_name, reply_to_email = reply_to
reply_to_list = ['%s <%s>' % (reply_to_name, reply_to_email.format(token=token))]
else:
reply_to_list = []
headers = dict()
if message_id:
headers.update({
'Message-ID': message_id.format(id=self.token),
})
if message_id and reference:
headers.update({
'References': message_id.format(id=reference),
})
body = self.content
if footer is not None:
body += footer
messages.append(EmailMessage(
subject=subject or self.subject,
body=body,
from_email='%s <%s>' % sender,
to=['%s <%s>' % (dest_name, dest_email)],
reply_to=reply_to_list,
headers=headers,
))
connection = get_connection()
connection.send_messages(messages)
def get_mail_backend(self, force_custom: bool=False) -> BaseEmailBackend:
from pretalx.common.mail import CustomSMTPBackend
if self.settings.smtp_use_custom or force_custom:
return CustomSMTPBackend(host=self.settings.smtp_host,
port=self.settings.smtp_port,
username=self.settings.smtp_username,
password=self.settings.smtp_password,
use_tls=self.settings.smtp_use_tls,
use_ssl=self.settings.smtp_use_ssl,
fail_silently=False)
else:
return get_connection(fail_silently=False)
def get_connection(self, fail_silently=False):
from django.core.mail import get_connection
if not self.connection:
self.connection = get_connection(fail_silently=fail_silently)
return self.connection