def add(self, location: str, content_type: str, payload: str, encoding: str = 'quoted-printable') -> None:
    """Wrap *payload* in a MIME part and attach it to ``self._msg``.

    :param location: value stored in the part's ``Content-Location`` header.
    :param content_type: MIME type of the part; ``'text/html'`` is additionally
        tagged with ``charset='utf-8'``.
    :param payload: the content. For ``'quoted-printable'`` and ``'base64'``
        a ``str`` is encoded as UTF-8 before transfer-encoding; ``bytes`` is
        accepted as well and used unchanged.
    :param encoding: ``'quoted-printable'`` (default), ``'base64'``, or
        ``'base64-encoded'`` when *payload* already is base64 text.
    :raises ValueError: if *encoding* is none of the values above.
    """
    resource = EmailMessage()
    if content_type == 'text/html':
        resource.add_header('Content-Type', 'text/html', charset='utf-8')
    else:
        resource['Content-Type'] = content_type

    if encoding == 'base64-encoded':  # Already base64 encoded
        resource['Content-Transfer-Encoding'] = 'base64'
        resource.set_payload(payload)
    elif encoding in ('quoted-printable', 'base64'):
        # Both encoders only accept bytes-like input; passing the annotated
        # ``str`` straight to b64encode() raises TypeError.
        raw = payload.encode() if isinstance(payload, str) else payload
        resource['Content-Transfer-Encoding'] = encoding
        if encoding == 'base64':
            resource.set_payload(base64.b64encode(raw))
        else:
            resource.set_payload(quopri.encodestring(raw))
    else:
        raise ValueError('invalid encoding')

    resource['Content-Location'] = location
    self._msg.attach(resource)
python类EmailMessage()的实例源码
def send(self, target: str, subject: str, html_content_with_cids: str, inline_png_cids_filenames: {str : str}):
    """Send an HTML email with inline PNG images over SMTP with STARTTLS.

    The message has an empty plain-text part and an HTML alternative. Every
    entry of ``inline_png_cids_filenames`` maps a content id to a PNG file
    name, resolved relative to the directory of this module; each file is
    attached as a related ``image/png`` part of the HTML alternative.

    :param target: value of the ``To`` header.
    :param subject: value of the ``Subject`` header.
    :param html_content_with_cids: HTML body; presumably references the
        images through their content ids -- verify against callers.
    :param inline_png_cids_filenames: mapping of content id -> PNG file name.
        NOTE(review): the ``{str : str}`` annotation is a dict literal, not a
        type expression.
    """
    message = EmailMessage()
    message['Subject'] = subject
    message['From'] = self.sender
    message['To'] = target
    message.set_content('')
    message.add_alternative(html_content_with_cids, subtype='html')

    module_dir = os.path.dirname(__file__)
    for cid, filename in inline_png_cids_filenames.items():
        png_path = os.path.abspath(os.path.join(module_dir, filename))
        with open(png_path, 'rb') as handle:
            image_bytes = handle.read()
        # Payload index 1 is the HTML alternative added above.
        message.get_payload()[1].add_related(image_bytes, 'image', 'png', cid=cid)

    with smtplib.SMTP(self.smtp_server_host, self.smtp_server_port) as connection:
        connection.starttls()
        connection.login(self.username, self.password)
        connection.send_message(message)
async def send_mail(app,
                    sender='%s service <%s>' % (settings.BRAND, settings.SERVER_EMAIL),
                    recipients=tuple(),
                    subject='',
                    body=''):
    """Send a plain-text email through the application's SMTP connection.

    Declared ``async`` because the body uses ``async with`` and ``await``,
    which are only valid inside a coroutine function.

    :param app: object whose ``smtp`` attribute is an async context manager
        yielding a connection that offers ``send_message``.
    :param sender: ``From`` header value and envelope sender.
    :param recipients: sequence of recipient addresses; nothing is sent (and
        ``None`` is returned) when it is empty.
    :param subject: ``Subject`` header value.
    :param body: plain-text message content.
    :return: whatever ``conn.send_message`` returns, or ``None`` when there
        are no recipients.
    """
    if not recipients:
        return None

    async with app.smtp as conn:
        msg = EmailMessage()
        msg['Subject'] = subject
        msg['From'] = sender
        msg['To'] = ', '.join(recipients)
        msg.set_content(body)
        return await conn.send_message(msg, sender, recipients, timeout=5)
def __init__(self, _factory=None, *, policy=compat32):
    """Set up parser state and decide how new message objects are created.

    ``_factory`` is the callable used to create message objects. When it
    is omitted, ``message.Message`` is used for the ``compat32`` policy and
    ``message.EmailMessage`` for every other policy.

    ``policy`` is a keyword-only policy object stored on the parser; it
    defaults to ``compat32``.
    """
    self.policy = policy
    # By default the factory receives the parser's policy as a keyword.
    self._factory_kwds = lambda: {'policy': self.policy}
    if _factory is not None:
        self._factory = _factory
        # Probe the caller's factory once: one that rejects the ``policy``
        # keyword is treated as old-style and later called without it.
        try:
            _factory(policy=self.policy)
        except TypeError:
            self._factory_kwds = lambda: {}
    elif self.policy is compat32:
        self._factory = message.Message
    else:
        self._factory = message.EmailMessage
    self._input = BufferedSubFile()
    self._msgstack = []
    self._parse = self._parsegen().__next__
    self._cur = None
    self._last = None
    self._headersonly = False
