def evaluate_block_users(self):
    """Flag employees that exceed any star limit and e-mail the blocked ones.

    Every employee returned by ``get_list_or_404(Employee)`` is checked
    against the daily and monthly given/received maxima from ``config``.
    The employee is saved on every pass, and an employee whose
    ``is_blocked`` flag is set (now or previously) gets a notification.
    Any error while notifying is printed and otherwise ignored.
    """
    for member in get_list_or_404(Employee):
        # (current counter, configured maximum) pairs, checked in order.
        usage_and_limits = (
            (member.yesterday_given, config.MAX_STARS_GIVEN_DAY),
            (member.yesterday_received, config.MAX_STARS_RECEIVED_DAY),
            (member.current_month_given, config.MAX_STARS_GIVEN_MONTHLY),
            (member.current_month_score, config.MAX_STARS_RECEIVED_MONTHLY),
        )
        if any(used > allowed for used, allowed in usage_and_limits):
            member.is_blocked = True
        member.save()

        try:
            if member.is_blocked:
                notification = EmailMessage(
                    config.USER_BLOCKED_NOTIFICATION_SUBJECT,
                    config.USER_BLOCKED_NOTIFICATION_MESSAGE % (member.username),
                    to=[member.email],
                )
                notification.send()
        except Exception as e:
            # Best effort: a failed notification must not stop the loop.
            print(e)
python类EmailMessage()的实例源码
def send_order_email_confirmation(sender, instance, **kwargs):
    """
    E-mail the order details to the customer as an HTML message.

    ``instance`` is the order; ``sender`` and ``kwargs`` are accepted but
    not used (presumably this is wired up as a signal receiver -- confirm).
    Returns whatever ``EmailMessage.send()`` returns.
    """
    template = get_template("emails/order_conf.html")
    html_body = template.render(Context({
        'order': instance.get_serialized_data()
    }))
    confirmation = EmailMessage(
        subject="Order confirmation",
        body=html_body,
        from_email=EMAIL_ADMIN,
        to=[instance.email],
        reply_to=[EMAIL_ADMIN],
    )
    # The rendered template is HTML, so mark the main body accordingly.
    confirmation.content_subtype = "html"
    return confirmation.send()
def send_html_mail(tolist, subject, html_content, fromer=None, cclist=None, bcclist=None):
    '''
    Send an HTML e-mail and report whether one message was sent.

    :param tolist: recipient addresses.
    :param subject: subject line.
    :param html_content: HTML body.
    :param fromer: optional display name placed before the host user address.
    :param cclist: optional CC addresses.
    :param bcclist: optional BCC addresses.
    :return: ``True`` when ``send()`` reports exactly 1, else ``False``
        (send errors are suppressed through ``fail_silently=True``).
    '''
    sender = settings.EMAIL_HOST_USER
    if fromer:
        sender = '%s<%s>' % (fromer, sender)

    mail = EmailMessage(subject, html_content, sender, tolist)
    mail.content_subtype = "html"
    if cclist:
        mail.cc = cclist
    if bcclist:
        mail.bcc = bcclist

    return mail.send(fail_silently=True) == 1
def email_to_string(e):
    # type: (EmailMessage) -> str
    """Render the main fields of an e-mail as a multi-line readable string.

    Fields that are empty or missing (``None``, ``''``, empty list) are
    shown as ``'Not specified'``.
    """
    def n(x):
        # Substitute a placeholder for any falsy field value.
        return x or 'Not specified'

    return """
From: {}
To: {}
Subject: {}
Reply-To: {}
CC: {}
BCC: {}
Body: {}
Attachments: {}
""".format(
        n(e.from_email),
        n(e.to),
        n(e.subject),
        # Attachments are checked before stringification so that an empty
        # list is reported as the placeholder instead of "[]".
        n(e.reply_to), n(e.cc), n(e.bcc), n(e.body), n(e.attachments))
