def mime_multipart(_, context, arg):
"""mime_multipart composes a MIME multipart string.
Example:
"UserData": {
"Fn::Base64": {
"CFPP::MimeMultipart": [
["text/x-shellscript", {"CFPP::FileToString": "userdata.sh"}],
["text/cloud-config", {"CFPP::FileToString": "cloud-init.yaml"}]
]
}
}
"""
_raise_unless_mime_params(context, arg)
mime_doc = MIMEMultipart()
# set boundary explicitly so that they are stable based on path in the template.
mime_doc.set_boundary("=" * 10 + hashlib.sha1(".".join(context)).hexdigest() + "=" * 3)
for mime_type, contents in arg:
sub_message = MIMEText(contents, contents, sys.getdefaultencoding())
mime_doc.attach(sub_message)
return mime_doc.as_string()
python类MIMEText()的实例源码
def test_header_splitter(self):
eq = self.ndiffAssertEqual
msg = MIMEText('')
# It'd be great if we could use add_header() here, but that doesn't
# guarantee an order of the parameters.
msg['X-Foobar-Spoink-Defrobnit'] = (
'wasnipoop; giraffes="very-long-necked-animals"; '
'spooge="yummy"; hippos="gargantuan"; marshmallows="gooey"')
sfp = StringIO()
g = Generator(sfp)
g.flatten(msg)
eq(sfp.getvalue(), '''\
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
X-Foobar-Spoink-Defrobnit: wasnipoop; giraffes="very-long-necked-animals";
\tspooge="yummy"; hippos="gargantuan"; marshmallows="gooey"
''')
def test_mime_attachments_in_constructor(self):
eq = self.assertEqual
text1 = MIMEText('')
text2 = MIMEText('')
msg = MIMEMultipart(_subparts=(text1, text2))
eq(len(msg.get_payload()), 2)
eq(msg.get_payload(0), text1)
eq(msg.get_payload(1), text2)
# A general test of parser->model->generator idempotency. IOW, read a message
# in, parse it into a message object tree, then without touching the tree,
# regenerate the plain text. The original text and the transformed text
# should be identical. Note: that we ignore the Unix-From since that may
# contain a changed date.
def test__all__(self):
module = __import__('email')
# Can't use sorted() here due to Python 2.3 compatibility
all = module.__all__[:]
all.sort()
self.assertEqual(all, [
# Old names
'Charset', 'Encoders', 'Errors', 'Generator',
'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
'MIMENonMultipart', 'MIMEText', 'Message',
'Parser', 'Utils', 'base64MIME',
# new names
'base64mime', 'charset', 'encoders', 'errors', 'generator',
'header', 'iterators', 'message', 'message_from_file',
'message_from_string', 'mime', 'parser',
'quopriMIME', 'quoprimime', 'utils',
])
def CreateMessage(sender, to, subject, message_text):
"""Create a message for an email.
Args:
sender: Email address of the sender.
to: Email address of the receiver.
subject: The subject of the email message.
message_text: The text of the email message.
Returns:
An object containing a base64url encoded email object.
"""
message = MIMEText(message_text)
message['to'] = to
message['from'] = sender
message['subject'] = subject
return {'raw': base64.urlsafe_b64encode(message.as_string())}
def get_mime_message(text, html_text=None, **kwargs):
if not html_text:
instance = MIMEText(text)
else:
instance = MIMEMultipart('alternative')
instance.attach(MIMEText(text, 'plain'))
instance.attach(MIMEText(html_text, 'html'))
extra = MIMEBase('application', 'octet-stream')
extra.set_payload(b'test content')
encoders.encode_base64(extra)
extra.add_header('Content-Disposition', 'attachment', filename='report.pdf')
instance.attach(extra)
instance['X-Accept-Language'] = 'en-us, en'
for key, value in kwargs.items():
instance[key] = value
return instance
def from_mime(cls, message, manager):
"""
Instantiates ``Email`` instance from ``MIMEText`` instance.
:param message: ``email.mime.text.MIMEText`` instance.
:param manager: :py:class:`EmailManager` instance.
:return: :py:class:`Email`
"""
if isinstance(message, MIMEMultipart):
text, html, attachments = deconstruct_multipart(message)
else:
text, html, attachments = message.get_payload(decode=True).decode('utf8'), None, []
subject = prepare_header(message['Subject'])
sender = prepare_header(message['From'])
to = prepare_header(message['To'])
cc = prepare_header(message['Cc'])
bcc = prepare_header(message['Bcc'])
reply_to = prepare_header(message['Reply-To'])
tag = getattr(message, 'tag', None)
return cls(manager=manager, From=sender, To=to, TextBody=text, HtmlBody=html, Subject=subject, Cc=cc, Bcc=bcc,
ReplyTo=reply_to, Attachments=attachments, Tag=tag)
def test_header_splitter(self):
eq = self.ndiffAssertEqual
msg = MIMEText('')
# It'd be great if we could use add_header() here, but that doesn't
# guarantee an order of the parameters.
msg['X-Foobar-Spoink-Defrobnit'] = (
'wasnipoop; giraffes="very-long-necked-animals"; '
'spooge="yummy"; hippos="gargantuan"; marshmallows="gooey"')
sfp = StringIO()
g = Generator(sfp)
g.flatten(msg)
eq(sfp.getvalue(), '''\
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
X-Foobar-Spoink-Defrobnit: wasnipoop; giraffes="very-long-necked-animals";
spooge="yummy"; hippos="gargantuan"; marshmallows="gooey"
''')
def test_binary_body_with_encode_noop(self):
# Issue 16564: This does not produce an RFC valid message, since to be
# valid it should have a CTE of binary. But the below works, and is
# documented as working this way.
bytesdata = b'\xfa\xfb\xfc\xfd\xfe\xff'
msg = MIMEApplication(bytesdata, _encoder=encoders.encode_noop)
self.assertEqual(msg.get_payload(), bytesdata)
self.assertEqual(msg.get_payload(decode=True), bytesdata)
s = StringIO()
g = Generator(s)
g.flatten(msg)
wireform = s.getvalue()
msg2 = email.message_from_string(wireform)
self.assertEqual(msg.get_payload(), bytesdata)
self.assertEqual(msg2.get_payload(decode=True), bytesdata)
# Test the basic MIMEText class
def test_mime_attachments_in_constructor(self):
eq = self.assertEqual
text1 = MIMEText('')
text2 = MIMEText('')
msg = MIMEMultipart(_subparts=(text1, text2))
eq(len(msg.get_payload()), 2)
eq(msg.get_payload(0), text1)
eq(msg.get_payload(1), text2)
# A general test of parser->model->generator idempotency. IOW, read a message
# in, parse it into a message object tree, then without touching the tree,
# regenerate the plain text. The original text and the transformed text
# should be identical. Note: that we ignore the Unix-From since that may
# contain a changed date.
def test__all__(self):
module = __import__('email')
# Can't use sorted() here due to Python 2.3 compatibility
all = module.__all__[:]
all.sort()
self.assertEqual(all, [
# Old names
'Charset', 'Encoders', 'Errors', 'Generator',
'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
'MIMENonMultipart', 'MIMEText', 'Message',
'Parser', 'Utils', 'base64MIME',
# new names
'base64mime', 'charset', 'encoders', 'errors', 'generator',
'header', 'iterators', 'message', 'message_from_file',
'message_from_string', 'mime', 'parser',
'quopriMIME', 'quoprimime', 'utils',
])
def send_email(loop: asyncio.AbstractEventLoop, mail_from: str, mail_to: Union[Iterable, str],
subject: str, body: str, server: str='localhost') -> None:
"""Send an email to one or more recipients.
Only supports plain text emails with a single message body.
No attachments etc.
"""
if type(mail_to) == str:
mail_to = [mail_to]
smtp = aiosmtplib.SMTP(hostname=server, port=25, loop=loop)
try:
await smtp.connect()
for rcpt in mail_to:
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = mail_from
msg['To'] = rcpt
await smtp.send_message(msg)
await smtp.quit()
except aiosmtplib.errors.SMTPException as e:
log.msg('Error sending smtp notification: %s' % (str(e)), 'NOTIFICATIONS')
def send_email(msg_subj, msg_txt, msg_rcpt, i=0, count=0):
from email.mime.text import MIMEText
userId, gmail = buildGAPIServiceObject(API.GMAIL, _getValueFromOAuth(u'email'))
if not gmail:
return
msg = MIMEText(msg_txt)
msg[u'Subject'] = msg_subj
msg[u'From'] = userId
msg[u'To'] = msg_rcpt
action = Act.Get()
Act.Set(Act.SENDEMAIL)
try:
callGAPI(gmail.users().messages(), u'send',
userId=userId, body={u'raw': base64.urlsafe_b64encode(msg.as_string())}, fields=u'')
entityActionPerformed([Ent.RECIPIENT, msg_rcpt, Ent.MESSAGE, msg_subj], i, count)
except googleapiclient.errors.HttpError as e:
entityActionFailedWarning([Ent.RECIPIENT, msg_rcpt, Ent.MESSAGE, msg_subj], str(e), i, count)
Act.Set(action)
# Write a CSV file
def _flush_recipient(self, recipient):
if not isinstance(recipient["config"]["info"]["email"], list):
email_addresses = [recipient["config"]["info"]["email"]]
else:
email_addresses = list(set(recipient["config"]["info"]["email"]))
logging.info("Sending email to {} ({}) for {}".format(
recipient["id"],
", ".join(email_addresses),
", ".join([route["prefix"] for route in recipient["routes"]])
))
data = {
"id": recipient["id"],
"from_addr": self.from_addr,
"subject": self.subject,
"routes_list": self._format_list_of_routes(recipient["routes"])
}
msg = MIMEText(self.template.format(**data))
msg['Subject'] = self.subject
msg['From'] = self.from_addr
msg['To'] = ", ".join(email_addresses)
self._send_email(self.from_addr, email_addresses, msg.as_string())
def _flush_recipient(self, recipient):
if not isinstance(recipient["config"]["info"]["email"], list):
email_addresses = [recipient["config"]["info"]["email"]]
else:
email_addresses = list(set(recipient["config"]["info"]["email"]))
logging.info("Sending email to {} ({}) for {}".format(
recipient["id"],
", ".join(email_addresses),
", ".join([route["prefix"] for route in recipient["routes"]])
))
data = {
"id": recipient["id"],
"from_addr": self.from_addr,
"subject": self.subject,
"routes_list": self._format_list_of_routes(recipient["routes"])
}
msg = MIMEText(self.template.format(**data))
msg['Subject'] = self.subject
msg['From'] = self.from_addr
msg['To'] = ", ".join(email_addresses)
self._send_email(self.from_addr, email_addresses, msg.as_string())
def _send_mime_text_to_email(self, text, email):
"""
:type text: mime_text.MIMEText
:type email: str
:return:
"""
text['To'] = email
try:
pass
connection = smtplib.SMTP(self._smtp_config['host'])
connection.ehlo()
connection.starttls()
connection.ehlo()
connection.login(self._smtp_config['username'], self._smtp_config['password'])
connection.sendmail(self._smtp_config['sender'], email, text.as_string())
connection.quit()
except smtplib.SMTPException:
self._logger.exception('Unable to send email to address "{}"'.format(email))
def send_message(self, email_content):
# ???????
now = datetime.datetime.now()
header = self.smtp_email_header + '[' + str(now.month) + '-' + str(now.day) + ' ' + \
str(now.hour) + ':' + str(now.minute) + ':' + str(now.second) + ']'
msg = MIMEText(email_content, 'plain', 'utf-8')
msg['from'] = self.smtp_from_addr
msg['to'] = self.smtp_to_addr
msg['Subject'] = Header(header, 'utf-8').encode()
# ??
try:
smtp_server = smtplib.SMTP(self.smtp_server_host, self.smtp_server_port)
smtp_server.login(self.smtp_from_addr, self.smtp_server_password)
smtp_server.sendmail(self.smtp_from_addr, [self.smtp_to_addr], msg.as_string())
smtp_server.quit()
except Exception as e:
if log.isEnabledFor(logging.ERROR):
log.error("??????")
log.exception(e)
def send(To, Subject, Body, Cc=[], Bcc=[], html=False, files=[]):
"""Send an email
"""
subtype = 'html' if html else 'plain'
message = MIMEMultipart()
message['To'] = ', '.join(To)
message['Subject'] = Subject
message['Cc'] = ', '.join(Cc)
message['Bcc'] = ', '.join(Bcc)
message.attach(MIMEText(Body, subtype))
for f in files:
with open(f, "rb") as In:
part = MIMEApplication(In.read(), Name=basename(f))
part['Content-Disposition'] = 'attachment; filename="%s"' % basename(f)
message.attach(part)
message = {'raw': base64.urlsafe_b64encode(message.as_string())}
credentials = oauth2client.file.Storage(CREDENTIALS_PATH).get()
Http = credentials.authorize(httplib2.Http())
service = discovery.build('gmail', 'v1', http=Http)
message = service.users().messages().send(userId='me', body=message).execute()
message_users.py 文件源码
项目:30-Days-of-Python
作者: codingforentrepreneurs
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def send_email(self):
self.make_messages()
if len(self.email_messages) > 0:
for detail in self.email_messages:
user_email = detail['email']
user_message = detail['message']
try:
email_conn = smtplib.SMTP(host, port)
email_conn.ehlo()
email_conn.starttls()
email_conn.login(username, password)
the_msg = MIMEMultipart("alternative")
the_msg['Subject'] = "Billing Update!"
the_msg["From"] = from_email
the_msg["To"] = user_email
part_1 = MIMEText(user_message, 'plain')
the_msg.attach(part_1)
email_conn.sendmail(from_email, [user_email], the_msg.as_string())
email_conn.quit()
except smtplib.SMTPException:
print("error sending message")
return True
return False
def send_email(self):
self.make_messages()
if len(self.email_messages) > 0:
for detail in self.email_messages:
user_email = detail['email']
user_message = detail['message']
try:
email_conn = smtplib.SMTP(host, port)
email_conn.ehlo()
email_conn.starttls()
email_conn.login(username, password)
the_msg = MIMEMultipart("alternative")
the_msg['Subject'] = "Billing Update!"
the_msg["From"] = from_email
the_msg["To"] = user_email
part_1 = MIMEText(user_message, 'plain')
the_msg.attach(part_1)
email_conn.sendmail(from_email, [user_email], the_msg.as_string())
email_conn.quit()
except smtplib.SMTPException:
print("error sending message")
return True
return False
message_users.py 文件源码
项目:30-Days-of-Python
作者: codingforentrepreneurs
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def send_email(self):
self.make_messages()
if len(self.email_messages) > 0:
for detail in self.email_messages:
user_email = detail['email']
user_message = detail['message']
try:
email_conn = smtplib.SMTP(host, port)
email_conn.ehlo()
email_conn.starttls()
email_conn.login(username, password)
the_msg = MIMEMultipart("alternative")
the_msg['Subject'] = "Billing Update!"
the_msg["From"] = from_email
the_msg["To"] = user_email
part_1 = MIMEText(user_message, 'plain')
the_msg.attach(part_1)
email_conn.sendmail(from_email, [user_email], the_msg.as_string())
email_conn.quit()
except smtplib.SMTPException:
print("error sending message")
return True
return False
message_users.py 文件源码
项目:30-Days-of-Python
作者: codingforentrepreneurs
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def send_email(self):
self.make_messages()
if len(self.email_messages) > 0:
for detail in self.email_messages:
user_email = detail['email']
user_message = detail['message']
try:
email_conn = smtplib.SMTP(host, port)
email_conn.ehlo()
email_conn.starttls()
email_conn.login(username, password)
the_msg = MIMEMultipart("alternative")
the_msg['Subject'] = "Billing Update!"
the_msg["From"] = from_email
the_msg["To"] = user_email
part_1 = MIMEText(user_message, 'plain')
the_msg.attach(part_1)
email_conn.sendmail(from_email, [user_email], the_msg.as_string())
email_conn.quit()
except smtplib.SMTPException:
print("error sending message")
return True
return False
message_users.py 文件源码
项目:30-Days-of-Python
作者: codingforentrepreneurs
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def send_email(self):
self.make_messages()
if len(self.email_messages) > 0:
for detail in self.email_messages:
user_email = detail['email']
user_message = detail['message']
try:
email_conn = smtplib.SMTP(host, port)
email_conn.ehlo()
email_conn.starttls()
email_conn.login(username, password)
the_msg = MIMEMultipart("alternative")
the_msg['Subject'] = "Billing Update!"
the_msg["From"] = from_email
the_msg["To"] = user_email
part_1 = MIMEText(user_message, 'plain')
the_msg.attach(part_1)
email_conn.sendmail(from_email, [user_email], the_msg.as_string())
email_conn.quit()
except smtplib.SMTPException:
print("error sending message")
return True
return False
message_users.py 文件源码
项目:30-Days-of-Python
作者: codingforentrepreneurs
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def send_email(self):
self.make_messages()
if len(self.email_messages) > 0:
for detail in self.email_messages:
user_email = detail['email']
user_message = detail['message']
try:
email_conn = smtplib.SMTP(host, port)
email_conn.ehlo()
email_conn.starttls()
email_conn.login(username, password)
the_msg = MIMEMultipart("alternative")
the_msg['Subject'] = "Billing Update!"
the_msg["From"] = from_email
the_msg["To"] = user_email
part_1 = MIMEText(user_message, 'plain')
the_msg.attach(part_1)
email_conn.sendmail(from_email, [user_email], the_msg.as_string())
email_conn.quit()
except smtplib.SMTPException:
print("error sending message")
return True
return False
message_users.py 文件源码
项目:30-Days-of-Python
作者: codingforentrepreneurs
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def send_email(self):
self.make_messages()
if len(self.email_messages) > 0:
for detail in self.email_messages:
user_email = detail['email']
user_message = detail['message']
try:
email_conn = smtplib.SMTP(host, port)
email_conn.ehlo()
email_conn.starttls()
email_conn.login(username, password)
the_msg = MIMEMultipart("alternative")
the_msg['Subject'] = "Billing Update!"
the_msg["From"] = from_email
the_msg["To"] = user_email
part_1 = MIMEText(user_message, 'plain')
the_msg.attach(part_1)
email_conn.sendmail(from_email, [user_email], the_msg.as_string())
email_conn.quit()
except smtplib.SMTPException:
print("error sending message")
return True
return False
message_users.py 文件源码
项目:30-Days-of-Python
作者: codingforentrepreneurs
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def send_email(self):
self.make_messages()
if len(self.email_messages) > 0:
for detail in self.email_messages:
user_email = detail['email']
user_message = detail['message']
try:
email_conn = smtplib.SMTP(host, port)
email_conn.ehlo()
email_conn.starttls()
email_conn.login(username, password)
the_msg = MIMEMultipart("alternative")
the_msg['Subject'] = "Billing Update!"
the_msg["From"] = from_email
the_msg["To"] = user_email
part_1 = MIMEText(user_message, 'plain')
the_msg.attach(part_1)
email_conn.sendmail(from_email, [user_email], the_msg.as_string())
email_conn.quit()
except smtplib.SMTPException:
print("error sending message")
return True
return False
def send_email(sender, sender_pass, receiver, subject, body):
# Create email
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = sender
msg['To'] = receiver
# Send email out
server = smtplib.SMTP('smtp.gmail.com', 587)
server.starttls()
server.login(sender, sender_pass)
server.send_message(msg)
server.quit()
# Gets and caches your gmail address
def send_invite(user):
if _cfg("smtp-host") == "":
return
smtp = smtplib.SMTP_SSL(_cfg("smtp-host"), _cfgi("smtp-port"))
smtp.ehlo()
smtp.login(_cfg("smtp-user"), _cfg("smtp-password"))
with open("emails/invite") as f:
message = MIMEText(html.parser.HTMLParser().unescape(\
pystache.render(f.read(), {
'user': user,
"domain": _cfg("domain"),
"protocol": _cfg("protocol")
})))
message['Subject'] = "Your wank.party account is approved"
message['From'] = _cfg("smtp-user")
message['To'] = user.email
smtp.sendmail(_cfg("smtp-user"), [ user.email ], message.as_string())
smtp.quit()
def send_reset(user):
if _cfg("smtp-host") == "":
return
smtp = smtplib.SMTP_SSL(_cfg("smtp-host"), _cfgi("smtp-port"))
smtp.ehlo()
smtp.login(_cfg("smtp-user"), _cfg("smtp-password"))
with open("emails/reset") as f:
message = MIMEText(html.parser.HTMLParser().unescape(\
pystache.render(f.read(), {
'user': user,
"domain": _cfg("domain"),
"protocol": _cfg("protocol"),
'confirmation': user.passwordReset
})))
message['Subject'] = "Reset your wank.party password"
message['From'] = _cfg("smtp-user")
message['To'] = user.email
smtp.sendmail(_cfg("smtp-user"), [ user.email ], message.as_string())
smtp.quit()
def send_mail(to_list,sub,context):
me = mail_user
msg = MIMEText(context)
msg['Subject'] = sub
msg['From'] = me
msg['To'] = ";".join(to_list)
try:
s = smtplib.SMTP(mail_host)
s.login(mail_user,mail_pass)
s.sendmail(me, to_list, msg.as_string())
s.quit()
return True
except Exception, e:
return False