# Non-public interface for supporting Parser's headersonly flag
def __init__(self):
    """Create the root message: MIME 1.0, ``multipart/related`` with ``type=text/html``."""
    root = EmailMessage()
    root['MIME-Version'] = '1.0'
    root.add_header('Content-Type', 'multipart/related', type='text/html')
    self._msg = root
def send_help_email(username:str, user_email:str, subject:str, message:str) -> bool:
    """
    Sends an email to the netsoc email address containing the help data,
    CC'ing all the SysAdmins and the user requesting help.
    This enables us to reply to the email directly instead of copy-pasting the
    from address and disconnecting history.

    :param username the user requesting help
    :param user_email the user's email address
    :param subject the subject of the user's help request
    :param message the user's actual message
    :returns True if the email was handed to the SMTP server, False if
        connecting, logging in or sending failed
    """
    message_body = \
    """
From: %s\n
Email: %s
%s
PS: Please "Reply All" to the emails so that you get a quicker response."""%(
        username, user_email, message)
    msg = EmailMessage()
    msg.set_content(message_body)
    msg["From"] = p.NETSOC_ADMIN_EMAIL_ADDRESS
    msg["To"] = p.NETSOC_EMAIL_ADDRESS
    msg["Subject"] = "[Netsoc Help] " + subject
    msg["Cc"] = tuple(p.SYSADMIN_EMAILS + [user_email])
    try:
        with smtplib.SMTP("smtp.sendgrid.net", 587) as s:
            s.login(p.SENDGRID_USERNAME, p.SENDGRID_PASSWORD)
            s.send_message(msg)
    except Exception:
        # Best-effort delivery: report failure to the caller, but let
        # KeyboardInterrupt / SystemExit propagate (a bare ``except:`` would
        # have swallowed them too).
        return False
    return True
def send_sudo_request_email(username:str, user_email:str):
    """
    Emails the netsoc address, CC'ing the SysAdmins and the requesting user,
    to record that the user has asked for a sudo account on feynman.

    :param username the server username of the user who made the request.
    :param user_email the email address of that user; added to the Cc list.
    """
    body_text = """
Hi {username},
Thank you for making a request for an account with sudo privileges on feynman.netsoc.co.
We will be in touch shortly.
Best,
The UCC Netsoc SysAdmin Team.
PS: Please "Reply All" to the emails so that you get a quicker response.
""".format(username=username)

    mail = EmailMessage()
    mail.set_content(body_text)
    # Headers are assigned in a fixed order: From, To, Subject, Cc.
    header_values = (
        ("From", p.NETSOC_ADMIN_EMAIL_ADDRESS),
        ("To", p.NETSOC_EMAIL_ADDRESS),
        ("Subject", "[Netsoc Help] Sudo request on Feynman for {user}".format(
            user=username)),
        ("Cc", tuple(p.SYSADMIN_EMAILS + [user_email])),
    )
    for header_name, header_value in header_values:
        mail[header_name] = header_value

    with smtplib.SMTP("smtp.sendgrid.net", 587) as server:
        server.login(p.SENDGRID_USERNAME, p.SENDGRID_PASSWORD)
        server.send_message(mail)
async def signup(req):
    """Register a guest and email them a registration challenge.

    Declared ``async`` because the body awaits ``User.is_free`` and
    ``challenges.create_for``; ``await`` is invalid in a plain ``def``.

    :param req: request whose ``json`` must contain ``name``, ``email`` and
        ``password`` and whose ``app.config`` supplies the domain, scheme and
        SMTP settings.
    :raises InvalidUsage: if an argument is missing or the name/email is
        already taken.
    :return: a text response confirming where the challenge was sent.
    """
    # NOTE(review): the debug messages below include the whole request body,
    # which contains the plaintext password -- consider redacting it.
    required_keys = ("name", "email", "password")
    if any(key not in req.json for key in required_keys):
        logger.debug(f"Request is {req.json} but some arguments are missing.")
        raise InvalidUsage("Missing argument")

    if not await User.is_free(req.json["name"], req.json["email"]):
        logger.debug(f"Request is {req.json} but name or email is already taken")
        raise InvalidUsage("Username or email already taken")

    guest = Guest(req.json["name"], req.json["email"], User.hashpwd(req.json["password"]))
    chlg = await challenges.create_for(guest)
    logger.info(f"Guest signed up with name: {guest.name} and email: {guest.email}. Challenge generated: {chlg}")

    config = req.app.config
    # The template file is only needed while the body is being rendered.
    with open("mails" + sep + "challenge.txt") as mailtext:
        template = mailtext.read()

    mail = EmailMessage()
    mail.set_content(template.format(domain=config.domain,
                                     scheme="https" if config.schemessl else "http",
                                     challenge=chlg))
    mail["Subject"] = "WebGames Registration Challenge"
    mail["From"] = config.smtpuser
    mail["To"] = guest.email

    with SMTP(config.smtphost, config.smtpport) as smtp:
        if config.smtpssl:
            # Upgrade to TLS, then re-identify over the secured channel.
            smtp.ehlo()
            smtp.starttls()
            smtp.ehlo()
        else:
            smtp.helo()
        smtp.login(config.smtpuser, config.smtppwd)
        smtp.send_message(mail)

    return text(f"Challenge sent to {guest.email}")