def send_access(self):
    """Send the draft-application e-mail unless one was sent too recently.

    A send is allowed when no previous send time is stored or when more
    than ``settings.SEND_ACCESS_INTERVAL`` minutes have passed since it.
    On an allowed send the timestamp is updated and saved first; the mail
    itself only goes out when ``settings.SEND_EMAILS`` is truthy.

    Returns ``True`` when the send was allowed, ``False`` when throttled.
    """
    previous = self.sent_email_data
    if previous:
        elapsed = timezone.now() - previous
        if not elapsed > datetime.timedelta(minutes=settings.SEND_ACCESS_INTERVAL):
            return False

    self.sent_email_data = timezone.now()
    self.save()

    if settings.SEND_EMAILS:
        body = render_to_string('application/email/draft.txt', {'uuid': self.uuid, 'email': self.email,})
        draft_mail = EmailMessage(
            subject='OpenCon 2016 Draft Application',
            body=body,
            from_email=settings.DEFAULT_FROM_EMAIL,
            to=[self.email],
        )
        draft_mail.content_subtype = "html"
        draft_mail.send(fail_silently=True)
    return True
def create_mail(subject, message, from_email, recipient, message_html=None,
                attachments=None, rfc2822_headers=None):
    """Build (without sending) an e-mail for a single recipient.

    A fresh ``Message-ID`` header is always set; ``rfc2822_headers`` are
    merged on top of it. When ``message`` is ``None`` the text body is
    derived from ``message_html`` with ``html2text``. With a truthy
    ``message_html`` the result is an ``EmailMultiAlternatives`` carrying
    the HTML alternative, otherwise a plain ``EmailMessage``.
    ``attachments`` is an iterable of ``(filename, content, mimetype)``.
    """
    all_headers = {'Message-ID': make_msgid()}
    if rfc2822_headers:
        all_headers.update(rfc2822_headers)

    # make text version out of html if text version is missing
    text_body = html2text(message_html) if message is None else message

    if not message_html:
        mail = EmailMessage(subject, text_body, from_email, [recipient],
                            headers=all_headers)
    else:
        mail = EmailMultiAlternatives(subject, text_body, from_email, [recipient],
                                      headers=all_headers)
        mail.attach_alternative(message_html, "text/html")

    for filename, content, mimetype in attachments or ():
        mail.attach(filename, content, mimetype)
    return mail
def apply(self, item, policies_list):
    """Send the double opt-in confirmation e-mail for ``item``.

    The confirmation link is the ``confirmation`` route for ``item.uuid``
    joined onto the owner's application URL. When ``'BounceCheck'`` is in
    ``policies_list`` the Return-Path is a per-contact bounce address,
    otherwise it is the service sender address.
    """
    base_url = get_app_url(organization=item.get_owner())

    bounce_address = (
        'subscription-bounce+{uuid}@{fqdn}'.format(
            uuid=item.uuid, fqdn=settings.RETURNPATH_DOMAIN)
        if 'BounceCheck' in policies_list
        else settings.SERVICE_MSG_FROM_EMAIL
    )

    link = urljoin(base_url, reverse('confirmation', kwargs={'uuid': item.uuid}))
    body = render_to_string(
        'contacts/double_opt_in_confirmation.txt',
        {'contact': item, 'confirmation_link': link})
    sender = '{} <{}>'.format(
        settings.SERVICE_MSG_FROM_NAME,
        settings.SERVICE_MSG_FROM_EMAIL)

    EmailMessage(
        subject='Votre inscription',
        body=body,
        to=(item.address, ),
        from_email=sender,
        headers={
            'Auto-Submitted': 'auto-generated',
            'Return-Path': bounce_address},
    ).send()
def apply(self, item, policies_list):
    """Send the bounce-check confirmation e-mail for ``item``.

    Nothing is sent when ``'DoubleOptIn'`` is in ``policies_list``:
    if double opt-in is set up, its validation mail is used to check
    bounces instead.
    """
    if 'DoubleOptIn' in policies_list:
        return

    bounce_address = 'subscription-bounce+{uuid}@{fqdn}'.format(
        uuid=item.uuid, fqdn=settings.RETURNPATH_DOMAIN)
    body = render_to_string(
        'contacts/bounce_check_confirmation.txt',
        {'contact': item})
    sender = '{} <{}>'.format(
        settings.SERVICE_MSG_FROM_NAME,
        settings.SERVICE_MSG_FROM_EMAIL)

    EmailMessage(
        subject='Votre inscription',
        body=body,
        to=(item.address, ),
        from_email=sender,
        headers={
            'Auto-Submitted': 'auto-generated',
            'Return-Path': bounce_address},
    ).send()
def send_html_mail(tolist, subject, html_content, fromer=None, cclist=None, bcclist=None):
    '''
    Send an HTML e-mail and report whether one message was sent.

    :param tolist: recipient addresses.
    :param subject: subject line.
    :param html_content: HTML body.
    :param fromer: optional display name placed before the host user address.
    :param cclist: optional CC addresses.
    :param bcclist: optional BCC addresses.
    :return: ``True`` when ``send()`` reports exactly 1, else ``False``
        (send errors are suppressed through ``fail_silently=True``).
    '''
    sender = settings.EMAIL_HOST_USER
    if fromer:
        sender = '%s<%s>' % (fromer, sender)

    mail = EmailMessage(subject, html_content, sender, tolist)
    mail.content_subtype = "html"
    if cclist:
        mail.cc = cclist
    if bcclist:
        mail.bcc = bcclist

    return mail.send(fail_silently=True) == 1
def send_error_report(url, exception, trace):
    """Report a parsing error: by e-mail normally, on stdout otherwise.

    The HTML report is mailed to ``settings.ADMINS`` only when
    ``settings.DEBUG`` is exactly ``False``; for any other value the
    exception and traceback are printed instead.
    """
    if settings.DEBUG is not False:
        print("Exception: " + str(exception))
        print("Traceback: ")
        print(trace)
        return

    report_template = get_template('scheduler_core/error_report_parsing.html')
    html_report = report_template.render({
        'site': url,
        'exception': exception,
        'traceback': trace
    })
    report = EmailMessage('[MovieScheduler] Parsing Error Report',
                          html_report,
                          settings.SERVER_EMAIL,
                          settings.ADMINS)
    report.content_subtype = 'html'
    # Delivery problems are allowed to propagate to the caller.
    report.send(fail_silently=False)
def send_mail(self):
    """Send the stored message once to each distinct recipient address.

    ``self._recipients`` maps a key to a mapping with ``'email'`` and
    ``'salutation'`` entries. Empty or ``None`` addresses are skipped, a
    non-empty salutation is prepended as a greeting, and for message type
    ``'docs'`` the body is passed through ``self._insert_passwords``.
    There is a half-second pause after each message sent.
    """
    # NOTE: addresses are only checked for being non-empty; the original
    # comment asks for a minimal validity check that is not implemented.
    # Seeded with the "no address" values so those recipients are skipped.
    already_sent = ['', None]
    for reg_id in self._recipients:
        recipient = self._recipients[reg_id]
        address = recipient['email']
        salutation = recipient['salutation']
        if address in already_sent:
            continue

        body = self._message
        if salutation not in ('', None):
            body = 'Dear ' + salutation + ':<br/><br/>' + self._message
        if self._msg_type == 'docs':
            body = self._insert_passwords(body, reg_id)

        outgoing = EmailMessage(
            subject=self._subject,
            body=body,
            to=[address]
        )
        outgoing.content_subtype = 'html'
        outgoing.send()
        already_sent.append(address)
        time.sleep(0.5)
def send_email_task(email, name):
    """Send an HTML e-mail to ``email`` with an account link.

    The link path contains the base64 encoding of the part of ``email``
    before the ``@``; ``name`` is inserted into the subject and heading.
    """
    # Only the local part (before '@') is encoded into the link.
    local_part = email.split('@')[0].encode('utf-8')
    # Base64 output is pure ASCII, so decoding yields the bare token text.
    refined_email = base64.b64encode(local_part).decode('ascii')
    # The sender is the host user configured in settings.
    from_email = settings.EMAIL_HOST_USER
    subject = '{} ? ???? ??'.format(name)
    html_content = """<h1>{0}? ??? ?????.</h1>
<p>?? ??? ??? ?? ??? ??????</p>
<a href='http://127.0.0.1:8000/account/{1}/'>http://127.0.0.1:8000/account/{2}/</a>
""".format(name, refined_email, refined_email)
    mail = EmailMessage(subject, html_content, from_email, [email])
    mail.content_subtype = "html"
    mail.send()