def emit(self, record):
    """
    Emit a record.

    Format the record and send it to the specified addressees. Any
    exception raised while building or sending the message is passed to
    ``self.handleError(record)`` instead of propagating.
    """
    try:
        import smtplib
        from email.message import EmailMessage
        import email.utils

        # Fall back to the standard SMTP port when none was configured.
        port = self.mailport or smtplib.SMTP_PORT

        msg = EmailMessage()
        msg['From'] = self.fromaddr
        msg['To'] = ','.join(self.toaddrs)
        msg['Subject'] = self.getSubject(record)
        msg['Date'] = email.utils.localtime()
        msg.set_content(self.format(record))

        # The context manager issues QUIT and closes the socket even when
        # STARTTLS, login or sending fails, so a failed attempt no longer
        # leaks the connection.
        with smtplib.SMTP(self.mailhost, port, timeout=self.timeout) as smtp:
            if self.username:
                if self.secure is not None:
                    smtp.ehlo()
                    smtp.starttls(*self.secure)
                    smtp.ehlo()
                smtp.login(self.username, self.password)
            smtp.send_message(msg)
    except Exception:
        self.handleError(record)
def send_email(self: celery.Task, to_name: str, to_addr: str, subject: str, text: str, html: str):
    """Send a text + HTML email to one recipient, retrying on I/O errors.

    Nothing is sent when the app config has ``TESTING`` set. When the SMTP
    exchange raises ``IOError``/``OSError`` the error is logged and the task
    is re-queued via ``self.retry`` after ``MAIL_RETRY`` seconds.
    """
    # WARNING: when changing the signature of this function, also change the
    # self.retry() call below.
    config = current_app.config

    # Build the message: plain-text body with an HTML alternative.
    from_address = Address(config['MAIL_DEFAULT_FROM_NAME'], addr_spec=config['MAIL_DEFAULT_FROM_ADDR'])
    mail = EmailMessage()
    mail['Subject'] = subject
    mail['From'] = from_address
    mail['To'] = (Address(to_name, addr_spec=to_addr),)
    mail.set_content(text)
    mail.add_alternative(html, subtype='html')

    # Never deliver real mail while testing.
    if config['TESTING']:
        log.warning('not sending mail to %s <%s> because we are TESTING', to_name, to_addr)
        return

    log.info('sending email to %s <%s>', to_name, to_addr)

    # Hand the message to the configured SMTP server.
    try:
        with smtplib.SMTP(config['SMTP_HOST'], config['SMTP_PORT'], timeout=config['SMTP_TIMEOUT']) as server:
            needs_login = config.get('SMTP_USERNAME') and config.get('SMTP_PASSWORD')
            if needs_login:
                server.login(config['SMTP_USERNAME'], config['SMTP_PASSWORD'])
            server.send_message(mail)
    except (IOError, OSError) as ex:
        log.exception('error sending email to %s <%s>, will retry later: %s',
                      to_name, to_addr, ex)
        retry_args = (to_name, to_addr, subject, text, html)
        self.retry(retry_args, countdown=config['MAIL_RETRY'])
    else:
        log.info('mail to %s <%s> successfully sent', to_name, to_addr)
def __init__(self, _factory=None, *, policy=compat32):
    """Set up parser state and decide how new message objects are created.

    ``_factory`` is the callable used to create message objects. When it
    is omitted, ``message.Message`` is used for the ``compat32`` policy and
    ``message.EmailMessage`` for every other policy.

    ``policy`` is a keyword-only policy object stored on the parser; it
    defaults to ``compat32``.
    """
    self.policy = policy
    # By default the factory receives the parser's policy as a keyword.
    self._factory_kwds = lambda: {'policy': self.policy}
    if _factory is not None:
        self._factory = _factory
        # Probe the caller's factory once: one that rejects the ``policy``
        # keyword is treated as old-style and later called without it.
        try:
            _factory(policy=self.policy)
        except TypeError:
            self._factory_kwds = lambda: {}
    elif self.policy is compat32:
        self._factory = message.Message
    else:
        self._factory = message.EmailMessage
    self._input = BufferedSubFile()
    self._msgstack = []
    self._parse = self._parsegen().__next__
    self._cur = None
    self._last = None
    self._headersonly = False
# Non-public interface for supporting Parser's headersonly flag