def EnviarEmail(request, remetente, destinatario, oferta):
    """Send the applicant's details for an offer to its owner as HTML e-mail.

    :param request: current request; ``request.user.email`` is used as the
        sender address and listed in the body.
    :param remetente: applicant profile whose fields fill the body.
    :param destinatario: recipient; only ``destinatario.email`` is read.
    :param oferta: offer applied to; ``oferta.titulo`` appears in the
        subject and the body.
    """
    from html import escape

    def cell(value):
        # Profile fields are user-supplied: escape them so they cannot inject
        # markup into the HTML body; str() lets non-string fields render.
        return escape(str(value))

    campos = (
        ('Nome: ', remetente.nome),
        ('Email: ', request.user.email),
        ('Telefone: ', remetente.telefone),
        ('Nascimento: ', remetente.nascimento),
        ('Estado Civil: ', remetente.estadoCivil),
        ('Curso: ', remetente.curso),
        ('Periodo: ', remetente.periodo),
        ('CRA: ', remetente.CRA),
        ('Objetivo: ', remetente.objetivo),
        ('Formação: ', remetente.formacao),
        ('Experiência: ', remetente.experiencia),
        ('Habilidade: ', remetente.habilidade),
    )
    corpo = (
        cell(remetente.nome) + ' Candidatou-se a ' + cell(oferta.titulo) + '<br>' +
        'Dados: <br>' +
        ''.join(rotulo + cell(valor) + '<br>' for rotulo, valor in campos)
    )
    msg = EmailMessage(
        'Bolsa: ' + oferta.titulo,
        corpo,
        request.user.email,
        [destinatario.email]
    )
    msg.content_subtype = "html"
    msg.send()
def handle(self, form):
    """
    Handle method
    E-mails the rendered form data, with any uploaded files attached,
    to the comma-separated addresses in ``self.recipients``.

    :param form: Valid form instance
    :type form: django.forms.Form
    """
    body = self._render_template(form.cleaned_data)
    addresses = self.recipients.split(',')
    mail = EmailMessage(self.subject, body, settings.DEFAULT_FROM_EMAIL, addresses)
    for upload in self.get_files(form):
        mail.attach(upload.name, upload.read(), upload.content_type)
    mail.send()
def signup(request):
    """Register a new, inactive user and e-mail an activation link.

    Authenticated users are redirected to ``view_profile``. A valid POST
    saves the user with ``is_active = False``, sends the activation
    e-mail and renders the "activation pending" page. Anything else
    renders the sign-up form (bound on an invalid POST, blank otherwise).
    """
    if request.user.is_authenticated():
        return redirect('view_profile')

    if request.method != 'POST':
        return render(request, 'accounts/signup.html', {'form': SignupForm()})

    form = SignupForm(request.POST)
    if not form.is_valid():
        return render(request, 'accounts/signup.html', {'form': form})

    # The account stays inactive until the e-mailed link is used.
    new_user = form.save(commit=False)
    new_user.is_active = False
    new_user.save()

    site = get_current_site(request)
    body = render_to_string('accounts/activation_email.html', {
        'user': new_user, 'domain': site.domain,
        'uid': urlsafe_base64_encode(force_bytes(new_user.pk)),
        'token': account_activation_token.make_token(new_user),
    })
    activation_mail = EmailMessage(
        'Activate your account on Janani Care.',
        body,
        to=[form.cleaned_data.get('email')],
    )
    activation_mail.send()
    return render(request, 'accounts/activation_pending.html')
def organization_signup(request):
    """Register a new, inactive organization user and e-mail an activation link.

    Authenticated users are redirected to ``view_profile``. A valid POST
    saves the user with ``is_active = False``, sends the activation
    e-mail and renders the "activation pending" page. Anything else
    renders the organization sign-up form (bound on an invalid POST,
    blank otherwise).
    """
    if request.user.is_authenticated():
        return redirect('view_profile')

    if request.method != 'POST':
        return render(request, 'accounts/organization_signup.html',
                      {'form': OrganizationSignupForm()})

    form = OrganizationSignupForm(request.POST)
    if not form.is_valid():
        return render(request, 'accounts/organization_signup.html', {'form': form})

    # The account stays inactive until the e-mailed link is used.
    new_user = form.save(commit=False)
    new_user.is_active = False
    new_user.save()

    site = get_current_site(request)
    body = render_to_string('accounts/organization_activation_email.html', {
        'user': new_user, 'domain': site.domain,
        'uid': urlsafe_base64_encode(force_bytes(new_user.pk)),
        'token': account_activation_token.make_token(new_user),
    })
    activation_mail = EmailMessage(
        'Activate your organization account on Janani Care.',
        body,
        to=[form.cleaned_data.get('email')],
    )
    activation_mail.send()
    return render(request, 'accounts/activation_pending.html')
def send_general_email(customer_id, to_address, subject, body):
    """Send a plain e-mail on behalf of a customer and log the attempt.

    The customer is looked up first, so an unknown ``customer_id`` raises
    before anything is sent. Any exception from sending is swallowed and
    recorded as a result of ``0``. The attempt is always passed to
    ``log_email``.

    Returns the value from ``EmailMessage.send()``, or ``0`` on failure.
    """
    customer = Customer.objects.get(pk=customer_id)
    sender = config('EMAIL_HOST_USER')
    outgoing = EmailMessage(subject, body, sender, [to_address])

    # Attempt to send email
    try:
        sent_count = outgoing.send()
    except Exception:
        sent_count = 0

    # Log email in DB
    log_email(
        to_address=to_address,
        subject=subject,
        body=body,
        successful=sent_count,
        customer_id=customer.id,
    )
    return sent_count
def send_html_mail(tolist, subject, html_content, fromer=None, cclist=None, bcclist=None):
    '''
    Send an HTML e-mail and report whether one message was sent.

    :param tolist: recipient addresses.
    :param subject: subject line.
    :param html_content: HTML body.
    :param fromer: optional display name placed before the host user address.
    :param cclist: optional CC addresses.
    :param bcclist: optional BCC addresses.
    :return: ``True`` when ``send()`` reports exactly 1, else ``False``
        (send errors are suppressed through ``fail_silently=True``).
    '''
    sender = settings.EMAIL_HOST_USER
    if fromer:
        sender = '%s<%s>' % (fromer, sender)

    mail = EmailMessage(subject, html_content, sender, tolist)
    mail.content_subtype = "html"
    if cclist:
        mail.cc = cclist
    if bcclist:
        mail.bcc = bcclist

    return mail.send(fail_silently=True) == 1
def forgot_password(request):
    """E-mail an employee their stored password after matching id and e-mail.

    The request body is JSON with ``"email"`` and ``"loginid"``.

    Returns a ``JsonResponse`` of ``{"error": 'true'}`` when a field is
    missing or empty, no employee has that login id, the e-mail does not
    match, or sending fails; ``{'success': 'true'}`` otherwise.
    """
    payload = json.loads(request.body.decode('utf-8'))
    email_address = payload.get("email")
    employee_id = payload.get("loginid")
    if not email_address or employee_id is None:
        return JsonResponse({"error": 'true'}, safe=False)

    # first() yields None for an unknown id instead of raising IndexError.
    employee = Employee.objects.filter(instructor_id=employee_id).first()
    if employee is None or employee.email != email_address:
        return JsonResponse({"error": 'true'}, safe=False)

    # NOTE(review): this mails the value of ``employee.password`` as-is,
    # which implies passwords are stored in recoverable form. A reset-token
    # flow would be safer; left unchanged to keep the endpoint's behaviour.
    body = "Password for Faculty Data Acquisition System : " + employee.password
    message = EmailMessage('Password For FDA System', body, to=[email_address])
    try:
        message.send()
    except Exception:
        # Report the failure; previously it was overwritten by the success result.
        return JsonResponse({"error": 'true'}, safe=False)
    return JsonResponse({'success': 'true'}, safe=False)
def notify_registration(profile):
    """Notify the configured address by e-mail about a new registration.

    Does nothing when ``settings.BALAFON_NOTIFICATION_EMAIL`` is unset or
    empty. The Reply-To header is the registering contact's e-mail.
    Sending errors are logged and not re-raised.
    """
    recipient = getattr(settings, 'BALAFON_NOTIFICATION_EMAIL', '')
    if not recipient:
        return

    sender = settings.DEFAULT_FROM_EMAIL
    template = get_template('Profile/registration_notification_email.txt')
    body = template.render(Context({
        'contact': profile.contact,
        'site': getattr(settings, 'COOP_CMS_SITE_PREFIX', None),
    }))
    notification = EmailMessage(
        _(u"New registration"), body, sender,
        [recipient], headers={'Reply-To': profile.contact.email})
    try:
        notification.send()
    except Exception:
        logger.exception("notify_registration")
def notify_due_actions(user, actions):
    """E-mail ``user`` the list of their due ``actions``.

    The body is rendered from the due-actions template with the user, the
    actions and the site prefix. Sending errors are logged and not
    re-raised.
    """
    template = get_template('Users/due_actions_notification_email.txt')
    body = template.render(Context({
        'user': user,
        'actions': actions,
        'site': settings.COOP_CMS_SITE_PREFIX,
    }))
    reminder = EmailMessage(
        _(u"Balafon: You have due actions"),
        body,
        settings.DEFAULT_FROM_EMAIL,
        [user.email]
    )
    try:
        reminder.send()
    except Exception:  #pylint: disable=broad-except
        logger.exception("notify_due_actions")
def send_html_mail(tolist, subject, html_content, fromer=None, cclist=None, bcclist=None):
    '''
    Send an HTML e-mail and report whether one message was sent.

    :param tolist: recipient addresses.
    :param subject: subject line.
    :param html_content: HTML body.
    :param fromer: optional display name placed before the host user address.
    :param cclist: optional CC addresses.
    :param bcclist: optional BCC addresses.
    :return: ``True`` when ``send()`` reports exactly 1, else ``False``
        (send errors are suppressed through ``fail_silently=True``).
    '''
    sender = settings.EMAIL_HOST_USER
    if fromer:
        sender = '%s<%s>' % (fromer, sender)

    mail = EmailMessage(subject, html_content, sender, tolist)
    mail.content_subtype = "html"
    if cclist:
        mail.cc = cclist
    if bcclist:
        mail.bcc = bcclist

    return mail.send(fail_silently=True) == 1
def employee_reset_password(request, employee_email):
    """
    This endpoint send an email to employee, with confirmation reset password url.
    ---
    responseMessages:
    - code: 404
      message: Not found
    - code: 406
      message: Request not acceptable
    """
    # Only GET is handled; any other method falls through and returns None.
    if request.method != 'GET':
        return None

    employee = get_object_or_404(Employee, email=employee_email)
    # Presumably generates and stores employee.reset_password_code -- the
    # helper is defined elsewhere.
    employee.generate_reset_password_code()

    # The confirmation URL is this endpoint's path followed by the code.
    subject = config.EMPLOYEE_RESET_PASSWORD_CONFIRMATION_SUBJECT
    site = Site.objects.get_current()
    endpoint_path = reverse('employees:employee_reset_password', args=[employee.email])
    confirmation_url = site.domain + endpoint_path + employee.reset_password_code
    message = config.EMPLOYEE_RESET_PASSWORD_CONFIRMATION_MESSAGE % (confirmation_url)

    try:
        EmailMessage(subject, message, to=[employee.email]).send()
    except Exception as e:
        print(e)
        return Response({'detail': config.EMAIL_SERVICE_ERROR},
                        status=status.HTTP_406_NOT_ACCEPTABLE)

    # NOTE(review): the reset code is also returned in the response body.
    return Response(
        {'detail': 'Confirmation email sent.',
         'email': employee.email,
         'reset_password_code': employee.reset_password_code},
        status=status.HTTP_200_OK)
def employee_reset_password_confirmation(request, employee_email, employee_uuid):
    """
    This endpoint reset employee with random password and send an email to employee with it.
    ---
    responseMessages:
    - code: 404
      message: Not found
    - code: 406
      message: Request not acceptable
    """
    # Only GET is handled; any other method falls through and returns None.
    if request.method != 'GET':
        return None

    # Both the e-mail and the reset code must match, otherwise 404.
    employee = get_object_or_404(Employee, email=employee_email, reset_password_code=employee_uuid)

    # Temporary 4-character password; the employee is flagged to change it.
    temporary_password = Employee.objects.make_random_password(length=4, allowed_chars='beatrx23456789')
    employee.set_password(temporary_password)
    employee.is_password_reset_required = True
    employee.save()

    # Send confirmation email
    subject = config.EMPLOYEE_RESET_PASSWORD_SUCCESSFULLY_SUBJECT
    message = config.EMPLOYEE_RESET_PASSWORD_SUCCESSFULLY_MESSAGE % (temporary_password)
    try:
        EmailMessage(subject, message, to=[employee.email]).send()
    except Exception as e:
        print(e)
        return Response("<h1>%s</h1>" % config.EMAIL_SERVICE_ERROR)
    return Response("<h1>%s</h1>" % config.USER_SUCCESSFULLY_RESET_PASSWORD)
def send_daily_email(self):
    """Send the daily-execution confirmation e-mail to the configured address."""
    EmailMessage(
        config.DAILY_EXECUTION_CONFIRMATION_SUBJECT,
        config.DAILY_EXECUTION_CONFIRMATION_MESSAGE,
        to=[config.DAILY_EXECUTION_CONFIRMATION_EMAIL],
    ).send()
def send_blocked_notification_email(self, employee):
    """E-mail ``employee`` the configured "user blocked" notification.

    The message template is filled with ``employee.username``.
    """
    EmailMessage(
        config.USER_BLOCKED_NOTIFICATION_SUBJECT,
        config.USER_BLOCKED_NOTIFICATION_MESSAGE % employee.username,
        to=[employee.email],
    ).send()
def create_message(self, ctx: T, from_email=None, to=None, bcc=None,
                   connection=None, attachments=None, headers=None,
                   alternatives=None, cc=None,
                   reply_to=None) -> EmailMessage:
    '''
    Build a :py:class:`django.core.mail.EmailMessage` holding both the
    plaintext and the HTML rendering of this e-mail for the context
    ``ctx``. The message is returned, not sent.

    Every argument other than ``ctx`` is passed straight through to the
    :py:class:`~django.core.mail.EmailMessage` constructor.
    '''
    subject = self.render_subject(ctx)
    plaintext = self.render_body_as_plaintext(ctx)
    message = EmailMultiAlternatives(
        subject=subject,
        body=plaintext,
        from_email=from_email,
        to=to,
        bcc=bcc,
        connection=connection,
        attachments=attachments,
        headers=headers,
        alternatives=alternatives,
        cc=cc,
        reply_to=reply_to,
    )
    html = self.render_body_as_html(ctx)
    message.attach_alternative(html, 'text/html')
    return message
def render_mail(self, template_prefix, email, context):
    """
    Build the e-mail addressed to `email` from the templates named by
    `template_prefix` (e.g. "account/email/email_confirmation").

    `<prefix>_subject.txt` gives the subject; `<prefix>_message.html` and
    `<prefix>_message.txt` give the bodies. With a text body the result
    is a multipart message (HTML attached when present); with only an
    HTML body it is a single-part HTML message. When neither body
    template exists, `TemplateDoesNotExist` propagates.
    """
    context = dict(context)
    context['site'] = self.get_current_site()

    raw_subject = render_to_string('{0}_subject.txt'.format(template_prefix),
                                   context)
    # remove superfluous line breaks
    subject = self.format_email_subject(" ".join(raw_subject.splitlines()).strip())
    from_email = self.get_from_email()

    bodies = {}
    for ext in ('html', 'txt'):
        template_name = '{0}_message.{1}'.format(template_prefix, ext)
        try:
            rendered = render_to_string(template_name, context).strip()
            bodies[ext] = self.format_email_body(rendered, ext)
        except TemplateDoesNotExist:
            # We need at least one body
            if ext == 'txt' and not bodies:
                raise

    if 'txt' not in bodies:
        msg = EmailMessage(subject,
                           bodies['html'],
                           from_email,
                           [email])
        msg.content_subtype = 'html'  # Main content is now text/html
        return msg

    msg = EmailMultiAlternatives(subject,
                                 bodies['txt'],
                                 from_email,
                                 [email])
    if 'html' in bodies:
        msg.attach_alternative(bodies['html'], 'text/html')
    return msg
def post(self, request):
    """Handle a sign-up submission.

    An invalid form, or mismatching passwords, re-renders the sign-up
    page with the bound form. Otherwise the user is saved inactive with
    the chosen password, an activation e-mail is sent, a ``Profile`` is
    created for the user, and a plain text response asks the user to
    activate the account.
    """
    form = self.form_class(request.POST)
    if not form.is_valid():
        return render(request, "account/signup.html", {"form": form})

    user = form.save(commit=False)
    if form.cleaned_data['password'] != form.cleaned_data['confirm_password']:
        form.add_error("confirm_password", "Passwords do not match")
        return render(request, "account/signup.html", {"form": form})

    # The account stays inactive until the e-mailed link is used.
    user.set_password(form.cleaned_data["password"])
    user.is_active = False
    user.save()

    site = get_current_site(request)
    body = render_to_string("account/activate_email.html", {
        'user': user,
        'domain': site.domain,
        'uid': urlsafe_base64_encode(force_bytes(user.pk)),
        'token': account_activation_token.make_token(user)
    })
    activation_mail = EmailMessage(
        "Activate your Quora account",
        body,
        to=[form.cleaned_data['email']],
    )
    activation_mail.send()

    Profile(user=user).save()
    return HttpResponse("Please activate your account with e-mail")
def render_mail(template_prefix, recipient_email, substitutions,
                from_email=FROM_EMAIL):
    """
    Build the e-mail addressed to `recipient_email` from the templates
    named by `template_prefix` (e.g. "account/email/email_confirmation").

    `substitutions` is updated in place with the shared template
    substitutions and with the final subject, which is prefixed with
    `[<HACKATHON_NAME>]`. With a text body the result is a multipart
    message (HTML attached when present); with only an HTML body it is a
    single-part HTML message. When neither body template exists,
    `TemplateDoesNotExist` propagates.
    """
    substitutions.update(utils.get_substitutions_templates())
    raw_subject = render_to_string('{0}_subject.txt'.format(template_prefix),
                                   context=Context(substitutions))
    # remove superfluous line breaks
    cleaned_subject = " ".join(raw_subject.splitlines()).strip()
    subject = '[' + settings.HACKATHON_NAME + ']' + ' ' + cleaned_subject
    substitutions.update({'subject': subject})

    bodies = {}
    for ext in ('html', 'txt'):
        template_name = '{0}_message.{1}'.format(template_prefix, ext)
        try:
            bodies[ext] = render_to_string(template_name,
                                           Context(substitutions)).strip()
        except TemplateDoesNotExist:
            # We need at least one body
            if ext == 'txt' and not bodies:
                raise

    if 'txt' not in bodies:
        msg = EmailMessage(subject,
                           bodies['html'],
                           from_email,
                           [recipient_email])
        msg.content_subtype = 'html'  # Main content is now text/html
        return msg

    msg = EmailMultiAlternatives(subject,
                                 bodies['txt'],
                                 from_email,
                                 [recipient_email])
    if 'html' in bodies:
        msg.attach_alternative(bodies['html'], 'text/html')
    return msg